Skip to content

Commit

Permalink
Code review changes
Browse files Browse the repository at this point in the history
  • Loading branch information
jpmcmu committed Sep 24, 2024
1 parent 06e5a4e commit 51fea57
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 26 deletions.
40 changes: 16 additions & 24 deletions dfsclient/src/main/java/org/hpccsystems/dfs/client/FileUtility.java
Original file line number Diff line number Diff line change
Expand Up @@ -640,7 +640,8 @@ private static Options getReadOptions()
options.addOption("ignore_tlk", false, "Ignore the TLK file when reading Index files.");
options.addOption("read_retries", true, "Sets the maximum number of retries to attempt when reading a file.");
options.addOption("socket_timeout_seconds", true, "Sets the socket operation timeout in seconds.");
options.addOption("connection_startup_limit", true, "Specifies the maximum number of connections to startup concurrently.");
options.addOption("connection_startup_limit", true, "Specifies the maximum number of connections to startup concurrently."
+ " useful in cases where starting up connections too quickly can overwhelm intermediate processes.");

options.addOption(Option.builder("read")
.argName("files")
Expand All @@ -661,14 +662,16 @@ private static Options getReadTestOptions()
options.addOption("pass", true, "Specifies the password used to connect. Defaults to null.");
options.addOption("num_threads", true, "Specifies the number of parallel to use to perform operations.");
options.addOption("access_expiry_seconds", true, "Access token expiration seconds.");
options.addOption("initial_read_size", true, "The size of the initial read request in KB sent to the rowservice.");
options.addOption("initial_read_size", true, "The size of the initial read request in KB sent to the rowservice,"
+ " useful in cases where starting up connections too quickly can overwhelm intermediate processes.");
options.addOption("read_request_size", true, "The size of the read requests in KB sent to the rowservice.");
options.addOption("read_request_delay", true, "The delay in MS between read requests sent to the rowservice.");
options.addOption("filter", true, "Specifies a filter to apply to the files read from the cluster.");
options.addOption("ignore_tlk", false, "Ignore the TLK file when reading Index files.");
options.addOption("read_retries", true, "Sets the maximum number of retries to attempt when reading a file.");
options.addOption("socket_timeout_seconds", true, "Sets the socket operation timeout in seconds.");
options.addOption("connection_startup_limit", true, "Specifies the maximum number of connections to startup concurrently.");
options.addOption("connection_startup_limit", true, "Specifies the maximum number of connections to startup concurrently."
+ " useful in cases where starting up connections too quickly can overwhelm intermediate processes.");

options.addOption(Option.builder("file_parts")
.argName("_file_parts")
Expand All @@ -692,7 +695,8 @@ private static Options getCopyOptions()
options.addOption("ignore_tlk", false, "Ignore the TLK file when reading Index files.");
options.addOption("read_retries", true, "Sets the maximum number of retries to attempt when reading a file.");
options.addOption("socket_timeout_seconds", true, "Sets the socket operation timeout in seconds.");
options.addOption("connection_startup_limit", true, "Specifies the maximum number of connections to startup concurrently.");
options.addOption("connection_startup_limit", true, "Specifies the maximum number of connections to startup concurrently."
+ " useful in cases where starting up connections too quickly can overwhelm intermediate processes.");

options.addOption(Option.builder("copy")
.argName("files")
Expand All @@ -715,7 +719,8 @@ private static Options getWriteOptions()
options.addRequiredOption("dest_cluster", "Destination Cluster Name", true, "Specifies the name of the cluster to write files back to.");
options.addOption("num_threads", true, "Specifies the number of parallel to use to perform operations.");
options.addOption("socket_timeout_seconds", true, "Sets the socket operation timeout in seconds.");
options.addOption("connection_startup_limit", true, "Specifies the maximum number of connections to startup concurrently.");
options.addOption("connection_startup_limit", true, "Specifies the maximum number of connections to startup concurrently."
+ " useful in cases where starting up connections too quickly can overwhelm intermediate processes.");

options.addOption(Option.builder("write")
.argName("files")
Expand Down Expand Up @@ -954,13 +959,11 @@ public void run()
fileReader.getInputStream().setReadRequestDelay(readRequestDelay);
fileReader.setMaxReadRetries(context.readRetries);

long recCount = 0;
while (fileReader.hasNext())
{
HPCCRecord record = fileReader.next();
recCount++;
context.getCurrentOperation().recordsRead.incrementAndGet();
}
context.getCurrentOperation().recordsRead.addAndGet(recCount);

fileReader.close();
context.getCurrentOperation().bytesRead.addAndGet(fileReader.getStreamPosition());
Expand Down Expand Up @@ -1010,15 +1013,13 @@ public void run()
{
try
{
long recCount = 0;
while (fileReader.hasNext())
{
splitTable.addRecordPosition(fileReader.getStreamPosition());
HPCCRecord record = fileReader.next();
fileWriter.writeRecord(record);
recCount++;
context.getCurrentOperation().recordsRead.incrementAndGet();
}
context.getCurrentOperation().recordsRead.addAndGet(recCount);

splitTable.finish(fileReader.getStreamPosition());

Expand Down Expand Up @@ -1141,18 +1142,14 @@ public void run()
{
for (int k = 0; k < fileReaders.length; k++)
{
long recordsRead = 0;
long recordsWritten = 0;
HpccRemoteFileReader<HPCCRecord> fileReader = fileReaders[k];
while (fileReader.hasNext())
{
HPCCRecord record = fileReader.next();
fileWriter.writeRecord(record);
recordsRead++;
recordsWritten++;
context.getCurrentOperation().recordsWritten.incrementAndGet();
context.getCurrentOperation().recordsRead.incrementAndGet();
}
context.getCurrentOperation().recordsWritten.addAndGet(recordsWritten);
context.getCurrentOperation().recordsRead.addAndGet(recordsRead);

fileReader.close();
context.getCurrentOperation().bytesRead.addAndGet(fileReader.getStreamPosition());
Expand Down Expand Up @@ -1305,19 +1302,14 @@ public void run()
splitEnd = endingSplit.splitEnd;
}

long recordsRead = 0;
long recordsWritten = 0;
while (fileReader.hasNext() && fileReader.getStreamPosAfterLastRecord() < splitEnd)
{
HPCCRecord record = (HPCCRecord) fileReader.getNext();
fileWriter.writeRecord(record);
recordsRead++;
recordsWritten++;
context.getCurrentOperation().recordsWritten.incrementAndGet();
context.getCurrentOperation().recordsRead.incrementAndGet();
}

context.getCurrentOperation().recordsWritten.addAndGet(recordsWritten);
context.getCurrentOperation().recordsRead.addAndGet(recordsRead);

context.getCurrentOperation().bytesRead.addAndGet(fileReader.getStreamPosAfterLastRecord());
inputStreams[j].close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,11 +114,11 @@ private static StreamContext constructStreamContext(FieldDef rd, FieldDef pRd, i
public static final int DEFAULT_READ_REQUEST_SPAN_BATCH_SIZE = 25;
public static final int DEFAULT_CONNECT_TIMEOUT_MILIS = 5000; // 5 second connection timeout
public static final int DEFAULT_SOCKET_OP_TIMEOUT_MS = 15000; // 15 second timeout on read / write operations
public static final int DEFAULT_MAX_CONCURRENT_CONNECTION_STARTUPS = 10;
public static final int DEFAULT_MAX_CONCURRENT_CONNECTION_STARTUPS = 100;

// Note: The platform may respond with more data than this if records are larger than this limit.
public static final int DEFAULT_MAX_READ_SIZE_KB = 4096;
public static final int DEFAULT_INITIAL_REQUEST_READ_SIZE_KB = 32;
public static final int DEFAULT_INITIAL_REQUEST_READ_SIZE_KB = 256;

private static final int SHORT_SLEEP_MS = 1;
private static final int LONG_WAIT_THRESHOLD_US = 100;
Expand Down

0 comments on commit 51fea57

Please sign in to comment.