
Upgrade to Panama.Core v6 #22

Closed
mrogunlana opened this issue Feb 12, 2023 · 51 comments · Fixed by #23
Labels: enhancement (New feature or request)

Comments

@mrogunlana
Owner

Update based on:

image

@mrogunlana
Owner Author

mrogunlana commented Feb 12, 2023

Here is how the root directory of the repository should look:
image

Note: Store.cs will be an in-memory IStore implementation.

@mrogunlana
Owner Author

mrogunlana commented Feb 12, 2023

Here is how the Interfaces directory would look:

image

Notes:

  1. .NET Core DI with HttpContext: https://www.telerik.com/blogs/how-to-get-httpcontext-asp-net-core (the Handler should consume the .NET Core DI through its constructor; it should also integrate with the HttpContext through the accessor middleware)
  2. Create an abstraction around the HttpContext called IContext that the IHandler uses, to avoid needing a Microsoft.AspNetCore reference in Panama.Core (see the sketch after this list)
  3. I'd like to avoid the use of NuGet packages in core as much as possible.
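
For what note 2 could look like, here's a minimal sketch; the member names are assumptions, not a final contract. Panama.Core would depend only on IContext, and the ASP.NET integration package would supply an adapter backed by IHttpContextAccessor:

public interface IContext
{
	IServiceProvider Provider { get; }
	IDictionary<object, object?> Items { get; }
	CancellationToken Token { get; }
}

// the adapter lives in the ASP.NET integration package, so Panama.Core
// itself never references Microsoft.AspNetCore
public class HttpContextAdapter : IContext
{
	private readonly Microsoft.AspNetCore.Http.IHttpContextAccessor _accessor;

	public HttpContextAdapter(Microsoft.AspNetCore.Http.IHttpContextAccessor accessor)
		=> _accessor = accessor;

	public IServiceProvider Provider => _accessor.HttpContext!.RequestServices;
	public IDictionary<object, object?> Items => _accessor.HttpContext!.Items;
	public CancellationToken Token => _accessor.HttpContext!.RequestAborted;
}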

@AbdulmueezEmiola
Collaborator

image

@AbdulmueezEmiola
Collaborator

@mrogunlana
On trying to run the tests, some currently fail. I'm guessing they're DB-related and can wait until we finish the DI work.

@mrogunlana
Owner Author

mrogunlana commented Feb 15, 2023 via email

@mrogunlana mrogunlana added the enhancement New feature or request label Feb 15, 2023
@mrogunlana mrogunlana pinned this issue Feb 15, 2023
@AbdulmueezEmiola
Collaborator

Alright,

@AbdulmueezEmiola
Collaborator

So, reading and looking through the package,
I can see that we're currently making use of a service locator pattern, and the aim is to replace it with DI, right?

@AbdulmueezEmiola
Collaborator

I think the above assumption is wrong; since we're loading the assemblies outside the package, we just need to replace the dependency on Autofac, right?

@mrogunlana
Owner Author

I think the above assumption is wrong; since we're loading the assemblies outside the package, we just need to replace the dependency on Autofac, right?

I believe having a service locator would still be valid in some cases, so replacing the dependency on Autofac would be the right move. The Handler itself, though, should depend on the .NET DI/IoC container in its constructor (instead of referencing the service locator). Does this make sense?

@mrogunlana
Owner Author

Use this branch for the updates: https://github.com/mrogunlana/Panama.Core/tree/panama-core-v6

@AbdulmueezEmiola
Collaborator

@mrogunlana Can you take a look at this commit: 312d161? I've tried to refactor the handler class and made some design decisions along the way. Let me know what you think before I proceed.

@mrogunlana
Owner Author

Great work so far, left some comments on the commit: 312d161#commitcomment-100953812

AbdulmueezEmiola added a commit that referenced this issue Feb 17, 2023
@mrogunlana
Owner Author

See the latest commit on the branch, 51c140e: I moved all the files that will make the v6 release into the root directory with the Handler.

Here is a design consideration for you: how can we build out the execution workflow of the handler in an extensible fashion? If you examine https://github.com/mrogunlana/Panama.Core/blob/master/Panama/Commands/Handlers/Handler.cs#L285 you'll see that the current Invoke functions similarly to the strategy pattern: https://www.dofactory.com/net/strategy-design-pattern#realworld
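
To make the strategy idea concrete, here's a minimal sketch of pulling the invocation out behind an interface (the handler's collection properties and the default workflow are assumptions, not the final design):

using System.Linq;

public interface IInvokeHandler
{
	Task Invoke(IHandler handler, Context context);
}

public class DefaultInvokeHandler : IInvokeHandler
{
	// the "strategy": validate, query, then execute commands, running
	// the registered rollbacks in reverse order if anything throws
	public async Task Invoke(IHandler handler, Context context)
	{
		try
		{
			foreach (var validator in handler.Validators)
				await validator.Execute(context);
			foreach (var query in handler.Queries)
				await query.Execute(context);
			foreach (var command in handler.Commands)
				await command.Execute(context);
		}
		catch
		{
			foreach (var rollback in handler.Rollbacks.Reverse())
				await rollback.Execute(context);
			throw;
		}
	}
}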

Here is the update based on this consideration: 8852f0d. I think this should be a good base version of the Handler for v6. Please clean up the repository, e.g., remove all unused project files; the only projects that should remain are:

  1. Panama.Core
  2. Panama.Core.Test

All directories in Panama.Core should be removed except the following folders:

  1. Interfaces
  2. Exceptions
  3. Extensions

Update all tests to point to Panama.Core.Handler and the objects in the Panama.Core namespace. Also please thoroughly test the new Handler to make sure the Invoke function works as expected.

One nuance I made to unify the ICommand, IRollback, IValidate, and IQuery interfaces is that they all inherit from IExecute. The breaking change is to IValidate: the old version had a Message property and an IsValid function. The new version has the same contract as ICommand, so instead of returning a boolean value to denote whether an IValidate is valid, I'd rather users throw a ValidationException, which will break the workflow and bubble the exception's message up to the handler; this needs to be tested. I'm also open to any other optimization recommendations you suggest.
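
A sketch of that unified contract and the throw-based validation (the ValidEmail body is illustrative):

public interface IExecute
{
	Task Execute(Context context);
}

public interface ICommand : IExecute { }
public interface IRollback : IExecute { }
public interface IValidate : IExecute { }
public interface IQuery : IExecute { }

// a validator now throws instead of returning IsValid/Message
public class ValidEmail : IValidate
{
	public Task Execute(Context context)
	{
		var user = context.GetSingle<User>();

		if (string.IsNullOrEmpty(user.Email) || !user.Email.Contains('@'))
			throw new ValidationException($"{user.Email} is not a valid email.");

		return Task.CompletedTask;
	}
}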

Once you update all references and clean up the project (and the build is stable 😄) we can move on to adding IPublish, ISubscribe, IMessage, and IBroker to the library, which should be all the features we need to roll out. Thoughts?

@AbdulmueezEmiola
Collaborator

So, I'm trying to update the classes in the test project, and I can see that some classes have references to removed projects (SQL, etc.). I'm planning to remove those tests for now; we can add them back when we reach those parts of the project.

@mrogunlana
Owner Author

mrogunlana commented Feb 19, 2023

@AbdulmueezEmiola cleaned up obsolete projects and added a few things in this commit (build broken): d8ef37e

@AbdulmueezEmiola
Collaborator

@mrogunlana Alright, I'm pulling now

@AbdulmueezEmiola
Collaborator

@mrogunlana How should we proceed with the test project? Are we also dropping it for now?

@mrogunlana
Owner Author

Let's keep the test project; I'm depending on you to resolve the compile errors and update the test project with the proper references. Take a good look at the handler and its contract. I did some pseudo-fancy coding to abstract the invocation function from the handler, but it may not work. The idea is that I'd like the handler to be as simple as possible and to provide an abstraction around what the handler depends on, e.g., the strategy it uses to execute its manifest. Maybe you could refine the rough idea I put together.

@mrogunlana
Owner Author

@AbdulmueezEmiola here is an example of where named/keyed services are used:
https://github.com/mrogunlana/Panama.Core/blob/b0adb4d5036c6bc855bd0affec182384fcd19c3f/Panama.Core.CDC.MySQL/LogTailingProcessor.cs#L26

How would we go about doing this in a more "native" .NET DI way? E.g., is it possible to resolve a keyed/named service using constructor injection here?
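
For reference, one way to approximate this with the stock container is a resolver delegate registered in DI, which consumers then take through plain constructor injection (sketch; the keys and the Dispatcher class are illustrative). Native keyed services (AddKeyedSingleton, GetRequiredKeyedService) only arrived later, in .NET 8:

using Microsoft.Extensions.DependencyInjection;

public delegate IProcess ProcessResolver(string name);

public static class ProcessRegistrar
{
	public static IServiceCollection AddProcesses(this IServiceCollection services)
	{
		services.AddSingleton<LogTailingProcessor>();
		services.AddSingleton<PublishedRetryProcessor>();
		services.AddSingleton<ProcessResolver>(provider => name => name switch
		{
			"logtailing" => provider.GetRequiredService<LogTailingProcessor>(),
			"retry" => provider.GetRequiredService<PublishedRetryProcessor>(),
			_ => throw new KeyNotFoundException(name)
		});

		return services;
	}
}

public class Dispatcher
{
	private readonly IProcess _tail;

	// resolve by key without referencing a service locator
	public Dispatcher(ProcessResolver resolve) => _tail = resolve("logtailing");
}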

@AbdulmueezEmiola
Collaborator

AbdulmueezEmiola commented Feb 28, 2023 via email

@AbdulmueezEmiola
Collaborator

AbdulmueezEmiola commented Feb 28, 2023 via email

@AbdulmueezEmiola
Collaborator

AbdulmueezEmiola commented Feb 28, 2023 via email

@mrogunlana
Owner Author

@AbdulmueezEmiola Panama.Core.CDC contains the change data capture related module. This will act as a standalone module and provide plug-and-play capabilities through the Server/Processor design pattern. Essentially, Panama.Core.CDC creates a sidecar worker that allows the CDC.* namespace packages to register their processes via the IProcess interface.

Panama.Core.CDC.MySQL contains the MySQL-flavored processors for change data capture, e.g. the LogTailingProcessor is the transactional outbox processor that governs the Outbox table. The gist of the transactional Outbox table is that Panama.Core.CDC.MySQL registers the LogTailingProcessor, which subscribes to the low-level change data capture API of MySQL that emits events when data has been written to the Outbox table; this event bubbles up to the IBroker array that handles sending the actual message via a message broker, e.g. RabbitMQ. There are other processors in the Panama.Core.CDC.MySQL package that handle retries for the Published and Received tables.

I'm still in the process of filling out the high-level details of the Panama.Core.CDC.MySQL package, but this is the gist of the technology.

The Panama.Core.CDC and Panama.Core.CDC.MySQL projects should have registrars that attach the required services to the host, similar to the registrar in the root Panama.Core project.

Does this give you a working idea of where I'm going with the CDC.* namespace(s)? Think of it as a separate worker process that does the lower-level scaffolding to guarantee that a message is published and/or received from a broker. All of this low-level wiring will be abstracted away from the developer when publishing a message from the context of a command.
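
A rough sketch of that wiring, with assumed shapes: the CDC package hosts one background worker that fans out to every IProcess the flavor packages (e.g. Panama.Core.CDC.MySQL) register.

using System.Linq;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;

public interface IProcess
{
	Task Invoke(CancellationToken token);
}

public class CdcServer : BackgroundService
{
	private readonly IEnumerable<IProcess> _processes;

	public CdcServer(IEnumerable<IProcess> processes) => _processes = processes;

	// run every registered processor until the host shuts down
	protected override Task ExecuteAsync(CancellationToken token) =>
		Task.WhenAll(_processes.Select(p => p.Invoke(token)));
}

// a flavor package's registrar would then add, e.g.:
// services.AddSingleton<IProcess, LogTailingProcessor>();
// services.AddHostedService<CdcServer>();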

@AbdulmueezEmiola
Collaborator

I think I've got the hang of the base CDC package; I have a few ideas on the design which I'll try out and push for you to check. I also get the gist of the SQL package, but I want to work on the base package first.

@mrogunlana
Owner Author

Great, check out the dotnetcore/cap project on GitHub for ideas, as it's the inspiration for Panama.Core/CDC.

The remaining IProcess classes to be implemented can be found here:
https://github.com/mrogunlana/Panama.Core/tree/panama-core-v6/Panama.Core.CDC.MySQL/Processors

This is the container I'm using for the Panama.Core.CDC.MySQL package: https://github.com/mrogunlana/Panama.Core/blob/panama-core-v6/Containers/mysql-8/scripts/master.sql. It supports the LogTailingProcessor, which subscribes to the low-level data change events of MySQL using the rusuly/mysqlcdc GitHub package.

I'm thinking we want to add an abstraction around https://github.com/mrogunlana/Panama.Core/blob/cab7b9fe975c7264bfd204102e6f479bf3d0e5f5/Panama.Core.CDC.MySQL/Processors/LogTailingProcessor.cs#L81 where it calls IBroker.Publish, to allow us the ability to intercept and observe messages before they're sent to the broker; but maybe not?

We also have to decide/design ISubscribe and whether we want to provide attribute-level subscription (see the dotnetcore/cap project; they use a combination of interface and attribute subscription).

The database tables are as follows (a short status-flow sketch follows the list):

  1. Outbox - the table observed by the LogTailingProcessor, which passes the data to the broker. Under the covers, we would insert into this table in the same database transaction as our entity tables, e.g. when a new user is inserted, we also insert a message into the outbox table if we want to send a user.created message on the message broker. The data inserted into the outbox table can be deleted immediately following the insert, or at some point in the future by a cleanup processor, since the mysqlcdc package reads the transaction binlog of the MySQL database, which only appends on a successful transaction.
  2. Published - here is where things get tricky. We could actually use the published table as the "outbox" table, since technically we would be inserting into this table anyway on or around the original outbox insert, and this table maintains the retry history of "failed" transactions based on an expiration time. For example, if we just used the published table instead of having an outbox table, the workflow would look like so: a new user is inserted and a published record is inserted with a status of "queued". The LogTailingProcessor would observe that a new published record was inserted and pass its message to the broker. Once the broker published the message, the status of the published record would be changed from "queued" to "sent". In the scenario where there was a failure to send, the record would be updated from "queued" to "failed". This is where the PublishedRetryProcessor would kick in: it would query the last X messages and try to resend them to the broker based on the retry count limit. If we subscribe directly to the published table instead of having an outbox table, it would allow users who prefer polling to use the coming PollingProcessor to retrieve the queued messages in the published table on an interval. Thinking of this now, I think we should delete the outbox table and use the published table instead. Besides, we would have an ExpiredMessageRemovalProcessor to delete all expired messages regardless of their status. Thoughts?
  3. Received - once a message comes off the message broker on the other side, it should be inserted into the received table prior to being passed to the ISubscribe listeners. The LogTailingProcessor should also be filtering insert database events by the ReceivedTableId, which can be passed from the options. To get a table ID in MySQL, you have to inspect the information_schema (see https://github.com/mrogunlana/Panama.Core/blob/cab7b9fe975c7264bfd204102e6f479bf3d0e5f5/Panama.Core.CDC.MySQL/Processors/LogTailingProcessor.cs#L35 for guidance). We should also consider adding an abstraction between the processor and the subscribe/publish event to intercept, observe, etc.
  4. Lock - see the use of the lock table in dotnetcore/cap. This table is used in the same fashion: it prevents/synchronizes parallel processing of retries on the published and received tables. For example, before updates are made on the published and received tables, a "lock" is acquired like so: https://github.com/mrogunlana/Panama.Core/blob/cab7b9fe975c7264bfd204102e6f479bf3d0e5f5/Panama.Core.CDC.MySQL/Extensions/MySqlCdcOptionsExtensions.cs#L79. If the response is true, updates are made and the lock is then released like so: https://github.com/mrogunlana/Panama.Core/blob/cab7b9fe975c7264bfd204102e6f479bf3d0e5f5/Panama.Core.CDC.MySQL/Extensions/MySqlCdcOptionsExtensions.cs#L124. Also see the dotnetcore/cap project's use of renewing the lock; I believe it's done to add more time to process:
    https://github.com/mrogunlana/Panama.Core/blob/cab7b9fe975c7264bfd204102e6f479bf3d0e5f5/Panama.Core.CDC.MySQL/Extensions/MySqlCdcOptionsExtensions.cs#L162
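
To make the status flow in item 2 concrete, a minimal sketch; the MessageStatus values and the store/broker method names here are assumptions for illustration, not the repo's actual API:

public enum MessageStatus { Queued, Sent, Failed }

public class PublishedSweep
{
	// hand a queued row to the broker and record the outcome; Failed rows
	// are later picked up by the PublishedRetryProcessor under the Lock table
	public async Task Process(InternalMessage message, IBroker broker, IStore store)
	{
		try
		{
			await broker.Publish(message);
			await store.ChangeMessageState(message, MessageStatus.Sent);
		}
		catch (Exception)
		{
			await store.ChangeMessageState(message, MessageStatus.Failed);
		}
	}
}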

Hopefully this gives a better context of the design.

@mrogunlana
Owner Author

Also, in the dotnetcore/cap package, their published table has a status of delayed: in the event of a shutdown, the cancellation token is used to cancel all process events, but first all messages not sent and not expired are set to delayed and their expiration date/time is extended. This means we'll have to have a DelayedMessageProcessor that will pick these messages up and send them to the broker if it's a Published message, or to the subscriber if it's a Received message.

@AbdulmueezEmiola
Collaborator

Worked on this #22 (comment) earlier today; just pushing it now. I'll check out the project and your explanation now.

@mrogunlana
Owner Author

I've moved towards having Published and Received table(s) only, instead of the outbox, in the latest commit. I'm in the process of filling out the IStore implementation for Panama.Core.CDC.MySQL and should be done soon. Any thoughts so far on my above comments after looking at the source?

@mrogunlana
Owner Author

mrogunlana commented Mar 5, 2023

I'm thinking the handler would look like this in v6:

var handler = _provider.GetService<IHandler>()
	.Add(user)
	.Validate<FirstName>()
	.Validate<LastName>()
	.Validate<Email>()
	.Query<GetUser>()
	.Command<PrepareUser>()
	.Command<UpdateUser>()
	.Rollback<RestoreUser>()
	.Invoke();

commands would look something like this in v6:

public class UpdateUser : ICommand
{
	private IBus _bus;
	private UserDbContext _store;

	public UpdateUser(
		  IBus bus
		, UserDbContext store)
	{
		_bus = bus;
		_store = store;
	}

	public async Task Execute(Context context)
	{
		var user = context.GetSingle<User>();

		using (var transaction = _store.Database.BeginTransaction(_bus, autoCommit: true))
		{
			_store.Entry(user).State = EntityState.Modified;

			await _store.SaveChangesAsync();

			// to publish to specific broker(s)
			await _bus.PublishAsync<RabbitMqBroker>("user.created", user);
			await _bus.PublishAsync<EventHubBroker>("user.created", user);

			// to publish to all registered brokers
			await _bus.PublishAsync("user.created", user);
		}
	}
}

Note the multi-publish option by strongly-typed broker; it's just a thought, but I think it would increase adoption if we provided this support. Thoughts?
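
The typed multi-publish above implies an IBus surface along these lines (signatures assumed, not final):

public interface IBus
{
	// publish to a specific broker by type
	Task PublishAsync<TBroker>(string topic, object payload, CancellationToken token = default)
		where TBroker : IBroker;

	// publish to all registered brokers
	Task PublishAsync(string topic, object payload, CancellationToken token = default);
}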

@AbdulmueezEmiola
Collaborator

  • So, I can see how the IStore is meant to work. It's meant to be called by processors.
  • I can also see some TODOs in the processors; I'll try to do them and get a better understanding of how things work.

From here #22 (comment)

  • The handler structure looks the same as what we currently have.
  • I'm a bit confused about how some things work here in the command. From my understanding of the published table, it's meant to queue up items for publishing to the brokers, but here in the Execute method of the UpdateUser class, we self-publish to different brokers. Can you give a more high-level overview of how the whole thing is connected? I sort of understand each unit individually.

@AbdulmueezEmiola
Collaborator

AbdulmueezEmiola commented Mar 6, 2023

To support multiple brokers, I think we need to separate the outbox and published table. That way, we can easily track the status of each of the brokers

@mrogunlana
Owner Author

The dotnetcore/cap project implements an event bus design pattern using a dispatcher. The event bus is encapsulated by the IDispatch interface, which manages a transient/async queue. But the part that isn't so obvious is that the messages from the queue are "eventually" written to a channel which is associated with a broker, e.g. rabbit, event hub, etc. Now, the channels and the broker(s) are bootstrapped by the dispatcher, which is an IService that runs a collection of IProcess implementations that watch the bus queue and write the messages to the channels by group. The dotnetcore/cap library has an IDispatcher.PerGroup which alludes to how this works. In Panama.Core, I'd like to provide this feature to the developer. If you take a look at the Published and Received tables, they have a field named Group, which is the broker type.
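
A sketch of that per-group fan-out (all names assumed): the dispatcher keeps one Channel per broker group, routes each message to the channel matching its Group, and runs one consumer loop per group that drains into that group's broker.

using System.Collections.Concurrent;
using System.Threading.Channels;

public record OutgoingMessage(string Group, string Topic, byte[] Body);

public class GroupDispatcher
{
	private readonly ConcurrentDictionary<string, Channel<OutgoingMessage>> _groups = new();

	private Channel<OutgoingMessage> Get(string group) =>
		_groups.GetOrAdd(group, _ => Channel.CreateUnbounded<OutgoingMessage>());

	public ValueTask Enqueue(OutgoingMessage message) =>
		Get(message.Group).Writer.WriteAsync(message);

	// one of these runs per group, e.g. Consume("RabbitMq", rabbitBroker, token)
	public async Task Consume(string group, IBroker broker, CancellationToken token)
	{
		await foreach (var message in Get(group).Reader.ReadAllAsync(token))
			await broker.Publish(message);
	}
}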

You'll also notice that I've added InternalMessage, TransientMessage, and Message classes. The InternalMessage is meant to represent the message that we save to our tables internally; it has a Content field that contains the actual Message, which has headers and a body. The TransientMessage will be used to write the Message to the channel associated with a particular broker.

Here are my thoughts on:

To support multiple brokers, I think we need to separate the outbox and published table. That way, we can easily track the status of each of the brokers

We'll track or identify which broker owns which message in the Published and Received tables by the Group column. There may be some possible collisions in retrying and/or polling messages, which can be overcome by using the Lock table for retries and by not doing "dirty" reads from the tables, which should hopefully be accomplished with our current IStore implementation 😄 I know reading the transaction binlog may be tricky, but if we stick to the statuses we should be good (we may have to add a Processing status -- maybe, but we'll see in testing).

Here are my thoughts on:

From my understanding of the published table, it's meant to queue up items for publishing to the brokers, but here in the execute method of the UpdateUser class, we self-publish to different brokers. Can you give a more high level overview of how the whole thing is connected. I sort of understand each unit individually.

The publish inside the ICommand is to enqueue the message to be sent on the event bus, which is eventually sent to the message broker by an IDispatcher implementation; the dotnetcore/cap project adds an abstraction around distributing the message to and from the incoming channel to its brokers and subscribers for tracing purposes.

These are all great questions; hopefully it's making sense now? Let me know. If you want to jump in, I'm in the process of implementing the dispatcher and bus and would like to see where you take it. Feel free to move stuff around, delete/add stuff, etc. The dotnetcore/cap project is a good rubric for the event bus implementation, but I know we can do better 😃

@mrogunlana
Owner Author

Thinking a little more on adding the Outbox table back, you have a point... it may make things easier for us if we logically separated the binlog-tailed table from the Published and Received tables.

We could set the LogTailingProcessor to listen to the Outbox table; that way, the message would end up in the Published table with a status of Succeeded or Failed and would then be picked up by the retry process. Once the message is sent, it would end up in the Received table naturally. This way we wouldn't have to add funky logic around statuses.

@mrogunlana
Owner Author

mrogunlana commented Mar 6, 2023

What if we made the architectural decision of supporting one IStore instance, e.g. mysql, mssql, etc., per Panama.Core instance? We could then provide a database transaction per handler internally, which would prevent developers from having to remember to wrap all their database interactions within validators, commands, and queries in transactions -- something almost certain to happen without proper training on pseudo-ACID transactions within microservices. The point of v6 is to make Panama.Core support distributed transactions within a microservice or event/domain-driven service architecture. In that world, the domain or database boundaries are well-defined, which means the multi-store use case would not be supported. In other words, as a baseline, users of Panama.Core would follow the database-per-service design pattern.

Given that the handler would have an ambient transaction, in the event of an exception the transaction would roll back, thereby undoing the database interactions of the command(s).

Here is what our UpdateUser command would look like given the ambient transaction; notice how we're queueing with the brokers instead of doing a publish, and also how we're not injecting a bus:

public class UpdateUser : ICommand
{
	public async Task Execute(Context context)
	{
		var user = context.GetSingle<User>();

		context.Store<UserDbContext>().Entry(user).State = EntityState.Modified;
		await context.Store<UserDbContext>().SaveChangesAsync();

		context.Queue<RabbitMq>("user.updated", user);
		context.Queue<Kafka>("user.updated", user);
	}
}

Note: I'm still deciding the best way to provide store and broker access in the confines of a command. For now, I illustrated it being extended off the context provided to the command. Ideas?

OR:

public class UpdateUser : ICommand
{
	private IStore<UserContext> _store;
	private IPublish<RabbitMq> _rabbit;
	private IPublish<Kafka> _kafka;

	// the publishers are injected alongside the store and assigned here
	public UpdateUser(IStore<UserContext> store, IPublish<RabbitMq> rabbit, IPublish<Kafka> kafka)
	{
		_store = store;
		_rabbit = rabbit;
		_kafka = kafka;
	}

	public async Task Execute(Context context)
	{
		var user = context.GetSingle<User>();

		_store.Entry(user).State = EntityState.Modified;

		_rabbit.Queue("user.updated", user);
		_kafka.Queue("user.updated", user);
	}
}

We could also make the decision to allow developers to queue messages to brokers that get published eventually, if and only if the internal transaction commits successfully. I'm still thinking through how exceptions will bubble up, given that the transaction will commit downstream of the command performing the database operation.

@mrogunlana
Owner Author

mrogunlana commented Mar 6, 2023

We could clean the above command up more by moving the publish events into their own ICommand, like so:

var handler = _provider.GetService<IHandler>()
	.Add(user)
	.Validate<FirstName>()
	.Validate<LastName>()
	.Validate<Email>()
	.Query<GetUser>()
	.Command<PrepareUser>()
	.Command<UpdateUser>()
	.Command<PublishUserUpdates>()
	.Invoke();

public class UpdateUser : ICommand
{
	public async Task Execute(Context context)
	{
		var user = context.GetSingle<User>();

		context.Store<UserDbContext>().Entry(user).State = EntityState.Modified;
		await context.Store<UserDbContext>().SaveChangesAsync();
	}
}

public class PublishUserUpdates : ICommand
{
	public async Task Execute(Context context)
	{
		var user = context.GetSingle<User>();

		context.Queue<RabbitMq>("user.updated", user);
		context.Queue<Kafka>("user.updated", user);
	}
}

Also note that in this scenario we won't need a RestoreUser, since the database transaction would roll back the user updates automatically with the ambient transaction of the handler.

@mrogunlana
Owner Author

Another idea for the handler:

var handler = _provider.GetService<IHandler<User>>()
	.Add(user)
	// notice that the validators are gone in favor of attribute-driven validation;
	// we could also support custom IValidate classes on the handler as shown above
	.Query<GetUser>()
	.Command<PrepareUser>()
	.Command<UpdateUser>()
	.Publish<RabbitMq>("user.updated", (context) => context.GetSingle<User>())
	.Publish<Kafka>("user.updated", (context) => context.GetSingle<User>())
	.Invoke();

where the User model would be decorated with validate attributes like so:

public class User : IModel
{
	[NotNullOrEmpty]
	public string FirstName { get; set; }
	
	[NotNullOrEmpty]
	public string LastName { get; set; }
	
	[ValidEmail]
	public string Email { get; set; }
}

Let me know which of these you believe would be more palatable for developers.

@mrogunlana
Owner Author

mrogunlana commented Mar 7, 2023

I'm leaning more toward the command design where Context is extended to provide the Store and Publish using generics, as it's seemingly less verbose. It should be relatively straightforward to implement, as it's basically a sort of extension design pattern made possible by the IServiceProvider being present on the Context:

public class UpdateUser : ICommand
{
	public async Task Execute(Context context)
	{
		var user = context.GetSingle<User>();

		context.Store<UserDbContext>().Entry(user).State = EntityState.Modified;
		await context.Store<UserDbContext>().SaveChangesAsync();

		context.Publish<RabbitMq>("user.updated", user);
		context.Publish<Kafka>("user.updated", user);

		// or... developers can simply publish to all known registered brokers like so:
		context.Publish("user.updated", user);
	}
}

Keep in mind, developers can still inject dependencies via the constructor of the command to get the same result:

public class UpdateUser : ICommand
{
	private UserDbContext _store;
	private IBus _bus;

	public UpdateUser(UserDbContext store, IBus bus)
	{
		_store = store;
		_bus = bus;
	}

	public async Task Execute(Context context)
	{
		var user = context.GetSingle<User>();

		_store.Entry(user).State = EntityState.Modified;
		await _store.SaveChangesAsync();

		_bus.Publish<RabbitMq>("user.updated", user);
		_bus.Publish<Kafka>("user.updated", user);

		// or... developers can simply publish to all known registered brokers like so:
		_bus.Publish("user.updated", user);
	}
}

I'm liking this version of the handler stack, as opposed to the expression-based handler, for simplicity's sake:

var handler = _provider.GetService<IHandler>()
	.Add(user)
	.Validate<FirstName>()
	.Validate<LastName>()
	.Validate<Email>()
	.Query<GetUser>()
	.Command<PrepareUser>()
	.Command<UpdateUser>()
	.Command<PublishUserUpdates>()
	.Invoke();

Note: I like the keyword Publish as opposed to Queue in the context of the command, to create a logical distinction between adding data to the payload via context.Add() and the interaction with the message brokers internally via context.Publish(); although context.Publish() essentially queues the message for publishing and sends if and only if the transaction is committed successfully internally.

@mrogunlana
Owner Author

mrogunlana commented Mar 7, 2023

I do like the concept of expression-based validators, though; things can get tricky in handler scenarios using collections -- but I guess if the handler is type-specific, e.g.

var handler = _provider.GetService<IHandler<User>>()
	.Add(users)
	.Query<GetUsers>()
	.Command<PrepareUsers>()
	.Command<UpdateUsers>()
	.Command<PublishUserUpdates>()
	.Invoke();

the handler would be "smart" enough to determine that there is a list and iterate/validate.

Maybe we can roll out attribute and expression based validators in a later version after the initial v6 release?

@mrogunlana
Owner Author

Also note, if we introduce an Outbox for messages being Published to the brokers, we'll also have to introduce an Inbox for messages Received from the brokers.

Just having an Outbox and Inbox alone would suffice if we didn't care about retrying failed messages due to service/network/broker failures, or about throttling; this is why I think we'll need the Outbox, Inbox, Published, and Received tables, which will all essentially be copies of each other. If messages are Scheduled to be sent in the future, or if messages are Delayed due to service shutdown etc., they would sit inside the Published and Received tables until retries occur or they expire.

@AbdulmueezEmiola
Collaborator

dotnetcore/cap project

About the 2 structures here: the syntax of the second approach makes it a bit easier for users to understand that they can support multiple brokers, and means we don't have to write our own abstraction layer over the brokers.

@AbdulmueezEmiola
Collaborator

I do like the concept of expression-based validators, though; things can get tricky in handler scenarios using collections -- but I guess if the handler is type-specific, e.g.

var handler = _provider.GetService<IHandler<User>>()
	.Add(users)
	.Query<GetUsers>()
	.Command<PrepareUsers>()
	.Command<UpdateUsers>()
	.Command<PublishUserUpdates>()
	.Invoke();

the handler would be "smart" enough to determine that there is a list and iterate/validate.

Maybe we can roll out attribute and expression based validators in a later version after the initial v6 release?

I actually tried to explore this using a sort of decorator pattern. Python uses an attribute-like syntax to do this, but .NET doesn't seem to support something like that, except via reflection. It seemed to complicate the codebase more, so I discarded that line of thought.

@AbdulmueezEmiola
Collaborator

dotnetcore/cap project

About the 2 structures here: the syntax of the second approach makes it a bit easier for users to understand that they can support multiple brokers, and means we don't have to write our own abstraction layer over the brokers.

On another thought, we actually need the abstraction, as it will be how we update the published table and identify the broker that was used.

@AbdulmueezEmiola
Collaborator

I think I understand how the publishing works now. For the subscription aspect, one issue is updating the received table. Do we have an entry for every item that subscribes to the event?

@AbdulmueezEmiola
Collaborator

Apart from that, I can start implementing the publishing side of things. Can you list some tasks for me to do on this so I can ease into working on it? I have a few ideas, but I'd still prefer a little bit of handholding.

@mrogunlana
Owner Author

mrogunlana commented Mar 7, 2023

For publishing:

  • We need an implementation of IInvokeHandler specifically for the Panama.Core.CDC.MySQL project that wraps the validators, queries, and commands in a database transaction; it should also wrap the rollbacks in a different database transaction.
  • Its registrar should replace the Panama.Core IInvokeHandler with the Panama.Core.CDC.MySQL version, using something like this: https://andrewlock.net/exploring-dotnet-6-part-10-new-dependency-injection-features-in-dotnet-6/#detecting-if-a-service-is-registered-in-the-di-container (see the sketch after this list).
  • Implement the IBus publish methods. The dotnetcore/cap project uses a Channel that it writes messages to from an async queue and reads in its consumer processor. This all seems like overkill, because the consumer processor iterates over a long-running while loop. Couldn't they just read the async queue directly? The whole concept of a Channel is to have listeners on the Channel process the messages written. I wonder if we could have "listeners" subscribe to the channels instead of a long-running while loop? Ideas? See the dotnetcore/cap ICapPublisher.Default for inspiration. I think we'll be deviating somewhat from their implementation here.
  • Implement the IDispatcher, which runs as an IService. Its job is what an operator is to a phone line: to act as the go-between for messages ready for publishing to a message broker. It would listen to the Channel specific to the Group, e.g. RabbitMq, etc., and publish to the broker and/or persist the scheduled/delayed message(s) for later processing. See the dotnetcore/cap IDispatcher.Default for an example implementation and for inspiration; I think we'll be deviating somewhat from their dispatch design here as well.
  • I assume you prefer this version of the handler command design: Upgrade to Panama.Core v6 #22 (comment)? If so, we'll need the IStore and IBus extensions off the Context present in the Panama.Core.CDC.MySQL project.
  • I assume you agree with having an Outbox for Published and an Inbox for Received tables as described here: Upgrade to Panama.Core v6 #22 (comment)? If so, we'll need the Outbox and Inbox tables to be initialized in the Panama.Core.CDC.MySQL project for the LogTailingProcessor.
  • The IBus will have to be updated to allow type-specific references to brokers on its publish methods. I'm thinking we can provide an IBroker implementation for devs for rabbit, etc. that they would reference? I'm open to ideas here.
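
For the second bullet, the registrar swap could look like this (TransactionalInvokeHandler is a hypothetical name for the MySQL-flavored implementation):

using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;

public static class PanamaMySqlRegistrar
{
	public static IServiceCollection AddPanamaMySql(this IServiceCollection services)
	{
		// Replace() removes the existing registration for the service type
		// (if any) and adds the new descriptor in its place
		services.Replace(ServiceDescriptor.Singleton<IInvokeHandler, TransactionalInvokeHandler>());

		return services;
	}
}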

I'll be starting some of these today too.

Edit: I'm on the fence about forcing a single database per handler, as this will put us in a difficult situation when we support the concept of read models, which are stores built off events in Event Sourcing. I'm open to ideas, but I'm leaning towards just allowing developers to explicitly instantiate the transaction.

Idea: maybe we introduce the concept of a TransactionHandler that provides an ambient transaction, and for the coming read model support we could navigate this single-transaction thing differently?

Or, we can introduce IAmbientCommand<IStore> or ITransientCommand<IStore> that adds a transaction at the command level. This is probably a better approach, because we can have commands that point to different stores and perform database operations individually; if the handler fails, we can roll back at the command level.

@mrogunlana
Owner Author

mrogunlana commented Mar 7, 2023

Here is a good breakdown of the proper use of Channel:

  1. https://devblogs.microsoft.com/dotnet/an-introduction-to-system-threading-channels/
  2. https://code-maze.com/dotnet-producer-consumer-channels/
  3. https://www.stevejgordon.co.uk/an-introduction-to-system-threading-channels

TL;DR:
What is a Channel?

A channel is simply a data structure that’s used to store produced data for a consumer to retrieve, and an appropriate synchronization to enable that to happen safely, while also enabling appropriate notifications in both directions.

A channel is a data structure to store data from a Producer, which can then be consumed by one or more Consumers. [...] It is important to note, that unlike a Publisher/Subscriber (Pub/Sub) model, where a message produced can be received by one or more Subscribers concurrently, Channels only allow for one Consumer to read a given message.

A channel is a synchronisation concept which supports passing data between producers and consumers, typically concurrently. One or many producers can write data into the channel, which are then read by one or many consumers. [...] Logically a channel is effectively an efficient, thread-safe queue.

So, basically, because we have one IService to many IProcess implementations, it makes sense to have a thread-safe way to pass data from one IProcess to another through the machinations of a Channel. The biggest "gotcha" here is to know that regardless of how many consumers are reading the Channel, each message can only be read by a single Consumer. In other words, a Channel delivers one message from one Producer to one Consumer.
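
A minimal System.Threading.Channels demo of that gotcha: two consumers read from one channel, and each message is delivered to exactly one of them.

using System.Threading.Channels;

var channel = Channel.CreateBounded<int>(capacity: 100);

async Task Consume(string name)
{
	await foreach (var item in channel.Reader.ReadAllAsync())
		Console.WriteLine($"{name} got {item}"); // no item prints twice
}

var consumers = Task.WhenAll(Consume("A"), Consume("B"));

for (var i = 0; i < 10; i++)
	await channel.Writer.WriteAsync(i);

channel.Writer.Complete(); // lets the ReadAllAsync loops finish
await consumers;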

@mrogunlana
Owner Author

mrogunlana commented Mar 7, 2023

Here is the bus architecture and data flow to give an idea of how the Outbox, Inbox, Published, and Received tables interact in the context of a Handler:

image

The eye-friendly version:
image

@mrogunlana
Owner Author

mrogunlana commented Mar 8, 2023

The only other thing I can think of that we should include is adding ack and nack as headers on the message in transit, which is necessary to distribute transactions appropriately across service boundaries.

For example, in the world of eCommerce, we could have a multi-step/service order creation process that creates an order, updates the product inventory, processes payment, and then ships the order. Through ack and nack we can choreograph the steps like so:

public class CreateOrder : ICommand
{
	private OrderDbContext _store;
	private IBus _bus;

	public CreateOrder(OrderDbContext store, IBus bus)
	{
		_store = store;
		_bus = bus;
	}

	public async Task Execute(Context context)
	{
		var order = context.GetSingle<Order>();

		_store.Entry(order).State = EntityState.Added;
		await _store.SaveChangesAsync();

		// this would propagate order data in our service cluster
		_bus.Publish("order.created", order);

		// if this was a multi-step order creation process that included
		// inventory and shipping, developers could define the next
		// steps through ack and nack topic callbacks:
		_bus.Publish("products.ordered", order
			, ack: "order.products.confirmed"
			, nack: "order.products.failed");
	}
}

Through a series of ack and nack request/responses, we can guarantee consistency of the data model across service boundaries.

For more complicated scenarios, we'll need to support more complex orchestration through the saga pattern. More on this once we get the basic v6 implementation out the door.

@mrogunlana
Owner Author

mrogunlana commented Mar 8, 2023

Notes on our message models e.g., the wrapper of entities published on the bus:

The _Message represents the basic schema of our underlying tables. _Message.Content represents the base64-encoded version of the Message, which contains a dictionary of headers (such as ack, nack, correlation-id, group, and other possible headers for tracing like send-time, exceptions, etc.) and the payload, which is the entity being published.

The TransientMessage is meant to be converted to bytes and written to our Channel instances. All data published on the bus should be wrapped in a Message first.
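
Putting that in code, the three shapes might look like so (a sketch; fields assumed from the description above, not the repo's final code):

public class _Message
{
	public long Id { get; set; }
	public string Content { get; set; } // base64-encoded Message
	public string Status { get; set; }
}

public class Message
{
	// ack, nack, correlation-id, group, send-time, exceptions, etc.
	public IDictionary<string, string> Headers { get; } = new Dictionary<string, string>();

	// the entity being published
	public object Body { get; set; }
}

public readonly struct TransientMessage
{
	public TransientMessage(IDictionary<string, string> headers, byte[] body)
		=> (Headers, Body) = (headers, body);

	public IDictionary<string, string> Headers { get; }
	public byte[] Body { get; }
}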

@mrogunlana
Owner Author

mrogunlana commented Mar 11, 2023

Here's how subscriptions are implemented in v6:

[Broker(typeof(RabbitMq), "user.created", "rabbit.instance.1")]
public class UserCreated : ISubscribe
{
	public string Name => "user.created";

	public async Task Event(Context context)
	{
		_ = context.Provider.GetService<IHandler>()
			.Add(context)
			.Command<SaveUser>()
			.Invoke();
	}
}

The Broker ITarget, e.g. typeof(RabbitMq), can be refactored to [Broker<RabbitMq>("user.created")] once generic attributes land in C# 11: https://learn.microsoft.com/en-us/dotnet/csharp/language-reference/proposals/csharp-11.0/generic-attributes. Also note that "rabbit.instance.1" is not required, but gives developers the opportunity to set up multiple broker instances of the same type.
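
For completeness, a hypothetical shape for the attribute itself (constructor and property names assumed):

[AttributeUsage(AttributeTargets.Class, AllowMultiple = true)]
public class BrokerAttribute : Attribute
{
	public BrokerAttribute(Type target, string topic, string instance = null)
	{
		Target = target;
		Topic = topic;
		Instance = instance;
	}

	public Type Target { get; } // e.g. typeof(RabbitMq)
	public string Topic { get; } // e.g. "user.created"
	public string Instance { get; } // optional, e.g. "rabbit.instance.1"
}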

@mrogunlana
Owner Author

mrogunlana commented Apr 2, 2023

Opting for a transaction-level abstraction over the use of TransactionScope, as I'm finding inconsistent behaviors across persistence stores when using TransactionScope as a macro-level scoping mechanism to enforce atomic transactions. For example, in MSSQL, TransactionScope appears to lock at the table level, whereas in MySQL it appears to lock at the row level. As a result, a more "obtrusive" implementation of the event bus would look like so:

using (var channel = new DefaultChannel(provider))
{
	channel.Post("xxx.now", o);
	channel.Post("..", ..);
	channel.Post("..", ..);
	
	channel.Commit();
}

The DefaultChannel here uses the canal/waterway concept of a channel, or rather IChannel, to abstract the persistence store behind a transaction that guarantees messages are posted to the same storage medium, in the same transaction, as the entities of its domain.
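
The contract implied here is small (members assumed): a disposable transaction proxy that stages Post() calls and only flushes them on Commit(), discarding anything uncommitted on Dispose().

public interface IChannel : IDisposable
{
	void Post(string topic, object payload);
	void Commit();
}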

The IChannel is essentially a proxy of a transaction, with a specific implementation provided by the persistence store API. A MongoDB implementation of IChannel would look like so:

using (var channel = new MongoChannel(provider, mongoClient))
{
	...
	channel.Post("xxx.now", o);
	channel.Commit();
}

or, etc:

using (var channel = new EntityChannel(provider, context))
{
	context.Add(entity);
	await context.SaveChangesAsync();
	
	channel.Post("xxx.now", o);
	channel.Commit();
}

@mrogunlana mrogunlana linked a pull request May 15, 2023 that will close this issue
@mrogunlana mrogunlana mentioned this issue May 15, 2023