
Journal reader missing events #102

Open · gfgw opened this issue Sep 3, 2018 · 12 comments

@gfgw
gfgw commented Sep 3, 2018

In our application we are using Akka.NET with event sourcing. The persistent actors save their events in a SQL Server database.
We also have view actors, which subscribe to these events using a journal reader (persistence query) to create materialised views. A table in the database has a row for every view actor; the row contains the name of the view actor and the offset of the last event processed.
At first sight this works smoothly. Sometimes, however, when we run a test that produces thousands of events, the journal reader misses some of them.

A view actor is a ReceiveActor. When started, it retrieves the last handled event offset from the database (called from the actor's constructor). The offset is piped to self in an OffsetMessage.
On receiving the OffsetMessage the view actor initialises the journal reader. On receiving events (in EventEnvelope messages), the views are updated.

The action run from the journal reader first writes a line to the log; that line contains the event offset.
The EventEnvelope receive handler also writes a line to the log, also containing the event offset.

We have a test that results in 9635 events inserted into the journal. Sometimes the journal reader and the EventEnvelope receive handler log fewer than 9635 events.
They both log the same numbers, so it seems the events are missed by the journal reader. The events missing from the log correspond to the missing items in the views.
We run the test on an empty database. Logging is at the debug level and does not show any exceptions. The missing events (we have seen between 1 and 4) can be among the first, middle or last events; it is different every time.

So far we have no idea what is causing this problem, or how it can be solved.

Following are fragments of our code. The view actors all inherit from a base class: ViewActorBase.

internal abstract class ViewActorBase : ReceiveActor, ILogReceive
{
    // Members supplied by the rest of our code; declarations added here so the fragment is complete.
    protected ILoggingAdapter Logger { get; } = Context.GetLogger();
    protected abstract string QueryEventTag { get; }
    protected abstract string GetViewName();
    protected abstract Task UpdateState(EventEnvelope envelope);

    public ViewActorBase()
    {
        // Some initialisation code
        ....

        this.Receive<OffsetMessage>(this.HandleOffsetMessage);
        this.ReceiveAsync<EventEnvelope>(this.UpdateState);

        var sender = this.Sender;
        var self = this.Self;
        this.GetViewActorOffset(self, sender);
    }

    private void HandleOffsetMessage(OffsetMessage offsetMessage)
    {
        this.InitialiseJournalReader(offsetMessage.Offset);
    }

    private void InitialiseJournalReader(long offset)
    {
        // obtain read journal by plugin id
        var readJournal = PersistenceQuery.Get(Context.System).ReadJournalFor<SqlReadJournal>("akka.persistence.query");

        // materialize stream, consuming events
        var materializer = ActorMaterializer.Create(Context.System);

        // issue query to journal
        Source<EventEnvelope, NotUsed> source = readJournal.EventsByTag(this.QueryEventTag, new Sequence(offset));

        // log every envelope the reader produces, then forward it to self
        var self = this.Self;
        source.RunForeach(
            envelope =>
            {
                this.Logger.Debug("{Date:HH:mm:ss.fffff} JournalReader.Tell {Offset}", DateTime.Now, (envelope.Offset as Sequence).Value);
                self.Tell(envelope);
            },
            materializer);
    }

    private void GetViewActorOffset(IActorRef self, IActorRef sender)
    {
        // Initialise repository
        ....

        repository.GetViewActorOffset(this.GetViewName()).PipeTo(self, sender, offset => new OffsetMessage(offset));
    }
}

internal class MyViewActor : ViewActorBase
{
    protected override async Task UpdateState(EventEnvelope envelope)
    {
        var offset = (envelope.Offset as Sequence).Value;

        this.Logger.Debug("{Date:HH:mm:ss.fffff} {MethodName} {Offset}", DateTime.Now, $"{this.GetType().Name}.UpdateState", offset);

        // Update views
        ....
    }
}

Is there something wrong in our code or architecture? Are there better solutions?

Additional information
We have run some tests with SQL Server Profiler, monitoring the queries sent to the database.

A query was executed on the event journal, asking for 100 events, starting at offset 204743. The result contained 61 rows.

<Event id="10" name="RPC:Completed">
  <Column id="1" name="TextData">exec sp_executesql N'
        SELECT TOP (@Take)
        e.PersistenceId as PersistenceId, 
        e.SequenceNr as SequenceNr, 
        e.Timestamp as Timestamp, 
        e.IsDeleted as IsDeleted, 
        e.Manifest as Manifest, 
        e.Payload as Payload,
        e.SerializerId as SerializerId,
        e.Ordering as Ordering
        FROM dbo.EventJournal e
        WHERE e.Ordering &gt; @Ordering AND e.Tags LIKE @Tag
        ORDER BY Ordering ASC
        ',N'@Tag nvarchar(10),@Ordering bigint,@Take bigint',@Tag=N'%;Module;%',@Ordering=204743,@Take=100</Column>
  <Column id="9" name="ClientProcessID">1169425116</Column>
  <Column id="10" name="ApplicationName">Core .Net SqlClient Data Provider</Column>
  <Column id="12" name="SPID">82</Column>
  <Column id="13" name="Duration">353890</Column>
  <Column id="14" name="StartTime">2018-08-30T16:32:32.927+02:00</Column>
  <Column id="15" name="EndTime">2018-08-30T16:32:33.28+02:00</Column>
  <Column id="16" name="Reads">326</Column>
  <Column id="17" name="Writes">0</Column>
  <Column id="18" name="CPU">0</Column>
  <Column id="48" name="RowCounts">61</Column>
</Event>

We expected the next query to start at 204804 (204743 + 61). However, it started at 204810. Why is it skipping (or missing) 6 events?

@Horusiath
Contributor

Horusiath commented Sep 3, 2018

@gfgw as you noticed, the reader stream works as a windowed query over the Ordering column: when a frame of records is received, the stream handler internally records the Ordering of the latest received record and uses it as the constraint in the WHERE clause when the next frame is requested.

Example:

Request 100 rows WHERE Ordering > 0 AND Tag = banana

Ordering: 204800, Tag: banana
Ordering: 204801, Tag: banana
Ordering: 204802, Tag: banana
Ordering: 204803, Tag: banana
Ordering: 204804, Tag: apple
...
Ordering: 204809, Tag: apple
Ordering: 204810, Tag: banana --> this is the 61st element; we skipped 6 elements because they didn't match the tag

Request 5 rows WHERE Ordering > 204810 AND Tag = banana

....
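
To make the windowing concrete, here is a minimal self-contained sketch of the paging behaviour described above (illustrative types and values, not the actual plugin code):

using System;
using System.Collections.Generic;
using System.Linq;

class PagerSketch
{
    class EventRow { public long Ordering; public string Tag; }

    static void Main()
    {
        // A journal where orderings 204804..204809 carry a different tag.
        var journal = new List<EventRow>();
        for (long i = 204800; i <= 204803; i++) journal.Add(new EventRow { Ordering = i, Tag = "banana" });
        for (long i = 204804; i <= 204809; i++) journal.Add(new EventRow { Ordering = i, Tag = "apple" });
        journal.Add(new EventRow { Ordering = 204810, Tag = "banana" });

        long offset = 204799;
        const int take = 100;

        while (true)
        {
            // Equivalent of: SELECT TOP (@Take) ... WHERE Ordering > @Ordering AND Tags LIKE @Tag ORDER BY Ordering
            var frame = journal.Where(e => e.Ordering > offset && e.Tag == "banana")
                               .OrderBy(e => e.Ordering)
                               .Take(take)
                               .ToList();
            if (frame.Count == 0) break;

            frame.ForEach(e => Console.WriteLine("emit " + e.Ordering));

            // The pager advances past the highest Ordering seen; rows with other tags
            // in between (204804..204809) are passed over by design.
            offset = frame[frame.Count - 1].Ordering;
        }
    }
}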

@gfgw
Author

gfgw commented Sep 3, 2018

@Horusiath Thank you for the explanation. The question is, however: how is it possible that there is a gap between two requests? Starting at offset 204743, the request asks for 100 events. The query returns 61 rows (because there are no more events at that time). The next request asks for events starting at 204810, which is 67 higher than the previous offset, even though only 61 rows were returned. So I am missing 6 events. How can that happen?

@Horusiath
Contributor

@gfgw if you're reading by tag, the offsets of matching events are not necessarily contiguous. Are you sure that those 6 events carry the tag you were looking for? I've updated the example above to show exactly the case you may be describing.

@gfgw
Author

gfgw commented Sep 4, 2018

@Horusiath No, all events in that range have the same tag.

@ondrejpialek

Hello, we have a similar setup and are also experiencing the very same issue. Occasionally a random event is missed by replay and does not make its way to the projection. This is obviously a serious problem as replaying events is a fundamental feature of Akka.

From the logs it appears that the replay actor loads two events (it logs current offset = requested offset + 2) but only sends a single event to the projection. This does not always happen; other times, when many events are read, all of them make it to the projecting actor. It also has nothing to do with the messages themselves: they can be deserialized fine when fetched via other means.

Replay logs:
[screenshot: Seq, replay]

Projection logs:
[screenshot: Seq, projection]

There are no errors or any other messages around that time that would explain this (e.g. no dead letters). We also have retry logic around the reading and projection that restarts the stream, but it is not triggered. It seems that the replay actor knows there is some kind of event there but does not send it for some reason.

I wonder if this could somehow be caused by concurrency on the write side? From the logs it seems that there were two events, 55022 and 55023. The replay asked for new events since 55021 and moved the current offset to 55023. Could this have been caused by the transaction inserting event 55023 committing earlier than the transaction saving 55022, so that the reader saw only the later event and skipped over row 55022, which had not been committed yet? If so, is there some required DB setting that would prevent this concurrency issue? Or is there some setting that can be applied to the SQL query to guarantee it does not skip over uncommitted rows?
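
To illustrate the interleaving I have in mind, here is a hypothetical repro one could run in two separate sessions (values invented, journal columns abbreviated):

-- Session A (writer 1): allocates the next identity, say 55022, but holds the transaction open.
BEGIN TRAN;
INSERT INTO dbo.EventJournal (PersistenceId, SequenceNr, Payload /* other columns elided */)
VALUES (N'actor-a', 1, 0x00);
-- not committed yet

-- Session B (writer 2): allocates identity 55023 and commits immediately.
BEGIN TRAN;
INSERT INTO dbo.EventJournal (PersistenceId, SequenceNr, Payload /* other columns elided */)
VALUES (N'actor-b', 1, 0x00);
COMMIT;

-- Reader: with READ_COMMITTED_SNAPSHOT ON this returns only 55023, and the replay
-- offset advances past the still-uncommitted 55022, which is then never read again.
SELECT Ordering FROM dbo.EventJournal WHERE Ordering > 55021 ORDER BY Ordering;

-- Session A commits last: 55022 becomes visible, but the reader has moved on.
COMMIT;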

For completeness this is running in Azure, Akka on a few services on Service Fabric with high concurrency on the write side. The SQL Server is also on Azure, an ordinary SQL DB inside a SQL Server Elastic Pool.

Many thanks,
Ondrej.

@ondrejpialek

Ok, I've spent some more time reading and googling around. It seems that relying on the order of an Identity column is flawed and rows might silently be skipped over; see https://dba.stackexchange.com/questions/133556/can-i-rely-on-reading-sql-server-identity-values-in-order

@Horusiath is there something I am missing, or do you agree that the current implementation might lead to missed events due to the problem described above?

@ondrejpialek

I think it's the same problem as akkadotnet/akka.net#3590

@Horusiath
Contributor

@ondrejpialek we might need to make a change in the SQL plugins to make the journal responsible for auto-incrementing the offset. I'll try to prepare a PR for that this weekend.

@ondrejpialek

ondrejpialek commented Oct 12, 2018

Would that solve the problem though? If you have two competing writers (two processes running) then you cannot guarantee which one writes its message first, right? So you could still end up with the wrong order, and Persistence Query accidentally skipping over events with lower IDs that were persisted later.

I think the only way to prevent IDs being persisted out of order is to remove the concurrency on the write side by serializing the writes, something like running the Journal as a cluster singleton (for distributed scenarios).

Alternatively, a "staging table" could be created as a form of FIFO queue to which concurrent processes would write (at this point the offset would not be generated yet), and a single "processor" would move rows from this queue to the journal table, ensuring that the rows in the Journal table are persisted in the order of the Identity column; a sketch follows below. If this "processor" is some form of SQL Server job, then no coordination is needed between the different Akka processes. If the Journal needs to know the offset of the written event, however, some changes would be required so that the offset is read after the row has been moved from the staging table to the journal table.
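
For illustration, a sketch of what that staging approach could look like (all names invented, journal columns abbreviated):

-- A staging table acting as a FIFO queue; StagingId records arrival order.
CREATE TABLE dbo.EventStaging (
    StagingId     bigint IDENTITY PRIMARY KEY,
    PersistenceId nvarchar(255) NOT NULL,
    SequenceNr    bigint NOT NULL,
    Payload       varbinary(max) NOT NULL
);

-- The single "processor" moves a batch into the journal in arrival order.
-- INSERT ... SELECT ... ORDER BY guarantees the journal's identity (Ordering)
-- values are assigned in that order, so no out-of-order rows appear.
BEGIN TRAN;

INSERT INTO dbo.EventJournal (PersistenceId, SequenceNr, Payload)
SELECT TOP (100) PersistenceId, SequenceNr, Payload
FROM dbo.EventStaging
ORDER BY StagingId;

DELETE FROM dbo.EventStaging
WHERE StagingId IN (SELECT TOP (100) StagingId
                    FROM dbo.EventStaging
                    ORDER BY StagingId);

COMMIT;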

Overall I think that any of these approaches would lead to lower performance (which is OK imho as first and foremost the system needs to be correct), and the changes would be pretty severe.

I also want to note that it seems only the Persistence Query component is affected by this. The question is whether ensuring that rows are in order just for the sake of Persistence Query is worth it. Perhaps there are other "smarter" ways to fix Persistence Query. For example, if PQ only considered events older than, say, 1 minute, then the likelihood of "missing" events due to an allocated Identity whose row was not written yet is very low; a sketch of such a query is below. If this is configurable then people have a chance to tweak it to fit their needs.
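
A hypothetical variant of the replay query from the profiler trace above with such a visibility lag (parameter names invented; this assumes the Timestamp column is comparable with a "now minus lag" value computed by the reader):

SELECT TOP (@Take)
    e.PersistenceId, e.SequenceNr, e.Timestamp, e.IsDeleted,
    e.Manifest, e.Payload, e.SerializerId, e.Ordering
FROM dbo.EventJournal e
WHERE e.Ordering > @Ordering
  AND e.Tags LIKE @Tag
  AND e.Timestamp <= @MaxTimestamp  -- e.g. now minus 1 minute, so in-flight writes can settle
ORDER BY e.Ordering ASC;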

Some other alternatives would include "gap" detection on the Query side. A gap in IDs (e.g. reading 1,2,3,5 with 4 being committed only after the read is done) can be caused by the following cases:

  • the race condition mentioned here
  • a deleted event
  • a rolled-back transaction

The query could be modified to read deleted events as well, to ensure a false positive is not triggered for that particular case (the query could be written so that deleted events do not send their payload). Then a gap in the IDs either means we encountered this problem, or it is a genuine gap due to a rolled-back transaction. At that point the reader would have to retry at a later time to see if the gap can be filled, or it could yield the events it has so far but remember that it missed number 4 and retry a select for just that ID later. The problem is that a rolled-back transaction would never fill its gap, so the reader would have to give up at some point. For systems that generate a lot of these genuine gaps, retrying would slow the queries significantly; that said, the system could ignore gaps older than a few minutes, as it is not likely a transaction hangs for that long. Rebuilding a projection would then be as fast as it is now; only a live query would perhaps retry once or twice when a gap is encountered. A minimal sketch of this gap tracking is below.
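
A minimal self-contained sketch of such gap tracking (all names invented, not a real API):

using System;
using System.Collections.Generic;
using System.Linq;

// The reader feeds every Ordering it sees into Observe(); SafeOffset() returns the
// highest offset that can be committed, stopping before the oldest unexpired gap.
class GapTracker
{
    private readonly TimeSpan giveUpAfter;
    private readonly Dictionary<long, DateTime> pendingGaps = new Dictionary<long, DateTime>();
    private long lastSeen;

    public GapTracker(long startOffset, TimeSpan giveUpAfter)
    {
        this.lastSeen = startOffset;
        this.giveUpAfter = giveUpAfter;
    }

    // Call with every Ordering value returned by a frame, in ascending order.
    public void Observe(long ordering)
    {
        for (long missing = this.lastSeen + 1; missing < ordering; missing++)
            this.pendingGaps.TryAdd(missing, DateTime.UtcNow); // remember the hole

        this.pendingGaps.Remove(ordering); // a late row may fill an earlier hole
        if (ordering > this.lastSeen) this.lastSeen = ordering;
    }

    // Gaps older than giveUpAfter are assumed to be rolled-back transactions and ignored.
    public long SafeOffset()
    {
        var now = DateTime.UtcNow;
        var oldestBlocking = this.pendingGaps
            .Where(g => now - g.Value < this.giveUpAfter)
            .Select(g => g.Key)
            .DefaultIfEmpty(this.lastSeen + 1)
            .Min();
        return Math.Min(oldestBlocking - 1, this.lastSeen);
    }
}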

This is a lot of text and I apologize for that, I hope it makes at least some sense :)

NB: Other "Event Stores" based on traditional RDBMS have the same problem; see NEventStore with MSSQL and PostgreSQL: NEventStore/NEventStore.Persistence.SQL#21

@ondrejpialek

ondrejpialek commented Oct 12, 2018

Also: they seem to encourage setting READ_COMMITTED_SNAPSHOT to OFF, as it reduces the risk of skipping over uncommitted rows. This should greatly reduce the chance of reading events out of order, but it sadly does not make the problem go away completely. On the system where we've seen this bug occur over the past two days (as we are increasing load) we have READ_COMMITTED_SNAPSHOT set to ON, which might explain why we see it this often. I am waiting for an ack from the client to reconfigure the DB to turn this off, so 🤞 that the number of issues will go down. It should be noted somewhere in the documentation that this is the correct configuration though. The default used to be OFF, but perhaps MS changed this recently in their latest Azure environment 🤷‍♂️
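
For reference, checking and flipping the setting (database name assumed):

-- Is READ_COMMITTED_SNAPSHOT currently on?
SELECT name, is_read_committed_snapshot_on
FROM sys.databases
WHERE name = 'MyJournalDb';

-- Turn it off; needs exclusive access, hence the ROLLBACK IMMEDIATE:
ALTER DATABASE MyJournalDb
SET READ_COMMITTED_SNAPSHOT OFF WITH ROLLBACK IMMEDIATE;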

@ondrejpialek

A very informative debate around the very same problem: NEventStore/NEventStore#425

@Horusiath
Contributor

@ondrejpialek I don't want to overcomplicate things here. Any change done here will quite probably have a negative performance impact, some of which can be amortized by the batching journal capabilities.

The easiest option here seems to be using a single DB connection per journal at a time. As all SQL providers use either TCP or IPC for communication, we would then have a global order of events sent by the journal.

With that, all we need to do is make sure that the journal doesn't reorder the write requests it receives before sending them to the database. The open question is the limitations of using a single DB connection. Of course, this comes with the traditional issues (e.g. head-of-line blocking), but we need actual tests to be sure how serious the impact is.
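
For example, a minimal sketch of funnelling all writes through a single ordered loop (invented names, not the plugin's actual design):

using System;
using System.Threading.Channels;
using System.Threading.Tasks;

// All write requests go through one bounded channel and are executed by a single loop,
// so they reach the database in exactly the order the journal accepted them.
class SerializedWriter
{
    private readonly Channel<Func<Task>> writes =
        Channel.CreateBounded<Func<Task>>(1024);

    public SerializedWriter() => Task.Run(this.WriteLoop);

    // Order of acceptance == order of execution.
    public ValueTask EnqueueAsync(Func<Task> write) => this.writes.Writer.WriteAsync(write);

    private async Task WriteLoop()
    {
        await foreach (var write in this.writes.Reader.ReadAllAsync())
            await write(); // one DB round-trip at a time: global order, but head-of-line blocking
    }
}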
