diff --git a/dfsclient/src/main/java/org/hpccsystems/dfs/client/DataPartition.java b/dfsclient/src/main/java/org/hpccsystems/dfs/client/DataPartition.java index bc648b49c..589e62f4c 100644 --- a/dfsclient/src/main/java/org/hpccsystems/dfs/client/DataPartition.java +++ b/dfsclient/src/main/java/org/hpccsystems/dfs/client/DataPartition.java @@ -16,6 +16,10 @@ package org.hpccsystems.dfs.client; import java.io.Serializable; +import java.security.InvalidParameterException; +import java.util.List; +import java.util.ArrayList; +import java.util.Arrays; import org.hpccsystems.commons.ecl.FileFilter; import org.hpccsystems.commons.errors.HpccFileException; @@ -261,11 +265,60 @@ public String[] getCopyLocations() public String getCopyIP(int copyindex) { int copiescount = copyLocations.length; - if (copyindex < 0 || copyindex >= copiescount) return null; + if (copyindex < 0 || copyindex >= copiescount) + { + return null; + } return copyLocations[copyindex]; } + /** + * Set the copy IP + * + * @param copyIndex + * the copyindex + * @param copyIP The IP of the file part copy + */ + public void setCopyIP(int copyIndex, String copyIP) + { + if (copyIndex < 0 || copyIndex >= copyLocations.length) + { + return; + } + + copyLocations[copyIndex] = copyIP; + } + + /** + * Add file part copy + * + * @param index The index at which to insert the file part copy + * @param copyIP The IP of the new file part copy + * @param copyPath The path of the new file part copy + */ + public void add(int index, String copyIP, String copyPath) throws Exception + { + if (index < 0 || index > copyLocations.length) + { + throw new InvalidParameterException("Insertion index: " + index + " is invalid." + + "Expected index in range of: [0," + copyLocations.length + "]"); + } + + if (copyIP == null || copyPath == null) + { + throw new InvalidParameterException("Copy IP or Path are invalid, must be non-null."); + } + + List copyLocationsList = new ArrayList<>(Arrays.asList(copyLocations)); + copyLocationsList.add(index, copyIP); + copyLocations = copyLocationsList.toArray(new String[0]); + + List copyPathList = new ArrayList<>(Arrays.asList(copyPaths)); + copyPathList.add(index, copyPath); + copyPaths = copyPathList.toArray(new String[0]); + } + /** * Count of copies available for this file part. * @return copy locations size diff --git a/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceInputStream.java b/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceInputStream.java index 795fd8478..0ea652111 100644 --- a/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceInputStream.java +++ b/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceInputStream.java @@ -520,6 +520,11 @@ public String getIP() return this.dataPart.getCopyIP(prioritizedCopyIndexes.get(getFilePartCopy())); } + private String getCopyPath() + { + return this.dataPart.getCopyPath(prioritizedCopyIndexes.get(getFilePartCopy())); + } + private int getFilePartCopy() { return filePartCopyIndexPointer; @@ -1528,150 +1533,156 @@ private void makeActive() throws HpccFileException this.active.set(false); this.handle = 0; - try + boolean needsRetry = false; + do { - log.debug("Attempting to connect to file part : '" + dataPart.getThisPart() + "' Copy: '" + (getFilePartCopy() + 1) + "' on IP: '" - + getIP() + "'"); - + needsRetry = false; try { - if (getUseSSL()) + log.debug("Attempting to connect to file part : '" + dataPart.getThisPart() + "' Copy: '" + + (getFilePartCopy() + 1) + "' on IP: '" + getIP() + "'" + " for Path: '" + getCopyPath() + "'"); + try + { + if (getUseSSL()) + { + SSLSocketFactory ssf = (SSLSocketFactory) SSLSocketFactory.getDefault(); + sock = (SSLSocket) ssf.createSocket(); + + // Optimize for bandwidth over latency and connection time. + // We are opening up a long standing connection and potentially reading a significant amount of + // data + // So we don't care as much about individual packet latency or connection time overhead + sock.setPerformancePreferences(0, 1, 2); + sock.connect(new InetSocketAddress(this.getIP(), this.dataPart.getPort()), this.connectTimeout); + + log.debug("Attempting SSL handshake..."); + ((SSLSocket) sock).startHandshake(); + log.debug("SSL handshake successful..."); + log.debug(" Remote address = " + sock.getInetAddress().toString() + " Remote port = " + sock.getPort()); + } + else + { + SocketFactory sf = SocketFactory.getDefault(); + sock = sf.createSocket(); + + // Optimize for bandwidth over latency and connection time. + // We are opening up a long standing connection and potentially reading a significant amount of + // data + // So we don't care as much about individual packet latency or connection time overhead + sock.setPerformancePreferences(0, 1, 2); + sock.connect(new InetSocketAddress(this.getIP(), this.dataPart.getPort()), this.connectTimeout); + } + + this.sock.setSoTimeout(socketOpTimeoutMs); + + log.debug("Connected: Remote address = " + sock.getInetAddress().toString() + " Remote port = " + sock.getPort()); + } + catch (java.net.UnknownHostException e) { - SSLSocketFactory ssf = (SSLSocketFactory) SSLSocketFactory.getDefault(); - sock = (SSLSocket) ssf.createSocket(); - - // Optimize for bandwidth over latency and connection time. - // We are opening up a long standing connection and potentially reading a significant amount of - // data - // So we don't care as much about individual packet latency or connection time overhead - sock.setPerformancePreferences(0, 1, 2); - sock.connect(new InetSocketAddress(this.getIP(), this.dataPart.getPort()), this.connectTimeout); - - log.debug("Attempting SSL handshake..."); - ((SSLSocket) sock).startHandshake(); - log.debug("SSL handshake successful..."); - log.debug(" Remote address = " + sock.getInetAddress().toString() + " Remote port = " + sock.getPort()); + throw new HpccFileException("Bad file part IP address or host name: " + this.getIP(), e); } - else + catch (java.io.IOException e) { - SocketFactory sf = SocketFactory.getDefault(); - sock = sf.createSocket(); - - // Optimize for bandwidth over latency and connection time. - // We are opening up a long standing connection and potentially reading a significant amount of - // data - // So we don't care as much about individual packet latency or connection time overhead - sock.setPerformancePreferences(0, 1, 2); - sock.connect(new InetSocketAddress(this.getIP(), this.dataPart.getPort()), this.connectTimeout); + throw new HpccFileException(e); } - this.sock.setSoTimeout(socketOpTimeoutMs); + try + { + this.dos = new java.io.DataOutputStream(sock.getOutputStream()); + this.dis = new java.io.DataInputStream(sock.getInputStream()); + } + catch (java.io.IOException e) + { + throw new HpccFileException("Failed to create streams", e); + } - log.debug("Connected: Remote address = " + sock.getInetAddress().toString() + " Remote port = " + sock.getPort()); - } - catch (java.net.UnknownHostException e) - { - throw new HpccFileException("Bad file part addr " + this.getIP(), e); - } - catch (java.io.IOException e) - { - throw new HpccFileException(e); - } + //------------------------------------------------------------------------------ + // Check protocol version + //------------------------------------------------------------------------------ - try - { - this.dos = new java.io.DataOutputStream(sock.getOutputStream()); - this.dis = new java.io.DataInputStream(sock.getInputStream()); - } - catch (java.io.IOException e) - { - throw new HpccFileException("Failed to create streams", e); - } + try + { + String msg = makeGetVersionRequest(); + int msgLen = msg.length(); - //------------------------------------------------------------------------------ - // Check protocol version - //------------------------------------------------------------------------------ + this.dos.writeInt(msgLen); + this.dos.write(msg.getBytes(HPCCCharSet), 0, msgLen); + this.dos.flush(); + } + catch (IOException e) + { + throw new HpccFileException("Failed on initial remote read trans", e); + } - try - { - String msg = makeGetVersionRequest(); - int msgLen = msg.length(); + RowServiceResponse response = readResponse(); + if (response.len == 0) + { + useOldProtocol = true; + } + else + { + useOldProtocol = false; - this.dos.writeInt(msgLen); - this.dos.write(msg.getBytes(HPCCCharSet), 0, msgLen); - this.dos.flush(); - } - catch (IOException e) - { - throw new HpccFileException("Failed on initial remote read trans", e); - } + byte[] versionBytes = new byte[response.len]; + try + { + this.dis.readFully(versionBytes); + } + catch (IOException e) + { + throw new HpccFileException("Error while attempting to read version response.", e); + } - RowServiceResponse response = readResponse(); - if (response.len == 0) - { - useOldProtocol = true; - } - else - { - useOldProtocol = false; + rowServiceVersion = new String(versionBytes, HPCCCharSet); + } + + //------------------------------------------------------------------------------ + // Send initial read request + //------------------------------------------------------------------------------ - byte[] versionBytes = new byte[response.len]; try { - this.dis.readFully(versionBytes); + String readTrans = null; + if (this.tokenBin == null) + { + this.tokenBin = new byte[0]; + readTrans = makeInitialRequest(); + } + else + { + readTrans = makeTokenRequest(); + } + + int transLen = readTrans.length(); + this.dos.writeInt(transLen); + this.dos.write(readTrans.getBytes(HPCCCharSet), 0, transLen); + this.dos.flush(); } catch (IOException e) { - throw new HpccFileException("Error while attempting to read version response.", e); + throw new HpccFileException("Failed on initial remote read read trans", e); } - rowServiceVersion = new String(versionBytes, HPCCCharSet); - } - - //------------------------------------------------------------------------------ - // Send initial read request - //------------------------------------------------------------------------------ - - try - { - String readTrans = null; - if (this.tokenBin == null) - { - this.tokenBin = new byte[0]; - readTrans = makeInitialRequest(); - } - else + if (CompileTimeConstants.PROFILE_CODE) { - readTrans = makeTokenRequest(); + firstByteTimeNS = System.nanoTime(); } - int transLen = readTrans.length(); - this.dos.writeInt(transLen); - this.dos.write(readTrans.getBytes(HPCCCharSet), 0, transLen); - this.dos.flush(); + this.active.set(true); } - catch (IOException e) + catch (Exception e) { - throw new HpccFileException("Failed on initial remote read read trans", e); - } + log.error("Could not reach file part: '" + dataPart.getThisPart() + "' copy: '" + (getFilePartCopy() + 1) + "' on IP: '" + getIP() + + "'"); + log.error(e.getMessage()); - if (CompileTimeConstants.PROFILE_CODE) - { - firstByteTimeNS = System.nanoTime(); + needsRetry = true; + if (!setNextFilePartCopy()) + { + throw new HpccFileException("Unsuccessfuly attempted to connect to all file part copies", e); + } } - - this.active.set(true); - } - catch (Exception e) - { - log.error("Could not reach file part: '" + dataPart.getThisPart() + "' copy: '" + (getFilePartCopy() + 1) + "' on IP: '" + getIP() - + "'"); - log.error(e.getMessage()); - - if (!setNextFilePartCopy()) - // This should be a multi exception - throw new HpccFileException("Unsuccessfuly attempted to connect to all file part copies", e); - } + } while (needsRetry); } /* Notes on protocol: diff --git a/dfsclient/src/test/java/org/hpccsystems/dfs/client/DFSReadWriteTest.java b/dfsclient/src/test/java/org/hpccsystems/dfs/client/DFSReadWriteTest.java index 2fc59c86d..b963ea4b1 100644 --- a/dfsclient/src/test/java/org/hpccsystems/dfs/client/DFSReadWriteTest.java +++ b/dfsclient/src/test/java/org/hpccsystems/dfs/client/DFSReadWriteTest.java @@ -178,6 +178,7 @@ public void readResumeTest() throws Exception System.out.println("Messages from file part (" + i + ") read operation:\n" + fileReader.getRemoteReadMessages()); } + Runtime runtime = Runtime.getRuntime(); ArrayList resumedRecords = new ArrayList(); for (int i = 0; i < resumeInfo.size(); i++) { @@ -194,6 +195,13 @@ public void readResumeTest() throws Exception resumedRecords.add(record); } + + // Periodically run garbage collector to prevent buffers in remote file readers from exhausting free memory + // This is only needed due to rapidly creating / destroying thousands of HpccRemoteFileReaders + if ((i % 10) == 0) + { + runtime.gc(); + } } assertEquals("Number of records did not match during read resume.", records.size(), resumedRecords.size()); @@ -1039,6 +1047,53 @@ public void emptyCompressedFileTest() } } + @Test + public void filePartReadRetryTest() + { + { + HPCCFile readFile = null; + try + { + readFile = new HPCCFile(datasets[0], connString, hpccUser, hpccPass); + DataPartition[] fileParts = readFile.getFileParts(); + for (int i = 0; i < fileParts.length; i++) + { + String firstCopyIP = fileParts[i].getCopyIP(0); + String firstCopyPath = fileParts[i].getCopyPath(0); + fileParts[i].setCopyIP(0, "1.1.1.1"); + fileParts[i].add(1, firstCopyIP, firstCopyPath); + } + + List records = readFile(readFile, null, false); + System.out.println("Record count: " + records.size()); + } + catch (Exception e) + { + Assert.fail(e.getMessage()); + } + } + + { + HPCCFile readFile = null; + try + { + readFile = new HPCCFile(datasets[0], connString, hpccUser, hpccPass); + DataPartition[] fileParts = readFile.getFileParts(); + for (int i = 0; i < fileParts.length; i++) + { + fileParts[i].add(0,"1.1.1.1", ""); + } + + List records = readFile(readFile, null, false); + System.out.println("Record count: " + records.size()); + } + catch (Exception e) + { + Assert.fail(e.getMessage()); + } + } + } + @Test public void invalidSignatureTest() {