PART-6 – Logstash

Logstash is the part of the stack that parses and analyzes the incoming events.

Prepare Logstash

We will create two separate Logstash deployments so that we can scale them up or down independently. The first deployment handles the incoming events and simply forwards them to RabbitMQ without much grokking. The second deployment does the filtering and pattern analysis: it takes the events out of RabbitMQ and writes them to Elasticsearch.

If you want, you can add an extra Logstash user as described here: https://www.elastic.co/guide/en/logstash/current/ls-security.html. Otherwise you can use the "elastic" user (who has all permissions). Via Kibana I created a role "logstash_writer_user" and a user "logstash_writer", and saved the username and password in a secret.

# the key is base64 encoded
# echo -n 'HnWzJhQFxNuoUMPj8ufFxJPBcXFOh46m' | base64 -w 0
# this will output: SG5XekpoUUZ4TnVvVU1Qajh1ZkZ4SlBCY1hGT2g0Nm0=
apiVersion: v1
kind: Secret
metadata:
  name: logstash-writer-user
  namespace: elk
type: Opaque
data:
  username: bG9nc3Rhc2hfd3JpdGVy
  password: V005NFRwS1VtV2NRWTRx
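
The values under data have to be base64 encoded without a trailing newline, otherwise the newline becomes part of the username or password. The following is a minimal sketch using only the Python standard library; the helper name encode_secret_value is made up for illustration. It shows the encoding and how a stray newline changes the result.

```python
import base64


def encode_secret_value(value: str) -> str:
    """Return the base64 form expected in a Secret's `data` field."""
    return base64.b64encode(value.encode("utf-8")).decode("ascii")


key = "HnWzJhQFxNuoUMPj8ufFxJPBcXFOh46m"

# Correct: no trailing newline (comparable to `echo -n ... | base64 -w 0`).
clean = encode_secret_value(key)

# Pitfall: a trailing newline (plain `echo` without -n) changes the ending.
with_newline = encode_secret_value(key + "\n")

print(clean)         # ends with "Nm0="
print(with_newline)  # ends with "Nm0K"

# Decoding the username stored in the Secret above.
print(base64.b64decode("bG9nc3Rhc2hfd3JpdGVy").decode("utf-8"))  # logstash_writer
```

If a decoded value unexpectedly ends in a line break, the encoded string was most likely created without the -n flag.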

"Filter"-Logstash

We will first deploy the "Filter"-Logstash pods, as they declare the exchange and the queues on RabbitMQ. As you will see, this YAML is quite long :o). For the moment the "beat" pipeline is completely disabled, and the syslog pipeline could definitely use some improvements.
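
The exchange declared by the filter pods is of type x-modulus-hash, which comes from RabbitMQ's sharding plugin. According to the plugin's documentation, it hashes the routing key of each published message and picks one of the bound queues by taking that hash modulo the number of bound queues. The sketch below only illustrates this idea: zlib.crc32 is a stand-in for RabbitMQ's internal hash function, and pick_queue is a hypothetical helper, not part of any RabbitMQ client.

```python
import zlib
from collections import Counter

# Queue names as declared in the syslog pipeline below.
QUEUES = ["syslog_q1", "syslog_q2", "syslog_q3"]


def pick_queue(routing_key: str, queues: list) -> str:
    """Pick a queue by hashing the routing key modulo the queue count.

    zlib.crc32 is only a stand-in; RabbitMQ uses its own hash function.
    """
    return queues[zlib.crc32(routing_key.encode("utf-8")) % len(queues)]


# Routing keys "1".."100", similar to a random number set by the publisher.
distribution = Counter(pick_queue(str(n), QUEUES) for n in range(1, 101))

for queue, count in sorted(distribution.items()):
    print(queue, count)
```

The same routing key always ends up in the same queue, so the publisher needs varying routing keys (for example a random number per event) to spread the load over all queues.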

apiVersion: apps/v1
kind: Deployment
metadata:
  name: logstash-filter
  # change if needed
  namespace: elk
  labels:
    app: logstash-filter
spec:
  replicas: 3
  selector:
    matchLabels:
      app: logstash-filter
  template:
    metadata:
      annotations:
        co.elastic.logs/module: logstash
      labels:
        app: logstash-filter
        stackmonitoring: logstash
    spec:
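      # soft anti-affinity: the scheduler tries to keep these pods off nodes that already run a
      # pod matching the selector below (a preference, not a guarantee); note that the selector
      # matches logstash-buffer, use logstash-filter to spread the filter pods themselves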
      affinity:
        podAntiAffinity:
          preferredDuringSchedulingIgnoredDuringExecution:
          - weight: 100
            podAffinityTerm:
              labelSelector:
                matchExpressions:
                - key: app
                  operator: In
                  values:
                  - logstash-buffer
              topologyKey: "kubernetes.io/hostname"
      containers:
      - image: docker.elastic.co/logstash/logstash:7.10.0
        name: logstash-filter
        ports:
        # the port where metricbeat will scrape from
        - containerPort: 9600
          name: http
          protocol: TCP
        # at the same time we use it for the healthchecks
        livenessProbe:
          httpGet:
            path: /
            port: 9600
          initialDelaySeconds: 30
          periodSeconds: 5
        readinessProbe:
          httpGet:
            path: /
            port: 9600
          initialDelaySeconds: 30
          periodSeconds: 5
        env:
        - name: ELASTICSEARCH_SERVICE
          # change if needed
          value: "https://elk-es-http:9200"
        # Those credentials are used ONLY for connecting to Elasticsearch and checking for changes
        # in pipelines managed via Kibana - the user has to be created e.g. via Kibana and the
        # credentials have to be saved as a secret. Centralized pipeline management works with trial or enterprise license
        # more info at https://www.elastic.co/guide/en/logstash/current/logstash-centralized-pipeline-management.html
        #- name: LOGSTASH_ADMIN_USER
        #  value: "logstash_admin_user"
        #- name: LOGSTASH_ADMIN_USER_PASS
        #  valueFrom:
        #    secretKeyRef:
        #      name: logstash-admin-user
        #      key: logstash_admin_user
        # we can use those variables in centrally managed pipelines too
        - name: LOGSTASH_WRITER_USER
          valueFrom:
            secretKeyRef:
              name: logstash-writer-user
              key: username
        - name: LOGSTASH_WRITER_USER_PASS
          valueFrom:
            secretKeyRef:
              name: logstash-writer-user
              key: password
        - name: LOGSTASH_RMQ_USER
          valueFrom:
            secretKeyRef:
              name: rmq-cluster-logstash-user
              key: username
        - name: LOGSTASH_RMQ_USER_PASS
          valueFrom:
            secretKeyRef:
              name: rmq-cluster-logstash-user
              key: password
        # we point these to the rabbitmq pods; we will create a service for each pod.
        # this makes it possible to bind queues directly to the rmq nodes, which makes event
        # distribution a little bit better.
        # the following variables are not needed for the logstash rabbitmq output plugin;
        # for sending the events to the exchange we will use the normal rmq-cluster service
        - name: RMQ_NODE_1
          value: "rmq-cluster-server-0"
        - name: RMQ_NODE_2
          value: "rmq-cluster-server-1"
        - name: RMQ_NODE_3
          value: "rmq-cluster-server-2"
        # not needed for buffer
        - name: LOGSTASH_PATTERNS_DIR
          value: "/etc/logstash/patterns/"
        - name: ELASTICSEARCH_HTTPS_CA_PATH
          value: "/etc/logstash/certificates/elk-es-http-certs-public.crt"
        # we have to check what are good resource levels
        resources: {}
        volumeMounts:
        # the main config file for logstash
        - name: config-volume
          mountPath: /usr/share/logstash/config/logstash.yml
          subPath: logstash.yml
          readOnly: true
        # we use multiple pipelines
        - name: config-volume
          mountPath: /usr/share/logstash/config/pipelines.yml
          subPath: pipelines.yml
          readOnly: true
        # the syslog-pipeline config
        - name: config-volume
          mountPath: /usr/share/logstash/pipeline/syslog.conf
          subPath: syslog.conf
          readOnly: true
        # the beat-pipeline config
        - name: config-volume
          mountPath: /usr/share/logstash/pipeline/beat.conf
          subPath: beat.conf
          readOnly: true
        # we do not need the patterns in the logstash buffer
        - name: config-volume
          mountPath: /etc/logstash/patterns/containerlog_patterns
          subPath: containerlog_patterns
          readOnly: true
        - name: config-volume
          mountPath: /etc/logstash/patterns/syslog_patterns
          subPath: syslog_patterns
          readOnly: true
        - name: cert-ca
          mountPath: /etc/logstash/certificates/elk-es-http-certs-public.crt
          subPath: tls.crt
          readOnly: true
      volumes:
      - name: config-volume
        configMap:
          name: logstash-filter-configmap
      - name: cert-ca
        secret:
          secretName: elk-es-http-certs-public 
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: logstash-filter-configmap
  namespace: elk
data:
  logstash.yml: |
    # this enables logstash's http endpoint - where metricbeat will scrape from
    http.host: "0.0.0.0"
    http.port: 9600
    # we disable the internal monitoring - logstash will be scraped by metricbeat
    monitoring.enabled: false
    # you must add this parameter anyway so in stack monitoring it can be correctly assigned to the cluster :|
    # GET / - it will show the UUID
    # if not set a "Standalone Cluster" will pop up in "Stack Monitoring"
    monitoring.cluster_uuid: n2KDDWUMS2q4h8P5F3_Z8Q
    # if you want to enable centralized pipeline management (need trial or enterprise license)
    #xpack.management.enabled: true
    #xpack.management.elasticsearch.hosts: "${ELASTICSEARCH_SERVICE}"
    #xpack.management.elasticsearch.username: ${LOGSTASH_ADMIN_USER}
    #xpack.management.elasticsearch.password: ${LOGSTASH_ADMIN_USER_PASS}
    #xpack.management.elasticsearch.ssl.certificate_authority: /etc/logstash/certificates/elk-es-http-certs-public.crt
    #xpack.management.logstash.poll_interval: 5s
    # you have to define all possible pipelines you want to manage centrally
    # or create a pipeline first in Kibana and then here - high chance that the pipeline won't work as expected ;)
    #xpack.management.pipeline.id: [ "dummy-pipeline", "test-pipeline" ]
    # if you enable centralized pipeline management - pipeline configs via files are ignored!
  pipelines.yml: |
    # This file is where you define your pipelines. You can define multiple. 
    # For more information on multiple pipelines, see the documentation: 
    #   https://www.elastic.co/guide/en/logstash/current/multiple-pipelines.html 

    # not needed - we disable that
    #- pipeline.id: main 
    #  path.config: "/etc/logstash/conf.d/*.conf" 

    # https://www.elastic.co/guide/en/logstash/current/tuning-logstash.html 
    # The pipeline.workers setting determines how many threads to run for filter and output processing. 
    # The pipeline.batch.size setting defines the maximum number of events an individual worker thread collects before attempting to execute filters and outputs. Larger batch sizes are generally more efficient, but increase memory overhead. 
    # The pipeline.batch.delay setting rarely needs to be tuned. This setting adjusts the latency of the Logstash pipeline. 

    - pipeline.id: syslog-filter
      path.config: "/usr/share/logstash/pipeline/syslog.conf" 
      pipeline.workers: 5
      pipeline.batch.size: 250 
      pipeline.batch.delay: 5
    # for the meantime just disabled
    #- pipeline.id: beat-filter
    #  path.config: "/usr/share/logstash/pipeline/beat.conf" 
    #  pipeline.workers: 5
    #  pipeline.batch.size: 250 
    #  pipeline.batch.delay: 5 
  syslog.conf: |
    # the syslog-pipeline file
    input {
      # The queues are created as durable and are bound to a certain "node".
      # This means that if the "node" goes down for whatever reason, the queue is
      # not available in the meantime, but messages are not lost. As soon as
      # the node is back online, logstash should reconnect.
      # Why do I want to bind the queue to one node?
      # As the queues are durable, they should not be recreated on another "node";
      # this would delete the messages saved on the original node as soon as it
      # comes back online.
      rabbitmq {
        id => "rmq-syslog-q1"
        # set the HOST/IP accordingly!
        host => [ "${RMQ_NODE_1}" ]
        user => "${LOGSTASH_RMQ_USER}"
        # set the password accordingly!
        password => "${LOGSTASH_RMQ_USER_PASS}"
        # set an exchange - it should theoretically be created
        exchange_type => "x-modulus-hash"
        exchange => "syslog"
        # the queues should be created by logstash if not existing
        key => "1"
        # woah, woah - watch out! - if a queue exists with the same name and key,
        # it WILL use it - and seemingly bind it to the exchange configured above too!
        queue => "syslog_q1"
        tags => "syslog_q1"
        prefetch_count => 2500
        threads => 2
        # yes, the queue should not disappear if no consumer is connected
        durable => true
      }

      rabbitmq {
        id => "rmq-syslog-q2"
        # set the HOST/IP accordingly!
        host => [ "${RMQ_NODE_2}" ]
        user => "${LOGSTASH_RMQ_USER}"
        # set the password accordingly!
        password => "${LOGSTASH_RMQ_USER_PASS}"
        # set an exchange - it should theoretically be created
        exchange_type => "x-modulus-hash"
        exchange => "syslog"
        # the queues should be created by logstash if not existing
        key => "2"
        # woah, woah - watch out! - if a queue exists with the same name and key,
        # it WILL use it - and seemingly bind it to the exchange configured above too!
        queue => "syslog_q2"
        tags => "syslog_q2"
        prefetch_count => 2500
        threads => 2
        # yes, the queue should not disappear if no consumer is connected
        durable => true
      }

      rabbitmq {
        id => "rmq-syslog-q3"
        # set the HOST/IP accordingly!
        host => [ "${RMQ_NODE_3}" ]
        user => "${LOGSTASH_RMQ_USER}"
        # set the password accordingly!
        password => "${LOGSTASH_RMQ_USER_PASS}"
        # set an exchange - it should theoretically be created
        exchange_type => "x-modulus-hash"
        exchange => "syslog"
        # the queues should be created by logstash if not existing
        key => "3"
        # woah, woah - watch out! - if a queue exists with the same name and key,
        # it WILL use it - and seemingly bind it to the exchange configured above too!
        queue => "syslog_q3"
        tags => "syslog_q3"
        prefetch_count => 2500
        threads => 2
        # yes, the queue should not disappear if no consumer is connected
        durable => true
      }
    }

    filter {
      if "syslog" in [tags] {
        # we save the message in a non-indexed field and the original incoming date and time
        # and we add a field which shows us where it was filtered
        mutate {
          id => "mutate-01"
          # not sure if we should or need to save the raw message in both "original" fields
          add_field => [ "[log][original]", "%{message}" ]
          add_field => [ "received_at", "%{@timestamp}" ]
          # this adds the pods name into a field, so we know which pod did the filtering
          add_field => [ "[filter][node]", "${HOSTNAME}" ]
        }

        # Manually parse the log, as we want to support both RFC3164 and RFC5424
        # first we try to check for RFC5424 events
        grok {
          id => "grok-01"
          # load slightly changed default syslog patterns
          patterns_dir => ["${LOGSTASH_PATTERNS_DIR}"]
          match => { "message" => "%{SYSLOG5424LINE}" }
          add_field => [ "received_from", "%{host}" ]
          add_tag => [ "_grokparsesuccess" ]
          overwrite => [ "message" ]
        }

        # if it is RFC5424 we have to do a lot more to get a nicely filtered event in kibana
        if [syslog5424_ts] {
          # Handle RFC5424 formatted Syslog messages
          mutate {
            id => "mutate-02"
            remove_field => [ "message", "host" ]
            add_tag => [ "syslog5424" ]
          }

          mutate {
            id => "mutate-03"
            # Use a friendlier naming scheme
            rename => { 
                "syslog5424_app"  => "syslogprog"
                "syslog5424_msg"  => "message"
                "syslog5424_host" => "syslog_host"
            }
            remove_field => [ "syslog5424_ver", "syslog5424_proc" ]
          }

          if [syslog5424_sd] {
            # All structured data needs to be in format [key=value,key=value,...]
            mutate {
                id => "mutate-04"
                # split at "][" brackets
                split => { "syslog5424_sd" => "][" }
            }

            mutate {
                id => "mutate-05"
                # Remove any brackets in this array-field
                gsub => [ "syslog5424_sd", "[\[\]]", "" ]
            }

            mutate {
                id => "mutate-06"
                rename => { "syslog5424_sd" => "[log][syslog][syslog5424][sd_data]" }
            }

            # not working as expected for the moment
            #kv {
            #    # Convert the structured data into Logstash fields
            #    source => "syslog5424_sd"
            #    field_split => ","
            #    value_split => "="
            #    remove_char_key => "\[\]"
            #    transform_key => "lowercase"
            #    target => "[syslog][sd_id]"
            #    remove_field => [ "syslog5424_sd" ]
            #}
          }

          # saves the log's date in @timestamp
          date {
            id => "date-01"
            match => [ "syslog5424_ts", "ISO8601" ]
            remove_field => [ "syslog5424_ts", "timestamp" ]
          }
        } else {
          # Handle RFC3164 formatted Syslog messages
          grok {
            id => "grok-02"
            # load slightly changed default syslog patterns
            patterns_dir => ["${LOGSTASH_PATTERNS_DIR}"]
            match => { "message" => "%{SYSLOGLINE}" }
            add_field => [ "received_from", "%{host}" ]
            add_tag => [ "_grokparsesuccess" ]
            add_tag => [ "syslog3164" ]
            overwrite => [ "message" ]
          }
        }

        # grok attempt for syslog messages with epoch timestamps - not RFC-conformant, but seen in the wild
        if "_grokparsesuccess" not in [tags] {
          # Manually parse the log
          grok {
            id => "grok-03"
            # load slightly changed default syslog patterns
            patterns_dir => ["${LOGSTASH_PATTERNS_DIR}"]
            match => { "message" => "%{SYSLOGLINEUNIX}" }
            add_field => [ "received_from", "%{host}" ]
            add_tag => [ "_grokparsesuccess" ]
            add_tag => [ "syslogunixepoch" ]
            overwrite => [ "message" ]
          }
        }

        # last grok attempt for syslog messages in most simple format - e.g. sent via old version of logger command
        # <5>Jul 14 15:20:25 root: some test message
        if "_grokparsesuccess" not in [tags] {
          # Manually parse the log
          grok {
            id => "grok-04"
            match => { "message" => "\<%{NONNEGINT:priority}\>(%{SYSLOGTIMESTAMP:log_timestamp}|%{TIMESTAMP_ISO8601:log_timestamp}) %{SYSLOGPROG}: %{GREEDYDATA:message}" }
            add_field => [ "received_from", "%{host}" ]
            add_tag => [ "_grokparsesuccess" ]
            add_tag => [ "simple_syslog" ]
            overwrite => [ "message" ]
          }
        }

        # this will replace the @timestamp with the timestamp from the event if in correct format
        # check if log_timestamp is in correct format
        date {
          id => "date-02"
          match => [ "log_timestamp", "MMM dd yyyy HH:mm:ss" ]
          add_tag => [ "_dateparsesuccess" ]
          remove_field => [ "log_timestamp" ]
        }

        if "_dateparsesuccess" not in [tags] {
          date {
            id => "date-03"
            match => [ "log_timestamp", "MMM  d yyyy HH:mm:ss" ]
            add_tag => [ "_dateparsesuccess" ]
            remove_field => [ "log_timestamp" ]
          }
        }

        if "_dateparsesuccess" not in [tags] {
          date {
            id => "date-04"
            match => [ "log_timestamp", "ISO8601" ]
            add_tag => [ "_dateparsesuccess" ]
            remove_field => [ "log_timestamp" ]
          }
        }

        if "_dateparsesuccess" not in [tags] {
          date {
            id => "date-05"
            match => [ "log_timestamp", "MMM dd HH:mm:ss" ]
            add_tag => [ "_dateparsesuccess" ]
            remove_field => [ "log_timestamp" ]
          }
        }

        if "_dateparsesuccess" not in [tags] {
          date {
            id => "date-06"
            match => [ "log_timestamp", "MMM  d HH:mm:ss" ]
            add_tag => [ "_dateparsesuccess" ]
            remove_field => [ "log_timestamp" ]
          }
        }

        if "_dateparsesuccess" not in [tags] {
          date {
            id => "date-07"
            match => [ "log_timestamp", "UNIX" ]
            add_tag => [ "_dateparsesuccess" ]
            remove_field => [ "log_timestamp" ]
          }
        }

        if "_dateparsesuccess" not in [tags] {
          date {
            id => "date-08"
            match => [ "log_timestamp", "UNIX_MS" ]
            add_tag => [ "_dateparsesuccess" ]
            remove_field => [ "log_timestamp" ]
          }
        }

        # because all grok-filters are taken into account there would be also a '_grokparsefailure'-tag,
        # we don't need it if it was at least correctly filtered by one of the groks
        if ("_grokparsesuccess" in [tags]) {
          # syslog_pri extracts facility and loglevel from the "syslog_pri"ority-field
          syslog_pri { 
            id => "syslog-pri-01"
            syslog_pri_field_name => "priority" 
          }

          mutate {
            id => "mutate-07"
            remove_tag => [ "_grokparsefailure" ]
          }
        }

        ###########################################################
        ###### Final preps for ECS
        # remove the _dateparsefailure if we find a success-tag
        if ("_dateparsesuccess" in [tags]) {
          mutate {
            id => "mutate-08"
            remove_tag => [ "_dateparsefailure" ]
          }
        }

        # we have to remove the host field first so we can reuse the field name for ECS style later
        # it seems that we cannot do it in the same mutate-statement!
        if "_grokparsesuccess" in [tags] {
          # because we put the "host" already into the "received_from" field in the groks
          mutate {
            id => "mutate-09"
            remove_field => [ "host" ]
          }

          # in simple_syslog messages there is no explicit syslog_host :| 
          if "simple_syslog" not in [tags] {
            mutate {
              id => "mutate-10"
              # we add stuff to arrays
              add_field => { "[host][ip]" => "%{received_from}" }
              add_field => { "[host][name]" => "%{syslog_host}" }
            }
          } else {
            mutate {
              id => "mutate-11"
              # we add stuff to arrays
              add_field => { "[host][ip]" => "%{received_from}" }
              add_field => { "[host][name]" => "%{received_from}" }
            }
          }
        } else {
          # if no grok matched, we have to get the stuff from the "host"-field and temporarily save
          mutate {
            id => "mutate-12"
            add_field => { "received_from" => "%{host}" }
          }

          # we have to remove the initial host-field
          mutate {
            id => "mutate-13"
            remove_field => [ "host" ]
          }

          # we have to add this to the host.ip array
          mutate {
            id => "mutate-14"
            add_field => { "[host][ip]" => "%{received_from}" }
            # as we do not have a hostname because of bad parsing we have to leave the host.name field empty
          }
        }

        # for keeping to ECS
        if [syslog_severity] {
          mutate {
            id => "mutate-15"
            add_field => { "[log][level]" => "%{syslog_severity}" }
          }
        }

        # finally we rename and remove fields
        mutate {
          id => "mutate-16"
          # we can rename our simple string/text/number-fields ### better - change later https://www.elastic.co/guide/en/ecs/current/ecs-log.html
          rename => {
            "pid"                       => "[process][pid]"
            "program"                   => "[process][name]"
            "tid"                       => "[process][thread][id]"
            "syslogprog"                => "[log][logger]"
            "priority"                  => "[log][syslog][priority]"
            "syslog_facility"           => "[log][syslog][facility][name]"
            "syslog_facility_code"      => "[log][syslog][facility][code]"
            "syslog_severity"           => "[log][syslog][severity][name]"
            "syslog_severity_code"      => "[log][syslog][severity][code]"
            "syslog_ver"                => "[log][syslog][version]"
            "received_at"               => "[event][created]"
            "loglevel"                  => "[log][level]"
          }
          # removed from list above
          #"syslog_timestamp"          => "[log][syslog][timestamp]"

          # we remove unneeded fields with info we already have somewhere else in
          # it's in host.name and host.ip if applicable
          remove_field => [ "syslog_host" ]

          # is event.created
          remove_field => [ "received_at" ]

          # is in host.ip if applicable
          remove_field => [ "received_from" ]

          # we add event.dataset so SIEM part in Kibana looks nice
          add_field => { "[event][dataset]" => "syslog" }
          add_field => { "[event][type]" => "syslog" }

          # and finally we remove the type (if there was one) as this is duplicated info in event.dataset
          remove_field => [ "type" ]

          # some other ECS best practices
          add_field => { "[ecs][version]" => "1.7.0" }
        }

        # geoip
        #if [source][ip] {
        #  geoip {
        #    id => "geoip-01"
        #    source => "[source][ip]"
        #    target => "[geo]"
        #  }
        #}
      } 
    }

    output {
      if [event][dataset] == "syslog"  {
        elasticsearch {
          id => "elasticsearch-logstash-syslog-ilm"
          # we do not want logstash to overwrite or manage templates
          manage_template => false
          # just to be on the safe side
          codec => json_lines
          # as we use https we have to configure the path to the CA-cert
          cacert => "${ELASTICSEARCH_HTTPS_CA_PATH}"
          user => "${LOGSTASH_WRITER_USER}"
          password => "${LOGSTASH_WRITER_USER_PASS}"
          ssl => true
          # we want to use ILM
          ilm_enabled => "true"
          ilm_rollover_alias => "logstash_syslog_ilm"
          ilm_pattern => "{now/d}-000001"
          ilm_policy => "logstash-syslog"
          # kubernetes should theoretically do the balancing itself
          hosts => [ "${ELASTICSEARCH_SERVICE}" ]
        }
      }
    }
  beat.conf: |
    # the beat-pipeline file
    #input {
    #  beats {
    #    id => 'beat-input'
    #    port => 5044
    #    tags => "beat"
    #  }
    #}
    #
    #filter {
    #  if "beat" in [tags] {
    #    ruby {
    #      id => "ruby-random-number"
    #      code => "event.set('random_number', rand(1..100).to_s())"
    #    }
    #  }
    #}
    #
    #output {
    #  if "beat" in [tags] {
    #    rabbitmq {
    #      id => "beat-rabbitmq-output"
    #      exchange => "beat"
    #      exchange_type => "x-modulus-hash"
    #      key => "%{random_number}"
    #      user => "${LOGSTASH_RMQ_USER}"
    #      password => "${LOGSTASH_RMQ_USER_PASS}"
    #      host => [ "${RMQ_CLUSTER}" ]
    #      durable => true
    #    }
    #  }
    #}
    input {}
    filter {}
    output {}
  containerlog_patterns: |
    CONTAINERLOGTIMESTAMP (%{YEAR}-%{MONTHNUM}-%{MONTHDAY} %{TIME}|\[%{MONTHDAY}-%{MONTH}-%{YEAR} %{TIME}\])
    SUPERVISORDLOGLEVEL (CRIT|ERRO|WARN|INFO|DEBG|TRAC|BLAT)
  syslog_patterns: |
    # orientation: match => { "message" => "\<%{NONNEGINT:priority}\>(%{NONNEGINT} | )(%{SYSLOGTIMESTAMP:log_timestamp}|%{TIMESTAMP_ISO8601:log_timestamp}) %{SYSLOGHOST:syslog_host} %{SYSLOGPROG:syslogprog}(: | )%{GREEDYDATA:message}" }
    SYSLOG5424PRINTASCII [!-~]+

    SYSLOGBASE2 (?:%{SYSLOGTIMESTAMP:log_timestamp}|%{TIMESTAMP_ISO8601:log_timestamp}) (?:%{SYSLOGFACILITY} )?%{SYSLOGHOST:syslog_host}+(?: %{SYSLOGPROG:syslogprog}:|)
    SYSLOGPAMSESSION %{SYSLOGBASE} (?=%{GREEDYDATA:message})%{WORD:syslog_pam_module}\(%{DATA:syslog_pam_caller}\): session %{WORD:syslog_pam_session_state} for user %{USERNAME:syslog_username}(?: by %{GREEDYDATA:syslog_pam_by})?

    CRON_ACTION [A-Z ]+
    CRONLOG %{SYSLOGBASE} \(%{USER:syslog_user}\) %{CRON_ACTION:syslog_action} \(%{DATA:message}\)

    SYSLOGPRI <%{NONNEGINT:priority}>

    # IETF 5424 syslog(8) format (see http://www.rfc-editor.org/info/rfc5424)
    SYSLOG5424SD \[%{DATA}\]+
    SYSLOG5424BASE %{SYSLOGPRI}%{NONNEGINT:syslog_ver} +(?:%{TIMESTAMP_ISO8601:syslog5424_ts}|-) +(?:%{IPORHOST:syslog_host}|-) +(-|%{SYSLOG5424PRINTASCII:syslog5424_app}) +(-|%{SYSLOG5424PRINTASCII:syslog5424_proc}) +(-|%{SYSLOG5424PRINTASCII:syslog5424_msgid}) +(?:%{SYSLOG5424SD:syslog5424_sd}|-|)

    SYSLOG5424LINE %{SYSLOG5424BASE} +%{GREEDYDATA:syslog5424_msg}

    # IETF 3164 syslog format
    SYSLOGLINE (?:%{SYSLOGPRI})%{SYSLOGBASE2} %{GREEDYDATA:message}

    # unix epoch time
    UNIXEPOCH (\d){10}
    UNIXEPOCHMS1 ((\d){10}\.(\d){3})
    UNIXEPOCHMS2 (\d){13}
    SYSLOGBASEUNIX (?:%{UNIXEPOCH:log_timestamp}|%{UNIXEPOCHMS1:log_timestamp}|%{UNIXEPOCHMS2:log_timestamp}) (?:%{SYSLOGFACILITY} )?%{SYSLOGHOST:syslog_host}+(?: %{SYSLOGPROG:syslogprog}:|)
    SYSLOGLINEUNIX (?:%{SYSLOGPRI})%{SYSLOGBASEUNIX} %{GREEDYDATA:message}

    # Error logs
    HTTPD20_ERRORLOG \[%{HTTPDERROR_DATE:apache_error_timestamp}\] \[%{LOGLEVEL:loglevel}\] (?:\[client %{IPORHOST:clientip}\] ){0,1}%{GREEDYDATA:message}
    HTTPD24_ERRORLOG \[%{HTTPDERROR_DATE:apache_error_timestamp}\] \[%{WORD:module}:%{LOGLEVEL:loglevel}\] \[pid %{POSINT:pid}(:tid %{NUMBER:tid})?\]( \(%{POSINT:proxy_errorcode}\)%{DATA:proxy_message}:)?( \[client %{IPORHOST:clientip}:%{POSINT:clientport}\])?( %{DATA:errorcode}:)? %{GREEDYDATA:message}
    HTTPD_ERRORLOG %{HTTPD20_ERRORLOG}|%{HTTPD24_ERRORLOG}
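
The syslog_pri filter used above splits the numeric priority at the start of a message, such as the <5> in the test message, into a facility and a severity. RFC 3164 and RFC 5424 define the priority value as facility * 8 + severity. A minimal sketch of that arithmetic follows; decode_pri is a made-up helper name, and only the severity names from RFC 5424 are spelled out.

```python
# Severity names as defined in RFC 5424, indexed by severity code.
SEVERITIES = [
    "emergency", "alert", "critical", "error",
    "warning", "notice", "informational", "debug",
]


def decode_pri(pri: int) -> dict:
    """Split a syslog PRI value into facility and severity codes."""
    if not 0 <= pri <= 191:
        raise ValueError(f"PRI out of range: {pri}")
    facility_code, severity_code = divmod(pri, 8)
    return {
        "facility_code": facility_code,
        "severity_code": severity_code,
        "severity": SEVERITIES[severity_code],
    }


print(decode_pri(5))    # facility 0 (kernel), severity 5 (notice)
print(decode_pri(134))  # facility 16 (local0), severity 6 (informational)
```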

"Buffer"-Logstash

The "Buffer"-Logstash will be exposed via a NodePort service, listening for example on a "Syslog" port and a "Beat" port. The podAntiAffinity asks the scheduler to spread the pods so that, if possible, one pod runs on each k8s node. Again (as mentioned in the comments), this is important when running the service with externalTrafficPolicy: Local, because traffic is then only delivered to pods on the node that received it. Without externalTrafficPolicy: Local we would not be able to preserve the client’s IP from where the logs are sent. You can read more about it here: https://kubernetes.io/docs/tutorials/services/source-ip/

apiVersion: apps/v1
kind: Deployment
metadata:
  name: logstash-buffer
  # change if needed
  namespace: elk
  labels:
    app: logstash-buffer
spec:
  replicas: 3
  selector:
    matchLabels:
      app: logstash-buffer
  template:
    metadata:
      annotations:
        co.elastic.logs/module: logstash
      labels:
        app: logstash-buffer
        stackmonitoring: logstash
    spec:
      # soft anti-affinity: the scheduler tries to spread the pods so that each node runs one
      # (a preference, not a guarantee)
      affinity:
        podAntiAffinity:
          preferredDuringSchedulingIgnoredDuringExecution:
          - weight: 100
            podAffinityTerm:
              labelSelector:
                matchExpressions:
                - key: app
                  operator: In
                  values:
                  - logstash-buffer
              topologyKey: "kubernetes.io/hostname"
      containers:
      - image: docker.elastic.co/logstash/logstash:7.10.0
        name: logstash-buffer
        ports:
        # the port where metricbeat will scrape from
        - containerPort: 9600
          name: http
          protocol: TCP
        livenessProbe:
          httpGet:
            path: /
            port: 9600
          initialDelaySeconds: 60
          periodSeconds: 5
        readinessProbe:
          httpGet:
            path: /
            port: 9600
          initialDelaySeconds: 60
          periodSeconds: 5
        env:
        - name: ELASTICSEARCH_SERVICE
          # change if needed
          value: "https://elk-es-http:9200"
        # Those credentials are used ONLY for connecting to Elasticsearch and checking for changes
        # in pipelines managed via Kibana - the user has to be created e.g. via Kibana and the
        # credentials have to be saved as a secret. Centralized pipeline management works with trial or enterprise license
        # more info at https://www.elastic.co/guide/en/logstash/current/logstash-centralized-pipeline-management.html
        #- name: LOGSTASH_ADMIN_USER
        #  value: "logstash_admin_user"
        #- name: LOGSTASH_ADMIN_USER_PASS
        #  valueFrom:
        #    secretKeyRef:
        #      name: logstash-admin-user
        #      key: logstash_admin_user
        # we won't need the writer credentials as the logstash-buffer just forwards the events to rabbitmq
        # we can use those variables in centrally managed pipelines too
        #- name: LOGSTASH_WRITER_USER
        #  # we have to create those user first in elasticsearch
        #  value: "logstash_writer_user"
        #- name: LOGSTASH_WRITER_USER_PASS
        #  valueFrom:
        #    secretKeyRef:
        #      name: logstash-writer-user
        #      key: logstash_writer_user
        - name: LOGSTASH_RMQ_USER
          valueFrom:
            secretKeyRef:
              name: rmq-cluster-logstash-user
              key: username
        - name: LOGSTASH_RMQ_USER_PASS
          valueFrom:
            secretKeyRef:
              name: rmq-cluster-logstash-user
              key: password
        # for sending the events to the exchange we will use the normal rmq-cluster service
        - name: RMQ_CLUSTER
          value: "rmq-cluster"
        resources: {}
        volumeMounts:
        # the main config file for logstash
        - name: config-volume
          mountPath: /usr/share/logstash/config/logstash.yml
          subPath: logstash.yml
          readOnly: true
        # we use multiple pipelines
        - name: config-volume
          mountPath: /usr/share/logstash/config/pipelines.yml
          subPath: pipelines.yml
          readOnly: true
        # the syslog-pipeline config
        - name: config-volume
          mountPath: /usr/share/logstash/pipeline/syslog.conf
          subPath: syslog.conf
          readOnly: true
        # the beat-pipeline config
        - name: config-volume
          mountPath: /usr/share/logstash/pipeline/beat.conf
          subPath: beat.conf
          readOnly: true
      volumes:
      - name: config-volume
        configMap:
          name: logstash-buffer-configmap
      - name: cert-ca
        secret:
          secretName: elk-es-http-certs-public 
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: logstash-buffer-configmap
  namespace: elk
data:
  logstash.yml: |
    # this enables logstash's http endpoint - where metricbeat will scrape from
    http.host: "0.0.0.0"
    http.port: 9600
    # we disable the internal monitoring - logstash will be scraped by metricbeat
    monitoring.enabled: false
    # you must set this parameter anyway so stack monitoring can assign this logstash to the correct cluster :|
    # run GET / against Elasticsearch - the response shows the cluster_uuid
    # if not set, a "Standalone Cluster" will pop up in "Stack Monitoring"
    monitoring.cluster_uuid: n2KDDWUMS2q4h8P5F3_Z8Q
    # if you want to enable centralized pipeline management (need trial or enterprise license)
    #xpack.management.enabled: true
    #xpack.management.elasticsearch.hosts: "${ELASTICSEARCH_SERVICE}"
    #xpack.management.elasticsearch.username: ${LOGSTASH_ADMIN_USER}
    #xpack.management.elasticsearch.password: ${LOGSTASH_ADMIN_USER_PASS}
    #xpack.management.elasticsearch.ssl.certificate_authority: /etc/logstash/certificates/elk-es-http-certs-public.crt
    #xpack.management.logstash.poll_interval: 5s
    # you have to define all possible pipelines you want to manage centrally
    # or create a pipeline first in Kibana and then here - high chance that the pipeline won't work as expected ;)
    #xpack.management.pipeline.id: [ "dummy-pipeline", "test-pipeline" ]
    # if you enable centralized pipeline management - pipeline configs via files are ignored!
  pipelines.yml: |
    # This file is where you define your pipelines. You can define multiple. 
    # For more information on multiple pipelines, see the documentation: 
    #   https://www.elastic.co/guide/en/logstash/current/multiple-pipelines.html 

    # not needed - we disable that
    #- pipeline.id: main 
    #  path.config: "/etc/logstash/conf.d/*.conf" 

    # https://www.elastic.co/guide/en/logstash/current/tuning-logstash.html 
    # The pipeline.workers setting determines how many threads to run for filter and output processing. 
    # The pipeline.batch.size setting defines the maximum number of events an individual worker thread collects before attempting to execute filters and outputs. Larger batch sizes are generally more efficient, but increase memory overhead. 
    # The pipeline.batch.delay setting rarely needs to be tuned. This setting adjusts the latency of the Logstash pipeline. 

    - pipeline.id: syslog-buffer
      path.config: "/usr/share/logstash/pipeline/syslog.conf" 
      pipeline.workers: 5
      pipeline.batch.size: 250 
      pipeline.batch.delay: 5 
    - pipeline.id: beat-buffer
      path.config: "/usr/share/logstash/pipeline/beat.conf" 
      pipeline.workers: 5
      pipeline.batch.size: 250 
      pipeline.batch.delay: 5 
  syslog.conf: |
    # the syslog-pipeline file
    input {
      tcp {
        id => 'syslog-tcp-input'
        port => 1514
        tags => [ "syslog", "tcp" ]
      }

      udp {
        id => 'syslog-udp-input'
        port => 1514
        tags => [ "syslog", "udp" ]
      }
    }

    filter {
      if "syslog" in [tags] {
        # so we get a random distribution to the different queues connected
        # unfortunately the rng from jruby is a little bit strange, you will see it when a lot of events are incoming
        # this generates a random number from 1 to 100 and saves it as a string in the field named random_number
        ruby {
          id => "ruby-random-number"
          code => "event.set('random_number', rand(1..100).to_s())"
        }
      }
    }

    output {
      if "syslog" in [tags] {
        rabbitmq {
          id => "syslog-rabbitmq-output"
          exchange => "syslog"
          exchange_type => "x-modulus-hash"
          key => "%{random_number}"
          user => "${LOGSTASH_RMQ_USER}"
          password => "${LOGSTASH_RMQ_USER_PASS}"
          host => [ "${RMQ_CLUSTER}" ]
          durable => true
        }
      }

      # for debugging
      stdout {} 
    }
  beat.conf: |
    # the beat-pipeline file
    input {
      beats {
        id => 'beats-input'
        port => 5044
        tags => "beat"
      }
    }

    filter {
      if "beat" in [tags] {
        ruby {
          id => "ruby-random-number"
          code => "event.set('random_number', rand(1..100).to_s())"
        }
      }
    }

    output {
      if "beat" in [tags] {
        rabbitmq {
          id => "beat-rabbitmq-output"
          exchange => "beat"
          exchange_type => "x-modulus-hash"
          key => "%{random_number}"
          user => "${LOGSTASH_RMQ_USER}"
          password => "${LOGSTASH_RMQ_USER_PASS}"
          host => [ "${RMQ_CLUSTER}" ]
          durable => true
        }
      }
    }
---
apiVersion: v1
kind: Service
metadata:
  name: logstash-buffer
  namespace: elk
  labels:
    app: logstash-buffer
spec:
  selector:
    app: logstash-buffer
  type: NodePort
  # this preserves the source IP, so we can see which host sent the events
  # BUT this has one disadvantage! IF there is no pod running on
  # the node that receives the packet, it will just get dropped
  #
  # we try to mitigate this a little with the podAntiAffinity in the
  # Deployment-section
  #
  # if all pods are running on one node, the traffic is load-balanced
  # between them - at least that is what I observed.
  externalTrafficPolicy: Local
  ports:
  - port: 1514
    targetPort: 1514
    nodePort: 31514
    protocol: TCP
    name: syslog-tcp
  - port: 1514
    targetPort: 1514
    nodePort: 31514
    protocol: UDP
    name: syslog-udp
  - port: 5044
    targetPort: 5044
    nodePort: 30044
    protocol: TCP
    name: beat
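
To check that the buffer pipeline actually accepts events, you can send a test message to the NodePort 31514 from outside the cluster. The following is only a minimal Python sketch and not part of the setup above: the node IP 192.0.2.10, the hostname "test-host" and the app name "elk-smoke-test" are placeholders I made up, and the message is just a simple RFC 3164-style line. Because of externalTrafficPolicy: Local, the node you send to has to run a logstash-buffer pod, otherwise the packet is dropped.

```python
import socket
from datetime import datetime

# fixed English month names, so the output does not depend on the locale
MONTHS = ("Jan", "Feb", "Mar", "Apr", "May", "Jun",
          "Jul", "Aug", "Sep", "Oct", "Nov", "Dec")


def build_syslog_line(message, hostname="test-host", app="elk-smoke-test",
                      pri=14, now=None):
    """Build an RFC 3164-style syslog line ending with a newline.

    pri=14 means facility 1 (user) * 8 + severity 6 (info).
    hostname and app are hypothetical placeholder values.
    """
    now = now or datetime.now()
    # RFC 3164 pads the day of the month with a space, e.g. "Dec  5"
    timestamp = f"{MONTHS[now.month - 1]} {now.day:2d} {now:%H:%M:%S}"
    return f"<{pri}>{timestamp} {hostname} {app}: {message}\n"


def send_tcp(host, port, line, timeout=5.0):
    """Send one line to the tcp input (needs the trailing newline)."""
    with socket.create_connection((host, port), timeout=timeout) as sock:
        sock.sendall(line.encode("utf-8"))


def send_udp(host, port, line):
    """Send one line as a single datagram to the udp input."""
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        sock.sendto(line.encode("utf-8"), (host, port))


# Example usage (assumption: 192.0.2.10 is a node that runs a logstash-buffer pod):
#   line = build_syslog_line("hello from the smoke test")
#   send_tcp("192.0.2.10", 31514, line)
#   send_udp("192.0.2.10", 31514, line)
```

As the syslog pipeline still has the stdout output enabled for debugging, the test event should show up in the log of one of the logstash-buffer pods.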

Finished So Far

Congrats, you now have a full ELK stack with a hot-warm-cold and buffer setup. If it is set up on two physical bare-metal nodes, you should be able to do maintenance without ELK downtime (if you have a load balancer in front!).

The whole setup-structure should look similar to:

.
├── cerebro.yaml
├── elasticsearch.yaml
├── filebeat.yaml
├── kibana.yaml
├── kubernetes
│   ├── calico.yaml
│   └── ingress-nginx.yaml
├── logstash-buffer.yaml
├── logstash-filter.yaml
├── metricbeat.yaml
├── namespace.yaml
├── persistent-volumes
│   ├── cold
│   │   ├── zone-1
│   │   │   └── pv-001-elk-zone-1-cold-es-pod-0.yaml
│   │   └── zone-2
│   │       └── pv-002-elk-zone-2-cold-es-pod-0.yaml
│   ├── hot
│   │   ├── zone-1
│   │   │   └── pv-001-elk-zone-1-hot-es-pod-0.yaml
│   │   └── zone-2
│   │       └── pv-002-elk-zone-2-hot-es-pod-0.yaml
│   ├── rabbitmq
│   │   ├── zone-1
│   │   │   ├── pv-001-elk-rmq-cluster-server-pod-0.yaml
│   │   │   └── pv-001-elk-rmq-cluster-server-pod-2.yaml
│   │   └── zone-2
│   │       └── pv-002-elk-rmq-cluster-server-pod-1.yaml
│   ├── storage-class.yaml
│   └── warm
│       ├── zone-1
│       │   └── pv-001-elk-zone-1-warm-es-pod-0.yaml
│       └── zone-2
│           └── pv-002-elk-zone-2-warm-es-pod-0.yaml
├── rabbitmq.yaml
└── secrets
    ├── kibana-secret-settings.yaml
    ├── logstash-writer-user.yaml
    └── rmq-cluster-logstash-user.yaml

Proceed to PART-7

Last edited: December 19, 2020
