From 6b8ab3344cc51d0064d7bc0e04ce4646e3d2ac67 Mon Sep 17 00:00:00 2001 From: PeterN <79838809+peter-quix@users.noreply.github.com> Date: Tue, 30 Jan 2024 19:17:38 +0000 Subject: [PATCH] Fix stream pipeline not working with multiple components (#29) - Also adjusted some file names to match class names - Fix stream pipeline not working with multiple components --- builds/csharp/nuget/build_nugets.py | 4 +- .../StreamPipelineShould.cs} | 50 +++++++++++++++++++ ...ould.cs => KafkaConsumerProducerShould.cs} | 2 - .../Core/StreamPipeline.cs | 2 +- 4 files changed, 53 insertions(+), 5 deletions(-) rename src/QuixStreams.Telemetry.UnitTests/{StreamProcessShould.cs => Core/StreamPipelineShould.cs} (64%) rename src/QuixStreams.Telemetry.UnitTests/{KafkaReaderProducerShould.cs => KafkaConsumerProducerShould.cs} (99%) diff --git a/builds/csharp/nuget/build_nugets.py b/builds/csharp/nuget/build_nugets.py index 665e2182..52060872 100644 --- a/builds/csharp/nuget/build_nugets.py +++ b/builds/csharp/nuget/build_nugets.py @@ -7,8 +7,8 @@ from typing import List version = "0.6.2.0" -informal_version = "0.6.3.0-dev1" -nuget_version = "0.6.3.0-dev1" +informal_version = "0.6.3.0-dev2" +nuget_version = "0.6.3.0-dev2" def updatecsproj(projfilepath): diff --git a/src/QuixStreams.Telemetry.UnitTests/StreamProcessShould.cs b/src/QuixStreams.Telemetry.UnitTests/Core/StreamPipelineShould.cs similarity index 64% rename from src/QuixStreams.Telemetry.UnitTests/StreamProcessShould.cs rename to src/QuixStreams.Telemetry.UnitTests/Core/StreamPipelineShould.cs index 651c07a6..c016ce23 100644 --- a/src/QuixStreams.Telemetry.UnitTests/StreamProcessShould.cs +++ b/src/QuixStreams.Telemetry.UnitTests/Core/StreamPipelineShould.cs @@ -1,4 +1,6 @@ using System; +using System.Collections.Generic; +using System.Linq; using FluentAssertions; using QuixStreams.Telemetry.Models; using QuixStreams.Telemetry.UnitTests.Helpers; @@ -99,6 +101,54 @@ public void Send_WithPackageSubscription_ShouldExecutePackageSubscribersHandlers Assert.Equal(testModel2, handledPackage.Value); } + [Theory] + [InlineData(1)] + [InlineData(2)] + [InlineData(3)] + public void Send_WithMultipleComponentRegistered_ReceiveExpected(int numberOfComponents) + { + // Arrange + var invokingComponents = new List>(); + IStreamPipeline pipeline = new StreamPipeline(); + TestModel1 testModel1 = new TestModel1(); + + void Callback(BypassComponent component, TestModel1 package) + { + if (package != testModel1) throw new Exception("Incorrect package raised"); + invokingComponents.Add(component); + } + + var components = new List>(); + for (int i = 0; i < numberOfComponents; i++) + { + var component = new BypassComponent(Callback); + components.Add(component); + pipeline.AddComponent(component); + } + + + // Act + pipeline.Send(testModel1); + + // Assert + invokingComponents.Select(y=> y.Id) + .Should().BeEquivalentTo(components.Select(y=> y.Id), o => o.WithStrictOrdering()); + } + + private class BypassComponent : StreamComponent + { + + public readonly Guid Id = Guid.NewGuid(); + + public BypassComponent(Action, T> callback) + { + this.Input.Subscribe(package => + { + callback(this, package); + }); + this.Input.LinkTo(this.Output); + } + } } diff --git a/src/QuixStreams.Telemetry.UnitTests/KafkaReaderProducerShould.cs b/src/QuixStreams.Telemetry.UnitTests/KafkaConsumerProducerShould.cs similarity index 99% rename from src/QuixStreams.Telemetry.UnitTests/KafkaReaderProducerShould.cs rename to src/QuixStreams.Telemetry.UnitTests/KafkaConsumerProducerShould.cs index f9ff2079..060fde1c 100644 --- a/src/QuixStreams.Telemetry.UnitTests/KafkaReaderProducerShould.cs +++ b/src/QuixStreams.Telemetry.UnitTests/KafkaConsumerProducerShould.cs @@ -2,8 +2,6 @@ using System.Collections.Generic; using System.Threading.Tasks; using Quix.TestBase.Extensions; -using QuixStreams; -using QuixStreams.Kafka.Transport.SerDes; using QuixStreams.Kafka.Transport.SerDes.Codecs; using QuixStreams.Kafka.Transport.SerDes.Codecs.DefaultCodecs; using QuixStreams.Kafka.Transport.Tests.Helpers; diff --git a/src/QuixStreams.Telemetry/Core/StreamPipeline.cs b/src/QuixStreams.Telemetry/Core/StreamPipeline.cs index 021929f3..e042f2cf 100644 --- a/src/QuixStreams.Telemetry/Core/StreamPipeline.cs +++ b/src/QuixStreams.Telemetry/Core/StreamPipeline.cs @@ -4,7 +4,6 @@ using System.Threading; using System.Threading.Tasks; using Microsoft.Extensions.Logging; -using QuixStreams; using QuixStreams.Telemetry.Models; namespace QuixStreams.Telemetry @@ -89,6 +88,7 @@ public IStreamPipeline AddComponent(StreamComponent component) { componentsList.Clear(); this.firstComponent = component; + this.defaultPipeline = false; } else {