Skip to content

Commit

Permalink
Improve usage if native ByteBuf
Browse files Browse the repository at this point in the history
Improve performances by using native ByteBuf instead of using
wrapped byte array.
Include Benchmark as Integration tests
  • Loading branch information
fredericBregier committed Mar 3, 2021
1 parent 5174335 commit 3b0e884
Show file tree
Hide file tree
Showing 9 changed files with 269 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
Expand Down Expand Up @@ -251,4 +252,23 @@ public static ByteBuf slice(final ByteBuf byteBuf, final int start,
bufSliced.writerIndex(0);
return bufSliced;
}

/**
* Replace the arrays with one Pooled ByteBuf (not wrapped)
*
* @param arrays
*
* @return the ByteBuf from pool
*/
public static ByteBuf wrappedBuffer(byte[]... arrays) {
int size = 0;
for (byte[] array : arrays) {
size += array.length;
}
final ByteBuf finalByteBuf = ByteBufAllocator.DEFAULT.buffer(size);
for (byte[] array : arrays) {
finalByteBuf.writeBytes(array);
}
return finalByteBuf;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1264,7 +1264,8 @@ private void shutdownLocalChannel() {
if (time == 0) {
logger.info("Will close networkChannel {}", ncr.nbLocalChannels());
NetworkTransaction.shuttingDownNetworkChannel(ncr);
NetworkTransaction.shuttingdownNetworkChannelsPerHostID(ncr.getHostId());
NetworkTransaction
.shuttingdownNetworkChannelsPerHostID(ncr.getHostId());
}
} finally {
ncr.unlockNetwork();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.Unpooled;
import org.waarp.common.digest.FilesystemBasedDigest;
import org.waarp.common.logging.WaarpLogger;
import org.waarp.common.logging.WaarpLoggerFactory;
Expand Down Expand Up @@ -123,7 +122,7 @@ public void createAllBuffers(final LocalChannelReference lcr,

@Override
public void createEnd(final LocalChannelReference lcr) {
end = Unpooled.wrappedBuffer(key);
end = WaarpNettyUtil.wrappedBuffer(key);
}

@Override
Expand All @@ -137,7 +136,7 @@ public void createMiddle(final LocalChannelReference lcr) {
if (dataRecv != null) {
middle = dataRecv;
} else {
middle = Unpooled.wrappedBuffer(data);
middle = WaarpNettyUtil.wrappedBuffer(data);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,11 @@

import com.fasterxml.jackson.databind.node.ObjectNode;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import org.waarp.common.json.JsonHandler;
import org.waarp.common.logging.WaarpLogger;
import org.waarp.common.logging.WaarpLoggerFactory;
import org.waarp.common.utility.WaarpNettyUtil;
import org.waarp.common.utility.WaarpStringUtils;
import org.waarp.openr66.context.ErrorCode;
import org.waarp.openr66.protocol.configuration.Configuration;
import org.waarp.openr66.protocol.configuration.PartnerConfiguration;
Expand Down Expand Up @@ -365,7 +366,8 @@ public void createAllBuffers(final LocalChannelReference lcr,
@Override
public void createEnd(final LocalChannelReference lcr) {
if (transferInformation != null) {
end = Unpooled.wrappedBuffer(transferInformation.getBytes());
end = WaarpNettyUtil
.wrappedBuffer(transferInformation.getBytes(WaarpStringUtils.UTF8));
}
}

Expand All @@ -380,13 +382,13 @@ public void createHeader(final LocalChannelReference lcr)
final ObjectNode node = JsonHandler.createObjectNode();
JsonHandler.setValue(node, FIELDS.rule, rulename);
JsonHandler.setValue(node, FIELDS.mode, mode);
header =
Unpooled.wrappedBuffer(JsonHandler.writeAsString(node).getBytes());
header = WaarpNettyUtil.wrappedBuffer(
JsonHandler.writeAsString(node).getBytes(WaarpStringUtils.UTF8));
} else {
header = Unpooled.wrappedBuffer(rulename.getBytes(),
PartnerConfiguration.BLANK_SEPARATOR_FIELD
.getBytes(),
Integer.toString(mode).getBytes());
header = WaarpNettyUtil.wrappedBuffer(rulename.getBytes(),
PartnerConfiguration.BLANK_SEPARATOR_FIELD
.getBytes(),
Integer.toString(mode).getBytes());
}
}

Expand All @@ -408,11 +410,14 @@ public void createMiddle(final LocalChannelReference lcr)
JsonHandler.setValue(node, FIELDS.length, originalSize);
// Add limit if specified
JsonHandler.setValue(node, FIELDS.limit, limit);
middle = Unpooled
.wrappedBuffer(away, JsonHandler.writeAsString(node).getBytes());
middle = WaarpNettyUtil.wrappedBuffer(away,
JsonHandler.writeAsString(node)
.getBytes(
WaarpStringUtils.UTF8));
} else {
middle = Unpooled
.wrappedBuffer(away, filename.getBytes(), separator.getBytes(),
middle = WaarpNettyUtil
.wrappedBuffer(away, filename.getBytes(WaarpStringUtils.UTF8),
separator.getBytes(),
Integer.toString(blocksize).getBytes(),
separator.getBytes(),
Integer.toString(rank).getBytes(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -323,16 +323,49 @@ public static int startServer(String serverConfig) throws Exception {
*/
@AfterClass
public static void tearDownAfterClass() throws Exception {
if (NUMBER_FILES == -1) {
Configuration.configuration.setTimeoutCon(100);
WaarpLoggerFactory.setLogLevel(WaarpLogLevel.ERROR);
for (int pid : PIDS) {
Processes.kill(pid, true);
}
tearDownAfterClassClient();
tearDownAfterClassMinimal();
tearDownAfterClassServer();
return;
}
CloseableHttpClient httpClient = null;
int max = SystemPropertyUtil.get(IT_LONG_TEST, false)? 60 : 20;
if (max < NUMBER_FILES) {
max = NUMBER_FILES * 3;
}
int totalTransfers = max;
int nb = 0;
int every10sec = 10;
HttpGet request =
new HttpGet("http://127.0.0.1:8098/v2/transfers?limit=100000");
final long startTime = System.currentTimeMillis();
try {
httpClient = HttpClientBuilder.create().setConnectionManagerShared(true)
.disableAutomaticRetries().build();
HttpGet request =
new HttpGet("http://127.0.0.1:8098/v2/transfers?limit=1000");
CloseableHttpResponse response = null;
int nb = 0;
int every10sec = 10;
int max = SystemPropertyUtil.get(IT_LONG_TEST, false)? 60 : 20;
try {
response = httpClient.execute(request);
assertEquals(200, response.getStatusLine().getStatusCode());
String content = EntityUtils.toString(response.getEntity());
ObjectNode node = JsonHandler.getFromString(content);
if (node != null) {
JsonNode number = node.findValue("totalResults");
int newNb = number.asInt();
max += newNb;
totalTransfers = max;
logger.warn("Found {} transfers", newNb);
}
} finally {
if (response != null) {
response.close();
}
}
while (nb < max) {
try {
response = httpClient.execute(request);
Expand Down Expand Up @@ -363,7 +396,12 @@ public static void tearDownAfterClass() throws Exception {
httpClient.close();
}
}
final long stopTime = System.currentTimeMillis();
totalTransfers -= nb;
logger.warn("Duration {} for {} item so {} items/s", (stopTime - startTime) / 1000.0, totalTransfers,
totalTransfers / ((stopTime - startTime) / 1000.0));
Configuration.configuration.setTimeoutCon(100);
WaarpLoggerFactory.setLogLevel(WaarpLogLevel.ERROR);
for (int pid : PIDS) {
Processes.kill(pid, true);
}
Expand All @@ -388,7 +426,7 @@ private void checkMemory() {
Runtime runtime = Runtime.getRuntime();
long newUsedMemory = runtime.totalMemory() - runtime.freeMemory();
if (newUsedMemory > MAX_USED_MEMORY) {
logger.warn("Used Memory > 2GB {} {}", usedMemory / 1048576.0,
logger.info("Used Memory > 2GB {} {}", usedMemory / 1048576.0,
newUsedMemory / 1048576.0);
}
}
Expand Down Expand Up @@ -472,6 +510,11 @@ private void testBigTransfer(boolean limit, String serverName, boolean direct,
@Test
public void test01_LoopBigSendsSyncNoLimit()
throws IOException, InterruptedException {
if (SystemPropertyUtil.get(IT_LONG_TEST, false)) {
NUMBER_FILES = 4;
} else {
NUMBER_FILES = 2;
}
logger.warn("Start {} {}", Processes.getCurrentMethodName(), NUMBER_FILES);
testBigTransfer(false, "server2", true, false, 1024 * 500 * 1024);
testBigTransfer(false, "server2", true, false, 1024 * 1024 * 1024);
Expand All @@ -485,6 +528,11 @@ public void test01_LoopBigSendsSyncNoLimit()
@Test
public void test02_LoopBigSendsSyncSslNoLimit()
throws IOException, InterruptedException {
if (SystemPropertyUtil.get(IT_LONG_TEST, false)) {
NUMBER_FILES = 2;
} else {
NUMBER_FILES = 1;
}
logger.warn("Start {} {}", Processes.getCurrentMethodName(), NUMBER_FILES);
testBigTransfer(false, "server2-ssl", true, false, 1024 * 501 * 1024);
if (SystemPropertyUtil.get(IT_LONG_TEST, false)) {
Expand All @@ -493,6 +541,123 @@ public void test02_LoopBigSendsSyncSslNoLimit()
logger.warn("End {}", Processes.getCurrentMethodName());
}

@Test
public void test03_LoopBigSendsSyncSslNoLimit()
throws IOException, InterruptedException {
if (SystemPropertyUtil.get(IT_LONG_TEST, false)) {
NUMBER_FILES = 700;
} else {
NUMBER_FILES = -1;
logger.warn("Test disabled without IT_LONG_TEST");
Assume.assumeTrue("If the Long term tests are allowed",
SystemPropertyUtil.get(IT_LONG_TEST, false));
return;
}
logger.warn("Start {} {}", Processes.getCurrentMethodName(), NUMBER_FILES);
int limit = 700;
int middleSize = 360 - (limit / 2);

Assume.assumeNotNull(networkTransaction);
Configuration.configuration.changeNetworkLimit(0, 0, 0, 0, 1000);
File baseDir = new File("/tmp/R66/scenario_big_file_limitbdw/R1/out/");
for (int i = 0; i < limit; i++) {
int size = (middleSize + i) * 1024;
File fileOut = new File(baseDir, "hello" + size);
final File outHello = generateOutFile(fileOut.getAbsolutePath(), size);
}
logger.warn("End of file creations");
long timestart = System.currentTimeMillis();
R66Future[] futures = new R66Future[limit];
for (int i = 0; i < limit; i++) {
int size = (middleSize + i) * 1024;
final R66Future future = new R66Future(true);
futures[i] = future;
final TestTransferNoDb transaction =
new TestTransferNoDb(future, "server2-ssl", "hello" + size, "loop",
"Test Loop Send", true, BLOCK_SIZE,
DbConstantR66.ILLEGALVALUE,
networkTransaction);
transaction.setNormalInfoAsWarn(false);
transaction.run();
}
for (int i = 0; i < limit; i++) {
futures[i].awaitOrInterruptible();
assertTrue(futures[i].isSuccess());
}
//logger.info("Runner: {}", future.getRunner());
long timestop = System.currentTimeMillis();
logger.warn("Direct {}, Recv {}, LimitBandwidth {} " +
"({} seconds, {} MBPS vs {} " +
"and {}) of size {} with block size {}", true, false, limit,
(timestop - timestart) / 1000,
limit * 180*1024 / 1000.0 / (timestop - timestart),
Configuration.configuration.getServerGlobalReadLimit() /
1000000.0,
Configuration.configuration.getServerChannelReadLimit() /
1000000.0, limit *180*1024, BLOCK_SIZE);
checkMemory();
logger.warn("End {}", Processes.getCurrentMethodName());
}


@Test
public void test04_LoopBigSendsSyncNoLimit()
throws IOException, InterruptedException {
if (SystemPropertyUtil.get(IT_LONG_TEST, false)) {
NUMBER_FILES = 700;
} else {
NUMBER_FILES = -1;
logger.warn("Test disabled without IT_LONG_TEST");
Assume.assumeTrue("If the Long term tests are allowed",
SystemPropertyUtil.get(IT_LONG_TEST, false));
return;
}
logger.warn("Start {} {}", Processes.getCurrentMethodName(), NUMBER_FILES);
int limit = 700;
int middleSize = 360 - (limit / 2);

Assume.assumeNotNull(networkTransaction);
Configuration.configuration.changeNetworkLimit(0, 0, 0, 0, 1000);
File baseDir = new File("/tmp/R66/scenario_big_file_limitbdw/R1/out/");
for (int i = 0; i < limit; i++) {
int size = (middleSize + i) * 1024;
File fileOut = new File(baseDir, "hello" + size);
final File outHello = generateOutFile(fileOut.getAbsolutePath(), size);
}
logger.warn("End of file creations");
long timestart = System.currentTimeMillis();
R66Future[] futures = new R66Future[limit];
for (int i = 0; i < limit; i++) {
int size = (middleSize + i) * 1024;
final R66Future future = new R66Future(true);
futures[i] = future;
final TestTransferNoDb transaction =
new TestTransferNoDb(future, "server2", "hello" + size, "loop",
"Test Loop Send", true, BLOCK_SIZE,
DbConstantR66.ILLEGALVALUE,
networkTransaction);
transaction.setNormalInfoAsWarn(false);
transaction.run();
}
for (int i = 0; i < limit; i++) {
futures[i].awaitOrInterruptible();
assertTrue(futures[i].isSuccess());
}
//logger.info("Runner: {}", future.getRunner());
long timestop = System.currentTimeMillis();
logger.warn("Direct {}, Recv {}, LimitBandwidth {} " +
"({} seconds, {} MBPS vs {} " +
"and {}) of size {} with block size {}", true, false, limit,
(timestop - timestart) / 1000,
limit * 180*1024 / 1000.0 / (timestop - timestart),
Configuration.configuration.getServerGlobalReadLimit() /
1000000.0,
Configuration.configuration.getServerChannelReadLimit() /
1000000.0, limit *180*1024, BLOCK_SIZE);
checkMemory();
logger.warn("End {}", Processes.getCurrentMethodName());
}

private void waitForAllDone(DbTaskRunner runner) {
while (true) {
try {
Expand Down
Loading

0 comments on commit 3b0e884

Please sign in to comment.