diff --git a/client/src/main/java/org/jboss/pnc/buildagent/client/BuildAgentClient.java b/client/src/main/java/org/jboss/pnc/buildagent/client/BuildAgentClient.java index dc6cf16..f1af335 100644 --- a/client/src/main/java/org/jboss/pnc/buildagent/client/BuildAgentClient.java +++ b/client/src/main/java/org/jboss/pnc/buildagent/client/BuildAgentClient.java @@ -1,261 +1,17 @@ -/* - * JBoss, Home of Professional Open Source. - * Copyright 2015 Red Hat, Inc., and individual contributors - * as indicated by the @author tags. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - package org.jboss.pnc.buildagent.client; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; -import org.jboss.pnc.buildagent.api.ResponseMode; -import org.jboss.pnc.buildagent.api.TaskStatusUpdateEvent; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.websocket.ClientEndpointConfig; -import javax.websocket.CloseReason; -import javax.websocket.ContainerProvider; -import javax.websocket.Session; import java.io.Closeable; -import java.io.IOException; -import java.net.URI; -import java.nio.BufferOverflowException; -import java.nio.ByteBuffer; -import java.nio.charset.StandardCharsets; -import java.util.HashMap; -import java.util.Map; -import java.util.Optional; -import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.Consumer; /** - * @see "https://github.com/undertow-io/undertow/blob/5bdddf327209a4abf18792e78148863686c26e9b/websockets-jsr/src/test/java/io/undertow/websockets/jsr/test/BinaryEndpointTest.java" - * * @author Matej Lazar */ -public class BuildAgentClient implements Closeable { - - private static final Logger log = LoggerFactory.getLogger(BuildAgentClient.class); - private final ResponseMode responseMode; - private final boolean readOnly; - - private RemoteEndpoint statusUpdatesEndpoint; - private RemoteEndpoint commandExecutingEndpoint; - - private AtomicBoolean closed = new AtomicBoolean(false); - - public BuildAgentClient(String termBaseUrl, - Optional> responseDataConsumer, - Consumer onStatusUpdate, - String commandContext - ) throws TimeoutException, InterruptedException { - this(termBaseUrl, responseDataConsumer, onStatusUpdate, commandContext, ResponseMode.BINARY, false); - } - - public BuildAgentClient(String termBaseUrl, - Optional> responseDataConsumer, - Consumer onStatusUpdate, - String commandContext, - ResponseMode responseMode, - boolean readOnly) throws TimeoutException, InterruptedException { - - this.responseMode = responseMode; - this.readOnly = readOnly; - - Consumer onStatusUpdateInternal = (event) -> { - onStatusUpdate.accept(event); - }; - - statusUpdatesEndpoint = connectStatusListenerClient(termBaseUrl, onStatusUpdateInternal, commandContext); - commandExecutingEndpoint = connectCommandExecutingClient(termBaseUrl, responseDataConsumer, commandContext); - } - - public void executeCommand(String command) throws BuildAgentClientException { - execute(command); - } - - public void execute(Object command) throws BuildAgentClientException { - log.info("Executing remote command [{}]...", command); - javax.websocket.RemoteEndpoint.Basic remoteEndpoint = commandExecutingEndpoint.getRemoteEndpoint(); - - ByteBuffer byteBuffer = prepareRemoteCommand(command); - - try { - log.debug("Sending remote command..."); - remoteEndpoint.sendBinary(byteBuffer); - log.debug("Command sent."); - } catch (IOException e) { - log.error("Cannot execute remote command.", e); - } - } - - private ByteBuffer prepareRemoteCommand(Object command) throws BuildAgentClientException { - Map cmdJson = new HashMap<>(); - cmdJson.put("action", "read"); - - ByteBuffer byteBuffer; - if (command instanceof String) { - cmdJson.put("data", command + "\n"); - ObjectMapper mapper = new ObjectMapper(); - try { - byteBuffer = ByteBuffer.wrap(mapper.writeValueAsBytes(cmdJson)); - } catch (JsonProcessingException e) { - throw new BuildAgentClientException("Cannot serialize string command.", e); - } - } else { - try { - byteBuffer = ByteBuffer.allocate(1).put(((Integer)command).byteValue()); - } catch (BufferOverflowException | ClassCastException e) { - throw new BuildAgentClientException("Invalid signal.", e); - } - byteBuffer.flip(); - } - return byteBuffer; - } - - private RemoteEndpoint connectStatusListenerClient(String webSocketBaseUrl, Consumer onStatusUpdate, String commandContext) { - RemoteEndpoint client = initializeDefault("statusListener"); - Consumer responseConsumer = (text) -> { - log.trace("Decoding response: {}", text); - - ObjectMapper mapper = new ObjectMapper(); - JsonNode jsonObject = null; - try { - jsonObject = mapper.readTree(text); - } catch (IOException e) { - log.error( "Cannot read JSON string: " + text, e); - } - try { - TaskStatusUpdateEvent taskStatusUpdateEvent = mapper.treeToValue(jsonObject.get("event"), TaskStatusUpdateEvent.class); - onStatusUpdate.accept(taskStatusUpdateEvent); - } catch (IOException e) { - log.error("Cannot deserialize TaskStatusUpdateEvent.", e); - } - }; - client.onStringMessage(responseConsumer); - - commandContext = formatCommandContext(commandContext); - - try { - String websocketUrl = stripEndingSlash(webSocketBaseUrl) + RemoteEndpoint.WEB_SOCKET_LISTENER_PATH + commandContext; - ClientEndpointConfig clientEndpointConfig = ClientEndpointConfig.Builder.create().build(); - ContainerProvider.getWebSocketContainer().connectToServer(client, clientEndpointConfig, new URI(websocketUrl)); - } catch (Exception e) { - throw new AssertionError("Failed to connect to remote client.", e); - } - return client; - } - - private RemoteEndpoint connectCommandExecutingClient(String webSocketBaseUrl, Optional> responseDataConsumer, String commandContext) throws InterruptedException, TimeoutException { - - RemoteEndpoint client = initializeDefault("commandExecuting"); - - if (ResponseMode.TEXT.equals(responseMode)) { - registerTextResponseConsumer(responseDataConsumer, client); - } else if (ResponseMode.BINARY.equals(responseMode)) { - registerBinaryResponseConsumer(responseDataConsumer, client); - } else { - log.info("Connecting commandExecutingClient in silent mode."); - //must be silent mode - } - - String appendReadOnly = readOnly ? "/ro" : ""; - - String webSocketPath; - if (ResponseMode.TEXT.equals(responseMode)) { - webSocketPath = stripEndingSlash(webSocketBaseUrl) + RemoteEndpoint.WEB_SOCKET_TERMINAL_TEXT_PATH; - } else if (ResponseMode.BINARY.equals(responseMode)) { - webSocketPath = stripEndingSlash(webSocketBaseUrl) + RemoteEndpoint.WEB_SOCKET_TERMINAL_PATH; - } else { - webSocketPath = stripEndingSlash(webSocketBaseUrl) + RemoteEndpoint.WEB_SOCKET_TERMINAL_SILENT_PATH; - } - - commandContext = formatCommandContext(commandContext); - - try { - String websocketUrl = webSocketPath + commandContext + appendReadOnly; - ClientEndpointConfig clientEndpointConfig = ClientEndpointConfig.Builder.create().build(); - ContainerProvider.getWebSocketContainer().connectToServer(client, clientEndpointConfig, new URI(websocketUrl)); - } catch (Exception e) { - throw new AssertionError("Failed to connect to remote client.", e); - } - return client; - } - - private String formatCommandContext(String commandContext) { - if (commandContext != null && !commandContext.equals("")) { - commandContext = "/" + commandContext; - } - return commandContext; - } - - private String stripEndingSlash(String path) { - return path.replaceAll("/$", ""); - } - - private void registerBinaryResponseConsumer(Optional> responseDataConsumer, RemoteEndpoint client) { - Consumer responseConsumer = (bytes) -> { - String responseData = new String(bytes, StandardCharsets.UTF_8); - responseDataConsumer.ifPresent((rdc) -> rdc.accept(responseData));; - }; - client.onBinaryMessage(responseConsumer); - } - - private void registerTextResponseConsumer(Optional> responseDataConsumer, RemoteEndpoint client) { - Consumer responseConsumer = (string) -> { - responseDataConsumer.ifPresent((rdc) -> rdc.accept(string));; - }; - client.onStringMessage(responseConsumer); - } - - private RemoteEndpoint initializeDefault(String name) { - - Consumer onOpen = (session) -> { - log.info("Client connection opened for {}.", name); - }; +public interface BuildAgentClient extends Closeable { - Consumer onClose = (closeReason) -> { - log.info("Client connection closed for {}. {}", name, closeReason); - }; + void execute(Object command) throws BuildAgentClientException; - Consumer onError = (throwable) -> { - if (!closed.get()) { - log.error("An error occurred in websocket client for " + name, throwable); - } else { - log.trace("An error occurred in websocket client for " + name, throwable); - } - }; - RemoteEndpoint client = new RemoteEndpoint(onOpen, onClose, onError); + void cancel() throws BuildAgentClientException; - return client; - } + String getSessionId(); - @Override - public void close() throws IOException { - if(closed.compareAndSet(false, true)) { - try { - commandExecutingEndpoint.close(); - statusUpdatesEndpoint.close(); - } catch (Exception e) { - log.error("Cannot close client.", e); - } - } else { - log.debug("Already closed."); - } - } + boolean isServerAlive(); } diff --git a/client/src/main/java/org/jboss/pnc/buildagent/client/BuildAgentClientBase.java b/client/src/main/java/org/jboss/pnc/buildagent/client/BuildAgentClientBase.java new file mode 100644 index 0000000..4cb2f59 --- /dev/null +++ b/client/src/main/java/org/jboss/pnc/buildagent/client/BuildAgentClientBase.java @@ -0,0 +1,43 @@ +package org.jboss.pnc.buildagent.client; + +import org.jboss.logging.Logger; +import org.jboss.pnc.buildagent.common.http.HttpClient; + +import java.io.IOException; +import java.net.URI; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** + * @author Matej Lazar + */ +public abstract class BuildAgentClientBase { + + private final Logger log = Logger.getLogger(BuildAgentClientBase.class); + + protected final HttpClient httpClient; + protected final URI livenessProbeLocation; + + public BuildAgentClientBase(String termBaseUrl) throws BuildAgentClientException { + this.livenessProbeLocation = URI.create(termBaseUrl + "/servlet/is-alive"); + try { + httpClient = new HttpClient(); + } catch (IOException e) { + throw new BuildAgentClientException("Cannot initialize http client.", e); + } + } + + public boolean isServerAlive() { + CompletableFuture responseFuture = new CompletableFuture<>(); + httpClient.invoke(livenessProbeLocation, "HEAD", "", responseFuture); + try { + HttpClient.Response response = responseFuture.get(5, TimeUnit.SECONDS); + return response.getCode() == 200; + } catch (InterruptedException | TimeoutException | ExecutionException e) { + log.warn("Did not receive liveness probe response.", e); + return false; + } + } +} diff --git a/client/src/main/java/org/jboss/pnc/buildagent/client/BuildAgentClientException.java b/client/src/main/java/org/jboss/pnc/buildagent/client/BuildAgentClientException.java index f9ba009..0c0e9cc 100644 --- a/client/src/main/java/org/jboss/pnc/buildagent/client/BuildAgentClientException.java +++ b/client/src/main/java/org/jboss/pnc/buildagent/client/BuildAgentClientException.java @@ -22,6 +22,10 @@ * @author Matej Lazar */ public class BuildAgentClientException extends Exception { + public BuildAgentClientException(String message) { + super(message); + } + public BuildAgentClientException(String message, Exception e) { super(message, e); } diff --git a/client/src/main/java/org/jboss/pnc/buildagent/client/BuildAgentHttpClient.java b/client/src/main/java/org/jboss/pnc/buildagent/client/BuildAgentHttpClient.java index 4cc9af3..daf52fc 100644 --- a/client/src/main/java/org/jboss/pnc/buildagent/client/BuildAgentHttpClient.java +++ b/client/src/main/java/org/jboss/pnc/buildagent/client/BuildAgentHttpClient.java @@ -22,9 +22,8 @@ /** * @author Matej Lazar */ -public class BuildAgentHttpClient { +public class BuildAgentHttpClient extends BuildAgentClientBase implements BuildAgentClient { private final Logger logger = LoggerFactory.getLogger(BuildAgentHttpClient.class); - private final HttpClient httpClient; private final ObjectMapper objectMapper = new ObjectMapper(); private final URL invokerUrl; @@ -37,6 +36,7 @@ public class BuildAgentHttpClient { public BuildAgentHttpClient(String termBaseUrl, URL callbackUrl, String callbackMethod) throws BuildAgentClientException { + super(termBaseUrl); this.callbackUrl = callbackUrl; this.callbackMethod = callbackMethod; try { @@ -44,16 +44,18 @@ public BuildAgentHttpClient(String termBaseUrl, URL callbackUrl, String callback } catch (MalformedURLException e) { throw new BuildAgentClientException("Invalid term url.", e); } - try { - httpClient = new HttpClient(); - } catch (IOException e) { - throw new BuildAgentClientException("Cannot initialize http client.", e); - } } - public void executeCommand(String command) - throws BuildAgentClientException, InterruptedException, ExecutionException, TimeoutException { - InvokeRequest request = new InvokeRequest(command, callbackUrl, callbackMethod); + @Override + public void execute(Object command) throws BuildAgentClientException { + String cmd; + if (command instanceof String) { + cmd = (String) command; + } else { + throw new BuildAgentClientException("Http client supports only String commands."); + } + + InvokeRequest request = new InvokeRequest(cmd, callbackUrl, callbackMethod); CompletableFuture responseFuture = new CompletableFuture<>(); try { String requestJson = objectMapper.writeValueAsString(request); @@ -63,7 +65,12 @@ public void executeCommand(String command) } catch (URISyntaxException e) { throw new BuildAgentClientException("Invalid command execution url.", e); } - HttpClient.Response response = responseFuture.get(5, TimeUnit.SECONDS); //TODO timeout + HttpClient.Response response = null; //TODO timeout + try { + response = responseFuture.get(5, TimeUnit.SECONDS); + } catch (InterruptedException | ExecutionException | TimeoutException e) { + throw new BuildAgentClientException("No response form the remote agent.", e); + } InvokeResponse invokeResponse; try { invokeResponse = objectMapper.readValue(response.getString(), InvokeResponse.class); @@ -78,7 +85,8 @@ public void executeCommand(String command) * @return true if cancel was successful * @throws BuildAgentClientException */ - public boolean cancel() throws BuildAgentClientException { + @Override + public void cancel() throws BuildAgentClientException { Cancel request = new Cancel(sessionId); CompletableFuture responseFuture = new CompletableFuture<>(); try { @@ -93,13 +101,20 @@ public boolean cancel() throws BuildAgentClientException { } try { HttpClient.Response response = responseFuture.get(5, TimeUnit.SECONDS); - return response.getCode() == 200; + if (response.getCode() != 200) { + throw new BuildAgentClientException(String.format("Remove invocation failed, received status {}.", response.getCode())); + } } catch (InterruptedException | ExecutionException | TimeoutException e) { throw new BuildAgentClientException("Error reading cancel request.", e); } } + @Override public String getSessionId() { return sessionId; } + + @Override + public void close() throws IOException { + } } diff --git a/client/src/main/java/org/jboss/pnc/buildagent/client/BuildAgentSocketClient.java b/client/src/main/java/org/jboss/pnc/buildagent/client/BuildAgentSocketClient.java new file mode 100644 index 0000000..c44bc32 --- /dev/null +++ b/client/src/main/java/org/jboss/pnc/buildagent/client/BuildAgentSocketClient.java @@ -0,0 +1,270 @@ +/* + * JBoss, Home of Professional Open Source. + * Copyright 2015 Red Hat, Inc., and individual contributors + * as indicated by the @author tags. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.jboss.pnc.buildagent.client; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.jboss.pnc.buildagent.api.ResponseMode; +import org.jboss.pnc.buildagent.api.TaskStatusUpdateEvent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.websocket.ClientEndpointConfig; +import javax.websocket.CloseReason; +import javax.websocket.ContainerProvider; +import javax.websocket.Session; +import java.io.IOException; +import java.net.URI; +import java.nio.BufferOverflowException; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; + +/** + * @see "https://github.com/undertow-io/undertow/blob/5bdddf327209a4abf18792e78148863686c26e9b/websockets-jsr/src/test/java/io/undertow/websockets/jsr/test/BinaryEndpointTest.java" + * + * @author Matej Lazar + */ +public class BuildAgentSocketClient extends BuildAgentClientBase implements BuildAgentClient { + + private static final Logger log = LoggerFactory.getLogger(BuildAgentSocketClient.class); + + private String commandContext; + + private final ResponseMode responseMode; + private final boolean readOnly; + + private RemoteEndpoint statusUpdatesEndpoint; + private RemoteEndpoint commandExecutingEndpoint; + + private AtomicBoolean closed = new AtomicBoolean(false); + + public BuildAgentSocketClient(String termBaseUrl, + Optional> responseDataConsumer, + Consumer onStatusUpdate, + String commandContext + ) throws TimeoutException, InterruptedException, BuildAgentClientException { + this(termBaseUrl, responseDataConsumer, onStatusUpdate, commandContext, ResponseMode.BINARY, false); + } + + public BuildAgentSocketClient(String termBaseUrl, + Optional> responseDataConsumer, + Consumer onStatusUpdate, + String commandContext, + ResponseMode responseMode, + boolean readOnly) throws TimeoutException, InterruptedException, BuildAgentClientException { + super(termBaseUrl); + this.commandContext = formatCommandContext(commandContext); + this.responseMode = responseMode; + this.readOnly = readOnly; + + Consumer onStatusUpdateInternal = (event) -> { + onStatusUpdate.accept(event); + }; + + statusUpdatesEndpoint = connectStatusListenerClient(termBaseUrl, onStatusUpdateInternal); + commandExecutingEndpoint = connectCommandExecutingClient(termBaseUrl, responseDataConsumer); + } + + @Deprecated + public void executeCommand(String command) throws BuildAgentClientException { + execute(command); + } + + public void execute(Object command) throws BuildAgentClientException { + log.info("Executing remote command [{}]...", command); + javax.websocket.RemoteEndpoint.Basic remoteEndpoint = commandExecutingEndpoint.getRemoteEndpoint(); + + ByteBuffer byteBuffer = prepareRemoteCommand(command); + + try { + log.debug("Sending remote command..."); + remoteEndpoint.sendBinary(byteBuffer); + log.debug("Command sent."); + } catch (IOException e) { + log.error("Cannot execute remote command.", e); + } + } + + public void cancel() throws BuildAgentClientException { + execute('C' - 64); //send ctrl+C + } + + @Override + public String getSessionId() { + return commandContext; + } + + private ByteBuffer prepareRemoteCommand(Object command) throws BuildAgentClientException { + Map cmdJson = new HashMap<>(); + cmdJson.put("action", "read"); + + ByteBuffer byteBuffer; + if (command instanceof String) { + cmdJson.put("data", command + "\n"); + ObjectMapper mapper = new ObjectMapper(); + try { + byteBuffer = ByteBuffer.wrap(mapper.writeValueAsBytes(cmdJson)); + } catch (JsonProcessingException e) { + throw new BuildAgentClientException("Cannot serialize string command.", e); + } + } else { + try { + byteBuffer = ByteBuffer.allocate(1).put(((Integer)command).byteValue()); + } catch (BufferOverflowException | ClassCastException e) { + throw new BuildAgentClientException("Invalid signal.", e); + } + byteBuffer.flip(); + } + return byteBuffer; + } + + private RemoteEndpoint connectStatusListenerClient(String webSocketBaseUrl, Consumer onStatusUpdate) { + RemoteEndpoint client = initializeDefault("statusListener"); + Consumer responseConsumer = (text) -> { + log.trace("Decoding response: {}", text); + + ObjectMapper mapper = new ObjectMapper(); + JsonNode jsonObject = null; + try { + jsonObject = mapper.readTree(text); + } catch (IOException e) { + log.error( "Cannot read JSON string: " + text, e); + } + try { + TaskStatusUpdateEvent taskStatusUpdateEvent = mapper.treeToValue(jsonObject.get("event"), TaskStatusUpdateEvent.class); + onStatusUpdate.accept(taskStatusUpdateEvent); + } catch (IOException e) { + log.error("Cannot deserialize TaskStatusUpdateEvent.", e); + } + }; + client.onStringMessage(responseConsumer); + + try { + String websocketUrl = stripEndingSlash(webSocketBaseUrl) + RemoteEndpoint.WEB_SOCKET_LISTENER_PATH + commandContext; + ClientEndpointConfig clientEndpointConfig = ClientEndpointConfig.Builder.create().build(); + ContainerProvider.getWebSocketContainer().connectToServer(client, clientEndpointConfig, new URI(websocketUrl)); + } catch (Exception e) { + throw new AssertionError("Failed to connect to remote client.", e); + } + return client; + } + + private RemoteEndpoint connectCommandExecutingClient(String webSocketBaseUrl, Optional> responseDataConsumer) throws InterruptedException, TimeoutException { + + RemoteEndpoint client = initializeDefault("commandExecuting"); + + if (ResponseMode.TEXT.equals(responseMode)) { + registerTextResponseConsumer(responseDataConsumer, client); + } else if (ResponseMode.BINARY.equals(responseMode)) { + registerBinaryResponseConsumer(responseDataConsumer, client); + } else { + log.info("Connecting commandExecutingClient in silent mode."); + //must be silent mode + } + + String appendReadOnly = readOnly ? "/ro" : ""; + + String webSocketPath; + if (ResponseMode.TEXT.equals(responseMode)) { + webSocketPath = stripEndingSlash(webSocketBaseUrl) + RemoteEndpoint.WEB_SOCKET_TERMINAL_TEXT_PATH; + } else if (ResponseMode.BINARY.equals(responseMode)) { + webSocketPath = stripEndingSlash(webSocketBaseUrl) + RemoteEndpoint.WEB_SOCKET_TERMINAL_PATH; + } else { + webSocketPath = stripEndingSlash(webSocketBaseUrl) + RemoteEndpoint.WEB_SOCKET_TERMINAL_SILENT_PATH; + } + + try { + String websocketUrl = webSocketPath + commandContext + appendReadOnly; + ClientEndpointConfig clientEndpointConfig = ClientEndpointConfig.Builder.create().build(); + ContainerProvider.getWebSocketContainer().connectToServer(client, clientEndpointConfig, new URI(websocketUrl)); + } catch (Exception e) { + throw new AssertionError("Failed to connect to remote client.", e); + } + return client; + } + + private String formatCommandContext(String commandContext) { + if (commandContext != null && !commandContext.equals("")) { + commandContext = "/" + commandContext; + } + return commandContext; + } + + private String stripEndingSlash(String path) { + return path.replaceAll("/$", ""); + } + + private void registerBinaryResponseConsumer(Optional> responseDataConsumer, RemoteEndpoint client) { + Consumer responseConsumer = (bytes) -> { + String responseData = new String(bytes, StandardCharsets.UTF_8); + responseDataConsumer.ifPresent((rdc) -> rdc.accept(responseData));; + }; + client.onBinaryMessage(responseConsumer); + } + + private void registerTextResponseConsumer(Optional> responseDataConsumer, RemoteEndpoint client) { + Consumer responseConsumer = (string) -> { + responseDataConsumer.ifPresent((rdc) -> rdc.accept(string));; + }; + client.onStringMessage(responseConsumer); + } + + private RemoteEndpoint initializeDefault(String name) { + + Consumer onOpen = (session) -> { + log.info("Client connection opened for {}.", name); + }; + + Consumer onClose = (closeReason) -> { + log.info("Client connection closed for {}. {}", name, closeReason); + }; + + Consumer onError = (throwable) -> { + if (!closed.get()) { + log.error("An error occurred in websocket client for " + name, throwable); + } else { + log.trace("An error occurred in websocket client for " + name, throwable); + } + }; + RemoteEndpoint client = new RemoteEndpoint(onOpen, onClose, onError); + + return client; + } + + @Override + public void close() throws IOException { + if(closed.compareAndSet(false, true)) { + try { + commandExecutingEndpoint.close(); + statusUpdatesEndpoint.close(); + } catch (Exception e) { + log.error("Cannot close client.", e); + } + } else { + log.debug("Already closed."); + } + } +} diff --git a/server/src/test/java/org/jboss/pnc/buildagent/server/ConcurrentStdInOut.java b/server/src/test/java/org/jboss/pnc/buildagent/server/ConcurrentStdInOut.java index 165ac3f..0bfc2bf 100644 --- a/server/src/test/java/org/jboss/pnc/buildagent/server/ConcurrentStdInOut.java +++ b/server/src/test/java/org/jboss/pnc/buildagent/server/ConcurrentStdInOut.java @@ -1,7 +1,7 @@ package org.jboss.pnc.buildagent.server; import org.jboss.pnc.buildagent.api.TaskStatusUpdateEvent; -import org.jboss.pnc.buildagent.client.BuildAgentClient; +import org.jboss.pnc.buildagent.client.BuildAgentSocketClient; import org.junit.AfterClass; import org.junit.Assert; import org.junit.BeforeClass; @@ -102,7 +102,7 @@ public void shouldNotMixStdInAndStdoutLines() throws Exception { Consumer responseConsumer = s -> { responses.add(s); }; - BuildAgentClient buildAgentClient = new BuildAgentClient(terminalUrl, Optional.of(responseConsumer), onStatusUpdate, context); + BuildAgentSocketClient buildAgentClient = new BuildAgentSocketClient(terminalUrl, Optional.of(responseConsumer), onStatusUpdate, context); buildAgentClient.executeCommand(TEST_COMMAND); String received = queue.poll(5, TimeUnit.SECONDS); diff --git a/server/src/test/java/org/jboss/pnc/buildagent/server/LivenessProbeTest.java b/server/src/test/java/org/jboss/pnc/buildagent/server/LivenessProbeTest.java new file mode 100644 index 0000000..9b9fdf8 --- /dev/null +++ b/server/src/test/java/org/jboss/pnc/buildagent/server/LivenessProbeTest.java @@ -0,0 +1,27 @@ +package org.jboss.pnc.buildagent.server; + +import org.jboss.pnc.buildagent.client.BuildAgentClientException; +import org.jboss.pnc.buildagent.client.BuildAgentHttpClient; +import org.junit.Assert; +import org.junit.Test; + +import static org.jboss.pnc.buildagent.server.TermdServer.HOST; + +/** + * @author Matej Lazar + */ +public class LivenessProbeTest { + + @Test + public void shouldGetLivenessResponse() throws InterruptedException, BuildAgentClientException { + int port = TermdServer.getNextPort(); + String terminalBaseUrl = "http://" + HOST + ":" + port; + + BuildAgentHttpClient client = new BuildAgentHttpClient(terminalBaseUrl, null, null); + Assert.assertFalse(client.isServerAlive()); + + TermdServer.startServer(HOST, port, "", true, true); + Assert.assertTrue(client.isServerAlive()); + TermdServer.stopServer(); + } +} diff --git a/server/src/test/java/org/jboss/pnc/buildagent/server/TermdServer.java b/server/src/test/java/org/jboss/pnc/buildagent/server/TermdServer.java index 13c5afe..5f31070 100644 --- a/server/src/test/java/org/jboss/pnc/buildagent/server/TermdServer.java +++ b/server/src/test/java/org/jboss/pnc/buildagent/server/TermdServer.java @@ -34,6 +34,8 @@ */ public class TermdServer { + public static final String HOST = "localhost"; + private static final AtomicInteger port_pool = new AtomicInteger(8090); private static BuildAgentServer buildAgentServer; diff --git a/server/src/test/java/org/jboss/pnc/buildagent/server/TestGetRunningProcesses.java b/server/src/test/java/org/jboss/pnc/buildagent/server/TestGetRunningProcesses.java index bd34658..f61d576 100644 --- a/server/src/test/java/org/jboss/pnc/buildagent/server/TestGetRunningProcesses.java +++ b/server/src/test/java/org/jboss/pnc/buildagent/server/TestGetRunningProcesses.java @@ -22,7 +22,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import org.jboss.pnc.buildagent.api.Status; import org.jboss.pnc.buildagent.api.TaskStatusUpdateEvent; -import org.jboss.pnc.buildagent.client.BuildAgentClient; +import org.jboss.pnc.buildagent.client.BuildAgentSocketClient; import org.jboss.pnc.buildagent.common.ObjectWrapper; import org.jboss.pnc.buildagent.common.Wait; import org.junit.AfterClass; @@ -89,7 +89,7 @@ public void getRunningProcesses() throws Exception { } }; - BuildAgentClient buildAgentClient = new BuildAgentClient(terminalUrl, Optional.empty(), onStatusUpdate, context); + BuildAgentSocketClient buildAgentClient = new BuildAgentSocketClient(terminalUrl, Optional.empty(), onStatusUpdate, context); buildAgentClient.executeCommand(TEST_COMMAND); Supplier evaluationSupplier = () -> resultReceived.get(); diff --git a/server/src/test/java/org/jboss/pnc/buildagent/server/httpinvoker/TestCommandExecution.java b/server/src/test/java/org/jboss/pnc/buildagent/server/httpinvoker/TestCommandExecution.java index 2206579..9ee2d52 100644 --- a/server/src/test/java/org/jboss/pnc/buildagent/server/httpinvoker/TestCommandExecution.java +++ b/server/src/test/java/org/jboss/pnc/buildagent/server/httpinvoker/TestCommandExecution.java @@ -3,6 +3,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import org.jboss.pnc.buildagent.api.Status; import org.jboss.pnc.buildagent.api.TaskStatusUpdateEvent; +import org.jboss.pnc.buildagent.client.BuildAgentClient; import org.jboss.pnc.buildagent.client.BuildAgentClientException; import org.jboss.pnc.buildagent.client.BuildAgentHttpClient; import org.jboss.pnc.buildagent.server.TermdServer; @@ -72,8 +73,8 @@ public void shouldExecuteRemoteCommand() responseConsumers.add(onResult); URL callbackUrl = new URL("http://" + HOST +":" + LOCAL_PORT+"/" + CallbackHandler.class.getSimpleName()); - BuildAgentHttpClient client = new BuildAgentHttpClient(terminalBaseUrl, callbackUrl, "PUT"); - client.executeCommand(TEST_COMMAND_BASE + "10 0"); + BuildAgentClient client = new BuildAgentHttpClient(terminalBaseUrl, callbackUrl, "PUT"); + client.execute(TEST_COMMAND_BASE + "10 0"); Assert.assertNotNull(client.getSessionId()); @@ -93,8 +94,8 @@ public void shouldCancelRemoteCommand() responseConsumers.add(onResult); URL callbackUrl = new URL("http://" + HOST +":" + LOCAL_PORT+"/" + CallbackHandler.class.getSimpleName()); - BuildAgentHttpClient client = new BuildAgentHttpClient(terminalBaseUrl, callbackUrl, "PUT"); - client.executeCommand(TEST_COMMAND_BASE + "4 250"); + BuildAgentClient client = new BuildAgentHttpClient(terminalBaseUrl, callbackUrl, "PUT"); + client.execute(TEST_COMMAND_BASE + "4 250"); Assert.assertNotNull(client.getSessionId()); diff --git a/server/src/test/java/org/jboss/pnc/buildagent/server/websockets/TestWebSocketConnection.java b/server/src/test/java/org/jboss/pnc/buildagent/server/websockets/TestWebSocketConnection.java index cfbacf7..8ec97af 100644 --- a/server/src/test/java/org/jboss/pnc/buildagent/server/websockets/TestWebSocketConnection.java +++ b/server/src/test/java/org/jboss/pnc/buildagent/server/websockets/TestWebSocketConnection.java @@ -21,7 +21,7 @@ import org.jboss.pnc.buildagent.api.ResponseMode; import org.jboss.pnc.buildagent.api.Status; import org.jboss.pnc.buildagent.api.TaskStatusUpdateEvent; -import org.jboss.pnc.buildagent.client.BuildAgentClient; +import org.jboss.pnc.buildagent.client.BuildAgentSocketClient; import org.jboss.pnc.buildagent.common.ObjectWrapper; import org.jboss.pnc.buildagent.common.Wait; import org.jboss.pnc.buildagent.server.MockProcess; @@ -112,7 +112,7 @@ public void clientShouldBeAbleToRunRemoteCommandAndReceiveResults(ResponseMode r log.trace("Adding to remote response list [{}].", responseData); remoteResponses.add(responseData); }; - BuildAgentClient buildAgentClient = new BuildAgentClient( + BuildAgentSocketClient buildAgentClient = new BuildAgentSocketClient( terminalBaseUrl, Optional.of(onResponseData), onStatusUpdate, @@ -147,7 +147,7 @@ public void shouldExecuteTwoTasksAndWriteToLogs() throws Exception { } }; - BuildAgentClient buildAgentClient = new BuildAgentClient(terminalBaseUrl, Optional.empty(), onStatusUpdate, context); + BuildAgentSocketClient buildAgentClient = new BuildAgentSocketClient(terminalBaseUrl, Optional.empty(), onStatusUpdate, context); buildAgentClient.executeCommand(getTestCommand(100, 0)); Wait.forCondition(() -> completed.get(), 10, ChronoUnit.SECONDS, "Command did not complete in given timeout."); @@ -179,7 +179,7 @@ public void shouldEnqueueNewTasksWhenOneIsRunning() throws Exception { } }; - BuildAgentClient buildAgentClient = new BuildAgentClient(terminalBaseUrl, Optional.empty(), onStatusUpdate, context); + BuildAgentSocketClient buildAgentClient = new BuildAgentSocketClient(terminalBaseUrl, Optional.empty(), onStatusUpdate, context); buildAgentClient.executeCommand(getTestCommand(3, 1)); buildAgentClient.executeCommand(getTestCommand(3, 0, "2nd-command.")); @@ -207,7 +207,7 @@ public void shouldExecuteTwoTasksInSilentMode() throws Exception { } }; - BuildAgentClient buildAgentClient = new BuildAgentClient(terminalBaseUrl, Optional.empty(), onStatusUpdate, context, ResponseMode.SILENT, false); + BuildAgentSocketClient buildAgentClient = new BuildAgentSocketClient(terminalBaseUrl, Optional.empty(), onStatusUpdate, context, ResponseMode.SILENT, false); buildAgentClient.executeCommand(getTestCommand(100, 0)); Wait.forCondition(() -> completed.get(), 10, ChronoUnit.SECONDS, "Command did not complete in given timeout."); @@ -237,7 +237,7 @@ public void shouldExecuteAndCancelTheExecution() throws Exception { } }; - BuildAgentClient buildAgentClient = new BuildAgentClient(terminalBaseUrl, Optional.empty(), onStatusUpdate, context, ResponseMode.SILENT, false); + BuildAgentSocketClient buildAgentClient = new BuildAgentSocketClient(terminalBaseUrl, Optional.empty(), onStatusUpdate, context, ResponseMode.SILENT, false); buildAgentClient.executeCommand(getTestCommand(3, 1000)); Wait.forCondition(() -> running.get(), 1, ChronoUnit.SECONDS, "Command did not start in given timeout."); @@ -259,7 +259,7 @@ public void clientShouldBeAbleToConnectToRunningProcess() throws Exception { completed.set(true); } }; - BuildAgentClient buildAgentClient = new BuildAgentClient(terminalBaseUrl, Optional.empty(), onStatusUpdate, context, ResponseMode.BINARY, false); + BuildAgentSocketClient buildAgentClient = new BuildAgentSocketClient(terminalBaseUrl, Optional.empty(), onStatusUpdate, context, ResponseMode.BINARY, false); buildAgentClient.executeCommand(getTestCommand(100, 20)); sleep(1000); //wait for async command start @@ -269,7 +269,7 @@ public void clientShouldBeAbleToConnectToRunningProcess() throws Exception { Consumer onResponse = (message) -> { response.append(message); }; - BuildAgentClient buildAgentClientReconnected = new BuildAgentClient(terminalBaseUrl, Optional.of(onResponse), onStatusUpdate, context, ResponseMode.BINARY, false); + BuildAgentSocketClient buildAgentClientReconnected = new BuildAgentSocketClient(terminalBaseUrl, Optional.of(onResponse), onStatusUpdate, context, ResponseMode.BINARY, false); Wait.forCondition(() -> completed.get(), 15, ChronoUnit.SECONDS, "Operation did not complete within given timeout."); Wait.forCondition(() -> response.toString().contains("I'm done."), 5, ChronoUnit.SECONDS, "Missing or invalid response: " + response.toString()); @@ -287,14 +287,14 @@ public void clientShouldBeAbleToConnectToRunningProcessInDifferentResponseMode() completed.set(true); } }; - BuildAgentClient buildAgentClient = new BuildAgentClient(terminalBaseUrl, Optional.empty(), onStatusUpdate, context, ResponseMode.BINARY, false); + BuildAgentSocketClient buildAgentClient = new BuildAgentSocketClient(terminalBaseUrl, Optional.empty(), onStatusUpdate, context, ResponseMode.BINARY, false); buildAgentClient.executeCommand(getTestCommand(100, 10)); StringBuilder response = new StringBuilder(); Consumer onResponse = (message) -> { response.append(message); }; - BuildAgentClient buildAgentClientReconnected = new BuildAgentClient( + BuildAgentSocketClient buildAgentClientReconnected = new BuildAgentSocketClient( terminalBaseUrl, Optional.of(onResponse), (event) -> {}, @@ -319,14 +319,14 @@ public void textClientShouldReciveOutputWhenCommandStartedInSilentMode() throws completed.set(true); } }; - BuildAgentClient buildAgentClient = new BuildAgentClient(terminalBaseUrl, Optional.empty(), onStatusUpdate, context, ResponseMode.SILENT, false); + BuildAgentSocketClient buildAgentClient = new BuildAgentSocketClient(terminalBaseUrl, Optional.empty(), onStatusUpdate, context, ResponseMode.SILENT, false); buildAgentClient.executeCommand(getTestCommand(100, 10)); StringBuilder response = new StringBuilder(); Consumer onResponse = (message) -> { response.append(message); }; - BuildAgentClient buildAgentClientReconnected = new BuildAgentClient( + BuildAgentSocketClient buildAgentClientReconnected = new BuildAgentSocketClient( terminalBaseUrl, Optional.of(onResponse), (event) -> {}, @@ -360,9 +360,9 @@ public void clientShouldBeAbleToExecuteCommandWithoutListeningToResponse() throw Consumer onSilentResponse = (message) -> { silentResponse.append(message); }; - BuildAgentClient buildAgentClientListener = new BuildAgentClient(terminalBaseUrl, Optional.of(onResponse), (event) -> {}, context, ResponseMode.TEXT, true); + BuildAgentSocketClient buildAgentClientListener = new BuildAgentSocketClient(terminalBaseUrl, Optional.of(onResponse), (event) -> {}, context, ResponseMode.TEXT, true); //connect executing client - BuildAgentClient buildAgentClient = new BuildAgentClient(terminalBaseUrl, Optional.of(onSilentResponse), onStatusUpdate, context, ResponseMode.SILENT, false); + BuildAgentSocketClient buildAgentClient = new BuildAgentSocketClient(terminalBaseUrl, Optional.of(onSilentResponse), onStatusUpdate, context, ResponseMode.SILENT, false); buildAgentClient.executeCommand(getTestCommand(100, 0)); Wait.forCondition(() -> completed.get(), 10, ChronoUnit.SECONDS, "Operation did not complete within given timeout."); @@ -392,9 +392,9 @@ public void clientShouldBeAbleToConnectAndListenForOutputBeforeTheProcessStart() Consumer onResponse = (message) -> { response.append(message); }; - BuildAgentClient buildAgentClientListener = new BuildAgentClient(terminalBaseUrl, Optional.of(onResponse), (event) -> {}, context, ResponseMode.TEXT, true); + BuildAgentSocketClient buildAgentClientListener = new BuildAgentSocketClient(terminalBaseUrl, Optional.of(onResponse), (event) -> {}, context, ResponseMode.TEXT, true); //connect executing client - BuildAgentClient buildAgentClient = new BuildAgentClient(terminalBaseUrl, Optional.empty(), onStatusUpdate, context, ResponseMode.BINARY, false); + BuildAgentSocketClient buildAgentClient = new BuildAgentSocketClient(terminalBaseUrl, Optional.empty(), onStatusUpdate, context, ResponseMode.BINARY, false); buildAgentClient.executeCommand(getTestCommand(100, 0)); Wait.forCondition(() -> completed.get(), 10, ChronoUnit.SECONDS, "Operation did not complete within given timeout."); diff --git a/server/src/test/java/org/jboss/pnc/buildagent/server/websockets/TestWebSocketConnectionWithBindPath.java b/server/src/test/java/org/jboss/pnc/buildagent/server/websockets/TestWebSocketConnectionWithBindPath.java index 5e6e285..f245ebc 100644 --- a/server/src/test/java/org/jboss/pnc/buildagent/server/websockets/TestWebSocketConnectionWithBindPath.java +++ b/server/src/test/java/org/jboss/pnc/buildagent/server/websockets/TestWebSocketConnectionWithBindPath.java @@ -22,7 +22,7 @@ import org.jboss.pnc.buildagent.api.ResponseMode; import org.jboss.pnc.buildagent.api.Status; import org.jboss.pnc.buildagent.api.TaskStatusUpdateEvent; -import org.jboss.pnc.buildagent.client.BuildAgentClient; +import org.jboss.pnc.buildagent.client.BuildAgentSocketClient; import org.jboss.pnc.buildagent.common.ObjectWrapper; import org.jboss.pnc.buildagent.common.Wait; import org.junit.AfterClass; @@ -77,7 +77,7 @@ public void clientShouldBeAbleToReConnect() throws Exception { completed.set(true); } }; - BuildAgentClient buildAgentClient = new BuildAgentClient(terminalBaseUrl, Optional.empty(), onStatusUpdate, context, ResponseMode.BINARY, false); + BuildAgentSocketClient buildAgentClient = new BuildAgentSocketClient(terminalBaseUrl, Optional.empty(), onStatusUpdate, context, ResponseMode.BINARY, false); buildAgentClient.executeCommand(TEST_COMMAND); Thread.sleep(1000); //make sure async command execution started @@ -87,7 +87,7 @@ public void clientShouldBeAbleToReConnect() throws Exception { Consumer onResponse = (message) -> { response.append(message); }; - BuildAgentClient buildAgentClientReconnected = new BuildAgentClient( + BuildAgentSocketClient buildAgentClientReconnected = new BuildAgentSocketClient( terminalBaseUrl, Optional.of(onResponse), onStatusUpdate, @@ -111,7 +111,7 @@ public void clientShouldBeAbleToReConnectWithDifferentResponseMode() throws Exce completed.set(true); } }; - BuildAgentClient buildAgentClient = new BuildAgentClient(terminalBaseUrl, Optional.empty(), onStatusUpdate, context, ResponseMode.BINARY, false); + BuildAgentSocketClient buildAgentClient = new BuildAgentSocketClient(terminalBaseUrl, Optional.empty(), onStatusUpdate, context, ResponseMode.BINARY, false); buildAgentClient.executeCommand(TEST_COMMAND); Thread.sleep(1000); //make sure async command execution started @@ -121,7 +121,7 @@ public void clientShouldBeAbleToReConnectWithDifferentResponseMode() throws Exce Consumer onResponse = (message) -> { response.append(message); }; - BuildAgentClient buildAgentClientReconnected = new BuildAgentClient( + BuildAgentSocketClient buildAgentClientReconnected = new BuildAgentSocketClient( terminalBaseUrl, Optional.of(onResponse), onStatusUpdate,