Skip to content

Commit

Permalink
HPCC4J-540 Missing file error during publish (#640)
Browse files Browse the repository at this point in the history
* HPCC4J-540 Missing file error during publish

- Corrected error handling on prefetch thread to pass errors on startup
- Fixed potential data race in exception handling
- Added invalid signature test

Signed-off-by: James McMullan [email protected]

* Code review changes

---------

Signed-off-by: James McMullan [email protected]
  • Loading branch information
jpmcmu authored Sep 8, 2023
1 parent 245e061 commit d67e4b7
Show file tree
Hide file tree
Showing 5 changed files with 152 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public static enum FileType

/*
* (non-Javadoc)
*
*
* @see java.lang.Enum#toString()
*/
public String toString()
Expand Down Expand Up @@ -221,6 +221,16 @@ public String getFileAccessBlob()
return this.fileAccessBlob;
}

/**
* Set the security access blob.
*
* @param accessBlob security access blob
*/
public void setFileAccessBlob(String accessBlob)
{
this.fileAccessBlob = accessBlob;
}

/**
* The underying file format for this partition .
*
Expand Down Expand Up @@ -330,11 +340,11 @@ public FileFilter getFilter()
{
return this.fileFilter;
}

/**
* Set the filter object to select specific rows.
*
* @param filter file filter
* @param filter file filter
* @return the partition
*/
public DataPartition setFilter(FileFilter filter)
Expand All @@ -345,7 +355,7 @@ public DataPartition setFilter(FileFilter filter)

/*
* (non-Javadoc)
*
*
* @see java.lang.Object
*/
public String toString()
Expand Down Expand Up @@ -469,7 +479,7 @@ public int index()

/**
* Is this data partition the TLK
*
*
* @return isTLK
*/
public boolean isTLK()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,13 @@ public boolean hasNext()
try
{
rslt = this.binaryRecordReader.hasNext();

// Has next may not catch the prefetch exception if it occurs at the beginning of a read
// This is due to InputStream.hasNext() being allowed to throw an IOException when closed.
if (this.inputStream.getPrefetchException() != null)
{
throw this.inputStream.getPrefetchException();
}
}
catch (HpccFileException e)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -491,6 +491,16 @@ private int getFilePartCopy()
return filePartCopyIndexPointer;
}

/**
* The HpccFileException from the prefetch thread if an exception has occured.
*
* @return the prefetch exception
*/
public HpccFileException getPrefetchException()
{
return this.prefetchException;
}

/**
* The port number for the remote read service.
*
Expand Down Expand Up @@ -761,6 +771,19 @@ private int startFetch()
return -1;
}

if (response.errorCode != RFCCodes.RFCStreamNoError)
{
prefetchException = new HpccFileException(response.errorMessage);
try
{
close();
}
catch (IOException e)
{}

return -1;
}

// If len < 0 we have finished reading the file
if (response.len < 0)
{
Expand All @@ -775,12 +798,6 @@ private int startFetch()
return -1;
}

if (response.errorCode != RFCCodes.RFCStreamNoError)
{
prefetchException = new HpccFileException(response.errorMessage);
return -1;
}

//------------------------------------------------------------------------------
// Retry with the token if handle is invalid
//------------------------------------------------------------------------------
Expand Down Expand Up @@ -1148,6 +1165,11 @@ public int available() throws IOException
// Do the check for closed first here to avoid data races
if (this.closed.get())
{
if (this.prefetchException != null)
{
throw new IOException("Prefetch thread exited early exception.", this.prefetchException);
}

int bufferLen = this.readBufferDataLen.get();
int availBytes = bufferLen - this.readPos;
if (availBytes == 0)
Expand Down Expand Up @@ -1529,7 +1551,6 @@ private void makeActive() throws HpccFileException
throw new HpccFileException("Failed to create streams", e);
}


//------------------------------------------------------------------------------
// Check protocol version
//------------------------------------------------------------------------------
Expand Down Expand Up @@ -1567,7 +1588,7 @@ private void makeActive() throws HpccFileException
throw new HpccFileException("Error while attempting to read version response.", e);
}

rowServiceVersion = new String(versionBytes, HPCCCharSet);
rowServiceVersion = new String(versionBytes, HPCCCharSet);
}

//------------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ private static class RowServiceResponse
throw new HpccFileException("Error while attempting to read version response.", e);
}

rowServiceVersion = new String(versionBytes, StandardCharsets.ISO_8859_1);
rowServiceVersion = new String(versionBytes, StandardCharsets.ISO_8859_1);
}

// Go ahead and make the initial write request. This won't write any data to file
Expand Down Expand Up @@ -298,6 +298,11 @@ private void makeInitialWriteRequest() throws Exception

RowServiceResponse response = this.readResponse();
this.handle = response.handle;

if (response.errorCode != RFCCodes.RFCStreamNoError)
{
throw new IOException(response.errorMessage);
}
}

private String makeCloseHandleRequest()
Expand Down Expand Up @@ -336,14 +341,20 @@ private void sendCloseFileRequest() throws IOException
throw new IOException("Failed on close file with error: ", e);
}

RowServiceResponse response = null;
try
{
readResponse();
response = readResponse();
}
catch (HpccFileException e)
{
throw new IOException("Failed to close file. Unable to read response with error: ", e);
}

if (response.errorCode != RFCCodes.RFCStreamNoError)
{
throw new IOException(response.errorMessage);
}
}

private RowServiceResponse readResponse() throws HpccFileException
Expand Down Expand Up @@ -493,15 +504,21 @@ public void write(byte[] b, int off, int len) throws IOException

bytesWritten += len;

RowServiceResponse response = null;
try
{
RowServiceResponse response = readResponse();
response = readResponse();
this.handle = response.handle;
}
catch (HpccFileException e)
{
throw new IOException("Failed during write operation. Unable to read response with error: ", e);
}

if (response.errorCode != RFCCodes.RFCStreamNoError)
{
throw new IOException(response.errorMessage);
}
}

/*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1014,7 +1014,7 @@ public void emptyCompressedFileTest()
Version remoteVersion = dfuClient.getTargetHPCCBuildVersion();

FieldDef[] fieldDefs = new FieldDef[2];
fieldDefs[0] = new FieldDef("key", FieldType.INTEGER, "lNTEGER4", 4, true, false, HpccSrcType.LITTLE_ENDIAN, new FieldDef[0]);
fieldDefs[0] = new FieldDef("key", FieldType.INTEGER, "INTEGER4", 4, true, false, HpccSrcType.LITTLE_ENDIAN, new FieldDef[0]);
fieldDefs[1] = new FieldDef("value", FieldType.STRING, "STRING", 0, false, false, HpccSrcType.UTF8, new FieldDef[0]);
FieldDef recordDef = new FieldDef("RootRecord", FieldType.RECORD, "rec", 4, false, false, HpccSrcType.LITTLE_ENDIAN, fieldDefs);

Expand All @@ -1039,6 +1039,86 @@ public void emptyCompressedFileTest()
}
}

@Test
public void invalidSignatureTest()
{

HPCCFile readFile = null;
{
Exception readException = null;
try
{
readFile = new HPCCFile(datasets[0], connString, hpccUser, hpccPass);
DataPartition[] fileParts = readFile.getFileParts();

for (int i = 0; i < fileParts.length; i++)
{
fileParts[i].setFileAccessBlob("invalid_blob");
}

List<HPCCRecord> records = readFile(readFile, null, false);
System.out.println("Record count: " + records.size());
}
catch (Exception e)
{
readException = e;
}

// We are expecting a failure
if (readException != null)
{
System.out.println("Test passed with expected exception: " + readException.getMessage());
}
else
{
Assert.fail("Expected an exception during read due to the invalid signature");
}
}

{
Exception writeException = null;
try
{
FieldDef recordDef = readFile.getRecordDefinition();
String eclRecordDefn = RecordDefinitionTranslator.toECLRecord(recordDef);

HPCCWsDFUClient dfuClient = wsclient.getWsDFUClient();
DFUCreateFileWrapper createResult = dfuClient.createFile(datasets[0] + "-copy123", this.thorClusterFileGroup, eclRecordDefn, 300, false, DFUFileTypeWrapper.Flat, "");

DFUFilePartWrapper[] dfuFileParts = createResult.getFileParts();
DataPartition[] hpccPartitions = DataPartition.createPartitions(dfuFileParts,
new NullRemapper(new RemapInfo(), createResult.getFileAccessInfo()), dfuFileParts.length, createResult.getFileAccessInfoBlob());

for (int partitionIndex = 0; partitionIndex < hpccPartitions.length; partitionIndex++)
{
hpccPartitions[partitionIndex].setFileAccessBlob("invalid_blob");

HPCCRecordAccessor recordAccessor = new HPCCRecordAccessor(recordDef);
HPCCRemoteFileWriter<HPCCRecord> fileWriter = null;

fileWriter = new HPCCRemoteFileWriter<HPCCRecord>(hpccPartitions[partitionIndex], recordDef, recordAccessor, CompressionAlgorithm.NONE);
fileWriter.close();
}

dfuClient.publishFile(createResult.getFileID(), eclRecordDefn, 0, 0, true);
}
catch (Exception e)
{
writeException = e;
}

// We are expecting a failure
if (writeException != null)
{
System.out.println("Test passed with expected exception: " + writeException.getMessage());
}
else
{
Assert.fail("Expected an exception during write due to the invalid signature");
}
}
}

public List<HPCCRecord> readFile(HPCCFile file, Integer connectTimeoutMillis, boolean shouldForceTimeout) throws Exception
{
return readFile(file, connectTimeoutMillis, shouldForceTimeout, false, BinaryRecordReader.NO_STRING_PROCESSING);
Expand Down

0 comments on commit d67e4b7

Please sign in to comment.