diff --git a/src/main/java/io/cresco/wsapi/Plugin.java b/src/main/java/io/cresco/wsapi/Plugin.java index f2101af..2d1408c 100644 --- a/src/main/java/io/cresco/wsapi/Plugin.java +++ b/src/main/java/io/cresco/wsapi/Plugin.java @@ -223,7 +223,6 @@ public boolean isStarted() { container.addEndpoint(APIDataPlane.class); container.addEndpoint(APILogStreamer.class); - server.start(); logger.info("Started server: " + server); if (server.getConnectors().length > 0) { diff --git a/src/main/java/io/cresco/wsapi/websockets/APIDataPlane.java b/src/main/java/io/cresco/wsapi/websockets/APIDataPlane.java index c5cb416..49a307a 100644 --- a/src/main/java/io/cresco/wsapi/websockets/APIDataPlane.java +++ b/src/main/java/io/cresco/wsapi/websockets/APIDataPlane.java @@ -3,17 +3,18 @@ import com.google.common.reflect.TypeToken; import com.google.gson.Gson; import io.cresco.library.data.TopicType; -import io.cresco.library.messaging.MsgEvent; import io.cresco.library.plugin.PluginBuilder; import io.cresco.library.utilities.CLogger; import io.cresco.wsapi.Plugin; import javax.jms.Message; -import javax.jms.StreamMessage; import javax.jms.TextMessage; +import javax.jms.BytesMessage; + import javax.websocket.*; import javax.websocket.server.ServerEndpoint; import java.lang.reflect.Type; +import java.nio.ByteBuffer; import java.util.*; import java.util.concurrent.atomic.AtomicBoolean; @@ -79,7 +80,6 @@ private boolean isActive(Session sess) { @OnMessage public void onWebSocketText(Session sess, String message) { - if(isActive(sess)) { try { @@ -155,7 +155,48 @@ public void onWebSocketText(Session sess, String message) } + } + + @OnMessage + public void processUpload(byte[] b, boolean last, Session sess) { + + if(!last) { + System.out.println("processUpload(byte[] b, boolean last, Session sess) NOT LAST MESSAGE!!!!"); + } + + if(isActive(sess)) { + + try { + + String identKey; + String identId; + String ioTypeKey; + String inputId; + + synchronized (lockSessionMap) { + identKey = sessionMap.get(sess.getId()).getIdentKey(); + identId = sessionMap.get(sess.getId()).getIdentId(); + ioTypeKey = sessionMap.get(sess.getId()).getIoTypeKey(); + inputId = sessionMap.get(sess.getId()).getInputId(); + } + + if((identKey != null) && (identId != null)) { + + BytesMessage updateMessage = plugin.getAgentService().getDataPlaneService().createBytesMessage(); + updateMessage.writeBytes(b); + updateMessage.setStringProperty(identKey, identId); + updateMessage.setStringProperty(ioTypeKey, inputId); + + plugin.getAgentService().getDataPlaneService().sendMessage(TopicType.AGENT, updateMessage); + + } else { + logger.error("identKey and identId are null for session_id: " + sess.getId()); + } + } catch (Exception ex) { + logger.error("processUpload: " + ex.getMessage()); + } + } } @@ -167,12 +208,17 @@ private boolean createListener(Session sess, StreamInfo streamInfo) { public void onMessage(Message msg) { try { - if (msg instanceof TextMessage) { - sess.getAsyncRemote().sendObject(((TextMessage) msg).getText()); + } else if (msg instanceof BytesMessage) { + long dataSize = ((BytesMessage) msg).getBodyLength(); + byte[] bytes = new byte[(int)dataSize]; + ((BytesMessage) msg).readBytes(bytes); + ByteBuffer buffer = ByteBuffer.wrap(bytes); + sess.getAsyncRemote().sendBinary(buffer); } + } catch(Exception ex) { ex.printStackTrace(); diff --git a/src/main/java/io/cresco/wsapi/websockets/APISocket.java b/src/main/java/io/cresco/wsapi/websockets/APISocket.java index 4cfe58e..1d59d63 100644 --- a/src/main/java/io/cresco/wsapi/websockets/APISocket.java +++ b/src/main/java/io/cresco/wsapi/websockets/APISocket.java @@ -52,6 +52,7 @@ public void onWebSocketConnect(Session sess) @OnMessage public void onWebSocketText(Session sess, String message) { + String r; Map> incoming_message = gson.fromJson(message, type);