使用 OpenTelemetry Collector 收集 Kubernetes 日志数据
作者:阳明 2023-08-31 07:46:54云计算云原生 首先我们需要部署 Loki 来收集日志数据,同样我们这里使用 Helm Chart 来快速部署,不过需要注意同样不需要部署任何日志采集器,因为我们将使用 OpenTelemetry Collector 来收集日志数据,然后再将其发送到 Loki 中。
前面我们介绍了如何通过 OpenTelemetry Collector 来收集 Kubernetes 集群的指标数据,接下来我们再来了解下如何收集集群的日志记录数据。
安装 Loki
首先我们需要部署 Loki 来收集日志数据,同样我们这里使用 Helm Chart 来快速部署,不过需要注意同样不需要部署任何日志采集器,因为我们将使用 OpenTelemetry Collector 来收集日志数据,然后再将其发送到 Loki 中。
$ helm repo add grafana https://grafana.github.io/helm-chart$ helm repo update
我们这里创建一个loki-values.yaml文件来配置 Loki Helm Chart:
# loki-values.yamlloki: commonConfig: replication_factor: 1 auth_enabled: false storage: type: "filesystem"singleBinary: replicas: 1 persistence: enabled: true size: 10Gi storageClass: cfsautomonitoring: lokiCanary: enabled: false selfMonitoring: grafanaAgent: installOperator: falsetest: enabled: falsegateway: ingress: enabled: true ingressClassName: nginx tls: [] hosts: - host: loki.k8s.local paths: - path: / pathType: Prefix
然后直接使用下面的命令一键部署 Loki 即可:
$ helm upgrade --install loki grafana/loki -f loki-values.yaml --namespace kube-otel$ kubectl get pods -n kube-otel -l app.kubernetes.io/instance=lokiNAME READY STATUS RESTARTS AGEloki-0 1/1 Running 0 3m52sloki-gateway-5ffc9fbbf5-m5q75 1/1 Running 0 8m42s$ kubectl get ingress -n kube-otelNAME CLASS HOSTS ADDRESS PORTS AGEloki-gateway nginx loki.k8s.local 10.98.12.94 80 11m
启用 filelog 接收器
接下来我们就需要配置 OpenTelemetry Collector 来将日志数据发送到 Loki 中,首先更新otel-collector-ds-values.yaml文件,我们需要添加一个 Loki 的导出器,并开启filelogreceiver接收器:
# otel-collector-ds-values.yamlmode: daemonsetpresets: hostMetrics: enabled: true kubernetesAttributes: enabled: true kubeletMetrics: enabled: true # 启用 filelogreceiver 收集器 logsCollection: enabled: trueconfig: exporters: loki: endpoint: http://loki-gateway/loki/api/v1/push timeout: 10s # 超时时间 read_buffer_size: 200 write_buffer_size: 100 retry_on_failure: # 配置重试 enabled: true initial_interval: 10s # 初始间隔 max_interval: 60s # 最大间隔 max_elapsed_time: 10m # 最大时间 default_labels_enabled: exporter: false processors: resource: attributes: - action: insert key: loki.resource.labels value: k8s.namespace.name,k8s.pod.name,k8s.container.name service: pipelines: logs: exporters: - loki processors: - memory_limiter - k8sattributes - resource - batch
然后重新更新 OpenTelemetry Collector DaemonSet:
$ helm upgrade --install opentelemetry-collector ./opentelemetry-collector -f otel-ds-values.yaml --namespace kube-otel --create-namespace
同样更新后查看完整的配置信息,使用命令kubectl get cm -n opentelemetry-collector-agent -oyaml:
exporters: logging: loglevel: debug loki: endpoint: http://loki-gateway/loki/api/v1/push timeout: 10s # 超时时间 read_buffer_size: 200 write_buffer_size: 100 retry_on_failure: # 配置重试 enabled: true initial_interval: 10s # 初始间隔 max_interval: 60s # 最大间隔 max_elapsed_time: 10m # 最大时间 default_labels_enabled: exporter: falseextensions: health_check: {} memory_ballast: size_in_percentage: 40processors: batch: {} k8sattributes: extract: metadata: - k8s.namespace.name - k8s.deployment.name - k8s.statefulset.name - k8s.daemonset.name - k8s.cronjob.name - k8s.job.name - k8s.node.name - k8s.pod.name - k8s.pod.uid - k8s.pod.start_time filter: node_from_env_var: K8S_NODE_NAME passthrough: false pod_association: - sources: - from: resource_attribute name: k8s.pod.ip - sources: - from: resource_attribute name: k8s.pod.uid - sources: - from: connection memory_limiter: check_interval: 5s limit_percentage: 80 spike_limit_percentage: 25 resource: attributes: - action: insert key: loki.resource.labels value: k8s.namespace.name,k8s.pod.name,k8s.container.namereceivers: filelog: exclude: - /var/log/pods/kube-otel_opentelemetry-collector*_*/opentelemetry-collector/*.log include: - /var/log/pods/*/*/*.log include_file_name: false include_file_path: true operators: - id: get-format routes: - expr: body matches "^\\{" output: parser-docker - expr: body matches "^[^ Z]+ " output: parser-crio - expr: body matches "^[^ Z]+Z" output: parser-containerd type: router - id: parser-crio regex: ^(?P<time>[^ Z]+) (?P<stream>stdout|stderr) (?P<logtag>[^ ]*) ?(?P<log>.*)$ timestamp: layout: 2006-01-02T15:04:05.999999999Z07:00 layout_type: gotime parse_from: attributes.time type: regex_parser - combine_field: attributes.log combine_with: "" id: crio-recombine is_last_entry: attributes.logtag == 'F' max_log_size: 102400 output: extract_metadata_from_filepath source_identifier: attributes["log.file.path"] type: recombine - id: parser-containerd regex: ^(?P<time>[^ ^Z]+Z) (?P<stream>stdout|stderr) (?P<logtag>[^ ]*) ?(?P<log>.*)$ timestamp: layout: "%Y-%m-%dT%H:%M:%S.%LZ" parse_from: attributes.time type: regex_parser - combine_field: attributes.log combine_with: "" id: containerd-recombine is_last_entry: attributes.logtag == 'F' max_log_size: 102400 output: extract_metadata_from_filepath source_identifier: attributes["log.file.path"] type: recombine - id: parser-docker output: extract_metadata_from_filepath timestamp: layout: "%Y-%m-%dT%H:%M:%S.%LZ" parse_from: attributes.time type: json_parser - id: extract_metadata_from_filepath parse_from: attributes["log.file.path"] regex: ^.*\/(?P<namespace>[^_]+)_(?P<pod_name>[^_]+)_(?P<uid>[a-f0-9\-]+)\/(?P<container_name>[^\._]+)\/(?P<restart_count>\d+)\.log$ type: regex_parser - from: attributes.stream to: attributes["log.iostream"] type: move - from: attributes.container_name to: resource["k8s.container.name"] type: move - from: attributes.namespace to: resource["k8s.namespace.name"] type: move - from: attributes.pod_name to: resource["k8s.pod.name"] type: move - from: attributes.restart_count to: resource["k8s.container.restart_count"] type: move - from: attributes.uid to: resource["k8s.pod.uid"] type: move - from: attributes.log to: body type: move start_at: beginning otlp: protocols: grpc: endpoint: ${env:MY_POD_IP}:4317 http: endpoint: ${env:MY_POD_IP}:4318service: extensions: - health_check - memory_ballast pipelines: logs: exporters: - loki processors: - memory_limiter - k8sattributes - resource - batch receivers: - otlp - filelog# 同样只保留了和 logs 相关的配置,其他省略......
我们新增加了一个loki的导出器以及filelog接收器。
loki 导出器
该导出器是通过 HTTP 将数据导出到 Loki。该导出器可以做以下一些配置:
endpoint:Loki 的 HTTP 端点地址(如http://loki:3100/loki/api/v1/push)。default_labels_enabled(可选):允许禁用默认标签的映射:exporter、job、instance、level。如果省略default_labels_enabled,则会添加默认标签。如果在default_labels_enabled中省略了其中一个标签,则会添加该标签。如果禁用了所有默认标签,并且没有添加其他标签,则日志条目将被丢弃,因为至少需要存在一个标签才能成功将日志记录写入 Loki 中。指标otelcol_lokiexporter_send_failed_due_to_missing_labels将会显示由于未指定标签而被丢弃的日志记录数量。
Loki 导出器可以将 OTLP 资源和日志属性转换为 Loki 标签,并对其进行索引。为此,需要配置提示,指定应将哪些属性设置为标签。提示本身就是属性,在导出到 Loki 时将被忽略。以下示例使用attributes处理器提示 Loki 导出器将event.domain属性设置为标签,并使用resource处理器提示 Loki 导出器将service.name设置为标签。
processors: attributes: actions: - action: insert key: loki.attribute.labels value: event.domain resource: attributes: - action: insert key: loki.resource.labels value: service.name
除非通过default_labels_enabled设置禁用,默认标签始终会被设置。
job=service.namespace/service.nameinstance=service.instance.idexporter=OTLPlevel=severity如果service.name和service.namespace存在,那么设置job=service.namespace/service.name。如果service.name存在且service.namespace不存在,则会设置job=service.name。如果service.name不存在且service.namespace存在,则不会设置job标签。如果存在service.instance.id则设置instance=service.instance.id。如果service.instance.id不存在,则不设置instance标签。
我们这里的完整配置如下:
loki: endpoint: http://loki-gateway/loki/api/v1/push timeout: 10s # 超时时间 read_buffer_size: 200 write_buffer_size: 100 retry_on_failure: # 配置重试 enabled: true initial_interval: 10s # 初始间隔 max_interval: 60s # 最大间隔 max_elapsed_time: 10m # 最大时间
我们这里配置了超时时间,读写缓冲区大小,发送队列,重试等。
read_buffer_size和write_buffer_size字段分别指定了 OpenTelemetry 导出器的读取和写入缓冲区的大小。这些缓冲区用于在发送数据之前缓存数据,以提高发送效率和可靠性。
read_buffer_size字段指定了导出器从数据源读取数据时使用的缓冲区大小。如果数据源产生的数据量超过了缓冲区的大小,导出器将分批读取数据并将其缓存到缓冲区中,直到缓冲区被填满或数据源没有更多数据为止。
write_buffer_size字段指定了导出器将指标数据写入目标时使用的缓冲区大小。如果导出器产生的数据量超过了缓冲区的大小,导出器将分批将数据写入目标,并将其缓存到缓冲区中,直到缓冲区被填满或目标不可用为止。
通过配置这些缓冲区的大小,您可以控制 OpenTelemetry 导出器的性能和可靠性。如果您的数据源产生的数据量很大,可以增加read_buffer_size和write_buffer_size的大小,以提高导出器的吞吐量和效率。如果您的目标不太稳定或网络不太可靠,可以减小write_buffer_size的大小,以减少数据丢失的风险。
另外添加了一个resource的处理器,将k8s.namespace.name、k8s.pod.name、k8s.container.name转换为 Loki 标签,这样我们就可以在 Loki 中对其进行索引了。
resource: attributes: - action: insert key: loki.resource.labels value: k8s.namespace.name,k8s.pod.name,k8s.container.name
filelog 接收器
该接收器用于从文件中收集并解析日志数据,它会从指定的文件中读取日志数据,然后将其发送到 OpenTelemetry Collector 中。
我们这里对该接收器的配置如下所示:
filelog: exclude: - /var/log/pods/kube-otel_opentelemetry-collector*_*/opentelemetry-collector/*.log include: - /var/log/pods/*/*/*.log include_file_name: false include_file_path: true operators: - id: get-format routes: - expr: body matches "^\\{" output: parser-docker - expr: body matches "^[^ Z]+ " output: parser-crio - expr: body matches "^[^ Z]+Z" output: parser-containerd type: router - id: parser-crio regex: ^(?P<time>[^ Z]+) (?P<stream>stdout|stderr) (?P<logtag>[^ ]*) ?(?P<log>.*)$ timestamp: layout: 2006-01-02T15:04:05.999999999Z07:00 layout_type: gotime parse_from: attributes.time type: regex_parser - combine_field: attributes.log combine_with: "" id: crio-recombine is_last_entry: attributes.logtag == 'F' max_log_size: 102400 output: extract_metadata_from_filepath source_identifier: attributes["log.file.path"] type: recombine - id: parser-containerd regex: ^(?P<time>[^ ^Z]+Z) (?P<stream>stdout|stderr) (?P<logtag>[^ ]*) ?(?P<log>.*)$ timestamp: layout: "%Y-%m-%dT%H:%M:%S.%LZ" parse_from: attributes.time type: regex_parser - combine_field: attributes.log combine_with: "" id: containerd-recombine is_last_entry: attributes.logtag == 'F' max_log_size: 102400 output: extract_metadata_from_filepath source_identifier: attributes["log.file.path"] type: recombine - id: parser-docker output: extract_metadata_from_filepath timestamp: layout: "%Y-%m-%dT%H:%M:%S.%LZ" parse_from: attributes.time type: json_parser - id: extract_metadata_from_filepath parse_from: attributes["log.file.path"] regex: ^.*\/(?P<namespace>[^_]+)_(?P<pod_name>[^_]+)_(?P<uid>[a-f0-9\-]+)\/(?P<container_name>[^\._]+)\/(?P<restart_count>\d+)\.log$ type: regex_parser - from: attributes.stream to: attributes["log.iostream"] type: move - from: attributes.container_name to: resource["k8s.container.name"] type: move - from: attributes.namespace to: resource["k8s.namespace.name"] type: move - from: attributes.pod_name to: resource["k8s.pod.name"] type: move - from: attributes.restart_count to: resource["k8s.container.restart_count"] type: move - from: attributes.uid to: resource["k8s.pod.uid"] type: move - from: attributes.log to: body type: move start_at: beginning
可以看到配置非常长,首先通过exclude排除一些不需要收集的日志文件,然后通过include指定了需要收集的日志文件,由于我们的 Kubernetes 集群是基于 Containerd 容器运行时的,所以采集的日志目录为/var/log/pods/*/*/*.log,然后通过include_file_path来指定是否将文件路径添加为属性log.file.path,include_file_name指定是否将文件名添加为属性log.file.name。
start_at表示在启动时,从文件的哪个位置开始读取日志。选项有beginning或end,默认为end。
然后就是最重要的operators属性,用来指定如何处理日志文件,运算符是日志处理的最基本单元。每个运算符都完成一个单一的责任,比如从文件中读取行,或者从字段中解析 JSON。然后,这些运算符被链接在一起,形成一个管道,以实现所需的结果。
例如用户可以使用file_input操作符从文件中读取日志行。然后,这个操作的结果可以发送到regex_parser操作符,根据正则表达式创建字段。最后,这些结果可以发送到file_output操作符,将日志写入到磁盘上的文件中。
我们这里首先配置了一个router操作符:
id: get-formatroutes: - expr: body matches "^\\{" output: parser-docker - expr: body matches "^[^ Z]+ " output: parser-crio - expr: body matches "^[^ Z]+Z" output: parser-containerdtype: router
该操作符允许根据日志内容动态路由日志,我们这里是 Containerd 的容器运行时,产生的日志数据可以匹配body matches “^[^ Z]+Z”,然后将数据路由到parser-containerd操作符。
id: parser-containerdregex: ^(?P<time>[^ ^Z]+Z) (?P<stream>stdout|stderr) (?P<logtag>[^ ]*) ?(?P<log>.*)$timestamp: layout: "%Y-%m-%dT%H:%M:%S.%LZ" parse_from: attributes.timetype: regex_parser
parser-containerd是一个regex_parser操作符,它使用指定的正则表达式来解析前面路由过来的日志数据,然后会将结果存储在time、stream、logtag、log等属性中,并格式化 timestamp 时间戳。
接下来再通过recombine操作符将连续的日志组合成单个日志。
combine_field: attributes.logcombine_with: ""id: containerd-recombineis_last_entry: attributes.logtag == 'F'max_log_size: 102400output: extract_metadata_from_filepathsource_identifier: attributes["log.file.path"]type: recombine
经过上面处理后进入extract_metadata_from_filepath这个操作符,该操作符使用正则表达式从文件路径中提取元数据,然后将其存储在namespace、pod_name、uid、container_name、restart_count等属性中。
id: extract_metadata_from_filepathparse_from: attributes["log.file.path"]regex: ^.*\/(?P<namespace>[^_]+)_(?P<pod_name>[^_]+)_(?P<uid>[a-f0-9\-]+)\/(?P<container_name>[^\._]+)\/(?P<restart_count>\d+)\.log$type: regex_parser
接下来就是通过move操作符将一个字段从一个位置移动(或重命名)到另一个位置。
- from: attributes.stream to: attributes["log.iostream"] type: move- from: attributes.container_name to: resource["k8s.container.name"] type: move- from: attributes.namespace to: resource["k8s.namespace.name"] type: move- from: attributes.pod_name to: resource["k8s.pod.name"] type: move- from: attributes.restart_count to: resource["k8s.container.restart_count"] type: move- from: attributes.uid to: resource["k8s.pod.uid"] type: move- from: attributes.log to: body type: move
最后我们可以将 Loki 数据源添加到 Grafana 中:
Loki 数据源
然后在 Explorer 页面切换到 Loki 数据源下面就可以看到 Loki 中的日志数据了:
Loki 日志
启用 k8sobject 接收器
同样对于 Gateway 模式的采集器我们还可以去开启k8sobject接收器来采集 Kubernetes Events 数据,然后更新otel-collector-deploy-values.yaml文件:
# otel-collector-deploy-values.yamlmode: deployment# 我们只需要一个收集器 - 多了就会产生重复数据replicaCount: 1presets: clusterMetrics: enabled: true kubernetesEvents: enabled: trueconfig: exporters: loki: endpoint: http://loki-gateway/loki/api/v1/push timeout: 10s # 超时时间 read_buffer_size: 200 write_buffer_size: 100 retry_on_failure: # 配置重试 enabled: true initial_interval: 10s # 初始间隔 max_interval: 60s # 最大间隔 max_elapsed_time: 10m # 最大时间 service: pipelines: logs: exporters: - loki
然后重新更新 OpenTelemetry Collector Deployment:
$ helm upgrade --install opentelemetry-collector-cluster ./opentelemetry-collector -f otel-collector-deploy-values.yaml --namespace kube-otel --create-namespace
这里我们开启了kubernetesEvents预设,对应的配置如下所示:
k8sobjects: objects: - group: events.k8s.io mode: watch name: events
k8sobjects接收器可以用来拉取或 Watch Kubernetes API 服务器中的对象,我们这里通过group、mode、name来指定要拉取的 Kubernetes Events 对象。
最后我们也可以在 Loki 中查找到对应的 Events 日志数据。