FluentD configuration

all your base are belong to us

Content
*file: 00-common.md *

Event structure and tags

Fluentd event consists of tag, time and record.

  • tag: Where an event comes from. For message routing
  • time: When an event happens. Nanosecond resolution
  • record: Actual log content. JSON object
The input plugin has responsibility for generating Fluentd event from data sources. For example, in_tail generates events from text lines. If you have following line in apache logs:
192.168.0.1 - - [28/Feb/2013:12:00:00 +0900] "GET / HTTP/1.1" 200 777
you got following event:
tag: apache.access         # set by configuration
time: 1362020400.000000000 # 28/Feb/2013:12:00:00 +0900
record: {"user":"-","method":"GET","code":200,"size":777,"host":"192.168.0.1","path":"/"}

FluentD Tags vs filesystem location

#kubelet create a symlink to log
/var/log/containers/eventrouter-5dc5c565cd-dw7tj_openshift-logging_kube-eventrouter-ae666e925d23df257f2360c6bcadc1257a0913b2d98f377d9919085a71812bcf.log -> /var/log/pods/openshift-logging_eventrouter-5dc5c565cd-dw7tj_bf80aa43-5359-48fc-a4f1-9921569a4c43/kube-eventrouter/0.log
#struct
/var/log/containers/nameOfPod_nameOfNamespace_nameOfContainer-hash.log  
#corresponding tag
var.log.containers.nameOfPod_nameOfNamespace_nameOfContainer-hash.log
*file: 01-systemD-logs.md *

SYSTEMD Journal logs transport

LOGS location on NODES

journalctl may be used to query the contents of the systemd(1) journal as written by systemd-journald.service(8)

Configuration files are: /etc/systemd/journald.conf
There the “Storage=” option controls whether to store journal data

  • volatile, journal log data will be stored only in memory, i.e. below the /run/log/journal hierarchy (which is created if needed).
  • persistent, data will be stored preferably on disk, i.e. below the /var/log/journal hierarchy (which is created if needed), with a fallback to /run/log/journal (which is created if needed), during early boot and if the disk is not writable.
  • auto is similar to “persistent” but the directory /var/log/journal is not created if needed, so that its existence controls where log data goes.
  • none turns off all storage, all log data received will be dropped.

In case of Openshift node configuration is set to persistent

 # all units in journal
journalctl --field _SYSTEMD_UNIT
 # journals path
journalctl --field JOURNAL_PATH
 # journal disk-usage 
journalctl --disk-usage
 # current journals
journalctl --field JOURNAL_NAME
journalctl -o json-pretty --no-pager
*file: 02-apiaudit-logs.md *

API AUDIT LOGS

Audit se týká OpenShift API serveru, Kubernetes API serveru a the OAuth API serveru.

Enablujeme audit pro api servery

oc edit apiserver cluster

apiVersion: config.openshift.io/v1
  kind: APIServer
  metadata:
  ...
  spec:
    audit:
      profile: WriteRequestBodies 
# Default, WriteRequestBodies, or AllRequestBodies. The default profile is Default.
#fluent.conf
    <source>
      @type tail
      path "/var/log/kube-apiserver/*.log"
      pos_file "/var/log/kube-apiserver.log.pos"
      refresh_interval 5
      rotate_wait 5
      tag audit.*
      read_from_head "true"
      @label @MEASURE
      <parse>
        @type json
        <pattern>
          format json
          time_format '%Y-%m-%dT%H:%M:%S.%N%Z'
          keep_time_key true
        </pattern>
      </parse>
    </source>

    <source>
      @type tail
      path  "/var/log/openshift-apiserver/*.log"
      pos_file "/var/log/openshift-apiserver.log.pos"
      refresh_interval 5
      rotate_wait 5
      tag audit.*
      read_from_head "true"
      @label @MEASURE
      <parse>
        @type json
        <pattern>
          format json
          time_format '%Y-%m-%dT%H:%M:%S.%N%Z'
          keep_time_key true
        </pattern>
      </parse>
    </source>

    <source>
      @type tail
      path  "/var/log/oauth-apiserver/*.log"
      pos_file "/var/log/oauth-apiserver.log.pos"
      refresh_interval 5
      rotate_wait 5
      tag audit.*
      read_from_head "true"
      @label @MEASURE
      <parse>
        @type json
        <pattern>
          format json
          time_format '%Y-%m-%dT%H:%M:%S.%N%Z'
          keep_time_key true
        </pattern>
      </parse>
    </source>

final tags will be defined as:

audit.var.log.kube-apiserver.audit.log
audit.var.log.openshift-apierver.audit.log
audit.var.log/oauth-apiserver.audit.log

for tag filtering we will keep audit logs at verbs: create,update,patch,get,delete for metadata and payload for all users with user@domain

      <filter audit.var.log.openshift-apiserver.** audit.var.log.kube-apiserver.** audit.var.log.oauth-apiserver.**>
      # <filter audit.**>
         @type grep
          <regexp>
            key verb
            pattern /create|update|patch|delete|get/
          </regexp>
          <regexp>
             key user
             pattern /.+@.+/
          </regexp>
       </filter>
*file: 03-eventHubTransport.md *

Eventhub concept

Myšlenka je použít Evenhub jako store pro logy z OCP a pro transport těchto logů použít FluentD. Koncept Eventhubu je dost podobný kafka konceptu, slovníček terminologie je zde:

# eventhub vs kafka
Kafka Concept vs Event Hubs Concept
Cluster        <---->     Namespace
Topic          <---->     Event Hub
Partition      <---->     Partition
Consumer Group <---->     Consumer Group
Offset         <---->     Offset

EventHub sizing

Minimální jednotka ThruPut Unit (TU) - znamená 1 MB/Sec(inbound) nebo 1000msgs/sec (co z toho bude první) a definuje se pro celý Namespace. TU se dá dynamicky měnit.
Zároveň každý EventHub má určité množství partitions (defaultně jednu) - jde definovat pouze při vytvoření a 1 partition vlastně dělá CAP pro daný EvenHub 1MB/Sec(inbound) nebo 1000msgs/sec (co z toho bude první).
Maximální velikost zprávy kterou EvenHub akceptuje je 1MB.
Zároveň se mi zdá že je limitováno množství zpráv v jednom “record batch” ale v oficiální dokumentaci jsem nic kolem toho nenašel.

Pokud je tento limit přesažen vrací se chyba:

  • message_too_long nebo
  • connection reset

FluentD with eventhub aspect

Pro konfiguraci fluentD použijeme kafka2 plugin

fluentD kafka output sample configuration:

#fluent.conf
# oc get cm fluentd -o yaml|yq e '(.data.fluent*)' -|vim -
    <label @LOGS>
      <match **>
        @type copy
        <store>
          @type kafka2
          brokers log.servicebus.windows.net:9093
          # get some response from eventhub, usefull for debug
          get_kafka_client_log true
          ssl_ca_certs_from_system true
          default_topic 'log.oaz1_test_in'
          username $ConnectionString
          password "Endpoint=sb://log.servicebus.windows.net/;SharedAccessKeyName=ack;SharedAccessKey=jjwYrmVQkk22fuXsadCQTHNSDrSW1wTSfq8rOOCkEMc="
          # topic settings
          <buffer topic>
            @type file
            path '/var/lib/fluentd/logs'
            flush_interval 5s
            flush_thread_count 2
            #limit queues
            queued_chunks_limit_size "#{ENV['BUFFER_QUEUE_LIMIT'] || '32' }"
            #Once the total size of stored buffer reached this threshold, all append operations will fail with error (and data will be lost)
            total_limit_size "#{ENV['TOTAL_LIMIT_SIZE'] ||  8589934592 }" #8G
            chunk_limit_size 819200
            overflow_action throw_exception
            retry_type exponential_backoff
            retry_wait 5
            retry_exponential_backoff_base 4
            retry_max_interval 600
            retry_forever
          </buffer>
          max_send_retries 2
          required_acks -1
          max_send_limit_bytes 1000000
          <format>
            @type json
          </format>

        </store>
      </match>
    </label>

Important Config Options

chunk_limit_size 819200 #800KB

Jelikož pro eventhub platí message.max.bytes=1048576 musíme použít menší chunk než 1MB.

chunk: A buffer plugin uses a chunk as a lightweight container, and fills it with events incoming from input sources. If a chunk becomes full, then it gets “shipped” to the destination. zároveň pro kafka2 plugin platí: kafka2 uses one buffer chunk for one record batch.

max_send_limit_bytes 1000000

max_send_limit_bytes - default: nil - Max byte size to send message to avoid MessageSizeTooLarge. For example, if you set 1000000(message.max.bytes in kafka), Message more than 1000000 byes will be dropped. Trochu sem se nechal zmýlit a myslel jsem že je to limit pro velikost zprávy.

Chceme abychom vždy zprávy odeslali ale pokud je cíl nedostupný nastavíme exponenciální zvyšování času mezi pokusy až do hodnoty 10min.

# sending options 
max_send_retries 2 #send 2x to leader direct
#A chunk can fail to be written out to the destination for a number of reasons. The network can go down, or the traffic volumes can exceed the capacity of the destination node. To handle such common failures gracefully, buffer plugins are equipped with a built-in retry mechanism.
retry_type exponential_backoff
retry_max_interval 600 #10 min maximálně pro exponent
retry_exponential_backoff_base 4
retry_wait 5 #first wait 5..20..80...
retry_forever #never give up

Zároveň nastává problém s velikostí chunku pro audit (WriteBody).
Jelikož velikost chunku poslaného do eventhubu může být 1MB ale zároveň velikost vytvořeného objektu(tedy auditovaného) je také max 1MB. Chunk se však načítá paralelně a prostě není schopen to okamžitě utnout.
Pro audit logy je tedy zdá být vhodná konfigurace bufferu, kde limitujeme množství zpráv a co nejčastěji děláme flush

# audit log transport 
            flush_interval 1s
            flush_thread_count 4
            queued_chunks_limit_size "#{ENV['BUFFER_QUEUE_LIMIT'] || '32' }"
            total_limit_size "#{ENV['TOTAL_LIMIT_SIZE'] ||  8589934592 }" #8G
            # chunk_limit_size "#{ENV['BUFFER_SIZE_LIMIT'] || '8m'}"
            chunk_limit_size 819200
            chunk_full_threshold 0.9
            chunk_limit_records 10

DEBUG transport to EventHub

Pro kafka clienta je dobré použít, jinak je log dost hluchý

get_kafka_client_log true

pro samotný fluentD pak upravit fluent.conf

    <system>
      log_level "#{ENV['LOG_LEVEL'] || 'warn'}"
    </system>
# klasicky fatal, error, warn, info, debug trace
# přičemž level info už je dostatečně obsáhlý
*file: 04-multilineLog.md *

JAVA STACK multiline log

Testing env

apiVersion: apps/v1
kind: Deployment
namespace: logging-test
metadata:
  name: multiline-java
  labels:
    app: multiline-java
spec:
  replicas: 1
  selector:
    matchLabels:
      app: multiline-java
  template:
    metadata:
      labels:
        app: multiline-java
    spec:
      containers:
        - name: multiline-java
          image: image-registry.openshift-image-registry.svc:5000/blaster/multilinejavaloggenerator:v1
          # image: quay.io/dedtom/multilinejavaloggenerator:v1
          imagePullPolicy: IfNotPresent

Definice problému

JAva stack Log pokud je v plainu je multine

2021-06-07 13:18:15.176 ERROR 1 --- [   scheduling-1] o.s.s.s.TaskUtils$LoggingErrorHandler    : Unexpected error occurred in scheduled task
java.lang.RuntimeException: Error happened
        at com.fmja.FluentdMultilineJavaApplication.FluentdMultilineJavaApplication.logException(FluentdMultilineJavaApplication.java:37) ~[classes!/:0.0.1-SNAPSHOT]
        at jdk.internal.reflect.GeneratedMethodAccessor16.invoke(Unknown Source) ~[na:na]
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
        at java.base/java.lang.reflect.Method.invoke(Method.java:566) ~[na:na]
        at org.springframework.scheduling.support.ScheduledMethodRunnable.run(ScheduledMethodRunnable.java:84) ~[spring-context-5.3.3.jar!/:5.3.3]
        at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54) ~[spring-context-5.3.3.jar!/:5.3.3]
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
        at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305) ~[na:na]
        at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305) ~[na:na]
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[na:na]
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[na:na]
        at java.base/java.lang.Thread.run(Thread.java:829) ~[na:na]

po pruchodu cri-o conmon je obohacen o metadata store do logu na nodu

2021-06-08T09:24:38.563261747+00:00 stdout F 2021-06-08 09:24:38.562 ERROR 1 --- [   scheduling-1] o.s.s.s.TaskUtils$LoggingErrorHandler    : Unexpected error occurred in scheduled task
2021-06-08T09:24:38.563261747+00:00 stdout F
2021-06-08T09:24:38.563261747+00:00 stdout F java.lang.RuntimeException: Error happened
2021-06-08T09:24:38.563261747+00:00 stdout F    at com.fmja.FluentdMultilineJavaApplication.FluentdMultilineJavaApplication.logException(FluentdMultilineJavaApplication.java:37) ~[classes!/:0.0.1-SNAPSHOT]
2021-06-08T09:24:38.563261747+00:00 stdout F    at jdk.internal.reflect.GeneratedMethodAccessor16.invoke(Unknown Source) ~[na:na]
2021-06-08T09:24:38.563261747+00:00 stdout F    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
2021-06-08T09:24:38.563261747+00:00 stdout F    at java.base/java.lang.reflect.Method.invoke(Method.java:566) ~[na:na]
2021-06-08T09:24:38.563261747+00:00 stdout F    at org.springframework.scheduling.support.ScheduledMethodRunnable.run(ScheduledMethodRunnable.java:84) ~[spring-context-5.3.3.jar!/:5.3.3]

kubelet-cri-logging

fluentD to dále zpracuje line by line a nikdy nedojde ke spojení logů, v kibaně pak vypadají logy jako

kibana-multiline-javastack

kibana-multiline-javastack

SOLUTION for FluentD

Všechny eventy z source “/var/log/containers/**” tagovány jako “kubernetes.**” budou filtrovány na multiline regexp match (timestamp -2021-06-10 12:09:05.176) proti fieldu log a následně concatovány.
Problém je s matchováním konce eventu pokud nepřijde další log, proto je nastaven flush_interval na 1

The number of seconds after which the last received event log will be flushed. If specified 0, wait for next line forever. Zároveň jak není schopen určit konec tak po překročení flushing intervalu přehodí zprávu do erroru, ale mi ten error vezmeme a přesměrujeme na standartní label.

    <label @JAVACONCAT>
      <filter kubernetes.**>
        @type concat
        key log
        multiline_start_regexp /^(\d{4}-\d{1,2}-\d{1,2} \d{1,2}:\d{1,2}:\d{1,2}.\d{0,3})/
        flush_interval 1
        timeout_label "@MEASURE"
      </filter>
      <match kubernetes.**>
        @type relabel
        @label @MEASURE
      </match>
    </label>

Problém tohoto řešení je ,že filtrujeme všechny logy z “/var/log/containers/**”. Uplně sem nedohlédnul na problémy které to může způsobovat ale fungujeme.

kibana-multiline-done

kibana-multiline-done