Skip to content

Commit

Permalink
Port to 1.8.2-0-g9ca8498a5
Browse files Browse the repository at this point in the history
  • Loading branch information
JPWatson committed Mar 13, 2018
1 parent c62d011 commit be7df08
Show file tree
Hide file tree
Showing 70 changed files with 3,542 additions and 656 deletions.
2 changes: 1 addition & 1 deletion RELEASE.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#### Port
Aeron.NET has been ported against Java version:
- Agrona: 0.9.12-30-gec52107
- Aeron: 1.7.0-493-g2412c5c89
- Aeron: 1.8.2-0-g9ca8498a5
2 changes: 1 addition & 1 deletion driver/Aeron.Driver.nuspec
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
<package >
<metadata>
<id>Aeron.Driver</id>
<version>1.7.0.0</version>
<version>1.8.2</version>
<title>Aeron Driver</title>
<authors>Adaptive Financial Consulting Ltd.</authors>
<owners>Adaptive Financial Consulting Ltd.</owners>
Expand Down
Binary file modified driver/media-driver.jar
Binary file not shown.
3 changes: 1 addition & 2 deletions driver/start-media-driver.bat
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
@echo off
echo Media Driver Started...
java -cp media-driver.jar ^
io.aeron.cluster.ClusteredMediaDriver
java -cp media-driver.jar io.aeron.driver.MediaDriver
echo Media Driver Stopped.
pause
2 changes: 2 additions & 0 deletions driver/version.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
v1.8.2
http://repo1.maven.org/maven2/io/aeron/aeron-all/1.8.2/
45 changes: 45 additions & 0 deletions src/Adaptive.Aeron.Tests/LogBuffer/TermAppenderTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,51 @@ public void ShouldAppendFragmentedFromVectorsToEmptyLog()
.Then(A.CallTo(() => _termBuffer.PutLong(tail2 + DataHeaderFlyweight.RESERVED_VALUE_OFFSET, RV, ByteOrder.LittleEndian)).MustHaveHappened())
.Then(A.CallTo(() => _termBuffer.PutIntOrdered(tail2, frameTwoLength)).MustHaveHappened());
}

[Test]
public void ShouldAppendFragmentedFromVectorsWithNonZeroOffsetToEmptyLog()
{
var mtu = 2048;
var headerLength = _defaultHeader.Capacity;
var maxPayloadLength = mtu - headerLength;
var bufferOneLength = 64;
var offset = 15;
var bufferTwoTotalLength = 3000;
var bufferTwoLength = bufferTwoTotalLength - offset;
var bufferOne = new UnsafeBuffer(new byte[bufferOneLength]);
var bufferTwo = new UnsafeBuffer(new byte[bufferTwoTotalLength]);
bufferOne.SetMemory(0, bufferOne.Capacity, (byte) '1');
bufferTwo.SetMemory(0, bufferTwo.Capacity, (byte) '2');
var msgLength = bufferOneLength + bufferTwoLength;
var tail = 0;
var frameOneLength = mtu;
var frameTwoLength = (msgLength - (mtu - headerLength)) + headerLength;
var resultingOffset = frameOneLength + BitUtil.Align(frameTwoLength, FrameDescriptor.FRAME_ALIGNMENT);

_logMetaDataBuffer.PutLong(TermTailCounterOffset, LogBufferDescriptor.PackTail(TermID, tail));

var vectors = new[]
{
new DirectBufferVector(bufferOne, 0, bufferOneLength),
new DirectBufferVector(bufferTwo, offset, bufferTwoLength)
};

Assert.That(_termAppender.AppendFragmentedMessage(_headerWriter, vectors, msgLength, maxPayloadLength, RVS, TermID), Is.EqualTo(resultingOffset));

var tail2 = tail + frameOneLength;
var bufferTwoOffset = maxPayloadLength - bufferOneLength + offset;
var fragmentTwoPayloadLength = bufferTwoLength - (maxPayloadLength - bufferOneLength);

A.CallTo(() => _headerWriter.Write(_termBuffer, tail, frameOneLength, TermID)).MustHaveHappened()
.Then(A.CallTo(() => _termBuffer.PutBytes(headerLength, bufferOne, 0, bufferOneLength)).MustHaveHappened())
.Then(A.CallTo(() => _termBuffer.PutBytes(headerLength + bufferOneLength, bufferTwo, offset, maxPayloadLength - bufferOneLength)).MustHaveHappened())
.Then(A.CallTo(() => _termBuffer.PutLong(tail + DataHeaderFlyweight.RESERVED_VALUE_OFFSET, RV, ByteOrder.LittleEndian)).MustHaveHappened())
.Then(A.CallTo(() => _termBuffer.PutIntOrdered(tail, frameOneLength)).MustHaveHappened())
.Then(A.CallTo(() => _headerWriter.Write(_termBuffer, tail2, frameTwoLength, TermID)).MustHaveHappened())
.Then(A.CallTo(() => _termBuffer.PutBytes(tail2 + headerLength, bufferTwo, bufferTwoOffset, fragmentTwoPayloadLength)).MustHaveHappened())
.Then(A.CallTo(() => _termBuffer.PutLong(tail2 + DataHeaderFlyweight.RESERVED_VALUE_OFFSET, RV, ByteOrder.LittleEndian)).MustHaveHappened())
.Then(A.CallTo(() => _termBuffer.PutIntOrdered(tail2, frameTwoLength)).MustHaveHappened());
}

[Test]
[ExpectedException(typeof(InvalidOperationException))]
Expand Down
2 changes: 1 addition & 1 deletion src/Adaptive.Aeron/Adaptive.Aeron.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
<PropertyGroup>
<TargetFrameworks>netstandard2.0;net45</TargetFrameworks>
<PackageId>Aeron.Client</PackageId>
<VersionPrefix>1.7.0</VersionPrefix>
<VersionPrefix>1.8.2</VersionPrefix>
<Authors>Adaptive Financial Consulting Ltd.</Authors>
<Company>Adaptive Financial Consulting Ltd.</Company>
<Product>Aeron Client</Product>
Expand Down
33 changes: 29 additions & 4 deletions src/Adaptive.Aeron/Aeron.cs
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,17 @@ public static Aeron Connect(Context ctx)
}
}

/// <summary>
/// Print out the values from <seealso cref="#countersReader()"/> which can be useful for debugging.
/// </summary>
/// <param name="out"> to where the counters get printed. </param>
public void PrintCounters(StreamWriter @out)
{
CountersReader counters = CountersReader();
counters.ForEach((value, id, label) => @out.WriteLine("{0,3}: {1:} - {2}", id, value, label));
}


/// <summary>
/// Has the client been closed? If not then the CnC file may not be unmapped.
/// </summary>
Expand Down Expand Up @@ -370,7 +381,7 @@ public class Context : IDisposable
private long _keepAliveInterval = KeepaliveIntervalNs;
private long _interServiceTimeout = 0;
private FileInfo _cncFile;
private string _aeronDirectoryName;
private string _aeronDirectoryName = GetAeronDirectoryName();
private DirectoryInfo _aeronDirectory;
private long _driverTimeoutMs = DRIVER_TIMEOUT_MS;
private MappedByteBuffer _cncByteBuffer;
Expand Down Expand Up @@ -477,7 +488,7 @@ public class Context : IDisposable
/// Key for the linger timeout for a publication to wait around after draining in nanoseconds.
/// </summary>
public const string LINGER_PARAM_NAME = "linger";

/// <summary>
/// Valid value for <seealso cref="MDC_CONTROL_MODE"/> when manual control is desired.
/// </summary>
Expand All @@ -493,9 +504,14 @@ public class Context : IDisposable
/// </summary>
public const string RELIABLE_STREAM_PARAM_NAME = "reliable";

public Context()
/// <summary>
/// Get the default directory name to be used if <seealso cref="AeronDirectoryName(String)"/> is not set. This will take
/// the <seealso cref="AERON_DIR_PROP_NAME"/> if set and if not then <seealso cref="AERON_DIR_PROP_DEFAULT"/>.
/// </summary>
/// <returns> the default directory name to be used if <seealso cref="AeronDirectoryName(String)"/> is not set. </returns>
public static string GetAeronDirectoryName()
{
_aeronDirectoryName = Config.GetProperty(AERON_DIR_PROP_NAME, AERON_DIR_PROP_DEFAULT);
return Config.GetProperty(AERON_DIR_PROP_NAME, AERON_DIR_PROP_DEFAULT);
}

/// <summary>
Expand All @@ -511,6 +527,15 @@ public Context ConcludeAeronDirectory()

return this;
}

/// <summary>
/// Perform a shallow copy of the object.
/// </summary>
/// <returns> a shallow copy of the object. </returns>
public virtual Context Clone()
{
return (Context)MemberwiseClone();
}

/// <summary>
/// This is called automatically by <seealso cref="Connect()"/> and its overloads.
Expand Down
3 changes: 1 addition & 2 deletions src/Adaptive.Aeron/ClientConductor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -749,8 +749,7 @@ private int checkLingeringResources(long nowNs)

if (nowNs > (resource.TimeOfLastStateChange() + RESOURCE_LINGER_NS))
{
ListUtil.FastUnorderedRemove(lingeringResources, i, lastIndex);
lastIndex--;
ListUtil.FastUnorderedRemove(lingeringResources, i, lastIndex--);
resource.Delete();
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/Adaptive.Aeron/Config.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ static Config()
}
}

public static string GetProperty(string propertyName, string defaultValue)
public static string GetProperty(string propertyName, string defaultValue = null)
{
string value;
return Params.TryGetValue(propertyName, out value) ? value : defaultValue;
Expand Down
4 changes: 2 additions & 2 deletions src/Adaptive.Aeron/LogBuffer/ExclusiveTermAppender.cs
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ public virtual int AppendUnfragmentedMessage(
int termId,
int termOffset,
HeaderWriter header,
UnsafeBuffer srcBuffer,
IDirectBuffer srcBuffer,
int srcOffset,
int length,
ReservedValueSupplier reservedValueSupplier)
Expand Down Expand Up @@ -407,7 +407,7 @@ public int AppendFragmentedMessage(
int vectorRemaining = vector.length - vectorOffset;
int numBytes = Math.Min(bytesToWrite - bytesWritten, vectorRemaining);

termBuffer.PutBytes(payloadOffset, vector.buffer, vectorOffset, numBytes);
termBuffer.PutBytes(payloadOffset, vector.buffer, vector.offset + vectorOffset, numBytes);

bytesWritten += numBytes;
payloadOffset += numBytes;
Expand Down
2 changes: 1 addition & 1 deletion src/Adaptive.Aeron/LogBuffer/TermAppender.cs
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ public int AppendFragmentedMessage(HeaderWriter header, DirectBufferVector[] vec
int vectorRemaining = vector.length - vectorOffset;
int numBytes = Math.Min(bytesToWrite - bytesWritten, vectorRemaining);

termBuffer.PutBytes(payloadOffset, vector.buffer, vectorOffset, numBytes);
termBuffer.PutBytes(payloadOffset, vector.buffer, vector.offset + vectorOffset, numBytes);

bytesWritten += numBytes;
payloadOffset += numBytes;
Expand Down
2 changes: 1 addition & 1 deletion src/Adaptive.Aeron/LogBuffers.cs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public LogBuffers(string logFileName)
// if log length exceeds MAX_INT we need multiple mapped buffers, (see FileChannel.map doc).
if (logLength < int.MaxValue)
{
var mappedBuffer = IoUtil.MapExistingFile(logFileName, MapMode.ReadWrite);
var mappedBuffer = IoUtil.MapExistingFile(logFileName, MapMode.ReadWrite); // TODO Java has sparse hint
_mappedByteBuffers = new[] {mappedBuffer};

_logMetaDataBuffer = new UnsafeBuffer(mappedBuffer.Pointer,
Expand Down
2 changes: 1 addition & 1 deletion src/Adaptive.Agrona/Adaptive.Agrona.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<TargetFrameworks>netstandard2.0;net45</TargetFrameworks>
<AllowUnsafeBlocks>true</AllowUnsafeBlocks>
<PackageId>Agrona</PackageId>
<VersionPrefix>0.9.13</VersionPrefix>
<VersionPrefix>1.8.2</VersionPrefix>
<Authors>Adaptive Financial Consulting Ltd.</Authors>
<Company>Adaptive Financial Consulting Ltd.</Company>
<Product>Agrona libraries initially included in Aeron Client</Product>
Expand Down
33 changes: 33 additions & 0 deletions src/Adaptive.Agrona/Concurrent/Status/CountersReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,14 @@ public class CountersReader
/// <param name="keyBuffer"> for the counter.</param>
/// <param name="label"> for the counter.</param>
public delegate void MetaData(int counterId, int typeId, IDirectBuffer keyBuffer, string label);

/// <summary>
/// Callback function for consuming basic counter details and value.
/// </summary>
/// <param name="value"> of the counter. </param>
/// <param name="counterId"> of the counter </param>
/// <param name="label"> for the counter. </param>
public delegate void CounterConsumer(long value, int counterId, string label);

/// <summary>
/// Can be used to representing a null counter id when passed as a argument.
Expand Down Expand Up @@ -244,6 +252,31 @@ public void ForEach(IntObjConsumer<string> consumer)
counterId++;
}
}

/// <summary>
/// Iterate over the counters and provide the value and basic metadata.
/// </summary>
/// <param name="consumer"> for each allocated counter. </param>
public void ForEach(CounterConsumer consumer)
{
int counterId = 0;

for (int i = 0, capacity = MetaDataBuffer.Capacity; i < capacity; i += METADATA_LENGTH)
{
int recordStatus = MetaDataBuffer.GetIntVolatile(i);

if (RECORD_ALLOCATED == recordStatus)
{
consumer(ValuesBuffer.GetLongVolatile(CounterOffset(counterId)), counterId, LabelValue(i));
}
else if (RECORD_UNUSED == recordStatus)
{
break;
}

counterId++;
}
}

/// <summary>
/// Iterate over all the metadata in the buffer.
Expand Down
Loading

0 comments on commit be7df08

Please sign in to comment.