From 167abc271d7563b5645e33703b78e08f1db00b85 Mon Sep 17 00:00:00 2001 From: James Geisler Date: Mon, 11 Nov 2024 17:29:28 -0600 Subject: [PATCH] [receiver/datadogreceiver] add json handling for the api/v2/series endpoint (#36218) #### Description Adding json handling for the `api/v2/series` endpoint. The datadog api client libraries use json messages, however only protobuf messages are currently supported in the` api/v2/series` endpoint, so requests fail with `proto: illegal wireType 6` If `Content-Type: application/json` is set, then we handle the json message. Otherwise, we handle the protobuf message. #### Link to tracking issue Fixes https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/36079 #### Testing Added test with a json metric payload that now passes. Additionally, I also tested these changes in my own image and confirmed that the datadog api client libraries can now successfully ship metrics to the `api/v2/series` endpoint. I also confirmed with the following curl: ``` curl -X POST \ -H "Content-Type: application/json" \ -H "DD-API-KEY: your_api_key_here" \ -d '{ "series": [ { "resources": [ { "name": "dummyhost", "type": "host" } ], "tags": ["env:test"], "metric": "test.metric", "points": [ { "timestamp": 1730829575, "value": 1.0 } ], "type": 3 } ] }' \ https://datadog-receiver/api/v2/series {"errors":[]} ``` --------- Co-authored-by: Sean Marciniak <30928402+MovieStoreGuy@users.noreply.github.com> --- .../36079-add-datadog-json-handling.yaml | 27 +++++++ .../internal/translator/series.go | 17 +++- receiver/datadogreceiver/receiver_test.go | 79 +++++++++++++++++++ 3 files changed, 119 insertions(+), 4 deletions(-) create mode 100644 .chloggen/36079-add-datadog-json-handling.yaml diff --git a/.chloggen/36079-add-datadog-json-handling.yaml b/.chloggen/36079-add-datadog-json-handling.yaml new file mode 100644 index 000000000000..2d55b8e4056e --- /dev/null +++ b/.chloggen/36079-add-datadog-json-handling.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: 'enhancement' + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: 'datadogreceiver' + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: "Add json handling for the `api/v2/series` endpoint in the datadogreceiver" + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [36079] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/receiver/datadogreceiver/internal/translator/series.go b/receiver/datadogreceiver/internal/translator/series.go index 9588839ebdb8..f6fac8b75210 100644 --- a/receiver/datadogreceiver/internal/translator/series.go +++ b/receiver/datadogreceiver/internal/translator/series.go @@ -4,6 +4,7 @@ package translator // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/datadogreceiver/internal/translator" import ( + "encoding/json" "io" "net/http" "strings" @@ -27,7 +28,6 @@ type SeriesList struct { Series []datadogV1.Series `json:"series"` } -// TODO: add handling for JSON format in additional to protobuf? func (mt *MetricsTranslator) HandleSeriesV2Payload(req *http.Request) (mp []*gogen.MetricPayload_MetricSeries, err error) { buf := GetBuffer() defer PutBuffer(buf) @@ -35,11 +35,20 @@ func (mt *MetricsTranslator) HandleSeriesV2Payload(req *http.Request) (mp []*gog return mp, err } + contentType := req.Header.Get("Content-Type") + pl := new(gogen.MetricPayload) - if err := pl.Unmarshal(buf.Bytes()); err != nil { - return mp, err - } + // handle json messages if set, otherwise handle protobuf + if contentType == "application/json" { + if err := json.Unmarshal(buf.Bytes(), &pl); err != nil { + return mp, err + } + } else { + if err := pl.Unmarshal(buf.Bytes()); err != nil { + return mp, err + } + } return pl.GetSeries(), nil } diff --git a/receiver/datadogreceiver/receiver_test.go b/receiver/datadogreceiver/receiver_test.go index 7283c8ba2f77..c9fc5c9b00f9 100644 --- a/receiver/datadogreceiver/receiver_test.go +++ b/receiver/datadogreceiver/receiver_test.go @@ -459,6 +459,85 @@ func TestDatadogMetricsV2_EndToEnd(t *testing.T) { assert.Equal(t, pcommon.Timestamp(1636629071*1_000_000_000), metric.Sum().DataPoints().At(1).StartTimestamp()) } +func TestDatadogMetricsV2_EndToEndJSON(t *testing.T) { + cfg := createDefaultConfig().(*Config) + cfg.Endpoint = "localhost:0" // Using a randomly assigned address + sink := new(consumertest.MetricsSink) + + dd, err := newDataDogReceiver( + cfg, + receivertest.NewNopSettings(), + ) + require.NoError(t, err, "Must not error when creating receiver") + dd.(*datadogReceiver).nextMetricsConsumer = sink + + require.NoError(t, dd.Start(context.Background(), componenttest.NewNopHost())) + defer func() { + require.NoError(t, dd.Shutdown(context.Background())) + }() + + metricsPayloadV2 := []byte(`{ + "series": [ + { + "metric": "system.load.1", + "type": 1, + "points": [ + { + "timestamp": 1636629071, + "value": 1.5 + }, + { + "timestamp": 1636629081, + "value": 2.0 + } + ], + "resources": [ + { + "name": "dummyhost", + "type": "host" + } + ] + } + ] + }`) + + req, err := http.NewRequest( + http.MethodPost, + fmt.Sprintf("http://%s/api/v2/series", dd.(*datadogReceiver).address), + io.NopCloser(bytes.NewReader(metricsPayloadV2)), + ) + + req.Header.Set("Content-Type", "application/json") + + require.NoError(t, err, "Must not error when creating request") + + resp, err := http.DefaultClient.Do(req) + require.NoError(t, err, "Must not error performing request") + + body, err := io.ReadAll(resp.Body) + require.NoError(t, multierr.Combine(err, resp.Body.Close()), "Must not error when reading body") + require.JSONEq(t, `{"errors": []}`, string(body), "Expected JSON response to be `{\"errors\": []}`, got %s", string(body)) + require.Equal(t, http.StatusAccepted, resp.StatusCode) + + mds := sink.AllMetrics() + require.Len(t, mds, 1) + got := mds[0] + require.Equal(t, 1, got.ResourceMetrics().Len()) + metrics := got.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics() + assert.Equal(t, 1, metrics.Len()) + metric := metrics.At(0) + assert.Equal(t, pmetric.MetricTypeSum, metric.Type()) + assert.Equal(t, "system.load.1", metric.Name()) + assert.Equal(t, pmetric.AggregationTemporalityDelta, metric.Sum().AggregationTemporality()) + assert.False(t, metric.Sum().IsMonotonic()) + assert.Equal(t, pcommon.Timestamp(1636629071*1_000_000_000), metric.Sum().DataPoints().At(0).Timestamp()) + assert.Equal(t, 1.5, metric.Sum().DataPoints().At(0).DoubleValue()) + assert.Equal(t, pcommon.Timestamp(0), metric.Sum().DataPoints().At(0).StartTimestamp()) + assert.Equal(t, pcommon.Timestamp(1636629081*1_000_000_000), metric.Sum().DataPoints().At(1).Timestamp()) + assert.Equal(t, 2.0, metric.Sum().DataPoints().At(1).DoubleValue()) + assert.Equal(t, pcommon.Timestamp(1636629071*1_000_000_000), metric.Sum().DataPoints().At(1).StartTimestamp()) +} + func TestDatadogSketches_EndToEnd(t *testing.T) { cfg := createDefaultConfig().(*Config) cfg.Endpoint = "localhost:0" // Using a randomly assigned address