Skip to content

Commit

Permalink
Acquire locks with timeout (#13)
Browse files Browse the repository at this point in the history
  • Loading branch information
slominskir authored Feb 12, 2024
1 parent 134f764 commit abc4aed
Show file tree
Hide file tree
Showing 3 changed files with 111 additions and 89 deletions.
163 changes: 78 additions & 85 deletions src/main/java/org/jlab/epics2web/epics/ChannelManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
import gov.aps.jca.TimeoutException;
import gov.aps.jca.dbr.DBR;
import gov.aps.jca.dbr.DBRType;
import org.jlab.util.LockAcquisitionTimeoutException;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
Expand All @@ -17,9 +19,10 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Collectors;
import javax.json.JsonObjectBuilder;

public class ChannelManager {
Expand All @@ -39,6 +42,12 @@ public class ChannelManager {
private final ScheduledExecutorService timeoutExecutor;
private final ExecutorService callbackExecutor;

private final ReentrantLock clientLock = new ReentrantLock();
private final ReentrantLock monitorLock = new ReentrantLock();

private final long ACQUIRE_RESOURCE_TIMEOUT_SECONDS = 5;
private final long CLEANUP_RESOURCE_TIMEOUT_SECONDS = 60;

/**
* Create a new ChannelMonitorManager.
*
Expand All @@ -52,35 +61,6 @@ public ChannelManager(CAJContext context, ScheduledExecutorService timeoutExecut
this.callbackExecutor = callbackExecutor;
}

public void reset(CAJContext context) {
try {
this.context.destroy(); // Destroy old context
} catch (Exception e) { // IllegalStateException or CAException or whatever
LOGGER.log(Level.SEVERE, "Unable to destroy context with unresponsive virtual circuit", e);
}

synchronized (monitorMap) {
monitorMap.clear();
this.context = context; // Assign new context
}

Map<PvListener, Set<String>> old;

synchronized (clientMap) {
old = new HashMap<>(clientMap);
for (PvListener listener : old.keySet()) {
clientMap.put(listener, Collections.emptySet());
}
}

for (PvListener listener : old.keySet()) {
Set<String> pvs = old.get(listener);
String listOPvs = pvs.stream().collect(Collectors.joining(" "));
//addPvs(listener, pvs);
LOGGER.log(Level.INFO, "Client: {0}, PVs: {1}", new Object[]{listener, listOPvs});
}
}

public void addValueToJSON(JsonObjectBuilder builder, DBR dbr) {
try {
if (dbr.isDOUBLE()) {
Expand Down Expand Up @@ -197,19 +177,6 @@ private DBR doGet(CAJChannel channel, boolean enumLabel) throws CAException {
return dbr;
}

/**
* Registers a PV monitor on the supplied PV for the given listener.
* Equivalent to calling addPvs with a set of one PV.
*
* @param listener The PvListener
* @param pv The EPICS PV name
*/
public void addPv(PvListener listener, String pv) {
HashSet<String> pvSet = new HashSet<>();
pvSet.add(pv);
addPvs(listener, pvSet);
}

/**
* Registers PV monitors on the supplied PVs for the given listener. Note
* that internally only a single monitor is used for any given PV. PVs for
Expand All @@ -220,7 +187,7 @@ public void addPv(PvListener listener, String pv) {
* @param listener The PvListener to receive notifications
* @param addPvSet The set of PVs to monitor
*/
public void addPvs(PvListener listener, Set<String> addPvSet) {
public void addPvs(PvListener listener, Set<String> addPvSet) throws InterruptedException, CAException, LockAcquisitionTimeoutException {
Set<String> newPvSet = new HashSet<>();

if (addPvSet != null) {
Expand All @@ -236,21 +203,24 @@ public void addPvs(PvListener listener, Set<String> addPvSet) {
for (String pv : addPvSet) {
//LOGGER.log(Level.FINEST, "addListener pv: {0}; pv: {1}", new Object[]{session, pv});

ChannelMonitor monitor;
synchronized (monitorMap) {
monitor = monitorMap.get(pv);
ChannelMonitor monitor = null;

if(monitorLock.tryLock(ACQUIRE_RESOURCE_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
try {
monitor = monitorMap.get(pv);

if (monitor == null) {
//LOGGER.log(Level.FINEST, "Opening ChannelMonitor: {0}", pv);
try {
if (monitor == null) {
//LOGGER.log(Level.FINEST, "Opening ChannelMonitor: {0}", pv);
monitor = new ChannelMonitor(pv, context, timeoutExecutor, callbackExecutor);
monitorMap.put(pv, monitor);
} catch (CAException e) {
LOGGER.log(Level.WARNING, "Unable to create channel monitor; skipping", e);
} else {
//LOGGER.log(Level.FINEST, "Joining ChannelMonitor: {0}", pv);
}
} else {
//LOGGER.log(Level.FINEST, "Joining ChannelMonitor: {0}", pv);
} finally {
monitorLock.unlock();
}
} else {
throw new LockAcquisitionTimeoutException("Timeout while acquiring monitorLock in addPvs");
}

if (monitor != null) {
Expand All @@ -259,14 +229,20 @@ public void addPvs(PvListener listener, Set<String> addPvSet) {
}
}

synchronized (clientMap) {
Set<String> oldPvSet = clientMap.get(listener);
if(clientLock.tryLock(ACQUIRE_RESOURCE_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
try {
Set<String> oldPvSet = clientMap.get(listener);

if (oldPvSet != null) {
newPvSet.addAll(oldPvSet);
}

if (oldPvSet != null) {
newPvSet.addAll(oldPvSet);
clientMap.put(listener, newPvSet);
} finally {
clientLock.unlock();
}

clientMap.put(listener, newPvSet);
} else {
throw new LockAcquisitionTimeoutException("Timeout while acquiring clientLock in addPvs");
}
}

Expand All @@ -276,19 +252,25 @@ public void addPvs(PvListener listener, Set<String> addPvSet) {
* @param listener The PvListener
* @param clearPvSet The PV set to clear
*/
public void clearPvs(PvListener listener, Set<String> clearPvSet) {
public void clearPvs(PvListener listener, Set<String> clearPvSet) throws InterruptedException, LockAcquisitionTimeoutException {
Set<String> newPvSet;

synchronized (clientMap) {
Set<String> oldPvSet = clientMap.get(listener);
if(clientLock.tryLock(CLEANUP_RESOURCE_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
try {
Set<String> oldPvSet = clientMap.get(listener);

if (oldPvSet != null) {
newPvSet = new HashSet<>(oldPvSet);
newPvSet.removeAll(clearPvSet);
} else {
newPvSet = new HashSet<>();
if (oldPvSet != null) {
newPvSet = new HashSet<>(oldPvSet);
newPvSet.removeAll(clearPvSet);
} else {
newPvSet = new HashSet<>();
}
clientMap.put(listener, newPvSet);
} finally {
clientLock.unlock();
}
clientMap.put(listener, newPvSet);
} else {
throw new LockAcquisitionTimeoutException("Timeout while acquiring clientLock in addPvs");
}

removeFromChannels(listener, clearPvSet);
Expand All @@ -304,16 +286,21 @@ public void clearPvs(PvListener listener, Set<String> clearPvSet) {
*
* @param listener The PvListener
*/
public void addListener(PvListener listener) {
public void addListener(PvListener listener) throws InterruptedException, LockAcquisitionTimeoutException {
if(clientLock.tryLock(ACQUIRE_RESOURCE_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
try {
Set<String> pvSet = clientMap.get(listener);

synchronized (clientMap) {
Set<String> pvSet = clientMap.get(listener);
if (pvSet == null) {
pvSet = new HashSet<>();
}

if (pvSet == null) {
pvSet = new HashSet<>();
clientMap.put(listener, pvSet);
} finally {
clientLock.unlock();
}

clientMap.put(listener, pvSet);
} else {
throw new LockAcquisitionTimeoutException("Timeout while acquiring clientLock in addListener");
}
}

Expand All @@ -322,9 +309,9 @@ public void addListener(PvListener listener) {
* channel then closes the channel.
*
* @param listener The PvListener
* @param pvList The PV list (and indirectly the channel list)
* @param pvSet The PV list (and indirectly the channel list)
*/
private void removeFromChannels(PvListener listener, Set<String> pvSet) {
private void removeFromChannels(PvListener listener, Set<String> pvSet) throws InterruptedException, LockAcquisitionTimeoutException {
if (pvSet != null) { // Some clients don't immediately connect to a pv so have an empty pv list
for (String pv : pvSet) {
int listenerCount = 0;
Expand All @@ -335,15 +322,21 @@ private void removeFromChannels(PvListener listener, Set<String> pvSet) {
monitor.removeListener(listener);
}

synchronized (monitorMap) {
monitor = monitorMap.get(pv);
if(monitorLock.tryLock(CLEANUP_RESOURCE_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
try {
monitor = monitorMap.get(pv);

if (monitor != null) {
listenerCount = monitor.getListenerCount();
if (listenerCount == 0) {
monitorMap.remove(pv);
if (monitor != null) {
listenerCount = monitor.getListenerCount();
if (listenerCount == 0) {
monitorMap.remove(pv);
}
}
} finally {
monitorLock.unlock();
}
} else {
throw new LockAcquisitionTimeoutException("Timeout while acquiring monitorLock in removeFromChannels");
}

// We call close without holding a lock
Expand All @@ -364,7 +357,7 @@ private void removeFromChannels(PvListener listener, Set<String> pvSet) {
*
* @param listener The PvListener
*/
public void removeListener(PvListener listener) {
public void removeListener(PvListener listener) throws LockAcquisitionTimeoutException, InterruptedException {
//LOGGER.log(Level.FINEST, "removeListener: {0}", session);
Set<String> pvSet = clientMap.remove(listener);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.jlab.epics2web.websocket;

import gov.aps.jca.CAException;
import gov.aps.jca.dbr.DBR;
import gov.aps.jca.dbr.DBRType;
import java.io.IOException;
Expand All @@ -20,6 +21,7 @@
import javax.websocket.Session;
import org.jlab.epics2web.Application;
import org.jlab.epics2web.epics.PvListener;
import org.jlab.util.LockAcquisitionTimeoutException;

/**
* Manages web socket sessions and ties them to channel access monitors.
Expand Down Expand Up @@ -148,7 +150,12 @@ public void recordInteractionDate(Session session) {
public void addClient(Session session) {
WebSocketSessionMonitor listener = getListener(session);

Application.channelManager.addListener(listener);
try {
Application.channelManager.addListener(listener);
} catch (InterruptedException | LockAcquisitionTimeoutException e) {
LOGGER.log(Level.WARNING, "Unable to addClient: " + session.getId(), e);
// TODO: Retry?
}
}

/**
Expand All @@ -162,7 +169,12 @@ public void removeClient(Session session) {
if (listener != null) {
listenerMap.remove(session);

Application.channelManager.removeListener(listener);
try {
Application.channelManager.removeListener(listener);
} catch (LockAcquisitionTimeoutException | InterruptedException e) {
LOGGER.log(Level.WARNING, "Unable to removeClient: " + session.getId(), e);
// TODO: Retry?
}
}
}

Expand All @@ -175,7 +187,12 @@ public void removeClient(Session session) {
public void addPvs(Session session, Set<String> pvSet) {
WebSocketSessionMonitor listener = getListener(session);

Application.channelManager.addPvs(listener, pvSet);
try {
Application.channelManager.addPvs(listener, pvSet);
} catch (InterruptedException | CAException | LockAcquisitionTimeoutException e) {
LOGGER.log(Level.WARNING, "Unable to addPvs: " + String.join(",", pvSet == null ? new HashSet<>() : pvSet), e);
// TODO: Retry?
}
}

/**
Expand All @@ -187,7 +204,12 @@ public void addPvs(Session session, Set<String> pvSet) {
public void clearPvs(Session session, Set<String> pvSet) {
WebSocketSessionMonitor listener = getListener(session);

Application.channelManager.clearPvs(listener, pvSet);
try {
Application.channelManager.clearPvs(listener, pvSet);
} catch (InterruptedException | LockAcquisitionTimeoutException e) {
LOGGER.log(Level.WARNING, "Unable to clearPvs: " + String.join(",", pvSet == null ? new HashSet<>() : pvSet), e);
// TODO: Retry?
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package org.jlab.util;

public class LockAcquisitionTimeoutException extends Exception {
public LockAcquisitionTimeoutException(String message) {
super(message);
}
}

0 comments on commit abc4aed

Please sign in to comment.