PART-6 – Logstash
Logstash is the part of the stack that parses, filters, and enriches the events before they end up in Elasticsearch.
Prepare Logstash
We will create two separate Logstash deployments so we can scale them up or down individually. One deployment handles the incoming events, which are simply forwarded into RabbitMQ without much grokking. The second deployment does the actual filtering and pattern analysis: it takes the events out of RabbitMQ and writes them into Elasticsearch.
If you want, add an extra Logstash user as described at https://www.elastic.co/guide/en/logstash/current/ls-security.html – otherwise you can also use the "elastic" user (which has all permissions). I created a role "logstash_writer_user" and a user "logstash_writer" via Kibana and saved the username and password in a secret.
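If you prefer the Elasticsearch API over clicking through Kibana, a minimal sketch could look like the following. The index pattern and the exact privileges are assumptions here – adjust them to whatever your pipelines write to (the ILM alias used later is logstash_syslog_ilm):
# create the role (run from a host that can reach the elk-es-http service)
# -k skips TLS verification for brevity, better use --cacert with the CA from elk-es-http-certs-public
curl -k -u elastic:<elastic-password> -X POST "https://elk-es-http:9200/_security/role/logstash_writer_user" \
  -H 'Content-Type: application/json' -d '
{
  "cluster": ["monitor", "manage_index_templates", "manage_ilm"],
  "indices": [
    {
      "names": ["logstash_syslog_ilm*"],
      "privileges": ["write", "create", "create_index", "manage", "manage_ilm"]
    }
  ]
}'
# create the user and assign the role
curl -k -u elastic:<elastic-password> -X POST "https://elk-es-http:9200/_security/user/logstash_writer" \
  -H 'Content-Type: application/json' -d '
{
  "password": "<the-logstash-writer-password>",
  "roles": ["logstash_writer_user"]
}'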
# the values are base64 encoded
# echo -n 'HnWzJhQFxNuoUMPj8ufFxJPBcXFOh46m' | base64 -w 0
# this will output: SG5XekpoUUZ4TnVvVU1Qajh1ZkZ4SlBCY1hGT2g0Nm0=
apiVersion: v1
kind: Secret
metadata:
name: logstash-writer-user
namespace: elk
type: Opaque
data:
username: bG9nc3Rhc2hfd3JpdGVy
password: V005NFRwS1VtV2NRWTRx
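Instead of base64-encoding the values by hand you can also let kubectl create the same secret for you, for example:
kubectl -n elk create secret generic logstash-writer-user \
  --from-literal=username=logstash_writer \
  --from-literal=password='<the-logstash-writer-password>'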
"Filter"-Logstash
We will first deploy the "Filter"-Logstash pods, as they will declare the exchange and the queues on RabbitMQ. As you will see, this yaml is quite long :o) – for the moment the "beat"-pipeline is completely disabled. Also, the syslog-pipeline could definitely use some improvements.
apiVersion: apps/v1
kind: Deployment
metadata:
name: logstash-filter
# change if needed
namespace: elk
labels:
app: logstash-filter
spec:
replicas: 3
selector:
matchLabels:
app: logstash-filter
template:
metadata:
annotations:
co.elastic.logs/module: logstash
labels:
app: logstash-filter
stackmonitoring: logstash
spec:
# this soft anti-affinity tries to spread the pods so that ideally one pod runs on every node
affinity:
podAntiAffinity:
preferredDuringSchedulingIgnoredDuringExecution:
- weight: 100
podAffinityTerm:
labelSelector:
matchExpressions:
- key: app
operator: In
values:
- logstash-filter
topologyKey: "kubernetes.io/hostname"
containers:
- image: docker.elastic.co/logstash/logstash:7.10.0
name: logstash-filter
ports:
# the port where metricbeat will scrape from
- containerPort: 9600
name: http
protocol: TCP
# at the same time we use it for the healthchecks
livenessProbe:
httpGet:
path: /
port: 9600
initialDelaySeconds: 30
periodSeconds: 5
readinessProbe:
httpGet:
path: /
port: 9600
initialDelaySeconds: 30
periodSeconds: 5
env:
- name: ELASTICSEARCH_SERVICE
# change if needed
value: "https://elk-es-http:9200"
# Those credentials are used ONLY for connecting to Elasticsearch and checking for changes
# in pipelines managed via Kibana - the user has to be created e.g. via Kibana and the
# credentials have to be saved as a secret. Centralized pipeline management requires a trial or enterprise license
# more info at https://www.elastic.co/guide/en/logstash/current/logstash-centralized-pipeline-management.html
#- name: LOGSTASH_ADMIN_USER
# value: "logstash_admin_user"
#- name: LOGSTASH_ADMIN_USER_PASS
# valueFrom:
# secretKeyRef:
# name: logstash-admin-user
# key: logstash_admin_user
# we can use those variables in centrally managed pipelines too
- name: LOGSTASH_WRITER_USER
valueFrom:
secretKeyRef:
name: logstash-writer-user
key: username
- name: LOGSTASH_WRITER_USER_PASS
valueFrom:
secretKeyRef:
name: logstash-writer-user
key: password
- name: LOGSTASH_RMQ_USER
valueFrom:
secretKeyRef:
name: rmq-cluster-logstash-user
key: username
- name: LOGSTASH_RMQ_USER_PASS
valueFrom:
secretKeyRef:
name: rmq-cluster-logstash-user
key: password
# we will point those to the rabbitmq-pods, we will create a service for each pod
# this will make it possible to bind queues directly to the rmq-nodes which makes event
# distribution a little bit better.
# the following variables are not needed for the logstash rabbitmq output plugin
# for sending the events to the exchange we will use the normal rmq-cluster service
- name: RMQ_NODE_1
value: "rmq-cluster-server-0"
- name: RMQ_NODE_2
value: "rmq-cluster-server-1"
- name: RMQ_NODE_3
value: "rmq-cluster-server-2"
# not needed for buffer
- name: LOGSTASH_PATTERNS_DIR
value: "/etc/logstash/patterns/"
- name: ELASTICSEARCH_HTTPS_CA_PATH
value: "/etc/logstash/certificates/elk-es-http-certs-public.crt"
# we still have to figure out good resource requests and limits
resources: {}
volumeMounts:
# the main config file for logstash
- name: config-volume
mountPath: /usr/share/logstash/config/logstash.yml
subPath: logstash.yml
readOnly: true
# we use multiple pipelines
- name: config-volume
mountPath: /usr/share/logstash/config/pipelines.yml
subPath: pipelines.yml
readOnly: true
# the syslog-pipeline config
- name: config-volume
mountPath: /usr/share/logstash/pipeline/syslog.conf
subPath: syslog.conf
readOnly: true
# the beat-pipeline config
- name: config-volume
mountPath: /usr/share/logstash/pipeline/beat.conf
subPath: beat.conf
readOnly: true
# the patterns are only needed here in the filter - the logstash buffer does not need them
- name: config-volume
mountPath: /etc/logstash/patterns/containerlog_patterns
subPath: containerlog_patterns
readOnly: true
- name: config-volume
mountPath: /etc/logstash/patterns/syslog_patterns
subPath: syslog_patterns
readOnly: true
- name: cert-ca
mountPath: /etc/logstash/certificates/elk-es-http-certs-public.crt
subPath: tls.crt
readOnly: true
volumes:
- name: config-volume
configMap:
name: logstash-filter-configmap
- name: cert-ca
secret:
secretName: elk-es-http-certs-public
---
apiVersion: v1
kind: ConfigMap
metadata:
name: logstash-filter-configmap
namespace: elk
data:
logstash.yml: |
# this enables logstash's http endpoint - where metricbeat will scrape from
http.host: "0.0.0.0"
http.port: 9600
# we disable the internal monitoring - logstash will be scraped by metricbeat
monitoring.enabled: false
# you should set this parameter anyway so that this Logstash gets correctly assigned to the cluster in Stack Monitoring :|
# GET / on Elasticsearch will show the UUID
# if not set, a "Standalone Cluster" will pop up in "Stack Monitoring"
monitoring.cluster_uuid: n2KDDWUMS2q4h8P5F3_Z8Q
# if you want to enable centralized pipeline management (need trial or enterprise license)
#xpack.management.enabled: true
#xpack.management.elasticsearch.hosts: "${ELASTICSEARCH_SERVICE}"
#xpack.management.elasticsearch.username: ${LOGSTASH_ADMIN_USER}
#xpack.management.elasticsearch.password: ${LOGSTASH_ADMIN_USER_PASS}
#xpack.management.elasticsearch.ssl.certificate_authority: /etc/logstash/certificates/elk-es-http-certs-public.crt
#xpack.management.logstash.poll_interval: 5s
# you have to define all possible pipelines you want to manage centrally
# or create a pipeline first in Kibana and then add it here - otherwise there is a high chance that the pipeline won't work as expected ;)
#xpack.management.pipeline.id: [ "dummy-pipeline", "test-pipeline" ]
# if you enable centralized pipeline management - pipeline configs via files are ignored!
pipelines.yml: |
# This file is where you define your pipelines. You can define multiple.
# For more information on multiple pipelines, see the documentation:
# https://www.elastic.co/guide/en/logstash/current/multiple-pipelines.html
# not needed - we disable that
#- pipeline.id: main
# path.config: "/etc/logstash/conf.d/*.conf"
# https://www.elastic.co/guide/en/logstash/current/tuning-logstash.html
# The pipeline.workers setting determines how many threads to run for filter and output processing.
# The pipeline.batch.size setting defines the maximum number of events an individual worker thread collects before attempting to execute filters and outputs. Larger batch sizes are generally more efficient, but increase memory overhead.
# The pipeline.batch.delay setting rarely needs to be tuned. This setting adjusts the latency of the Logstash pipeline.
- pipeline.id: syslog-filter
path.config: "/usr/share/logstash/pipeline/syslog.conf"
pipeline.workers: 5
pipeline.batch.size: 250
pipeline.batch.delay: 5
# for the meantime just disabled
#- pipeline.id: beat-filter
# path.config: "/usr/share/logstash/pipeline/beat.conf"
# pipeline.workers: 5
# pipeline.batch.size: 250
# pipeline.batch.delay: 5
syslog.conf: |
# the syslog-pipeline file
input {
# The queues are created as durable and are bound to a certain "node"
# this means if the "node" goes down for whatever reason, the queue is
# in the meantime not available. but messages are not lost. as soon as
# the node gets back online, logstash should reconnect.
# Why do I want to bind the queue to one node?
# As they are durable the queue should not be recreated on another "node"
# - this would delete the saved messages on the node as soon as it is
# coming back online.
rabbitmq {
id => "rmq-syslog-q1"
# set the HOST/IP accordingly!
host => [ "${RMQ_NODE_1}" ]
user => "${LOGSTASH_RMQ_USER}"
# set the password accordingly!
password => "${LOGSTASH_RMQ_USER_PASS}"
# set an exchange - it should theoretically be created
exchange_type => "x-modulus-hash"
exchange => "syslog"
# the queues should be created by logstash if not existing
key => "1"
# woah, woah - watch out! - if a queue with the same name and key already exists,
# it WILL be used - and seemingly also bound to the exchange configured above!
queue => "syslog_q1"
tags => "syslog_q1"
prefetch_count => 2500
threads => 2
# yes, the queue should not disappear if no consumer is connected
durable => true
}
rabbitmq {
id => "rmq-syslog-q2"
# set the HOST/IP accordingly!
host => [ "${RMQ_NODE_2}" ]
user => "${LOGSTASH_RMQ_USER}"
# set the password accordingly!
password => "${LOGSTASH_RMQ_USER_PASS}"
# set an exchange - it should theoretically be created
exchange_type => "x-modulus-hash"
exchange => "syslog"
# the queues should be created by logstash if not existing
key => "2"
# woah, woah - watch out! - if a queue with the same name and key already exists,
# it WILL be used - and seemingly also bound to the exchange configured above!
queue => "syslog_q2"
tags => "syslog_q2"
prefetch_count => 2500
threads => 2
# yes, the queue should not disappear if no consumer is connected
durable => true
}
rabbitmq {
id => "rmq-syslog-q3"
# set the HOST/IP accordingly!
host => [ "${RMQ_NODE_3}" ]
user => "${LOGSTASH_RMQ_USER}"
# set the password accordingly!
password => "${LOGSTASH_RMQ_USER_PASS}"
# set an exchange - it should theoretically be created
exchange_type => "x-modulus-hash"
exchange => "syslog"
# the queues should be created by logstash if not existing
key => "3"
# woah, woah - watch out! - if a queue with the same name and key already exists,
# it WILL be used - and seemingly also bound to the exchange configured above!
queue => "syslog_q3"
tags => "syslog_q3"
prefetch_count => 2500
threads => 2
# yes, the queue should not disappear if no consumer is connected
durable => true
}
}
filter {
if "syslog" in [tags] {
# we save the message in a non-indexed field together with the original incoming date and time
# and we add a field which shows us where it was filtered
mutate {
id => "mutate-01"
# not sure if we should or need to save the raw message in both "original" fields
add_field => [ "[log][original]", "%{message}" ]
add_field => [ "received_at", "%{@timestamp}" ]
# this adds the pods name into a field, so we know which pod did the filtering
add_field => [ "[filter][node]", "${HOSTNAME}" ]
}
# Manually parse the log, as we want to support both RFC3164 and RFC5424
# first we try to check for RFC5424 events
grok {
id => "grok-01"
# load slightly changed default syslog patterns
patterns_dir => ["${LOGSTASH_PATTERNS_DIR}"]
match => { "message" => "%{SYSLOG5424LINE}" }
add_field => [ "received_from", "%{host}" ]
add_tag => [ "_grokparsesuccess" ]
overwrite => [ "message" ]
}
# if it is RFC5424 we have to do a lot more to get a nicely filtered event in Kibana
if [syslog5424_ts] {
# Handle RFC5424 formatted Syslog messages
mutate {
id => "mutate-02"
remove_field => [ "message", "host" ]
add_tag => [ "syslog5424" ]
}
mutate {
id => "mutate-03"
# Use a friendlier naming scheme
rename => {
"syslog5424_app" => "syslogprog"
"syslog5424_msg" => "message"
"syslog5424_host" => "syslog_host"
}
remove_field => [ "syslog5424_ver", "syslog5424_proc" ]
}
if [syslog5424_sd] {
# All structured data needs to be in format [key=value,key=value,...]
mutate {
id => "mutate-04"
# split at "][" brackets
split => { "syslog5424_sd" => "][" }
}
mutate {
id => "mutate-05"
# Remove any brackets in this array-field
gsub => [ "syslog5424_sd", "[\[\]]", "" ]
}
mutate {
id => "mutate-06"
rename => { "syslog5424_sd" => "[log][syslog][syslog5424][sd_data]" }
}
# not working as expected for the moment
#kv {
# # Convert the structured data into Logstash fields
# source => "syslog5424_sd"
# field_split => ","
# value_split => "="
# remove_char_key => "\[\]"
# transform_key => "lowercase"
# target => "[syslog][sd_id]"
# remove_field => [ "syslog5424_sd" ]
#}
}
# saves the log's date in @timestamp
date {
id => "date-01"
match => [ "syslog5424_ts", "ISO8601" ]
remove_field => [ "syslog5424_ts", "timestamp" ]
}
} else {
# Handle RFC3164 formatted Syslog messages
grok {
id => "grok-02"
# load slightly changed default syslog patterns
patterns_dir => ["${LOGSTASH_PATTERNS_DIR}"]
match => { "message" => "%{SYSLOGLINE}" }
add_field => [ "received_from", "%{host}" ]
add_tag => [ "_grokparsesuccess" ]
add_tag => [ "syslog3164" ]
overwrite => [ "message" ]
}
}
# grok attempt for syslog messages with epoch timestamps - not RFC-conformant, but also seen in the wild
if "_grokparsesuccess" not in [tags] {
# Manually parse the log
grok {
id => "grok-03"
# load slightly changed default syslog patterns
patterns_dir => ["${LOGSTASH_PATTERNS_DIR}"]
match => { "message" => "%{SYSLOGLINEUNIX}" }
add_field => [ "received_from", "%{host}" ]
add_tag => [ "_grokparsesuccess" ]
add_tag => [ "syslogunixepoch" ]
overwrite => [ "message" ]
}
}
# last grok attempt for syslog messages in most simple format - e.g. sent via old version of logger command
# <5>Jul 14 15:20:25 root: some test message
if "_grokparsesuccess" not in [tags] {
# Manually parse the log
grok {
id => "grok-04"
match => { "message" => "\<%{NONNEGINT:priority}\>(%{SYSLOGTIMESTAMP:log_timestamp}|%{TIMESTAMP_ISO8601:log_timestamp}) %{SYSLOGPROG}: %{GREEDYDATA:message}" }
add_field => [ "received_from", "%{host}" ]
add_tag => [ "_grokparsesuccess" ]
add_tag => [ "simple_syslog" ]
overwrite => [ "message" ]
}
}
# this will replace the @timestamp with the timestamp from the event if in correct format
# check if log_timestamp is in correct format
date {
id => "date-02"
match => [ "log_timestamp", "MMM dd yyyy HH:mm:ss" ]
add_tag => [ "_dateparsesuccess" ]
remove_field => [ "log_timestamp" ]
}
if "_dateparsesuccess" not in [tags] {
date {
id => "date-03"
match => [ "log_timestamp", "MMM d yyyy HH:mm:ss" ]
add_tag => [ "_dateparsesuccess" ]
remove_field => [ "log_timestamp" ]
}
}
if "_dateparsesuccess" not in [tags] {
date {
id => "date-04"
match => [ "log_timestamp", "ISO8601" ]
add_tag => [ "_dateparsesuccess" ]
remove_field => [ "log_timestamp" ]
}
}
if "_dateparsesuccess" not in [tags] {
date {
id => "date-05"
match => [ "log_timestamp", "MMM dd HH:mm:ss" ]
add_tag => [ "_dateparsesuccess" ]
remove_field => [ "log_timestamp" ]
}
}
if "_dateparsesuccess" not in [tags] {
date {
id => "date-06"
match => [ "log_timestamp", "MMM d HH:mm:ss" ]
add_tag => [ "_dateparsesuccess" ]
remove_field => [ "log_timestamp" ]
}
}
if "_dateparsesuccess" not in [tags] {
date {
id => "date-07"
match => [ "log_timestamp", "UNIX" ]
add_tag => [ "_dateparsesuccess" ]
remove_field => [ "log_timestamp" ]
}
}
if "_dateparsesuccess" not in [tags] {
date {
id => "date-08"
match => [ "log_timestamp", "UNIX_MS" ]
add_tag => [ "_dateparsesuccess" ]
remove_field => [ "log_timestamp" ]
}
}
# because all grok filters are attempted, there would also be a '_grokparsefailure' tag;
# we don't need it if the event was parsed correctly by at least one of the groks
if ("_grokparsesuccess" in [tags]) {
# syslog_pri extracts facility and loglevel from the "syslog_pri"ority-field
syslog_pri {
id => "syslog-pri-01"
syslog_pri_field_name => "priority"
}
mutate {
id => "mutate-07"
remove_tag => [ "_grokparsefailure" ]
}
}
###########################################################
###### Final preps for ECS
# remove the _dateparsefailure if we find a success-tag
if ("_dateparsesuccess" in [tags]) {
mutate {
id => "mutate-08"
remove_tag => [ "_dateparsefailure" ]
}
}
# we have to remove the host field first so we can reuse the field name for ECS style later
# it seems that we cannot do it in the same mutate-statement!
if "_grokparsesuccess" in [tags] {
# because we put the "host" already into the "received_from" field in the groks
mutate {
id => "mutate-09"
remove_field => [ "host" ]
}
# in simple_syslog messages there is no explicit syslog_host :|
if "simple_syslog" not in [tags] {
mutate {
id => "mutate-10"
# we add stuff to arrays
add_field => { "[host][ip]" => "%{received_from}" }
add_field => { "[host][name]" => "%{syslog_host}" }
}
} else {
mutate {
id => "mutate-11"
# we add stuff to arrays
add_field => { "[host][ip]" => "%{received_from}" }
add_field => { "[host][name]" => "%{received_from}" }
}
}
} else {
# if no grok matched, we have to take the value from the "host" field and save it temporarily
mutate {
id => "mutate-12"
add_field => { "received_from" => "%{host}" }
}
# we have to remove the initial host-field
mutate {
id => "mutate-13"
remove_field => [ "host" ]
}
# we have to add this to the host.ip array
mutate {
id => "mutate-14"
add_field => { "[host][ip]" => "%{received_from}" }
# as we do not have a hostname (parsing failed) we leave the host.name field empty
}
}
# for keeping to ECS
if [syslog_severity] {
mutate {
id => "mutate-15"
add_field => { "[log][level]" => "%{syslog_severity}" }
}
}
# finally we rename and remove fields
mutate {
id => "mutate-16"
# we can rename our simple string/text/number fields ### could be improved later, see https://www.elastic.co/guide/en/ecs/current/ecs-log.html
rename => {
"pid" => "[process][pid]"
"program" => "[process][name]"
"tid" => "[process][thread][id]"
"syslogprog" => "[log][logger]"
"priority" => "[log][syslog][priority]"
"syslog_facility" => "[log][syslog][facility][name]"
"syslog_facility_code" => "[log][syslog][facility][code]"
"syslog_severity" => "[log][syslog][severity][name]"
"syslog_severity_code" => "[log][syslog][severity][code]"
"syslog_ver" => "[log][syslog][version]"
"received_at" => "[event][created]"
"loglevel" => "[log][level]"
}
# removed from list above
#"syslog_timestamp" => "[log][syslog][timestamp]"
# we remove unneeded fields whose info we already have somewhere else
# it's in host.name and host.ip if applicable
remove_field => [ "syslog_host" ]
# is event.created
remove_field => [ "received_at" ]
# is in host.ip if applicable
remove_field => [ "received_from" ]
# we add event.dataset so SIEM part in Kibana looks nice
add_field => { "[event][dataset]" => "syslog" }
add_field => { "[event][type]" => "syslog" }
# and finally we remove the type (if there was one) as this is duplicated info in event.dataset
remove_field => [ "type" ]
# some other ECS best practices
add_field => { "[ecs][version]" => "1.7.0" }
}
# geoip
#if [source][ip] {
# geoip {
# id => "geoip-01"
# source => "[source][ip]"
# target => "[geo]"
# }
#}
}
}
output {
if [event][dataset] == "syslog" {
elasticsearch {
id => "elasticsearch-logstash-syslog-ilm"
# we do not want logstash to overwrite or manage templates
manage_template => false
# just to be on the safe side
codec => json_lines
# as we use https we have to configure the path to the CA-cert
cacert => "${ELASTICSEARCH_HTTPS_CA_PATH}"
user => "${LOGSTASH_WRITER_USER}"
password => "${LOGSTASH_WRITER_USER_PASS}"
ssl => true
# we want to use ILM
ilm_enabled => "true"
ilm_rollover_alias => "logstash_syslog_ilm"
ilm_pattern => "{now/d}-000001"
ilm_policy => "logstash-syslog"
# kubernetes should theoretically do the balancing itself
hosts => [ "${ELASTICSEARCH_SERVICE}" ]
}
}
}
beat.conf: |
# the beat-pipeline file
#input {
# beats {
# id => 'beat-input'
# port => 5044
# tags => "beat"
# }
#}
#
#filter {
# if "beat" in [tags] {
# ruby {
# id => "ruby-random-number"
# code => "event.set('random_number', rand(1..100).to_s())"
# }
# }
#}
#
#output {
# if "beat" in [tags] {
# rabbitmq {
# id => "beat-rabbitmq-output"
# exchange => "beat"
# exchange_type => "x-modulus-hash"
# key => "%{random_number}"
# user => "${LOGSTASH_RMQ_USER}"
# password => "${LOGSTASH_RMQ_USER_PASS}"
# host => [ "${RMQ_CLUSTER}" ]
# durable => true
# }
# }
#}
input {}
filter {}
output {}
containerlog_patterns: |
CONTAINERLOGTIMESTAMP (%{YEAR}-%{MONTHNUM}-%{MONTHDAY} %{TIME}|\[%{MONTHDAY}-%{MONTH}-%{YEAR} %{TIME}\])
SUPERVISORDLOGLEVEL (CRIT|ERRO|WARN|INFO|DEBG|TRAC|BLAT)
syslog_patterns: |
# orientation: match => { "message" => "\<%{NONNEGINT:priority}\>(%{NONNEGINT} | )(%{SYSLOGTIMESTAMP:log_timestamp}|%{TIMESTAMP_ISO8601:log_timestamp}) %{SYSLOGHOST:syslog_host} %{SYSLOGPROG:syslogprog}(: | )%{GREEDYDATA:message}" }
SYSLOG5424PRINTASCII [!-~]+
SYSLOGBASE2 (?:%{SYSLOGTIMESTAMP:log_timestamp}|%{TIMESTAMP_ISO8601:log_timestamp}) (?:%{SYSLOGFACILITY} )?%{SYSLOGHOST:syslog_host}+(?: %{SYSLOGPROG:syslogprog}:|)
SYSLOGPAMSESSION %{SYSLOGBASE} (?=%{GREEDYDATA:message})%{WORD:syslog_pam_module}\(%{DATA:syslog_pam_caller}\): session %{WORD:syslog_pam_session_state} for user %{USERNAME:syslog_username}(?: by %{GREEDYDATA:syslog_pam_by})?
CRON_ACTION [A-Z ]+
CRONLOG %{SYSLOGBASE} \(%{USER:syslog_user}\) %{CRON_ACTION:syslog_action} \(%{DATA:message}\)
SYSLOGPRI <%{NONNEGINT:priority}>
# IETF 5424 syslog(8) format (see http://www.rfc-editor.org/info/rfc5424)
SYSLOG5424SD \[%{DATA}\]+
SYSLOG5424BASE %{SYSLOGPRI}%{NONNEGINT:syslog_ver} +(?:%{TIMESTAMP_ISO8601:syslog5424_ts}|-) +(?:%{IPORHOST:syslog_host}|-) +(-|%{SYSLOG5424PRINTASCII:syslog5424_app}) +(-|%{SYSLOG5424PRINTASCII:syslog5424_proc}) +(-|%{SYSLOG5424PRINTASCII:syslog5424_msgid}) +(?:%{SYSLOG5424SD:syslog5424_sd}|-|)
SYSLOG5424LINE %{SYSLOG5424BASE} +%{GREEDYDATA:syslog5424_msg}
# IETF 3164 syslog format
SYSLOGLINE (?:%{SYSLOGPRI})%{SYSLOGBASE2} %{GREEDYDATA:message}
# unix epoch time
UNIXEPOCH (\d){10}
UNIXEPOCHMS1 ((\d){10}\.(\d){3})
UNIXEPOCHMS2 (\d){13}
SYSLOGBASEUNIX (?:%{UNIXEPOCH:log_timestamp}|%{UNIXEPOCHMS1:log_timestamp}|%{UNIXEPOCHMS2:log_timestamp}) (?:%{SYSLOGFACILITY} )?%{SYSLOGHOST:syslog_host}+(?: %{SYSLOGPROG:syslogprog}:|)
SYSLOGLINEUNIX (?:%{SYSLOGPRI})%{SYSLOGBASEUNIX} %{GREEDYDATA:message}
# Error logs
HTTPD20_ERRORLOG \[%{HTTPDERROR_DATE:apache_error_timestamp}\] \[%{LOGLEVEL:loglevel}\] (?:\[client %{IPORHOST:clientip}\] ){0,1}%{GREEDYDATA:message}
HTTPD24_ERRORLOG \[%{HTTPDERROR_DATE:apache_error_timestamp}\] \[%{WORD:module}:%{LOGLEVEL:loglevel}\] \[pid %{POSINT:pid}(:tid %{NUMBER:tid})?\]( \(%{POSINT:proxy_errorcode}\)%{DATA:proxy_message}:)?( \[client %{IPORHOST:clientip}:%{POSINT:clientport}\])?( %{DATA:errorcode}:)? %{GREEDYDATA:message}
HTTPD_ERRORLOG %{HTTPD20_ERRORLOG}|%{HTTPD24_ERRORLOG}
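After applying the manifest, a quick sanity check could look like the following (assuming curl is available inside the Logstash image and using the pod and service names from above): list the loaded pipelines via Logstash's monitoring API and confirm that the syslog exchange and the three durable queues were declared on RabbitMQ.
# are the filter pods running?
kubectl -n elk get pods -l app=logstash-filter
# which pipelines did Logstash load?
kubectl -n elk exec deploy/logstash-filter -- curl -s 'localhost:9600/_node/pipelines?pretty'
# did the exchange and the durable queues show up on RabbitMQ?
kubectl -n elk exec rmq-cluster-server-0 -- rabbitmqctl list_exchanges name type
kubectl -n elk exec rmq-cluster-server-0 -- rabbitmqctl list_queues name durable messages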
"Buffer"-Logstash
The "Buffer"-Logstash will be exposed via a NodePort service, listening for example on a "Syslog" port and a "Beat" port. The podAntiAffinity tries to ensure that one pod runs on each k8s node. Again (as mentioned in the comments) this is important when running the service with externalTrafficPolicy: Local, because otherwise we would not be able to preserve the client IP the logs are sent from. You can read more about it here: https://kubernetes.io/docs/tutorials/services/source-ip/
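After applying the manifest below you can quickly check how the buffer pods are spread across the nodes, which is exactly what matters for externalTrafficPolicy: Local, for example:
# the NODE column shows which k8s nodes actually run a buffer pod
kubectl -n elk get pods -l app=logstash-buffer -o wide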
apiVersion: apps/v1
kind: Deployment
metadata:
name: logstash-buffer
# change if needed
namespace: elk
labels:
app: logstash-buffer
spec:
replicas: 3
selector:
matchLabels:
app: logstash-buffer
template:
metadata:
annotations:
co.elastic.logs/module: logstash
labels:
app: logstash-buffer
stackmonitoring: logstash
spec:
# this soft anti-affinity tries to spread the pods so that ideally one pod runs on every node
affinity:
podAntiAffinity:
preferredDuringSchedulingIgnoredDuringExecution:
- weight: 100
podAffinityTerm:
labelSelector:
matchExpressions:
- key: app
operator: In
values:
- logstash-buffer
topologyKey: "kubernetes.io/hostname"
containers:
- image: docker.elastic.co/logstash/logstash:7.10.0
name: logstash-buffer
ports:
# the port where metricbeat will scrape from
- containerPort: 9600
name: http
protocol: TCP
livenessProbe:
httpGet:
path: /
port: 9600
initialDelaySeconds: 60
periodSeconds: 5
readinessProbe:
httpGet:
path: /
port: 9600
initialDelaySeconds: 60
periodSeconds: 5
env:
- name: ELASTICSEARCH_SERVICE
# change if needed
value: "https://elk-es-http:9200"
# Those credentials are used ONLY for connecting to Elasticsearch and checking for changes
# in pipelines managed via Kibana - the user has to be created e.g. via Kibana and the
# credentials have to be saved as a secret. Centralized pipeline management requires a trial or enterprise license
# more info at https://www.elastic.co/guide/en/logstash/current/logstash-centralized-pipeline-management.html
#- name: LOGSTASH_ADMIN_USER
# value: "logstash_admin_user"
#- name: LOGSTASH_ADMIN_USER_PASS
# valueFrom:
# secretKeyRef:
# name: logstash-admin-user
# key: logstash_admin_user
# we won't need the writer credentials as the logstash-buffer just forwards the events to rabbitmq
# we can use those variables in centrally managed pipelines too
#- name: LOGSTASH_WRITER_USER
# # we have to create those user first in elasticsearch
# value: "logstash_writer_user"
#- name: LOGSTASH_WRITER_USER_PASS
# valueFrom:
# secretKeyRef:
# name: logstash-writer-user
# key: logstash_writer_user
- name: LOGSTASH_RMQ_USER
valueFrom:
secretKeyRef:
name: rmq-cluster-logstash-user
key: username
- name: LOGSTASH_RMQ_USER_PASS
valueFrom:
secretKeyRef:
name: rmq-cluster-logstash-user
key: password
# for sending the events to the exchange we will use the normal rmq-cluster service
- name: RMQ_CLUSTER
value: "rmq-cluster"
resources: {}
volumeMounts:
# the main config file for logstash
- name: config-volume
mountPath: /usr/share/logstash/config/logstash.yml
subPath: logstash.yml
readOnly: true
# we use multiple pipelines
- name: config-volume
mountPath: /usr/share/logstash/config/pipelines.yml
subPath: pipelines.yml
readOnly: true
# the syslog-pipeline config
- name: config-volume
mountPath: /usr/share/logstash/pipeline/syslog.conf
subPath: syslog.conf
readOnly: true
# the beat-pipeline config
- name: config-volume
mountPath: /usr/share/logstash/pipeline/beat.conf
subPath: beat.conf
readOnly: true
volumes:
- name: config-volume
configMap:
name: logstash-buffer-configmap
- name: cert-ca
secret:
secretName: elk-es-http-certs-public
---
apiVersion: v1
kind: ConfigMap
metadata:
name: logstash-buffer-configmap
namespace: elk
data:
logstash.yml: |
# this enables logstash's http endpoint - where metricbeat will scrape from
http.host: "0.0.0.0"
http.port: 9600
# we disable the internal monitoring - logstash will be scraped by metricbeat
monitoring.enabled: false
# you should set this parameter anyway so that this Logstash gets correctly assigned to the cluster in Stack Monitoring :|
# GET / on Elasticsearch will show the UUID
# if not set, a "Standalone Cluster" will pop up in "Stack Monitoring"
monitoring.cluster_uuid: n2KDDWUMS2q4h8P5F3_Z8Q
# if you want to enable centralized pipeline management (need trial or enterprise license)
#xpack.management.enabled: true
#xpack.management.elasticsearch.hosts: "${ELASTICSEARCH_SERVICE}"
#xpack.management.elasticsearch.username: ${LOGSTASH_ADMIN_USER}
#xpack.management.elasticsearch.password: ${LOGSTASH_ADMIN_USER_PASS}
#xpack.management.elasticsearch.ssl.certificate_authority: /etc/logstash/certificates/elk-es-http-certs-public.crt
#xpack.management.logstash.poll_interval: 5s
# you have to define all possible pipelines you want to manage centrally
# or create a pipeline first in Kibana and then add it here - otherwise there is a high chance that the pipeline won't work as expected ;)
#xpack.management.pipeline.id: [ "dummy-pipeline", "test-pipeline" ]
# if you enable centralized pipeline management - pipeline configs via files are ignored!
pipelines.yml: |
# This file is where you define your pipelines. You can define multiple.
# For more information on multiple pipelines, see the documentation:
# https://www.elastic.co/guide/en/logstash/current/multiple-pipelines.html
# not needed - we disable that
#- pipeline.id: main
# path.config: "/etc/logstash/conf.d/*.conf"
# https://www.elastic.co/guide/en/logstash/current/tuning-logstash.html
# The pipeline.workers setting determines how many threads to run for filter and output processing.
# The pipeline.batch.size setting defines the maximum number of events an individual worker thread collects before attempting to execute filters and outputs. Larger batch sizes are generally more efficient, but increase memory overhead.
# The pipeline.batch.delay setting rarely needs to be tuned. This setting adjusts the latency of the Logstash pipeline.
- pipeline.id: syslog-buffer
path.config: "/usr/share/logstash/pipeline/syslog.conf"
pipeline.workers: 5
pipeline.batch.size: 250
pipeline.batch.delay: 5
- pipeline.id: beat-buffer
path.config: "/usr/share/logstash/pipeline/beat.conf"
pipeline.workers: 5
pipeline.batch.size: 250
pipeline.batch.delay: 5
syslog.conf: |
# the syslog-pipeline file
input {
tcp {
id => 'syslog-tcp-input'
port => 1514
tags => [ "syslog", "tcp" ]
}
udp {
id => 'syslog-udp-input'
port => 1514
tags => [ "syslog", "udp" ]
}
}
filter {
if "syslog" in [tags] {
# so we get a random distribution across the different queues connected to the exchange
# unfortunately the RNG from JRuby behaves a bit oddly, as you will notice once a lot of events come in
# this randomly generates a number from 1 to 100 and saves it into the field named random_number
ruby {
id => "ruby-random-number"
code => "event.set('random_number', rand(1..100).to_s())"
}
}
}
output {
if "syslog" in [tags] {
rabbitmq {
id => "syslog-rabbitmq-output"
exchange => "syslog"
exchange_type => "x-modulus-hash"
key => "%{random_number}"
user => "${LOGSTASH_RMQ_USER}"
password => "${LOGSTASH_RMQ_USER_PASS}"
host => [ "${RMQ_CLUSTER}" ]
durable => true
}
}
# for debugging
stdout {}
}
beat.conf: |
# the beat-pipeline file
input {
beats {
id => 'beats-input'
port => 5044
tags => "beat"
}
}
filter {
if "beat" in [tags] {
ruby {
id => "ruby-random-number"
code => "event.set('random_number', rand(1..100).to_s())"
}
}
}
output {
if "beat" in [tags] {
rabbitmq {
id => "beat-rabbitmq-output"
exchange => "beat"
exchange_type => "x-modulus-hash"
key => "%{random_key}"
user => "${LOGSTASH_RMQ_USER}"
password => "${LOGSTASH_RMQ_USER_PASS}"
host => [ "${RMQ_CLUSTER}" ]
durable => true
}
}
}
---
apiVersion: v1
kind: Service
metadata:
name: logstash-buffer
namespace: elk
labels:
app: logstash-buffer
spec:
selector:
app: logstash-buffer
type: NodePort
# this should let us see the IP of the host that sent the events
# BUT this has one disadvantage! IF there is no pod running on
# the node where the packet arrives, the packet will just get dropped
#
# we try to mitigate this a little with the podAntiAffinity in the
# Deployment-section
#
# if all pods are running on one node, the traffic is still load balanced,
# at least that is what I observed.
externalTrafficPolicy: Local
ports:
- port: 1514
targetPort: 1514
nodePort: 31514
protocol: TCP
name: syslog-tcp
- port: 1514
targetPort: 1514
nodePort: 31514
protocol: UDP
name: syslog-udp
- port: 5044
targetPort: 5044
nodePort: 30044
protocol: TCP
name: beat
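To test the buffer end to end you can send a test message to the syslog NodePort from outside the cluster, for example with the util-linux logger command (replace <node-ip> with one of your k8s nodes; remember that with externalTrafficPolicy: Local that node has to run a logstash-buffer pod):
# send a test message via TCP (-T) to the syslog NodePort
logger -n <node-ip> -P 31514 -T "test message via tcp"
# or via UDP (-d)
logger -n <node-ip> -P 31514 -d "test message via udp"
The message should then show up in the syslog queues on RabbitMQ and, once the filter pods have processed it, in Elasticsearch.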
Finished So Far
Congrats, you now have a full ELK stack with a hot-warm-cold architecture and a buffering layer. If it is set up on two physical bare-metal nodes, you should be able to do maintenance without ELK downtime (as long as you have a load balancer in front!).
The whole setup-structure should look similar to:
.
├── cerebro.yaml
├── elasticsearch.yaml
├── filebeat.yaml
├── kibana.yaml
├── kubernetes
│ ├── calico.yaml
│ └── ingress-nginx.yaml
├── logstash-buffer.yaml
├── logstash-filter.yaml
├── metricbeat.yaml
├── namespace.yaml
├── persistent-volumes
│ ├── cold
│ │ ├── zone-1
│ │ │ └── pv-001-elk-zone-1-cold-es-pod-0.yaml
│ │ └── zone-2
│ │ └── pv-002-elk-zone-2-cold-es-pod-0.yaml
│ ├── hot
│ │ ├── zone-1
│ │ │ └── pv-001-elk-zone-1-hot-es-pod-0.yaml
│ │ └── zone-2
│ │ └── pv-002-elk-zone-2-hot-es-pod-0.yaml
│ ├── rabbitmq
│ │ ├── zone-1
│ │ │ ├── pv-001-elk-rmq-cluster-server-pod-0.yaml
│ │ │ └── pv-001-elk-rmq-cluster-server-pod-2.yaml
│ │ └── zone-2
│ │ └── pv-002-elk-rmq-cluster-server-pod-1.yaml
│ ├── storage-class.yaml
│ └── warm
│ ├── zone-1
│ │ └── pv-001-elk-zone-1-warm-es-pod-0.yaml
│ └── zone-2
│ └── pv-002-elk-zone-2-warm-es-pod-0.yaml
├── rabbitmq.yaml
└── secrets
├── kibana-secret-settings.yaml
├── logstash-writer-user.yaml
└── rmq-cluster-logstash-user.yaml
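If you apply everything from scratch, one possible order (assuming the file names above) is namespace and secrets first, then storage, then the stack:
kubectl apply -f namespace.yaml
kubectl apply -f secrets/
kubectl apply -R -f persistent-volumes/
kubectl apply -f rabbitmq.yaml
kubectl apply -f elasticsearch.yaml -f kibana.yaml
kubectl apply -f logstash-filter.yaml -f logstash-buffer.yaml
kubectl apply -f filebeat.yaml -f metricbeat.yaml
kubectl apply -f cerebro.yaml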