Skip to content

Commit

Permalink
Properly clear metadata when encoding/decoding payloads (#343)
Browse files Browse the repository at this point in the history
Fixes #342
  • Loading branch information
cretz authored Sep 5, 2024
1 parent 681cd01 commit ec8821d
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 0 deletions.
3 changes: 3 additions & 0 deletions src/Temporalio/Worker/WorkflowCodecHelper.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System.Linq;
using System.Threading.Tasks;
using Google.Protobuf;
using Google.Protobuf.Collections;
using Temporalio.Api.Common.V1;
using Temporalio.Bridge.Api.ActivityResult;
Expand Down Expand Up @@ -223,6 +224,7 @@ private static async Task EncodeAsync(IPayloadCodec codec, Payload payload)
var encodedList = await codec.EncodeAsync(new Payload[] { payload }).ConfigureAwait(false);
var encoded = encodedList.Single();
payload.Metadata.Clear();
payload.Data = ByteString.Empty;
payload.MergeFrom(encoded);
}

Expand Down Expand Up @@ -324,6 +326,7 @@ private static async Task DecodeAsync(IPayloadCodec codec, Payload payload)
// We are gonna require a single result here
var decoded = await codec.DecodeAsync(new Payload[] { payload }).ConfigureAwait(false);
payload.Metadata.Clear();
payload.Data = ByteString.Empty;
payload.MergeFrom(decoded.Single());
}
}
Expand Down
56 changes: 56 additions & 0 deletions tests/Temporalio.Tests/Worker/WorkflowWorkerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,14 @@ namespace Temporalio.Tests.Worker;
using Temporalio.Activities;
using Temporalio.Api.Common.V1;
using Temporalio.Api.Enums.V1;
using Temporalio.Api.History.V1;
using Temporalio.Client;
using Temporalio.Client.Schedules;
using Temporalio.Common;
using Temporalio.Converters;
using Temporalio.Exceptions;
using Temporalio.Runtime;
using Temporalio.Tests.Converters;
using Temporalio.Worker;
using Temporalio.Workflows;
using Xunit;
Expand Down Expand Up @@ -5736,6 +5738,60 @@ await Assert.ThrowsAsync<WorkflowAlreadyStartedException>(() =>
});
}

[Workflow]
public class NullWithCodecWorkflow
{
[Activity]
public static string? ReturnsNull() => null;

[WorkflowRun]
public async Task<string?> RunAsync(string? input)
{
Assert.Null(input);
return await Workflow.ExecuteActivityAsync(
() => ReturnsNull(),
new() { StartToCloseTimeout = TimeSpan.FromSeconds(10) });
}
}

[Fact]
public async Task ExecuteWorkflowAsync_NullWithCodec_EncodedProperly()
{
// Need client with codec
var newOptions = (TemporalClientOptions)Client.Options.Clone();
newOptions.DataConverter = DataConverter.Default with
{
PayloadCodec = new Base64PayloadCodec(),
};
var client = new TemporalClient(Client.Connection, newOptions);

// Run worker
await ExecuteWorkerAsync<NullWithCodecWorkflow>(
async worker =>
{
var handle = await client.StartWorkflowAsync(
(NullWithCodecWorkflow wf) => wf.RunAsync(null),
new(id: $"workflow-{Guid.NewGuid()}", taskQueue: worker.Options.TaskQueue!));
var result = await handle.GetResultAsync();
Assert.Null(result);
// Confirm the codec is in use in history with the activity result
ActivityTaskCompletedEventAttributes? attrs = null;
await foreach (var evt in handle.FetchHistoryEventsAsync())
{
if (evt.ActivityTaskCompletedEventAttributes is { } taskAttrs)
{
attrs = taskAttrs;
break;
}
}
Assert.Equal(
Base64PayloadCodec.EncodingName,
attrs?.Result?.Payloads_?.Single()?.Metadata?.GetValueOrDefault("encoding")?.ToStringUtf8());
},
new TemporalWorkerOptions().AddAllActivities<NullWithCodecWorkflow>(null),
client);
}

internal static Task AssertTaskFailureContainsEventuallyAsync(
WorkflowHandle handle, string messageContains)
{
Expand Down

0 comments on commit ec8821d

Please sign in to comment.