Skip to content

Commit

Permalink
Adding binary transfer for clients
Browse files Browse the repository at this point in the history
  • Loading branch information
codybum committed Feb 18, 2024
1 parent 0e39102 commit 77bf4b2
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 6 deletions.
1 change: 0 additions & 1 deletion src/main/java/io/cresco/wsapi/Plugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,6 @@ public boolean isStarted() {
container.addEndpoint(APIDataPlane.class);
container.addEndpoint(APILogStreamer.class);


server.start();
logger.info("Started server: " + server);
if (server.getConnectors().length > 0) {
Expand Down
56 changes: 51 additions & 5 deletions src/main/java/io/cresco/wsapi/websockets/APIDataPlane.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,18 @@
import com.google.common.reflect.TypeToken;
import com.google.gson.Gson;
import io.cresco.library.data.TopicType;
import io.cresco.library.messaging.MsgEvent;
import io.cresco.library.plugin.PluginBuilder;
import io.cresco.library.utilities.CLogger;
import io.cresco.wsapi.Plugin;

import javax.jms.Message;
import javax.jms.StreamMessage;
import javax.jms.TextMessage;
import javax.jms.BytesMessage;

import javax.websocket.*;
import javax.websocket.server.ServerEndpoint;
import java.lang.reflect.Type;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;

Expand Down Expand Up @@ -79,7 +80,6 @@ private boolean isActive(Session sess) {
@OnMessage
public void onWebSocketText(Session sess, String message)
{

if(isActive(sess)) {

try {
Expand Down Expand Up @@ -155,7 +155,48 @@ public void onWebSocketText(Session sess, String message)

}

}

@OnMessage
public void processUpload(byte[] b, boolean last, Session sess) {

if(!last) {
System.out.println("processUpload(byte[] b, boolean last, Session sess) NOT LAST MESSAGE!!!!");
}

if(isActive(sess)) {

try {

String identKey;
String identId;
String ioTypeKey;
String inputId;

synchronized (lockSessionMap) {
identKey = sessionMap.get(sess.getId()).getIdentKey();
identId = sessionMap.get(sess.getId()).getIdentId();
ioTypeKey = sessionMap.get(sess.getId()).getIoTypeKey();
inputId = sessionMap.get(sess.getId()).getInputId();
}

if((identKey != null) && (identId != null)) {

BytesMessage updateMessage = plugin.getAgentService().getDataPlaneService().createBytesMessage();
updateMessage.writeBytes(b);
updateMessage.setStringProperty(identKey, identId);
updateMessage.setStringProperty(ioTypeKey, inputId);

plugin.getAgentService().getDataPlaneService().sendMessage(TopicType.AGENT, updateMessage);

} else {
logger.error("identKey and identId are null for session_id: " + sess.getId());
}
} catch (Exception ex) {
logger.error("processUpload: " + ex.getMessage());
}

}

}

Expand All @@ -167,12 +208,17 @@ private boolean createListener(Session sess, StreamInfo streamInfo) {
public void onMessage(Message msg) {
try {


if (msg instanceof TextMessage) {

sess.getAsyncRemote().sendObject(((TextMessage) msg).getText());

} else if (msg instanceof BytesMessage) {
long dataSize = ((BytesMessage) msg).getBodyLength();
byte[] bytes = new byte[(int)dataSize];
((BytesMessage) msg).readBytes(bytes);
ByteBuffer buffer = ByteBuffer.wrap(bytes);
sess.getAsyncRemote().sendBinary(buffer);
}

} catch(Exception ex) {

ex.printStackTrace();
Expand Down
1 change: 1 addition & 0 deletions src/main/java/io/cresco/wsapi/websockets/APISocket.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ public void onWebSocketConnect(Session sess)

@OnMessage
public void onWebSocketText(Session sess, String message) {

String r;

Map<String, Map<String, String>> incoming_message = gson.fromJson(message, type);
Expand Down

0 comments on commit 77bf4b2

Please sign in to comment.