Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/candidate-9.6.x' into candidate-…
Browse files Browse the repository at this point in the history
…9.8.x

Signed-off-by: Gavin Halliday <[email protected]>
  • Loading branch information
ghalliday committed Sep 20, 2024
2 parents 735de52 + 7ca77b8 commit 819f436
Show file tree
Hide file tree
Showing 37 changed files with 442 additions and 209 deletions.
38 changes: 27 additions & 11 deletions dfsclient/src/main/java/org/hpccsystems/dfs/client/FileUtility.java
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public class FileUtility
static private final int DEFAULT_READ_REQUEST_SIZE = 4096;
static private final int DEFAULT_READ_REQUEST_DELAY = 0;

private static boolean otelInitialized = false;
private static boolean otelNeedsInit = true;

private static class TaskContext
{
Expand Down Expand Up @@ -910,11 +910,13 @@ public void run()
fileReader.getInputStream().setReadRequestDelay(readRequestDelay);
fileReader.setMaxReadRetries(context.readRetries);

long recCount = 0;
while (fileReader.hasNext())
{
HPCCRecord record = fileReader.next();
context.getCurrentOperation().recordsRead.incrementAndGet();
recCount++;
}
context.getCurrentOperation().recordsRead.addAndGet(recCount);

fileReader.close();
context.getCurrentOperation().bytesRead.addAndGet(fileReader.getStreamPosition());
Expand Down Expand Up @@ -963,13 +965,15 @@ public void run()
{
try
{
long recCount = 0;
while (fileReader.hasNext())
{
splitTable.addRecordPosition(fileReader.getStreamPosition());
HPCCRecord record = fileReader.next();
fileWriter.writeRecord(record);
context.getCurrentOperation().recordsRead.incrementAndGet();
recCount++;
}
context.getCurrentOperation().recordsRead.addAndGet(recCount);

splitTable.finish(fileReader.getStreamPosition());

Expand Down Expand Up @@ -1091,14 +1095,18 @@ public void run()
{
for (int k = 0; k < fileReaders.length; k++)
{
long recordsRead = 0;
long recordsWritten = 0;
HpccRemoteFileReader<HPCCRecord> fileReader = fileReaders[k];
while (fileReader.hasNext())
{
HPCCRecord record = fileReader.next();
fileWriter.writeRecord(record);
context.getCurrentOperation().recordsWritten.incrementAndGet();
context.getCurrentOperation().recordsRead.incrementAndGet();
recordsRead++;
recordsWritten++;
}
context.getCurrentOperation().recordsWritten.addAndGet(recordsWritten);
context.getCurrentOperation().recordsRead.addAndGet(recordsRead);

fileReader.close();
context.getCurrentOperation().bytesRead.addAndGet(fileReader.getStreamPosition());
Expand Down Expand Up @@ -1251,14 +1259,19 @@ public void run()
splitEnd = endingSplit.splitEnd;
}

long recordsRead = 0;
long recordsWritten = 0;
while (fileReader.hasNext() && fileReader.getStreamPosAfterLastRecord() < splitEnd)
{
HPCCRecord record = (HPCCRecord) fileReader.getNext();
fileWriter.writeRecord(record);
context.getCurrentOperation().recordsWritten.incrementAndGet();
context.getCurrentOperation().recordsRead.incrementAndGet();
recordsRead++;
recordsWritten++;
}

context.getCurrentOperation().recordsWritten.addAndGet(recordsWritten);
context.getCurrentOperation().recordsRead.addAndGet(recordsRead);

context.getCurrentOperation().bytesRead.addAndGet(fileReader.getStreamPosAfterLastRecord());
inputStreams[j].close();
}
Expand Down Expand Up @@ -2124,7 +2137,7 @@ private static void performWrite(String[] args, TaskContext context)
*/
public static JSONArray run(String[] args)
{
if (!otelInitialized)
if (otelNeedsInit)
{
if (Boolean.getBoolean("otel.java.global-autoconfigure.enabled"))
{
Expand All @@ -2140,10 +2153,13 @@ public static JSONArray run(String[] args)
System.out.println(" otel.metrics.exporter: "+ System.getProperty("otel.metrics.exporter"));
System.out.println(" OTEL_METRICS_EXPORTER Env var: " + System.getenv("OTEL_METRICS_EXPORTER"));

OpenTelemetry otel = AutoConfiguredOpenTelemetrySdk.initialize().getOpenTelemetrySdk();
if (!org.hpccsystems.ws.client.utils.Utils.isOtelJavaagentUsed())
{
AutoConfiguredOpenTelemetrySdk.initialize().getOpenTelemetrySdk();
}
}

otelInitialized = true;
otelNeedsInit = false;
}

Options options = getTopLevelOptions();
Expand Down Expand Up @@ -2202,4 +2218,4 @@ public static void main(String[] args)

return;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,10 @@
import org.hpccsystems.commons.ecl.FieldDef;
import org.hpccsystems.commons.ecl.RecordDefinitionTranslator;

import org.hpccsystems.dfs.client.RowServiceOutputStream;
import org.hpccsystems.dfs.client.Utils;

import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.semconv.ServerAttributes;

import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@
*******************************************************************************/
package org.hpccsystems.dfs.client;

import org.hpccsystems.dfs.client.Utils;

import org.hpccsystems.commons.ecl.FieldDef;
import org.hpccsystems.commons.ecl.RecordDefinitionTranslator;
import org.hpccsystems.commons.errors.HpccFileException;
Expand All @@ -22,7 +20,6 @@
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.semconv.ServerAttributes;

import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,7 @@
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.context.Context;
import io.opentelemetry.semconv.ExceptionAttributes;
import io.opentelemetry.semconv.ServerAttributes;
import io.opentelemetry.semconv.ServiceAttributes;

/**
Expand Down Expand Up @@ -1435,8 +1433,6 @@ private void compactBuffer()
@Override
public int available() throws IOException
{
String prefix = "RowServiceInputStream.available(), file " + dataPart.getFileName() + " part " + dataPart.getThisPart() + " on IP " + getIP() + ":";

// Do the check for closed first here to avoid data races
if (this.closed.get())
{
Expand All @@ -1449,6 +1445,8 @@ public int available() throws IOException
int availBytes = bufferLen - this.readPos;
if (availBytes == 0)
{
String prefix = "RowServiceInputStream.available(), file " + dataPart.getFileName() + " part " + dataPart.getThisPart() + " on IP " + getIP() + ":";

// this.bufferWriteMutex.release();
IOException wrappedException = new IOException(prefix + "End of input stream, bufferLen:" + bufferLen + ", this.readPos:" + this.readPos + ", availableBytes=0");
throw wrappedException;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package org.hpccsystems.ws.client;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
Expand Down Expand Up @@ -43,14 +42,12 @@

import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanBuilder;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.instrumentation.annotations.WithSpan;
import io.opentelemetry.semconv.HttpAttributes;
import io.opentelemetry.semconv.ServerAttributes;

Expand Down Expand Up @@ -91,6 +88,7 @@ public boolean isTargetHPCCContainerized() throws Exception
return targetsContainerizedHPCC;
}

@WithSpan
private boolean getTargetHPCCIsContainerized(Connection conn) throws Exception
{
if (wsconn == null)
Expand Down Expand Up @@ -155,6 +153,7 @@ public Version getTargetHPCCBuildVersion()
return targetHPCCBuildVersion;
}

@WithSpan
private String getTargetHPCCBuildVersionString() throws Exception
{
if (wsconn == null)
Expand Down Expand Up @@ -266,47 +265,29 @@ protected boolean initBaseWsClient(Connection connection, boolean fetchVersionAn

if (fetchVersionAndContainerMode)
{
Span fetchHPCCVerSpan = getWsClientSpanBuilder("FetchHPCCVersion").setSpanKind(SpanKind.INTERNAL).startSpan();
try (Scope scope = fetchHPCCVerSpan.makeCurrent())
try
{
try
{
targetHPCCBuildVersion = new Version(getTargetHPCCBuildVersionString());
}
catch (Exception e)
{
initErrMessage = "BaseHPCCWsClient: Could not stablish target HPCC bulid version, review all HPCC connection values";
if (!e.getLocalizedMessage().isEmpty())
initErrMessage = initErrMessage + "\n" + e.getLocalizedMessage();
success = false;
}
targetHPCCBuildVersion = new Version(getTargetHPCCBuildVersionString());
}
finally
catch (Exception e)
{
fetchHPCCVerSpan.setStatus(success ? StatusCode.OK : StatusCode.ERROR, initErrMessage);
fetchHPCCVerSpan.end();
initErrMessage = "BaseHPCCWsClient: Could not stablish target HPCC bulid version, review all HPCC connection values";
if (!e.getLocalizedMessage().isEmpty())
initErrMessage = initErrMessage + "\n" + e.getLocalizedMessage();
success = false;
}

Span fetchHPCCContainerMode = getWsClientSpanBuilder("FetchHPCCContainerMode").startSpan();
try (Scope scope = fetchHPCCContainerMode.makeCurrent())
try
{
try
{
targetsContainerizedHPCC = getTargetHPCCIsContainerized(wsconn);
}
catch (Exception e)
{
initErrMessage = initErrMessage + "\nBaseHPCCWsClient: Could not determine target HPCC Containerization mode, review all HPCC connection values";
if (!e.getLocalizedMessage().isEmpty())
initErrMessage = initErrMessage + "\n" + e.getLocalizedMessage();

success = false;
}
targetsContainerizedHPCC = getTargetHPCCIsContainerized(wsconn);
}
finally
catch (Exception e)
{
fetchHPCCContainerMode.setStatus(success ? StatusCode.OK : StatusCode.ERROR, initErrMessage);
fetchHPCCContainerMode.end();
initErrMessage = initErrMessage + "\nBaseHPCCWsClient: Could not determine target HPCC Containerization mode, review all HPCC connection values";
if (!e.getLocalizedMessage().isEmpty())
initErrMessage = initErrMessage + "\n" + e.getLocalizedMessage();

success = false;
}
}
if (!initErrMessage.isEmpty())
Expand Down
Loading

0 comments on commit 819f436

Please sign in to comment.