Skip to content
This repository has been archived by the owner on Jul 18, 2023. It is now read-only.

Commit

Permalink
Merge pull request #59 from GreenParrotGmbH/feature-batch-count
Browse files Browse the repository at this point in the history
Add maxBatchSize parameter to IntervalBatcher
  • Loading branch information
nblumhardt authored May 9, 2018
2 parents 7dc6860 + f458441 commit bec2ce3
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ namespace InfluxDB.Collector.Configuration
{
public abstract class CollectorBatchConfiguration
{
public abstract CollectorConfiguration AtInterval(TimeSpan interval);
public CollectorConfiguration AtInterval(TimeSpan interval) => AtInterval(interval, 5000);

public abstract CollectorConfiguration AtInterval(TimeSpan interval, int? maxBatchSize);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,18 @@ class PipelinedCollectorBatchConfiguration : CollectorBatchConfiguration
{
readonly CollectorConfiguration _configuration;
TimeSpan? _interval;
int? _maxBatchSize;

public PipelinedCollectorBatchConfiguration(CollectorConfiguration configuration)
{
if (configuration == null) throw new ArgumentNullException(nameof(configuration));
_configuration = configuration;
}

public override CollectorConfiguration AtInterval(TimeSpan interval)
public override CollectorConfiguration AtInterval(TimeSpan interval, int? maxBatchSize)
{
_interval = interval;
_maxBatchSize = maxBatchSize;
return _configuration;
}

Expand All @@ -29,9 +31,9 @@ public IPointEmitter CreateEmitter(IPointEmitter parent, out Action dispose)
return parent;
}

var batcher = new IntervalBatcher(_interval.Value, parent);
var batcher = new IntervalBatcher(_interval.Value, _maxBatchSize, parent);
dispose = batcher.Dispose;
return batcher;
}
}
}
}
18 changes: 16 additions & 2 deletions src/InfluxDB.Collector/Pipeline/Batch/IntervalBatcher.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using InfluxDB.Collector.Diagnostics;
using InfluxDB.Collector.Platform;
using InfluxDB.Collector.Util;

namespace InfluxDB.Collector.Pipeline.Batch
{
Expand All @@ -12,17 +14,19 @@ class IntervalBatcher : IPointEmitter, IDisposable
Queue<PointData> _queue = new Queue<PointData>();

readonly TimeSpan _interval;
readonly int? _maxBatchSize;
readonly IPointEmitter _parent;

readonly object _stateLock = new object();
readonly PortableTimer _timer;
bool _unloading;
bool _started;

public IntervalBatcher(TimeSpan interval, IPointEmitter parent)
public IntervalBatcher(TimeSpan interval, int? maxBatchSize, IPointEmitter parent)
{
_parent = parent;
_interval = interval;
_maxBatchSize = maxBatchSize;
_timer = new PortableTimer(cancel => OnTick());
}

Expand Down Expand Up @@ -60,7 +64,17 @@ Task OnTick()
_queue = new Queue<PointData>();
}

_parent.Emit(batch.ToArray());
if (_maxBatchSize == null || batch.Count <= _maxBatchSize.Value)
{
_parent.Emit(batch.ToArray());
}
else
{
foreach (var chunk in batch.Batch(_maxBatchSize.Value))
{
_parent.Emit(chunk.ToArray());
}
}
}
catch (Exception ex)
{
Expand Down
55 changes: 55 additions & 0 deletions src/InfluxDB.Collector/Util/EnumerableExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
using System.Collections.Generic;
using System.Linq;

namespace InfluxDB.Collector.Util
{
internal static class EnumerableExtensions
{
// Copied from https://github.com/morelinq/MoreLINQ/blob/master/MoreLinq/Batch.cs
// Original license below
//
// MoreLINQ - Extensions to LINQ to Objects
// Copyright (c) 2009 Atif Aziz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
public static IEnumerable<IEnumerable<TSource>> Batch<TSource>(this IEnumerable<TSource> source, int size)
{
TSource[] bucket = null;
var count = 0;

foreach (var item in source)
{
if (bucket == null)
{
bucket = new TSource[size];
}

bucket[count++] = item;
if (count != size)
{
continue;
}

yield return bucket;

bucket = null;
count = 0;
}

if (bucket != null && count > 0)
{
yield return bucket.Take(count);
}
}
}
}

0 comments on commit bec2ce3

Please sign in to comment.