Skip to content

Commit

Permalink
Fine tune lock handling / timeout / async code (#15)
Browse files Browse the repository at this point in the history
Fixes #14
  • Loading branch information
slominskir authored Feb 13, 2024
1 parent 9568946 commit 8a13828
Show file tree
Hide file tree
Showing 3 changed files with 125 additions and 174 deletions.
219 changes: 79 additions & 140 deletions src/main/java/org/jlab/epics2web/epics/ChannelManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,18 @@ public class ChannelManager {
private final ScheduledExecutorService timeoutExecutor;
private final ExecutorService callbackExecutor;

private final ReentrantLock clientLock = new ReentrantLock();
private final ReentrantLock monitorLock = new ReentrantLock();
private final ReentrantLock managerLock = new ReentrantLock();

private final long ACQUIRE_RESOURCE_TIMEOUT_SECONDS = 5;
private final long CLEANUP_RESOURCE_TIMEOUT_SECONDS = 60;
/**
* Overloaded server will reject create channel requests after 30 seconds; may also shake deadlock bug in
* CAJ createChannel.
*/
private final long ACQUIRE_RESOURCE_TIMEOUT_SECONDS = 30;

/**
* After 15 minutes we assume better to leak resource than stay stuck
*/
private final long CLEANUP_RESOURCE_TIMEOUT_SECONDS = 900;

/**
* Create a new ChannelMonitorManager.
Expand Down Expand Up @@ -178,175 +185,96 @@ private DBR doGet(CAJChannel channel, boolean enumLabel) throws CAException {
}

/**
* Registers PV monitors on the supplied PVs for the given listener. Note
* Registers a PV monitor on the supplied PV for the given listener. Note
* that internally only a single monitor is used for any given PV. PVs for
* which the given listener is already listening to are skipped (duplicate
* PVs are ignored). There is no need to call addListener before calling
* this method.
*
* @param listener The PvListener to receive notifications
* @param addPvSet The set of PVs to monitor
* @param pv The PV to monitor
*/
public void addPvs(PvListener listener, Set<String> addPvSet) throws InterruptedException, CAException, LockAcquisitionTimeoutException {
Set<String> newPvSet = new HashSet<>();

if (addPvSet != null) {
// Make sure empty string isn't included as a PV as that is invalid and is ignored
boolean emptyIncluded = addPvSet.remove("");

if (emptyIncluded) {
LOGGER.log(Level.FINEST, "Empty string ignored in add PV request");
}
public void addPv(PvListener listener, String pv) throws InterruptedException, CAException, LockAcquisitionTimeoutException {
LOGGER.log(Level.FINEST, "addPv: {0} {1}", new Object[] {listener, pv});
ChannelMonitor monitor = null;
monitor = monitorMap.get(pv);

newPvSet.addAll(addPvSet);

for (String pv : addPvSet) {
//LOGGER.log(Level.FINEST, "addListener pv: {0}; pv: {1}", new Object[]{session, pv});

ChannelMonitor monitor = null;

if(monitorLock.tryLock(ACQUIRE_RESOURCE_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
try {
monitor = monitorMap.get(pv);

if (monitor == null) {
//LOGGER.log(Level.FINEST, "Opening ChannelMonitor: {0}", pv);
monitor = new ChannelMonitor(pv, context, timeoutExecutor, callbackExecutor);
monitorMap.put(pv, monitor);
} else {
//LOGGER.log(Level.FINEST, "Joining ChannelMonitor: {0}", pv);
}
} finally {
monitorLock.unlock();
}
// INTERNAL HOLDING LOCK
if(managerLock.tryLock(ACQUIRE_RESOURCE_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
try {
if (monitor == null) {
//LOGGER.log(Level.FINEST, "Opening ChannelMonitor: {0}", pv);
// HERE IS THE HEAVYWEIGHT ACTION: It's an async create channel request, but is still
// bottleneck; We're holding a lock while we wait...
monitor = new ChannelMonitor(pv, context, timeoutExecutor, callbackExecutor);
monitorMap.put(pv, monitor);
} else {
throw new LockAcquisitionTimeoutException("Timeout while acquiring monitorLock in addPvs");
//LOGGER.log(Level.FINEST, "Joining ChannelMonitor: {0}", pv);
}

if (monitor != null) {
monitor.addListener(listener);
}
}
}

if(clientLock.tryLock(ACQUIRE_RESOURCE_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
try {
Set<String> oldPvSet = clientMap.get(listener);
Set<String> clientPvSet = clientMap.get(listener);

if (oldPvSet != null) {
newPvSet.addAll(oldPvSet);
if (clientPvSet == null) {
clientPvSet = new HashSet<>();
}

clientMap.put(listener, newPvSet);
clientPvSet.add(pv);

clientMap.put(listener, clientPvSet);
} finally {
clientLock.unlock();
managerLock.unlock();
}
} else {
throw new LockAcquisitionTimeoutException("Timeout while acquiring clientLock in addPvs");
throw new LockAcquisitionTimeoutException("Timeout while acquiring managerLock in addPv");
}

// EXTERNAL NO LOCK
monitor.addListener(listener);
}

/**
* Removes the supplied PVs from the given listener.
* Removes the PV from the given listener. If the last listener on a given channel the monitor is also removed.
*
* @param listener The PvListener
* @param clearPvSet The PV set to clear
* @param pv The PV to remove
*/
public void clearPvs(PvListener listener, Set<String> clearPvSet) throws InterruptedException, LockAcquisitionTimeoutException {
Set<String> newPvSet;
public void removePv(PvListener listener, String pv) throws InterruptedException, LockAcquisitionTimeoutException {
LOGGER.log(Level.FINEST, "removePv: {0} {1}", new Object[] {listener, pv});
int listenerCount = 0;
ChannelMonitor monitor = monitorMap.get(pv);

if(clientLock.tryLock(CLEANUP_RESOURCE_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
try {
Set<String> oldPvSet = clientMap.get(listener);
if (monitor != null) {
monitor.removeListener(listener);
}

if (oldPvSet != null) {
newPvSet = new HashSet<>(oldPvSet);
newPvSet.removeAll(clearPvSet);
} else {
newPvSet = new HashSet<>();
// INTERNAL HOLDING LOCK
if(managerLock.tryLock(CLEANUP_RESOURCE_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
if (monitor != null) {
listenerCount = monitor.getListenerCount();
if (listenerCount == 0) {
monitorMap.remove(pv);
}
clientMap.put(listener, newPvSet);
} finally {
clientLock.unlock();
}
} else {
throw new LockAcquisitionTimeoutException("Timeout while acquiring clientLock in addPvs");
}

removeFromChannels(listener, clearPvSet);
}

/**
* A convenience method to add a listener without registering any PVs to
* monitor. This is a rare use-case and is equivalent to calling addPvs with
* a null set of PVs.
*
* Allowing a listener without any PVs registered may be deprecated in the
* future.
*
* @param listener The PvListener
*/
public void addListener(PvListener listener) throws InterruptedException, LockAcquisitionTimeoutException {
if(clientLock.tryLock(ACQUIRE_RESOURCE_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
try {
Set<String> pvSet = clientMap.get(listener);
Set<String> clientPvSet = clientMap.get(listener);

if (pvSet == null) {
pvSet = new HashSet<>();
if (clientPvSet != null) {
clientPvSet.remove(pv);
}

clientMap.put(listener, pvSet);
} finally {
clientLock.unlock();
managerLock.unlock();
}
} else {
throw new LockAcquisitionTimeoutException("Timeout while acquiring clientLock in addListener");
throw new LockAcquisitionTimeoutException("Timeout while acquiring managerLock in removePv");
}
}

/**
* Removes a listener from channels and if no listeners remain on a given
* channel then closes the channel.
*
* @param listener The PvListener
* @param pvSet The PV list (and indirectly the channel list)
*/
private void removeFromChannels(PvListener listener, Set<String> pvSet) throws InterruptedException, LockAcquisitionTimeoutException {
if (pvSet != null) { // Some clients don't immediately connect to a pv so have an empty pv list
for (String pv : pvSet) {
int listenerCount = 0;

ChannelMonitor monitor = monitorMap.get(pv);

if (monitor != null) {
monitor.removeListener(listener);
}

if(monitorLock.tryLock(CLEANUP_RESOURCE_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
try {
monitor = monitorMap.get(pv);

if (monitor != null) {
listenerCount = monitor.getListenerCount();
if (listenerCount == 0) {
monitorMap.remove(pv);
}
}
} finally {
monitorLock.unlock();
}
} else {
throw new LockAcquisitionTimeoutException("Timeout while acquiring monitorLock in removeFromChannels");
}

// We call close without holding a lock
if (monitor != null && listenerCount == 0) {
try {
monitor.close();
} catch (IOException e) {
LOGGER.log(Level.WARNING, "Unable to close monitor", e);
}
}
// EXTERNAL NO LOCK
if (monitor != null && listenerCount == 0) {
try {
monitor.close();
} catch (IOException e) {
LOGGER.log(Level.WARNING, "Unable to close monitor", e);
}
}
}
Expand All @@ -356,13 +284,24 @@ private void removeFromChannels(PvListener listener, Set<String> pvSet) throws I
* interested in.
*
* @param listener The PvListener
* @return a map of PV names to Exceptions for any PVs that were unable to be removed
*/
public void removeListener(PvListener listener) throws LockAcquisitionTimeoutException, InterruptedException {
//LOGGER.log(Level.FINEST, "removeListener: {0}", session);
public Map<String, Exception> removeAll(PvListener listener) {
LOGGER.log(Level.FINEST, "removeAll: {0}", listener);
Set<String> pvSet = clientMap.remove(listener);

// Don't do this while holding writeLock above since this method could be called by monitorChanged or from websocket close!
removeFromChannels(listener, pvSet);
Map<String, Exception> failed = new HashMap<>();
if(pvSet != null) {
for (String pv : pvSet) {
try {
removePv(listener, pv);
} catch (InterruptedException | LockAcquisitionTimeoutException e) {
failed.put(pv, e);
}
}
}

return failed;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ public String onMessage(String message, Session session) {
JsonArray pvs = obj.getJsonArray("pvs");
Set<String> pvSet = Application.sessionManager.getPvSetFromJson(pvs);

Application.sessionManager.clearPvs(session, pvSet);
Application.sessionManager.removePvs(session, pvSet);
} else {
LOGGER.log(Level.WARNING, "Unknown client request: {0}", message);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,14 +148,8 @@ public void recordInteractionDate(Session session) {
* @param session The session (client) to manage
*/
public void addClient(Session session) {
WebSocketSessionMonitor listener = getListener(session);

try {
Application.channelManager.addListener(listener);
} catch (InterruptedException | LockAcquisitionTimeoutException e) {
LOGGER.log(Level.WARNING, "Unable to addClient: " + session.getId(), e);
// TODO: Retry?
}
// Only a "real" client once actually monitoring something via addPvs()
// In other words, let's lazily create state only once needed
}

/**
Expand All @@ -164,18 +158,7 @@ public void addClient(Session session) {
* @param session The session (client) to remove
*/
public void removeClient(Session session) {
WebSocketSessionMonitor listener = listenerMap.get(session);

if (listener != null) {
listenerMap.remove(session);

try {
Application.channelManager.removeListener(listener);
} catch (LockAcquisitionTimeoutException | InterruptedException e) {
LOGGER.log(Level.WARNING, "Unable to removeClient: " + session.getId(), e);
// TODO: Retry?
}
}
removePvs(session, null);
}

/**
Expand All @@ -187,28 +170,57 @@ public void removeClient(Session session) {
public void addPvs(Session session, Set<String> pvSet) {
WebSocketSessionMonitor listener = getListener(session);

try {
Application.channelManager.addPvs(listener, pvSet);
} catch (InterruptedException | CAException | LockAcquisitionTimeoutException e) {
LOGGER.log(Level.WARNING, "Unable to addPvs: " + String.join(",", pvSet == null ? new HashSet<>() : pvSet), e);
// TODO: Retry?
if (pvSet != null) {
// Make sure empty string isn't included as a PV as that is invalid and is ignored
boolean emptyIncluded = pvSet.remove("");

if (emptyIncluded) {
LOGGER.log(Level.FINEST, "Empty string ignored in add PV request");
}

for (String pv : pvSet) {
try {
Application.channelManager.addPv(listener, pv);
} catch (InterruptedException | CAException | LockAcquisitionTimeoutException e) {
LOGGER.log(Level.WARNING, "Unable to addPv: " + pv, e);
// TODO: Retry?
}
}
}
}

/**
* Stop monitoring the provided PVs for the specified client.
* Stop monitoring the provided PVs for the specified client. Completely remove the session and all PVs by setting
* pvSet to null.
*
* @param session The client session
* @param pvSet The set of PVs
* @param pvSet The set of PVs. Remove all if pvSet is null
*/
public void clearPvs(Session session, Set<String> pvSet) {
public void removePvs(Session session, Set<String> pvSet) {
WebSocketSessionMonitor listener = getListener(session);

try {
Application.channelManager.clearPvs(listener, pvSet);
} catch (InterruptedException | LockAcquisitionTimeoutException e) {
LOGGER.log(Level.WARNING, "Unable to clearPvs: " + String.join(",", pvSet == null ? new HashSet<>() : pvSet), e);
// TODO: Retry?
if (pvSet != null) {
// Make sure empty string isn't included as a PV as that is invalid and is ignored
boolean emptyIncluded = pvSet.remove("");

if (emptyIncluded) {
LOGGER.log(Level.FINEST, "Empty string ignored in remove PV request");
}

for (String pv : pvSet) {
try {
Application.channelManager.removePv(listener, pv);
} catch (InterruptedException | LockAcquisitionTimeoutException e) {
LOGGER.log(Level.WARNING, "Unable to removePv: " + pv, e);
// TODO: Retry?
}
}
} else { // pvSet == null (removeAll)
Map<String, Exception> failed = Application.channelManager.removeAll(listener);
for(String pv: failed.keySet()) {
LOGGER.log(Level.WARNING, "Unable to (bulk) removePv: " + pv, failed.get(pv));
// TODO: Retry?
}
}
}

Expand Down

0 comments on commit 8a13828

Please sign in to comment.