Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fine tune lock handling / timeout / async code #15

Merged
merged 4 commits into from
Feb 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
219 changes: 79 additions & 140 deletions src/main/java/org/jlab/epics2web/epics/ChannelManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,18 @@ public class ChannelManager {
private final ScheduledExecutorService timeoutExecutor;
private final ExecutorService callbackExecutor;

private final ReentrantLock clientLock = new ReentrantLock();
private final ReentrantLock monitorLock = new ReentrantLock();
private final ReentrantLock managerLock = new ReentrantLock();

private final long ACQUIRE_RESOURCE_TIMEOUT_SECONDS = 5;
private final long CLEANUP_RESOURCE_TIMEOUT_SECONDS = 60;
/**
* Overloaded server will reject create channel requests after 30 seconds; may also shake deadlock bug in
* CAJ createChannel.
*/
private final long ACQUIRE_RESOURCE_TIMEOUT_SECONDS = 30;

/**
* After 15 minutes we assume better to leak resource than stay stuck
*/
private final long CLEANUP_RESOURCE_TIMEOUT_SECONDS = 900;

/**
* Create a new ChannelMonitorManager.
Expand Down Expand Up @@ -178,175 +185,96 @@ private DBR doGet(CAJChannel channel, boolean enumLabel) throws CAException {
}

/**
* Registers PV monitors on the supplied PVs for the given listener. Note
* Registers a PV monitor on the supplied PV for the given listener. Note
* that internally only a single monitor is used for any given PV. PVs for
* which the given listener is already listening to are skipped (duplicate
* PVs are ignored). There is no need to call addListener before calling
* this method.
*
* @param listener The PvListener to receive notifications
* @param addPvSet The set of PVs to monitor
* @param pv The PV to monitor
*/
public void addPvs(PvListener listener, Set<String> addPvSet) throws InterruptedException, CAException, LockAcquisitionTimeoutException {
Set<String> newPvSet = new HashSet<>();

if (addPvSet != null) {
// Make sure empty string isn't included as a PV as that is invalid and is ignored
boolean emptyIncluded = addPvSet.remove("");

if (emptyIncluded) {
LOGGER.log(Level.FINEST, "Empty string ignored in add PV request");
}
public void addPv(PvListener listener, String pv) throws InterruptedException, CAException, LockAcquisitionTimeoutException {
LOGGER.log(Level.FINEST, "addPv: {0} {1}", new Object[] {listener, pv});
ChannelMonitor monitor = null;
monitor = monitorMap.get(pv);

newPvSet.addAll(addPvSet);

for (String pv : addPvSet) {
//LOGGER.log(Level.FINEST, "addListener pv: {0}; pv: {1}", new Object[]{session, pv});

ChannelMonitor monitor = null;

if(monitorLock.tryLock(ACQUIRE_RESOURCE_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
try {
monitor = monitorMap.get(pv);

if (monitor == null) {
//LOGGER.log(Level.FINEST, "Opening ChannelMonitor: {0}", pv);
monitor = new ChannelMonitor(pv, context, timeoutExecutor, callbackExecutor);
monitorMap.put(pv, monitor);
} else {
//LOGGER.log(Level.FINEST, "Joining ChannelMonitor: {0}", pv);
}
} finally {
monitorLock.unlock();
}
// INTERNAL HOLDING LOCK
if(managerLock.tryLock(ACQUIRE_RESOURCE_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
try {
if (monitor == null) {
//LOGGER.log(Level.FINEST, "Opening ChannelMonitor: {0}", pv);
// HERE IS THE HEAVYWEIGHT ACTION: It's an async create channel request, but is still
// bottleneck; We're holding a lock while we wait...
monitor = new ChannelMonitor(pv, context, timeoutExecutor, callbackExecutor);
monitorMap.put(pv, monitor);
} else {
throw new LockAcquisitionTimeoutException("Timeout while acquiring monitorLock in addPvs");
//LOGGER.log(Level.FINEST, "Joining ChannelMonitor: {0}", pv);
}

if (monitor != null) {
monitor.addListener(listener);
}
}
}

if(clientLock.tryLock(ACQUIRE_RESOURCE_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
try {
Set<String> oldPvSet = clientMap.get(listener);
Set<String> clientPvSet = clientMap.get(listener);

if (oldPvSet != null) {
newPvSet.addAll(oldPvSet);
if (clientPvSet == null) {
clientPvSet = new HashSet<>();
}

clientMap.put(listener, newPvSet);
clientPvSet.add(pv);

clientMap.put(listener, clientPvSet);
} finally {
clientLock.unlock();
managerLock.unlock();
}
} else {
throw new LockAcquisitionTimeoutException("Timeout while acquiring clientLock in addPvs");
throw new LockAcquisitionTimeoutException("Timeout while acquiring managerLock in addPv");
}

// EXTERNAL NO LOCK
monitor.addListener(listener);
}

/**
* Removes the supplied PVs from the given listener.
* Removes the PV from the given listener. If the last listener on a given channel the monitor is also removed.
*
* @param listener The PvListener
* @param clearPvSet The PV set to clear
* @param pv The PV to remove
*/
public void clearPvs(PvListener listener, Set<String> clearPvSet) throws InterruptedException, LockAcquisitionTimeoutException {
Set<String> newPvSet;
public void removePv(PvListener listener, String pv) throws InterruptedException, LockAcquisitionTimeoutException {
LOGGER.log(Level.FINEST, "removePv: {0} {1}", new Object[] {listener, pv});
int listenerCount = 0;
ChannelMonitor monitor = monitorMap.get(pv);

if(clientLock.tryLock(CLEANUP_RESOURCE_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
try {
Set<String> oldPvSet = clientMap.get(listener);
if (monitor != null) {
monitor.removeListener(listener);
}

if (oldPvSet != null) {
newPvSet = new HashSet<>(oldPvSet);
newPvSet.removeAll(clearPvSet);
} else {
newPvSet = new HashSet<>();
// INTERNAL HOLDING LOCK
if(managerLock.tryLock(CLEANUP_RESOURCE_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
if (monitor != null) {
listenerCount = monitor.getListenerCount();
if (listenerCount == 0) {
monitorMap.remove(pv);
}
clientMap.put(listener, newPvSet);
} finally {
clientLock.unlock();
}
} else {
throw new LockAcquisitionTimeoutException("Timeout while acquiring clientLock in addPvs");
}

removeFromChannels(listener, clearPvSet);
}

/**
* A convenience method to add a listener without registering any PVs to
* monitor. This is a rare use-case and is equivalent to calling addPvs with
* a null set of PVs.
*
* Allowing a listener without any PVs registered may be deprecated in the
* future.
*
* @param listener The PvListener
*/
public void addListener(PvListener listener) throws InterruptedException, LockAcquisitionTimeoutException {
if(clientLock.tryLock(ACQUIRE_RESOURCE_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
try {
Set<String> pvSet = clientMap.get(listener);
Set<String> clientPvSet = clientMap.get(listener);

if (pvSet == null) {
pvSet = new HashSet<>();
if (clientPvSet != null) {
clientPvSet.remove(pv);
}

clientMap.put(listener, pvSet);
} finally {
clientLock.unlock();
managerLock.unlock();
}
} else {
throw new LockAcquisitionTimeoutException("Timeout while acquiring clientLock in addListener");
throw new LockAcquisitionTimeoutException("Timeout while acquiring managerLock in removePv");
}
}

/**
* Removes a listener from channels and if no listeners remain on a given
* channel then closes the channel.
*
* @param listener The PvListener
* @param pvSet The PV list (and indirectly the channel list)
*/
private void removeFromChannels(PvListener listener, Set<String> pvSet) throws InterruptedException, LockAcquisitionTimeoutException {
if (pvSet != null) { // Some clients don't immediately connect to a pv so have an empty pv list
for (String pv : pvSet) {
int listenerCount = 0;

ChannelMonitor monitor = monitorMap.get(pv);

if (monitor != null) {
monitor.removeListener(listener);
}

if(monitorLock.tryLock(CLEANUP_RESOURCE_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
try {
monitor = monitorMap.get(pv);

if (monitor != null) {
listenerCount = monitor.getListenerCount();
if (listenerCount == 0) {
monitorMap.remove(pv);
}
}
} finally {
monitorLock.unlock();
}
} else {
throw new LockAcquisitionTimeoutException("Timeout while acquiring monitorLock in removeFromChannels");
}

// We call close without holding a lock
if (monitor != null && listenerCount == 0) {
try {
monitor.close();
} catch (IOException e) {
LOGGER.log(Level.WARNING, "Unable to close monitor", e);
}
}
// EXTERNAL NO LOCK
if (monitor != null && listenerCount == 0) {
try {
monitor.close();
} catch (IOException e) {
LOGGER.log(Level.WARNING, "Unable to close monitor", e);
}
}
}
Expand All @@ -356,13 +284,24 @@ private void removeFromChannels(PvListener listener, Set<String> pvSet) throws I
* interested in.
*
* @param listener The PvListener
* @return a map of PV names to Exceptions for any PVs that were unable to be removed
*/
public void removeListener(PvListener listener) throws LockAcquisitionTimeoutException, InterruptedException {
//LOGGER.log(Level.FINEST, "removeListener: {0}", session);
public Map<String, Exception> removeAll(PvListener listener) {
LOGGER.log(Level.FINEST, "removeAll: {0}", listener);
Set<String> pvSet = clientMap.remove(listener);

// Don't do this while holding writeLock above since this method could be called by monitorChanged or from websocket close!
removeFromChannels(listener, pvSet);
Map<String, Exception> failed = new HashMap<>();
if(pvSet != null) {
for (String pv : pvSet) {
try {
removePv(listener, pv);
} catch (InterruptedException | LockAcquisitionTimeoutException e) {
failed.put(pv, e);
}
}
}

return failed;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ public String onMessage(String message, Session session) {
JsonArray pvs = obj.getJsonArray("pvs");
Set<String> pvSet = Application.sessionManager.getPvSetFromJson(pvs);

Application.sessionManager.clearPvs(session, pvSet);
Application.sessionManager.removePvs(session, pvSet);
} else {
LOGGER.log(Level.WARNING, "Unknown client request: {0}", message);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,14 +148,8 @@ public void recordInteractionDate(Session session) {
* @param session The session (client) to manage
*/
public void addClient(Session session) {
WebSocketSessionMonitor listener = getListener(session);

try {
Application.channelManager.addListener(listener);
} catch (InterruptedException | LockAcquisitionTimeoutException e) {
LOGGER.log(Level.WARNING, "Unable to addClient: " + session.getId(), e);
// TODO: Retry?
}
// Only a "real" client once actually monitoring something via addPvs()
// In other words, let's lazily create state only once needed
}

/**
Expand All @@ -164,18 +158,7 @@ public void addClient(Session session) {
* @param session The session (client) to remove
*/
public void removeClient(Session session) {
WebSocketSessionMonitor listener = listenerMap.get(session);

if (listener != null) {
listenerMap.remove(session);

try {
Application.channelManager.removeListener(listener);
} catch (LockAcquisitionTimeoutException | InterruptedException e) {
LOGGER.log(Level.WARNING, "Unable to removeClient: " + session.getId(), e);
// TODO: Retry?
}
}
removePvs(session, null);
}

/**
Expand All @@ -187,28 +170,57 @@ public void removeClient(Session session) {
public void addPvs(Session session, Set<String> pvSet) {
WebSocketSessionMonitor listener = getListener(session);

try {
Application.channelManager.addPvs(listener, pvSet);
} catch (InterruptedException | CAException | LockAcquisitionTimeoutException e) {
LOGGER.log(Level.WARNING, "Unable to addPvs: " + String.join(",", pvSet == null ? new HashSet<>() : pvSet), e);
// TODO: Retry?
if (pvSet != null) {
// Make sure empty string isn't included as a PV as that is invalid and is ignored
boolean emptyIncluded = pvSet.remove("");

if (emptyIncluded) {
LOGGER.log(Level.FINEST, "Empty string ignored in add PV request");
}

for (String pv : pvSet) {
try {
Application.channelManager.addPv(listener, pv);
} catch (InterruptedException | CAException | LockAcquisitionTimeoutException e) {
LOGGER.log(Level.WARNING, "Unable to addPv: " + pv, e);
// TODO: Retry?
}
}
}
}

/**
* Stop monitoring the provided PVs for the specified client.
* Stop monitoring the provided PVs for the specified client. Completely remove the session and all PVs by setting
* pvSet to null.
*
* @param session The client session
* @param pvSet The set of PVs
* @param pvSet The set of PVs. Remove all if pvSet is null
*/
public void clearPvs(Session session, Set<String> pvSet) {
public void removePvs(Session session, Set<String> pvSet) {
WebSocketSessionMonitor listener = getListener(session);

try {
Application.channelManager.clearPvs(listener, pvSet);
} catch (InterruptedException | LockAcquisitionTimeoutException e) {
LOGGER.log(Level.WARNING, "Unable to clearPvs: " + String.join(",", pvSet == null ? new HashSet<>() : pvSet), e);
// TODO: Retry?
if (pvSet != null) {
// Make sure empty string isn't included as a PV as that is invalid and is ignored
boolean emptyIncluded = pvSet.remove("");

if (emptyIncluded) {
LOGGER.log(Level.FINEST, "Empty string ignored in remove PV request");
}

for (String pv : pvSet) {
try {
Application.channelManager.removePv(listener, pv);
} catch (InterruptedException | LockAcquisitionTimeoutException e) {
LOGGER.log(Level.WARNING, "Unable to removePv: " + pv, e);
// TODO: Retry?
}
}
} else { // pvSet == null (removeAll)
Map<String, Exception> failed = Application.channelManager.removeAll(listener);
for(String pv: failed.keySet()) {
LOGGER.log(Level.WARNING, "Unable to (bulk) removePv: " + pv, failed.get(pv));
// TODO: Retry?
}
}
}

Expand Down
Loading