Skip to content

Commit

Permalink
feat: or-1951 allow individual rebuild of bodies
Browse files Browse the repository at this point in the history
  • Loading branch information
koenmetsu committed Oct 10, 2023
1 parent 27387b0 commit 88ca574
Show file tree
Hide file tree
Showing 11 changed files with 3,896 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@ namespace OrganisationRegistry.ElasticSearch.Projections.Body;
using System.Data.Common;
using System.Linq;
using System.Threading.Tasks;
using Bodies;
using BodyClassification.Events;
using BodyClassificationType.Events;
using Client;
using Common;
using Configuration;
using ElasticSearch.Bodies;
using Function.Events;
using Infrastructure;
using Infrastructure.Change;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
namespace OrganisationRegistry.ElasticSearch.Projections.Body;

using System;
using Bodies;
using Client;
using Configuration;
using ElasticSearch.Bodies;
using Infrastructure;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ namespace OrganisationRegistry.ElasticSearch.Projections.Body;
using System;
using System.Linq.Expressions;
using System.Threading.Tasks;
using Bodies;
using Client;
using ElasticSearch.Bodies;
using Osc;

public static class MassUpdateBodyExtension
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ namespace OrganisationRegistry.ElasticSearch.Projections;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using Bodies;
using Body;
using Cache;
using Microsoft.EntityFrameworkCore;
Expand Down Expand Up @@ -34,6 +35,7 @@ public EventProcessor(Scheduler scheduler,
PeopleRunner peopleRunner,
CacheRunner cacheRunner,
IndividualRebuildRunner individualRebuildRunner,
IndividualBodyRebuildRunner individualBodyRebuildRunner,
OrganisationsRunner organisationsRunner,
IContextFactory contextFactory)
{
Expand All @@ -51,7 +53,7 @@ public EventProcessor(Scheduler scheduler,
_messagePump = Task.Factory.StartNew(
async () =>
{
var elasticRunners = new ElasticRunners(bodyRunner, peopleRunner, cacheRunner, individualRebuildRunner, organisationsRunner);
var elasticRunners = new ElasticRunners(bodyRunner, peopleRunner, cacheRunner, individualRebuildRunner, individualBodyRebuildRunner, organisationsRunner);
await PumpMessages(scheduler, logger, elasticRunners, contextFactory);
},
_messagePumpCancellation.Token,
Expand Down Expand Up @@ -101,6 +103,7 @@ private async Task CatchUpAction(Scheduler scheduler, ElasticRunners runners)
{
await runners.CacheRunner.Run();
await runners.IndividualRebuildRunner.Run();
await runners.IndividualBodyRebuildRunner.Run();
await runners.PeopleRunner.Run();
await runners.BodyRunner.Run();
await runners.OrganisationsRunner.Run();
Expand Down Expand Up @@ -168,7 +171,7 @@ public CatchUp(int batchSize)
public int BatchSize { get; }
}

private record ElasticRunners(BodyRunner BodyRunner, PeopleRunner PeopleRunner, CacheRunner CacheRunner, IndividualRebuildRunner IndividualRebuildRunner, OrganisationsRunner OrganisationsRunner);
private record ElasticRunners(BodyRunner BodyRunner, PeopleRunner PeopleRunner, CacheRunner CacheRunner, IndividualRebuildRunner IndividualRebuildRunner, IndividualBodyRebuildRunner IndividualBodyRebuildRunner, OrganisationsRunner OrganisationsRunner);

public async Task StartAsync(CancellationToken cancellationToken)
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
namespace OrganisationRegistry.ElasticSearch.Projections.Bodies;

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Body;
using Client;
using ElasticSearch.Bodies;
using Infrastructure;
using Infrastructure.Change;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Logging;
using OpenTelemetry;
using Osc;
using OrganisationRegistry.Infrastructure.Events;
using SqlServer;
using SqlServer.ProjectionState;

/// <summary>
/// Rebuilds the Elasticsearch <see cref="BodyDocument"/> of every body listed in the
/// <c>BodiesToRebuild</c> table by replaying that body's events through the body event handlers.
/// </summary>
public class IndividualBodyRebuildRunner
{
    /// <summary>Name used as the prefix of this runner's log messages.</summary>
    public string ProjectionName => "IndividualRebuild";

    private readonly ILogger<IndividualBodyRebuildRunner> _logger;
    private readonly IEventStore _store;
    private readonly IContextFactory _contextFactory;
    private readonly IProjectionStates _projectionStates;
    private readonly ElasticBus _bus;
    private readonly Elastic _elastic;

    /// <summary>
    /// Stores the collaborators and registers <see cref="BodyRunner.EventHandlers"/> on
    /// <paramref name="busRegistrar"/>.
    /// </summary>
    public IndividualBodyRebuildRunner(
        ILogger<IndividualBodyRebuildRunner> logger,
        IEventStore store,
        IContextFactory contextFactory,
        IProjectionStates projectionStates,
        ElasticBus bus,
        Elastic elastic,
        ElasticBusRegistrar busRegistrar)
    {
        _logger = logger;
        _store = store;
        _contextFactory = contextFactory;
        _projectionStates = projectionStates;
        _bus = bus;
        _elastic = elastic;

        busRegistrar.RegisterEventHandlers(BodyRunner.EventHandlers);
    }

    /// <summary>
    /// Processes every row in <c>BodiesToRebuild</c>: replays the body's event envelopes up to the
    /// last event number processed by the body projection, applies the resulting changes to
    /// Elasticsearch and then removes the row.
    /// </summary>
    /// <remarks>
    /// Any exception is logged as critical and rethrown; bodies that were already handled stay
    /// removed because changes are saved per body.
    /// </remarks>
    public async Task Run()
    {
        await using var context = _contextFactory.Create();

        // Replay only up to the point the regular body projection has reached.
        var lastProcessedEventNumber =
            await _projectionStates.GetLastProcessedEventNumber(BodyRunner.ElasticSearchProjectionsProjectionName);

        var bodyToRebuilds = await context.BodiesToRebuild.ToListAsync();

        if (bodyToRebuilds.Count > 0)
            _logger.LogInformation("[{ProjectionName}] Found {NumberOfBodies} bodies to rebuild", ProjectionName, bodyToRebuilds.Count);

        try
        {
            foreach (var body in bodyToRebuilds)
            {
                var envelopes = _store
                    .GetEventEnvelopesUntil<OrganisationRegistry.Body.Body>(
                        body.BodyId,
                        lastProcessedEventNumber)
                    .ToList();

                // LastOrDefault instead of Last: a queued body without any envelope in range must
                // not throw here, otherwise the row is never removed and every later run fails on it.
                _logger.LogDebug("[{ProjectionName}] Found {NumberOfEnvelopes} envelopes (until #{MaxEventNumber}) to process for Body {BodyId}",
                    ProjectionName, envelopes.Count, envelopes.LastOrDefault()?.Number, body.BodyId);

                // First collect the changes of all envelopes, then apply them in order.
                var allChanges = new List<ElasticChanges>();
                foreach (var envelope in envelopes)
                {
                    var changes = await ProcessEnvelope(envelope);
                    allChanges.Add(changes);
                }

                var documentCache = new Dictionary<Guid, BodyDocument>();

                foreach (var changeSet in allChanges)
                {
                    foreach (var changeSetChange in changeSet.Changes)
                    {
                        await ProcessChange(changeSetChange, documentCache);
                    }
                }
                await FlushDocuments(documentCache);

                // Saved per body, so a failure on a later body does not re-queue this one.
                context.BodiesToRebuild.Remove(body);
                await context.SaveChangesAsync();
            }
        }
        catch (Exception ex)
        {
            _logger.LogCritical(0, ex, "[{ProjectionName}] An exception occurred while handling envelopes", ProjectionName);
            throw;
        }
    }

    /// <summary>
    /// Applies a single change to <paramref name="documentCache"/> or, for mass changes, directly
    /// to Elasticsearch. Changes of any other type (including <c>null</c>) are ignored.
    /// </summary>
    /// <param name="changeSetChange">The change produced by an event handler.</param>
    /// <param name="documentCache">Documents pending to be written, keyed by document id.</param>
    private async Task ProcessChange(IElasticChange? changeSetChange, Dictionary<Guid, BodyDocument> documentCache)
    {
        switch (changeSetChange)
        {
            case ElasticDocumentCreation<BodyDocument> elasticDocumentCreation:
            {
                foreach (var documentChange in elasticDocumentCreation.Changes)
                {
                    var document = documentChange.Value();
                    documentCache.Add(documentChange.Key, document);
                }
                break;
            }
            case ElasticPerDocumentChange<BodyDocument> perDocumentChange:
            {
                foreach (var documentChange in perDocumentChange.Changes)
                {
                    // Single lookup; fetch from Elasticsearch only when the document is not cached yet.
                    if (!documentCache.TryGetValue(documentChange.Key, out var document))
                    {
                        document = (await _elastic.TryGetAsync(async () =>
                                (await _elastic.WriteClient.GetAsync<BodyDocument>(documentChange.Key))
                                .ThrowOnFailure()))
                            .Source;

                        documentCache.Add(documentChange.Key, document);
                    }

                    await documentChange.Value(document);
                }

                break;
            }
            case ElasticMassChange massChange:
            {
                // Write the pending documents first, then run the mass change and refresh the index.
                // The cache is empty afterwards, so later per-document changes re-read from Elasticsearch.
                await FlushDocuments(documentCache);
                await massChange.Change(_elastic);
                await _elastic.TryGetAsync(async () =>
                    (await _elastic.WriteClient.Indices.RefreshAsync(Indices.Index<BodyDocument>())).ThrowOnFailure());
                break;
            }
        }
    }


    /// <summary>
    /// Bulk-writes all cached documents to Elasticsearch and clears the cache.
    /// Does nothing when the cache is empty.
    /// </summary>
    /// <param name="documentCache">Documents pending to be written, keyed by document id.</param>
    /// <exception cref="Exception">A cached document has an empty key or an empty name.</exception>
    private async Task FlushDocuments(Dictionary<Guid, BodyDocument> documentCache)
    {
        if (documentCache.Count == 0)
            return;

        if (documentCache.Any(x => x.Key == Guid.Empty || string.IsNullOrEmpty(x.Value.Name)))
        {
            throw new Exception("Found document without key or name.");
        }

        await _elastic.TryAsync(async () =>
        {
            // BulkAll is observable-based; Wait blocks until all pages are written or the timeout elapses.
            _elastic.WriteClient.BulkAll(documentCache.Values, b => b
                    .BackOffTime("30s")
                    .BackOffRetries(5)
                    .RefreshOnCompleted(false)
                    .MaxDegreeOfParallelism(Environment.ProcessorCount)
                    .Size(1000)
                )
                .Wait(TimeSpan.FromMinutes(15), next =>
                {
                    _logger.LogInformation("Wrote page {PageNumber}", next.Page);
                });
            await Task.CompletedTask;
        });
        documentCache.Clear();
    }


    /// <summary>
    /// Publishes <paramref name="envelope"/> on the bus and wraps the resulting changes together
    /// with the envelope's number.
    /// </summary>
    private async Task<ElasticChanges> ProcessEnvelope(IEnvelope envelope)
    {
        // The dynamic cast lets the runtime pick the Publish overload for the concrete envelope type.
        var changes = await _bus.Publish(null, null, (dynamic) envelope);
        return new ElasticChanges(envelope.Number, changes);
    }
}
16 changes: 16 additions & 0 deletions src/OrganisationRegistry.ElasticSearch.Projections/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ namespace OrganisationRegistry.ElasticSearch.Projections;
using Autofac.Features.OwnedInstances;
using Autofac.Util;
using Be.Vlaanderen.Basisregisters.DataDog.Tracing.Sql.EntityFrameworkCore;
using Bodies;
using Body;
using Cache;
using Client;
Expand Down Expand Up @@ -207,6 +208,21 @@ public static async Task Main(string[] args)
);
})
.AddSingleton(provider =>
{
var bus = new ElasticBus(provider.GetRequiredService<ILogger<ElasticBus>>());
return new IndividualBodyRebuildRunner(
provider.GetRequiredService<ILogger<IndividualBodyRebuildRunner>>(),
provider.GetRequiredService<IEventStore>(),
provider.GetRequiredService<IContextFactory>(),
provider.GetRequiredService<IProjectionStates>(),
bus,
provider.GetRequiredService<Elastic>(),
new ElasticBusRegistrar(provider.GetRequiredService<ILogger<ElasticBusRegistrar>>(),
bus,
provider.GetRequiredService<Func<IServiceProvider>>())
);
})
.AddSingleton(provider =>
{
var bus = new ElasticBus(provider.GetRequiredService<ILogger<ElasticBus>>());
return new PeopleRunner(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
namespace OrganisationRegistry.SqlServer.ElasticSearchProjections;

using System;
using Infrastructure;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Metadata.Builders;
using OrganisationRegistry.Infrastructure;

/// <summary>
/// Entity identifying a body by its id; mapped by <c>BodyToRebuildConfiguration</c> to the
/// <c>BodiesToRebuild</c> table with <see cref="BodyId"/> as key.
/// </summary>
public class BodyToRebuild
{
/// <summary>Id of the body; used as the table's key.</summary>
public Guid BodyId { get; set; }
}

/// <summary>
/// Entity mapping for <see cref="BodyToRebuild"/>.
/// </summary>
public class BodyToRebuildConfiguration : EntityMappingConfiguration<BodyToRebuild>
{
    /// <summary>Name of the table the entity is stored in.</summary>
    public const string TableName = "BodiesToRebuild";

    /// <summary>
    /// Maps the entity to <see cref="TableName"/> in the ElasticSearch projections schema,
    /// keyed on <see cref="BodyToRebuild.BodyId"/> with a non-clustered key.
    /// </summary>
    /// <param name="b">Builder used to configure the entity.</param>
    public override void Map(EntityTypeBuilder<BodyToRebuild> b)
    {
        var table = b.ToTable(TableName, WellknownSchemas.ElasticSearchProjectionsSchema);

        var primaryKey = table.HasKey(entity => entity.BodyId);
        primaryKey.IsClustered(false);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ public class OrganisationRegistryContext : DbContext
public DbSet<IsActivePerOrganisationCapacity> IsActivePerOrganisationCapacityList { get; set; } = null!;
public DbSet<ElasticSearchProjections.OrganisationPerBody> OrganisationPerBodyListForES { get; set; } = null!;
public DbSet<OrganisationToRebuild> OrganisationsToRebuild { get; set; } = null!;
public DbSet<BodyToRebuild> BodiesToRebuild { get; set; } = null!;

public DbSet<OrganisationCacheItem> OrganisationCache { get; set; } = null!;
public DbSet<ContactTypeCacheItem> ContactTypeCache { get; set; } = null!;
Expand Down
Loading

0 comments on commit 88ca574

Please sign in to comment.