-
Notifications
You must be signed in to change notification settings - Fork 12
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
High memory usage #346
Comments
@tudorFrizeru I've taken a look at your dump file and it appears that the growth may be in/around unmanaged memory which should not typically be an issue with Akka.Persistence.Sql. If I had to guess, I'm betting this is related to a couple of bugs in 8.0 around Native thread local storage cleanup (dotnet/runtime#95722) that won't get released till the 2024/02/13 (about a week from now). Examples of other people running into similar issues on .NET8: dotnet/runtime#96581 Will keep digging in repo code however. |
@to11mtm thank you for your quick response! I downgraded to .net 6 again and the problem remains the same. Here's a screenshot with the managed memory allocations. |
@tudorFrizeru If you could that may help, there are a few other possibilities and that may narrow things down. |
🤔 Question, is this running in IIS or via console? |
The application is hosted as a windows service (using TopShelf for window service hosting, running http.sys as a web server). We do not use IIS. |
@to11mtm FYI, I managed to isolate the problem to the Akka.Persistence.Sql package : I reverted back to using the Akka.persistence.SqlServer, with .net 8, and I have no memory issues anymore. I will be happy to test again once there's a fix available! |
Thank you for that feedback! I do have a PR up that I believe will solve some issues. Was there anything in your app doing an |
Hello everyone, To avoid this, refrain from utilizing the DbContext property directly within the class. Instead, opt to use it within methods. The recommended approach is to instantiate a new DbContext within each method call. You can achieve this either by creating a new instance explicitly (new DbContext()), or by retrieving it from the services (services.GetService()). This practice ensures that each method operates with a fresh DbContext, preventing potential conflicts and ensuring better management of resources. |
What does that have to do with this project? We use Linq2Db, not EF. |
I think it has nothing to do with the Entity Framework, but with Net8.0 and the GC. In my case, the dbContext cache was not released by the GC as long as the class was used. In net4.8 it worked as expected. I thought there might be a similar problem with Linq2Db. |
I think in the Query DAO all callbacks for Some of them already are static Akka.Persistence.Sql/src/Akka.Persistence.Sql/Query/Dao/BaseByteReadArrayJournalDao.cs Line 117 in fe49a6b
While others are not Akka.Persistence.Sql/src/Akka.Persistence.Sql/Query/Dao/BaseByteReadArrayJournalDao.cs Line 151 in fe49a6b
Akka.Persistence.Sql/src/Akka.Persistence.Sql/Query/Dao/BaseByteReadArrayJournalDao.cs Line 60 in fe49a6b
It seems to me that |
Agreed and I thought I started working on fixing this at one point. >_< @anpin If you'd like to submit a PR I'll happily review, or I can try to check my branches (switched laptops over weekend so a bit disorganized at the moment, apologies.) Another thing that needs to be changed, is the default number of sequences grabbed in a fetch. At one point we were grabbing up to 10,000 ordering values at a time in tl;dr - max ordering per query should be a number where the final array won't pollute LOH. given the size, I'd say 2000 is a safer-ish number in general, around 32Kib. 4000 might be OK as well but eh? |
I definitely should, but need to run benchmarks first |
tried to make these calls static, but it doesn't seem to make any difference. In fact even makes it worse.
Before
After
Changesdiff --git a/src/Akka.Persistence.Sql/Query/Dao/BaseByteReadArrayJournalDao.cs b/src/Akka.Persistence.Sql/Query/Dao/BaseByteReadArrayJournalDao.cs
index d062b73..512a3bf 100644
--- a/src/Akka.Persistence.Sql/Query/Dao/BaseByteReadArrayJournalDao.cs
+++ b/src/Akka.Persistence.Sql/Query/Dao/BaseByteReadArrayJournalDao.cs
@@ -47,17 +47,14 @@ namespace Akka.Persistence.Sql.Query.Dao
_deserializeFlow = serializer.DeserializeFlow();
}
- public Source<string, NotUsed> AllPersistenceIdsSource(long max)
- {
- var maxTake = MaxTake(max);
-
- return AsyncSource<string>.FromEnumerable(
- new { _dbStateHolder, maxTake },
- static async input =>
+ public Source<string, NotUsed> AllPersistenceIdsSource(long max) =>
+ AsyncSource<string>.FromEnumerable(
+ new { _dbStateHolder, maxTake = MaxTake(max) },
+ static async input =>
{
return await input._dbStateHolder.ExecuteWithTransactionAsync(
input.maxTake,
- async (connection, token,take) =>
+ static async (connection, token, take) =>
{
return await connection
.GetTable<JournalRow>()
@@ -68,7 +65,6 @@ namespace Akka.Persistence.Sql.Query.Dao
.ToListAsync(token);
});
});
- }
public Source<Try<(IPersistentRepresentation, IImmutableSet<string>, long)>, NotUsed> EventsByTag(
string tag,
@@ -83,11 +79,9 @@ namespace Akka.Persistence.Sql.Query.Dao
{
TagMode.Csv => AsyncSource<JournalRow>
.FromEnumerable(
- new { args= new QueryArgs(offset,maxOffset,maxTake,
- $"{separator}{tag}{separator}"), _dbStateHolder },
+ new { _dbStateHolder, args = new QueryArgs(offset,maxOffset,maxTake,$"{separator}{tag}{separator}") },
static async input =>
{
- //var tagValue = input.tag;
return await input._dbStateHolder.ExecuteWithTransactionAsync(
input.args,
static async (connection, token, inVals) =>
@@ -109,7 +103,7 @@ namespace Akka.Persistence.Sql.Query.Dao
TagMode.TagTable => AsyncSource<JournalRow>
.FromEnumerable(
- new { _dbStateHolder, args= new QueryArgs(offset,maxOffset,maxTake,tag)},
+ new {_dbStateHolder, args = new QueryArgs(offset,maxOffset,maxTake,tag) },
static async input =>
{
return await input._dbStateHolder.ExecuteWithTransactionAsync(
@@ -143,12 +137,12 @@ namespace Akka.Persistence.Sql.Query.Dao
=> Task.FromResult(
AsyncSource<JournalRow>
.FromEnumerable(
- new { persistenceId, fromSequenceNr, toSequenceNr, toTake = MaxTake(max), _dbStateHolder },
+ new { _dbStateHolder, persistenceId, fromSequenceNr, toSequenceNr, toTake = MaxTake(max) },
static async state =>
{
return await state._dbStateHolder.ExecuteWithTransactionAsync(
state,
- async (connection, token, txState) =>
+ static async (connection, token, txState) =>
{
var query = connection
.GetTable<JournalRow>()
@@ -182,7 +176,7 @@ namespace Akka.Persistence.Sql.Query.Dao
public Source<long, NotUsed> JournalSequence(long offset, long limit)
{
return AsyncSource<long>.FromEnumerable(
- new { maxTake = MaxTake(limit), offset, _dbStateHolder },
+ new {_dbStateHolder, offset, maxTake = MaxTake(limit) },
async input =>
{
return await input._dbStateHolder.ExecuteWithTransactionAsync(
@@ -207,7 +201,7 @@ namespace Akka.Persistence.Sql.Query.Dao
return await ConnectionFactory.ExecuteWithTransactionAsync(
ReadIsolationLevel,
ShutdownToken,
- async (connection, token) =>
+ static async (connection, token) =>
{
// persistence-jdbc does not filter deleted here.
var result = await connection
@@ -233,18 +227,15 @@ namespace Akka.Persistence.Sql.Query.Dao
var maxTake = MaxTake(max);
return AsyncSource<JournalRow>.FromEnumerable(
- new {_dbStateHolder , args=new QueryArgs(offset,maxOffset,maxTake) },
- static async input =>
- {
- return await ExecuteEventQuery(input._dbStateHolder, input._dbStateHolder.Mode, input.args);
- }
- ).Via(_deserializeFlow);
+ new {_dbStateHolder , args = new QueryArgs(offset,maxOffset,maxTake) },
+ static args => ExecuteEventQuery(args._dbStateHolder,args.args)
+ ).Via(_deserializeFlow);
}
-
-
- internal static async Task<List<JournalRow>> ExecuteEventQuery(DbStateHolder stateHolder, TagMode tagMode, QueryArgs queryArgs)
+
+
+ internal static async Task<IEnumerable<JournalRow>> ExecuteEventQuery(DbStateHolder stateHolder, QueryArgs queryArgs)
{
- return tagMode != TagMode.TagTable
+ return stateHolder.Mode != TagMode.TagTable
? await ExecuteEventQueryNonTagTable(stateHolder, queryArgs)
: await ExecuteEventQueryTagTable(stateHolder, queryArgs);
} |
Version Information
AKKA 1.5.16
Modules:
AKKA.Persistence.SQL 1.5.13,
AKKA Streams 1.5.16,
AKKA Serilog 1.5.12.1
Describe the performance issue
After migrating to AKKA 1.5, .NET 8 and the new AKKA.Persistence.SQL, the memory usage has increased to 1 GB (sometimes 2 GB between separate runs, with no changes).
Before the upgrade, the project was running AKKA 1.4.49, .NET 6 and the AKKA.Persistence.SqlServer 1.4.35 and the memory allocation was flattening at 400 MB (mainly due to asp.net core hosting).
I did the upgrade performing the complete database schema upgrade with the migration of Tags to the seaprate table and enabling the UUID in the EventJournal.
Data and Specs
Memory dump, sample persistence configuration and a screenshot with the diagnostic events available here
Expected behavior
Similar memory alocation as before
Actual behavior
Memory allocation has increased to 1 GB, and doesn't increase further (as opposed to 400 MB before the upgrade), and sometimes for unexplained reasons it caps at 2 GB.
Environment
Running on Windows 11, .NET 8, SQL Server 2022, hosting the ActorSystem in an asp.net core environment.
Not using the new AKKA Hosting, for historical reasons I create the ActorSystem through a delegate when the asp.net server starts.
No Cluster, no Remoting.
Additional context
Since the migraiton, I keep seeing this line in the debug console log, every 1-2 seconds:
[22:54:19 DBG] Start timer [Akka.Persistence.Sql.Query.InternalProtocol.QueryOrderingIdsTimerKey] with generation [9]
[22:54:21 DBG] Start timer [Akka.Persistence.Sql.Query.InternalProtocol.QueryOrderingIdsTimerKey] with generation [10]
[22:54:22 DBG] Start timer [Akka.Persistence.Sql.Query.InternalProtocol.QueryOrderingIdsTimerKey] with generation [11]
[22:54:27 DBG] Start timer [Akka.Persistence.Sql.Query.InternalProtocol.QueryOrderingIdsTimerKey] with generation [12]
[22:54:29 DBG] Start timer [Akka.Persistence.Sql.Query.InternalProtocol.QueryOrderingIdsTimerKey] with generation [13]
[22:54:30 DBG] Start timer [Akka.Persistence.Sql.Query.InternalProtocol.QueryOrderingIdsTimerKey] with generation [14]
I don't know if it is relevant, the Serilog also has a sink to a Loki instance, in addition to the Console sink.
The text was updated successfully, but these errors were encountered: