Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[sink OTLP] cannot ingest log batches #22188

Open
blezoray opened this issue Jan 13, 2025 · 16 comments
Open

[sink OTLP] cannot ingest log batches #22188

blezoray opened this issue Jan 13, 2025 · 16 comments
Assignees
Labels
sink: opentelemetry Anything `opentelemetry` sink related type: bug A code related bug.

Comments

@blezoray
Copy link

blezoray commented Jan 13, 2025

A note for the community

  • Please vote on this issue by adding a 👍 reaction to the original issue to help the community and maintainers prioritize this request
  • If you are interested in working on this issue or have submitted a pull request, please leave a comment

Problem

Hello,

I'm using an Openshift Cluster 4.15 with the logging stack configured with vector to collect logs on each node and forward them to a namespace vector, through HTTP source/sink.
My namespace vector send logs to a local Grafana Loki using the Loki sink and it works fine.
But, when I try to use the OTLP sink that target the OTLP Loki api, I regularly lost some messages, about 10%.

I have an application that generates 1 msg/s to a kafka topic and it generate 1 log/msg.
Vector and Loki don't report any error.

Is there something in my configuration that could explain this behavior ?

Rgds.

Configuration

api:
  address: 127.0.0.1:8686
  enabled: true
  playground: false
data_dir: /vector-data-dir
sinks:
  otelhttp:
    inputs:
    - manage_labels
    protocol:
      encoding:
        codec: json
      framing:
        method: newline_delimited
      method: post
      request:
        headers:
          content-type: application/json
      type: http
      uri: http://loki:3100/otlp/v1/logs
    type: opentelemetry
sources:
  http_server:
    address: 0.0.0.0:8443
    decoding:
      codec: json
    tls:
      ca_file: /etc/vector/tls/tls.crt
      crt_file: /etc/vector/tls/tls.crt
      enabled: true
      key_file: /etc/vector/tls/tls.key
      verify_certificate: false
      verify_hostname: true
    type: http_server
transforms:
  manage_labels:
    inputs:
    - remove_vector_logs
    source: |-
      # OTLP logs samples: https://github.com/open-telemetry/opentelemetry-proto/blob/main/examples/logs.json
      del(.kubernetes.annotations)
      del(.kubernetes.namespace_labels)
      del(.openshift)
      del(.path)

      # timestamp: 2024-04-11T13:38:41.844320191Z
      .timeunixnano = to_unix_timestamp!(.timestamp, unit: "nanoseconds")

      # Resource attributes
      .att.service_name.key = "service.name"
      if !is_null(.kubernetes.labels.app_kubernetes_io_name) {
        .att.service_name.value.stringValue = .kubernetes.labels.app_kubernetes_io_name
      } else {
        .att.service_name.value.stringValue = "unknown"
      }

      .att.namespace_name.key = "k8s.namespace.name"
      .att.namespace_name.value.stringValue = .kubernetes.namespace_name

      .att.pod_name.key = "k8s.pod.name"
      .att.pod_name.value.stringValue = .kubernetes.pod_name

      .att.container_name.key = "k8s.container.name"
      .att.container_name.value.stringValue = .kubernetes.container_name

      .resource.attributes = array([.att.service_name, .att.namespace_name, .att.pod_name, .att.container_name])

      # Attributes: only service.name
      .attritubes = array([.att.service_name])

      # scopeLogs
      .scope.name = "Vector"
      .scope.version = "0.43.0"
      .logRecords = array([{"timeUnixNano": .timeunixnano, "attributes": .attritubes, "body": { "stringValue": .message }, "service_name": .att.service_name.value.stringValue, "severityText": .level, "k8s": .kubernetes}])
      .scopeLogs = array([{"scope": .scope, "logRecords": .logRecords}])
      del(.att)
      del(.attritubes)
      del(.scope)
      del(.logRecords)
      del(.timeunixnano)

      # resourceLogs
      .resourceLog.resource.attributes = .resource.attributes
      .resourceLog.scopeLogs = .scopeLogs
      .resourceLogs = array([.resourceLog])
      del(.resourceLog)

      # cleanup
      del(.kubernetes)
      del(.hostname)
      del(.body)
      del(.resource)
      del(.scopeLogs)
      del(.source_type)
      del(.message)
      del(.log_type)
      del(.level)
      del(.@timestamp)
      del(.timestamp)
    type: remap
  remove_vector_logs:
    condition:
      source: .kubernetes.labels.app != "vector"
      type: vrl
    inputs:
    - http_server
    type: filter

Version

0.43.1-distroless-libc

Debug Output

2025-01-13T09:39:41.403627Z DEBUG vector::app: Internal log rate limit configured. internal_log_rate_secs=10
2025-01-13T09:39:41.403706Z  INFO vector::app: Log level is enabled. level="debug"
2025-01-13T09:39:41.403851Z DEBUG vector::app: messaged="Building runtime." worker_threads=1
2025-01-13T09:39:41.404522Z  INFO vector::app: Loading configs. paths=["/etc/vector"]
2025-01-13T09:39:41.407149Z DEBUG vector::config::loading: No secret placeholder found, skipping secret resolution.
2025-01-13T09:39:41.507247Z DEBUG vector::topology::builder: Building new source. component=http_server
2025-01-13T09:39:41.605401Z DEBUG vector::topology::builder: Building new transform. component=manage_labels
2025-01-13T09:39:41.708991Z DEBUG vector::topology::builder: Building new transform. component=remove_vector_logs
2025-01-13T09:39:41.709152Z DEBUG vector::topology::builder: Building new sink. component=otelhttp
2025-01-13T09:39:41.800780Z DEBUG sink{component_kind="sink" component_id=otelhttp component_type=opentelemetry}: vector_core::tls::settings: Fetching system root certs.
2025-01-13T09:39:41.813536Z DEBUG sink{component_kind="sink" component_id=otelhttp component_type=opentelemetry}: vector_core::tls::settings: Fetching system root certs.
2025-01-13T09:39:41.908727Z  INFO vector::topology::running: Running healthchecks.
2025-01-13T09:39:41.908788Z DEBUG vector::topology::running: Connecting changed/added component(s).
2025-01-13T09:39:41.908831Z DEBUG vector::topology::running: Configuring outputs for source. component=http_server
2025-01-13T09:39:41.908855Z DEBUG vector::topology::running: Configuring output for component. component=http_server output_id=None
2025-01-13T09:39:41.908865Z DEBUG vector::topology::running: Configuring outputs for transform. component=remove_vector_logs
2025-01-13T09:39:41.908874Z DEBUG vector::topology::running: Configuring output for component. component=remove_vector_logs output_id=None
2025-01-13T09:39:41.908880Z DEBUG vector::topology::running: Configuring outputs for transform. component=manage_labels
2025-01-13T09:39:41.908888Z DEBUG vector::topology::running: Configuring output for component. component=manage_labels output_id=None
2025-01-13T09:39:41.908898Z DEBUG vector::topology::running: Connecting inputs for transform. component=remove_vector_logs
2025-01-13T09:39:41.908917Z DEBUG vector::topology::running: Adding component input to fanout. component=remove_vector_logs fanout_id=http_server
2025-01-13T09:39:41.908938Z DEBUG vector::topology::running: Connecting inputs for transform. component=manage_labels
2025-01-13T09:39:41.908938Z  INFO vector::topology::builder: Healthcheck passed.
2025-01-13T09:39:41.908946Z DEBUG vector::topology::running: Adding component input to fanout. component=manage_labels fanout_id=remove_vector_logs
2025-01-13T09:39:41.908958Z DEBUG vector::topology::running: Connecting inputs for sink. component=otelhttp
2025-01-13T09:39:41.908975Z DEBUG vector::topology::running: Adding component input to fanout. component=otelhttp fanout_id=manage_labels
2025-01-13T09:39:41.909014Z DEBUG vector::topology::running: Spawning new source. key=http_server
2025-01-13T09:39:41.909058Z DEBUG vector::topology::running: Spawning new transform. key=remove_vector_logs
2025-01-13T09:39:41.909088Z DEBUG vector::topology::running: Spawning new transform. key=manage_labels
2025-01-13T09:39:41.909173Z  INFO vector: Vector has started. debug="false" version="0.43.1" arch="x86_64" revision="e30bf1f 2024-12-10 16:14:47.175528383"
2025-01-13T09:39:41.909406Z DEBUG source{component_kind="source" component_id=http_server component_type=http_server}: vector::topology::builder: Source pump supervisor starting.
2025-01-13T09:39:41.909435Z DEBUG source{component_kind="source" component_id=http_server component_type=http_server}: vector::topology::builder: Source pump starting.
2025-01-13T09:39:41.909447Z DEBUG source{component_kind="source" component_id=http_server component_type=http_server}: vector::topology::builder: Source starting.
2025-01-13T09:39:41.909493Z  INFO source{component_kind="source" component_id=http_server component_type=http_server}: vector::sources::util::http::prelude: Building HTTP server. address=0.0.0.0:8443
2025-01-13T09:39:41.996295Z  INFO vector::internal_events::api: API server running. address=127.0.0.1:8686 playground=off graphql=http://127.0.0.1:8686/graphql
2025-01-13T09:39:41.998113Z DEBUG transform{component_kind="transform" component_id=remove_vector_logs component_type=filter}: vector::topology::builder: Synchronous transform starting.
2025-01-13T09:39:41.998222Z DEBUG transform{component_kind="transform" component_id=manage_labels component_type=remap}: vector::topology::builder: Synchronous transform starting.
2025-01-13T09:39:41.998246Z DEBUG sink{component_kind="sink" component_id=otelhttp component_type=opentelemetry}: vector::topology::builder: Sink starting.
2025-01-13T09:39:41.999524Z DEBUG sink{component_kind="sink" component_id=otelhttp component_type=opentelemetry}: vector::utilization: utilization=0.06815665099773727
2025-01-13T09:39:42.330697Z DEBUG hyper::proto::h1::io: parsed 5 headers
2025-01-13T09:39:42.330732Z DEBUG hyper::proto::h1::conn: incoming body is content-length (2125 bytes)
2025-01-13T09:39:42.330798Z DEBUG source{component_kind="source" component_id=http_server component_type=http_server}:http-request{method=POST path=/}: vector::internal_events::http: Received HTTP request. internal_log_rate_limit=true
2025-01-13T09:39:42.330851Z DEBUG hyper::proto::h1::conn: incoming body completed
2025-01-13T09:39:42.330900Z DEBUG source{component_kind="source" component_id=http_server component_type=http_server}:http-request{method=POST path=/}: warp::filters::query: route was called without a query string, defaulting to empty
2025-01-13T09:39:42.330913Z DEBUG source{component_kind="source" component_id=http_server component_type=http_server}:http-request{method=POST path=/}: vector::sources::util::http::prelude: Handling HTTP request. headers={"content-type": "application/json", "accept-encoding": "gzip", "user-agent": "Vector/0.34.1 (x86_64-unknown-linux-gnu)", "host": "vector-orange-vector.mpms-kafka-test.svc:8443", "content-length": "2125"}
2025-01-13T09:39:42.331116Z DEBUG hyper::proto::h1::io: flushed 75 bytes
2025-01-13T09:39:43.265602Z DEBUG hyper::proto::h1::io: parsed 5 headers
2025-01-13T09:39:43.265624Z DEBUG hyper::proto::h1::conn: incoming body is content-length (10553 bytes)
2025-01-13T09:39:43.265658Z DEBUG source{component_kind="source" component_id=http_server component_type=http_server}:http-request{method=POST path=/}: vector::internal_events::http: Internal log [Received HTTP request.] is being suppressed to avoid flooding.
2025-01-13T09:39:43.265712Z DEBUG hyper::proto::h1::conn: incoming body completed
2025-01-13T09:39:43.265747Z DEBUG source{component_kind="source" component_id=http_server component_type=http_server}:http-request{method=POST path=/}: warp::filters::query: route was called without a query string, defaulting to empty
2025-01-13T09:39:43.265754Z DEBUG source{component_kind="source" component_id=http_server component_type=http_server}:http-request{method=POST path=/}: vector::sources::util::http::prelude: Handling HTTP request. headers={"content-type": "application/json", "accept-encoding": "gzip", "user-agent": "Vector/0.34.1 (x86_64-unknown-linux-gnu)", "host": "vector-orange-vector.mpms-kafka-test.svc:8443", "content-length": "10553"}
2025-01-13T09:39:43.266063Z DEBUG hyper::proto::h1::io: flushed 75 bytes
2025-01-13T09:39:43.280497Z DEBUG hyper::proto::h1::io: parsed 5 headers
2025-01-13T09:39:43.280526Z DEBUG hyper::proto::h1::conn: incoming body is content-length (6071 bytes)
2025-01-13T09:39:43.280625Z DEBUG hyper::proto::h1::conn: incoming body completed
2025-01-13T09:39:43.280669Z DEBUG source{component_kind="source" component_id=http_server component_type=http_server}:http-request{method=POST path=/}: warp::filters::query: route was called without a query string, defaulting to empty
2025-01-13T09:39:43.280688Z DEBUG source{component_kind="source" component_id=http_server component_type=http_server}:http-request{method=POST path=/}: vector::sources::util::http::prelude: Handling HTTP request. headers={"content-type": "application/json", "accept-encoding": "gzip", "user-agent": "Vector/0.34.1 (x86_64-unknown-linux-gnu)", "host": "vector-orange-vector.mpms-kafka-test.svc:8443", "content-length": "6071"}
2025-01-13T09:39:43.280979Z DEBUG hyper::proto::h1::io: flushed 75 bytes
2025-01-13T09:39:43.333420Z DEBUG sink{component_kind="sink" component_id=otelhttp component_type=opentelemetry}:request{request_id=1}:http: vector::internal_events::http_client: Sending HTTP request. uri=http://loki:3100/otlp/v1/logs method=POST version=HTTP/1.1 headers={"content-type": "application/json", "accept-encoding": "zstd,gzip,deflate,br", "user-agent": "Vector/0.43.1 (x86_64-unknown-linux-gnu e30bf1f 2024-12-10 16:14:47.175528383)"} body=[12163 bytes]
2025-01-13T09:39:43.334558Z DEBUG hyper::client::connect::dns: resolving host="loki"
2025-01-13T09:39:43.337713Z DEBUG sink{component_kind="sink" component_id=otelhttp component_type=opentelemetry}:request{request_id=1}:http: hyper::client::connect::http: connecting to 172.30.182.236:3100
2025-01-13T09:39:43.339419Z DEBUG sink{component_kind="sink" component_id=otelhttp component_type=opentelemetry}:request{request_id=1}:http: hyper::client::connect::http: connected to 172.30.182.236:3100
2025-01-13T09:39:43.339714Z DEBUG hyper::proto::h1::io: flushed 12397 bytes
2025-01-13T09:39:43.342068Z DEBUG hyper::proto::h1::io: parsed 1 headers
2025-01-13T09:39:43.342149Z DEBUG hyper::proto::h1::conn: incoming body is empty
2025-01-13T09:39:43.342237Z DEBUG sink{component_kind="sink" component_id=otelhttp component_type=opentelemetry}:request{request_id=1}:http: hyper::client::pool: pooling idle connection for ("http", loki:3100)
2025-01-13T09:39:43.342266Z DEBUG sink{component_kind="sink" component_id=otelhttp component_type=opentelemetry}:request{request_id=1}:http: vector::internal_events::http_client: HTTP response. status=204 No Content version=HTTP/1.1 headers={"date": "Mon, 13 Jan 2025 09:39:43 GMT"} body=[empty]
2025-01-13T09:39:43.358672Z DEBUG hyper::proto::h1::io: parsed 5 headers
2025-01-13T09:39:43.358701Z DEBUG hyper::proto::h1::conn: incoming body is content-length (12987 bytes)
2025-01-13T09:39:43.358775Z DEBUG hyper::proto::h1::conn: incoming body completed
2025-01-13T09:39:43.358793Z DEBUG source{component_kind="source" component_id=http_server component_type=http_server}:http-request{method=POST path=/}: warp::filters::query: route was called without a query string, defaulting to empty
2025-01-13T09:39:43.358820Z DEBUG source{component_kind="source" component_id=http_server component_type=http_server}:http-request{method=POST path=/}: vector::sources::util::http::prelude: Handling HTTP request. headers={"content-type": "application/json", "accept-encoding": "gzip", "user-agent": "Vector/0.34.1 (x86_64-unknown-linux-gnu)", "host": "vector-orange-vector.mpms-kafka-test.svc:8443", "content-length": "12987"}
2025-01-13T09:39:43.359201Z DEBUG hyper::proto::h1::io: flushed 75 bytes
2025-01-13T09:39:44.113785Z DEBUG hyper::proto::h1::io: parsed 5 headers
2025-01-13T09:39:44.113826Z DEBUG hyper::proto::h1::conn: incoming body is content-length (6079 bytes)
2025-01-13T09:39:44.113885Z DEBUG hyper::proto::h1::conn: incoming body completed
2025-01-13T09:39:44.113916Z DEBUG source{component_kind="source" component_id=http_server component_type=http_server}:http-request{method=POST path=/}: warp::filters::query: route was called without a query string, defaulting to empty
2025-01-13T09:39:44.113924Z DEBUG source{component_kind="source" component_id=http_server component_type=http_server}:http-request{method=POST path=/}: vector::sources::util::http::prelude: Handling HTTP request. headers={"content-type": "application/json", "accept-encoding": "gzip", "user-agent": "Vector/0.34.1 (x86_64-unknown-linux-gnu)", "host": "vector-orange-vector.mpms-kafka-test.svc:8443", "content-length": "6079"}
2025-01-13T09:39:44.114161Z DEBUG hyper::proto::h1::io: flushed 75 bytes
2025-01-13T09:39:44.286922Z DEBUG hyper::proto::h1::io: parsed 5 headers
2025-01-13T09:39:44.286947Z DEBUG hyper::proto::h1::conn: incoming body is content-length (10553 bytes)
2025-01-13T09:39:44.287004Z DEBUG hyper::proto::h1::conn: incoming body completed
2025-01-13T09:39:44.287034Z DEBUG source{component_kind="source" component_id=http_server component_type=http_server}:http-request{method=POST path=/}: warp::filters::query: route was called without a query string, defaulting to empty
2025-01-13T09:39:44.287043Z DEBUG source{component_kind="source" component_id=http_server component_type=http_server}:http-request{method=POST path=/}: vector::sources::util::http::prelude: Handling HTTP request. headers={"content-type": "application/json", "accept-encoding": "gzip", "user-agent": "Vector/0.34.1 (x86_64-unknown-linux-gnu)", "host": "vector-orange-vector.mpms-kafka-test.svc:8443", "content-length": "10553"}
2025-01-13T09:39:44.287338Z DEBUG hyper::proto::h1::io: flushed 75 bytes
2025-01-13T09:39:44.308580Z DEBUG hyper::proto::h1::io: parsed 5 headers
2025-01-13T09:39:44.308609Z DEBUG hyper::proto::h1::conn: incoming body is content-length (6071 bytes)
2025-01-13T09:39:44.308686Z DEBUG hyper::proto::h1::conn: incoming body completed
2025-01-13T09:39:44.308722Z DEBUG source{component_kind="source" component_id=http_server component_type=http_server}:http-request{method=POST path=/}: warp::filters::query: route was called without a query string, defaulting to empty
2025-01-13T09:39:44.308739Z DEBUG source{component_kind="source" component_id=http_server component_type=http_server}:http-request{method=POST path=/}: vector::sources::util::http::prelude: Handling HTTP request. headers={"content-type": "application/json", "accept-encoding": "gzip", "user-agent": "Vector/0.34.1 (x86_64-unknown-linux-gnu)", "host": "vector-orange-vector.mpms-kafka-test.svc:8443", "content-length": "6071"}
2025-01-13T09:39:44.309056Z DEBUG hyper::proto::h1::io: flushed 75 bytes
2025-01-13T09:39:44.361385Z DEBUG sink{component_kind="sink" component_id=otelhttp component_type=opentelemetry}:request{request_id=2}:http: vector::internal_events::http_client: Sending HTTP request. uri=http://loki:3100/otlp/v1/logs method=POST version=HTTP/1.1 headers={"content-type": "application/json", "accept-encoding": "zstd,gzip,deflate,br", "user-agent": "Vector/0.43.1 (x86_64-unknown-linux-gnu e30bf1f 2024-12-10 16:14:47.175528383)"} body=[23525 bytes]
2025-01-13T09:39:44.361437Z DEBUG sink{component_kind="sink" component_id=otelhttp component_type=opentelemetry}:request{request_id=2}:http: hyper::client::pool: reuse idle connection for ("http", loki:3100)
2025-01-13T09:39:44.361593Z DEBUG hyper::proto::h1::io: flushed 23759 bytes
2025-01-13T09:39:44.362902Z DEBUG hyper::proto::h1::io: parsed 1 headers
2025-01-13T09:39:44.362917Z DEBUG hyper::proto::h1::conn: incoming body is empty
2025-01-13T09:39:44.362946Z DEBUG sink{component_kind="sink" component_id=otelhttp component_type=opentelemetry}:request{request_id=2}:http: hyper::client::pool: pooling idle connection for ("http", loki:3100)
2025-01-13T09:39:44.362960Z DEBUG sink{component_kind="sink" component_id=otelhttp component_type=opentelemetry}:request{request_id=2}:http: vector::internal_events::http_client: HTTP response. status=204 No Content version=HTTP/1.1 headers={"date": "Mon, 13 Jan 2025 09:39:44 GMT"} body=[empty]
2025-01-13T09:39:44.389457Z DEBUG hyper::proto::h1::io: parsed 5 headers
2025-01-13T09:39:44.389480Z DEBUG hyper::proto::h1::conn: incoming body is content-length (2125 bytes)
2025-01-13T09:39:44.389534Z DEBUG hyper::proto::h1::conn: incoming body completed
2025-01-13T09:39:44.389572Z DEBUG source{component_kind="source" component_id=http_server component_type=http_server}:http-request{method=POST path=/}: warp::filters::query: route was called without a query string, defaulting to empty
2025-01-13T09:39:44.389580Z DEBUG source{component_kind="source" component_id=http_server component_type=http_server}:http-request{method=POST path=/}: vector::sources::util::http::prelude: Handling HTTP request. headers={"content-type": "application/json", "accept-encoding": "gzip", "user-agent": "Vector/0.34.1 (x86_64-unknown-linux-gnu)", "host": "vector-orange-vector.mpms-kafka-test.svc:8443", "content-length": "2125"}
2025-01-13T09:39:44.389724Z DEBUG hyper::proto::h1::io: flushed 75 bytes
2025-01-13T09:39:45.138962Z DEBUG hyper::proto::h1::io: parsed 5 headers
2025-01-13T09:39:45.138989Z DEBUG hyper::proto::h1::conn: incoming body is content-length (6079 bytes)
2025-01-13T09:39:45.139045Z DEBUG hyper::proto::h1::conn: incoming body completed
2025-01-13T09:39:45.139070Z DEBUG source{component_kind="source" component_id=http_server component_type=http_server}:http-request{method=POST path=/}: warp::filters::query: route was called without a query string, defaulting to empty
2025-01-13T09:39:45.139078Z DEBUG source{component_kind="source" component_id=http_server component_type=http_server}:http-request{method=POST path=/}: vector::sources::util::http::prelude: Handling HTTP request. headers={"content-type": "application/json", "accept-encoding": "gzip", "user-agent": "Vector/0.34.1 (x86_64-unknown-linux-gnu)", "host": "vector-orange-vector.mpms-kafka-test.svc:8443", "content-length": "6079"}
2025-01-13T09:39:45.139277Z DEBUG hyper::proto::h1::io: flushed 75 bytes
2025-01-13T09:39:45.322358Z DEBUG hyper::proto::h1::io: parsed 5 headers
2025-01-13T09:39:45.322383Z DEBUG hyper::proto::h1::conn: incoming body is content-length (10553 bytes)
2025-01-13T09:39:45.322438Z DEBUG hyper::proto::h1::conn: incoming body completed
2025-01-13T09:39:45.322468Z DEBUG source{component_kind="source" component_id=http_server component_type=http_server}:http-request{method=POST path=/}: warp::filters::query: route was called without a query string, defaulting to empty
2025-01-13T09:39:45.322476Z DEBUG source{component_kind="source" component_id=http_server component_type=http_server}:http-request{method=POST path=/}: vector::sources::util::http::prelude: Handling HTTP request. headers={"content-type": "application/json", "accept-encoding": "gzip", "user-agent": "Vector/0.34.1 (x86_64-unknown-linux-gnu)", "host": "vector-orange-vector.mpms-kafka-test.svc:8443", "content-length": "10553"}
2025-01-13T09:39:45.322824Z DEBUG hyper::proto::h1::io: flushed 75 bytes
2025-01-13T09:39:45.391852Z DEBUG sink{component_kind="sink" component_id=otelhttp component_type=opentelemetry}:request{request_id=3}:http: vector::internal_events::http_client: Sending HTTP request. uri=http://loki:3100/otlp/v1/logs method=POST version=HTTP/1.1 headers={"content-type": "application/json", "accept-encoding": "zstd,gzip,deflate,br", "user-agent": "Vector/0.43.1 (x86_64-unknown-linux-gnu e30bf1f 2024-12-10 16:14:47.175528383)"} body=[12165 bytes]
2025-01-13T09:39:45.391900Z DEBUG sink{component_kind="sink" component_id=otelhttp component_type=opentelemetry}:request{request_id=3}:http: hyper::client::pool: reuse idle connection for ("http", loki:3100)
2025-01-13T09:39:45.392037Z DEBUG hyper::proto::h1::io: flushed 12399 bytes
2025-01-13T09:39:45.393160Z DEBUG hyper::proto::h1::io: parsed 1 headers
2025-01-13T09:39:45.393176Z DEBUG hyper::proto::h1::conn: incoming body is empty
2025-01-13T09:39:45.393212Z DEBUG sink{component_kind="sink" component_id=otelhttp component_type=opentelemetry}:request{request_id=3}:http: hyper::client::pool: pooling idle connection for ("http", loki:3100)
2025-01-13T09:39:45.393231Z DEBUG sink{component_kind="sink" component_id=otelhttp component_type=opentelemetry}:request{request_id=3}:http: vector::internal_events::http_client: HTTP response. status=204 No Content version=HTTP/1.1 headers={"date": "Mon, 13 Jan 2025 09:39:45 GMT"} body=[empty]
2025-01-13T09:39:45.421132Z DEBUG hyper::proto::h1::io: parsed 5 headers
2025-01-13T09:39:45.421156Z DEBUG hyper::proto::h1::conn: incoming body is content-length (2125 bytes)
2025-01-13T09:39:45.421215Z DEBUG hyper::proto::h1::conn: incoming body completed
2025-01-13T09:39:45.421243Z DEBUG source{component_kind="source" component_id=http_server component_type=http_server}:http-request{method=POST path=/}: warp::filters::query: route was called without a query string, defaulting to empty
2025-01-13T09:39:45.421251Z DEBUG source{component_kind="source" component_id=http_server component_type=http_server}:http-request{method=POST path=/}: vector::sources::util::http::prelude: Handling HTTP request. headers={"content-type": "application/json", "accept-encoding": "gzip", "user-agent": "Vector/0.34.1 (x86_64-unknown-linux-gnu)", "host": "vector-orange-vector.mpms-kafka-test.svc:8443", "content-length": "2125"}
2025-01-13T09:39:45.421400Z DEBUG hyper::proto::h1::io: flushed 75 bytes
2025-01-13T09:39:46.355972Z DEBUG hyper::proto::h1::io: parsed 5 headers
2025-01-13T09:39:46.356002Z DEBUG hyper::proto::h1::conn: incoming body is content-length (10553 bytes)
2025-01-13T09:39:46.356068Z DEBUG hyper::proto::h1::conn: incoming body completed
2025-01-13T09:39:46.356097Z DEBUG source{component_kind="source" component_id=http_server component_type=http_server}:http-request{method=POST path=/}: warp::filters::query: route was called without a query string, defaulting to empty
2025-01-13T09:39:46.356106Z DEBUG source{component_kind="source" component_id=http_server component_type=http_server}:http-request{method=POST path=/}: vector::sources::util::http::prelude: Handling HTTP request. headers={"content-type": "application/json", "accept-encoding": "gzip", "user-agent": "Vector/0.34.1 (x86_64-unknown-linux-gnu)", "host": "vector-orange-vector.mpms-kafka-test.svc:8443", "content-length": "10553"}
2025-01-13T09:39:46.356386Z DEBUG hyper::proto::h1::io: flushed 75 bytes
2025-01-13T09:39:46.422372Z DEBUG sink{component_kind="sink" component_id=otelhttp component_type=opentelemetry}:request{request_id=4}:http: vector::internal_events::http_client: Sending HTTP request. uri=http://loki:3100/otlp/v1/logs method=POST version=HTTP/1.1 headers={"content-type": "application/json", "accept-encoding": "zstd,gzip,deflate,br", "user-agent": "Vector/0.43.1 (x86_64-unknown-linux-gnu e30bf1f 2024-12-10 16:14:47.175528383)"} body=[8237 bytes]
2025-01-13T09:39:46.422420Z DEBUG sink{component_kind="sink" component_id=otelhttp component_type=opentelemetry}:request{request_id=4}:http: hyper::client::pool: reuse idle connection for ("http", loki:3100)
2025-01-13T09:39:46.422615Z DEBUG hyper::proto::h1::io: flushed 8470 bytes
2025-01-13T09:39:46.423944Z DEBUG hyper::proto::h1::io: parsed 1 headers
2025-01-13T09:39:46.423962Z DEBUG hyper::proto::h1::conn: incoming body is empty
2025-01-13T09:39:46.423992Z DEBUG sink{component_kind="sink" component_id=otelhttp component_type=opentelemetry}:request{request_id=4}:http: hyper::client::pool: pooling idle connection for ("http", loki:3100)
2025-01-13T09:39:46.424010Z DEBUG sink{component_kind="sink" component_id=otelhttp component_type=opentelemetry}:request{request_id=4}:http: vector::internal_events::http_client: HTTP response. status=204 No Content version=HTTP/1.1 headers={"date": "Mon, 13 Jan 2025 09:39:46 GMT"} body=[empty]
2025-01-13T09:39:46.452937Z DEBUG hyper::proto::h1::io: parsed 5 headers
2025-01-13T09:39:46.452961Z DEBUG hyper::proto::h1::conn: incoming body is content-length (4249 bytes)
2025-01-13T09:39:46.453021Z DEBUG hyper::proto::h1::conn: incoming body completed
2025-01-13T09:39:46.453055Z DEBUG source{component_kind="source" component_id=http_server component_type=http_server}:http-request{method=POST path=/}: warp::filters::query: route was called without a query string, defaulting to empty
2025-01-13T09:39:46.453064Z DEBUG source{component_kind="source" component_id=http_server component_type=http_server}:http-request{method=POST path=/}: vector::sources::util::http::prelude: Handling HTTP request. headers={"content-type": "application/json", "accept-encoding": "gzip", "user-agent": "Vector/0.34.1 (x86_64-unknown-linux-gnu)", "host": "vector-orange-vector.mpms-kafka-test.svc:8443", "content-length": "4249"}
2025-01-13T09:39:46.453250Z DEBUG hyper::proto::h1::io: flushed 75 bytes
2025-01-13T09:39:46.978672Z DEBUG hyper::proto::h1::io: parsed 5 headers
2025-01-13T09:39:46.978697Z DEBUG hyper::proto::h1::conn: incoming body is content-length (6087 bytes)
2025-01-13T09:39:46.978749Z DEBUG hyper::proto::h1::conn: incoming body completed
2025-01-13T09:39:46.978771Z DEBUG source{component_kind="source" component_id=http_server component_type=http_server}:http-request{method=POST path=/}: warp::filters::query: route was called without a query string, defaulting to empty
2025-01-13T09:39:46.978780Z DEBUG source{component_kind="source" component_id=http_server component_type=http_server}:http-request{method=POST path=/}: vector::sources::util::http::prelude: Handling HTTP request. headers={"content-type": "application/json", "accept-encoding": "gzip", "user-agent": "Vector/0.34.1 (x86_64-unknown-linux-gnu)", "host": "vector-orange-vector.mpms-kafka-test.svc:8443", "content-length": "6087"}
2025-01-13T09:39:46.979012Z DEBUG hyper::proto::h1::io: flushed 75 bytes
2025-01-13T09:39:46.979050Z DEBUG transform{component_kind="transform" component_id=remove_vector_logs component_type=filter}: vector::utilization: utilization=0.05497128223824865
2025-01-13T09:39:46.979110Z DEBUG transform{component_kind="transform" component_id=manage_labels component_type=remap}: vector::utilization: utilization=0.05574322054690628
2025-01-13T09:39:46.999529Z DEBUG sink{component_kind="sink" component_id=otelhttp component_type=opentelemetry}: vector::utilization: utilization=0.006822280616706687
2025-01-13T09:39:47.390524Z DEBUG hyper::proto::h1::io: parsed 5 headers
2025-01-13T09:39:47.390550Z DEBUG hyper::proto::h1::conn: incoming body is content-length (10553 bytes)
2025-01-13T09:39:47.390604Z DEBUG hyper::proto::h1::conn: incoming body completed
2025-01-13T09:39:47.390629Z DEBUG source{component_kind="source" component_id=http_server component_type=http_server}:http-request{method=POST path=/}: warp::filters::query: route was called without a query string, defaulting to empty
2025-01-13T09:39:47.390637Z DEBUG source{component_kind="source" component_id=http_server component_type=http_server}:http-request{method=POST path=/}: vector::sources::util::http::prelude: Handling HTTP request. headers={"content-type": "application/json", "accept-encoding": "gzip", "user-agent": "Vector/0.34.1 (x86_64-unknown-linux-gnu)", "host": "vector-orange-vector.mpms-kafka-test.svc:8443", "content-length": "10553"}
2025-01-13T09:39:47.391012Z DEBUG hyper::proto::h1::io: flushed 75 bytes
2025-01-13T09:39:47.455076Z DEBUG sink{component_kind="sink" component_id=otelhttp component_type=opentelemetry}:request{request_id=5}:http: vector::internal_events::http_client: Sending HTTP request. uri=http://loki:3100/otlp/v1/logs method=POST version=HTTP/1.1 headers={"content-type": "application/json", "accept-encoding": "zstd,gzip,deflate,br", "user-agent": "Vector/0.43.1 (x86_64-unknown-linux-gnu e30bf1f 2024-12-10 16:14:47.175528383)"} body=[13553 bytes]
2025-01-13T09:39:47.455123Z DEBUG sink{component_kind="sink" component_id=otelhttp component_type=opentelemetry}:request{request_id=5}:http: hyper::client::pool: reuse idle connection for ("http", loki:3100)
2025-01-13T09:39:47.455262Z DEBUG hyper::proto::h1::io: flushed 13787 bytes
2025-01-13T09:39:47.456271Z DEBUG hyper::proto::h1::io: parsed 1 headers
2025-01-13T09:39:47.456357Z DEBUG hyper::proto::h1::conn: incoming body is empty
2025-01-13T09:39:47.456399Z DEBUG sink{component_kind="sink" component_id=otelhttp component_type=opentelemetry}:request{request_id=5}:http: hyper::client::pool: pooling idle connection for ("http", loki:3100)
2025-01-13T09:39:47.456421Z DEBUG sink{component_kind="sink" component_id=otelhttp component_type=opentelemetry}:request{request_id=5}:http: vector::internal_events::http_client: HTTP response. status=204 No Content version=HTTP/1.1 headers={"date": "Mon, 13 Jan 2025 09:39:47 GMT"} body=[empty]
2025-01-13T09:39:48.008663Z DEBUG hyper::proto::h1::io: parsed 5 headers
2025-01-13T09:39:48.008687Z DEBUG hyper::proto::h1::conn: incoming body is content-length (2125 bytes)
2025-01-13T09:39:48.008755Z DEBUG hyper::proto::h1::conn: incoming body completed
2025-01-13T09:39:48.008824Z DEBUG source{component_kind="source" component_id=http_server component_type=http_server}:http-request{method=POST path=/}: warp::filters::query: route was called without a query string, defaulting to empty
2025-01-13T09:39:48.008841Z DEBUG source{component_kind="source" component_id=http_server component_type=http_server}:http-request{method=POST path=/}: vector::sources::util::http::prelude: Handling HTTP request. headers={"content-type": "application/json", "accept-encoding": "gzip", "user-agent": "Vector/0.34.1 (x86_64-unknown-linux-gnu)", "host": "vector-orange-vector.mpms-kafka-test.svc:8443", "content-length": "2125"}
2025-01-13T09:39:48.009070Z DEBUG hyper::proto::h1::io: flushed 75 bytes
2025-01-13T09:39:48.421500Z DEBUG hyper::proto::h1::io: parsed 5 headers
2025-01-13T09:39:48.421529Z DEBUG hyper::proto::h1::conn: incoming body is content-length (10553 bytes)
2025-01-13T09:39:48.421595Z DEBUG hyper::proto::h1::conn: incoming body completed
2025-01-13T09:39:48.421623Z DEBUG source{component_kind="source" component_id=http_server component_type=http_server}:http-request{method=POST path=/}: warp::filters::query: route was called without a query string, defaulting to empty
2025-01-13T09:39:48.421632Z DEBUG source{component_kind="source" component_id=http_server component_type=http_server}:http-request{method=POST path=/}: vector::sources::util::http::prelude: Handling HTTP request. headers={"content-type": "application/json", "accept-encoding": "gzip", "user-agent": "Vector/0.34.1 (x86_64-unknown-linux-gnu)", "host": "vector-orange-vector.mpms-kafka-test.svc:8443", "content-length": "10553"}
2025-01-13T09:39:48.421954Z DEBUG hyper::proto::h1::io: flushed 75 bytes
2025-01-13T09:39:49.010428Z DEBUG sink{component_kind="sink" component_id=otelhttp component_type=opentelemetry}:request{request_id=6}:http: vector::internal_events::http_client: Sending HTTP request. uri=http://loki:3100/otlp/v1/logs method=POST version=HTTP/1.1 headers={"content-type": "application/json", "accept-encoding": "zstd,gzip,deflate,br", "user-agent": "Vector/0.43.1 (x86_64-unknown-linux-gnu e30bf1f 2024-12-10 16:14:47.175528383)"} body=[8237 bytes]
2025-01-13T09:39:49.010489Z DEBUG sink{component_kind="sink" component_id=otelhttp component_type=opentelemetry}:request{request_id=6}:http: hyper::client::pool: reuse idle connection for ("http", loki:3100)
2025-01-13T09:39:49.010700Z DEBUG hyper::proto::h1::io: flushed 8470 bytes
2025-01-13T09:39:49.011795Z DEBUG hyper::proto::h1::io: parsed 1 headers
2025-01-13T09:39:49.011866Z DEBUG hyper::proto::h1::conn: incoming body is empty
2025-01-13T09:39:49.011903Z DEBUG sink{component_kind="sink" component_id=otelhttp component_type=opentelemetry}:request{request_id=6}:http: hyper::client::pool: pooling idle connection for ("http", loki:3100)
2025-01-13T09:39:49.011927Z DEBUG sink{component_kind="sink" component_id=otelhttp component_type=opentelemetry}:request{request_id=6}:http: vector::internal_events::http_client: HTTP response. status=204 No Content version=HTTP/1.1 headers={"date": "Mon, 13 Jan 2025 09:39:49 GMT"} body=[empty]
2025-01-13T09:39:49.040116Z DEBUG hyper::proto::h1::io: parsed 5 headers
2025-01-13T09:39:49.040140Z DEBUG hyper::proto::h1::conn: incoming body is content-length (2125 bytes)
2025-01-13T09:39:49.040205Z DEBUG hyper::proto::h1::conn: incoming body completed
2025-01-13T09:39:49.040244Z DEBUG source{component_kind="source" component_id=http_server component_type=http_server}:http-request{method=POST path=/}: warp::filters::query: route was called without a query string, defaulting to empty
2025-01-13T09:39:49.040256Z DEBUG source{component_kind="source" component_id=http_server component_type=http_server}:http-request{method=POST path=/}: vector::sources::util::http::prelude: Handling HTTP request. headers={"content-type": "application/json", "accept-encoding": "gzip", "user-agent": "Vector/0.34.1 (x86_64-unknown-linux-gnu)", "host": "vector-orange-vector.mpms-kafka-test.svc:8443", "content-length": "2125"}
2025-01-13T09:39:49.040412Z DEBUG hyper::proto::h1::io: flushed 75 bytes
2025-01-13T09:39:49.455027Z DEBUG hyper::proto::h1::io: parsed 5 headers
2025-01-13T09:39:49.455052Z DEBUG hyper::proto::h1::conn: incoming body is content-length (21105 bytes)
2025-01-13T09:39:49.455159Z DEBUG hyper::proto::h1::conn: incoming body completed
2025-01-13T09:39:49.455185Z DEBUG source{component_kind="source" component_id=http_server component_type=http_server}:http-request{method=POST path=/}: warp::filters::query: route was called without a query string, defaulting to empty
2025-01-13T09:39:49.455194Z DEBUG source{component_kind="source" component_id=http_server component_type=http_server}:http-request{method=POST path=/}: vector::sources::util::http::prelude: Handling HTTP request. headers={"content-type": "application/json", "accept-encoding": "gzip", "user-agent": "Vector/0.34.1 (x86_64-unknown-linux-gnu)", "host": "vector-orange-vector.mpms-kafka-test.svc:8443", "content-length": "21105"}
2025-01-13T09:39:49.455589Z DEBUG hyper::proto::h1::io: flushed 75 bytes
2025-01-13T09:39:50.041689Z DEBUG sink{component_kind="sink" component_id=otelhttp component_type=opentelemetry}:request{request_id=7}:http: vector::internal_events::http_client: Sending HTTP request. uri=http://loki:3100/otlp/v1/logs method=POST version=HTTP/1.1 headers={"content-type": "application/json", "accept-encoding": "zstd,gzip,deflate,br", "user-agent": "Vector/0.43.1 (x86_64-unknown-linux-gnu e30bf1f 2024-12-10 16:14:47.175528383)"} body=[15089 bytes]
2025-01-13T09:39:50.041866Z DEBUG sink{component_kind="sink" component_id=otelhttp component_type=opentelemetry}:request{request_id=7}:http: hyper::client::pool: reuse idle connection for ("http", loki:3100)
2025-01-13T09:39:50.042057Z DEBUG hyper::proto::h1::io: flushed 15323 bytes
2025-01-13T09:39:50.043225Z DEBUG hyper::proto::h1::io: parsed 1 headers
2025-01-13T09:39:50.043306Z DEBUG hyper::proto::h1::conn: incoming body is empty
2025-01-13T09:39:50.043346Z DEBUG sink{component_kind="sink" component_id=otelhttp component_type=opentelemetry}:request{request_id=7}:http: hyper::client::pool: pooling idle connection for ("http", loki:3100)
2025-01-13T09:39:50.043365Z DEBUG sink{component_kind="sink" component_id=otelhttp component_type=opentelemetry}:request{request_id=7}:http: vector::internal_events::http_client: HTTP response. status=204 No Content version=HTTP/1.1 headers={"date": "Mon, 13 Jan 2025 09:39:50 GMT"} body=[empty]
2025-01-13T09:39:50.072686Z DEBUG hyper::proto::h1::io: parsed 5 headers
2025-01-13T09:39:50.072711Z DEBUG hyper::proto::h1::conn: incoming body is content-length (2125 bytes)
2025-01-13T09:39:50.072765Z DEBUG hyper::proto::h1::conn: incoming body completed
2025-01-13T09:39:50.072837Z DEBUG source{component_kind="source" component_id=http_server component_type=http_server}:http-request{method=POST path=/}: warp::filters::query: route was called without a query string, defaulting to empty
2025-01-13T09:39:50.072851Z DEBUG source{component_kind="source" component_id=http_server component_type=http_server}:http-request{method=POST path=/}: vector::sources::util::http::prelude: Handling HTTP request. headers={"content-type": "application/json", "accept-encoding": "gzip", "user-agent": "Vector/0.34.1 (x86_64-unknown-linux-gnu)", "host": "vector-orange-vector.mpms-kafka-test.svc:8443", "content-length": "2125"}
2025-01-13T09:39:50.073057Z DEBUG hyper::proto::h1::io: flushed 75 bytes
2025-01-13T09:39:51.007331Z DEBUG hyper::proto::h1::io: parsed 5 headers
2025-01-13T09:39:51.007361Z DEBUG hyper::proto::h1::conn: incoming body is content-length (10553 bytes)
2025-01-13T09:39:51.007435Z DEBUG hyper::proto::h1::conn: incoming body completed
2025-01-13T09:39:51.007470Z DEBUG source{component_kind="source" component_id=http_server component_type=http_server}:http-request{method=POST path=/}: warp::filters::query: route was called without a query string, defaulting to empty
2025-01-13T09:39:51.007484Z DEBUG source{component_kind="source" component_id=http_server component_type=http_server}:http-request{method=POST path=/}: vector::sources::util::http::prelude: Handling HTTP request. headers={"content-type": "application/json", "accept-encoding": "gzip", "user-agent": "Vector/0.34.1 (x86_64-unknown-linux-gnu)", "host": "vector-orange-vector.mpms-kafka-test.svc:8443", "content-length": "10553"}
2025-01-13T09:39:51.007890Z DEBUG hyper::proto::h1::io: flushed 75 bytes
2025-01-13T09:39:51.075143Z DEBUG sink{component_kind="sink" component_id=otelhttp component_type=opentelemetry}:request{request_id=8}:http: vector::internal_events::http_client: Sending HTTP request. uri=http://loki:3100/otlp/v1/logs method=POST version=HTTP/1.1 headers={"content-type": "application/json", "accept-encoding": "zstd,gzip,deflate,br", "user-agent": "Vector/0.43.1 (x86_64-unknown-linux-gnu e30bf1f 2024-12-10 16:14:47.175528383)"} body=[8237 bytes]
2025-01-13T09:39:51.075195Z DEBUG sink{component_kind="sink" component_id=otelhttp component_type=opentelemetry}:request{request_id=8}:http: hyper::client::pool: reuse idle connection for ("http", loki:3100)
2025-01-13T09:39:51.075371Z DEBUG hyper::proto::h1::io: flushed 8470 bytes
2025-01-13T09:39:51.076366Z DEBUG hyper::proto::h1::io: parsed 1 headers
2025-01-13T09:39:51.076381Z DEBUG hyper::proto::h1::conn: incoming body is empty
2025-01-13T09:39:51.076412Z DEBUG sink{component_kind="sink" component_id=otelhttp component_type=opentelemetry}:request{request_id=8}:http: hyper::client::pool: pooling idle connection for ("http", loki:3100)
2025-01-13T09:39:51.076429Z DEBUG sink{component_kind="sink" component_id=otelhttp component_type=opentelemetry}:request{request_id=8}:http: vector::internal_events::http_client: HTTP response. status=204 No Content version=HTTP/1.1 headers={"date": "Mon, 13 Jan 2025 09:39:51 GMT"} body=[empty]
2025-01-13T09:39:51.106723Z DEBUG hyper::proto::h1::io: parsed 5 headers
2025-01-13T09:39:51.106750Z DEBUG hyper::proto::h1::conn: incoming body is content-length (2125 bytes)
2025-01-13T09:39:51.106832Z DEBUG hyper::proto::h1::conn: incoming body completed
2025-01-13T09:39:51.106872Z DEBUG source{component_kind="source" component_id=http_server component_type=http_server}:http-request{method=POST path=/}: warp::filters::query: route was called without a query string, defaulting to empty
2025-01-13T09:39:51.106884Z DEBUG source{component_kind="source" component_id=http_server component_type=http_server}:http-request{method=POST path=/}: vector::sources::util::http::prelude: Handling HTTP request. headers={"content-type": "application/json", "accept-encoding": "gzip", "user-agent": "Vector/0.34.1 (x86_64-unknown-linux-gnu)", "host": "vector-orange-vector.mpms-kafka-test.svc:8443", "content-length": "2125"}
2025-01-13T09:39:51.107070Z DEBUG hyper::proto::h1::io: flushed 75 bytes
2025-01-13T09:39:51.999612Z DEBUG sink{component_kind="sink" component_id=otelhttp component_type=opentelemetry}: vector::utilization: utilization=0.0006860070999672604

Example Data

# Source
{"kubernetes_io_metadata_name":"mpms-kafka-test","openshift-pipelines_tekton_dev_namespace-reconcile-version":"1.15.2","pod-security_kubernetes_io_audit":"baseline","pod-security_kubernetes_io_audit-version":"v1.24","pod-security_kubernetes_io_warn":"baseline","pod-security_kubernetes_io_warn-version":"v1.24"},"namespace_name":"mpms-kafka-test","pod_id":"2546f1d9-1e19-463a-98ed-98eccdc86e81","pod_ip":"10.128.4.83","pod_name":"java-kafka-producer-f674dd585-mqcs6","pod_owner":"ReplicaSet/java-kafka-producer-f674dd585"},"level":"info","log_type":"application","message":"2025-01-13 09:46:03 INFO  KafkaProducerExample:58 - Sending messages \"Hello world - 939\"","openshift":{"cluster_id":"6e61fd67-fbcc-4019-99bb-ec3acbe339e7","sequence":1736761563411623053},"path":"/","source_type":"http_server","timestamp":"2025-01-13T09:46:04.414169484Z"}
{"@timestamp":"2025-01-13T09:46:03.168964003Z","hostname":"paas-osd-mpms1-vxwkv-worker-b-blpkh","kubernetes":{"annotations":{"k8s.ovn.org/pod-networks":"{\"default\":{\"ip_addresses\":[\"10.130.4.117/23\"],\"mac_address\":\"0a:58:0a:82:04:75\",\"gateway_ips\":[\"10.130.4.1\"],\"routes\":[{\"dest\":\"10.128.0.0/14\",\"nextHop\":\"10.130.4.1\"},{\"dest\":\"172.30.0.0/16\",\"nextHop\":\"10.130.4.1\"},{\"dest\":\"100.64.0.0/16\",\"nextHop\":\"10.130.4.1\"}],\"ip_address\":\"10.130.4.117/23\",\"gateway_ip\":\"10.130.4.1\"}}","k8s.v1.cni.cncf.io/network-status":"[{\n    \"name\": \"ovn-kubernetes\",\n    \"interface\": \"eth0\",\n    \"ips\": [\n        \"10.130.4.117\"\n    ],\n    \"mac\": \"0a:58:0a:82:04:75\",\n    \"default\": true,\n    \"dns\": {}\n}]","openshift.io/scc":"restricted-v2","seccomp.security.alpha.kubernetes.io/pod":"runtime/default"},"container_id":"cri-o://ca1cba0a5e455954212853e6c761661e9942105eb7045b7d0fe9175984dc9d46","container_image":"quay.io/strimzi-examples/java-kafka-consumer:latest","container_image_id":"quay.io/strimzi-examples/java-kafka-consumer@sha256:275d91e566ccab13d4c035111662a375640e3451bf3671ac19b0e999e045096e","container_name":"java-kafka-consumer","labels":{"app":"java-kafka-consumer","domain":"appli","pod-template-hash":"688c77d78c"},"namespace_id":"4381a43a-0bf0-486d-b428-246a7ed75aea","namespace_labels":{"kubernetes_io_metadata_name":"mpms-kafka-test","openshift-pipelines_tekton_dev_namespace-reconcile-version":"1.15.2","pod-security_kubernetes_io_audit":"baseline","pod-security_kubernetes_io_audit-version":"v1.24","pod-security_kubernetes_io_warn":"baseline","pod-security_kubernetes_io_warn-version":"v1.24"},"namespace_name":"mpms-kafka-test","pod_id":"d97620a1-6e75-4493-91a9-8ea27d39d5ba","pod_ip":"10.130.4.117","pod_name":"java-kafka-consumer-688c77d78c-fsdms","pod_owner":"ReplicaSet/java-kafka-consumer-688c77d78c"},"level":"info","log_type":"application","message":"2025-01-13 09:46:03 INFO  KafkaConsumerExample:52 - Received message:","openshift":{"cluster_id":"6e61fd67-fbcc-4019-99bb-ec3acbe339e7","sequence":1736761563622187830},"path":"/","source_type":"http_server","timestamp":"2025-01-13T09:46:04.625303692Z"}
{"@timestamp":"2025-01-13T09:46:03.168964003Z","hostname":"paas-osd-mpms1-vxwkv-worker-b-blpkh","kubernetes":{"annotations":{"k8s.ovn.org/pod-networks":"{\"default\":{\"ip_addresses\":[\"10.130.4.117/23\"],\"mac_address\":\"0a:58:0a:82:04:75\",\"gateway_ips\":[\"10.130.4.1\"],\"routes\":[{\"dest\":\"10.128.0.0/14\",\"nextHop\":\"10.130.4.1\"},{\"dest\":\"172.30.0.0/16\",\"nextHop\":\"10.130.4.1\"},{\"dest\":\"100.64.0.0/16\",\"nextHop\":\"10.130.4.1\"}],\"ip_address\":\"10.130.4.117/23\",\"gateway_ip\":\"10.130.4.1\"}}","k8s.v1.cni.cncf.io/network-status":"[{\n    \"name\": \"ovn-kubernetes\",\n    \"interface\": \"eth0\",\n    \"ips\": [\n        \"10.130.4.117\"\n    ],\n    \"mac\": \"0a:58:0a:82:04:75\",\n    \"default\": true,\n    \"dns\": {}\n}]","openshift.io/scc":"restricted-v2","seccomp.security.alpha.kubernetes.io/pod":"runtime/default"},"container_id":"cri-o://ca1cba0a5e455954212853e6c761661e9942105eb7045b7d0fe9175984dc9d46","container_image":"quay.io/strimzi-examples/java-kafka-consumer:latest","container_image_id":"quay.io/strimzi-examples/java-kafka-consumer@sha256:275d91e566ccab13d4c035111662a375640e3451bf3671ac19b0e999e045096e","container_name":"java-kafka-consumer","labels":{"app":"java-kafka-consumer","domain":"appli","pod-template-hash":"688c77d78c"},"namespace_id":"4381a43a-0bf0-486d-b428-246a7ed75aea","namespace_labels":{"kubernetes_io_metadata_name":"mpms-kafka-test","openshift-pipelines_tekton_dev_namespace-reconcile-version":"1.15.2","pod-security_kubernetes_io_audit":"baseline","pod-security_kubernetes_io_audit-version":"v1.24","pod-security_kubernetes_io_warn":"baseline","pod-security_kubernetes_io_warn-version":"v1.24"},"namespace_name":"mpms-kafka-test","pod_id":"d97620a1-6e75-4493-91a9-8ea27d39d5ba","pod_ip":"10.130.4.117","pod_name":"java-kafka-consumer-688c77d78c-fsdms","pod_owner":"ReplicaSet/java-kafka-consumer-688c77d78c"},"level":"info","log_type":"application","message":"2025-01-13 09:46:03 INFO  KafkaConsumerExample:53 - \tpartition: 7","openshift":{"cluster_id":"6e61fd67-fbcc-4019-99bb-ec3acbe339e7","sequence":1736761563622263913},"path":"/","source_type":"http_server","timestamp":"2025-01-13T09:46:04.625303692Z"}
{"@timestamp":"2025-01-13T09:46:03.168964003Z","hostname":"paas-osd-mpms1-vxwkv-worker-b-blpkh","kubernetes":{"annotations":{"k8s.ovn.org/pod-networks":"{\"default\":{\"ip_addresses\":[\"10.130.4.117/23\"],\"mac_address\":\"0a:58:0a:82:04:75\",\"gateway_ips\":[\"10.130.4.1\"],\"routes\":[{\"dest\":\"10.128.0.0/14\",\"nextHop\":\"10.130.4.1\"},{\"dest\":\"172.30.0.0/16\",\"nextHop\":\"10.130.4.1\"},{\"dest\":\"100.64.0.0/16\",\"nextHop\":\"10.130.4.1\"}],\"ip_address\":\"10.130.4.117/23\",\"gateway_ip\":\"10.130.4.1\"}}","k8s.v1.cni.cncf.io/network-status":"[{\n    \"name\": \"ovn-kubernetes\",\n    \"interface\": \"eth0\",\n    \"ips\": [\n        \"10.130.4.117\"\n    ],\n    \"mac\": \"0a:58:0a:82:04:75\",\n    \"default\": true,\n    \"dns\": {}\n}]","openshift.io/scc":"restricted-v2","seccomp.security.alpha.kubernetes.io/pod":"runtime/default"},"container_id":"cri-o://ca1cba0a5e455954212853e6c761661e9942105eb7045b7d0fe9175984dc9d46","container_image":"quay.io/strimzi-examples/java-kafka-consumer:latest","container_image_id":"quay.io/strimzi-examples/java-kafka-consumer@sha256:275d91e566ccab13d4c035111662a375640e3451bf3671ac19b0e999e045096e","container_name":"java-kafka-consumer","labels":{"app":"java-kafka-consumer","domain":"appli","pod-template-hash":"688c77d78c"},"namespace_id":"4381a43a-0bf0-486d-b428-246a7ed75aea","namespace_labels":{"kubernetes_io_metadata_name":"mpms-kafka-test","openshift-pipelines_tekton_dev_namespace-reconcile-version":"1.15.2","pod-security_kubernetes_io_audit":"baseline","pod-security_kubernetes_io_audit-version":"v1.24","pod-security_kubernetes_io_warn":"baseline","pod-security_kubernetes_io_warn-version":"v1.24"},"namespace_name":"mpms-kafka-test","pod_id":"d97620a1-6e75-4493-91a9-8ea27d39d5ba","pod_ip":"10.130.4.117","pod_name":"java-kafka-consumer-688c77d78c-fsdms","pod_owner":"ReplicaSet/java-kafka-consumer-688c77d78c"},"level":"info","log_type":"application","message":"2025-01-13 09:46:03 INFO  KafkaConsumerExample:54 - \toffset: 6875","openshift":{"cluster_id":"6e61fd67-fbcc-4019-99bb-ec3acbe339e7","sequence":1736761563622311674},"path":"/","source_type":"http_server","timestamp":"2025-01-13T09:46:04.625303692Z"}

Sink

{"resourceLogs":[{"resource":{"attributes":[{"key":"service.name","value":{"stringValue":"zookeeper"}},{"key":"k8s.namespace.name","value":{"stringValue":"mpms-kafka-test"}},{"key":"k8s.pod.name","value":{"stringValue":"my-cluster-zookeeper-1"}},{"key":"k8s.container.name","value":{"stringValue":"zookeeper"}}]},"scopeLogs":[{"logRecords":[{"attributes":[{"key":"service.name","value":{"stringValue":"zookeeper"}}],"body":{"stringValue":"2025-01-13 09:43:45,938 INFO Processing ruok command from /127.0.0.1:33988 (org.apache.zookeeper.server.NettyServerCnxn) [nioEventLoopGroup-4-1]"},"k8s":{"container_id":"cri-o://06ea458325dfbfa4c5aa5ecb5cf65045b82ad2aac8b94c981f5a29117fe9efa4","container_image":"quay.io/strimzi/kafka@sha256:184b10b80953c8f0895f5b84c8be481cd2a36884b837651170b816fc10eb94b2","container_image_id":"quay.io/strimzi/kafka@sha256:1262759b72e95023631d1bd3cedfd2fc88ea07d8a3b4aedbb3ced582000a8b57","container_name":"zookeeper","labels":{"app_kubernetes_io_instance":"my-cluster","app_kubernetes_io_managed-by":"strimzi-cluster-operator","app_kubernetes_io_name":"zookeeper","app_kubernetes_io_part-of":"strimzi-my-cluster","domain":"appli","name":"test-kafka","productname":"kafka","protocol":"http","setname":"my-cluster","statefulset_kubernetes_io_pod-name":"my-cluster-zookeeper-1","strimzi_io_cluster":"my-cluster","strimzi_io_component-type":"zookeeper","strimzi_io_controller":"strimzipodset","strimzi_io_controller-name":"my-cluster-zookeeper","strimzi_io_kind":"Kafka","strimzi_io_name":"my-cluster-zookeeper","strimzi_io_pod-name":"my-cluster-zookeeper-1"},"namespace_id":"4381a43a-0bf0-486d-b428-246a7ed75aea","namespace_name":"mpms-kafka-test","pod_id":"f27e215b-3ee6-4f80-8e2c-955aabf62214","pod_ip":"10.131.0.128","pod_name":"my-cluster-zookeeper-1","pod_owner":"StrimziPodSet/my-cluster-zookeeper"},"service_name":"zookeeper","severityText":"info","timeUnixNano":1736761427723911765}],"scope":{"name":"Vector","version":"0.43.0"}}]}]}
{"resourceLogs":[{"resource":{"attributes":[{"key":"service.name","value":{"stringValue":"zookeeper"}},{"key":"k8s.namespace.name","value":{"stringValue":"mpms-kafka-test"}},{"key":"k8s.pod.name","value":{"stringValue":"my-cluster-zookeeper-1"}},{"key":"k8s.container.name","value":{"stringValue":"zookeeper"}}]},"scopeLogs":[{"logRecords":[{"attributes":[{"key":"service.name","value":{"stringValue":"zookeeper"}}],"body":{"stringValue":"2025-01-13 09:43:46,030 INFO Processing ruok command from /127.0.0.1:34000 (org.apache.zookeeper.server.NettyServerCnxn) [nioEventLoopGroup-4-2]"},"k8s":{"container_id":"cri-o://06ea458325dfbfa4c5aa5ecb5cf65045b82ad2aac8b94c981f5a29117fe9efa4","container_image":"quay.io/strimzi/kafka@sha256:184b10b80953c8f0895f5b84c8be481cd2a36884b837651170b816fc10eb94b2","container_image_id":"quay.io/strimzi/kafka@sha256:1262759b72e95023631d1bd3cedfd2fc88ea07d8a3b4aedbb3ced582000a8b57","container_name":"zookeeper","labels":{"app_kubernetes_io_instance":"my-cluster","app_kubernetes_io_managed-by":"strimzi-cluster-operator","app_kubernetes_io_name":"zookeeper","app_kubernetes_io_part-of":"strimzi-my-cluster","domain":"appli","name":"test-kafka","productname":"kafka","protocol":"http","setname":"my-cluster","statefulset_kubernetes_io_pod-name":"my-cluster-zookeeper-1","strimzi_io_cluster":"my-cluster","strimzi_io_component-type":"zookeeper","strimzi_io_controller":"strimzipodset","strimzi_io_controller-name":"my-cluster-zookeeper","strimzi_io_kind":"Kafka","strimzi_io_name":"my-cluster-zookeeper","strimzi_io_pod-name":"my-cluster-zookeeper-1"},"namespace_id":"4381a43a-0bf0-486d-b428-246a7ed75aea","namespace_name":"mpms-kafka-test","pod_id":"f27e215b-3ee6-4f80-8e2c-955aabf62214","pod_ip":"10.131.0.128","pod_name":"my-cluster-zookeeper-1","pod_owner":"StrimziPodSet/my-cluster-zookeeper"},"service_name":"zookeeper","severityText":"info","timeUnixNano":1736761427723911765}],"scope":{"name":"Vector","version":"0.43.0"}}]}]}
{"resourceLogs":[{"resource":{"attributes":[{"key":"service.name","value":{"stringValue":"unknown"}},{"key":"k8s.namespace.name","value":{"stringValue":"mpms-kafka-test"}},{"key":"k8s.pod.name","value":{"stringValue":"java-kafka-producer-f674dd585-mqcs6"}},{"key":"k8s.container.name","value":{"stringValue":"java-kafka-producer"}}]},"scopeLogs":[{"logRecords":[{"attributes":[{"key":"service.name","value":{"stringValue":"unknown"}}],"body":{"stringValue":"2025-01-13 09:43:47 INFO  KafkaProducerExample:58 - Sending messages \"Hello world - 803\""},"k8s":{"container_id":"cri-o://f7d86eedea38f10bffb9f88584931164cae0fc74176be75d19a96be16da2ae06","container_image":"quay.io/strimzi-examples/java-kafka-producer:latest","container_image_id":"quay.io/strimzi-examples/java-kafka-producer@sha256:1a6bdae12a8b67bf2e8e27f61fd838e4f197d676cbd981ea009ec46350be55f4","container_name":"java-kafka-producer","labels":{"app":"java-kafka-producer","domain":"appli","pod-template-hash":"f674dd585"},"namespace_id":"4381a43a-0bf0-486d-b428-246a7ed75aea","namespace_name":"mpms-kafka-test","pod_id":"2546f1d9-1e19-463a-98ed-98eccdc86e81","pod_ip":"10.128.4.83","pod_name":"java-kafka-producer-f674dd585-mqcs6","pod_owner":"ReplicaSet/java-kafka-producer-f674dd585"},"service_name":"unknown","severityText":"info","timeUnixNano":1736761428376536290}],"scope":{"name":"Vector","version":"0.43.0"}}]}]}
{"resourceLogs":[{"resource":{"attributes":[{"key":"service.name","value":{"stringValue":"unknown"}},{"key":"k8s.namespace.name","value":{"stringValue":"mpms-kafka-test"}},{"key":"k8s.pod.name","value":{"stringValue":"java-kafka-consumer-688c77d78c-fsdms"}},{"key":"k8s.container.name","value":{"stringValue":"java-kafka-consumer"}}]},"scopeLogs":[{"logRecords":[{"attributes":[{"key":"service.name","value":{"stringValue":"unknown"}}],"body":{"stringValue":"2025-01-13 09:43:47 INFO  KafkaConsumerExample:52 - Received message:"},"k8s":{"container_id":"cri-o://ca1cba0a5e455954212853e6c761661e9942105eb7045b7d0fe9175984dc9d46","container_image":"quay.io/strimzi-examples/java-kafka-consumer:latest","container_image_id":"quay.io/strimzi-examples/java-kafka-consumer@sha256:275d91e566ccab13d4c035111662a375640e3451bf3671ac19b0e999e045096e","container_name":"java-kafka-consumer","labels":{"app":"java-kafka-consumer","domain":"appli","pod-template-hash":"688c77d78c"},"namespace_id":"4381a43a-0bf0-486d-b428-246a7ed75aea","namespace_name":"mpms-kafka-test","pod_id":"d97620a1-6e75-4493-91a9-8ea27d39d5ba","pod_ip":"10.130.4.117","pod_name":"java-kafka-consumer-688c77d78c-fsdms","pod_owner":"ReplicaSet/java-kafka-consumer-688c77d78c"},"service_name":"unknown","severityText":"info","timeUnixNano":1736761428495316795}],"scope":{"name":"Vector","version":"0.43.0"}}]}]}
{"resourceLogs":[{"resource":{"attributes":[{"key":"service.name","value":{"stringValue":"unknown"}},{"key":"k8s.namespace.name","value":{"stringValue":"mpms-kafka-test"}},{"key":"k8s.pod.name","value":{"stringValue":"java-kafka-consumer-688c77d78c-fsdms"}},{"key":"k8s.container.name","value":{"stringValue":"java-kafka-consumer"}}]},"scopeLogs":[{"logRecords":[{"attributes":[{"key":"service.name","value":{"stringValue":"unknown"}}],"body":{"stringValue":"2025-01-13 09:43:47 INFO  KafkaConsumerExample:53 - \tpartition: 7"},"k8s":{"container_id":"cri-o://ca1cba0a5e455954212853e6c761661e9942105eb7045b7d0fe9175984dc9d46","container_image":"quay.io/strimzi-examples/java-kafka-consumer:latest","container_image_id":"quay.io/strimzi-examples/java-kafka-consumer@sha256:275d91e566ccab13d4c035111662a375640e3451bf3671ac19b0e999e045096e","container_name":"java-kafka-consumer","labels":{"app":"java-kafka-consumer","domain":"appli","pod-template-hash":"688c77d78c"},"namespace_id":"4381a43a-0bf0-486d-b428-246a7ed75aea","namespace_name":"mpms-kafka-test","pod_id":"d97620a1-6e75-4493-91a9-8ea27d39d5ba","pod_ip":"10.130.4.117","pod_name":"java-kafka-consumer-688c77d78c-fsdms","pod_owner":"ReplicaSet/java-kafka-consumer-688c77d78c"},"service_name":"unknown","severityText":"info","timeUnixNano":1736761428495316795}],"scope":{"name":"Vector","version":"0.43.0"}}]}]}

Additional Context

No response

References

No response

@blezoray blezoray added the type: bug A code related bug. label Jan 13, 2025
@blezoray
Copy link
Author

Some additional screenshots:
Screenshot from 2025-01-13 10-27-51
Screenshot from 2025-01-13 10-32-03

@pront pront added the sink: opentelemetry Anything `opentelemetry` sink related label Jan 13, 2025
@pront
Copy link
Member

pront commented Jan 13, 2025

Do you see any errors on OTLP/Loki side? The message rate is quite low so I doubt this is due to backpressure.

I regularly lost some messages, about 10%.

In the picture above, are you missing "Hello world - 29" and so on?

@blezoray
Copy link
Author

blezoray commented Jan 14, 2025

In the picture above, are you missing "Hello world - 29" and so on?

Exactly.
Unfortunately, there is no error message on both vector and Loki sides.
Here are the logs (vector in trace mode, Loki in debug mode)
vector-traces.log.gz
loki-debug.log.gz

@pront
Copy link
Member

pront commented Jan 14, 2025

Also, can you add a console sink and verify that all logs are received and printed?

sinks:
  console:
    type: console
    inputs: ["manage_labels"]
    encoding:
      codec: json
      json:
        pretty: true

@blezoray
Copy link
Author

Also, can you add a console sink and verify that all logs are received and printed?

It's what I used to get the source format and also the sink format, after the managed_labels transform job.
In the console sink, I received all the logs.

@FredrikAugust
Copy link

Currently experiencing the same 🤔

@FredrikAugust
Copy link

FredrikAugust commented Jan 16, 2025

To elaborate a little bit; I'm also using Loki with the OTEL ingestion endpoint for logs. As you can see from the top, it says everything is ingested, but only a small number get through. Adding an example where I just curled my server 10 times and each of those generate a log entry.

Image

Image

And in my console sink I can see everything.

@FredrikAugust
Copy link

FredrikAugust commented Jan 16, 2025

Might be a coincidence, but none of my log messages containing emojis are going through. Thought it would be worth mentioning. I've confirmed the endpoint accepts it. I'm also losing way more than 10%. More than half I'd say.

@FredrikAugust
Copy link

I found out what the problem was for me. The protocol-level batching breaks ingestion into loki. Only the first message was registered, and the rest magically disappeared. See more in #22232. As I mentioned there, I reckon this is because of how the requests are merged together? Perhaps it would require a custom sink?

@blezoray
Copy link
Author

Thanks @FredrikAugust .

I also modify my opentelemetry sink with protocol.batch.max_events=1:

    sinks:
      otelhttp:
        inputs:
        - manage_labels
        protocol:
          batch:
            max_events: 1
          encoding:
            codec: json
          framing:
            method: newline_delimited
          method: post
          request:
            headers:
              content-type: application/json
          type: http
          uri: http://loki:3100/otlp/v1/logs
        type: opentelemetry

and now I receive all the messages in Loki.

But for me, this workaround should have an impact on the vector performance if it sends message one by one, isn't it ?

@FredrikAugust
Copy link

Yeah @blezoray I would assume this bumps up the resource usage quite a bit since it's got to send each log line as an individual HTTP req. Might it be because framing just merges the log messages into several lines in the HTTP req? As mentioned earlier, it would need to join them together in the JSON structure. I might be able to take a look at this tomorrow and get a PR up if that's possible?

@pront
Copy link
Member

pront commented Jan 21, 2025

Looks like this issue and #22232 are the same but keeping them both open for now until we are certain. We definitely want to support batching. There are several framing options here to try.

It does sound like the first log is decoded correctly and the rest are dropped. But I would be surprised if this happens silently, do you see any errors on the receiver side?

@blezoray
Copy link
Author

In this previous post, you have the Loki logs in debug.
It confirms what you said: streams=1 entries=1

level=debug ts=2025-01-14T08:32:26.468590126Z caller=push.go:162 org_id=fake msg="push request parsed" path=/otlp/v1/logs contentType=application/json contentEncoding= bodySize="18 kB" streams=1 entries=1 streamLabelsSize="101 B" entriesSize="387 B" structuredMetadataSize="69 B" totalSize="488 B" mostRecentLagMs=1003

@pront pront self-assigned this Jan 21, 2025
@FredrikAugust
Copy link

@pront I'm running this on Grafana Cloud so the debugging capabilities are limited, but I can't see anything indicating problems. Also regarding the framing, referring to the expected format in the OTEL spec, I don't think any of them would work.

A closer solution which I was planning to try out if this remains an issue is to use the reducer transformer to group on the instance ID (meaning the resource should be the same) and then concatenating the logRecords.

@pront
Copy link
Member

pront commented Jan 21, 2025

A closer solution which I was planning to try out if this remains an issue is to use the reducer transformer to group on the instance ID (meaning the resource should be the same) and then concatenating the logRecords.

Good thought, https://vector.dev/docs/reference/configuration/transforms/reduce/ should be able to aggregate multiple logs into a single encoded payload in an acceptable format. We eventually want to introduce some sort of native grouping at the OTEL sink.

But resources are limited this month. OTEL interoperability is high on the priority list and the community's participation in threads like this is very much appreciated.

@FredrikAugust
Copy link

Thanks for the response @pront! Glad to hear OTEL interop. is prioritized as it's a very nice feature.

@pront pront changed the title [sink OTLP] Vector lost logs [sink OTLP] cannot ingest log batches Jan 22, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
sink: opentelemetry Anything `opentelemetry` sink related type: bug A code related bug.
Projects
None yet
Development

No branches or pull requests

3 participants