Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/candidate-9.4.x'
Browse files Browse the repository at this point in the history
Signed-off-by: Gavin Halliday <[email protected]>

# Conflicts:
#	commons-hpcc/pom.xml
#	dfsclient/pom.xml
#	pom.xml
#	wsclient/pom.xml
  • Loading branch information
ghalliday committed Oct 13, 2023
2 parents 443c15e + 80d27a8 commit 8dea8de
Show file tree
Hide file tree
Showing 3 changed files with 235 additions and 116 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@
package org.hpccsystems.dfs.client;

import java.io.Serializable;
import java.security.InvalidParameterException;
import java.util.List;
import java.util.ArrayList;
import java.util.Arrays;

import org.hpccsystems.commons.ecl.FileFilter;
import org.hpccsystems.commons.errors.HpccFileException;
Expand Down Expand Up @@ -261,11 +265,60 @@ public String[] getCopyLocations()
public String getCopyIP(int copyindex)
{
int copiescount = copyLocations.length;
if (copyindex < 0 || copyindex >= copiescount) return null;
if (copyindex < 0 || copyindex >= copiescount)
{
return null;
}

return copyLocations[copyindex];
}

/**
* Set the copy IP
*
* @param copyIndex
* the copyindex
* @param copyIP The IP of the file part copy
*/
public void setCopyIP(int copyIndex, String copyIP)
{
if (copyIndex < 0 || copyIndex >= copyLocations.length)
{
return;
}

copyLocations[copyIndex] = copyIP;
}

/**
* Add file part copy
*
* @param index The index at which to insert the file part copy
* @param copyIP The IP of the new file part copy
* @param copyPath The path of the new file part copy
*/
public void add(int index, String copyIP, String copyPath) throws Exception
{
if (index < 0 || index > copyLocations.length)
{
throw new InvalidParameterException("Insertion index: " + index + " is invalid."
+ "Expected index in range of: [0," + copyLocations.length + "]");
}

if (copyIP == null || copyPath == null)
{
throw new InvalidParameterException("Copy IP or Path are invalid, must be non-null.");
}

List<String> copyLocationsList = new ArrayList<>(Arrays.asList(copyLocations));
copyLocationsList.add(index, copyIP);
copyLocations = copyLocationsList.toArray(new String[0]);

List<String> copyPathList = new ArrayList<>(Arrays.asList(copyPaths));
copyPathList.add(index, copyPath);
copyPaths = copyPathList.toArray(new String[0]);
}

/**
* Count of copies available for this file part.
* @return copy locations size
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -520,6 +520,11 @@ public String getIP()
return this.dataPart.getCopyIP(prioritizedCopyIndexes.get(getFilePartCopy()));
}

private String getCopyPath()
{
return this.dataPart.getCopyPath(prioritizedCopyIndexes.get(getFilePartCopy()));
}

private int getFilePartCopy()
{
return filePartCopyIndexPointer;
Expand Down Expand Up @@ -1528,150 +1533,156 @@ private void makeActive() throws HpccFileException
this.active.set(false);
this.handle = 0;

try
boolean needsRetry = false;
do
{
log.debug("Attempting to connect to file part : '" + dataPart.getThisPart() + "' Copy: '" + (getFilePartCopy() + 1) + "' on IP: '"
+ getIP() + "'");

needsRetry = false;
try
{
if (getUseSSL())
log.debug("Attempting to connect to file part : '" + dataPart.getThisPart() + "' Copy: '"
+ (getFilePartCopy() + 1) + "' on IP: '" + getIP() + "'" + " for Path: '" + getCopyPath() + "'");
try
{
if (getUseSSL())
{
SSLSocketFactory ssf = (SSLSocketFactory) SSLSocketFactory.getDefault();
sock = (SSLSocket) ssf.createSocket();

// Optimize for bandwidth over latency and connection time.
// We are opening up a long standing connection and potentially reading a significant amount of
// data
// So we don't care as much about individual packet latency or connection time overhead
sock.setPerformancePreferences(0, 1, 2);
sock.connect(new InetSocketAddress(this.getIP(), this.dataPart.getPort()), this.connectTimeout);

log.debug("Attempting SSL handshake...");
((SSLSocket) sock).startHandshake();
log.debug("SSL handshake successful...");
log.debug(" Remote address = " + sock.getInetAddress().toString() + " Remote port = " + sock.getPort());
}
else
{
SocketFactory sf = SocketFactory.getDefault();
sock = sf.createSocket();

// Optimize for bandwidth over latency and connection time.
// We are opening up a long standing connection and potentially reading a significant amount of
// data
// So we don't care as much about individual packet latency or connection time overhead
sock.setPerformancePreferences(0, 1, 2);
sock.connect(new InetSocketAddress(this.getIP(), this.dataPart.getPort()), this.connectTimeout);
}

this.sock.setSoTimeout(socketOpTimeoutMs);

log.debug("Connected: Remote address = " + sock.getInetAddress().toString() + " Remote port = " + sock.getPort());
}
catch (java.net.UnknownHostException e)
{
SSLSocketFactory ssf = (SSLSocketFactory) SSLSocketFactory.getDefault();
sock = (SSLSocket) ssf.createSocket();

// Optimize for bandwidth over latency and connection time.
// We are opening up a long standing connection and potentially reading a significant amount of
// data
// So we don't care as much about individual packet latency or connection time overhead
sock.setPerformancePreferences(0, 1, 2);
sock.connect(new InetSocketAddress(this.getIP(), this.dataPart.getPort()), this.connectTimeout);

log.debug("Attempting SSL handshake...");
((SSLSocket) sock).startHandshake();
log.debug("SSL handshake successful...");
log.debug(" Remote address = " + sock.getInetAddress().toString() + " Remote port = " + sock.getPort());
throw new HpccFileException("Bad file part IP address or host name: " + this.getIP(), e);
}
else
catch (java.io.IOException e)
{
SocketFactory sf = SocketFactory.getDefault();
sock = sf.createSocket();

// Optimize for bandwidth over latency and connection time.
// We are opening up a long standing connection and potentially reading a significant amount of
// data
// So we don't care as much about individual packet latency or connection time overhead
sock.setPerformancePreferences(0, 1, 2);
sock.connect(new InetSocketAddress(this.getIP(), this.dataPart.getPort()), this.connectTimeout);
throw new HpccFileException(e);
}

this.sock.setSoTimeout(socketOpTimeoutMs);
try
{
this.dos = new java.io.DataOutputStream(sock.getOutputStream());
this.dis = new java.io.DataInputStream(sock.getInputStream());
}
catch (java.io.IOException e)
{
throw new HpccFileException("Failed to create streams", e);
}

log.debug("Connected: Remote address = " + sock.getInetAddress().toString() + " Remote port = " + sock.getPort());
}
catch (java.net.UnknownHostException e)
{
throw new HpccFileException("Bad file part addr " + this.getIP(), e);
}
catch (java.io.IOException e)
{
throw new HpccFileException(e);
}
//------------------------------------------------------------------------------
// Check protocol version
//------------------------------------------------------------------------------

try
{
this.dos = new java.io.DataOutputStream(sock.getOutputStream());
this.dis = new java.io.DataInputStream(sock.getInputStream());
}
catch (java.io.IOException e)
{
throw new HpccFileException("Failed to create streams", e);
}
try
{
String msg = makeGetVersionRequest();
int msgLen = msg.length();

//------------------------------------------------------------------------------
// Check protocol version
//------------------------------------------------------------------------------
this.dos.writeInt(msgLen);
this.dos.write(msg.getBytes(HPCCCharSet), 0, msgLen);
this.dos.flush();
}
catch (IOException e)
{
throw new HpccFileException("Failed on initial remote read trans", e);
}

try
{
String msg = makeGetVersionRequest();
int msgLen = msg.length();
RowServiceResponse response = readResponse();
if (response.len == 0)
{
useOldProtocol = true;
}
else
{
useOldProtocol = false;

this.dos.writeInt(msgLen);
this.dos.write(msg.getBytes(HPCCCharSet), 0, msgLen);
this.dos.flush();
}
catch (IOException e)
{
throw new HpccFileException("Failed on initial remote read trans", e);
}
byte[] versionBytes = new byte[response.len];
try
{
this.dis.readFully(versionBytes);
}
catch (IOException e)
{
throw new HpccFileException("Error while attempting to read version response.", e);
}

RowServiceResponse response = readResponse();
if (response.len == 0)
{
useOldProtocol = true;
}
else
{
useOldProtocol = false;
rowServiceVersion = new String(versionBytes, HPCCCharSet);
}

//------------------------------------------------------------------------------
// Send initial read request
//------------------------------------------------------------------------------

byte[] versionBytes = new byte[response.len];
try
{
this.dis.readFully(versionBytes);
String readTrans = null;
if (this.tokenBin == null)
{
this.tokenBin = new byte[0];
readTrans = makeInitialRequest();
}
else
{
readTrans = makeTokenRequest();
}

int transLen = readTrans.length();
this.dos.writeInt(transLen);
this.dos.write(readTrans.getBytes(HPCCCharSet), 0, transLen);
this.dos.flush();
}
catch (IOException e)
{
throw new HpccFileException("Error while attempting to read version response.", e);
throw new HpccFileException("Failed on initial remote read read trans", e);
}

rowServiceVersion = new String(versionBytes, HPCCCharSet);
}

//------------------------------------------------------------------------------
// Send initial read request
//------------------------------------------------------------------------------

try
{
String readTrans = null;
if (this.tokenBin == null)
{
this.tokenBin = new byte[0];
readTrans = makeInitialRequest();
}
else
if (CompileTimeConstants.PROFILE_CODE)
{
readTrans = makeTokenRequest();
firstByteTimeNS = System.nanoTime();
}

int transLen = readTrans.length();
this.dos.writeInt(transLen);
this.dos.write(readTrans.getBytes(HPCCCharSet), 0, transLen);
this.dos.flush();
this.active.set(true);
}
catch (IOException e)
catch (Exception e)
{
throw new HpccFileException("Failed on initial remote read read trans", e);
}
log.error("Could not reach file part: '" + dataPart.getThisPart() + "' copy: '" + (getFilePartCopy() + 1) + "' on IP: '" + getIP()
+ "'");
log.error(e.getMessage());

if (CompileTimeConstants.PROFILE_CODE)
{
firstByteTimeNS = System.nanoTime();
needsRetry = true;
if (!setNextFilePartCopy())
{
throw new HpccFileException("Unsuccessfuly attempted to connect to all file part copies", e);
}
}

this.active.set(true);
}
catch (Exception e)
{
log.error("Could not reach file part: '" + dataPart.getThisPart() + "' copy: '" + (getFilePartCopy() + 1) + "' on IP: '" + getIP()
+ "'");
log.error(e.getMessage());

if (!setNextFilePartCopy())
// This should be a multi exception
throw new HpccFileException("Unsuccessfuly attempted to connect to all file part copies", e);
}
} while (needsRetry);
}

/* Notes on protocol:
Expand Down
Loading

0 comments on commit 8dea8de

Please sign in to comment.