Skip to content

Commit

Permalink
[insteon] Refactor iostream transport classes (#17930)
Browse files Browse the repository at this point in the history
Signed-off-by: jsetton <[email protected]>
  • Loading branch information
jsetton authored Dec 21, 2024
1 parent 6acfeb6 commit 1b75e03
Show file tree
Hide file tree
Showing 8 changed files with 86 additions and 182 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Objects;
Expand All @@ -34,8 +33,6 @@
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpStatus;
import org.openhab.binding.insteon.internal.utils.HexUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Implements IOStream for an Insteon Hub 2
Expand All @@ -47,10 +44,8 @@
*/
@NonNullByDefault
public class HubIOStream extends IOStream {
private final Logger logger = LoggerFactory.getLogger(HubIOStream.class);

private static final String BS_START = "<BS>";
private static final String BS_END = "</BS>";
private static final String BUFFER_TAG_START = "<BS>";
private static final String BUFFER_TAG_END = "</BS>";
private static final int REQUEST_TIMEOUT = 30; // in seconds

private String host;
Expand All @@ -61,7 +56,7 @@ public class HubIOStream extends IOStream {
private ScheduledExecutorService scheduler;
private @Nullable ScheduledFuture<?> job;
// index of the last byte we have read in the buffer
private int bufferIdx = -1;
private volatile int bufferIdx = -1;

/**
* Constructor
Expand All @@ -84,14 +79,9 @@ public HubIOStream(String host, int port, String username, String password, int
this.scheduler = scheduler;
}

@Override
public boolean isOpen() {
return job != null;
}

@Override
public boolean open() {
if (isOpen()) {
if (job != null) {
logger.warn("hub stream is already open");
return false;
}
Expand Down Expand Up @@ -120,93 +110,65 @@ public boolean open() {

@Override
public void close() {
super.close();

ScheduledFuture<?> job = this.job;
if (job != null) {
job.cancel(true);
this.job = null;
}

InputStream in = this.in;
if (in != null) {
try {
in.close();
} catch (IOException e) {
logger.debug("failed to close input stream", e);
}
this.in = null;
}

OutputStream out = this.out;
if (out != null) {
try {
out.close();
} catch (IOException e) {
logger.debug("failed to close output stream", e);
}
this.out = null;
}
}

/**
* Fetches the latest status buffer from the Hub
* Returns the latest buffer from the Hub
*
* @return string with status buffer
* @return the buffer string
* @throws IOException
*/
private synchronized String bufferStatus() throws IOException {
private String getBuffer() throws IOException {
String result = getURL("/buffstatus.xml");

int start = result.indexOf(BS_START);
if (start == -1) {
throw new IOException("malformed bufferstatus.xml");
}
start += BS_START.length();

int end = result.indexOf(BS_END, start);
if (end == -1) {
throw new IOException("malformed bufferstatus.xml");
int start = result.indexOf(BUFFER_TAG_START);
int end = result.indexOf(BUFFER_TAG_END, start);
if (start == -1 || end == -1) {
throw new IOException("malformed buffstatus.xml");
}
start += BUFFER_TAG_START.length();

return result.substring(start, end).trim();
}

/**
* Sends command to Hub to clear the status buffer
* Clears the Hub buffer
*
* @throws IOException
*/
private synchronized void clearBuffer() throws IOException {
private void clearBuffer() throws IOException {
logger.trace("clearing buffer");
getURL("/1?XB=M=1");
bufferIdx = 0;
}

/**
* Sends Insteon message (byte array) as a readable ascii string to the Hub
* Sends a message to the Hub
*
* @param msg byte array representing the Insteon message
* @throws IOException in case of I/O error
* @param b byte array representing the Insteon message
* @throws IOException
*/
public synchronized void write(ByteBuffer msg) throws IOException {
poll(); // fetch the status buffer before we send out commands

StringBuilder b = new StringBuilder();
while (msg.remaining() > 0) {
b.append(String.format("%02x", msg.get()));
}
String hexMsg = b.toString();
logger.trace("writing a message");
getURL("/3?" + hexMsg + "=I=3");
private void sendMessage(byte[] b) throws IOException {
poll(); // poll the status buffer before we send the message
logger.trace("sending a message");
getURL("/3?" + HexUtils.getHexString(b) + "=I=3");
bufferIdx = 0;
}

/**
* Polls the Hub web interface to fetch the status buffer
* Polls the Hub buffer and add to input stream
*
* @throws IOException if something goes wrong with I/O
* @throws IOException
*/
private synchronized void poll() throws IOException {
String buffer = bufferStatus(); // fetch via http call
private void poll() throws IOException {
String buffer = getBuffer();
logger.trace("poll: {}", buffer);
// The Hub maintains a ring buffer where the last two digits (in hex!) represent
// the position of the last byte read.
Expand Down Expand Up @@ -249,10 +211,9 @@ private synchronized void poll() throws IOException {
logger.trace("no wrap: appending new data: {}", msg);
}
if (msg.length() != 0) {
byte[] array = HexUtils.toByteArray(msg.toString());
ByteBuffer buf = ByteBuffer.wrap(array);
byte[] b = HexUtils.toByteArray(msg.toString());
if (in instanceof HubInputStream hubInput) {
hubInput.handle(buf);
hubInput.add(b);
} else {
logger.debug("hub input stream is null");
}
Expand Down Expand Up @@ -310,10 +271,10 @@ public class HubInputStream extends InputStream {
// A buffer to keep bytes while we are waiting for the inputstream to read
private ReadByteBuffer buffer = new ReadByteBuffer(1024);

public void handle(ByteBuffer b) throws IOException {
public void add(byte[] b) throws IOException {
// Make sure we cleanup as much space as possible
buffer.makeCompact();
buffer.add(b.array());
buffer.add(b);
}

@Override
Expand Down Expand Up @@ -342,18 +303,18 @@ public class HubOutputStream extends OutputStream {
@Override
public void write(int b) throws IOException {
out.write(b);
flushBuffer();
flush();
}

@Override
public void write(byte @Nullable [] b, int off, int len) throws IOException {
out.write(b, off, len);
flushBuffer();
flush();
}

private void flushBuffer() throws IOException {
ByteBuffer buffer = ByteBuffer.wrap(out.toByteArray());
HubIOStream.this.write(buffer);
@Override
public void flush() throws IOException {
sendMessage(out.toByteArray());
out.reset();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import org.openhab.binding.insteon.internal.config.InsteonHub2Configuration;
import org.openhab.binding.insteon.internal.config.InsteonPLMConfiguration;
import org.openhab.core.io.transport.serial.SerialPortManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Abstract class for implementation for I/O stream with anything that looks
Expand All @@ -38,6 +40,7 @@
*/
@NonNullByDefault
public abstract class IOStream {
protected final Logger logger = LoggerFactory.getLogger(getClass());

protected @Nullable InputStream in;
protected @Nullable OutputStream out;
Expand All @@ -47,20 +50,17 @@ public abstract class IOStream {
*
* @param b byte array (output)
* @return number of bytes read
* @throws InterruptedException
* @throws IOException
*/
public int read(byte @Nullable [] b) throws InterruptedException, IOException {
InputStream in = this.in;
if (in == null) {
throw new IOException("input stream not defined");
}
int len = 0;
while (len == 0) {
if (!isOpen()) {
throw new IOException("io stream not open");
}

InputStream in = this.in;
if (in != null) {
len = in.read(b);
} else {
throw new IOException("input stream not defined");
}
len = in.read(b);

if (Thread.interrupted()) {
throw new InterruptedException();
Expand All @@ -77,27 +77,16 @@ public int read(byte @Nullable [] b) throws InterruptedException, IOException {
* Writes data to IOStream
*
* @param b byte array to write
* @throws IOException
*/
public void write(byte @Nullable [] b) throws InterruptedException, IOException {
if (!isOpen()) {
throw new IOException("io stream not open");
}

public void write(byte @Nullable [] b) throws IOException {
OutputStream out = this.out;
if (out != null) {
out.write(b);
} else {
if (out == null) {
throw new IOException("output stream not defined");
}
out.write(b);
}

/**
* Returns if IOStream is open
*
* @return true if stream is open, false if not
*/
public abstract boolean isOpen();

/**
* Opens the IOStream
*
Expand All @@ -108,7 +97,27 @@ public void write(byte @Nullable [] b) throws InterruptedException, IOException
/**
* Closes the IOStream
*/
public abstract void close();
public void close() {
InputStream in = this.in;
if (in != null) {
try {
in.close();
} catch (IOException e) {
logger.debug("failed to close input stream", e);
}
this.in = null;
}

OutputStream out = this.out;
if (out != null) {
try {
out.close();
} catch (IOException e) {
logger.debug("failed to close output stream", e);
}
this.out = null;
}
}

/**
* Creates an IOStream from an insteon bridge config object
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,9 +205,7 @@ public void stop() {
mdbb.stop();
}

if (ioStream.isOpen()) {
ioStream.close();
}
ioStream.close();

ScheduledFuture<?> readJob = this.readJob;
if (readJob != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,9 +135,7 @@ public void stop() {

connected.set(false);

if (ioStream.isOpen()) {
ioStream.close();
}
ioStream.close();

ScheduledFuture<?> readJob = this.readJob;
if (readJob != null) {
Expand Down
Loading

0 comments on commit 1b75e03

Please sign in to comment.