Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HPCC4J-540 Missing file error during publish #640

Merged
merged 2 commits into from
Sep 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public static enum FileType

/*
* (non-Javadoc)
*
*
* @see java.lang.Enum#toString()
*/
public String toString()
Expand Down Expand Up @@ -221,6 +221,16 @@ public String getFileAccessBlob()
return this.fileAccessBlob;
}

/**
* Set the security access blob.
*
* @param accessBlob security access blob
*/
public void setFileAccessBlob(String accessBlob)
{
this.fileAccessBlob = accessBlob;
}

/**
* The underying file format for this partition .
*
Expand Down Expand Up @@ -330,11 +340,11 @@ public FileFilter getFilter()
{
return this.fileFilter;
}

/**
* Set the filter object to select specific rows.
*
* @param filter file filter
* @param filter file filter
* @return the partition
*/
public DataPartition setFilter(FileFilter filter)
Expand All @@ -345,7 +355,7 @@ public DataPartition setFilter(FileFilter filter)

/*
* (non-Javadoc)
*
*
* @see java.lang.Object
*/
public String toString()
Expand Down Expand Up @@ -469,7 +479,7 @@ public int index()

/**
* Is this data partition the TLK
*
*
* @return isTLK
*/
public boolean isTLK()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import java.util.Iterator;

/**
* Remote file reader the reads the data represented by a @see org.hpccsystems.dfs.client.DataPartition
* Remote file reader the reads the data represented by a @see org.hpccsystems.dfs.client.DataPartition
* and constructs records via the provided @see org.hpccsystems.dfs.client#IRecordBuilder.
*/
public class HpccRemoteFileReader<T> implements Iterator<T>
Expand Down Expand Up @@ -72,7 +72,7 @@ public HpccRemoteFileReader(DataPartition dp, FieldDef originalRD, IRecordBuilde
* the record defintion for the dataset
* @param recBuilder
* the IRecordBuilder used to construct records
* @param connectTimeout
* @param connectTimeout
* the connection timeout in seconds, -1 for default
* @throws Exception
* the exception
Expand Down Expand Up @@ -105,20 +105,20 @@ public HpccRemoteFileReader(DataPartition dp, FieldDef originalRD, IRecordBuilde

/**
* A remote file reader that reads the part identified by the HpccPart object using the record definition provided.
*
*
* @param dp
* the part of the file, name and location
* @param originalRD
* the record defintion for the dataset
* @param recBuilder
* the IRecordBuilder used to construct records
* @param connectTimeout
* @param connectTimeout
* the connection timeout in seconds, -1 for default
* @param limit
* @param limit
* the maximum number of records to read from the provided data partition, -1 specifies no limit
* @param createPrefetchThread
* @param createPrefetchThread
* the input stream should create and manage prefetching on its own thread. If false prefetch needs to be called on another thread periodically.
* @param readSizeKB
* @param readSizeKB
* read request size in KB, -1 specifies use default value
* @throws Exception
* general exception
Expand All @@ -130,22 +130,22 @@ public HpccRemoteFileReader(DataPartition dp, FieldDef originalRD, IRecordBuilde

/**
* A remote file reader that reads the part identified by the HpccPart object using the record definition provided.
*
*
* @param dp
* the part of the file, name and location
* @param originalRD
* the record defintion for the dataset
* @param recBuilder
* the IRecordBuilder used to construct records
* @param connectTimeout
* @param connectTimeout
* the connection timeout in seconds, -1 for default
* @param limit
* @param limit
* the maximum number of records to read from the provided data partition, -1 specifies no limit
* @param createPrefetchThread
* @param createPrefetchThread
* the input stream should create and manage prefetching on its own thread. If false prefetch needs to be called on another thread periodically.
* @param readSizeKB
* @param readSizeKB
* read request size in KB, -1 specifies use default value
* @param resumeInfo
* @param resumeInfo
* FileReadeResumeInfo data required to restart a read from a particular point in a file
* @throws Exception
* general exception
Expand Down Expand Up @@ -204,7 +204,7 @@ public HpccRemoteFileReader(DataPartition dp, FieldDef originalRD, IRecordBuilde

/**
* Returns the stream position within the file.
*
*
* @return stream position
*/
public long getStreamPosition()
Expand All @@ -214,7 +214,7 @@ public long getStreamPosition()

/**
* Returns read resume info for the current position within the file.
*
*
* @return FileReadResumeInfo
*/
public FileReadResumeInfo getFileReadResumeInfo()
Expand All @@ -224,7 +224,7 @@ public FileReadResumeInfo getFileReadResumeInfo()

/**
* Returns read resume info for the specified position within the file.
*
*
* @param streamPosition the stream position to resume from
* @return FileReadResumeInfo
*/
Expand All @@ -242,7 +242,7 @@ public FileReadResumeInfo getFileReadResumeInfo(Long streamPosition)

/**
* Returns the number of messages created during the reading process
*
*
* @return number of messages created
*/
public int getRemoteReadMessageCount()
Expand All @@ -256,7 +256,7 @@ public int getRemoteReadMessageCount()

/**
* Returns messages created during the file reading process
*
*
* @return Messages concatenated into a String
*/
public String getRemoteReadMessages()
Expand Down Expand Up @@ -284,7 +284,7 @@ public void prefetch()

/**
* Is there more data
*
*
* @return true if there is a next record
*/
@Override
Expand All @@ -294,6 +294,13 @@ public boolean hasNext()
try
{
rslt = this.binaryRecordReader.hasNext();

// Has next may not catch the prefetch exception if it occurs at the beginning of a read
// This is due to InputStream.hasNext() being allowed to throw an IOException when closed.
if (this.inputStream.getPrefetchException() != null)
{
throw this.inputStream.getPrefetchException();
}
}
catch (HpccFileException e)
{
Expand Down Expand Up @@ -352,7 +359,7 @@ public int getAvailable() throws IOException

/**
* Returns the RowServiceInputStream used to read the file from dafilesrv
*
*
* @return the input stream
*/
public RowServiceInputStream getInputStream()
Expand All @@ -362,7 +369,7 @@ public RowServiceInputStream getInputStream()

/**
* Returns the BinaryRecordReader used to construct records
*
*
* @return the record reader
*/
public BinaryRecordReader getRecordReader()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -491,6 +491,16 @@ private int getFilePartCopy()
return filePartCopyIndexPointer;
}

/**
* The HpccFileException from the prefetch thread if an exception has occured.
*
* @return the prefetch exception
*/
public HpccFileException getPrefetchException()
{
return this.prefetchException;
}

/**
* The port number for the remote read service.
*
Expand Down Expand Up @@ -761,6 +771,19 @@ private int startFetch()
return -1;
}

if (response.errorCode != RFCCodes.RFCStreamNoError)
{
prefetchException = new HpccFileException(response.errorMessage);
try
{
close();
}
catch (IOException e)
{}

return -1;
}

// If len < 0 we have finished reading the file
if (response.len < 0)
{
Expand All @@ -775,12 +798,6 @@ private int startFetch()
return -1;
}

if (response.errorCode != RFCCodes.RFCStreamNoError)
{
prefetchException = new HpccFileException(response.errorMessage);
return -1;
}

//------------------------------------------------------------------------------
// Retry with the token if handle is invalid
//------------------------------------------------------------------------------
Expand Down Expand Up @@ -1148,6 +1165,11 @@ public int available() throws IOException
// Do the check for closed first here to avoid data races
if (this.closed.get())
{
if (this.prefetchException != null)
{
throw new IOException("Prefetch thread exited early exception.", this.prefetchException);
}

int bufferLen = this.readBufferDataLen.get();
int availBytes = bufferLen - this.readPos;
if (availBytes == 0)
Expand Down Expand Up @@ -1529,7 +1551,6 @@ private void makeActive() throws HpccFileException
throw new HpccFileException("Failed to create streams", e);
}


//------------------------------------------------------------------------------
// Check protocol version
//------------------------------------------------------------------------------
Expand Down Expand Up @@ -1567,7 +1588,7 @@ private void makeActive() throws HpccFileException
throw new HpccFileException("Error while attempting to read version response.", e);
}

rowServiceVersion = new String(versionBytes, HPCCCharSet);
rowServiceVersion = new String(versionBytes, HPCCCharSet);
}

//------------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ private static class RowServiceResponse
throw new HpccFileException("Error while attempting to read version response.", e);
}

rowServiceVersion = new String(versionBytes, StandardCharsets.ISO_8859_1);
rowServiceVersion = new String(versionBytes, StandardCharsets.ISO_8859_1);
}

// Go ahead and make the initial write request. This won't write any data to file
Expand Down Expand Up @@ -298,6 +298,11 @@ private void makeInitialWriteRequest() throws Exception

RowServiceResponse response = this.readResponse();
this.handle = response.handle;

if (response.errorCode != RFCCodes.RFCStreamNoError)
{
throw new IOException(response.errorMessage);
}
}

private String makeCloseHandleRequest()
Expand Down Expand Up @@ -336,14 +341,20 @@ private void sendCloseFileRequest() throws IOException
throw new IOException("Failed on close file with error: ", e);
}

RowServiceResponse response = null;
try
{
readResponse();
response = readResponse();
}
catch (HpccFileException e)
{
throw new IOException("Failed to close file. Unable to read response with error: ", e);
}

if (response.errorCode != RFCCodes.RFCStreamNoError)
{
throw new IOException(response.errorMessage);
}
}

private RowServiceResponse readResponse() throws HpccFileException
Expand Down Expand Up @@ -493,15 +504,21 @@ public void write(byte[] b, int off, int len) throws IOException

bytesWritten += len;

RowServiceResponse response = null;
try
{
RowServiceResponse response = readResponse();
response = readResponse();
this.handle = response.handle;
}
catch (HpccFileException e)
{
throw new IOException("Failed during write operation. Unable to read response with error: ", e);
}

if (response.errorCode != RFCCodes.RFCStreamNoError)
{
throw new IOException(response.errorMessage);
}
}

/*
Expand Down
Loading
Loading