Skip to content

Commit

Permalink
Merge pull request #151 from scalecube/feature/replace-to-agrona-agents
Browse files Browse the repository at this point in the history
Replace to agrona agents
  • Loading branch information
artem-v authored Dec 30, 2019
2 parents a091b3b + 498756c commit e261742
Show file tree
Hide file tree
Showing 55 changed files with 1,423 additions and 1,639 deletions.
10 changes: 5 additions & 5 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@


<artifactId>reactor-aeron-parent</artifactId>
<version>0.1.5-SNAPSHOT</version>
<version>0.2.0-SNAPSHOT</version>
<packaging>pom</packaging>

<name>ScaleCube/scalecube-reactor-aeron</name>
Expand All @@ -23,15 +23,15 @@
</scm>

<properties>
<aeron.version>1.18.0</aeron.version>
<reactor.version>Californium-SR5</reactor.version>
<aeron.version>1.24.0</aeron.version>
<reactor.version>Dysprosium-RELEASE</reactor.version>

<slf4j.version>1.7.7</slf4j.version>
<log4j.version>2.11.0</log4j.version>
<disruptor.version>3.4.2</disruptor.version>

<mockito-junit-jupiter.version>2.17.0</mockito-junit-jupiter.version>
<junit-jupiter.version>5.1.0</junit-jupiter.version>
<mockito-junit-jupiter.version>2.27.0</mockito-junit-jupiter.version>
<junit-jupiter.version>5.1.1</junit-jupiter.version>
<hamcrest.version>1.3</hamcrest.version>
</properties>

Expand Down
51 changes: 16 additions & 35 deletions reactor-aeron-benchmarks/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
<modelVersion>4.0.0</modelVersion>

<properties>
<netty.version>4.1.33.Final</netty.version>
<rsocket.version>0.12.2-RC2</rsocket.version>
<netty.version>4.1.37.Final</netty.version>
<rsocket.version>1.0.0-RC5</rsocket.version>
<rsocket-transport-aeron.version>0.1.12</rsocket-transport-aeron.version>
<hdrhistogram.version>2.1.10</hdrhistogram.version>
<jmh.version>1.21</jmh.version>
Expand All @@ -16,11 +16,24 @@
<parent>
<groupId>io.scalecube</groupId>
<artifactId>reactor-aeron-parent</artifactId>
<version>0.1.5-SNAPSHOT</version>
<version>0.2.0-SNAPSHOT</version>
</parent>

<artifactId>reactor-aeron-benchmarks</artifactId>

<dependencyManagement>
<dependencies>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-bom</artifactId>
<version>${netty.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>

</dependencyManagement>

<dependencies>
<dependency>
<groupId>io.scalecube</groupId>
Expand Down Expand Up @@ -64,46 +77,14 @@
<artifactId>disruptor</artifactId>
</dependency>

<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-common</artifactId>
<version>${netty.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-buffer</artifactId>
<version>${netty.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-codec</artifactId>
<version>${netty.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-codec-http</artifactId>
<version>${netty.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-handler</artifactId>
<version>${netty.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-handler-proxy</artifactId>
<version>${netty.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport</artifactId>
<version>${netty.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-native-epoll</artifactId>
<classifier>linux-x86_64</classifier>
<version>${netty.version}</version>
</dependency>

<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
import org.agrona.DirectBuffer;
import org.agrona.concurrent.UnsafeBuffer;
import org.agrona.console.ContinueBarrier;
import reactor.aeron.mdc.AeronClient;
import reactor.aeron.mdc.AeronResources;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
Expand All @@ -33,7 +35,7 @@ public static void main(String... args) {
.start()
.block();

AeronConnection connection =
AeronDuplex<DirectBuffer> connection =
AeronClient.create(resources)
.options(
Configurations.MDC_ADDRESS,
Expand Down Expand Up @@ -71,7 +73,7 @@ public static void main(String... args) {
connection.onDispose(resources).onDispose().block();
}

private static void roundTripMessages(AeronConnection connection, long count) {
private static void roundTripMessages(AeronDuplex<DirectBuffer> connection, long count) {
HISTOGRAM.reset();

Disposable disp = reporter.start();
Expand Down Expand Up @@ -102,23 +104,16 @@ private static void roundTripMessages(AeronConnection connection, long count) {
}

private static class NanoTimeGeneratorHandler implements DirectBufferHandler<Object> {

private static final UnsafeBuffer OFFER_BUFFER =
new UnsafeBuffer(
BufferUtil.allocateDirectAligned(
Configurations.MESSAGE_LENGTH, BitUtil.CACHE_LINE_LENGTH));

@Override
public int estimateLength(Object ignore) {
return Configurations.MESSAGE_LENGTH;
}

@Override
public DirectBuffer map(Object ignore, int length) {
public DirectBuffer map(Object ignore) {
OFFER_BUFFER.putLong(0, System.nanoTime());
return OFFER_BUFFER;
}

@Override
public void dispose(Object ignore) {}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package reactor.aeron;

import reactor.aeron.mdc.AeronResources;
import reactor.aeron.mdc.AeronServer;

public final class AeronPongServer {

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package reactor.aeron;

import java.util.stream.Stream;
import reactor.aeron.mdc.AeronClient;
import reactor.aeron.mdc.AeronResources;
import reactor.core.publisher.Flux;

public class ClientDemo {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import io.netty.buffer.Unpooled;
import java.nio.charset.Charset;
import org.agrona.DirectBuffer;
import reactor.aeron.mdc.AeronClient;
import reactor.aeron.mdc.AeronResources;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxOperator;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import org.agrona.BitUtil;
import org.agrona.BufferUtil;
import org.agrona.concurrent.UnsafeBuffer;
import reactor.aeron.mdc.AeronClient;
import reactor.aeron.mdc.AeronResources;
import reactor.core.publisher.Flux;

public class ClientThroughput {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ public interface Configurations {
int FRAGMENT_COUNT_LIMIT = Integer.getInteger("reactor.aeron.sample.frameCountLimit", 10);
int MESSAGE_LENGTH = Integer.getInteger("reactor.aeron.sample.messageLength", 128);
int REQUESTED = Integer.getInteger("reactor.aeron.sample.request", 16);

int WARMUP_NUMBER_OF_ITERATIONS = Integer.getInteger("reactor.aeron.sample.warmup.iterations", 5);
long WARMUP_NUMBER_OF_MESSAGES = Long.getLong("reactor.aeron.sample.warmup.messages", 10_000);
long NUMBER_OF_MESSAGES = Long.getLong("reactor.aeron.sample.messages", 100_000_000);
Expand All @@ -34,7 +34,7 @@ public interface Configurations {
int MDC_CONTROL_PORT = Integer.getInteger("reactor.aeron.sample.mdc.control.port", 13001);
int MDC_STREAM_ID = Integer.getInteger("reactor.aeron.sample.mdc.stream.id", 0xcafe0000);
int MDC_SESSION_ID = Integer.getInteger("reactor.aeron.sample.mdc.session.id", 1001);

String IDLE_STRATEGY = System.getProperty("reactor.aeron.sample.idle.strategy", "busyspin");
long REPORT_INTERVAL = Long.getLong("reactor.aeron.sample.report.interval", 1);
long TRACE_REPORTER_INTERVAL = Long.getLong("reactor.aeron.sample.report.interval", 60);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
package reactor.aeron;

import static reactor.aeron.DefaultFragmentMapper.asString;

import reactor.aeron.mdc.AeronResources;
import reactor.aeron.mdc.AeronServer;

public class ServerDemo {

/**
Expand All @@ -17,7 +22,7 @@ public static void main(String[] args) {
connection
.inbound()
.receive()
.asString()
.map(asString())
.log("receive")
.then(connection.onDispose()))
.bind()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
import java.time.Duration;
import org.agrona.DirectBuffer;
import org.agrona.concurrent.UnsafeBuffer;
import reactor.aeron.mdc.AeronResources;
import reactor.aeron.mdc.AeronServer;
import reactor.core.publisher.Flux;

public class ServerServerSends {
Expand Down Expand Up @@ -44,13 +46,8 @@ static class ByteBufHandler implements DirectBufferHandler<ByteBuf> {
static final ByteBufHandler defaultInstance = new ByteBufHandler();

@Override
public int estimateLength(ByteBuf buffer) {
return buffer.readableBytes();
}

@Override
public DirectBuffer map(ByteBuf buffer, int length) {
return new UnsafeBuffer(buffer.nioBuffer(), 0, length);
public DirectBuffer map(ByteBuf buffer) {
return new UnsafeBuffer(buffer.nioBuffer(), 0, buffer.readableBytes());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package reactor.aeron;

import reactor.aeron.mdc.AeronResources;
import reactor.aeron.mdc.AeronServer;

public class ServerThroughput {

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.netty.Connection;
import reactor.netty.NettyPipeline.SendOptions;
import reactor.netty.channel.BootstrapHandlers;
import reactor.netty.resources.ConnectionProvider;
import reactor.netty.resources.LoopResources;
Expand Down Expand Up @@ -98,14 +97,12 @@ private static void roundTripMessages(Connection connection, long count) {

connection
.outbound()
.options(SendOptions::flushOnEach)
.sendObject(Flux.range(0, Configurations.REQUESTED))
.then()
.subscribe();

connection
.outbound()
.options(SendOptions::flushOnEach)
.sendObject(
connection
.inbound()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import reactor.aeron.Configurations;
import reactor.netty.NettyPipeline.SendOptions;
import reactor.netty.channel.BootstrapHandlers;
import reactor.netty.resources.LoopResources;
import reactor.netty.tcp.TcpServer;
Expand Down Expand Up @@ -49,7 +48,7 @@ public static void main(String[] args) {
}))
.handle(
(inbound, outbound) ->
outbound.options(SendOptions::flushOnEach).send(inbound.receive().retain()))
outbound.send(inbound.receive().retain()))
.bind()
.doOnSuccess(
server ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public static void main(final String[] args) {
SigInt.register(() -> running.set(false));

RateReporter reporter = new RateReporter();

try (Aeron aeron = Aeron.connect(ctx);
Subscription subscription = aeron.addSubscription(INBOUND_CHANNEL, STREAM_ID);
Publication publication =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,10 @@

import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.RSocketFactory;
import io.rsocket.frame.decoder.PayloadDecoder;
import io.rsocket.reactor.aeron.AeronClientTransport;
import io.rsocket.util.ByteBufPayload;
import reactor.aeron.AeronClient;
import reactor.aeron.AeronResources;
import reactor.aeron.Configurations;
import reactor.aeron.RateReporter;
import reactor.aeron.mdc.AeronResources;

public final class RSocketAeronClientTps {

Expand All @@ -31,19 +27,20 @@ public static void main(String... args) {
.start()
.block();

RSocket client =
RSocketFactory.connect()
.frameDecoder(PayloadDecoder.ZERO_COPY)
.transport(
() ->
new AeronClientTransport(
AeronClient.create(resources)
.options(
Configurations.MDC_ADDRESS,
Configurations.MDC_PORT,
Configurations.MDC_CONTROL_PORT)))
.start()
.block();
RSocket client = null;
// todo io.rsocket.transport.ClientTransport was changed
// RSocketFactory.connect()
// .frameDecoder(PayloadDecoder.ZERO_COPY)
// .transport(
// () ->
// new AeronClientTransport(
// AeronClient.create(resources)
// .options(
// Configurations.MDC_ADDRESS,
// Configurations.MDC_PORT,
// Configurations.MDC_CONTROL_PORT)))
// .start()
// .block();

RateReporter reporter = new RateReporter();

Expand Down
Loading

0 comments on commit e261742

Please sign in to comment.