Skip to content

Commit

Permalink
Cache item size
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-sili committed Jan 22, 2025
1 parent 9757ead commit bc525ed
Show file tree
Hide file tree
Showing 9 changed files with 198 additions and 30 deletions.
19 changes: 14 additions & 5 deletions exporter/exporterhelper/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,16 @@ var (
)

// logsRequest carries a plog.Logs payload through the exporter helper
// pipeline together with the consumer function used to push it downstream.
type logsRequest struct {
	ld     plog.Logs
	pusher consumer.ConsumeLogsFunc
	// cachedItemsCount caches the log record count so batching code can
	// avoid re-walking the pdata tree; 0 means "not yet computed"
	// (see ItemsCount).
	cachedItemsCount int
}

// newLogsRequest wraps the given logs payload and push function in a
// Request. cachedItemsCount starts at 0, i.e. "not yet computed";
// ItemsCount fills it lazily.
func newLogsRequest(ld plog.Logs, pusher consumer.ConsumeLogsFunc) Request {
	return &logsRequest{
		ld:               ld,
		pusher:           pusher,
		cachedItemsCount: 0,
	}
}

Expand Down Expand Up @@ -63,7 +65,14 @@ func (req *logsRequest) Export(ctx context.Context) error {
}

// ItemsCount returns the number of log records in the request, computing
// it on first use and caching it for subsequent calls.
// NOTE(review): 0 doubles as the "not computed" sentinel, so a genuinely
// empty request recounts on every call — harmless, but worth knowing.
func (req *logsRequest) ItemsCount() int {
	if req.cachedItemsCount == 0 {
		req.cachedItemsCount = req.ld.LogRecordCount()
	}
	return req.cachedItemsCount
}

// setCachedItemsCount overrides the cached log record count; MergeSplit
// uses it to keep counts in sync after moving data between requests.
func (req *logsRequest) setCachedItemsCount(count int) {
req.cachedItemsCount = count
}

type logsExporter struct {
Expand Down
15 changes: 11 additions & 4 deletions exporter/exporterhelper/logs_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,29 +38,36 @@ func (req *logsRequest) MergeSplit(_ context.Context, cfg exporterbatcher.MaxSiz
continue
}

srcCount := srcReq.ld.LogRecordCount()
srcCount := srcReq.ItemsCount()
if srcCount <= capacityLeft {
if destReq == nil {
destReq = srcReq
} else {
srcReq.ld.ResourceLogs().MoveAndAppendTo(destReq.ld.ResourceLogs())
destReq.setCachedItemsCount(srcCount)
srcReq.setCachedItemsCount(0)
}
capacityLeft -= srcCount
continue
}

for {
extractedLogs := extractLogs(srcReq.ld, capacityLeft)
if extractedLogs.LogRecordCount() == 0 {
extractedCount := extractedLogs.LogRecordCount()
if extractedCount == 0 {
break
}
capacityLeft -= extractedLogs.LogRecordCount()

if destReq == nil {
destReq = newLogsRequest(extractedLogs, srcReq.pusher).(*logsRequest)
destReq = &logsRequest{ld: extractedLogs, pusher: srcReq.pusher, cachedItemsCount: extractedCount}
} else {
extractedLogs.ResourceLogs().MoveAndAppendTo(destReq.ld.ResourceLogs())
destReq.setCachedItemsCount(destReq.ItemsCount() + extractedCount)
srcReq.setCachedItemsCount(srcReq.ItemsCount() - extractedCount)
}

// Create new batch once capacity is reached.
capacityLeft -= extractedCount
if capacityLeft == 0 {
res = append(res, destReq)
destReq = nil
Expand Down
42 changes: 41 additions & 1 deletion exporter/exporterhelper/logs_batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func TestMergeSplitLogs(t *testing.T) {
require.NoError(t, err)
assert.Equal(t, len(tt.expected), len(res))
for i, r := range res {
assert.Equal(t, tt.expected[i], r.(*logsRequest))
assert.Equal(t, tt.expected[i].ld, r.(*logsRequest).ld)
}
})
}
Expand Down Expand Up @@ -152,3 +152,43 @@ func TestExtractLogs(t *testing.T) {
assert.Equal(t, 10-i, ld.LogRecordCount())
}
}

// BenchmarkSplittingBasedOnItemCountManySmallLogs merges many small
// requests; all of them fit into a single batch.
func BenchmarkSplittingBasedOnItemCountManySmallLogs(b *testing.B) {
	cfg := exporterbatcher.MaxSizeConfig{MaxSizeItems: 10000}
	for iter := 0; iter < b.N; iter++ {
		batches := []Request{&logsRequest{ld: testdata.GenerateLogs(10)}}
		for reqNum := 0; reqNum < 1000; reqNum++ {
			incoming := &logsRequest{ld: testdata.GenerateLogs(10)}
			out, _ := batches[len(batches)-1].MergeSplit(context.Background(), cfg, incoming)
			batches = append(batches[:len(batches)-1], out...)
		}
		assert.Len(b, batches, 1)
	}
}

// BenchmarkSplittingBasedOnItemCountManyLogsSlightlyAboveLimit feeds
// requests that each exceed the max size by one item, so every incoming
// request forces a split.
func BenchmarkSplittingBasedOnItemCountManyLogsSlightlyAboveLimit(b *testing.B) {
	cfg := exporterbatcher.MaxSizeConfig{MaxSizeItems: 10000}
	for iter := 0; iter < b.N; iter++ {
		batches := []Request{&logsRequest{ld: testdata.GenerateLogs(0)}}
		for reqNum := 0; reqNum < 10; reqNum++ {
			incoming := &logsRequest{ld: testdata.GenerateLogs(10001)}
			out, _ := batches[len(batches)-1].MergeSplit(context.Background(), cfg, incoming)
			batches = append(batches[:len(batches)-1], out...)
		}
		assert.Len(b, batches, 11)
	}
}

// BenchmarkSplittingBasedOnItemCountHugeLogs splits one oversized
// request into many max-size batches.
func BenchmarkSplittingBasedOnItemCountHugeLogs(b *testing.B) {
	cfg := exporterbatcher.MaxSizeConfig{MaxSizeItems: 10000}
	for iter := 0; iter < b.N; iter++ {
		batches := []Request{&logsRequest{ld: testdata.GenerateLogs(0)}}
		huge := &logsRequest{ld: testdata.GenerateLogs(100000)}
		out, _ := batches[len(batches)-1].MergeSplit(context.Background(), cfg, huge)
		batches = append(batches[:len(batches)-1], out...)
		assert.Len(b, batches, 10)
	}
}
19 changes: 14 additions & 5 deletions exporter/exporterhelper/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,16 @@ var (
)

// metricsRequest carries a pmetric.Metrics payload through the exporter
// helper pipeline together with the consumer function used to push it
// downstream.
type metricsRequest struct {
	md     pmetric.Metrics
	pusher consumer.ConsumeMetricsFunc
	// cachedItemsCount caches the data point count so batching code can
	// avoid re-walking the pdata tree; 0 means "not yet computed"
	// (see ItemsCount).
	cachedItemsCount int
}

// newMetricsRequest wraps the given metrics payload and push function in
// a Request. cachedItemsCount starts at 0, i.e. "not yet computed";
// ItemsCount fills it lazily.
func newMetricsRequest(md pmetric.Metrics, pusher consumer.ConsumeMetricsFunc) Request {
	return &metricsRequest{
		md:               md,
		pusher:           pusher,
		cachedItemsCount: 0,
	}
}

Expand Down Expand Up @@ -63,7 +65,14 @@ func (req *metricsRequest) Export(ctx context.Context) error {
}

// ItemsCount returns the number of metric data points in the request,
// computing it on first use and caching it for subsequent calls.
// NOTE(review): 0 doubles as the "not computed" sentinel, so a genuinely
// empty request recounts on every call — harmless, but worth knowing.
func (req *metricsRequest) ItemsCount() int {
	if req.cachedItemsCount == 0 {
		req.cachedItemsCount = req.md.DataPointCount()
	}
	return req.cachedItemsCount
}

// setCachedItemsCount overrides the cached data point count; MergeSplit
// uses it to keep counts in sync after moving data between requests.
func (req *metricsRequest) setCachedItemsCount(count int) {
req.cachedItemsCount = count
}

type metricsExporter struct {
Expand Down
15 changes: 11 additions & 4 deletions exporter/exporterhelper/metrics_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,13 @@ func (req *metricsRequest) MergeSplit(_ context.Context, cfg exporterbatcher.Max
continue
}

srcCount := srcReq.md.DataPointCount()
srcCount := srcReq.ItemsCount()
if srcCount <= capacityLeft {
if destReq == nil {
destReq = srcReq
} else {
destReq.setCachedItemsCount(srcCount)
srcReq.setCachedItemsCount(0)
srcReq.md.ResourceMetrics().MoveAndAppendTo(destReq.md.ResourceMetrics())
}
capacityLeft -= srcCount
Expand All @@ -51,16 +53,21 @@ func (req *metricsRequest) MergeSplit(_ context.Context, cfg exporterbatcher.Max

for {
extractedMetrics := extractMetrics(srcReq.md, capacityLeft)
if extractedMetrics.DataPointCount() == 0 {
extractedCount := extractedMetrics.DataPointCount()
if extractedCount == 0 {
break
}
capacityLeft -= extractedMetrics.DataPointCount()

if destReq == nil {
destReq = newMetricsRequest(extractedMetrics, srcReq.pusher).(*metricsRequest)
destReq = &metricsRequest{md: extractedMetrics, pusher: srcReq.pusher, cachedItemsCount: extractedCount}
} else {
destReq.setCachedItemsCount(destReq.ItemsCount() + extractedCount)
srcReq.setCachedItemsCount(srcReq.ItemsCount() - extractedCount)
extractedMetrics.ResourceMetrics().MoveAndAppendTo(destReq.md.ResourceMetrics())
}

// Create new batch once capacity is reached.
capacityLeft -= extractedCount
if capacityLeft == 0 {
res = append(res, destReq)
destReq = nil
Expand Down
42 changes: 41 additions & 1 deletion exporter/exporterhelper/metrics_batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func TestMergeSplitMetrics(t *testing.T) {
require.NoError(t, err)
assert.Equal(t, len(tt.expected), len(res))
for i := range res {
assert.Equal(t, tt.expected[i], res[i].(*metricsRequest))
assert.Equal(t, tt.expected[i].md, res[i].(*metricsRequest).md)
}
})
}
Expand Down Expand Up @@ -159,3 +159,43 @@ func TestExtractMetricsInvalidMetric(t *testing.T) {
assert.Equal(t, testdata.GenerateMetricsMetricTypeInvalid(), extractedMetrics)
assert.Equal(t, 0, md.ResourceMetrics().Len())
}

// BenchmarkSplittingBasedOnItemCountManySmallMetrics merges many small
// requests; all of them fit into a single batch.
func BenchmarkSplittingBasedOnItemCountManySmallMetrics(b *testing.B) {
	cfg := exporterbatcher.MaxSizeConfig{MaxSizeItems: 20000}
	for iter := 0; iter < b.N; iter++ {
		batches := []Request{&metricsRequest{md: testdata.GenerateMetrics(10)}}
		for reqNum := 0; reqNum < 1000; reqNum++ {
			incoming := &metricsRequest{md: testdata.GenerateMetrics(10)}
			out, _ := batches[len(batches)-1].MergeSplit(context.Background(), cfg, incoming)
			batches = append(batches[:len(batches)-1], out...)
		}
		assert.Len(b, batches, 1)
	}
}

// BenchmarkSplittingBasedOnItemCountManyMetricsSlightlyAboveLimit feeds
// requests that each exceed the max size, so every incoming request
// forces a split.
func BenchmarkSplittingBasedOnItemCountManyMetricsSlightlyAboveLimit(b *testing.B) {
	cfg := exporterbatcher.MaxSizeConfig{MaxSizeItems: 20000}
	for iter := 0; iter < b.N; iter++ {
		batches := []Request{&metricsRequest{md: testdata.GenerateMetrics(0)}}
		for reqNum := 0; reqNum < 10; reqNum++ {
			incoming := &metricsRequest{md: testdata.GenerateMetrics(10001)}
			out, _ := batches[len(batches)-1].MergeSplit(context.Background(), cfg, incoming)
			batches = append(batches[:len(batches)-1], out...)
		}
		assert.Len(b, batches, 11)
	}
}

// BenchmarkSplittingBasedOnItemCountHugeMetrics splits one oversized
// request into many max-size batches.
func BenchmarkSplittingBasedOnItemCountHugeMetrics(b *testing.B) {
	cfg := exporterbatcher.MaxSizeConfig{MaxSizeItems: 20000}
	for iter := 0; iter < b.N; iter++ {
		batches := []Request{&metricsRequest{md: testdata.GenerateMetrics(0)}}
		huge := &metricsRequest{md: testdata.GenerateMetrics(100000)}
		out, _ := batches[len(batches)-1].MergeSplit(context.Background(), cfg, huge)
		batches = append(batches[:len(batches)-1], out...)
		assert.Len(b, batches, 10)
	}
}
19 changes: 14 additions & 5 deletions exporter/exporterhelper/traces.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,16 @@ var (
)

// tracesRequest carries a ptrace.Traces payload through the exporter
// helper pipeline together with the consumer function used to push it
// downstream.
type tracesRequest struct {
	td     ptrace.Traces
	pusher consumer.ConsumeTracesFunc
	// cachedItemsCount caches the span count so batching code can avoid
	// re-walking the pdata tree; 0 means "not yet computed"
	// (see ItemsCount).
	cachedItemsCount int
}

// newTracesRequest wraps the given traces payload and push function in a
// Request. cachedItemsCount starts at 0, i.e. "not yet computed";
// ItemsCount fills it lazily.
func newTracesRequest(td ptrace.Traces, pusher consumer.ConsumeTracesFunc) Request {
	return &tracesRequest{
		td:               td,
		pusher:           pusher,
		cachedItemsCount: 0,
	}
}

Expand Down Expand Up @@ -63,7 +65,14 @@ func (req *tracesRequest) Export(ctx context.Context) error {
}

// ItemsCount returns the number of spans in the request, computing it on
// first use and caching it for subsequent calls.
// NOTE(review): 0 doubles as the "not computed" sentinel, so a genuinely
// empty request recounts on every call — harmless, but worth knowing.
func (req *tracesRequest) ItemsCount() int {
	if req.cachedItemsCount == 0 {
		req.cachedItemsCount = req.td.SpanCount()
	}
	return req.cachedItemsCount
}

// setCachedItemsCount overrides the cached span count; MergeSplit uses
// it to keep counts in sync after moving data between requests.
func (req *tracesRequest) setCachedItemsCount(count int) {
req.cachedItemsCount = count
}

type tracesExporter struct {
Expand Down
15 changes: 11 additions & 4 deletions exporter/exporterhelper/traces_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,13 @@ func (req *tracesRequest) MergeSplit(_ context.Context, cfg exporterbatcher.MaxS
continue
}

srcCount := srcReq.td.SpanCount()
srcCount := srcReq.ItemsCount()
if srcCount <= capacityLeft {
if destReq == nil {
destReq = srcReq
} else {
destReq.setCachedItemsCount(srcCount)
srcReq.setCachedItemsCount(0)
srcReq.td.ResourceSpans().MoveAndAppendTo(destReq.td.ResourceSpans())
}
capacityLeft -= srcCount
Expand All @@ -51,16 +53,21 @@ func (req *tracesRequest) MergeSplit(_ context.Context, cfg exporterbatcher.MaxS

for {
extractedTraces := extractTraces(srcReq.td, capacityLeft)
if extractedTraces.SpanCount() == 0 {
extractedCount := extractedTraces.SpanCount()
if extractedCount == 0 {
break
}
capacityLeft -= extractedTraces.SpanCount()

if destReq == nil {
destReq = newTracesRequest(extractedTraces, srcReq.pusher).(*tracesRequest)
destReq = &tracesRequest{td: extractedTraces, pusher: srcReq.pusher, cachedItemsCount: extractedCount}
} else {
destReq.setCachedItemsCount(destReq.ItemsCount() + extractedCount)
srcReq.setCachedItemsCount(srcReq.ItemsCount() - extractedCount)
extractedTraces.ResourceSpans().MoveAndAppendTo(destReq.td.ResourceSpans())
}

// Create new batch once capacity is reached.
capacityLeft -= extractedCount
if capacityLeft == 0 {
res = append(res, destReq)
destReq = nil
Expand Down
42 changes: 41 additions & 1 deletion exporter/exporterhelper/traces_batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ func TestMergeSplitTraces(t *testing.T) {
require.NoError(t, err)
assert.Equal(t, len(tt.expected), len(res))
for i := range res {
assert.Equal(t, tt.expected[i], res[i].(*tracesRequest))
assert.Equal(t, tt.expected[i].td, res[i].(*tracesRequest).td)
}
})
}
Expand Down Expand Up @@ -159,3 +159,43 @@ func TestExtractTraces(t *testing.T) {
assert.Equal(t, 10-i, td.SpanCount())
}
}

// BenchmarkSplittingBasedOnItemCountManySmallTraces merges many small
// requests; all of them fit into a single batch.
func BenchmarkSplittingBasedOnItemCountManySmallTraces(b *testing.B) {
	cfg := exporterbatcher.MaxSizeConfig{MaxSizeItems: 10000}
	for iter := 0; iter < b.N; iter++ {
		batches := []Request{&tracesRequest{td: testdata.GenerateTraces(10)}}
		for reqNum := 0; reqNum < 1000; reqNum++ {
			incoming := &tracesRequest{td: testdata.GenerateTraces(10)}
			out, _ := batches[len(batches)-1].MergeSplit(context.Background(), cfg, incoming)
			batches = append(batches[:len(batches)-1], out...)
		}
		assert.Len(b, batches, 1)
	}
}

// BenchmarkSplittingBasedOnItemCountManyTracesSlightlyAboveLimit feeds
// requests that each exceed the max size by one item, so every incoming
// request forces a split.
func BenchmarkSplittingBasedOnItemCountManyTracesSlightlyAboveLimit(b *testing.B) {
	cfg := exporterbatcher.MaxSizeConfig{MaxSizeItems: 10000}
	for iter := 0; iter < b.N; iter++ {
		batches := []Request{&tracesRequest{td: testdata.GenerateTraces(0)}}
		for reqNum := 0; reqNum < 10; reqNum++ {
			incoming := &tracesRequest{td: testdata.GenerateTraces(10001)}
			out, _ := batches[len(batches)-1].MergeSplit(context.Background(), cfg, incoming)
			batches = append(batches[:len(batches)-1], out...)
		}
		assert.Len(b, batches, 11)
	}
}

// BenchmarkSplittingBasedOnItemCountHugeTraces splits one oversized
// request into many max-size batches.
func BenchmarkSplittingBasedOnItemCountHugeTraces(b *testing.B) {
	cfg := exporterbatcher.MaxSizeConfig{MaxSizeItems: 10000}
	for iter := 0; iter < b.N; iter++ {
		batches := []Request{&tracesRequest{td: testdata.GenerateTraces(0)}}
		huge := &tracesRequest{td: testdata.GenerateTraces(100000)}
		out, _ := batches[len(batches)-1].MergeSplit(context.Background(), cfg, huge)
		batches = append(batches[:len(batches)-1], out...)
		assert.Len(b, batches, 10)
	}
}

0 comments on commit bc525ed

Please sign in to comment.