diff --git a/src/main/java/org/jlab/epics2web/epics/ChannelManager.java b/src/main/java/org/jlab/epics2web/epics/ChannelManager.java index 140b978..ed59cac 100644 --- a/src/main/java/org/jlab/epics2web/epics/ChannelManager.java +++ b/src/main/java/org/jlab/epics2web/epics/ChannelManager.java @@ -42,11 +42,18 @@ public class ChannelManager { private final ScheduledExecutorService timeoutExecutor; private final ExecutorService callbackExecutor; - private final ReentrantLock clientLock = new ReentrantLock(); - private final ReentrantLock monitorLock = new ReentrantLock(); + private final ReentrantLock managerLock = new ReentrantLock(); - private final long ACQUIRE_RESOURCE_TIMEOUT_SECONDS = 5; - private final long CLEANUP_RESOURCE_TIMEOUT_SECONDS = 60; + /** + * Overloaded server will reject create channel requests after 30 seconds; may also shake deadlock bug in + * CAJ createChannel. + */ + private final long ACQUIRE_RESOURCE_TIMEOUT_SECONDS = 30; + + /** + * After 15 minutes we assume better to leak resource than stay stuck + */ + private final long CLEANUP_RESOURCE_TIMEOUT_SECONDS = 900; /** * Create a new ChannelMonitorManager. @@ -178,175 +185,96 @@ private DBR doGet(CAJChannel channel, boolean enumLabel) throws CAException { } /** - * Registers PV monitors on the supplied PVs for the given listener. Note + * Registers a PV monitor on the supplied PV for the given listener. Note * that internally only a single monitor is used for any given PV. PVs for * which the given listener is already listening to are skipped (duplicate * PVs are ignored). There is no need to call addListener before calling * this method. * * @param listener The PvListener to receive notifications - * @param addPvSet The set of PVs to monitor + * @param pv The PV to monitor */ - public void addPvs(PvListener listener, Set addPvSet) throws InterruptedException, CAException, LockAcquisitionTimeoutException { - Set newPvSet = new HashSet<>(); - - if (addPvSet != null) { - // Make sure empty string isn't included as a PV as that is invalid and is ignored - boolean emptyIncluded = addPvSet.remove(""); - - if (emptyIncluded) { - LOGGER.log(Level.FINEST, "Empty string ignored in add PV request"); - } + public void addPv(PvListener listener, String pv) throws InterruptedException, CAException, LockAcquisitionTimeoutException { + LOGGER.log(Level.FINEST, "addPv: {0} {1}", new Object[] {listener, pv}); + ChannelMonitor monitor = null; + monitor = monitorMap.get(pv); - newPvSet.addAll(addPvSet); - - for (String pv : addPvSet) { - //LOGGER.log(Level.FINEST, "addListener pv: {0}; pv: {1}", new Object[]{session, pv}); - - ChannelMonitor monitor = null; - - if(monitorLock.tryLock(ACQUIRE_RESOURCE_TIMEOUT_SECONDS, TimeUnit.SECONDS)) { - try { - monitor = monitorMap.get(pv); - - if (monitor == null) { - //LOGGER.log(Level.FINEST, "Opening ChannelMonitor: {0}", pv); - monitor = new ChannelMonitor(pv, context, timeoutExecutor, callbackExecutor); - monitorMap.put(pv, monitor); - } else { - //LOGGER.log(Level.FINEST, "Joining ChannelMonitor: {0}", pv); - } - } finally { - monitorLock.unlock(); - } + // INTERNAL HOLDING LOCK + if(managerLock.tryLock(ACQUIRE_RESOURCE_TIMEOUT_SECONDS, TimeUnit.SECONDS)) { + try { + if (monitor == null) { + //LOGGER.log(Level.FINEST, "Opening ChannelMonitor: {0}", pv); + // HERE IS THE HEAVYWEIGHT ACTION: It's an async create channel request, but is still + // bottleneck; We're holding a lock while we wait... + monitor = new ChannelMonitor(pv, context, timeoutExecutor, callbackExecutor); + monitorMap.put(pv, monitor); } else { - throw new LockAcquisitionTimeoutException("Timeout while acquiring monitorLock in addPvs"); + //LOGGER.log(Level.FINEST, "Joining ChannelMonitor: {0}", pv); } - if (monitor != null) { - monitor.addListener(listener); - } - } - } - - if(clientLock.tryLock(ACQUIRE_RESOURCE_TIMEOUT_SECONDS, TimeUnit.SECONDS)) { - try { - Set oldPvSet = clientMap.get(listener); + Set clientPvSet = clientMap.get(listener); - if (oldPvSet != null) { - newPvSet.addAll(oldPvSet); + if (clientPvSet == null) { + clientPvSet = new HashSet<>(); } - clientMap.put(listener, newPvSet); + clientPvSet.add(pv); + + clientMap.put(listener, clientPvSet); } finally { - clientLock.unlock(); + managerLock.unlock(); } } else { - throw new LockAcquisitionTimeoutException("Timeout while acquiring clientLock in addPvs"); + throw new LockAcquisitionTimeoutException("Timeout while acquiring managerLock in addPv"); } + + // EXTERNAL NO LOCK + monitor.addListener(listener); } /** - * Removes the supplied PVs from the given listener. + * Removes the PV from the given listener. If the last listener on a given channel the monitor is also removed. * * @param listener The PvListener - * @param clearPvSet The PV set to clear + * @param pv The PV to remove */ - public void clearPvs(PvListener listener, Set clearPvSet) throws InterruptedException, LockAcquisitionTimeoutException { - Set newPvSet; + public void removePv(PvListener listener, String pv) throws InterruptedException, LockAcquisitionTimeoutException { + LOGGER.log(Level.FINEST, "removePv: {0} {1}", new Object[] {listener, pv}); + int listenerCount = 0; + ChannelMonitor monitor = monitorMap.get(pv); - if(clientLock.tryLock(CLEANUP_RESOURCE_TIMEOUT_SECONDS, TimeUnit.SECONDS)) { - try { - Set oldPvSet = clientMap.get(listener); + if (monitor != null) { + monitor.removeListener(listener); + } - if (oldPvSet != null) { - newPvSet = new HashSet<>(oldPvSet); - newPvSet.removeAll(clearPvSet); - } else { - newPvSet = new HashSet<>(); + // INTERNAL HOLDING LOCK + if(managerLock.tryLock(CLEANUP_RESOURCE_TIMEOUT_SECONDS, TimeUnit.SECONDS)) { + if (monitor != null) { + listenerCount = monitor.getListenerCount(); + if (listenerCount == 0) { + monitorMap.remove(pv); } - clientMap.put(listener, newPvSet); - } finally { - clientLock.unlock(); } - } else { - throw new LockAcquisitionTimeoutException("Timeout while acquiring clientLock in addPvs"); - } - - removeFromChannels(listener, clearPvSet); - } - /** - * A convenience method to add a listener without registering any PVs to - * monitor. This is a rare use-case and is equivalent to calling addPvs with - * a null set of PVs. - * - * Allowing a listener without any PVs registered may be deprecated in the - * future. - * - * @param listener The PvListener - */ - public void addListener(PvListener listener) throws InterruptedException, LockAcquisitionTimeoutException { - if(clientLock.tryLock(ACQUIRE_RESOURCE_TIMEOUT_SECONDS, TimeUnit.SECONDS)) { try { - Set pvSet = clientMap.get(listener); + Set clientPvSet = clientMap.get(listener); - if (pvSet == null) { - pvSet = new HashSet<>(); + if (clientPvSet != null) { + clientPvSet.remove(pv); } - - clientMap.put(listener, pvSet); } finally { - clientLock.unlock(); + managerLock.unlock(); } } else { - throw new LockAcquisitionTimeoutException("Timeout while acquiring clientLock in addListener"); + throw new LockAcquisitionTimeoutException("Timeout while acquiring managerLock in removePv"); } - } - - /** - * Removes a listener from channels and if no listeners remain on a given - * channel then closes the channel. - * - * @param listener The PvListener - * @param pvSet The PV list (and indirectly the channel list) - */ - private void removeFromChannels(PvListener listener, Set pvSet) throws InterruptedException, LockAcquisitionTimeoutException { - if (pvSet != null) { // Some clients don't immediately connect to a pv so have an empty pv list - for (String pv : pvSet) { - int listenerCount = 0; - - ChannelMonitor monitor = monitorMap.get(pv); - - if (monitor != null) { - monitor.removeListener(listener); - } - - if(monitorLock.tryLock(CLEANUP_RESOURCE_TIMEOUT_SECONDS, TimeUnit.SECONDS)) { - try { - monitor = monitorMap.get(pv); - - if (monitor != null) { - listenerCount = monitor.getListenerCount(); - if (listenerCount == 0) { - monitorMap.remove(pv); - } - } - } finally { - monitorLock.unlock(); - } - } else { - throw new LockAcquisitionTimeoutException("Timeout while acquiring monitorLock in removeFromChannels"); - } - // We call close without holding a lock - if (monitor != null && listenerCount == 0) { - try { - monitor.close(); - } catch (IOException e) { - LOGGER.log(Level.WARNING, "Unable to close monitor", e); - } - } + // EXTERNAL NO LOCK + if (monitor != null && listenerCount == 0) { + try { + monitor.close(); + } catch (IOException e) { + LOGGER.log(Level.WARNING, "Unable to close monitor", e); } } } @@ -356,13 +284,24 @@ private void removeFromChannels(PvListener listener, Set pvSet) throws I * interested in. * * @param listener The PvListener + * @return a map of PV names to Exceptions for any PVs that were unable to be removed */ - public void removeListener(PvListener listener) throws LockAcquisitionTimeoutException, InterruptedException { - //LOGGER.log(Level.FINEST, "removeListener: {0}", session); + public Map removeAll(PvListener listener) { + LOGGER.log(Level.FINEST, "removeAll: {0}", listener); Set pvSet = clientMap.remove(listener); - // Don't do this while holding writeLock above since this method could be called by monitorChanged or from websocket close! - removeFromChannels(listener, pvSet); + Map failed = new HashMap<>(); + if(pvSet != null) { + for (String pv : pvSet) { + try { + removePv(listener, pv); + } catch (InterruptedException | LockAcquisitionTimeoutException e) { + failed.put(pv, e); + } + } + } + + return failed; } /** diff --git a/src/main/java/org/jlab/epics2web/websocket/MonitorEndpoint.java b/src/main/java/org/jlab/epics2web/websocket/MonitorEndpoint.java index b1fcae9..8024dea 100644 --- a/src/main/java/org/jlab/epics2web/websocket/MonitorEndpoint.java +++ b/src/main/java/org/jlab/epics2web/websocket/MonitorEndpoint.java @@ -186,7 +186,7 @@ public String onMessage(String message, Session session) { JsonArray pvs = obj.getJsonArray("pvs"); Set pvSet = Application.sessionManager.getPvSetFromJson(pvs); - Application.sessionManager.clearPvs(session, pvSet); + Application.sessionManager.removePvs(session, pvSet); } else { LOGGER.log(Level.WARNING, "Unknown client request: {0}", message); } diff --git a/src/main/java/org/jlab/epics2web/websocket/WebSocketSessionManager.java b/src/main/java/org/jlab/epics2web/websocket/WebSocketSessionManager.java index 5516a1f..5465dee 100644 --- a/src/main/java/org/jlab/epics2web/websocket/WebSocketSessionManager.java +++ b/src/main/java/org/jlab/epics2web/websocket/WebSocketSessionManager.java @@ -148,14 +148,8 @@ public void recordInteractionDate(Session session) { * @param session The session (client) to manage */ public void addClient(Session session) { - WebSocketSessionMonitor listener = getListener(session); - - try { - Application.channelManager.addListener(listener); - } catch (InterruptedException | LockAcquisitionTimeoutException e) { - LOGGER.log(Level.WARNING, "Unable to addClient: " + session.getId(), e); - // TODO: Retry? - } + // Only a "real" client once actually monitoring something via addPvs() + // In other words, let's lazily create state only once needed } /** @@ -164,18 +158,7 @@ public void addClient(Session session) { * @param session The session (client) to remove */ public void removeClient(Session session) { - WebSocketSessionMonitor listener = listenerMap.get(session); - - if (listener != null) { - listenerMap.remove(session); - - try { - Application.channelManager.removeListener(listener); - } catch (LockAcquisitionTimeoutException | InterruptedException e) { - LOGGER.log(Level.WARNING, "Unable to removeClient: " + session.getId(), e); - // TODO: Retry? - } - } + removePvs(session, null); } /** @@ -187,28 +170,57 @@ public void removeClient(Session session) { public void addPvs(Session session, Set pvSet) { WebSocketSessionMonitor listener = getListener(session); - try { - Application.channelManager.addPvs(listener, pvSet); - } catch (InterruptedException | CAException | LockAcquisitionTimeoutException e) { - LOGGER.log(Level.WARNING, "Unable to addPvs: " + String.join(",", pvSet == null ? new HashSet<>() : pvSet), e); - // TODO: Retry? + if (pvSet != null) { + // Make sure empty string isn't included as a PV as that is invalid and is ignored + boolean emptyIncluded = pvSet.remove(""); + + if (emptyIncluded) { + LOGGER.log(Level.FINEST, "Empty string ignored in add PV request"); + } + + for (String pv : pvSet) { + try { + Application.channelManager.addPv(listener, pv); + } catch (InterruptedException | CAException | LockAcquisitionTimeoutException e) { + LOGGER.log(Level.WARNING, "Unable to addPv: " + pv, e); + // TODO: Retry? + } + } } } /** - * Stop monitoring the provided PVs for the specified client. + * Stop monitoring the provided PVs for the specified client. Completely remove the session and all PVs by setting + * pvSet to null. * * @param session The client session - * @param pvSet The set of PVs + * @param pvSet The set of PVs. Remove all if pvSet is null */ - public void clearPvs(Session session, Set pvSet) { + public void removePvs(Session session, Set pvSet) { WebSocketSessionMonitor listener = getListener(session); - try { - Application.channelManager.clearPvs(listener, pvSet); - } catch (InterruptedException | LockAcquisitionTimeoutException e) { - LOGGER.log(Level.WARNING, "Unable to clearPvs: " + String.join(",", pvSet == null ? new HashSet<>() : pvSet), e); - // TODO: Retry? + if (pvSet != null) { + // Make sure empty string isn't included as a PV as that is invalid and is ignored + boolean emptyIncluded = pvSet.remove(""); + + if (emptyIncluded) { + LOGGER.log(Level.FINEST, "Empty string ignored in remove PV request"); + } + + for (String pv : pvSet) { + try { + Application.channelManager.removePv(listener, pv); + } catch (InterruptedException | LockAcquisitionTimeoutException e) { + LOGGER.log(Level.WARNING, "Unable to removePv: " + pv, e); + // TODO: Retry? + } + } + } else { // pvSet == null (removeAll) + Map failed = Application.channelManager.removeAll(listener); + for(String pv: failed.keySet()) { + LOGGER.log(Level.WARNING, "Unable to (bulk) removePv: " + pv, failed.get(pv)); + // TODO: Retry? + } } }