Skip to content

Commit

Permalink
TASK: Ensure that publishing does not attempt to publish 0 events
Browse files Browse the repository at this point in the history
The event store would reject that:

> Writable events must contain at least one event
  • Loading branch information
mhsdesign committed Jan 13, 2025
1 parent 35f58a4 commit 6fa883c
Showing 1 changed file with 39 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -232,24 +232,26 @@ static function ($handle) use ($rebaseableCommands): void {
$commandSimulator->eventStream(),
);

try {
yield new EventsToPublish(
ContentStreamEventStreamName::fromContentStreamId($baseWorkspace->currentContentStreamId)
->getEventStreamName(),
$eventsOfWorkspaceToPublish,
ExpectedVersion::fromVersion($baseWorkspaceContentStreamVersion)
);
} catch (ConcurrencyException $concurrencyException) {
yield $this->reopenContentStreamWithoutConstraintChecks(
$workspace->currentContentStreamId
);
throw $concurrencyException;
if ($eventsOfWorkspaceToPublish !== null) {
try {
yield new EventsToPublish(
ContentStreamEventStreamName::fromContentStreamId($baseWorkspace->currentContentStreamId)
->getEventStreamName(),
$eventsOfWorkspaceToPublish,
ExpectedVersion::fromVersion($baseWorkspaceContentStreamVersion)
);
} catch (ConcurrencyException $concurrencyException) {
yield $this->reopenContentStreamWithoutConstraintChecks(
$workspace->currentContentStreamId
);
throw $concurrencyException;
}
}

yield $this->forkContentStream(
$command->newContentStreamId,
$baseWorkspace->currentContentStreamId,
Version::fromInteger($baseWorkspaceContentStreamVersion->value + $eventsOfWorkspaceToPublish->count())
Version::fromInteger($baseWorkspaceContentStreamVersion->value + ($eventsOfWorkspaceToPublish?->count() ?? 0))
);

yield new EventsToPublish(
Expand Down Expand Up @@ -303,7 +305,7 @@ private function getCopiedEventsOfEventStream(
WorkspaceName $targetWorkspaceName,
ContentStreamId $targetContentStreamId,
EventStreamInterface $eventStream
): Events {
): Events|null {
$events = [];
foreach ($eventStream as $eventEnvelope) {
$event = $this->eventNormalizer->denormalize($eventEnvelope->event);
Expand All @@ -316,7 +318,8 @@ private function getCopiedEventsOfEventStream(
}
}

return Events::fromArray($events);
// this could technically empty, but we handle it as a no-op
return $events !== [] ? Events::fromArray($events) : null;
}

/**
Expand Down Expand Up @@ -485,31 +488,32 @@ static function ($handle) use ($commandSimulator, $matchingCommands, $remainingC
};
}

// this could empty and a no-op for the rare case when a command returns empty events e.g. the node was already tagged with this subtree tag
$selectedEventsOfWorkspaceToPublish = $this->getCopiedEventsOfEventStream(
$baseWorkspace->workspaceName,
$baseWorkspace->currentContentStreamId,
$commandSimulator->eventStream()->withMaximumSequenceNumber($highestSequenceNumberForMatching),
);

try {
yield new EventsToPublish(
ContentStreamEventStreamName::fromContentStreamId($baseWorkspace->currentContentStreamId)
->getEventStreamName(),
$selectedEventsOfWorkspaceToPublish,
ExpectedVersion::fromVersion($baseWorkspaceContentStreamVersion)
);
} catch (ConcurrencyException $concurrencyException) {
yield $this->reopenContentStreamWithoutConstraintChecks(
$workspace->currentContentStreamId
);
throw $concurrencyException;
if ($selectedEventsOfWorkspaceToPublish !== null) {
try {
yield new EventsToPublish(
ContentStreamEventStreamName::fromContentStreamId($baseWorkspace->currentContentStreamId)
->getEventStreamName(),
$selectedEventsOfWorkspaceToPublish,
ExpectedVersion::fromVersion($baseWorkspaceContentStreamVersion)
);
} catch (ConcurrencyException $concurrencyException) {
yield $this->reopenContentStreamWithoutConstraintChecks(
$workspace->currentContentStreamId
);
throw $concurrencyException;
}
}

yield from $this->forkNewContentStreamAndApplyEvents(
$command->contentStreamIdForRemainingPart,
$baseWorkspace->currentContentStreamId,
Version::fromInteger($baseWorkspaceContentStreamVersion->value + $selectedEventsOfWorkspaceToPublish->count()),
Version::fromInteger($baseWorkspaceContentStreamVersion->value + ($selectedEventsOfWorkspaceToPublish?->count() ?? 0)),
new EventsToPublish(
WorkspaceEventStreamName::fromWorkspaceName($command->workspaceName)->getEventStreamName(),
Events::fromArray([
Expand Down Expand Up @@ -780,7 +784,7 @@ private function forkNewContentStreamAndApplyEvents(
ContentStreamId $sourceContentStreamId,
Version $sourceContentStreamVersion,
EventsToPublish $pointWorkspaceToNewContentStream,
Events $eventsToApplyOnNewContentStream,
Events|null $eventsToApplyOnNewContentStream,
): \Generator {
yield $this->forkContentStream(
$newContentStreamId,
Expand All @@ -797,13 +801,12 @@ private function forkNewContentStreamAndApplyEvents(
yield new EventsToPublish(
ContentStreamEventStreamName::fromContentStreamId($newContentStreamId)
->getEventStreamName(),
$eventsToApplyOnNewContentStream->withAppendedEvents(
Events::with(
new ContentStreamWasReopened(
$newContentStreamId
)
Events::fromArray([
...($eventsToApplyOnNewContentStream ?? []),
new ContentStreamWasReopened(
$newContentStreamId
)
),
]),
ExpectedVersion::fromVersion(Version::first()->next())
);
}
Expand Down

0 comments on commit 6fa883c

Please sign in to comment.