Skip to content

Commit

Permalink
Fix #8: change the timestamp to nanosecond
Browse files Browse the repository at this point in the history
Write capacity and availbw back to datastore.
So this patch should also fix #10.

Signed-off-by: jensenzhang <[email protected]>
  • Loading branch information
fno2010 committed Nov 9, 2017
1 parent f924711 commit 67dd230
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ public class BwFetchingService implements DataTreeChangeListener<NodeConnector>
/**
* Parameter to calculate the speed (magic number?)
*/
private final Integer timeSpan = 3;
private final static Integer TIME_SPAN = 3;
private final static Long NANOSECOND_PER_SECOND = 1000000000L;

/**
* Contructor to get essential variables
Expand All @@ -56,7 +57,7 @@ public BwFetchingService(DataBroker dataBroker){
LOG.info("BwFetchingService initialized");
}

class Statistic{
class Statistic {
/**
* Timestamp -> Bytes
*/
Expand All @@ -68,7 +69,7 @@ class Statistic{
Long capacity;
Long availBw;

public Statistic(){
public Statistic() {
rxHistory = new HashMap<>();
txHistory = new HashMap<>();
rxSpeed = 0L;
Expand All @@ -81,9 +82,10 @@ public Statistic(){
Map<String, Statistic> statisticData = new HashMap<>();

private void syncToDataBroker(String name, Statistic statistic) {
BwmonitorUtils.writeToSpeeds(name, statistic.rxSpeed, statistic.txSpeed, dataBroker);
LOG.debug("Bwmonitor speeds updated: rxSpeed={}, txSpeed={}",
statistic.rxSpeed, statistic.txSpeed);
BwmonitorUtils.writeToSpeeds(name, statistic.rxSpeed, statistic.txSpeed,
statistic.capacity, statistic.availBw, dataBroker);
LOG.debug("Bwmonitor speeds updated: rxSpeed={}, txSpeed={}, capacity={}, availBw={}",
statistic.rxSpeed, statistic.txSpeed, statistic.capacity, statistic.availBw);
}

private void registerPortListener() {
Expand All @@ -97,7 +99,6 @@ private void registerPortListener() {

@Override
public void onDataTreeChanged(@Nonnull Collection<DataTreeModification<NodeConnector>> changes) {
LOG.debug("Get Data Changed");
exec.submit(() -> {
for (DataTreeModification<NodeConnector> change: changes) {
DataObjectModification<NodeConnector> rootNode = change.getRootNode();
Expand Down Expand Up @@ -132,25 +133,34 @@ private void onFlowCapableNodeConnectorStatisticsDataUpdated(
String id = identifier.firstKeyOf(NodeConnector.class).getId().getValue();
if (statisticData.containsKey(id)) {
statistic = statisticData.get(id);
LOG.debug("Get historical statistic of port {}: {}", id, statistic);
} else {
LOG.debug("Port is not subscribed for monitoring!");
return;
}
if (updatedStatistic != null) {
LOG.debug("Reading new statistic of port {}", id);
Bytes bytes = updatedStatistic.getFlowCapableNodeConnectorStatistics().getBytes();
LOG.debug("Done to read port statistic");
if (bytes != null) {
LOG.debug("Processing new statistic");
Long timestamp = updatedStatistic.getFlowCapableNodeConnectorStatistics()
.getDuration().getSecond().getValue();
.getDuration().getSecond().getValue() * NANOSECOND_PER_SECOND + updatedStatistic.getFlowCapableNodeConnectorStatistics()
.getDuration().getNanosecond().getValue();
statistic.rxHistory.put(timestamp, bytes.getReceived().longValue());
statistic.txHistory.put(timestamp, bytes.getTransmitted().longValue());
statistic.rxSpeed = computeStatisticFromHistory(statistic.rxHistory, timestamp);
statistic.txSpeed = computeStatisticFromHistory(statistic.txHistory, timestamp);
LOG.debug("Compute new rate of port {}: rx={}, tx={}", id, statistic.rxSpeed, statistic.txSpeed);
}
}
LOG.debug("Done to process new statistic");
FlowCapableNodeConnector updatedFlowPort = updatedPort
.getAugmentation(FlowCapableNodeConnector.class);
if (updatedFlowPort != null) {
LOG.debug("Reading capacity of port {}", id);
statistic.capacity = updatedFlowPort.getCurrentSpeed();
LOG.debug("Done to read capacity");
statistic.availBw = statistic.capacity - statistic.txSpeed / 128;
}
if (statistic != null) {
Expand All @@ -160,7 +170,7 @@ private void onFlowCapableNodeConnectorStatisticsDataUpdated(

private void cleanStatisticHistory(Map<Long, Long> history, Long timestamp, boolean inTimeSpan) {
if (inTimeSpan) {
history.entrySet().removeIf(e -> e.getKey() < (timestamp - timeSpan));
history.entrySet().removeIf(e -> e.getKey() < (timestamp - TIME_SPAN));
} else {
Long maxTime = Long.valueOf(0);
for (Map.Entry<Long, Long> item : history.entrySet()) {
Expand All @@ -176,13 +186,19 @@ private void cleanStatisticHistory(Map<Long, Long> history, Long timestamp, bool
}
}

/**
* Compute ewma of port statistic.
* @param history the historical rate statistic of this port (bytes)
* @param timestamp the duration time since start statistic (nanoseconds)
* @return the ewma of port statistic
*/
private Long computeStatisticFromHistory(Map<Long, Long> history, Long timestamp) {
/**
* current speed = (average speed in timeSpan) * 0.8 + (speed from last history record) * 0.2
*/
boolean inTimeSpan = false;
for (Map.Entry<Long, Long> item : history.entrySet()) {
if (!item.getKey().equals(timestamp) && item.getKey() > timestamp - timeSpan) {
if (!item.getKey().equals(timestamp) && item.getKey() > timestamp - TIME_SPAN) {
inTimeSpan = true;
break;
}
Expand All @@ -196,9 +212,9 @@ private Long computeStatisticFromHistory(Map<Long, Long> history, Long timestamp
if (item.getKey() != timestamp && item.getKey() > maxTime)
maxTime = item.getKey();
}
Long speedFromLastRecord = (history.get(maxTime) - history.get(timestamp)) / (timestamp - maxTime);
Long speedFromLastRecord = (history.get(maxTime) - history.get(timestamp)) * NANOSECOND_PER_SECOND / (timestamp - maxTime);
if (inTimeSpan) {
Long speedFromTimeSpan = (history.get(minTime) - history.get(timestamp)) / (timestamp - minTime);
Long speedFromTimeSpan = (history.get(minTime) - history.get(timestamp)) * NANOSECOND_PER_SECOND / (timestamp - minTime);
return (long)(speedFromTimeSpan * 0.8 + speedFromLastRecord * 0.2);
} else {
return speedFromLastRecord;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public Future<RpcResult<BwmonitorSubscribeOutput>> bwmonitorSubscribe(BwmonitorS
List<String> ports = input.getPortId();
if (ports != null) {
for (String port : ports) {
success = BwmonitorUtils.writeToSpeeds(port, 0L, 0L, db);
success = BwmonitorUtils.writeToSpeeds(port, 0L, 0L, 0L, 0L, db);
if (success) {
/**
* Deliver databroker into utility function is inefficient
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,15 @@ private static InstanceIdentifier<Port> toInstanceIdentifier(String nodeId) {
return iid;
}

public static boolean writeToSpeeds(String portId, Long rxSpeed, Long txSpeed, DataBroker db) {
public static boolean writeToSpeeds(String portId, Long rxSpeed, Long txSpeed,
Long capacity, Long availBw, DataBroker db) {
WriteTransaction transaction = db.newWriteOnlyTransaction();
InstanceIdentifier<Port> iid = BwmonitorUtils.toInstanceIdentifier(portId);
Port node = new PortBuilder().setPortId(portId)
.setRxSpeed(BigInteger.valueOf(rxSpeed))
.setTxSpeed(BigInteger.valueOf(txSpeed))
.setCapacity(capacity)
.setAvailBw(availBw)
.build();
transaction.put(LogicalDatastoreType.OPERATIONAL, iid, node);
CheckedFuture<Void, TransactionCommitFailedException> future = transaction.submit();
Expand Down

0 comments on commit 67dd230

Please sign in to comment.