From d67e4b767fa0e1755073d78de58e0e92fbf3cb6e Mon Sep 17 00:00:00 2001 From: James McMullan Date: Fri, 8 Sep 2023 09:26:53 -0400 Subject: [PATCH] HPCC4J-540 Missing file error during publish (#640) * HPCC4J-540 Missing file error during publish - Corrected error handling on prefetch thread to pass errors on startup - Fixed potential data race in exception handling - Added invalid signature test Signed-off-by: James McMullan James.McMullan@lexisnexis.com * Code review changes --------- Signed-off-by: James McMullan James.McMullan@lexisnexis.com --- .../hpccsystems/dfs/client/DataPartition.java | 20 +++-- .../dfs/client/HpccRemoteFileReader.java | 7 ++ .../dfs/client/RowServiceInputStream.java | 37 +++++++-- .../dfs/client/RowServiceOutputStream.java | 23 +++++- .../dfs/client/DFSReadWriteTest.java | 82 ++++++++++++++++++- 5 files changed, 152 insertions(+), 17 deletions(-) diff --git a/dfsclient/src/main/java/org/hpccsystems/dfs/client/DataPartition.java b/dfsclient/src/main/java/org/hpccsystems/dfs/client/DataPartition.java index f6e8afd58..bc648b49c 100644 --- a/dfsclient/src/main/java/org/hpccsystems/dfs/client/DataPartition.java +++ b/dfsclient/src/main/java/org/hpccsystems/dfs/client/DataPartition.java @@ -78,7 +78,7 @@ public static enum FileType /* * (non-Javadoc) - * + * * @see java.lang.Enum#toString() */ public String toString() @@ -221,6 +221,16 @@ public String getFileAccessBlob() return this.fileAccessBlob; } + /** + * Set the security access blob. + * + * @param accessBlob security access blob + */ + public void setFileAccessBlob(String accessBlob) + { + this.fileAccessBlob = accessBlob; + } + /** * The underying file format for this partition . * @@ -330,11 +340,11 @@ public FileFilter getFilter() { return this.fileFilter; } - + /** * Set the filter object to select specific rows. * - * @param filter file filter + * @param filter file filter * @return the partition */ public DataPartition setFilter(FileFilter filter) @@ -345,7 +355,7 @@ public DataPartition setFilter(FileFilter filter) /* * (non-Javadoc) - * + * * @see java.lang.Object */ public String toString() @@ -469,7 +479,7 @@ public int index() /** * Is this data partition the TLK - * + * * @return isTLK */ public boolean isTLK() diff --git a/dfsclient/src/main/java/org/hpccsystems/dfs/client/HpccRemoteFileReader.java b/dfsclient/src/main/java/org/hpccsystems/dfs/client/HpccRemoteFileReader.java index c0387aad5..38e283215 100644 --- a/dfsclient/src/main/java/org/hpccsystems/dfs/client/HpccRemoteFileReader.java +++ b/dfsclient/src/main/java/org/hpccsystems/dfs/client/HpccRemoteFileReader.java @@ -307,6 +307,13 @@ public boolean hasNext() try { rslt = this.binaryRecordReader.hasNext(); + + // Has next may not catch the prefetch exception if it occurs at the beginning of a read + // This is due to InputStream.hasNext() being allowed to throw an IOException when closed. + if (this.inputStream.getPrefetchException() != null) + { + throw this.inputStream.getPrefetchException(); + } } catch (HpccFileException e) { diff --git a/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceInputStream.java b/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceInputStream.java index 99b488539..794c73b1c 100644 --- a/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceInputStream.java +++ b/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceInputStream.java @@ -491,6 +491,16 @@ private int getFilePartCopy() return filePartCopyIndexPointer; } + /** + * The HpccFileException from the prefetch thread if an exception has occured. + * + * @return the prefetch exception + */ + public HpccFileException getPrefetchException() + { + return this.prefetchException; + } + /** * The port number for the remote read service. * @@ -761,6 +771,19 @@ private int startFetch() return -1; } + if (response.errorCode != RFCCodes.RFCStreamNoError) + { + prefetchException = new HpccFileException(response.errorMessage); + try + { + close(); + } + catch (IOException e) + {} + + return -1; + } + // If len < 0 we have finished reading the file if (response.len < 0) { @@ -775,12 +798,6 @@ private int startFetch() return -1; } - if (response.errorCode != RFCCodes.RFCStreamNoError) - { - prefetchException = new HpccFileException(response.errorMessage); - return -1; - } - //------------------------------------------------------------------------------ // Retry with the token if handle is invalid //------------------------------------------------------------------------------ @@ -1148,6 +1165,11 @@ public int available() throws IOException // Do the check for closed first here to avoid data races if (this.closed.get()) { + if (this.prefetchException != null) + { + throw new IOException("Prefetch thread exited early exception.", this.prefetchException); + } + int bufferLen = this.readBufferDataLen.get(); int availBytes = bufferLen - this.readPos; if (availBytes == 0) @@ -1529,7 +1551,6 @@ private void makeActive() throws HpccFileException throw new HpccFileException("Failed to create streams", e); } - //------------------------------------------------------------------------------ // Check protocol version //------------------------------------------------------------------------------ @@ -1567,7 +1588,7 @@ private void makeActive() throws HpccFileException throw new HpccFileException("Error while attempting to read version response.", e); } - rowServiceVersion = new String(versionBytes, HPCCCharSet); + rowServiceVersion = new String(versionBytes, HPCCCharSet); } //------------------------------------------------------------------------------ diff --git a/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceOutputStream.java b/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceOutputStream.java index 46dc27a83..5e5b95ceb 100644 --- a/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceOutputStream.java +++ b/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceOutputStream.java @@ -241,7 +241,7 @@ private static class RowServiceResponse throw new HpccFileException("Error while attempting to read version response.", e); } - rowServiceVersion = new String(versionBytes, StandardCharsets.ISO_8859_1); + rowServiceVersion = new String(versionBytes, StandardCharsets.ISO_8859_1); } // Go ahead and make the initial write request. This won't write any data to file @@ -298,6 +298,11 @@ private void makeInitialWriteRequest() throws Exception RowServiceResponse response = this.readResponse(); this.handle = response.handle; + + if (response.errorCode != RFCCodes.RFCStreamNoError) + { + throw new IOException(response.errorMessage); + } } private String makeCloseHandleRequest() @@ -336,14 +341,20 @@ private void sendCloseFileRequest() throws IOException throw new IOException("Failed on close file with error: ", e); } + RowServiceResponse response = null; try { - readResponse(); + response = readResponse(); } catch (HpccFileException e) { throw new IOException("Failed to close file. Unable to read response with error: ", e); } + + if (response.errorCode != RFCCodes.RFCStreamNoError) + { + throw new IOException(response.errorMessage); + } } private RowServiceResponse readResponse() throws HpccFileException @@ -493,15 +504,21 @@ public void write(byte[] b, int off, int len) throws IOException bytesWritten += len; + RowServiceResponse response = null; try { - RowServiceResponse response = readResponse(); + response = readResponse(); this.handle = response.handle; } catch (HpccFileException e) { throw new IOException("Failed during write operation. Unable to read response with error: ", e); } + + if (response.errorCode != RFCCodes.RFCStreamNoError) + { + throw new IOException(response.errorMessage); + } } /* diff --git a/dfsclient/src/test/java/org/hpccsystems/dfs/client/DFSReadWriteTest.java b/dfsclient/src/test/java/org/hpccsystems/dfs/client/DFSReadWriteTest.java index a8079cb25..2fc59c86d 100644 --- a/dfsclient/src/test/java/org/hpccsystems/dfs/client/DFSReadWriteTest.java +++ b/dfsclient/src/test/java/org/hpccsystems/dfs/client/DFSReadWriteTest.java @@ -1014,7 +1014,7 @@ public void emptyCompressedFileTest() Version remoteVersion = dfuClient.getTargetHPCCBuildVersion(); FieldDef[] fieldDefs = new FieldDef[2]; - fieldDefs[0] = new FieldDef("key", FieldType.INTEGER, "lNTEGER4", 4, true, false, HpccSrcType.LITTLE_ENDIAN, new FieldDef[0]); + fieldDefs[0] = new FieldDef("key", FieldType.INTEGER, "INTEGER4", 4, true, false, HpccSrcType.LITTLE_ENDIAN, new FieldDef[0]); fieldDefs[1] = new FieldDef("value", FieldType.STRING, "STRING", 0, false, false, HpccSrcType.UTF8, new FieldDef[0]); FieldDef recordDef = new FieldDef("RootRecord", FieldType.RECORD, "rec", 4, false, false, HpccSrcType.LITTLE_ENDIAN, fieldDefs); @@ -1039,6 +1039,86 @@ public void emptyCompressedFileTest() } } + @Test + public void invalidSignatureTest() + { + + HPCCFile readFile = null; + { + Exception readException = null; + try + { + readFile = new HPCCFile(datasets[0], connString, hpccUser, hpccPass); + DataPartition[] fileParts = readFile.getFileParts(); + + for (int i = 0; i < fileParts.length; i++) + { + fileParts[i].setFileAccessBlob("invalid_blob"); + } + + List records = readFile(readFile, null, false); + System.out.println("Record count: " + records.size()); + } + catch (Exception e) + { + readException = e; + } + + // We are expecting a failure + if (readException != null) + { + System.out.println("Test passed with expected exception: " + readException.getMessage()); + } + else + { + Assert.fail("Expected an exception during read due to the invalid signature"); + } + } + + { + Exception writeException = null; + try + { + FieldDef recordDef = readFile.getRecordDefinition(); + String eclRecordDefn = RecordDefinitionTranslator.toECLRecord(recordDef); + + HPCCWsDFUClient dfuClient = wsclient.getWsDFUClient(); + DFUCreateFileWrapper createResult = dfuClient.createFile(datasets[0] + "-copy123", this.thorClusterFileGroup, eclRecordDefn, 300, false, DFUFileTypeWrapper.Flat, ""); + + DFUFilePartWrapper[] dfuFileParts = createResult.getFileParts(); + DataPartition[] hpccPartitions = DataPartition.createPartitions(dfuFileParts, + new NullRemapper(new RemapInfo(), createResult.getFileAccessInfo()), dfuFileParts.length, createResult.getFileAccessInfoBlob()); + + for (int partitionIndex = 0; partitionIndex < hpccPartitions.length; partitionIndex++) + { + hpccPartitions[partitionIndex].setFileAccessBlob("invalid_blob"); + + HPCCRecordAccessor recordAccessor = new HPCCRecordAccessor(recordDef); + HPCCRemoteFileWriter fileWriter = null; + + fileWriter = new HPCCRemoteFileWriter(hpccPartitions[partitionIndex], recordDef, recordAccessor, CompressionAlgorithm.NONE); + fileWriter.close(); + } + + dfuClient.publishFile(createResult.getFileID(), eclRecordDefn, 0, 0, true); + } + catch (Exception e) + { + writeException = e; + } + + // We are expecting a failure + if (writeException != null) + { + System.out.println("Test passed with expected exception: " + writeException.getMessage()); + } + else + { + Assert.fail("Expected an exception during write due to the invalid signature"); + } + } + } + public List readFile(HPCCFile file, Integer connectTimeoutMillis, boolean shouldForceTimeout) throws Exception { return readFile(file, connectTimeoutMillis, shouldForceTimeout, false, BinaryRecordReader.NO_STRING_PROCESSING);