Skip to content

Commit

Permalink
Bump Arcane.Framework from 0.0.35 to 0.0.47 in /src (#55)
Browse files Browse the repository at this point in the history
* Bump Arcane.Framework from 0.0.35 to 0.0.47 in /src

Bumps [Arcane.Framework](https://github.com/SneaksAndData/arcane-framework) from 0.0.35 to 0.0.47.
- [Release notes](https://github.com/SneaksAndData/arcane-framework/releases)
- [Commits](SneaksAndData/arcane-framework@v0.0.35...v0.0.47)

---
updated-dependencies:
- dependency-name: Arcane.Framework
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <[email protected]>

* Fix incompatible changes

---------

Signed-off-by: dependabot[bot] <[email protected]>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Vitalii Savitskii <[email protected]>
Co-authored-by: George Zubrienko <[email protected]>
  • Loading branch information
3 people authored Nov 16, 2024
1 parent 346b91d commit 2a54285
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 5 deletions.
2 changes: 1 addition & 1 deletion src/Arcane.Stream.Cdm.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<RootNamespace>Arcane.Stream.Cdm</RootNamespace>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Arcane.Framework" Version="0.0.35" />
<PackageReference Include="Arcane.Framework" Version="0.0.47" />
<PackageReference Include="SnD.Sdk" Version="1.1.22" />
</ItemGroup>

Expand Down
16 changes: 13 additions & 3 deletions src/GraphBuilder/CdmChangeFeedGraphBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@
using System.Threading.Tasks;
using Akka.Streams;
using Akka.Streams.Dsl;
using Akka.Util;
using Arcane.Framework.Contracts;
using Arcane.Framework.Services.Base;
using Arcane.Framework.Sinks.Models;
using Arcane.Framework.Sinks.Parquet;
using Arcane.Framework.Sources.CdmChangeFeedSource;
using Arcane.Stream.Cdm.Models;
Expand All @@ -19,13 +21,15 @@ public class CdmChangeFeedGraphBuilder : IStreamGraphBuilder<CdmChangeFeedStream
private readonly IBlobStorageService blobStorageService;
private readonly MetricsService metricsService;
private readonly IBlobStorageWriter blobStorageWriter;
private readonly IInterruptionToken interruptionToken;

public CdmChangeFeedGraphBuilder(IBlobStorageService blobStorageService, MetricsService metricsService,
IBlobStorageWriter blobStorageWriter)
IBlobStorageWriter blobStorageWriter, IInterruptionToken interruptionToken)
{
this.blobStorageService = blobStorageService;
this.metricsService = metricsService;
this.blobStorageWriter = blobStorageWriter;
this.interruptionToken = interruptionToken;
}

public IRunnableGraph<(UniqueKillSwitch, Task)> BuildGraph(CdmChangeFeedStreamContext context)
Expand All @@ -38,7 +42,11 @@ public CdmChangeFeedGraphBuilder(IBlobStorageService blobStorageService, Metrics
context.SchemaUpdateInterval);

var dimensions = source.GetDefaultTags().GetAsDictionary(context, context.StreamId);
var parquetSink = ParquetSinkFromContext(context, source.GetParquetSchema(), this.blobStorageWriter, context.SinkLocation);
var parquetSink = ParquetSinkFromContext(context,
source.GetParquetSchema(),
this.blobStorageWriter,
context.SinkLocation,
this.interruptionToken);
return Source.FromGraph(source)
.GroupedWithin(context.RowsPerGroup, context.GroupingInterval)
.Select(grp =>
Expand All @@ -55,12 +63,14 @@ public CdmChangeFeedGraphBuilder(IBlobStorageService blobStorageService, Metrics


private static ParquetSink ParquetSinkFromContext(CdmChangeFeedStreamContext streamContext, Schema schema,
IBlobStorageWriter blobStorageWriter, string sinkLocation)
IBlobStorageWriter blobStorageWriter, string sinkLocation, IInterruptionToken interruptionToken)
{
var parquetSink = ParquetSink.Create(parquetSchema: schema, storageWriter: blobStorageWriter,
parquetFilePath: $"{sinkLocation}/{streamContext.StreamId}",
rowGroupsPerFile: streamContext.GroupsPerFile,
createSchemaFile: true,
interruptionToken: interruptionToken,
streamMetadata: streamContext.GetStreamMetadata().GetOrElse(new StreamMetadata(Option<StreamPartition[]>.None)),
dataSinkPathSegment: streamContext.IsBackfilling ? "backfill" : "data",
dropCompletionToken: streamContext.IsBackfilling);

Expand Down
6 changes: 5 additions & 1 deletion src/Models/CdmChangeFeedStreamContext.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
using System;
using System.Text.Json.Serialization;
using Akka.Util;
using Arcane.Framework.Configuration;
using Arcane.Framework.Services.Base;
using Arcane.Framework.Sinks.Models;

namespace Arcane.Stream.Cdm.Models;

Expand Down Expand Up @@ -57,7 +59,9 @@ public class CdmChangeFeedStreamContext : IStreamContext, IStreamContextWriter
[JsonConverter(typeof(SecondsToTimeSpanConverter))]
[JsonPropertyName("schemaUpdateIntervalSeconds")]
public TimeSpan SchemaUpdateInterval { get; set; }


public Option<StreamMetadata> GetStreamMetadata() => new StreamMetadata(Option<StreamPartition[]>.None);

/// <inheritdoc cref="IStreamContext.StreamId"/>>
public string StreamId { get; private set; }

Expand Down

0 comments on commit 2a54285

Please sign in to comment.