From 023ac3f598702a31e129ad473925a96e962fdd46 Mon Sep 17 00:00:00 2001 From: georgweiss Date: Tue, 12 Dec 2023 08:25:14 +0100 Subject: [PATCH 1/2] Handle alarm actions to show error message --- .../applications/alarm/client/AlarmClient.java | 12 ++++++------ .../org/phoebus/applications/alarm/ui/Messages.java | 2 ++ .../alarm/ui/tree/EnableComponentAction.java | 11 ++++++++++- .../applications/alarm/ui/messages.properties | 2 ++ 4 files changed, 20 insertions(+), 7 deletions(-) diff --git a/app/alarm/model/src/main/java/org/phoebus/applications/alarm/client/AlarmClient.java b/app/alarm/model/src/main/java/org/phoebus/applications/alarm/client/AlarmClient.java index 44ca4c0426..16707d34cf 100644 --- a/app/alarm/model/src/main/java/org/phoebus/applications/alarm/client/AlarmClient.java +++ b/app/alarm/model/src/main/java/org/phoebus/applications/alarm/client/AlarmClient.java @@ -166,7 +166,7 @@ public void setMode(final boolean maintenance) { final String json = new String (JsonModelWriter.commandToBytes(cmd)); final ProducerRecord record = new ProducerRecord<>(command_topic, AlarmSystem.COMMAND_PREFIX + root.getPathName(), json); - producer.send(record); + producer.send(record).get(); } catch (final Exception ex) { @@ -182,7 +182,7 @@ public void setNotify(final boolean disable_notify) { final String json = new String (JsonModelWriter.commandToBytes(cmd)); final ProducerRecord record = new ProducerRecord<>(command_topic, AlarmSystem.COMMAND_PREFIX + root.getPathName(), json); - producer.send(record); + producer.send(record).get(); } catch (final Exception ex) { @@ -518,7 +518,7 @@ public void sendItemConfigurationUpdate(final String path, final AlarmTreeItem record = new ProducerRecord<>(config_topic, AlarmSystem.CONFIG_PREFIX + path, json); - producer.send(record); + producer.send(record).get(); } /** Remove a component (and sub-items) from alarm tree @@ -542,10 +542,10 @@ public void removeComponent(final AlarmTreeItem item) throws Exception // The id message must arrive before the tombstone. final String json = new String(JsonModelWriter.deleteMessageToBytes()); final ProducerRecord id = new ProducerRecord<>(config_topic, AlarmSystem.CONFIG_PREFIX + item.getPathName(), json); - producer.send(id); + producer.send(id).get(); final ProducerRecord tombstone = new ProducerRecord<>(config_topic, AlarmSystem.CONFIG_PREFIX + item.getPathName(), null); - producer.send(tombstone); + producer.send(tombstone).get(); } catch (Exception ex) { @@ -563,7 +563,7 @@ public void acknowledge(final AlarmTreeItem item, final boolean acknowledge) final String cmd = acknowledge ? "acknowledge" : "unacknowledge"; final String json = new String (JsonModelWriter.commandToBytes(cmd)); final ProducerRecord record = new ProducerRecord<>(command_topic, AlarmSystem.COMMAND_PREFIX + item.getPathName(), json); - producer.send(record); + producer.send(record).get(); } catch (final Exception ex) { diff --git a/app/alarm/ui/src/main/java/org/phoebus/applications/alarm/ui/Messages.java b/app/alarm/ui/src/main/java/org/phoebus/applications/alarm/ui/Messages.java index 389fb7e00a..f36654a84d 100644 --- a/app/alarm/ui/src/main/java/org/phoebus/applications/alarm/ui/Messages.java +++ b/app/alarm/ui/src/main/java/org/phoebus/applications/alarm/ui/Messages.java @@ -15,6 +15,8 @@ public class Messages public static String acknowledgeFailed; public static String addComponentFailed; + public static String disableAlarmFailed; + public static String enableAlarmFailed; public static String moveItemFailed; public static String removeComponentFailed; public static String renameItemFailed; diff --git a/app/alarm/ui/src/main/java/org/phoebus/applications/alarm/ui/tree/EnableComponentAction.java b/app/alarm/ui/src/main/java/org/phoebus/applications/alarm/ui/tree/EnableComponentAction.java index 308cbe0a6a..ea64bdc4c0 100644 --- a/app/alarm/ui/src/main/java/org/phoebus/applications/alarm/ui/tree/EnableComponentAction.java +++ b/app/alarm/ui/src/main/java/org/phoebus/applications/alarm/ui/tree/EnableComponentAction.java @@ -15,8 +15,10 @@ import org.phoebus.applications.alarm.client.AlarmClientLeaf; import org.phoebus.applications.alarm.model.AlarmTreeItem; import org.phoebus.applications.alarm.ui.AlarmUI; +import org.phoebus.applications.alarm.ui.Messages; import org.phoebus.framework.jobs.JobManager; import org.phoebus.ui.dialog.DialogHelper; +import org.phoebus.ui.dialog.ExceptionDetailsErrorDialog; import org.phoebus.ui.javafx.ImageCache; import javafx.scene.Node; @@ -84,7 +86,14 @@ public EnableComponentAction(final Node node, final AlarmClient model, final Lis { final AlarmClientLeaf copy = pv.createDetachedCopy(); if (copy.setEnabled(doEnable())) - model.sendItemConfigurationUpdate(pv.getPathName(), copy); + try { + model.sendItemConfigurationUpdate(pv.getPathName(), copy); + } catch (Exception e) { + ExceptionDetailsErrorDialog.openError(Messages.error, + copy.isEnabled() ? Messages.enableAlarmFailed : Messages.disableAlarmFailed, + e); + throw e; + } } }); }); diff --git a/app/alarm/ui/src/main/resources/org/phoebus/applications/alarm/ui/messages.properties b/app/alarm/ui/src/main/resources/org/phoebus/applications/alarm/ui/messages.properties index 998f2ba791..2ed7ab7472 100644 --- a/app/alarm/ui/src/main/resources/org/phoebus/applications/alarm/ui/messages.properties +++ b/app/alarm/ui/src/main/resources/org/phoebus/applications/alarm/ui/messages.properties @@ -20,6 +20,8 @@ error=Error acknowledgeFailed=Failed to acknowledge alarm(s) addComponentFailed=Failed to add component +disableAlarmFailed=Failed to disable alarm +enableAlarmFailed=Failed to enable alarm moveItemFailed=Failed to move item removeComponentFailed=Failed to remove component renameItemFailed=Failed to rename item From 4b4539d8862073169ab7874d3a919ad35f918bc2 Mon Sep 17 00:00:00 2001 From: georgweiss Date: Thu, 14 Dec 2023 10:45:20 +0100 Subject: [PATCH 2/2] Additional changes to handle alarm (Kafka) client errors --- .../org/phoebus/pv/alarm/AlarmContext.java | 23 +- .../applications/alarm/AlarmSystem.java | 2 + .../alarm/client/AlarmClient.java | 554 +++++++++--------- .../alarm/client/KafkaHelper.java | 8 +- .../resources/alarm_preferences.properties | 5 +- .../applications/alarm/ui/AlarmUI.java | 2 +- .../alarm/ui/table/AlarmTableUI.java | 25 +- .../alarm/ui/tree/DuplicatePVAction.java | 16 +- .../alarm/ui/tree/ItemConfigDialog.java | 105 ++-- .../alarm/server/ServerModel.java | 23 +- 10 files changed, 394 insertions(+), 369 deletions(-) diff --git a/app/alarm/datasource/src/main/java/org/phoebus/pv/alarm/AlarmContext.java b/app/alarm/datasource/src/main/java/org/phoebus/pv/alarm/AlarmContext.java index c4b077d009..3c9547876c 100644 --- a/app/alarm/datasource/src/main/java/org/phoebus/pv/alarm/AlarmContext.java +++ b/app/alarm/datasource/src/main/java/org/phoebus/pv/alarm/AlarmContext.java @@ -116,11 +116,14 @@ public static synchronized void acknowledgePV(AlarmPV alarmPV, boolean ack) } if (node != null) { - try { - alarmModels.get(alarmPV.getInfo().getRoot()).acknowledge(node, ack); - } catch (Exception e) { - logger.log(Level.WARNING, "Failed to acknowledge alarm", e); - } + AlarmTreeItem alarmClientNode = node; + JobManager.schedule("Acknowledge/unacknowledge alarm", monitor -> { + try { + alarmModels.get(alarmPV.getInfo().getRoot()).acknowledge(alarmClientNode, ack); + } catch (Exception e) { + logger.log(Level.WARNING, "Failed to acknowledge alarm", e); + } + }); } } } @@ -153,8 +156,14 @@ public static synchronized void enablePV(AlarmPV alarmPV, boolean enable) for (AlarmClientLeaf pv : pvs) { final AlarmClientLeaf copy = pv.createDetachedCopy(); - if (copy.setEnabled(enable)) - alarmModels.get(alarmPV.getInfo().getRoot()).sendItemConfigurationUpdate(pv.getPathName(), copy); + if (copy.setEnabled(enable)){ + try { + alarmModels.get(alarmPV.getInfo().getRoot()).sendItemConfigurationUpdate(pv.getPathName(), copy); + } catch (Exception e) { + logger.log(Level.WARNING, "Failed to send item configuration update to " + alarmPV.getInfo().getRoot(), e); + throw e; + } + } } }); } diff --git a/app/alarm/model/src/main/java/org/phoebus/applications/alarm/AlarmSystem.java b/app/alarm/model/src/main/java/org/phoebus/applications/alarm/AlarmSystem.java index ca784cb6b4..07d64c9706 100644 --- a/app/alarm/model/src/main/java/org/phoebus/applications/alarm/AlarmSystem.java +++ b/app/alarm/model/src/main/java/org/phoebus/applications/alarm/AlarmSystem.java @@ -118,6 +118,8 @@ public class AlarmSystem extends AlarmSystemConstants /** "Disable until.." shortcuts */ @Preference public static String[] shelving_options; + @Preference public static int max_block_ms; + /** Macros used in UI display/command/web links */ public static MacroValueProvider macros; diff --git a/app/alarm/model/src/main/java/org/phoebus/applications/alarm/client/AlarmClient.java b/app/alarm/model/src/main/java/org/phoebus/applications/alarm/client/AlarmClient.java index 16707d34cf..80d60fa470 100644 --- a/app/alarm/model/src/main/java/org/phoebus/applications/alarm/client/AlarmClient.java +++ b/app/alarm/model/src/main/java/org/phoebus/applications/alarm/client/AlarmClient.java @@ -7,18 +7,6 @@ *******************************************************************************/ package org.phoebus.applications.alarm.client; -import static org.phoebus.applications.alarm.AlarmSystem.logger; - -import java.time.Duration; -import java.time.Instant; -import java.util.List; -import java.util.Objects; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.logging.Level; - import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; @@ -33,71 +21,111 @@ import org.phoebus.applications.alarm.model.json.JsonTags; import org.phoebus.util.time.TimestampFormats; -/** Alarm client model +import java.time.Duration; +import java.time.Instant; +import java.util.List; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.logging.Level; + +import static org.phoebus.applications.alarm.AlarmSystem.logger; + +/** + * Alarm client model * - *

Given an alarm configuration name like "Accelerator", - * subscribes to the "Accelerator" topic for configuration updates - * and the "AcceleratorState" topic for alarm state updates. + *

Given an alarm configuration name like "Accelerator", + * subscribes to the "Accelerator" topic for configuration updates + * and the "AcceleratorState" topic for alarm state updates. * - *

Updates from either topic are merged into an in-memory model - * of the complete alarm information, - * updating listeners with all changes. + *

Updates from either topic are merged into an in-memory model + * of the complete alarm information, + * updating listeners with all changes. * - * @author Kay Kasemir + * @author Kay Kasemir */ @SuppressWarnings("nls") -public class AlarmClient -{ - /** Kafka topics for config/status and commands */ +public class AlarmClient { + /** + * Kafka topics for config/status and commands + */ private final String config_topic, command_topic; - /** Listeners to this client */ + /** + * Listeners to this client + */ private final CopyOnWriteArrayList listeners = new CopyOnWriteArrayList<>(); - /** Alarm tree root */ + /** + * Alarm tree root + */ private final AlarmClientNode root; - /** Alarm tree Paths that have been deleted. + /** + * Timeout in seconds waiting for response from Kafka when sending producer messages. + */ + private static final int KAFKA_CLIENT_TIMEOUT = 10; + + /** + * Alarm tree Paths that have been deleted. * - *

Used to distinguish between paths that are not in the alarm tree - * because we have never seen a config or status update for them, - * and entries that have been deleted, so further state updates - * should be ignored until the item is again added (config message). + *

Used to distinguish between paths that are not in the alarm tree + * because we have never seen a config or status update for them, + * and entries that have been deleted, so further state updates + * should be ignored until the item is again added (config message). */ private final Set deleted_paths = ConcurrentHashMap.newKeySet(); - /** Flag for message handling thread to run or exit */ + /** + * Flag for message handling thread to run or exit + */ private final AtomicBoolean running = new AtomicBoolean(true); - /** Currently in maintenance mode? */ + /** + * Currently in maintenance mode? + */ private final AtomicBoolean maintenance_mode = new AtomicBoolean(false); - /** Currently in silent mode? */ + /** + * Currently in silent mode? + */ private final AtomicBoolean disable_notify = new AtomicBoolean(false); - /** Kafka consumer */ + /** + * Kafka consumer + */ private final Consumer consumer; - /** Kafka producer */ + /** + * Kafka producer + */ private final Producer producer; - /** Message handling thread */ + /** + * Message handling thread + */ private final Thread thread; - /** Time of last state update (ms), - * used to determine timeout + /** + * Time of last state update (ms), + * used to determine timeout */ private long last_state_update = 0; - /** Timeout, not seen any messages from server? */ + /** + * Timeout, not seen any messages from server? + */ private volatile boolean has_timed_out = false; - /** @param server Kafka Server host:port - * @param config_name Name of alarm tree root - * @param kafka_properties_file File to load additional kafka properties from + /** + * @param server Kafka Server host:port + * @param config_name Name of alarm tree root + * @param kafka_properties_file File to load additional kafka properties from */ - public AlarmClient(final String server, final String config_name, final String kafka_properties_file) - { + public AlarmClient(final String server, final String config_name, final String kafka_properties_file) { Objects.requireNonNull(server); Objects.requireNonNull(config_name); @@ -113,116 +141,125 @@ public AlarmClient(final String server, final String config_name, final String k thread.setDaemon(true); } - /** @param listener Listener to add */ - public void addListener(final AlarmClientListener listener) - { + /** + * @param listener Listener to add + */ + public void addListener(final AlarmClientListener listener) { listeners.add(listener); } - /** @param listener Listener to remove */ - public void removeListener(final AlarmClientListener listener) - { - if (! listeners.remove(listener)) + /** + * @param listener Listener to remove + */ + public void removeListener(final AlarmClientListener listener) { + if (!listeners.remove(listener)) throw new IllegalStateException("Unknown listener"); } - /** Start client - * @see #shutdown() + /** + * Start client + * + * @see #shutdown() */ - public void start() - { + public void start() { thread.start(); } - /** @return true if start() had been called */ - public boolean isRunning() - { + /** + * @return true if start() had been called + */ + public boolean isRunning() { return thread.isAlive(); } - /** @return Root of alarm configuration */ - public AlarmClientNode getRoot() - { + /** + * @return Root of alarm configuration + */ + public AlarmClientNode getRoot() { return root; } - /** @return Is alarm server in maintenance mode? */ - public boolean isMaintenanceMode() - { + /** + * @return Is alarm server in maintenance mode? + */ + public boolean isMaintenanceMode() { return maintenance_mode.get(); } - /** @return Is alarm server in disable notify mode? */ - public boolean isDisableNotify() - { + /** + * @return Is alarm server in disable notify mode? + */ + public boolean isDisableNotify() { return disable_notify.get(); } - /** @param maintenance Select maintenance mode? */ - public void setMode(final boolean maintenance) - { + /** + * Client code must not call this on the UI thread as it may block up to {@link #KAFKA_CLIENT_TIMEOUT} seconds. + * + * @param maintenance Select maintenance mode? + * @throws Exception if Kafka interaction fails for any reason. + */ + public void setMode(final boolean maintenance) throws Exception { final String cmd = maintenance ? JsonTags.MAINTENANCE : JsonTags.NORMAL; - try - { - final String json = new String (JsonModelWriter.commandToBytes(cmd)); + try { + final String json = new String(JsonModelWriter.commandToBytes(cmd)); final ProducerRecord record = new ProducerRecord<>(command_topic, AlarmSystem.COMMAND_PREFIX + root.getPathName(), json); - producer.send(record).get(); - } - catch (final Exception ex) - { + producer.send(record).get(KAFKA_CLIENT_TIMEOUT, TimeUnit.SECONDS); + } catch (final Exception ex) { logger.log(Level.WARNING, "Cannot set mode for " + root + " to " + cmd, ex); + throw ex; } } - /** @param disable_notify Select notify disable ? */ - public void setNotify(final boolean disable_notify) - { + /** + * Client must not call this on the UI thread as it may block up to {@link #KAFKA_CLIENT_TIMEOUT} seconds. + * + * @param disable_notify Select notify disable ? + * @throws Exception if Kafka interaction fails for any reason. + */ + public void setNotify(final boolean disable_notify) throws Exception { final String cmd = disable_notify ? JsonTags.DISABLE_NOTIFY : JsonTags.ENABLE_NOTIFY; - try - { - final String json = new String (JsonModelWriter.commandToBytes(cmd)); + try { + final String json = new String(JsonModelWriter.commandToBytes(cmd)); final ProducerRecord record = new ProducerRecord<>(command_topic, AlarmSystem.COMMAND_PREFIX + root.getPathName(), json); - producer.send(record).get(); - } - catch (final Exception ex) - { + producer.send(record).get(KAFKA_CLIENT_TIMEOUT, TimeUnit.SECONDS); + } catch (final Exception ex) { logger.log(Level.WARNING, "Cannot set mode for " + root + " to " + cmd, ex); + throw ex; } } - /** Background thread loop that checks for alarm tree updates */ - private void run() - { + /** + * Background thread loop that checks for alarm tree updates + */ + private void run() { // Send an initial "no server" notification, // to be cleared once we receive data from server. checkServerState(); - try - { - while (running.get()) - { + try { + while (running.get()) { checkUpdates(); checkServerState(); } - } - catch (final Throwable ex) - { + } catch (final Throwable ex) { if (running.get()) logger.log(Level.SEVERE, "Alarm client model error", ex); // else: Intended shutdown - } - finally - { + } finally { consumer.close(); producer.close(); } } - /** Time spent in checkUpdates() waiting for, well, updates */ + /** + * Time spent in checkUpdates() waiting for, well, updates + */ private static final Duration POLL_PERIOD = Duration.ofMillis(100); - /** Perform one check for updates */ - private void checkUpdates() - { + /** + * Perform one check for updates + */ + private void checkUpdates() { // Check for messages, with timeout. // TODO Because of Kafka bug, this will hang if Kafka isn't running. // Fixed according to https://issues.apache.org/jira/browse/KAFKA-1894 , @@ -232,20 +269,20 @@ private void checkUpdates() handleUpdate(record); } - /** Handle one received update - * @param record Kafka record + /** + * Handle one received update + * + * @param record Kafka record */ - private void handleUpdate(final ConsumerRecord record) - { + private void handleUpdate(final ConsumerRecord record) { final int sep = record.key().indexOf(':'); - if (sep < 0) - { + if (sep < 0) { logger.log(Level.WARNING, "Invalid key, expecting type:path, got " + record.key()); return; } - final String type = record.key().substring(0, sep+1); - final String path = record.key().substring(sep+1); + final String type = record.key().substring(0, sep + 1); + final String path = record.key().substring(sep + 1); final long timestamp = record.timestamp(); final String node_config = record.value(); @@ -253,34 +290,27 @@ private void handleUpdate(final ConsumerRecord record) logger.log(Level.WARNING, "Expect updates with CreateTime, got " + record.timestampType() + ": " + record.timestamp() + " " + path + " = " + node_config); logger.log(Level.FINE, () -> - record.topic() + " @ " + - TimestampFormats.MILLI_FORMAT.format(Instant.ofEpochMilli(timestamp)) + " " + - type + path + " = " + node_config); + record.topic() + " @ " + + TimestampFormats.MILLI_FORMAT.format(Instant.ofEpochMilli(timestamp)) + " " + + type + path + " = " + node_config); - try - { + try { // Only update listeners if the node changed AlarmTreeItem changed_node = null; final Object json = node_config == null ? null : JsonModelReader.parseJsonText(node_config); - if (type.equals(AlarmSystem.CONFIG_PREFIX)) - { - if (json == null) - { // No config -> Delete node + if (type.equals(AlarmSystem.CONFIG_PREFIX)) { + if (json == null) { // No config -> Delete node final AlarmTreeItem node = deleteNode(path); // If this was a known node, notify listeners - if (node != null) - { + if (node != null) { logger.log(Level.FINE, () -> "Delete " + path); for (final AlarmClientListener listener : listeners) listener.itemRemoved(node); } - } - else - { // Configuration update + } else { // Configuration update if (JsonModelReader.isStateUpdate(json)) logger.log(Level.WARNING, "Got config update with state content: " + record.key() + " " + node_config); - else - { + else { AlarmTreeItem node = findNode(path); // New node? Will need to send update. Otherwise update when there's a change if (node == null) @@ -289,27 +319,18 @@ private void handleUpdate(final ConsumerRecord record) changed_node = node; } } - } - else if (type.equals(AlarmSystem.STATE_PREFIX)) - { // State update - if (json == null) - { // State update for deleted node, ignore + } else if (type.equals(AlarmSystem.STATE_PREFIX)) { // State update + if (json == null) { // State update for deleted node, ignore logger.log(Level.FINE, () -> "Got state update for deleted node: " + record.key() + " " + node_config); return; - } - else if (! JsonModelReader.isStateUpdate(json)) - { + } else if (!JsonModelReader.isStateUpdate(json)) { logger.log(Level.WARNING, "Got state update with config content: " + record.key() + " " + node_config); return; - } - else if (deleted_paths.contains(path)) - { + } else if (deleted_paths.contains(path)) { // It it _deleted_?? logger.log(Level.FINE, () -> "Ignoring state for deleted item: " + record.key() + " " + node_config); return; - } - else - { + } else { AlarmTreeItem node = findNode(path); // New node? Create, and remember to notify if (node == null) @@ -334,40 +355,36 @@ else if (deleted_paths.contains(path)) // else: Neither config nor state update; ignore. // If there were changes, notify listeners - if (changed_node != null) - { + if (changed_node != null) { logger.log(Level.FINE, "Update " + path + " to " + changed_node.getState()); for (final AlarmClientListener listener : listeners) listener.itemUpdated(changed_node); } - } - catch (final Exception ex) - { + } catch (final Exception ex) { logger.log(Level.WARNING, - "Alarm config update error for path " + path + - ", config " + node_config, ex); + "Alarm config update error for path " + path + + ", config " + node_config, ex); } } - /** Find existing node + /** + * Find existing node * - * @param path Path to node - * @return Node, null if model does not contain the node - * @throws Exception on error + * @param path Path to node + * @return Node, null if model does not contain the node + * @throws Exception on error */ - private AlarmTreeItem findNode(final String path) throws Exception - { + private AlarmTreeItem findNode(final String path) throws Exception { final String[] path_elements = AlarmTreePath.splitPath(path); // Start of path must match the alarm tree root - if (path_elements.length < 1 || - !root.getName().equals(path_elements[0])) + if (path_elements.length < 1 || + !root.getName().equals(path_elements[0])) throw new Exception("Invalid path for alarm configuration " + root.getName() + ": " + path); // Walk down the path AlarmTreeItem node = root; - for (int i=1; i findNode(final String path) throws Exception return node; } - /** Delete node + /** + * Delete node * - *

It's OK to try delete an unknown node: - * The node might have once existed, but was then deleted. - * The last entry in the configuration database is then the deletion hint. - * A new model that reads this node-to-delete information - * thus never knew the node. + *

It's OK to try delete an unknown node: + * The node might have once existed, but was then deleted. + * The last entry in the configuration database is then the deletion hint. + * A new model that reads this node-to-delete information + * thus never knew the node. * - * @param path Path to node to delete - * @return Node that was removed, or null if model never knew that node - * @throws Exception on error + * @param path Path to node to delete + * @return Node that was removed, or null if model never knew that node + * @throws Exception on error */ - private AlarmTreeItem deleteNode(final String path) throws Exception - { + private AlarmTreeItem deleteNode(final String path) throws Exception { // Mark path as deleted so we ignore state updates deleted_paths.add(path); @@ -402,49 +419,44 @@ private AlarmTreeItem deleteNode(final String path) throws Exception return node; } - /** Find an existing alarm tree item or create a new one + /** + * Find an existing alarm tree item or create a new one * - *

Informs listener about created nodes, - * if necessary one notification for each created node along the path. + *

Informs listener about created nodes, + * if necessary one notification for each created node along the path. * - * @param path Alarm tree path - * @param is_leaf Is this the path to a leaf? - * @return {@link AlarmTreeItem} - * @throws Exception on error + * @param path Alarm tree path + * @param is_leaf Is this the path to a leaf? + * @return {@link AlarmTreeItem} + * @throws Exception on error */ - private AlarmTreeItem findOrCreateNode(final String path, final boolean is_leaf) throws Exception - { + private AlarmTreeItem findOrCreateNode(final String path, final boolean is_leaf) throws Exception { // In case it was previously deleted: deleted_paths.remove(path); final String[] path_elements = AlarmTreePath.splitPath(path); // Start of path must match the alarm tree root - if (path_elements.length < 1 || - !root.getName().equals(path_elements[0])) + if (path_elements.length < 1 || + !root.getName().equals(path_elements[0])) throw new Exception("Invalid path for alarm configuration " + root.getName() + ": " + path); // Walk down the path AlarmClientNode parent = root; - for (int i=1; i node = parent.getChild(name); // Create missing nodes - if (node == null) - { // Done when creating leaf - if (last && is_leaf) - { + if (node == null) { // Done when creating leaf + if (last && is_leaf) { node = new AlarmClientLeaf(parent.getPathName(), name); node.addToParent(parent); logger.log(Level.FINE, "Create " + path); for (final AlarmClientListener listener : listeners) listener.itemAdded(node); return node; - } - else - { + } else { node = new AlarmClientNode(parent.getPathName(), name); node.addToParent(parent); for (final AlarmClientListener listener : listeners) @@ -455,10 +467,10 @@ private AlarmTreeItem findOrCreateNode(final String path, final boolean is_le if (last) return node; // Found or created intermediate node; continue walking down the path - if (! (node instanceof AlarmClientNode)) + if (!(node instanceof AlarmClientNode)) throw new Exception("Expected intermediate node, found " + - node.getClass().getSimpleName() + " " + node.getName() + - " while traversing " + path); + node.getClass().getSimpleName() + " " + node.getName() + + " while traversing " + path); parent = (AlarmClientNode) node; } @@ -466,73 +478,69 @@ private AlarmTreeItem findOrCreateNode(final String path, final boolean is_le return parent; } - /** Add a component to the alarm tree - * @param path_name to parent Root or parent component under which to add the component - * @param new_name Name of the new component + /** + * Add a component to the alarm tree + * + * @param path_name to parent Root or parent component under which to add the component + * @param new_name Name of the new component */ - public void addComponent(final String path_name, final String new_name) throws Exception - { - try - { + public void addComponent(final String path_name, final String new_name) { + try { sendNewItemInfo(path_name, new_name, new AlarmClientNode(null, new_name)); - } - catch (final Exception ex) - { + } catch (final Exception ex) { logger.log(Level.WARNING, "Cannot add component " + new_name + " to " + path_name, ex); } } - /** Add a component to the alarm tree - * @param path_name to parent Root or parent component under which to add the component - * @param new_name Name of the new component + /** + * Add a component to the alarm tree + * + * @param path_name to parent Root or parent component under which to add the component + * @param new_name Name of the new component */ - public void addPV(final String path_name, final String new_name) - { - try - { + public void addPV(final String path_name, final String new_name) { + try { sendNewItemInfo(path_name, new_name, new AlarmClientLeaf(null, new_name)); - } - catch (final Exception ex) - { + } catch (final Exception ex) { logger.log(Level.WARNING, "Cannot add pv " + new_name + " to " + path_name, ex); } } - private void sendNewItemInfo(String path_name, final String new_name, final AlarmTreeItem content) throws Exception - { + private void sendNewItemInfo(String path_name, final String new_name, final AlarmTreeItem content) throws Exception { // Send message about new component. // All clients, including this one, will receive and then add the new component. final String new_path = AlarmTreePath.makePath(path_name, new_name); sendItemConfigurationUpdate(new_path, content); } - /** Send item configuration - * - *

All clients, including this one, will update when they receive the message + /** + * Client code must not call this on the UI thread as it may block up to {@link #KAFKA_CLIENT_TIMEOUT} seconds. + * Send item configuration. + *

All clients, including this one, will update when they receive the message * - * @param path Path to the item - * @param config A prototype item (path is ignored) that holds the new configuration - * @throws Exception on error + * @param path Path to the item + * @param config A prototype item (path is ignored) that holds the new configuration + * @throws Exception on error */ - public void sendItemConfigurationUpdate(final String path, final AlarmTreeItem config) throws Exception - { + public void sendItemConfigurationUpdate(final String path, final AlarmTreeItem config) throws Exception { final String json = new String(JsonModelWriter.toJsonBytes(config)); final ProducerRecord record = new ProducerRecord<>(config_topic, AlarmSystem.CONFIG_PREFIX + path, json); - producer.send(record).get(); + producer.send(record).get(KAFKA_CLIENT_TIMEOUT, TimeUnit.SECONDS); } - /** Remove a component (and sub-items) from alarm tree - * @param item Item to remove - * @throws Exception on error + /** + * Client must should not call this on the UI thread as it may block up to 2x{@link #KAFKA_CLIENT_TIMEOUT} seconds. + * Remove a component (and sub-items) from alarm tree + * + * @param item Item to remove + * @throws Exception on error */ - public void removeComponent(final AlarmTreeItem item) throws Exception - { - try - { - // Depth first deletion of all child nodes. - final List> children = item.getChildren(); - for (final AlarmTreeItem child : children) - removeComponent(child); + public void removeComponent(final AlarmTreeItem item) throws Exception { + try { + // Depth first deletion of all child nodes. + final List> children = item.getChildren(); + for (final AlarmTreeItem child : children) + removeComponent(child); // Send message about item to remove // All clients, including this one, will receive and then remove the item. @@ -542,75 +550,67 @@ public void removeComponent(final AlarmTreeItem item) throws Exception // The id message must arrive before the tombstone. final String json = new String(JsonModelWriter.deleteMessageToBytes()); final ProducerRecord id = new ProducerRecord<>(config_topic, AlarmSystem.CONFIG_PREFIX + item.getPathName(), json); - producer.send(id).get(); + producer.send(id).get(KAFKA_CLIENT_TIMEOUT, TimeUnit.SECONDS); final ProducerRecord tombstone = new ProducerRecord<>(config_topic, AlarmSystem.CONFIG_PREFIX + item.getPathName(), null); - producer.send(tombstone).get(); - } - catch (Exception ex) - { + producer.send(tombstone).get(KAFKA_CLIENT_TIMEOUT, TimeUnit.SECONDS); + } catch (Exception ex) { throw new Exception("Error deleting " + item.getPathName(), ex); } } - /** @param item Item for which to acknowledge alarm - * @param acknowledge true to acknowledge, else un-acknowledge + /** + * Client must should not call this on the UI thread as it may block up to {@link #KAFKA_CLIENT_TIMEOUT} seconds. + * + * @param item Item for which to acknowledge alarm + * @param acknowledge true to acknowledge, else un-acknowledge */ - public void acknowledge(final AlarmTreeItem item, final boolean acknowledge) throws Exception - { - try - { + public void acknowledge(final AlarmTreeItem item, final boolean acknowledge) throws Exception { + try { final String cmd = acknowledge ? "acknowledge" : "unacknowledge"; - final String json = new String (JsonModelWriter.commandToBytes(cmd)); + final String json = new String(JsonModelWriter.commandToBytes(cmd)); final ProducerRecord record = new ProducerRecord<>(command_topic, AlarmSystem.COMMAND_PREFIX + item.getPathName(), json); - producer.send(record).get(); - } - catch (final Exception ex) - { + producer.send(record).get(KAFKA_CLIENT_TIMEOUT, TimeUnit.SECONDS); + } catch (final Exception ex) { logger.log(Level.WARNING, "Cannot acknowledge component " + item, ex); throw ex; } } - /** @return true if connected to server, else updates have timed out */ - public boolean isServerAlive() - { + /** + * @return true if connected to server, else updates have timed out + */ + public boolean isServerAlive() { return !has_timed_out; } - /** Check if there have been any messages from server */ - private void checkServerState() - { + /** + * Check if there have been any messages from server + */ + private void checkServerState() { final long now = System.currentTimeMillis(); - if (now - last_state_update > AlarmSystem.idle_timeout_ms*3) - { - if (! has_timed_out) - { + if (now - last_state_update > AlarmSystem.idle_timeout_ms * 3) { + if (!has_timed_out) { has_timed_out = true; for (final AlarmClientListener listener : listeners) listener.serverStateChanged(false); } + } else if (has_timed_out) { + has_timed_out = false; + for (final AlarmClientListener listener : listeners) + listener.serverStateChanged(true); } - else - if (has_timed_out) - { - has_timed_out = false; - for (final AlarmClientListener listener : listeners) - listener.serverStateChanged(true); - } } - /** Stop client */ - public void shutdown() - { + /** + * Stop client + */ + public void shutdown() { running.set(false); consumer.wakeup(); - try - { + try { thread.join(2000); - } - catch (final InterruptedException ex) - { + } catch (final InterruptedException ex) { logger.log(Level.WARNING, thread.getName() + " thread doesn't shut down", ex); } logger.log(Level.INFO, () -> thread.getName() + " shut down"); diff --git a/app/alarm/model/src/main/java/org/phoebus/applications/alarm/client/KafkaHelper.java b/app/alarm/model/src/main/java/org/phoebus/applications/alarm/client/KafkaHelper.java index 1152df1edd..17fd53d4a3 100644 --- a/app/alarm/model/src/main/java/org/phoebus/applications/alarm/client/KafkaHelper.java +++ b/app/alarm/model/src/main/java/org/phoebus/applications/alarm/client/KafkaHelper.java @@ -31,6 +31,7 @@ import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; +import org.phoebus.applications.alarm.AlarmSystem; /** Alarm client model * @@ -117,13 +118,12 @@ public static Producer connectProducer(final String kafka_server kafka_props.put("bootstrap.servers", kafka_servers); // Collect messages for 20ms until sending them out as a batch kafka_props.put("linger.ms", 20); + kafka_props.put("max.block.ms", AlarmSystem.max_block_ms == 0 ? 10000 : AlarmSystem.max_block_ms); // Write String key, value final Serializer serializer = new StringSerializer(); - final Producer producer = new KafkaProducer<>(kafka_props, serializer, serializer); - - return producer; + return new KafkaProducer<>(kafka_props, serializer, serializer); } /** @@ -162,7 +162,7 @@ static public Properties loadPropsFromFile(String filePath) { logger.fine("loading file from path: " + filePath); Properties properties = new Properties(); if(filePath != null && !filePath.isBlank()){ - try(FileInputStream file = new FileInputStream(filePath);){ + try(FileInputStream file = new FileInputStream(filePath)){ properties.load(file); } catch(IOException e) { logger.log(Level.SEVERE, "failed to load kafka properties", e); diff --git a/app/alarm/model/src/main/resources/alarm_preferences.properties b/app/alarm/model/src/main/resources/alarm_preferences.properties index bf46de2674..d2ba480fed 100644 --- a/app/alarm/model/src/main/resources/alarm_preferences.properties +++ b/app/alarm/model/src/main/resources/alarm_preferences.properties @@ -5,7 +5,7 @@ # Kafka Server host:port server=localhost:9092 -# A file to configure the properites of kafka clients +# A file to configure the properties of kafka clients kafka_properties= # Name of alarm tree root. @@ -140,3 +140,6 @@ shelving_options=1 hour, 6 hours, 12 hours, 1 day, 7 days, 30 days # # Format: M1=Value1, M2=Value2 macros=TOP=/home/controls/displays,WEBROOT=http://localhost/controls/displays + +# Max time in ms a producer call will block. +max_block_ms=10000 \ No newline at end of file diff --git a/app/alarm/ui/src/main/java/org/phoebus/applications/alarm/ui/AlarmUI.java b/app/alarm/ui/src/main/java/org/phoebus/applications/alarm/ui/AlarmUI.java index d4ba2afc16..e62af387bd 100644 --- a/app/alarm/ui/src/main/java/org/phoebus/applications/alarm/ui/AlarmUI.java +++ b/app/alarm/ui/src/main/java/org/phoebus/applications/alarm/ui/AlarmUI.java @@ -200,7 +200,7 @@ public static Color getAlarmAreaPanelBackgroundColor(final SeverityLevel severit /** Verify authorization, qualified by model's current config * @param model Alarm client model - * @param auto Authorization name + * @param authorization Authorization name * @return true if the user has authorization */ private static boolean haveQualifiedAuthorization(final AlarmClient model, final String authorization) diff --git a/app/alarm/ui/src/main/java/org/phoebus/applications/alarm/ui/table/AlarmTableUI.java b/app/alarm/ui/src/main/java/org/phoebus/applications/alarm/ui/table/AlarmTableUI.java index 85552c12a8..c07c48981b 100644 --- a/app/alarm/ui/src/main/java/org/phoebus/applications/alarm/ui/table/AlarmTableUI.java +++ b/app/alarm/ui/src/main/java/org/phoebus/applications/alarm/ui/table/AlarmTableUI.java @@ -17,6 +17,7 @@ import java.util.logging.Level; import java.util.regex.Pattern; +import javafx.application.Platform; import javafx.scene.layout.Background; import javafx.scene.layout.BorderPane; import javafx.scene.layout.Priority; @@ -28,12 +29,14 @@ import org.phoebus.applications.alarm.ui.AlarmContextMenuHelper; import org.phoebus.applications.alarm.ui.AlarmUI; import org.phoebus.applications.alarm.ui.tree.ConfigureComponentAction; +import org.phoebus.framework.jobs.Job; import org.phoebus.framework.jobs.JobManager; import org.phoebus.framework.persistence.Memento; import org.phoebus.framework.selection.Selection; import org.phoebus.framework.selection.SelectionService; import org.phoebus.ui.application.ContextMenuService; import org.phoebus.ui.application.SaveSnapshotAction; +import org.phoebus.ui.dialog.ExceptionDetailsErrorDialog; import org.phoebus.ui.javafx.Brightness; import org.phoebus.ui.javafx.ClearingTextField; import org.phoebus.ui.javafx.ImageCache; @@ -289,7 +292,17 @@ ToolBar getToolbar() private ToolBar createToolbar() { setMaintenanceMode(false); - server_mode.setOnAction(event -> client.setMode(! client.isMaintenanceMode())); + server_mode.setOnAction(event -> { + JobManager.schedule(client.isMaintenanceMode() ? "Disable maintenance mode" : "Enable maintenance mode", + monitor -> { + try { + client.setMode(! client.isMaintenanceMode()); + } catch (Exception e) { + Platform.runLater(() -> ExceptionDetailsErrorDialog.openError(e.getMessage(), e)); + } + }); + + }); // Could 'bind', // server_mode.disableProperty().bind(new SimpleBooleanProperty(!AlarmUI.mayModifyMode(client))); @@ -299,7 +312,15 @@ private ToolBar createToolbar() server_mode.setDisable(true); setDisableNotify(false); - server_notify.setOnAction(event -> client.setNotify(! client.isDisableNotify())); + server_notify.setOnAction(event -> { + JobManager.schedule(client.isDisableNotify() ? "Enable alarm notification" : "Disable alarm notification", monitor -> { + try { + client.setNotify(! client.isDisableNotify()); + } catch (Exception e) { + Platform.runLater(() -> ExceptionDetailsErrorDialog.openError(e.getMessage(), e)); + } + }); + }); if (!AlarmUI.mayDisableNotify(client)) server_notify.setDisable(true); diff --git a/app/alarm/ui/src/main/java/org/phoebus/applications/alarm/ui/tree/DuplicatePVAction.java b/app/alarm/ui/src/main/java/org/phoebus/applications/alarm/ui/tree/DuplicatePVAction.java index c447a0373b..5fff9d078a 100644 --- a/app/alarm/ui/src/main/java/org/phoebus/applications/alarm/ui/tree/DuplicatePVAction.java +++ b/app/alarm/ui/src/main/java/org/phoebus/applications/alarm/ui/tree/DuplicatePVAction.java @@ -7,18 +7,23 @@ *******************************************************************************/ package org.phoebus.applications.alarm.ui.tree; +import javafx.application.Platform; import org.phoebus.applications.alarm.AlarmSystem; import org.phoebus.applications.alarm.client.AlarmClient; import org.phoebus.applications.alarm.client.AlarmClientLeaf; import org.phoebus.applications.alarm.model.AlarmTreePath; import org.phoebus.framework.jobs.JobManager; import org.phoebus.ui.dialog.DialogHelper; +import org.phoebus.ui.dialog.ExceptionDetailsErrorDialog; import org.phoebus.ui.javafx.ImageCache; import javafx.scene.Node; import javafx.scene.control.MenuItem; import javafx.scene.control.TextInputDialog; +import java.util.logging.Level; +import java.util.logging.Logger; + /** Action that adds duplicate of PV to alarm tree configuration * @author Kay Kasemir */ @@ -27,7 +32,7 @@ class DuplicatePVAction extends MenuItem { /** @param node Node to position dialog * @param model Model where new component is added - * @param parent Parent item in alarm tree + * @param original Item subject to copy */ public DuplicatePVAction(final Node node, final AlarmClient model, final AlarmClientLeaf original) { @@ -59,7 +64,14 @@ public DuplicatePVAction(final Node node, final AlarmClient model, final AlarmCl // Request adding new PV final String new_path = AlarmTreePath.makePath(original.getParent().getPathName(), new_name); - JobManager.schedule(getText(), monitor -> model.sendItemConfigurationUpdate(new_path, template)); + JobManager.schedule(getText(), monitor -> { + try { + model.sendItemConfigurationUpdate(new_path, template); + } catch (Exception e) { + Logger.getLogger(DuplicatePVAction.class.getName()).log(Level.WARNING, "Failed to send item configuration", e); + Platform.runLater(() -> ExceptionDetailsErrorDialog.openError(e.getMessage(), e)); + } + }); }); } } diff --git a/app/alarm/ui/src/main/java/org/phoebus/applications/alarm/ui/tree/ItemConfigDialog.java b/app/alarm/ui/src/main/java/org/phoebus/applications/alarm/ui/tree/ItemConfigDialog.java index 7793c653ca..1d824953a5 100644 --- a/app/alarm/ui/src/main/java/org/phoebus/applications/alarm/ui/tree/ItemConfigDialog.java +++ b/app/alarm/ui/src/main/java/org/phoebus/applications/alarm/ui/tree/ItemConfigDialog.java @@ -7,50 +7,40 @@ *******************************************************************************/ package org.phoebus.applications.alarm.ui.tree; -import java.time.LocalDateTime; -import java.time.temporal.TemporalAmount; - -import org.phoebus.applications.alarm.AlarmSystem; -import org.phoebus.applications.alarm.client.AlarmClient; -import org.phoebus.applications.alarm.client.AlarmClientLeaf; -import org.phoebus.applications.alarm.client.AlarmClientNode; -import org.phoebus.applications.alarm.model.AlarmTreeItem; -import org.phoebus.applications.alarm.ui.tree.datetimepicker.DateTimePicker; -import org.phoebus.ui.dialog.ExceptionDetailsErrorDialog; -import org.phoebus.util.time.SecondsParser; -import org.phoebus.util.time.TimeParser; - import javafx.application.Platform; import javafx.beans.value.ChangeListener; import javafx.event.ActionEvent; import javafx.event.EventHandler; import javafx.geometry.Pos; -import javafx.scene.control.Button; -import javafx.scene.control.ButtonType; -import javafx.scene.control.CheckBox; -import javafx.scene.control.ComboBox; -import javafx.scene.control.Dialog; -import javafx.scene.control.Label; -import javafx.scene.control.ScrollPane; -import javafx.scene.control.Spinner; -import javafx.scene.control.TextField; -import javafx.scene.control.Tooltip; +import javafx.scene.control.*; import javafx.scene.layout.ColumnConstraints; import javafx.scene.layout.GridPane; import javafx.scene.layout.HBox; import javafx.scene.layout.Priority; import javafx.stage.Modality; import javafx.util.Duration; +import org.phoebus.applications.alarm.AlarmSystem; +import org.phoebus.applications.alarm.client.AlarmClient; +import org.phoebus.applications.alarm.client.AlarmClientLeaf; +import org.phoebus.applications.alarm.client.AlarmClientNode; +import org.phoebus.applications.alarm.model.AlarmTreeItem; +import org.phoebus.applications.alarm.ui.tree.datetimepicker.DateTimePicker; +import org.phoebus.ui.dialog.ExceptionDetailsErrorDialog; +import org.phoebus.util.time.SecondsParser; +import org.phoebus.util.time.TimeParser; +import java.time.LocalDateTime; +import java.time.temporal.TemporalAmount; -/** Dialog for editing {@link AlarmTreeItem} + +/** + * Dialog for editing {@link AlarmTreeItem} * - *

When pressing "OK", dialog sends updated - * configuration. + *

When pressing "OK", dialog sends updated + * configuration. */ @SuppressWarnings("nls") -class ItemConfigDialog extends Dialog -{ +class ItemConfigDialog extends Dialog { private TextField description; private CheckBox enabled, latching, annunciating; private DateTimePicker enabled_date_picker; @@ -60,8 +50,7 @@ class ItemConfigDialog extends Dialog private final TitleDetailTable guidance, displays, commands; private final TitleDetailDelayTable actions; - public ItemConfigDialog(final AlarmClient model, final AlarmTreeItem item) - { + public ItemConfigDialog(final AlarmClient model, final AlarmTreeItem item) { // Allow multiple instances initModality(Modality.NONE); setTitle("Configure " + item.getName()); @@ -87,8 +76,7 @@ public ItemConfigDialog(final AlarmClient model, final AlarmTreeItem item) path.setEditable(false); layout.add(path, 1, row++); - if (item instanceof AlarmClientLeaf) - { + if (item instanceof AlarmClientLeaf) { final AlarmClientLeaf leaf = (AlarmClientLeaf) item; layout.add(new Label("Description:"), 0, row); @@ -126,7 +114,7 @@ public ItemConfigDialog(final AlarmClient model, final AlarmTreeItem item) enabled_date_picker.setDateTimeValue(leaf.getEnabledDate()); enabled_date_picker.setPrefSize(280, 25); - relative_date = new ComboBox(); + relative_date = new ComboBox<>(); relative_date.setTooltip(new Tooltip("Select a predefined duration for disabling the alarm")); relative_date.getItems().addAll(AlarmSystem.shelving_options); relative_date.setPrefSize(200, 25); @@ -142,15 +130,14 @@ public ItemConfigDialog(final AlarmClient model, final AlarmTreeItem item) // setOnAction for relative date must be set to null as to not trigger event when setting value enabled_date_picker.setOnAction((ActionEvent e) -> { - if (enabled_date_picker.getDateTimeValue() != null) - { + if (enabled_date_picker.getDateTimeValue() != null) { relative_date.setOnAction(null); enabled.setSelected(false); enabled_date_picker.getEditor().commitValue(); relative_date.getSelectionModel().clearSelection(); relative_date.setValue(null); relative_date.setOnAction(relative_event_handler); - }; + } }); final HBox until_box = new HBox(10, enabled_date_picker, relative_date); @@ -168,8 +155,7 @@ public ItemConfigDialog(final AlarmClient model, final AlarmTreeItem item) final String detail; if (seconds <= 0) detail = "With the current delay of 0 seconds, alarms trigger immediately"; - else - { + else { final String hhmmss = SecondsParser.formatSeconds(seconds); detail = "With the current delay of " + seconds + " seconds, alarms trigger after " + hhmmss + " hours:minutes:seconds"; } @@ -247,7 +233,7 @@ public ItemConfigDialog(final AlarmClient model, final AlarmTreeItem item) // Scroll pane stops the content from resizing, // so tell content to use the widths of the scroll pane // minus 40 to provide space for the scroll bar, and suggest minimum width - scroll.widthProperty().addListener((p, old, width) -> layout.setPrefWidth(Math.max(width.doubleValue()-40, 450))); + scroll.widthProperty().addListener((p, old, width) -> layout.setPrefWidth(Math.max(width.doubleValue() - 40, 450))); getDialogPane().setContent(scroll); setResizable(true); @@ -256,25 +242,22 @@ public ItemConfigDialog(final AlarmClient model, final AlarmTreeItem item) final Button ok = (Button) getDialogPane().lookupButton(ButtonType.OK); ok.addEventFilter(ActionEvent.ACTION, event -> - { - if (!validateAndStore(model, item)) - event.consume(); - }); + validateAndStore(model, item, event)); setResultConverter(button -> button == ButtonType.OK); } - /** Send requested configuration - * @param model {@link AlarmClient} - * @param item Original item - * @return true on success + /** + * Send requested configuration + * + * @param model {@link AlarmClient} + * @param item Original item + * @param event Button click event, consumed if save action fails (e.g. Kafka not reachable) */ - private boolean validateAndStore(final AlarmClient model, final AlarmTreeItem item) - { + private void validateAndStore(final AlarmClient model, final AlarmTreeItem item, ActionEvent event) { final AlarmTreeItem config; - if (item instanceof AlarmClientLeaf) - { + if (item instanceof AlarmClientLeaf) { final AlarmClientLeaf pv = new AlarmClientLeaf(null, item.getName()); pv.setDescription(description.getText().trim()); pv.setEnabled(enabled.isSelected()); @@ -294,31 +277,25 @@ private boolean validateAndStore(final AlarmClient model, final AlarmTreeItem else pv.setEnabled(true); - if (relative_enable_date != null) - { + if (relative_enable_date != null) { final TemporalAmount amount = TimeParser.parseTemporalAmount(relative_enable_date); final LocalDateTime update_date = LocalDateTime.now().plus(amount); pv.setEnabledDate(update_date); - }; + } + ; config = pv; - } - else + } else config = new AlarmClientNode(null, item.getName()); config.setGuidance(guidance.getItems()); config.setDisplays(displays.getItems()); config.setCommands(commands.getItems()); config.setActions(actions.getItems()); - try - { + try { model.sendItemConfigurationUpdate(item.getPathName(), config); - } - catch (Exception ex) - { + } catch (Exception ex) { ExceptionDetailsErrorDialog.openError("Error", "Cannot update item", ex); - return false; + event.consume(); } - - return true; } } \ No newline at end of file diff --git a/services/alarm-server/src/main/java/org/phoebus/applications/alarm/server/ServerModel.java b/services/alarm-server/src/main/java/org/phoebus/applications/alarm/server/ServerModel.java index 6021d88ab9..6c6fb2ad1e 100644 --- a/services/alarm-server/src/main/java/org/phoebus/applications/alarm/server/ServerModel.java +++ b/services/alarm-server/src/main/java/org/phoebus/applications/alarm/server/ServerModel.java @@ -15,6 +15,7 @@ import java.util.List; import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; import java.util.logging.Level; import org.apache.kafka.clients.consumer.Consumer; @@ -76,22 +77,23 @@ class ServerModel /** Did the last connectivity check fail? */ private boolean connection_lost = false; + /** + * Timeout in seconds waiting for response from Kafka when sending producer messages. + */ + private static final int KAFKA_CLIENT_TIMEOUT = 10; /** @param kafka_servers Servers * @param config_name Name of alarm tree root * @param initial_states * @param listener * @param kafka_properties_file Additional properties to pass to the kafka client - * @throws Exception on error */ public ServerModel(final String kafka_servers, final String config_name, final ConcurrentHashMap initial_states, final ServerModelListener listener, - final String kafka_properties_file) throws Exception + final String kafka_properties_file) { this.initial_states = initial_states; - // initial_states.entrySet().forEach(state -> - // System.out.println("Initial state for " + state.getKey() + " : " + state.getValue())); config_state_topic = Objects.requireNonNull(config_name); command_topic = config_name + AlarmSystem.COMMAND_TOPIC_SUFFIX; @@ -187,7 +189,7 @@ private void checkConnectivity(final long now) // but silently drops them, so clients will get out of sync, // and since Kafka is down, it won't track the most recent alarm state // for future clients... - if (connected == false && connection_lost == false) + if (!connected && !connection_lost) logger.log(Level.WARNING, "Lost Kafka connectitity"); else if (connected && connection_lost) { @@ -334,9 +336,8 @@ public AlarmTreeItem findNode(final String path) throws Exception * * @param name PV name * @return Node, null if model does not contain the PV - * @throws Exception on error */ - public AlarmServerPV findPV(final String name) throws Exception + public AlarmServerPV findPV(final String name) { return findPV(name, root); } @@ -471,7 +472,7 @@ public void sendStateUpdate(final String path, final BasicState new_state) { final String json = new_state == null ? null : new String(JsonModelWriter.toJsonBytes(new_state, AlarmLogic.getMaintenanceMode(), AlarmLogic.getDisableNotify())); final ProducerRecord record = new ProducerRecord<>(config_state_topic, AlarmSystem.STATE_PREFIX + path, json); - producer.send(record); + producer.send(record).get(KAFKA_CLIENT_TIMEOUT, TimeUnit.SECONDS); last_state_update = System.currentTimeMillis(); } catch (Throwable ex) @@ -482,7 +483,7 @@ public void sendStateUpdate(final String path, final BasicState new_state) /** Send alarm update to 'config' topic * @param path Path of item that has a new state - * @param new_state That new state + * @param config That new state */ public void sendConfigUpdate(final String path, final AlarmTreeItem config) { @@ -490,7 +491,7 @@ public void sendConfigUpdate(final String path, final AlarmTreeItem { final String json = config == null ? null : new String(JsonModelWriter.toJsonBytes(config)); final ProducerRecord record = new ProducerRecord<>(config_state_topic, AlarmSystem.CONFIG_PREFIX + path, json); - producer.send(record); + producer.send(record).get(KAFKA_CLIENT_TIMEOUT, TimeUnit.SECONDS); } catch (Throwable ex) { @@ -511,7 +512,7 @@ public void sendAnnunciatorMessage(final String path, final SeverityLevel severi final String json = JsonModelWriter.talkToString(severity, message); final ProducerRecord record = new ProducerRecord<>(talk_topic, AlarmSystem.TALK_PREFIX + path, json); - producer.send(record); + producer.send(record).get(KAFKA_CLIENT_TIMEOUT, TimeUnit.SECONDS); } catch (Throwable ex) {