Merge remote-tracking branch 'origin/candidate-9.6.x'
Signed-off-by: Jake Smith <[email protected]>

# Conflicts:
#	commons-hpcc/pom.xml
#	dfsclient/pom.xml
#	pom.xml
#	wsclient/pom.xml
jakesmith committed May 17, 2024
2 parents 7a643fd + 02e17b4 commit 887f3b4
Showing 12 changed files with 226 additions and 76 deletions.
10 changes: 4 additions & 6 deletions .github/workflows/JAPIPRBuildAction.yml
@@ -12,12 +12,6 @@ jobs:
with:
ref: ${{ github.event.pull_request.head.sha }}
fetch-depth: 0
- name: Rebase
run: |
git config user.email '[email protected]'
git config user.name 'hpccsystems development'
git rebase origin/${{ github.event.pull_request.base.ref }}
git log --pretty=one -n 15

- name: Set up JDK 11
uses: actions/setup-java@v1
@@ -34,3 +28,7 @@ jobs:

- name: Build with Maven
run: mvn -B package --file pom.xml

# Expect a failure here, verifying that the test suite fails early on init issues
- name: Test Suite Verification
run: "! mvn test --activate-profiles jenkins-on-demand -Dhpccconn=https://bad_host:8010"
@@ -271,7 +271,7 @@ public boolean hasNext() throws HpccFileException
{
if (this.rootRecordBuilder == null)
{
throw new HpccFileException("RecordReader must be initialized before being used.");
throw new HpccFileException("BinaryRecordReader.hasNext(): RecordReader must be initialized before being used. rootRecordBuilder is null, hasNext() failed.");
}

int nextByte = -1;
@@ -299,7 +299,7 @@ public boolean hasNext() throws HpccFileException
}
catch (IOException e)
{
throw new HpccFileException(e);
throw new HpccFileException("BinaryRecordReader.hasNext(): failed to peek at the next byte in the input stream:" + e.getMessage(),e);
}

return nextByte >= 0;
@@ -314,7 +314,7 @@ public Object getNext() throws HpccFileException
{
if (this.rootRecordBuilder == null)
{
throw new HpccFileException("RecordReader must be initialized before being used.");
throw new HpccFileException("BinaryRecordReader.getNext(): RecordReader must be initialized before being used, rootRecordBuilder is null.");
}

if (!this.hasNext()) throw new NoSuchElementException("No next record!");
@@ -325,13 +325,13 @@ public Object getNext() throws HpccFileException
record = parseRecord(this.rootRecordDefinition, this.rootRecordBuilder, this.defaultLE);
if (record == null)
{
throw new HpccFileException("RecordContent not found, or invalid record structure. Check logs for more information.");
throw new HpccFileException("BinaryRecordReader.getNext(): RecordContent not found, or invalid record structure. Check logs for more information.");
}

}
catch (Exception e)
{
throw new HpccFileException("Failed to parse next record: " + e.getMessage(), e);
throw new HpccFileException("BinaryRecordReader.getNext(): Failed to parse next record: " + e.getMessage(), e);
}

this.streamPosAfterLastRecord = this.inputStream.getStreamPosition();
@@ -370,7 +370,7 @@ private Object parseFlatField(FieldDef fd, boolean isLittleEndian) throws Unpars

if (fd.isFixed() && fd.getDataLen() > Integer.MAX_VALUE)
{
throw new UnparsableContentException("Data length: " + fd.getDataLen() + " exceeds max supported length: " + Integer.MAX_VALUE);
throw new UnparsableContentException("BinaryRecordReader.parseFlatField(): Data length: " + fd.getDataLen() + " exceeds max supported length: " + Integer.MAX_VALUE);
}

// Embedded field lengths are little endian
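
The reworded exceptions above identify which BinaryRecordReader call failed. A minimal consumption sketch, assuming the reader was already constructed and initialized elsewhere; the import paths follow the usual hpcc4j layout and are an assumption here:

    // Sketch only: import paths are assumptions, and "reader" is assumed to have been
    // constructed and initialized (rootRecordBuilder set) before this helper is called.
    import org.hpccsystems.commons.errors.HpccFileException;
    import org.hpccsystems.dfs.client.BinaryRecordReader;

    public class RecordDrainExample
    {
        public static long drain(BinaryRecordReader reader) throws HpccFileException
        {
            long count = 0;
            // hasNext()/getNext() now raise HpccFileException messages that name the
            // failing method, e.g. "BinaryRecordReader.getNext(): Failed to parse next record: ..."
            while (reader.hasNext())
            {
                Object rec = reader.getNext();
                count++;
            }
            return count;
        }
    }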
@@ -43,6 +43,7 @@ public class DataPartition implements Serializable
private String fileAccessBlob;
private FileType fileType;
private boolean isTLK;
private String fileName;

public static enum FileType
{
@@ -197,13 +198,42 @@ private DataPartition(String[] copyLocations, String[] copyPaths, int partNum, i
* the file type
*/
private DataPartition(String[] copylocations, String[] copyPaths, int this_part, int num_parts, int clearport, boolean sslport, FileFilter filter,
String fileAccessBlob, FileType fileType)
String fileAccessBlob, FileType fileType) {
this(copylocations,copyPaths,this_part,num_parts,clearport,sslport,filter,fileAccessBlob,fileType,null);
}
/**
* Construct the data part, used by makeParts.
*
* @param copylocations
* locations of all copies of this file part
* @param copyPaths
* the copy paths
* @param this_part
* part number
* @param num_parts
* number of parts
* @param clearport
* port number of clear communications
* @param sslport
* port number of ssl communications
* @param filter
* the file filter object
* @param fileAccessBlob
* file access token
* @param fileType
* the file type
* @param fileName
* the file name
*/
private DataPartition(String[] copylocations, String[] copyPaths, int this_part, int num_parts, int clearport, boolean sslport, FileFilter filter,
String fileAccessBlob, FileType fileType, String fileName)
{
this.this_part = this_part;
this.num_parts = num_parts;
this.rowservicePort = clearport;
this.useSSL = sslport;
this.fileFilter = filter;
this.fileName=fileName;
if (this.fileFilter == null)
{
this.fileFilter = new FileFilter();
@@ -348,6 +378,16 @@ public boolean getUseSsl()
return useSSL;
}

/**
* File name being read
*
* @return filename
*/
public String getFileName()
{
return fileName;
}

/**
* Copy Path.
*
@@ -415,8 +455,7 @@ public DataPartition setFilter(FileFilter filter)
public String toString()
{
StringBuilder sb = new StringBuilder();
sb.append(this.getThisPart());
sb.append(" copy locations: {");
sb.append("part ").append(this.getThisPart()).append(", copy locations: {");
for (int copyindex = 0; copyindex < getCopyCount(); copyindex++)
{
if (copyindex > 0) sb.append(", ");
@@ -471,6 +510,31 @@ public static DataPartition[] createPartitions(DFUFilePartWrapper[] dfuparts, Cl
return createPartitions(dfuparts, clusterremapper, max_parts, FileFilter.nullFilter(), fileAccessBlob, FileType.FLAT);
}


/**
* Creates the partitions.
*
* @param dfuparts
* the dfuparts
* @param clusterremapper
* the clusterremapper
* @param max_parts
* the max parts
* @param filter
* the filter
* @param fileAccessBlob
* the file access blob
* @param fileType
* the file type
* @return the data partition[]
* @throws HpccFileException
* the hpcc file exception
*/
public static DataPartition[] createPartitions(DFUFilePartWrapper[] dfuparts, ClusterRemapper clusterremapper, int max_parts, FileFilter filter,
String fileAccessBlob, FileType fileType) throws HpccFileException {
return createPartitions(dfuparts,clusterremapper,max_parts,filter,fileAccessBlob,fileType,null);
}

/**
* Creates the partitions.
*
@@ -486,12 +550,14 @@ public static DataPartition[] createPartitions(DFUFilePartWrapper[] dfuparts, Cl
* the file access blob
* @param fileType
* the file type
* @param fileName
* the file name
* @return the data partition[]
* @throws HpccFileException
* the hpcc file exception
*/
public static DataPartition[] createPartitions(DFUFilePartWrapper[] dfuparts, ClusterRemapper clusterremapper, int max_parts, FileFilter filter,
String fileAccessBlob, FileType fileType) throws HpccFileException
String fileAccessBlob, FileType fileType, String fileName) throws HpccFileException
{
DataPartition[] rslt = new DataPartition[dfuparts.length];

@@ -508,7 +574,7 @@ public static DataPartition[] createPartitions(DFUFilePartWrapper[] dfuparts, Cl

DataPartition new_dp = new DataPartition(clusterremapper.reviseIPs(dfuparts[i].getCopies()), copyPaths, dfuparts[i].getPartIndex(),
dfuparts.length, clusterremapper.revisePort(null), clusterremapper.getUsesSSLConnection(null), filter, fileAccessBlob,
fileType);
fileType,fileName);
new_dp.isTLK = dfuparts[i].isTopLevelKey();

rslt[i] = new_dp;
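
Taken together, these additions thread the logical file name from createPartitions through to each DataPartition. A rough usage sketch of the new surface; everything except the calls shown in this diff (createPartitions, getFileName, FileFilter.nullFilter, FileType.FLAT) is a hypothetical placeholder:

    // Sketch only: the dfuparts, clusterremapper, fileAccessBlob and logicalFileName
    // arguments are assumed to come from an existing file lookup elsewhere.
    static void listParts(DFUFilePartWrapper[] dfuparts, ClusterRemapper clusterremapper,
                          String fileAccessBlob, String logicalFileName) throws HpccFileException
    {
        DataPartition[] parts = DataPartition.createPartitions(
                dfuparts, clusterremapper, /* max_parts is currently ignored */ 0,
                FileFilter.nullFilter(), fileAccessBlob,
                DataPartition.FileType.FLAT, logicalFileName);

        for (DataPartition part : parts)
        {
            // toString() now reads "part N, copy locations: {...}" and getFileName()
            // returns the logical file name the partition was created with.
            System.out.println(part + " -> " + part.getFileName());
        }
    }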
@@ -469,7 +469,7 @@ private void createDataParts() throws HpccFileException
{
ClusterRemapper clusterremapper = ClusterRemapper.makeMapper(clusterRemapInfo, fileinfoforread);
this.dataParts = DataPartition.createPartitions(fileinfoforread.getFileParts(), clusterremapper,
/* maxParts currently ignored anyway */0, filter, fileinfoforread.getFileAccessInfoBlob(), fileType);
/* maxParts currently ignored anyway */0, filter, fileinfoforread.getFileAccessInfoBlob(), fileType,this.getFileName());

// Check to see if this file has a TLK. The TLK will always be the last partition.
// If we do have a TLK remove it from the standard list of data partitions.
@@ -193,7 +193,7 @@ public HpccRemoteFileReader(DataPartition dp, FieldDef originalRD, IRecordBuilde
* @throws Exception
* general exception
*/
public HpccRemoteFileReader(DataPartition dp, FieldDef originalRD, IRecordBuilder recBuilder, int connectTimeout, int limit, boolean createPrefetchThread, int readSizeKB, FileReadResumeInfo resumeInfo, int socketOpTimeoutMs) throws Exception
public HpccRemoteFileReader(DataPartition dp, FieldDef originalRD, IRecordBuilder recBuilder, int connectTimeout, int limit, boolean createPrefetchThread, int readSizeKB, FileReadResumeInfo resumeInfo, int socketOpTimeoutMs) throws Exception
{
this.handlePrefetch = createPrefetchThread;
this.originalRecordDef = originalRD;
@@ -280,8 +280,8 @@ private boolean retryRead()
try
{
this.inputStream = new RowServiceInputStream(this.dataPartition, this.originalRecordDef,
this.recordBuilder.getRecordDefinition(), this.connectTimeout, this.limit, this.createPrefetchThread,
this.readSizeKB, restartInfo, false, this.socketOpTimeoutMs);
this.recordBuilder.getRecordDefinition(), this.connectTimeout, this.limit, this.createPrefetchThread,
this.readSizeKB, restartInfo, false, this.socketOpTimeoutMs);
long bytesToSkip = resumeInfo.recordReaderStreamPos - resumeInfo.inputStreamPos;
if (bytesToSkip < 0)
{
@@ -434,7 +434,7 @@ public boolean hasNext()
if (!retryRead())
{
canReadNext = false;
log.error("Read failure for " + this.dataPartition.toString(), e);
log.error("Read failure for " + this.dataPartition.toString() +":" + e.getMessage(),e);
java.util.NoSuchElementException exception = new java.util.NoSuchElementException("Fatal read error: " + e.getMessage());
exception.initCause(e);
throw exception;
Expand Down Expand Up @@ -505,7 +505,7 @@ public void close() throws Exception

long closeTimeMs = System.currentTimeMillis();
double readTimeS = (closeTimeMs - openTimeMs) / 1000.0;
log.info("HPCCRemoteFileReader: Closing file part: " + dataPartition.getThisPart()
log.info("HPCCRemoteFileReader: Closing file part: " + dataPartition.getThisPart() + " for " + dataPartition.getFileName()
+ " read time: " + readTimeS + "s "
+ " records read: " + recordsRead);
}
@@ -550,8 +550,8 @@ public void report()
{
if (getRemoteReadMessageCount() > 0)
{
log.warn("DataPartition '" + this.dataPartition + "' read operation messages:\n");
log.warn("DataPartition '" + this.dataPartition + "' read operation messages for " + dataPartition.getFileName() + ":\n");
log.warn(getRemoteReadMessages());
}
}
}
}
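
For context, a rough sketch of the reader life-cycle these logging changes touch; the reader is assumed to be constructed elsewhere, generic type parameters are omitted, and iteration via next() is assumed from the reader's iterator-style API (only hasNext(), report() and close() appear in this diff):

    // Illustrative only: "reader" is assumed to be an already-constructed
    // HpccRemoteFileReader for a single DataPartition.
    static long consume(HpccRemoteFileReader reader) throws Exception
    {
        long records = 0;
        try
        {
            while (reader.hasNext())      // a fatal read failure surfaces as NoSuchElementException
            {
                Object rec = reader.next();   // next() assumed from the iterator-style API
                records++;
            }
        }
        finally
        {
            reader.report();              // warnings now name the file via dataPartition.getFileName()
            reader.close();               // logs "Closing file part: N for <file name>" plus read time
        }
        return records;
    }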
(diffs for the remaining changed files not shown)
