Skip to content

Commit

Permalink
[insteon] Convert legacy threads to use scheduler service (#17904)
Browse files Browse the repository at this point in the history
Signed-off-by: jsetton <[email protected]>
  • Loading branch information
jsetton authored Dec 18, 2024
1 parent 29915c4 commit a94e4a1
Show file tree
Hide file tree
Showing 9 changed files with 147 additions and 211 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,6 @@
import org.openhab.binding.insteon.internal.device.LegacyDeviceFeature;
import org.openhab.binding.insteon.internal.device.LegacyDeviceType;
import org.openhab.binding.insteon.internal.device.LegacyDeviceTypeLoader;
import org.openhab.binding.insteon.internal.device.LegacyPollManager;
import org.openhab.binding.insteon.internal.device.LegacyRequestManager;
import org.openhab.binding.insteon.internal.device.X10Address;
import org.openhab.binding.insteon.internal.device.database.LegacyModemDBEntry;
import org.openhab.binding.insteon.internal.device.feature.LegacyFeatureListener;
Expand Down Expand Up @@ -269,7 +267,7 @@ public void updateFeatureState(ChannelUID channelUID, State state) {
int ndev = checkIfInModemDatabase(device);
if (device.hasModemDBEntry()) {
device.setStatus(DeviceStatus.POLLING);
LegacyPollManager.instance().startPolling(device, ndev);
driver.getPollManager().startPolling(device, ndev);
}
}
devices.put(address, device);
Expand All @@ -286,7 +284,7 @@ public void removeDevice(DeviceAddress address) {
}

if (device.getStatus() == DeviceStatus.POLLING) {
LegacyPollManager.instance().stopPolling(device);
driver.getPollManager().stopPolling(device);
}
}

Expand Down Expand Up @@ -350,8 +348,6 @@ public void shutdown() {
logger.debug("shutting down Insteon bridge");
driver.stop();
devices.clear();
LegacyRequestManager.destroyInstance();
LegacyPollManager.instance().stop();
isActive = false;
}

Expand Down Expand Up @@ -427,7 +423,7 @@ public int compare(Byte b1, Byte b2) {

public void logDeviceStatistics() {
String msg = String.format("devices: %3d configured, %3d polling, msgs received: %5d", devices.size(),
LegacyPollManager.instance().getSizeOfQueue(), messagesReceived);
driver.getPollManager().getSizeOfQueue(), messagesReceived);
logger.debug("{}", msg);
messagesReceived = 0;
for (LegacyDevice device : devices.values()) {
Expand Down Expand Up @@ -485,7 +481,7 @@ public void driverCompletelyInitialized() {
device.setHasModemDBEntry(true);
}
if (device.getStatus() != DeviceStatus.POLLING) {
LegacyPollManager.instance().startPolling(device, dbes.size());
driver.getPollManager().startPolling(device, dbes.size());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,11 +263,9 @@ public void doPoll(long delay) {
mrequestQueue.add(qe);
}
}
LegacyRequestManager instance = LegacyRequestManager.instance();
if (instance != null) {
instance.addQueue(this, now + delay);
} else {
logger.warn("request queue manager is null");
LegacyDriver driver = this.driver;
if (driver != null) {
driver.getRequestManager().addQueue(this, now + delay);
}

if (!list.isEmpty()) {
Expand Down Expand Up @@ -388,11 +386,9 @@ public void enqueueDelayedMessage(Msg msg, LegacyDeviceFeature feature, long del
msg.setQuietTime(QUIET_TIME_DIRECT_MESSAGE);
}
logger.trace("enqueing direct message with delay {}", delay);
LegacyRequestManager instance = LegacyRequestManager.instance();
if (instance != null) {
instance.addQueue(this, now + delay);
} else {
logger.warn("request queue manger instance is null");
LegacyDriver driver = this.driver;
if (driver != null) {
driver.getRequestManager().addQueue(this, now + delay);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@
import java.util.Iterator;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
import org.openhab.binding.insteon.internal.InsteonBindingConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -45,16 +47,16 @@ public class LegacyPollManager {
private static final long MIN_MSEC_BETWEEN_POLLS = 2000L;

private final Logger logger = LoggerFactory.getLogger(LegacyPollManager.class);
private static LegacyPollManager poller = new LegacyPollManager(); // for singleton

private @Nullable Thread pollThread = null;
private ScheduledExecutorService scheduler;
private @Nullable ScheduledFuture<?> job;
private TreeSet<PQEntry> pollQueue = new TreeSet<>();
private boolean keepRunning = true;

/**
* Constructor
*/
private LegacyPollManager() {
public LegacyPollManager(ScheduledExecutorService scheduler) {
this.scheduler = scheduler;
}

/**
Expand Down Expand Up @@ -104,39 +106,19 @@ public void stopPolling(LegacyDevice device) {
* Starts the poller thread
*/
public void start() {
if (pollThread == null) {
pollThread = new Thread(new PollQueueReader());
setParamsAndStart(pollThread);
}
}

private void setParamsAndStart(@Nullable Thread thread) {
if (thread != null) {
thread.setName("OH-binding-" + InsteonBindingConstants.BINDING_ID + "-pollQueueReader");
thread.setDaemon(true);
thread.start();
if (job == null) {
job = scheduler.schedule(new PollQueueReader(), 0, TimeUnit.SECONDS);
}
}

/**
* Stops the poller thread
*/
public void stop() {
logger.debug("stopping poller!");
synchronized (pollQueue) {
pollQueue.clear();
keepRunning = false;
pollQueue.notify();
}
try {
Thread pollThread = this.pollThread;
if (pollThread != null) {
pollThread.join();
this.pollThread = null;
}
keepRunning = true;
} catch (InterruptedException e) {
logger.debug("got interrupted on exit: {}", e.getMessage());
ScheduledFuture<?> job = this.job;
if (job != null) {
job.cancel(true);
this.job = null;
}
}

Expand Down Expand Up @@ -204,18 +186,17 @@ private long findNextExpirationTime(LegacyDevice device, long time) {
private class PollQueueReader implements Runnable {
@Override
public void run() {
logger.debug("starting poll thread.");
synchronized (pollQueue) {
while (keepRunning) {
try {
logger.debug("starting poll queue thread");
try {
while (!Thread.interrupted()) {
synchronized (pollQueue) {
readPollQueue();
} catch (InterruptedException e) {
logger.warn("poll queue reader thread interrupted!");
break;
}
}
} catch (InterruptedException e) {
logger.trace("poll queue thread interrupted!");
}
logger.debug("poll thread exiting");
logger.debug("exiting poll queue thread!");
}

/**
Expand All @@ -225,10 +206,9 @@ public void run() {
* @throws InterruptedException
*/
private void readPollQueue() throws InterruptedException {
while (pollQueue.isEmpty() && keepRunning) {
if (pollQueue.isEmpty()) {
logger.trace("waiting for poll queue to fill");
pollQueue.wait();
}
if (!keepRunning) {
return;
}
// something is in the queue
Expand Down Expand Up @@ -297,14 +277,4 @@ public String toString() {
return device.getAddress().toString() + "/" + String.format("%tc", new Date(expirationTime));
}
}

/**
* Singleton pattern instance() method
*
* @return the poller instance
*/
public static synchronized LegacyPollManager instance() {
poller.start();
return poller;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
import org.openhab.binding.insteon.internal.InsteonBindingConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -39,24 +41,18 @@
*/
@NonNullByDefault
public class LegacyRequestManager {
private static @Nullable LegacyRequestManager instance = null;
private final Logger logger = LoggerFactory.getLogger(LegacyRequestManager.class);
private @Nullable Thread queueThread = null;

private ScheduledExecutorService scheduler;
private @Nullable ScheduledFuture<?> job;
private Queue<RequestQueue> requestQueues = new PriorityQueue<>();
private Map<LegacyDevice, RequestQueue> requestQueueHash = new HashMap<>();
private boolean keepRunning = true;

private LegacyRequestManager() {
queueThread = new Thread(new RequestQueueReader());
setParamsAndStart(queueThread);
}

private void setParamsAndStart(@Nullable Thread thread) {
if (thread != null) {
thread.setName("OH-binding-" + InsteonBindingConstants.BINDING_ID + "-requestQueueReader");
thread.setDaemon(true);
thread.start();
}
/**
* Constructor
*/
public LegacyRequestManager(ScheduledExecutorService scheduler) {
this.scheduler = scheduler;
}

/**
Expand Down Expand Up @@ -92,37 +88,40 @@ public void addQueue(LegacyDevice device, long time) {
}
}

/**
* Starts request queue thread
*/
public void start() {
if (job == null) {
job = scheduler.schedule(new RequestQueueReader(), 0, TimeUnit.SECONDS);
}
}

/**
* Stops request queue thread
*/
private void stopThread() {
logger.debug("stopping thread");
Thread queueThread = this.queueThread;
if (queueThread != null) {
synchronized (requestQueues) {
keepRunning = false;
requestQueues.notifyAll();
}
try {
logger.debug("waiting for thread to join");
queueThread.join();
logger.debug("request queue thread exited!");
} catch (InterruptedException e) {
logger.warn("got interrupted waiting for thread exit ", e);
}
this.queueThread = null;
public void stop() {
ScheduledFuture<?> job = this.job;
if (job != null) {
job.cancel(true);
this.job = null;
}
}

class RequestQueueReader implements Runnable {
@Override
public void run() {
logger.debug("starting request queue thread");
synchronized (requestQueues) {
while (keepRunning) {
try {
RequestQueue queue;
while (keepRunning && (queue = requestQueues.peek()) != null) {
try {
while (!Thread.interrupted()) {
synchronized (requestQueues) {
if (requestQueues.isEmpty()) {
logger.trace("waiting for request queues to fill");
requestQueues.wait();
continue;
}
RequestQueue queue = requestQueues.peek();
if (queue != null) {
long now = System.currentTimeMillis();
long expTime = queue.getExpirationTime();
LegacyDevice device = queue.getDevice();
Expand Down Expand Up @@ -150,13 +149,10 @@ public void run() {
logger.debug("device queue for {} is empty!", device.getAddress());
}
}
logger.trace("waiting for request queues to fill");
requestQueues.wait();
} catch (InterruptedException e) {
logger.warn("request queue thread got interrupted, breaking..", e);
break;
}
}
} catch (InterruptedException e) {
logger.trace("request queue thread interrupted!");
}
logger.debug("exiting request queue thread!");
}
Expand Down Expand Up @@ -188,19 +184,4 @@ public int compareTo(RequestQueue queue) {
return (int) (expirationTime - queue.expirationTime);
}
}

public static synchronized @Nullable LegacyRequestManager instance() {
if (instance == null) {
instance = new LegacyRequestManager();
}
return instance;
}

public static synchronized void destroyInstance() {
LegacyRequestManager instance = LegacyRequestManager.instance;
if (instance != null) {
instance.stopThread();
LegacyRequestManager.instance = null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ public void run() {
}
}
} catch (InterruptedException e) {
logger.debug("poll queue thread interrupted!");
logger.trace("poll queue thread interrupted!");
}
logger.debug("exiting poll queue thread!");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ public void run() {
}
}
} catch (InterruptedException e) {
logger.debug("request queue thread interrupted!");
logger.trace("request queue thread interrupted!");
}
logger.debug("exiting request queue thread!");
}
Expand Down
Loading

0 comments on commit a94e4a1

Please sign in to comment.