Skip to content

Commit

Permalink
Port to 1.10.2
Browse files Browse the repository at this point in the history
  • Loading branch information
JPWatson committed Aug 1, 2018
1 parent d1ac7ac commit 773c007
Show file tree
Hide file tree
Showing 21 changed files with 1,093 additions and 26 deletions.
2 changes: 1 addition & 1 deletion RELEASE.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#### Port
Aeron.NET has been ported against Java version:
- Agrona: 0.9.18-13-g0378ffa
- Aeron: 1.10.1
- Aeron: 1.10.2
2 changes: 1 addition & 1 deletion driver/Aeron.Driver.nuspec
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
<package >
<metadata>
<id>Aeron.Driver</id>
<version>1.10.1</version>
<version>1.10.2</version>
<title>Aeron Driver</title>
<authors>Adaptive Financial Consulting Ltd.</authors>
<owners>Adaptive Financial Consulting Ltd.</owners>
Expand Down
Binary file modified driver/media-driver.jar
Binary file not shown.
2 changes: 1 addition & 1 deletion driver/version.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
Driver source:
http://repo1.maven.org/maven2/io/aeron/aeron-all/1.10.1/aeron-all-1.10.1.jar
http://repo1.maven.org/maven2/io/aeron/aeron-all/1.10.2/aeron-all-1.10.2.jar
19 changes: 19 additions & 0 deletions scripts/build-prerelease-nuget-packages - Copy.bat
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
@echo off
pushd %~dp0..
SET nuget_source=https://www.myget.org/F/aeron/api/v2/package

del nupkgs\*.nupkg

call dotnet pack src\Adaptive.Aeron\Adaptive.Aeron.csproj -c Release --output ..\..\nupkgs
call dotnet pack src\Adaptive.Agrona\Adaptive.Agrona.csproj -c Release --output ..\..\nupkgs
call dotnet pack src\Adaptive.Cluster\Adaptive.Cluster.csproj -c Release --output ..\..\nupkgs
call dotnet pack src\Adaptive.Archiver\Adaptive.Archiver.csproj -c Release --output ..\..\nupkgs
call .\scripts\nuget pack .\driver\Aeron.Driver.nuspec -OutputDirectory nupkgs

call dotnet nuget push nupkgs\Agrona.*.nupkg -s %nuget_source%
call dotnet nuget push nupkgs\Aeron.Client.*.nupkg -s %nuget_source%
call dotnet nuget push nupkgs\Aeron.Driver.*.nupkg -s %nuget_source%
call dotnet nuget push nupkgs\Aeron.Cluster.*.nupkg -s %nuget_source%
call dotnet nuget push nupkgs\Aeron.Archiver.*.nupkg -s %nuget_source%

popd
2 changes: 1 addition & 1 deletion src/Adaptive.Aeron/Adaptive.Aeron.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
<PropertyGroup>
<TargetFrameworks>netstandard2.0;net45</TargetFrameworks>
<PackageId>Aeron.Client</PackageId>
<VersionPrefix>1.10.1</VersionPrefix>
<VersionPrefix>1.10.2</VersionPrefix>
<Authors>Adaptive Financial Consulting Ltd.</Authors>
<Company>Adaptive Financial Consulting Ltd.</Company>
<Product>Aeron Client</Product>
Expand Down
1 change: 0 additions & 1 deletion src/Adaptive.Aeron/Aeron.cs
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,6 @@ internal Aeron(Context ctx)
///
/// Threads required for interacting with the media driver are created and managed within the Aeron instance.
///
/// </para>
/// </summary>
/// <returns> the new <seealso cref="Aeron"/> instance connected to the Media Driver. </returns>
public static Aeron Connect()
Expand Down
47 changes: 46 additions & 1 deletion src/Adaptive.Aeron/ClientConductor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ internal class ClientConductor : IAgent, IDriverEventsListener
private long _timeOfLastResourcesCheckNs;
private long _timeOfLastServiceNs;
private bool _isClosed;
private bool _isInCallback;
private string _stashedChannel;
private RegistrationException _driverException;

Expand Down Expand Up @@ -270,6 +271,8 @@ public void OnAvailableImage(
AvailableImageHandler handler = subscription.AvailableImageHandler();
if (null != handler)
{
_isInCallback = true;

try
{
handler(image);
Expand All @@ -278,6 +281,10 @@ public void OnAvailableImage(
{
HandleError(ex);
}
finally
{
_isInCallback = false;
}
}

subscription.AddImage(image);
Expand All @@ -295,6 +302,8 @@ public void OnUnavailableImage(long correlationId, long subscriptionRegistration
UnavailableImageHandler handler = subscription.UnavailableImageHandler();
if (null != handler)
{
_isInCallback = true;

try
{
handler(image);
Expand All @@ -303,6 +312,10 @@ public void OnUnavailableImage(long correlationId, long subscriptionRegistration
{
HandleError(ex);
}
finally
{
_isInCallback = false;
}
}
}
}
Expand All @@ -318,6 +331,8 @@ public void OnAvailableCounter(long registrationId, int counterId)
{
if (null != _availableCounterHandler)
{
_isInCallback = true;

try
{
_availableCounterHandler(_countersReader, registrationId, counterId);
Expand All @@ -326,13 +341,19 @@ public void OnAvailableCounter(long registrationId, int counterId)
{
HandleError(ex);
}
finally
{
_isInCallback = false;
}
}
}

public void OnUnavailableCounter(long registrationId, int counterId)
{
if (null != _unavailableCounterHandler)
{
_isInCallback = true;

try
{
_unavailableCounterHandler(_countersReader, registrationId, counterId);
Expand All @@ -341,6 +362,10 @@ public void OnUnavailableCounter(long registrationId, int counterId)
{
HandleError(ex);
}
finally
{
_isInCallback = false;
}
}
}

Expand All @@ -360,6 +385,7 @@ internal ConcurrentPublication AddPublication(string channel, int streamId)
try
{
EnsureOpen();
EnsureNotReentrant();

_stashedChannel = channel;
long registrationId = _driverProxy.AddPublication(channel, streamId);
Expand All @@ -379,6 +405,7 @@ internal virtual ExclusivePublication AddExclusivePublication(string channel, in
try
{
EnsureOpen();
EnsureNotReentrant();

_stashedChannel = channel;
long registrationId = _driverProxy.AddExclusivePublication(channel, streamId);
Expand All @@ -402,6 +429,7 @@ internal virtual void ReleasePublication(Publication publication)
publication.InternalClose();

EnsureOpen();
EnsureNotReentrant();

var removedPublication = _resourceByRegIdMap[publication.RegistrationId];

Expand Down Expand Up @@ -429,6 +457,7 @@ internal virtual Subscription AddSubscription(string channel, int streamId, Avai
try
{
EnsureOpen();
EnsureNotReentrant();

long correlationId = _driverProxy.AddSubscription(channel, streamId);
Subscription subscription = new Subscription(this, channel, streamId, correlationId, availableImageHandler, unavailableImageHandler);
Expand All @@ -455,6 +484,7 @@ internal virtual void ReleaseSubscription(Subscription subscription)
subscription.InternalClose();

EnsureOpen();
EnsureNotReentrant();

long registrationId = subscription.RegistrationId;
AwaitResponse(_driverProxy.RemoveSubscription(registrationId));
Expand All @@ -473,6 +503,7 @@ internal virtual void AddDestination(long registrationId, string endpointChannel
try
{
EnsureOpen();
EnsureNotReentrant();

AwaitResponse(_driverProxy.AddDestination(registrationId, endpointChannel));
}
Expand All @@ -488,6 +519,7 @@ internal virtual void RemoveDestination(long registrationId, string endpointChan
try
{
EnsureOpen();
EnsureNotReentrant();

AwaitResponse(_driverProxy.RemoveDestination(registrationId, endpointChannel));
}
Expand All @@ -503,6 +535,7 @@ internal void AddRcvDestination(long registrationId, string endpointChannel)
try
{
EnsureOpen();
EnsureNotReentrant();

AwaitResponse(_driverProxy.AddRcvDestination(registrationId, endpointChannel));
}
Expand All @@ -518,7 +551,8 @@ internal void RemoveRcvDestination(long registrationId, string endpointChannel)
try
{
EnsureOpen();

EnsureNotReentrant();

AwaitResponse(_driverProxy.RemoveRcvDestination(registrationId, endpointChannel));
}
finally
Expand All @@ -534,6 +568,7 @@ internal virtual Counter AddCounter(int typeId, IDirectBuffer keyBuffer, int key
try
{
EnsureOpen();
EnsureNotReentrant();

if (keyLength < 0 || keyLength > CountersManager.MAX_KEY_LENGTH)
{
Expand Down Expand Up @@ -563,6 +598,7 @@ internal Counter AddCounter(int typeId, string label)
try
{
EnsureOpen();
EnsureNotReentrant();

if (label.Length > CountersManager.MAX_LABEL_LENGTH)
{
Expand Down Expand Up @@ -591,6 +627,7 @@ internal virtual void ReleaseCounter(Counter counter)
counter.InternalClose();

EnsureOpen();
EnsureNotReentrant();

long registrationId = counter.RegistrationId();
AwaitResponse(_driverProxy.RemoveCounter(registrationId));
Expand Down Expand Up @@ -646,6 +683,14 @@ private void EnsureOpen()
}
}

private void EnsureNotReentrant()
{
if (_isInCallback)
{
throw new AeronException("Reentrant calls not permitted during callbacks");
}
}

private LogBuffers LogBuffers(long registrationId, string logFileName)
{
LogBuffers logBuffers = _logBuffersByIdMap[registrationId];
Expand Down
2 changes: 1 addition & 1 deletion src/Adaptive.Agrona/Adaptive.Agrona.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<TargetFrameworks>netstandard2.0;net45</TargetFrameworks>
<AllowUnsafeBlocks>true</AllowUnsafeBlocks>
<PackageId>Agrona</PackageId>
<VersionPrefix>1.10.1</VersionPrefix>
<VersionPrefix>1.10.2</VersionPrefix>
<Authors>Adaptive Financial Consulting Ltd.</Authors>
<Company>Adaptive Financial Consulting Ltd.</Company>
<Product>Agrona libraries initially included in Aeron Client</Product>
Expand Down
2 changes: 1 addition & 1 deletion src/Adaptive.Archiver/Adaptive.Archiver.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<TargetFrameworks>netstandard2.0;net45</TargetFrameworks>
<AllowUnsafeBlocks>true</AllowUnsafeBlocks>
<PackageId>Aeron.Archiver</PackageId>
<VersionPrefix>1.10.1</VersionPrefix>
<VersionPrefix>1.10.2</VersionPrefix>
<Authors>Adaptive Financial Consulting Ltd.</Authors>
<Company>Adaptive Financial Consulting Ltd.</Company>
<Product>Archiving over the Aeron transport</Product>
Expand Down
54 changes: 41 additions & 13 deletions src/Adaptive.Archiver/AeronArchive.cs
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,7 @@ public ExclusivePublication AddRecordedExclusivePublication(string channel, int
/// <param name="channel"> to be recorded. </param>
/// <param name="streamId"> to be recorded. </param>
/// <param name="sourceLocation"> of the publication to be recorded. </param>
/// <returns> the correlationId used to identify the request. </returns>
/// <returns> the subscriptionId of the recording. </returns>
public long StartRecording(string channel, int streamId, SourceLocation sourceLocation)
{
_lock.Lock();
Expand All @@ -411,9 +411,7 @@ public long StartRecording(string channel, int streamId, SourceLocation sourceLo
throw new ArchiveException("failed to send start recording request");
}

PollForResponse(correlationId);

return correlationId;
return PollForResponse(correlationId);
}
finally
{
Expand All @@ -432,7 +430,7 @@ public long StartRecording(string channel, int streamId, SourceLocation sourceLo
/// <param name="channel"> to be recorded. </param>
/// <param name="streamId"> to be recorded. </param>
/// <param name="sourceLocation"> of the publication to be recorded. </param>
/// <returns> the correlationId used to identify the request. </returns>
/// <returns> the subscriptionId of the recording. </returns>
public long ExtendRecording(long recordingId, string channel, int streamId,
SourceLocation sourceLocation)
{
Expand All @@ -447,9 +445,7 @@ public long ExtendRecording(long recordingId, string channel, int streamId,
throw new ArchiveException("failed to send extend recording request");
}

PollForResponse(correlationId);

return correlationId;
return PollForResponse(correlationId);
}
finally
{
Expand Down Expand Up @@ -500,16 +496,48 @@ public void StopRecording(Publication publication)
}

/// <summary>
/// Start a replay for a length in bytes of a recording from a position. If the position is <seealso cref="#NULL_POSITION"/>
/// Stop recording for a subscriptionId that has been returned from
/// <seealso cref="#startRecording(String, int, SourceLocation)"/> or
/// <seealso cref="#extendRecording(long, String, int, SourceLocation)"/>.
/// </summary>
/// <param name="subscriptionId"> the subscription was registered with for the recording. </param>
public void StopRecording(long subscriptionId)
{
_lock.Lock();
try
{
long correlationId = aeron.NextCorrelationId();

if (!archiveProxy.StopRecording(subscriptionId, correlationId, controlSessionId))
{
throw new ArchiveException("failed to send stop recording request");
}

PollForResponse(correlationId);
}
finally
{
_lock.Unlock();
}
}


/// <summary>
/// Start a replay for a length in bytes of a recording from a position. If the position is <seealso cref="NULL_POSITION"/>
/// then the stream will be replayed from the start.
///
/// The lower 32-bits of the returned value contain the <see cref="Image.SessionId"/> of the received replay. All
/// 64-bits are required to uniquely identify the replay when calling <see cref="StopReplay(long)"/>. The lower 32-bits
/// can be obtained by casting the <see cref="long"/> value to an <see cref="int"/>.
///
/// </summary>
/// <param name="recordingId"> to be replayed. </param>
/// <param name="position"> from which the replay should begin or <seealso cref="#NULL_POSITION"/> if from the start. </param>
/// <param name="length"> of the stream to be replayed. Use <seealso cref="Long#MAX_VALUE"/> to follow a live recording or <see cref="NULL_LENGTH"/> to replay the whole stream of unknown length. </param>
/// <param name="position"> from which the replay should begin or <seealso cref="NULL_POSITION"/> if from the start. </param>
/// <param name="length"> of the stream to be replayed. Use <seealso cref="long.MaxValue"/> to follow a live recording or <see cref="NULL_LENGTH"/> to replay the whole stream of unknown length. </param>
/// <param name="replayChannel"> to which the replay should be sent. </param>
/// <param name="replayStreamId"> to which the replay should be sent. </param>
/// <returns> the id of the replay session which will be the same as the <seealso cref="Image#sessionId()"/> of the received
/// replay for correlation with the matching channel and stream id. </returns>
/// <returns> the id of the replay session which will be the same as the <seealso cref="Image.SessionId"/> of the received
/// replay for correlation with the matching channel and stream id in the lower 32 bits. </returns>
public long StartReplay(long recordingId, long position, long length, string replayChannel,
int replayStreamId)
{
Expand Down
Loading

0 comments on commit 773c007

Please sign in to comment.