Skip to content

Commit

Permalink
Fix stream pipeline not working with multiple components (#29)
Browse files Browse the repository at this point in the history
- Also adjusted some file names to match class names
- Fix stream pipeline not working with multiple components
  • Loading branch information
peter-quix authored Jan 30, 2024
1 parent 989a30e commit 6b8ab33
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 5 deletions.
4 changes: 2 additions & 2 deletions builds/csharp/nuget/build_nugets.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
from typing import List

version = "0.6.2.0"
informal_version = "0.6.3.0-dev1"
nuget_version = "0.6.3.0-dev1"
informal_version = "0.6.3.0-dev2"
nuget_version = "0.6.3.0-dev2"


def updatecsproj(projfilepath):
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
using System;
using System.Collections.Generic;
using System.Linq;
using FluentAssertions;
using QuixStreams.Telemetry.Models;
using QuixStreams.Telemetry.UnitTests.Helpers;
Expand Down Expand Up @@ -99,6 +101,54 @@ public void Send_WithPackageSubscription_ShouldExecutePackageSubscribersHandlers
Assert.Equal(testModel2, handledPackage.Value);
}

[Theory]
[InlineData(1)]
[InlineData(2)]
[InlineData(3)]
public void Send_WithMultipleComponentRegistered_ReceiveExpected(int numberOfComponents)
{
// Arrange
var invokingComponents = new List<BypassComponent<TestModel1>>();
IStreamPipeline pipeline = new StreamPipeline();
TestModel1 testModel1 = new TestModel1();

void Callback(BypassComponent<TestModel1> component, TestModel1 package)
{
if (package != testModel1) throw new Exception("Incorrect package raised");
invokingComponents.Add(component);
}

var components = new List<BypassComponent<TestModel1>>();
for (int i = 0; i < numberOfComponents; i++)
{
var component = new BypassComponent<TestModel1>(Callback);
components.Add(component);
pipeline.AddComponent(component);
}


// Act
pipeline.Send(testModel1);

// Assert
invokingComponents.Select(y=> y.Id)
.Should().BeEquivalentTo(components.Select(y=> y.Id), o => o.WithStrictOrdering());
}

private class BypassComponent<T> : StreamComponent
{

public readonly Guid Id = Guid.NewGuid();

public BypassComponent(Action<BypassComponent<T>, T> callback)
{
this.Input.Subscribe<T>(package =>
{
callback(this, package);
});
this.Input.LinkTo(this.Output);
}
}

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@
using System.Collections.Generic;
using System.Threading.Tasks;
using Quix.TestBase.Extensions;
using QuixStreams;
using QuixStreams.Kafka.Transport.SerDes;
using QuixStreams.Kafka.Transport.SerDes.Codecs;
using QuixStreams.Kafka.Transport.SerDes.Codecs.DefaultCodecs;
using QuixStreams.Kafka.Transport.Tests.Helpers;
Expand Down
2 changes: 1 addition & 1 deletion src/QuixStreams.Telemetry/Core/StreamPipeline.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using QuixStreams;
using QuixStreams.Telemetry.Models;

namespace QuixStreams.Telemetry
Expand Down Expand Up @@ -89,6 +88,7 @@ public IStreamPipeline AddComponent(StreamComponent component)
{
componentsList.Clear();
this.firstComponent = component;
this.defaultPipeline = false;
}
else
{
Expand Down

0 comments on commit 6b8ab33

Please sign in to comment.