使用 OpenTelemetry Collector 收集 Kubernetes 日志数据

作者:阳明 2023-08-31 07:46:54云计算云原生 首先我们需要部署 Loki 来收集日志数据,同样我们这里使用 Helm Chart 来快速部署,不过需要注意同样不需要部署任何日志采集器,因为我们将使用 OpenTelemetry Collector 来收集日志数据,然后再将其发送到 Loki 中。

前面我们介绍了如何通过 OpenTelemetry Collector 来收集 Kubernetes 集群的指标数据,接下来我们再来了解下如何收集集群的日志记录数据。

安装 Loki

首先我们需要部署 Loki 来收集日志数据,同样我们这里使用 Helm Chart 来快速部署,不过需要注意同样不需要部署任何日志采集器,因为我们将使用 OpenTelemetry Collector 来收集日志数据,然后再将其发送到 Loki 中。

$ helm repo add grafana https://grafana.github.io/helm-chart$ helm repo update

我们这里创建一个loki-values.yaml文件来配置 Loki Helm Chart:

# loki-values.yamlloki:  commonConfig:    replication_factor: 1  auth_enabled: false  storage:    type: "filesystem"singleBinary:  replicas: 1  persistence:    enabled: true    size: 10Gi    storageClass: cfsautomonitoring:  lokiCanary:    enabled: false  selfMonitoring:    grafanaAgent:      installOperator: falsetest:  enabled: falsegateway:  ingress:    enabled: true    ingressClassName: nginx    tls: []    hosts:      - host: loki.k8s.local        paths:          - path: /            pathType: Prefix

然后直接使用下面的命令一键部署 Loki 即可:

$ helm upgrade --install loki grafana/loki -f loki-values.yaml --namespace kube-otel$ kubectl get pods -n kube-otel -l app.kubernetes.io/instance=lokiNAME                            READY   STATUS    RESTARTS   AGEloki-0                          1/1     Running   0          3m52sloki-gateway-5ffc9fbbf5-m5q75   1/1     Running   0          8m42s$ kubectl get ingress -n kube-otelNAME                 CLASS   HOSTS               ADDRESS       PORTS   AGEloki-gateway         nginx   loki.k8s.local      10.98.12.94   80      11m

启用 filelog 接收器

接下来我们就需要配置 OpenTelemetry Collector 来将日志数据发送到 Loki 中,首先更新otel-collector-ds-values.yaml文件,我们需要添加一个 Loki 的导出器,并开启filelogreceiver接收器:

# otel-collector-ds-values.yamlmode: daemonsetpresets:  hostMetrics:    enabled: true  kubernetesAttributes:    enabled: true  kubeletMetrics:    enabled: true  # 启用 filelogreceiver 收集器  logsCollection:    enabled: trueconfig:  exporters:    loki:      endpoint: http://loki-gateway/loki/api/v1/push      timeout: 10s # 超时时间      read_buffer_size: 200      write_buffer_size: 100      retry_on_failure: # 配置重试        enabled: true        initial_interval: 10s # 初始间隔        max_interval: 60s # 最大间隔        max_elapsed_time: 10m # 最大时间      default_labels_enabled:        exporter: false  processors:    resource:      attributes:        - action: insert          key: loki.resource.labels          value: k8s.namespace.name,k8s.pod.name,k8s.container.name  service:    pipelines:      logs:        exporters:          - loki        processors:          - memory_limiter          - k8sattributes          - resource          - batch

然后重新更新 OpenTelemetry Collector DaemonSet:

$ helm upgrade --install opentelemetry-collector ./opentelemetry-collector -f otel-ds-values.yaml --namespace kube-otel --create-namespace

同样更新后查看完整的配置信息,使用命令kubectl get cm -n opentelemetry-collector-agent -oyaml:

exporters:  logging:    loglevel: debug  loki:    endpoint: http://loki-gateway/loki/api/v1/push    timeout: 10s # 超时时间    read_buffer_size: 200    write_buffer_size: 100    retry_on_failure: # 配置重试      enabled: true      initial_interval: 10s # 初始间隔      max_interval: 60s # 最大间隔      max_elapsed_time: 10m # 最大时间    default_labels_enabled:      exporter: falseextensions:  health_check: {}  memory_ballast:    size_in_percentage: 40processors:  batch: {}  k8sattributes:    extract:      metadata:        - k8s.namespace.name        - k8s.deployment.name        - k8s.statefulset.name        - k8s.daemonset.name        - k8s.cronjob.name        - k8s.job.name        - k8s.node.name        - k8s.pod.name        - k8s.pod.uid        - k8s.pod.start_time    filter:      node_from_env_var: K8S_NODE_NAME    passthrough: false    pod_association:      - sources:          - from: resource_attribute            name: k8s.pod.ip      - sources:          - from: resource_attribute            name: k8s.pod.uid      - sources:          - from: connection  memory_limiter:    check_interval: 5s    limit_percentage: 80    spike_limit_percentage: 25  resource:    attributes:      - action: insert        key: loki.resource.labels        value: k8s.namespace.name,k8s.pod.name,k8s.container.namereceivers:  filelog:    exclude:      - /var/log/pods/kube-otel_opentelemetry-collector*_*/opentelemetry-collector/*.log    include:      - /var/log/pods/*/*/*.log    include_file_name: false    include_file_path: true    operators:      - id: get-format        routes:          - expr: body matches "^\\{"            output: parser-docker          - expr: body matches "^[^ Z]+ "            output: parser-crio          - expr: body matches "^[^ Z]+Z"            output: parser-containerd        type: router      - id: parser-crio        regex: ^(?P<time>[^ Z]+) (?P<stream>stdout|stderr) (?P<logtag>[^ ]*) ?(?P<log>.*)$        timestamp:          layout: 2006-01-02T15:04:05.999999999Z07:00          layout_type: gotime          parse_from: attributes.time        type: regex_parser      - combine_field: attributes.log        combine_with: ""        id: crio-recombine        is_last_entry: attributes.logtag == 'F'        max_log_size: 102400        output: extract_metadata_from_filepath        source_identifier: attributes["log.file.path"]        type: recombine      - id: parser-containerd        regex: ^(?P<time>[^ ^Z]+Z) (?P<stream>stdout|stderr) (?P<logtag>[^ ]*) ?(?P<log>.*)$        timestamp:          layout: "%Y-%m-%dT%H:%M:%S.%LZ"          parse_from: attributes.time        type: regex_parser      - combine_field: attributes.log        combine_with: ""        id: containerd-recombine        is_last_entry: attributes.logtag == 'F'        max_log_size: 102400        output: extract_metadata_from_filepath        source_identifier: attributes["log.file.path"]        type: recombine      - id: parser-docker        output: extract_metadata_from_filepath        timestamp:          layout: "%Y-%m-%dT%H:%M:%S.%LZ"          parse_from: attributes.time        type: json_parser      - id: extract_metadata_from_filepath        parse_from: attributes["log.file.path"]        regex: ^.*\/(?P<namespace>[^_]+)_(?P<pod_name>[^_]+)_(?P<uid>[a-f0-9\-]+)\/(?P<container_name>[^\._]+)\/(?P<restart_count>\d+)\.log$        type: regex_parser      - from: attributes.stream        to: attributes["log.iostream"]        type: move      - from: attributes.container_name        to: resource["k8s.container.name"]        type: move      - from: attributes.namespace        to: resource["k8s.namespace.name"]        type: move      - from: attributes.pod_name        to: resource["k8s.pod.name"]        type: move      - from: attributes.restart_count        to: resource["k8s.container.restart_count"]        type: move      - from: attributes.uid        to: resource["k8s.pod.uid"]        type: move      - from: attributes.log        to: body        type: move    start_at: beginning  otlp:    protocols:      grpc:        endpoint: ${env:MY_POD_IP}:4317      http:        endpoint: ${env:MY_POD_IP}:4318service:  extensions:    - health_check    - memory_ballast  pipelines:    logs:      exporters:        - loki      processors:        - memory_limiter        - k8sattributes        - resource        - batch      receivers:        - otlp        - filelog# 同样只保留了和 logs 相关的配置,其他省略......

我们新增加了一个loki的导出器以及filelog接收器。

loki 导出器

该导出器是通过 HTTP 将数据导出到 Loki。该导出器可以做以下一些配置:

endpoint:Loki 的 HTTP 端点地址(如http://loki:3100/loki/api/v1/push)。default_labels_enabled(可选):允许禁用默认标签的映射:exporter、job、instance、level。如果省略default_labels_enabled,则会添加默认标签。如果在default_labels_enabled中省略了其中一个标签,则会添加该标签。

如果禁用了所有默认标签,并且没有添加其他标签,则日志条目将被丢弃,因为至少需要存在一个标签才能成功将日志记录写入 Loki 中。指标otelcol_lokiexporter_send_failed_due_to_missing_labels将会显示由于未指定标签而被丢弃的日志记录数量。

Loki 导出器可以将 OTLP 资源和日志属性转换为 Loki 标签,并对其进行索引。为此,需要配置提示,指定应将哪些属性设置为标签。提示本身就是属性,在导出到 Loki 时将被忽略。以下示例使用attributes处理器提示 Loki 导出器将event.domain属性设置为标签,并使用resource处理器提示 Loki 导出器将service.name设置为标签。

processors:  attributes:    actions:      - action: insert        key: loki.attribute.labels        value: event.domain  resource:    attributes:      - action: insert        key: loki.resource.labels        value: service.name

除非通过default_labels_enabled设置禁用,默认标签始终会被设置。

job=service.namespace/service.nameinstance=service.instance.idexporter=OTLPlevel=severity

如果service.name和service.namespace存在,那么设置job=service.namespace/service.name。如果service.name存在且service.namespace不存在,则会设置job=service.name。如果service.name不存在且service.namespace存在,则不会设置job标签。如果存在service.instance.id则设置instance=service.instance.id。如果service.instance.id不存在,则不设置instance标签。

我们这里的完整配置如下:

loki:  endpoint: http://loki-gateway/loki/api/v1/push  timeout: 10s # 超时时间  read_buffer_size: 200  write_buffer_size: 100  retry_on_failure: # 配置重试    enabled: true    initial_interval: 10s # 初始间隔    max_interval: 60s # 最大间隔    max_elapsed_time: 10m # 最大时间

我们这里配置了超时时间,读写缓冲区大小,发送队列,重试等。

read_buffer_size和write_buffer_size字段分别指定了 OpenTelemetry 导出器的读取和写入缓冲区的大小。这些缓冲区用于在发送数据之前缓存数据,以提高发送效率和可靠性。

read_buffer_size字段指定了导出器从数据源读取数据时使用的缓冲区大小。如果数据源产生的数据量超过了缓冲区的大小,导出器将分批读取数据并将其缓存到缓冲区中,直到缓冲区被填满或数据源没有更多数据为止。

write_buffer_size字段指定了导出器将指标数据写入目标时使用的缓冲区大小。如果导出器产生的数据量超过了缓冲区的大小,导出器将分批将数据写入目标,并将其缓存到缓冲区中,直到缓冲区被填满或目标不可用为止。

通过配置这些缓冲区的大小,您可以控制 OpenTelemetry 导出器的性能和可靠性。如果您的数据源产生的数据量很大,可以增加read_buffer_size和write_buffer_size的大小,以提高导出器的吞吐量和效率。如果您的目标不太稳定或网络不太可靠,可以减小write_buffer_size的大小,以减少数据丢失的风险。

另外添加了一个resource的处理器,将k8s.namespace.name、k8s.pod.name、k8s.container.name转换为 Loki 标签,这样我们就可以在 Loki 中对其进行索引了。

resource:  attributes:    - action: insert      key: loki.resource.labels      value: k8s.namespace.name,k8s.pod.name,k8s.container.name

filelog 接收器

该接收器用于从文件中收集并解析日志数据,它会从指定的文件中读取日志数据,然后将其发送到 OpenTelemetry Collector 中。

我们这里对该接收器的配置如下所示:

filelog:  exclude:    - /var/log/pods/kube-otel_opentelemetry-collector*_*/opentelemetry-collector/*.log  include:    - /var/log/pods/*/*/*.log  include_file_name: false  include_file_path: true  operators:    - id: get-format      routes:        - expr: body matches "^\\{"          output: parser-docker        - expr: body matches "^[^ Z]+ "          output: parser-crio        - expr: body matches "^[^ Z]+Z"          output: parser-containerd      type: router    - id: parser-crio      regex: ^(?P<time>[^ Z]+) (?P<stream>stdout|stderr) (?P<logtag>[^ ]*) ?(?P<log>.*)$      timestamp:        layout: 2006-01-02T15:04:05.999999999Z07:00        layout_type: gotime        parse_from: attributes.time      type: regex_parser    - combine_field: attributes.log      combine_with: ""      id: crio-recombine      is_last_entry: attributes.logtag == 'F'      max_log_size: 102400      output: extract_metadata_from_filepath      source_identifier: attributes["log.file.path"]      type: recombine    - id: parser-containerd      regex: ^(?P<time>[^ ^Z]+Z) (?P<stream>stdout|stderr) (?P<logtag>[^ ]*) ?(?P<log>.*)$      timestamp:        layout: "%Y-%m-%dT%H:%M:%S.%LZ"        parse_from: attributes.time      type: regex_parser    - combine_field: attributes.log      combine_with: ""      id: containerd-recombine      is_last_entry: attributes.logtag == 'F'      max_log_size: 102400      output: extract_metadata_from_filepath      source_identifier: attributes["log.file.path"]      type: recombine    - id: parser-docker      output: extract_metadata_from_filepath      timestamp:        layout: "%Y-%m-%dT%H:%M:%S.%LZ"        parse_from: attributes.time      type: json_parser    - id: extract_metadata_from_filepath      parse_from: attributes["log.file.path"]      regex: ^.*\/(?P<namespace>[^_]+)_(?P<pod_name>[^_]+)_(?P<uid>[a-f0-9\-]+)\/(?P<container_name>[^\._]+)\/(?P<restart_count>\d+)\.log$      type: regex_parser    - from: attributes.stream      to: attributes["log.iostream"]      type: move    - from: attributes.container_name      to: resource["k8s.container.name"]      type: move    - from: attributes.namespace      to: resource["k8s.namespace.name"]      type: move    - from: attributes.pod_name      to: resource["k8s.pod.name"]      type: move    - from: attributes.restart_count      to: resource["k8s.container.restart_count"]      type: move    - from: attributes.uid      to: resource["k8s.pod.uid"]      type: move    - from: attributes.log      to: body      type: move  start_at: beginning

可以看到配置非常长,首先通过exclude排除一些不需要收集的日志文件,然后通过include指定了需要收集的日志文件,由于我们的 Kubernetes 集群是基于 Containerd 容器运行时的,所以采集的日志目录为/var/log/pods/*/*/*.log,然后通过include_file_path来指定是否将文件路径添加为属性log.file.path,include_file_name指定是否将文件名添加为属性log.file.name。

start_at表示在启动时,从文件的哪个位置开始读取日志。选项有beginning或end,默认为end。

然后就是最重要的operators属性,用来指定如何处理日志文件,运算符是日志处理的最基本单元。每个运算符都完成一个单一的责任,比如从文件中读取行,或者从字段中解析 JSON。然后,这些运算符被链接在一起,形成一个管道,以实现所需的结果。

例如用户可以使用file_input操作符从文件中读取日志行。然后,这个操作的结果可以发送到regex_parser操作符,根据正则表达式创建字段。最后,这些结果可以发送到file_output操作符,将日志写入到磁盘上的文件中。

我们这里首先配置了一个router操作符:

id: get-formatroutes:  - expr: body matches "^\\{"    output: parser-docker  - expr: body matches "^[^ Z]+ "    output: parser-crio  - expr: body matches "^[^ Z]+Z"    output: parser-containerdtype: router

该操作符允许根据日志内容动态路由日志,我们这里是 Containerd 的容器运行时,产生的日志数据可以匹配body matches “^[^ Z]+Z”,然后将数据路由到parser-containerd操作符。

id: parser-containerdregex: ^(?P<time>[^ ^Z]+Z) (?P<stream>stdout|stderr) (?P<logtag>[^ ]*) ?(?P<log>.*)$timestamp:  layout: "%Y-%m-%dT%H:%M:%S.%LZ"  parse_from: attributes.timetype: regex_parser

parser-containerd是一个regex_parser操作符,它使用指定的正则表达式来解析前面路由过来的日志数据,然后会将结果存储在time、stream、logtag、log等属性中,并格式化 timestamp 时间戳。

接下来再通过recombine操作符将连续的日志组合成单个日志。

combine_field: attributes.logcombine_with: ""id: containerd-recombineis_last_entry: attributes.logtag == 'F'max_log_size: 102400output: extract_metadata_from_filepathsource_identifier: attributes["log.file.path"]type: recombine

经过上面处理后进入extract_metadata_from_filepath这个操作符,该操作符使用正则表达式从文件路径中提取元数据,然后将其存储在namespace、pod_name、uid、container_name、restart_count等属性中。

id: extract_metadata_from_filepathparse_from: attributes["log.file.path"]regex: ^.*\/(?P<namespace>[^_]+)_(?P<pod_name>[^_]+)_(?P<uid>[a-f0-9\-]+)\/(?P<container_name>[^\._]+)\/(?P<restart_count>\d+)\.log$type: regex_parser

接下来就是通过move操作符将一个字段从一个位置移动(或重命名)到另一个位置。

- from: attributes.stream  to: attributes["log.iostream"]  type: move- from: attributes.container_name  to: resource["k8s.container.name"]  type: move- from: attributes.namespace  to: resource["k8s.namespace.name"]  type: move- from: attributes.pod_name  to: resource["k8s.pod.name"]  type: move- from: attributes.restart_count  to: resource["k8s.container.restart_count"]  type: move- from: attributes.uid  to: resource["k8s.pod.uid"]  type: move- from: attributes.log  to: body  type: move

最后我们可以将 Loki 数据源添加到 Grafana 中:

Loki 数据源

然后在 Explorer 页面切换到 Loki 数据源下面就可以看到 Loki 中的日志数据了:

Loki 日志

启用 k8sobject 接收器

同样对于 Gateway 模式的采集器我们还可以去开启k8sobject接收器来采集 Kubernetes Events 数据,然后更新otel-collector-deploy-values.yaml文件:

# otel-collector-deploy-values.yamlmode: deployment# 我们只需要一个收集器 - 多了就会产生重复数据replicaCount: 1presets:  clusterMetrics:    enabled: true  kubernetesEvents:    enabled: trueconfig:  exporters:    loki:      endpoint: http://loki-gateway/loki/api/v1/push      timeout: 10s # 超时时间      read_buffer_size: 200      write_buffer_size: 100      retry_on_failure: # 配置重试        enabled: true        initial_interval: 10s # 初始间隔        max_interval: 60s # 最大间隔        max_elapsed_time: 10m # 最大时间  service:    pipelines:      logs:        exporters:          - loki

然后重新更新 OpenTelemetry Collector Deployment:

$ helm upgrade --install opentelemetry-collector-cluster ./opentelemetry-collector -f otel-collector-deploy-values.yaml --namespace kube-otel --create-namespace

这里我们开启了kubernetesEvents预设,对应的配置如下所示:

k8sobjects:  objects:    - group: events.k8s.io      mode: watch      name: events

k8sobjects接收器可以用来拉取或 Watch Kubernetes API 服务器中的对象,我们这里通过group、mode、name来指定要拉取的 Kubernetes Events 对象。

最后我们也可以在 Loki 中查找到对应的 Events 日志数据。