diff --git a/alto-bwmonitor/impl/src/main/java/org/opendaylight/alto/ext/impl/BwFetchingService.java b/alto-bwmonitor/impl/src/main/java/org/opendaylight/alto/ext/impl/BwFetchingService.java index 46e96ca..ba494e6 100644 --- a/alto-bwmonitor/impl/src/main/java/org/opendaylight/alto/ext/impl/BwFetchingService.java +++ b/alto-bwmonitor/impl/src/main/java/org/opendaylight/alto/ext/impl/BwFetchingService.java @@ -44,7 +44,8 @@ public class BwFetchingService implements DataTreeChangeListener /** * Parameter to calculate the speed (magic number?) */ - private final Integer timeSpan = 3; + private final static Integer TIME_SPAN = 3; + private final static Long NANOSECOND_PER_SECOND = 1000000000L; /** * Contructor to get essential variables @@ -56,7 +57,7 @@ public BwFetchingService(DataBroker dataBroker){ LOG.info("BwFetchingService initialized"); } - class Statistic{ + class Statistic { /** * Timestamp -> Bytes */ @@ -68,7 +69,7 @@ class Statistic{ Long capacity; Long availBw; - public Statistic(){ + public Statistic() { rxHistory = new HashMap<>(); txHistory = new HashMap<>(); rxSpeed = 0L; @@ -81,9 +82,10 @@ public Statistic(){ Map statisticData = new HashMap<>(); private void syncToDataBroker(String name, Statistic statistic) { - BwmonitorUtils.writeToSpeeds(name, statistic.rxSpeed, statistic.txSpeed, dataBroker); - LOG.debug("Bwmonitor speeds updated: rxSpeed={}, txSpeed={}", - statistic.rxSpeed, statistic.txSpeed); + BwmonitorUtils.writeToSpeeds(name, statistic.rxSpeed, statistic.txSpeed, + statistic.capacity, statistic.availBw, dataBroker); + LOG.debug("Bwmonitor speeds updated: rxSpeed={}, txSpeed={}, capacity={}, availBw={}", + statistic.rxSpeed, statistic.txSpeed, statistic.capacity, statistic.availBw); } private void registerPortListener() { @@ -97,7 +99,6 @@ private void registerPortListener() { @Override public void onDataTreeChanged(@Nonnull Collection> changes) { - LOG.debug("Get Data Changed"); exec.submit(() -> { for (DataTreeModification change: changes) { DataObjectModification rootNode = change.getRootNode(); @@ -132,25 +133,34 @@ private void onFlowCapableNodeConnectorStatisticsDataUpdated( String id = identifier.firstKeyOf(NodeConnector.class).getId().getValue(); if (statisticData.containsKey(id)) { statistic = statisticData.get(id); + LOG.debug("Get historical statistic of port {}: {}", id, statistic); } else { LOG.debug("Port is not subscribed for monitoring!"); return; } if (updatedStatistic != null) { + LOG.debug("Reading new statistic of port {}", id); Bytes bytes = updatedStatistic.getFlowCapableNodeConnectorStatistics().getBytes(); + LOG.debug("Done to read port statistic"); if (bytes != null) { + LOG.debug("Processing new statistic"); Long timestamp = updatedStatistic.getFlowCapableNodeConnectorStatistics() - .getDuration().getSecond().getValue(); + .getDuration().getSecond().getValue() * NANOSECOND_PER_SECOND + updatedStatistic.getFlowCapableNodeConnectorStatistics() + .getDuration().getNanosecond().getValue(); statistic.rxHistory.put(timestamp, bytes.getReceived().longValue()); statistic.txHistory.put(timestamp, bytes.getTransmitted().longValue()); statistic.rxSpeed = computeStatisticFromHistory(statistic.rxHistory, timestamp); statistic.txSpeed = computeStatisticFromHistory(statistic.txHistory, timestamp); + LOG.debug("Compute new rate of port {}: rx={}, tx={}", id, statistic.rxSpeed, statistic.txSpeed); } } + LOG.debug("Done to process new statistic"); FlowCapableNodeConnector updatedFlowPort = updatedPort .getAugmentation(FlowCapableNodeConnector.class); if (updatedFlowPort != null) { + LOG.debug("Reading capacity of port {}", id); statistic.capacity = updatedFlowPort.getCurrentSpeed(); + LOG.debug("Done to read capacity"); statistic.availBw = statistic.capacity - statistic.txSpeed / 128; } if (statistic != null) { @@ -160,7 +170,7 @@ private void onFlowCapableNodeConnectorStatisticsDataUpdated( private void cleanStatisticHistory(Map history, Long timestamp, boolean inTimeSpan) { if (inTimeSpan) { - history.entrySet().removeIf(e -> e.getKey() < (timestamp - timeSpan)); + history.entrySet().removeIf(e -> e.getKey() < (timestamp - TIME_SPAN)); } else { Long maxTime = Long.valueOf(0); for (Map.Entry item : history.entrySet()) { @@ -176,13 +186,19 @@ private void cleanStatisticHistory(Map history, Long timestamp, bool } } + /** + * Compute ewma of port statistic. + * @param history the historical rate statistic of this port (bytes) + * @param timestamp the duration time since start statistic (nanoseconds) + * @return the ewma of port statistic + */ private Long computeStatisticFromHistory(Map history, Long timestamp) { /** * current speed = (average speed in timeSpan) * 0.8 + (speed from last history record) * 0.2 */ boolean inTimeSpan = false; for (Map.Entry item : history.entrySet()) { - if (!item.getKey().equals(timestamp) && item.getKey() > timestamp - timeSpan) { + if (!item.getKey().equals(timestamp) && item.getKey() > timestamp - TIME_SPAN) { inTimeSpan = true; break; } @@ -196,9 +212,9 @@ private Long computeStatisticFromHistory(Map history, Long timestamp if (item.getKey() != timestamp && item.getKey() > maxTime) maxTime = item.getKey(); } - Long speedFromLastRecord = (history.get(maxTime) - history.get(timestamp)) / (timestamp - maxTime); + Long speedFromLastRecord = (history.get(maxTime) - history.get(timestamp)) * NANOSECOND_PER_SECOND / (timestamp - maxTime); if (inTimeSpan) { - Long speedFromTimeSpan = (history.get(minTime) - history.get(timestamp)) / (timestamp - minTime); + Long speedFromTimeSpan = (history.get(minTime) - history.get(timestamp)) * NANOSECOND_PER_SECOND / (timestamp - minTime); return (long)(speedFromTimeSpan * 0.8 + speedFromLastRecord * 0.2); } else { return speedFromLastRecord; diff --git a/alto-bwmonitor/impl/src/main/java/org/opendaylight/alto/ext/impl/BwmonitorUtils.java b/alto-bwmonitor/impl/src/main/java/org/opendaylight/alto/ext/impl/BwmonitorUtils.java index 1fe21da..30cd7bf 100644 --- a/alto-bwmonitor/impl/src/main/java/org/opendaylight/alto/ext/impl/BwmonitorUtils.java +++ b/alto-bwmonitor/impl/src/main/java/org/opendaylight/alto/ext/impl/BwmonitorUtils.java @@ -35,12 +35,15 @@ private static InstanceIdentifier toInstanceIdentifier(String nodeId) { return iid; } - public static boolean writeToSpeeds(String portId, Long rxSpeed, Long txSpeed, DataBroker db) { + public static boolean writeToSpeeds(String portId, Long rxSpeed, Long txSpeed, + Long capacity, Long availBw, DataBroker db) { WriteTransaction transaction = db.newWriteOnlyTransaction(); InstanceIdentifier iid = BwmonitorUtils.toInstanceIdentifier(portId); Port node = new PortBuilder().setPortId(portId) .setRxSpeed(BigInteger.valueOf(rxSpeed)) .setTxSpeed(BigInteger.valueOf(txSpeed)) + .setCapacity(capacity) + .setAvailBw(availBw) .build(); transaction.put(LogicalDatastoreType.OPERATIONAL, iid, node); CheckedFuture future = transaction.submit();