Skip to content

Commit

Permalink
Merge branch 'refs/heads/dev'
Browse files Browse the repository at this point in the history
  • Loading branch information
quyvu01 committed Jan 22, 2025
2 parents 503c990 + 128908a commit 753aa5b
Show file tree
Hide file tree
Showing 12 changed files with 51 additions and 20 deletions.
2 changes: 1 addition & 1 deletion src/OfX.EntityFrameworkCore/OfX.EntityFrameworkCore.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<ImplicitUsings>enable</ImplicitUsings>
<TargetFrameworks>net9.0;net8.0</TargetFrameworks>
<LangVersion>default</LangVersion>
<Version>3.2.0</Version>
<Version>3.2.1</Version>
<Authors>Quy Vu</Authors>
<PackageId>OfX-EFCore</PackageId>
<Description>OfX extension. Use EntityFramework as Data Querying</Description>
Expand Down
2 changes: 1 addition & 1 deletion src/OfX.Grpc/OfX.Grpc.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<ImplicitUsings>enable</ImplicitUsings>
<TargetFrameworks>net9.0;net8.0</TargetFrameworks>
<LangVersion>default</LangVersion>
<Version>3.2.0</Version>
<Version>3.2.1</Version>
<Authors>Quy Vu</Authors>
<PackageId>OfX-gRPC</PackageId>
<Description>OfX extension. Use gRPC as Data transporting</Description>
Expand Down
2 changes: 1 addition & 1 deletion src/OfX.Kafka/OfX.Kafka.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<ImplicitUsings>enable</ImplicitUsings>
<TargetFrameworks>net9.0;net8.0</TargetFrameworks>
<LangVersion>default</LangVersion>
<Version>3.2.0</Version>
<Version>3.2.1</Version>
<Authors>Quy Vu</Authors>
<PackageId>OfX-Kafka</PackageId>
<Description>OfX-Kafka extension. Use Kafka as Data transporting</Description>
Expand Down
7 changes: 0 additions & 7 deletions src/OfX.Nats/Messages/NatsMessageReceived.cs

This file was deleted.

2 changes: 1 addition & 1 deletion src/OfX.Nats/OfX.Nats.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<ImplicitUsings>enable</ImplicitUsings>
<TargetFrameworks>net9.0;net8.0</TargetFrameworks>
<LangVersion>default</LangVersion>
<Version>3.2.0</Version>
<Version>3.2.1</Version>
<Authors>Quy Vu</Authors>
<PackageId>OfX-Nats</PackageId>
<Description>Nats.io extension. Use Nats as Data transporting</Description>
Expand Down
6 changes: 3 additions & 3 deletions src/OfX.Nats/Servers/NatsServersListening.cs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
using Microsoft.Extensions.DependencyInjection;
using OfX.Abstractions;
using OfX.ApplicationModels;
using OfX.Cached;
using OfX.Exceptions;
using OfX.Implementations;
using OfX.Nats.Extensions;
using OfX.Nats.Messages;
using OfX.Nats.Wrappers;
using OfX.Responses;

Expand All @@ -22,8 +22,8 @@ internal static void StartAsync(IServiceProvider serviceProvider)
.Select(a => a.GetGenericArguments()[1]).ToList();
attributeTypes.ForEach(attributeType => Task.Factory.StartNew(async () =>
{
var natsScribeAsync =
natsClient.NatsClient.SubscribeAsync<MessageRequestOf>(attributeType.GetNatsSubject());
var natsScribeAsync = natsClient.NatsClient
.SubscribeAsync<MessageDeserializable>(attributeType.GetNatsSubject());
await foreach (var message in natsScribeAsync)
{
if (message.Data is null) continue;
Expand Down
2 changes: 1 addition & 1 deletion src/OfX.RabbitMq/OfX.RabbitMq.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<ImplicitUsings>enable</ImplicitUsings>
<TargetFrameworks>net9.0;net8.0</TargetFrameworks>
<LangVersion>default</LangVersion>
<Version>3.2.0</Version>
<Version>3.2.1</Version>
<Authors>Quy Vu</Authors>
<PackageId>OfX-RabbitMq</PackageId>
<Description>OfX-RabbitMq extension. Use RabbitMq as Data transporting</Description>
Expand Down
9 changes: 9 additions & 0 deletions src/OfX/Abstractions/IReceivedPipelinesBase.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
using OfX.Attributes;
using OfX.Responses;

namespace OfX.Abstractions;

public interface IReceivedPipelinesBase<TAttribute> where TAttribute : OfXAttribute
{
Task<ItemsResponse<OfXDataResponse>> ExecuteAsync(RequestContext<TAttribute> requestContext);
}
2 changes: 2 additions & 0 deletions src/OfX/Extensions/OfXExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ public static OfXRegister AddOfX(this IServiceCollection serviceCollection, Acti

serviceCollection.AddTransient(typeof(SendPipelinesImpl<>));

serviceCollection.AddTransient(typeof(ISendPipelineBehavior<>), typeof(SendPipelineRoutingBehavior<>));

return newOfRegister;
}

Expand Down
9 changes: 5 additions & 4 deletions src/OfX/Implementations/ReceivedPipelinesImpl.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,18 @@ namespace OfX.Implementations;

public class ReceivedPipelinesImpl<TModel, TAttribute>(
IEnumerable<IReceivedPipelineBehavior<TAttribute>> behaviors,
IQueryOfHandler<TModel, TAttribute> handler)
IQueryOfHandler<TModel, TAttribute> handler) :
IReceivedPipelinesBase<TAttribute>
where TAttribute : OfXAttribute where TModel : class
{
public async Task<ItemsResponse<OfXDataResponse>> ExecuteAsync(RequestContext<TAttribute> request)
public async Task<ItemsResponse<OfXDataResponse>> ExecuteAsync(RequestContext<TAttribute> requestContext)
{
var next = new Func<Task<ItemsResponse<OfXDataResponse>>>(() => handler.GetDataAsync(request));
var next = new Func<Task<ItemsResponse<OfXDataResponse>>>(() => handler.GetDataAsync(requestContext));

foreach (var behavior in behaviors.Reverse())
{
var current = next;
next = () => behavior.HandleAsync(request, current);
next = () => behavior.HandleAsync(requestContext, current);
}

return await next();
Expand Down
26 changes: 26 additions & 0 deletions src/OfX/Implementations/SendPipelineRoutingBehavior.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
using OfX.Abstractions;
using OfX.Attributes;
using OfX.Cached;
using OfX.Responses;

namespace OfX.Implementations;

internal sealed class SendPipelineRoutingBehavior<TAttribute>(
IServiceProvider serviceProvider) :
ISendPipelineBehavior<TAttribute> where TAttribute : OfXAttribute
{
public async Task<ItemsResponse<OfXDataResponse>> HandleAsync(RequestContext<TAttribute> requestContext,
Func<Task<ItemsResponse<OfXDataResponse>>> next)
{
// Check if we have the inner handler for `TAttribute` or not. If have, we will call the ReceivedPipelinesImpl<,> instead of sending via message!
var existedHandler = OfXCached.AttributeMapHandler;
if (!existedHandler.TryGetValue(typeof(TAttribute), out var handlerType)) return await next.Invoke();
if (!handlerType.IsGenericType) return await next.Invoke();
var args = handlerType.GetGenericArguments();
var receivedPipelineBehavior = serviceProvider
.GetService(typeof(ReceivedPipelinesImpl<,>).MakeGenericType(args));
if (receivedPipelineBehavior is not IReceivedPipelinesBase<TAttribute> receivedPipelinesBase)
return await next.Invoke();
return await receivedPipelinesBase.ExecuteAsync(requestContext);
}
}
2 changes: 1 addition & 1 deletion src/OfX/OfX.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<ImplicitUsings>enable</ImplicitUsings>
<TargetFrameworks>net9.0;net8.0</TargetFrameworks>
<LangVersion>default</LangVersion>
<Version>3.2.0</Version>
<Version>3.2.1</Version>
<Authors>Quy Vu</Authors>
<PackageId>OfX</PackageId>
<Description>The high performance and easiest way to play with microservices for .NET</Description>
Expand Down

0 comments on commit 753aa5b

Please sign in to comment.