FluentD configuration
Event structure and tags
A Fluentd event consists of a tag, a time and a record:
- tag: where the event comes from; used for message routing
- time: when the event happened; nanosecond resolution
- record: the actual log content; a JSON object
The input plugin is responsible for generating Fluentd events from data sources. For example, in_tail generates events from text lines. If you have the following line in your Apache logs:
192.168.0.1 - - [28/Feb/2013:12:00:00 +0900] "GET / HTTP/1.1" 200 777
you get the following event:
tag: apache.access # set by configuration
time: 1362020400.000000000 # 28/Feb/2013:12:00:00 +0900
record: {"user":"-","method":"GET","code":200,"size":777,"host":"192.168.0.1","path":"/"}
FluentD Tags vs filesystem location
# kubelet creates a symlink to the container log
/var/log/containers/eventrouter-5dc5c565cd-dw7tj_openshift-logging_kube-eventrouter-ae666e925d23df257f2360c6bcadc1257a0913b2d98f377d9919085a71812bcf.log -> /var/log/pods/openshift-logging_eventrouter-5dc5c565cd-dw7tj_bf80aa43-5359-48fc-a4f1-9921569a4c43/kube-eventrouter/0.log
# path structure
/var/log/containers/nameOfPod_nameOfNamespace_nameOfContainer-hash.log
#corresponding tag
var.log.containers.nameOfPod_nameOfNamespace_nameOfContainer-hash.log
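The tag comes from the collector's in_tail source: a * placeholder in the tag is replaced by the watched file's path with / converted to . — a minimal sketch, assuming the usual OpenShift collector layout (the pos_file path and the none parser are placeholders):
<source>
  @type tail
  path /var/log/containers/*.log
  pos_file /var/log/containers.log.pos   # assumed pos-file path
  tag kubernetes.*   # expands to kubernetes.var.log.containers.nameOfPod_nameOfNamespace_nameOfContainer-hash.log
  read_from_head true
  <parse>
    @type none       # placeholder; the real config parses the CRI-O line format
  </parse>
</source>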
SYSTEMD Journal logs transport
LOGS location on NODES
journalctl may be used to query the contents of the systemd(1) journal as written by systemd-journald.service(8).
The configuration file is /etc/systemd/journald.conf.
Its "Storage=" option controls whether and where journal data is stored:
- volatile, journal log data will be stored only in memory, i.e. below the /run/log/journal hierarchy (which is created if needed).
- persistent, data will be stored preferably on disk, i.e. below the /var/log/journal hierarchy (which is created if needed), with a fallback to /run/log/journal (which is created if needed), during early boot and if the disk is not writable.
- auto is similar to “persistent” but the directory /var/log/journal is not created if needed, so that its existence controls where log data goes.
- none turns off all storage, all log data received will be dropped.
On OpenShift nodes this is set to persistent.
# all units in journal
journalctl --field _SYSTEMD_UNIT
# journals path
journalctl --field JOURNAL_PATH
# journal disk-usage
journalctl --disk-usage
# current journals
journalctl --field JOURNAL_NAME
journalctl -o json-pretty --no-pager
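On the collector side these journal entries are usually read with fluent-plugin-systemd; a minimal sketch (the tag, the unit filter and the cursor-storage path are assumptions, not the cluster's actual config):
<source>
  @type systemd
  tag journal                                     # hypothetical tag
  path /var/log/journal                           # persistent journal location (see Storage= above)
  matches [{ "_SYSTEMD_UNIT": "crio.service" }]   # assumed unit filter
  read_from_head true
  <storage>
    @type local
    persistent true
    path /var/log/journald_pos.json               # stores the last read journal cursor
  </storage>
</source>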
API AUDIT LOGS
Audit covers the OpenShift API server, the Kubernetes API server and the OAuth API server.
We enable auditing for the API servers:
oc edit apiserver cluster
apiVersion: config.openshift.io/v1
kind: APIServer
metadata:
...
spec:
  audit:
    profile: WriteRequestBodies
    # Default, WriteRequestBodies, or AllRequestBodies. The default profile is Default.
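To check that the profile has been applied, something like this can be used (the jsonpath only shows the spec; the API server pods still need a moment to roll out):
oc get apiserver cluster -o jsonpath='{.spec.audit.profile}'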
#fluent.conf
<source>
  @type tail
  path "/var/log/kube-apiserver/*.log"
  pos_file "/var/log/kube-apiserver.log.pos"
  refresh_interval 5
  rotate_wait 5
  tag audit.*
  read_from_head "true"
  @label @MEASURE
  <parse>
    @type multi_format
    <pattern>
      format json
      time_format '%Y-%m-%dT%H:%M:%S.%N%Z'
      keep_time_key true
    </pattern>
  </parse>
</source>

<source>
  @type tail
  path "/var/log/openshift-apiserver/*.log"
  pos_file "/var/log/openshift-apiserver.log.pos"
  refresh_interval 5
  rotate_wait 5
  tag audit.*
  read_from_head "true"
  @label @MEASURE
  <parse>
    @type multi_format
    <pattern>
      format json
      time_format '%Y-%m-%dT%H:%M:%S.%N%Z'
      keep_time_key true
    </pattern>
  </parse>
</source>

<source>
  @type tail
  path "/var/log/oauth-apiserver/*.log"
  pos_file "/var/log/oauth-apiserver.log.pos"
  refresh_interval 5
  rotate_wait 5
  tag audit.*
  read_from_head "true"
  @label @MEASURE
  <parse>
    @type multi_format
    <pattern>
      format json
      time_format '%Y-%m-%dT%H:%M:%S.%N%Z'
      keep_time_key true
    </pattern>
  </parse>
</source>
The final tags will therefore be:
audit.var.log.kube-apiserver.audit.log
audit.var.log.openshift-apiserver.audit.log
audit.var.log.oauth-apiserver.audit.log
For tag-based filtering we keep audit events with the verbs create, update, patch, get and delete (metadata and request payload) for all users whose name has the form user@domain:
<filter audit.var.log.openshift-apiserver.** audit.var.log.kube-apiserver.** audit.var.log.oauth-apiserver.**>
# <filter audit.**>
  @type grep
  <regexp>
    key verb
    pattern /create|update|patch|delete|get/
  </regexp>
  <regexp>
    key user
    pattern /.+@.+/
  </regexp>
</filter>
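Note that in raw Kubernetes audit events the user field is a nested object (user.username, user.groups, ...). If the events arrive unflattened and key user does not match, the username can be targeted with record_accessor syntax; an alternative sketch, not the configuration used above:
<filter audit.**>
  @type grep
  <regexp>
    key $.user.username   # nested field access via record_accessor
    pattern /.+@.+/
  </regexp>
</filter>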
Eventhub concept
The idea is to use Event Hub as the store for logs from OCP and to use FluentD to transport them. The Event Hub concept is quite similar to the Kafka concept; a terminology mapping is below:
# eventhub vs kafka
Kafka Concept vs Event Hubs Concept
Cluster <----> Namespace
Topic <----> Event Hub
Partition <----> Partition
Consumer Group <----> Consumer Group
Offset <----> Offset
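How this maps onto the kafka2 output used below (parameter names are from fluent-plugin-kafka; the values are placeholders):
# kafka2 parameter -> Event Hub meaning
# brokers          -> <namespace>.servicebus.windows.net:9093 (Kafka endpoint of the namespace)
# default_topic    -> name of the Event Hub
# username         -> the literal string $ConnectionString
# password         -> the connection string of the namespace / Event Hub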
EventHub sizing
The minimal unit is a Throughput Unit (TU): 1 MB/s inbound or 1000 msgs/s (whichever comes first), defined for the whole namespace. TUs can be changed dynamically.
Each Event Hub also has a certain number of partitions (one by default). This can only be set when the Event Hub is created, and one partition effectively caps the given Event Hub at 1 MB/s inbound or 1000 msgs/s (whichever comes first).
The maximum message size Event Hub accepts is 1 MB.
It also seems that the number of messages in one "record batch" is limited, but I did not find anything about this in the official documentation.
If this limit is exceeded, the returned error is:
- message_too_long, or
- connection reset
FluentD configuration for Event Hub
For the FluentD configuration we use the kafka2 plugin.
Sample FluentD Kafka output configuration:
#fluent.conf
# oc get cm fluentd -o yaml|yq e '(.data.fluent*)' -|vim -
<label @LOGS>
  <match **>
    @type copy
    <store>
      @type kafka2
      brokers log.servicebus.windows.net:9093
      # get some responses from the Kafka client, useful for debugging
      get_kafka_client_log true
      ssl_ca_certs_from_system true
      default_topic 'log.oaz1_test_in'
      username $ConnectionString
      password "Endpoint=sb://log.servicebus.windows.net/;SharedAccessKeyName=ack;SharedAccessKey=jjwYrmVQkk22fuXsadCQTHNSDrSW1wTSfq8rOOCkEMc="
      # per-topic buffer settings
      <buffer topic>
        @type file
        path '/var/lib/fluentd/logs'
        flush_interval 5s
        flush_thread_count 2
        # limit the number of queued chunks
        queued_chunks_limit_size "#{ENV['BUFFER_QUEUE_LIMIT'] || '32' }"
        # once the total size of the stored buffer reaches this threshold, all append operations will fail with an error (and data will be lost)
        total_limit_size "#{ENV['TOTAL_LIMIT_SIZE'] || 8589934592 }" #8G
        chunk_limit_size 819200
        overflow_action throw_exception
        retry_type exponential_backoff
        retry_wait 5
        retry_exponential_backoff_base 4
        retry_max_interval 600
        retry_forever
      </buffer>
      max_send_retries 2
      required_acks -1
      max_send_limit_bytes 1000000
      <format>
        @type json
      </format>
    </store>
  </match>
</label>
Important Config Options
chunk_limit_size 819200 #800KB
Because Event Hub enforces message.max.bytes=1048576, we have to use a chunk smaller than 1 MB; 819200 B (800 KiB) leaves roughly 229 KB of headroom below the 1,048,576 B limit.
chunk: A buffer plugin uses a chunk as a lightweight container and fills it with events coming from input sources. If a chunk becomes full, it gets "shipped" to the destination. This matters here because, for the kafka2 plugin, kafka2 uses one buffer chunk for one record batch.
max_send_limit_bytes 1000000
max_send_limit_bytes - default: nil - the maximum byte size of a message to send, used to avoid MessageSizeTooLarge. For example, if you set 1000000 (message.max.bytes in Kafka), messages larger than 1,000,000 bytes will be dropped. This misled me a bit: I thought it was a limit on the message size, but it is a drop threshold, not a size cap.
We always want the messages to be delivered, so if the destination is unavailable we configure an exponentially increasing delay between retries, up to a maximum of 10 minutes.
# sending options
max_send_retries 2 #send 2x to leader direct
#A chunk can fail to be written out to the destination for a number of reasons. The network can go down, or the traffic volumes can exceed the capacity of the destination node. To handle such common failures gracefully, buffer plugins are equipped with a built-in retry mechanism.
retry_type exponential_backoff
retry_max_interval 600 #10 min cap for the exponential backoff
retry_exponential_backoff_base 4
retry_wait 5 #first wait 5..20..80...
retry_forever #never give up
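# with these values the wait between retries grows roughly as 5*4^n seconds: 5, 20, 80, 320, and is then capped at retry_max_interval (600 s)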
There is also a problem with the chunk size for audit logs (WriteRequestBodies).
A chunk sent to Event Hub may be at most 1 MB, but the size of a created (and therefore audited) object can also be up to 1 MB. The chunk, however, is filled in parallel and Fluentd is simply not able to cut it off immediately.
For audit logs a suitable buffer configuration therefore seems to be one that limits the number of records per chunk and flushes as often as possible:
# audit log transport
flush_interval 1s
flush_thread_count 4
queued_chunks_limit_size "#{ENV['BUFFER_QUEUE_LIMIT'] || '32' }"
total_limit_size "#{ENV['TOTAL_LIMIT_SIZE'] || 8589934592 }" #8G
# chunk_limit_size "#{ENV['BUFFER_SIZE_LIMIT'] || '8m'}"
chunk_limit_size 819200
chunk_full_threshold 0.9
chunk_limit_records 10
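In context, these parameters belong in the <buffer> section of the kafka2 store that ships the audit logs; a sketch (the buffer path is an assumption, kept separate from the application-log buffer):
<buffer topic>
  @type file
  path '/var/lib/fluentd/audit'   # assumed separate buffer directory
  flush_interval 1s
  flush_thread_count 4
  queued_chunks_limit_size "#{ENV['BUFFER_QUEUE_LIMIT'] || '32' }"
  total_limit_size "#{ENV['TOTAL_LIMIT_SIZE'] || 8589934592 }" #8G
  chunk_limit_size 819200
  chunk_full_threshold 0.9
  chunk_limit_records 10
</buffer>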
DEBUG transport to EventHub
For the Kafka client it is good to enable the following, otherwise the log is rather quiet:
get_kafka_client_log true
For FluentD itself, adjust fluent.conf:
<system>
  log_level "#{ENV['LOG_LEVEL'] || 'warn'}"
</system>
# the usual levels: fatal, error, warn, info, debug, trace
# level info is already quite verbose
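To raise the level temporarily, the LOG_LEVEL environment variable can be set on the collector DaemonSet (a hedged example; it assumes the DaemonSet is called fluentd and lives in openshift-logging):
oc -n openshift-logging set env ds/fluentd LOG_LEVEL=debug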
JAVA STACK multiline log
Testing env
apiVersion: apps/v1
kind: Deployment
metadata:
  name: multiline-java
  namespace: logging-test
  labels:
    app: multiline-java
spec:
  replicas: 1
  selector:
    matchLabels:
      app: multiline-java
  template:
    metadata:
      labels:
        app: multiline-java
    spec:
      containers:
      - name: multiline-java
        image: image-registry.openshift-image-registry.svc:5000/blaster/multilinejavaloggenerator:v1
        # image: quay.io/dedtom/multilinejavaloggenerator:v1
        imagePullPolicy: IfNotPresent
Problem definition
A Java stack trace logged as plain text is multi-line:
2021-06-07 13:18:15.176 ERROR 1 --- [ scheduling-1] o.s.s.s.TaskUtils$LoggingErrorHandler : Unexpected error occurred in scheduled task
java.lang.RuntimeException: Error happened
at com.fmja.FluentdMultilineJavaApplication.FluentdMultilineJavaApplication.logException(FluentdMultilineJavaApplication.java:37) ~[classes!/:0.0.1-SNAPSHOT]
at jdk.internal.reflect.GeneratedMethodAccessor16.invoke(Unknown Source) ~[na:na]
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
at java.base/java.lang.reflect.Method.invoke(Method.java:566) ~[na:na]
at org.springframework.scheduling.support.ScheduledMethodRunnable.run(ScheduledMethodRunnable.java:84) ~[spring-context-5.3.3.jar!/:5.3.3]
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54) ~[spring-context-5.3.3.jar!/:5.3.3]
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305) ~[na:na]
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:829) ~[na:na]
After passing through CRI-O's conmon it is enriched with metadata and stored in the log file on the node:
2021-06-08T09:24:38.563261747+00:00 stdout F 2021-06-08 09:24:38.562 ERROR 1 --- [ scheduling-1] o.s.s.s.TaskUtils$LoggingErrorHandler : Unexpected error occurred in scheduled task
2021-06-08T09:24:38.563261747+00:00 stdout F
2021-06-08T09:24:38.563261747+00:00 stdout F java.lang.RuntimeException: Error happened
2021-06-08T09:24:38.563261747+00:00 stdout F at com.fmja.FluentdMultilineJavaApplication.FluentdMultilineJavaApplication.logException(FluentdMultilineJavaApplication.java:37) ~[classes!/:0.0.1-SNAPSHOT]
2021-06-08T09:24:38.563261747+00:00 stdout F at jdk.internal.reflect.GeneratedMethodAccessor16.invoke(Unknown Source) ~[na:na]
2021-06-08T09:24:38.563261747+00:00 stdout F at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
2021-06-08T09:24:38.563261747+00:00 stdout F at java.base/java.lang.reflect.Method.invoke(Method.java:566) ~[na:na]
2021-06-08T09:24:38.563261747+00:00 stdout F at org.springframework.scheduling.support.ScheduledMethodRunnable.run(ScheduledMethodRunnable.java:84) ~[spring-context-5.3.3.jar!/:5.3.3]
FluentD then processes this line by line and the lines are never joined; in Kibana the logs then look like kibana-multiline-javastack.
SOLUTION for FluentD
All events from the source "/var/log/containers/**" tagged as "kubernetes.**" are run through a filter that matches a multiline start regexp (the timestamp, e.g. 2021-06-10 12:09:05.176) against the log field and are then concatenated.
The problem is detecting the end of an event when no further log line arrives, which is why flush_interval is set to 1.
flush_interval is the number of seconds after which the last received event log is flushed; if 0 is specified, it waits for the next line forever. Because the plugin cannot determine where the event ends, once the flush interval is exceeded it emits the pending message to the error stream, but we catch that via timeout_label and redirect it to the standard label.
<label @JAVACONCAT>
  <filter kubernetes.**>
    @type concat
    key log
    multiline_start_regexp /^(\d{4}-\d{1,2}-\d{1,2} \d{1,2}:\d{1,2}:\d{1,2}.\d{0,3})/
    flush_interval 1
    timeout_label "@MEASURE"
  </filter>
  <match kubernetes.**>
    @type relabel
    @label @MEASURE
  </match>
</label>
The problem with this solution is that we filter all logs from "/var/log/containers/**". I have not fully thought through the issues this may cause, but it works for us. kibana-multiline-done
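One way to limit the blast radius would be to run the concat filter only for workloads that actually log Java stack traces, using the tag structure described earlier; a sketch, assuming the multiline-java test deployment above (the tag pattern is hypothetical and has to match your pod names):
<filter kubernetes.var.log.containers.multiline-java-**>
  @type concat
  key log
  multiline_start_regexp /^(\d{4}-\d{1,2}-\d{1,2} \d{1,2}:\d{1,2}:\d{1,2}.\d{0,3})/
  flush_interval 1
  timeout_label "@MEASURE"
</filter>
Events whose tag does not match the pattern simply skip the filter and continue to the <match kubernetes.**> relabel unchanged.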