diff --git a/ribbon-httpclient/build.gradle b/ribbon-httpclient/build.gradle index f595022e5..75466efe3 100644 --- a/ribbon-httpclient/build.gradle +++ b/ribbon-httpclient/build.gradle @@ -9,7 +9,6 @@ dependencies { api "org.slf4j:slf4j-api:${slf4j_version}" api "com.netflix.servo:servo-core:${servo_version}" api "com.google.guava:guava:${guava_version}" - api 'com.netflix.netflix-commons:netflix-commons-util:0.1.1' testImplementation 'junit:junit:4.11' testImplementation "org.slf4j:slf4j-log4j12:${slf4j_version}" testImplementation 'commons-io:commons-io:2.0.1' diff --git a/ribbon-httpclient/src/main/java/com/netflix/niws/client/http/RestClient.java b/ribbon-httpclient/src/main/java/com/netflix/niws/client/http/RestClient.java index 737a0928c..752e02b40 100644 --- a/ribbon-httpclient/src/main/java/com/netflix/niws/client/http/RestClient.java +++ b/ribbon-httpclient/src/main/java/com/netflix/niws/client/http/RestClient.java @@ -67,7 +67,7 @@ import com.netflix.http4.ssl.KeyStoreAwareSocketFactory; import com.netflix.loadbalancer.BaseLoadBalancer; import com.netflix.loadbalancer.ILoadBalancer; -import com.netflix.util.Pair; +import internal.com.netflix.util.Pair; import com.sun.jersey.api.client.Client; import com.sun.jersey.api.client.ClientResponse; import com.sun.jersey.api.client.WebResource; diff --git a/ribbon-loadbalancer/build.gradle b/ribbon-loadbalancer/build.gradle index ad7cbd275..029de7514 100644 --- a/ribbon-loadbalancer/build.gradle +++ b/ribbon-loadbalancer/build.gradle @@ -1,11 +1,9 @@ dependencies { api project(':ribbon-core') - api 'com.netflix.netflix-commons:netflix-statistics:0.1.1' api "io.reactivex:rxjava:${rx_java_version}" api "org.slf4j:slf4j-api:${slf4j_version}" api "com.netflix.servo:servo-core:${servo_version}" api "com.google.guava:guava:${guava_version}" - api 'com.netflix.netflix-commons:netflix-commons-util:0.1.1' testImplementation project(":ribbon-archaius") testImplementation 'junit:junit:4.11' diff --git a/ribbon-loadbalancer/src/main/java/com/netflix/loadbalancer/BaseLoadBalancer.java b/ribbon-loadbalancer/src/main/java/com/netflix/loadbalancer/BaseLoadBalancer.java index af49283f7..7283b7f0e 100644 --- a/ribbon-loadbalancer/src/main/java/com/netflix/loadbalancer/BaseLoadBalancer.java +++ b/ribbon-loadbalancer/src/main/java/com/netflix/loadbalancer/BaseLoadBalancer.java @@ -29,7 +29,7 @@ import com.netflix.servo.annotations.Monitor; import com.netflix.servo.monitor.Counter; import com.netflix.servo.monitor.Monitors; -import com.netflix.util.concurrent.ShutdownEnabledTimer; +import internal.com.netflix.util.concurrent.ShutdownEnabledTimer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/ribbon-loadbalancer/src/main/java/com/netflix/loadbalancer/LoadBalancerContext.java b/ribbon-loadbalancer/src/main/java/com/netflix/loadbalancer/LoadBalancerContext.java index 8dd9eecc8..f32677bf8 100644 --- a/ribbon-loadbalancer/src/main/java/com/netflix/loadbalancer/LoadBalancerContext.java +++ b/ribbon-loadbalancer/src/main/java/com/netflix/loadbalancer/LoadBalancerContext.java @@ -27,7 +27,7 @@ import com.netflix.client.config.IClientConfig; import com.netflix.servo.monitor.Monitors; import com.netflix.servo.monitor.Timer; -import com.netflix.util.Pair; +import internal.com.netflix.util.Pair; import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/ribbon-loadbalancer/src/main/java/com/netflix/loadbalancer/Server.java b/ribbon-loadbalancer/src/main/java/com/netflix/loadbalancer/Server.java index 0f07ded7b..6d4763203 100644 --- a/ribbon-loadbalancer/src/main/java/com/netflix/loadbalancer/Server.java +++ b/ribbon-loadbalancer/src/main/java/com/netflix/loadbalancer/Server.java @@ -17,7 +17,7 @@ */ package com.netflix.loadbalancer; -import com.netflix.util.Pair; +import internal.com.netflix.util.Pair; /** * Class that represents a typical Server (or an addressable Node) i.e. a diff --git a/ribbon-loadbalancer/src/main/java/com/netflix/loadbalancer/ServerStats.java b/ribbon-loadbalancer/src/main/java/com/netflix/loadbalancer/ServerStats.java index 44a2c2f17..baaf1a484 100644 --- a/ribbon-loadbalancer/src/main/java/com/netflix/loadbalancer/ServerStats.java +++ b/ribbon-loadbalancer/src/main/java/com/netflix/loadbalancer/ServerStats.java @@ -19,19 +19,16 @@ import com.google.common.annotations.VisibleForTesting; -import com.netflix.client.config.CommonClientConfigKey; -import com.netflix.client.config.IClientConfigKey; import com.netflix.client.config.Property; import com.netflix.client.config.UnboxedIntProperty; import com.netflix.servo.annotations.DataSourceType; import com.netflix.servo.annotations.Monitor; -import com.netflix.stats.distribution.DataDistribution; -import com.netflix.stats.distribution.DataPublisher; -import com.netflix.stats.distribution.Distribution; -import com.netflix.util.MeasuredRate; +import internal.com.netflix.stats.distribution.DataDistribution; +import internal.com.netflix.stats.distribution.DataPublisher; +import internal.com.netflix.stats.distribution.Distribution; +import internal.com.netflix.util.MeasuredRate; import java.util.Date; -import java.util.Random; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; diff --git a/ribbon-loadbalancer/src/main/java/internal/com/netflix/stats/distribution/DataAccumulator.java b/ribbon-loadbalancer/src/main/java/internal/com/netflix/stats/distribution/DataAccumulator.java new file mode 100644 index 000000000..561791bd8 --- /dev/null +++ b/ribbon-loadbalancer/src/main/java/internal/com/netflix/stats/distribution/DataAccumulator.java @@ -0,0 +1,122 @@ +/* +* +* Copyright 2013 Netflix, Inc. +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* +*/ +package internal.com.netflix.stats.distribution; + +import java.util.concurrent.locks.Lock; + + +/** + * A double-buffer of {@link DataBuffer} objects. + * One is the "current" buffer, and new data is added to it. + * The other is the "previous" buffer, and is used as a sorce + * of computed statistics. + * + * @see DataPublisher + * + * @author $Author: netflixoss $ + */ +public abstract class DataAccumulator implements DataCollector { + + private DataBuffer current; + private DataBuffer previous; + private final Object swapLock = new Object(); + + /* + * Constructor(s) + */ + + /** + * Creates a new initially empty DataAccumulator. + * + * @param bufferSize the size of the buffers to use + */ + public DataAccumulator(int bufferSize) { + this.current = new DataBuffer(bufferSize); + this.previous = new DataBuffer(bufferSize); + } + + /* + * Accumulating new values + */ + + /** {@inheritDoc} */ + public void noteValue(double val) { + synchronized (swapLock) { + Lock l = current.getLock(); + l.lock(); + try { + current.noteValue(val); + } finally { + l.unlock(); + } + } + } + + /** + * Swaps the data collection buffers, and computes statistics + * about the data collected up til now. + */ + public void publish() { + /* + * Some care is required here to correctly swap the DataBuffers, + * but not hold the synchronization object while compiling stats + * (a potentially long computation). This ensures that continued + * data collection (calls to noteValue()) will not be blocked for any + * significant period. + */ + DataBuffer tmp = null; + Lock l = null; + synchronized (swapLock) { + // Swap buffers + tmp = current; + current = previous; + previous = tmp; + // Start collection in the new "current" buffer + l = current.getLock(); + l.lock(); + try { + current.startCollection(); + } finally { + l.unlock(); + } + // Grab lock on new "previous" buffer + l = tmp.getLock(); + l.lock(); + } + // Release synchronizaton *before* publishing data + try { + tmp.endCollection(); + publish(tmp); + } finally { + l.unlock(); + } + } + + /** + * Called to publish recently collected data. + * When called, the {@link Lock} associated with the "previous" + * buffer is held, so the data will not be changed. + * Other locks have been released, and so new data can be + * collected in the "current" buffer. + * The data in the buffer has also been sorted in increasing order. + * + * @param buf the {@code DataBuffer} that is now "previous". + */ + protected abstract void publish(DataBuffer buf); + +} // DataAccumulator diff --git a/ribbon-loadbalancer/src/main/java/internal/com/netflix/stats/distribution/DataBuffer.java b/ribbon-loadbalancer/src/main/java/internal/com/netflix/stats/distribution/DataBuffer.java new file mode 100644 index 000000000..e06bd5249 --- /dev/null +++ b/ribbon-loadbalancer/src/main/java/internal/com/netflix/stats/distribution/DataBuffer.java @@ -0,0 +1,202 @@ +/* +* +* Copyright 2013 Netflix, Inc. +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* +*/ +package internal.com.netflix.stats.distribution; + +import java.util.Arrays; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + + +/** + * A fixed-size data collection buffer that holds a sliding window + * of the most recent values added. + * The {@code DataBuffer} is also a {@link Distribution} and so collects + * basic statistics about the data added to the buffer. + * This statistical data is managed on-the-fly, and reflects all the data + * added, if those values that may have been dropped due to buffer overflow. + *
+ * This class is not synchronized, but can instead managed by a + * {@link Lock} attached to the {@code DataBuffer} (see {@link #getLock}). + * @author netflixoss + */ +public class DataBuffer extends Distribution { + + private final Lock lock; + private final double[] buf; + private long startMillis; + private long endMillis; + private int size; + private int insertPos; + + /* + * Constructors + */ + + /** + * Creates a new {@code DataBuffer} with a given capacity. + */ + public DataBuffer(int capacity) { + lock = new ReentrantLock(); + buf = new double[capacity]; + startMillis = 0; + size = 0; + insertPos = 0; + } + + /* + * Accessors + */ + + /** + * Gets the {@link Lock} to use to manage access to the + * contents of the {@code DataBuffer}. + */ + public Lock getLock() { + return lock; + } + + /** + * Gets the capacity of the {@code DataBuffer}; that is, + * the maximum number of values that the {@code DataBuffer} can hold. + */ + public int getCapacity() { + return buf.length; + } + + /** + * Gets the length of time over which the data was collected, + * in milliseconds. + * The value is only valid after {@link #endCollection} + * has been called (and before a subsequent call to {@link #startCollection}). + */ + public long getSampleIntervalMillis() { + return (endMillis - startMillis); + } + + /** + * Gets the number of values currently held in the buffer. + * This value may be smaller than the value of {@link #getNumValues} + * depending on how the percentile values were computed. + */ + public int getSampleSize() { + return size; + } + + /* + * Managing the data + */ + + /** {@inheritDoc} */ + @Override + public void clear() { + super.clear(); + startMillis = 0; + size = 0; + insertPos = 0; + } + + /** + * Notifies the buffer that data is collection is now enabled. + */ + public void startCollection() { + clear(); + startMillis = System.currentTimeMillis(); + } + + /** + * Notifies the buffer that data has just ended. + *
+ * Performance Note:
+ *
This method sorts the underlying data buffer,
+ * and so may be slow. It is best to call this at most once
+ * and fetch all percentile values desired, instead of making
+ * a number of repeated calls.
+ */
+ public void endCollection() {
+ endMillis = System.currentTimeMillis();
+ Arrays.sort(buf, 0, size);
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * The buffer wraps-around if it is full, overwriting the oldest + * entry with the new value. + */ + @Override + public void noteValue(double val) { + super.noteValue(val); + buf[insertPos++] = val; + if (insertPos >= buf.length) { + insertPos = 0; + size = buf.length; + } else if (insertPos > size) { + size = insertPos; + } + } + + /** + * Gets the requested percentile statistics. + * + * @param percents array of percentile values to compute, + * which must be in the range {@code [0 .. 100]} + * @param percentiles array to fill in with the percentile values; + * must be the same length as {@code percents} + * @return the {@code percentiles} array + * @see Percentile (Wikipedia) + * @see Percentile + */ + public double[] getPercentiles(double[] percents, double[] percentiles) { + for (int i = 0; i < percents.length; i++) { + percentiles[i] = computePercentile(percents[i]); + } + return percentiles; + } + + private double computePercentile(double percent) { + // Some just-in-case edge cases + if (size <= 0) { + return 0.0; + } else if (percent <= 0.0) { + return buf[0]; + } else if (percent >= 100.0) { // SUPPRESS CHECKSTYLE MagicNumber + return buf[size - 1]; + } + /* + * Note: + * Documents like http://cnx.org/content/m10805/latest + * use a one-based ranking, while this code uses a zero-based index, + * so the code may not look exactly like the formulas. + */ + double index = (percent / 100.0) * size; // SUPPRESS CHECKSTYLE MagicNumber + int iLow = (int) Math.floor(index); + int iHigh = (int) Math.ceil(index); + assert 0 <= iLow && iLow <= index && index <= iHigh && iHigh <= size; + assert (iHigh - iLow) <= 1; + if (iHigh >= size) { + // Another edge case + return buf[size - 1]; + } else if (iLow == iHigh) { + return buf[iLow]; + } else { + // Interpolate between the two bounding values + return buf[iLow] + (index - iLow) * (buf[iHigh] - buf[iLow]); + } + } + +} // DataBuffer diff --git a/ribbon-loadbalancer/src/main/java/internal/com/netflix/stats/distribution/DataCollector.java b/ribbon-loadbalancer/src/main/java/internal/com/netflix/stats/distribution/DataCollector.java new file mode 100644 index 000000000..ec0e90630 --- /dev/null +++ b/ribbon-loadbalancer/src/main/java/internal/com/netflix/stats/distribution/DataCollector.java @@ -0,0 +1,36 @@ +/* +* +* Copyright 2013 Netflix, Inc. +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* +*/ +package internal.com.netflix.stats.distribution; + + +/** + * An object that collects new values incrementally. + * + * @author netflixoss + * @version $Revision: $ + */ +public interface DataCollector { + + /** + * Adds a value to the collected data. + * This must run very quickly, and so can safely + * be called in time-critical code. + */ + void noteValue(double val); + +} // DataCollector diff --git a/ribbon-loadbalancer/src/main/java/internal/com/netflix/stats/distribution/DataDistribution.java b/ribbon-loadbalancer/src/main/java/internal/com/netflix/stats/distribution/DataDistribution.java new file mode 100644 index 000000000..e0987d8ca --- /dev/null +++ b/ribbon-loadbalancer/src/main/java/internal/com/netflix/stats/distribution/DataDistribution.java @@ -0,0 +1,166 @@ +/* +* +* Copyright 2013 Netflix, Inc. +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* +*/ +package internal.com.netflix.stats.distribution; + +import java.util.Date; + + +/** + * A {@link DataAccumulator} that also publishes statistics about the "previous" buffer. + * This implements {@link DataDistributionMBean} + * and so can be registered as an MBean and accessed via JMX if desired. + * + * @author netflixoss + * @version $Revision: $ + */ +public class DataDistribution extends DataAccumulator implements DataDistributionMBean { + + private long numValues = 0L; + private double mean = 0.0; + private double variance = 0.0; + private double stddev = 0.0; + private double min = 0.0; + private double max = 0.0; + private long ts = 0L; + private long interval = 0L; + private int size = 0; + private final double[] percents; + private final double[] percentiles; + + /** + * Creates a new DataDistribution with no data summarized. + * + * @param bufferSize the size of each buffer held by the {@code DataAccumulator} + * @param percents array of percentile values to calculate when buffers + * are swapped and new data is published. + * The array values must be in the range {@code [0 .. 100]}. + */ + public DataDistribution(int bufferSize, double[] percents) { + super(bufferSize); + assert percentsOK(percents); + this.percents = percents; + this.percentiles = new double[percents.length]; + } + + private static boolean percentsOK(double[] percents) { + if (percents == null) { + return false; + } + for (int i = 0; i < percents.length; i++) { + if (percents[i] < 0.0 || percents[i] > 100.0) { // SUPPRESS CHECKSTYLE MagicNumber + return false; + } + } + return true; + } + + /** {@inheritDoc} */ + protected void publish(DataBuffer buf) { + ts = System.currentTimeMillis(); + numValues = buf.getNumValues(); + mean = buf.getMean(); + variance = buf.getVariance(); + stddev = buf.getStdDev(); + min = buf.getMinimum(); + max = buf.getMaximum(); + interval = buf.getSampleIntervalMillis(); + size = buf.getSampleSize(); + buf.getPercentiles(percents, percentiles); + } + + /* + * DataDistributionMBean protocol + */ + + /** {@inheritDoc} */ + public void clear() { + numValues = 0L; + mean = 0.0; + variance = 0.0; + stddev = 0.0; + min = 0.0; + max = 0.0; + ts = 0L; + interval = 0L; + size = 0; + for (int i = 0; i < percentiles.length; i++) { + percentiles[i] = 0.0; + } + } + + /** {@inheritDoc} */ + public long getNumValues() { + return numValues; + } + + /** {@inheritDoc} */ + public double getMean() { + return mean; + } + + /** {@inheritDoc} */ + public double getVariance() { + return variance; + } + + /** {@inheritDoc} */ + public double getStdDev() { + return stddev; + } + + /** {@inheritDoc} */ + public double getMinimum() { + return min; + } + + /** {@inheritDoc} */ + public double getMaximum() { + return max; + } + + /** {@inheritDoc} */ + public String getTimestamp() { + return new Date(getTimestampMillis()).toString(); + } + + /** {@inheritDoc} */ + public long getTimestampMillis() { + return ts; + } + + /** {@inheritDoc} */ + public long getSampleIntervalMillis() { + return interval; + } + + /** {@inheritDoc} */ + public int getSampleSize() { + return size; + } + + /** {@inheritDoc} */ + public double[] getPercents() { + return percents; + } + + /** {@inheritDoc} */ + public double[] getPercentiles() { + return percentiles; + } + +} // DataDistribution diff --git a/ribbon-loadbalancer/src/main/java/internal/com/netflix/stats/distribution/DataDistributionMBean.java b/ribbon-loadbalancer/src/main/java/internal/com/netflix/stats/distribution/DataDistributionMBean.java new file mode 100644 index 000000000..f12ca1eb0 --- /dev/null +++ b/ribbon-loadbalancer/src/main/java/internal/com/netflix/stats/distribution/DataDistributionMBean.java @@ -0,0 +1,68 @@ +/* +* +* Copyright 2013 Netflix, Inc. +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* +*/ +package internal.com.netflix.stats.distribution; + + +/** + * Abstract MBean interface for objects that hold information about a distribution + * of (double) values. The information includes basic statistics (count, mean, + * min, max) as well as information about the percentile values for some number + * of percent values. + *
+ * This interface supports the standard MBean management interface, + * so implementing classes will support JMX monitoring. + * + * @author netflixoss $ + * @version $Revision: $ + */ +public interface DataDistributionMBean extends DistributionMBean { + + /** + * Gets a String representation of the time when this data was produced. + */ + String getTimestamp(); + + /** + * Gets the time when this data was produced, in milliseconds since the epoch. + */ + long getTimestampMillis(); + + /** + * Gets the length of time over which the data was collected, + * in milliseconds. + */ + long getSampleIntervalMillis(); + + /** + * Gets the number of values used to compute the percentile values. + * This value may be smaller than the value of {@link #getNumValues} + * depending on how the percentile values were computed. + */ + int getSampleSize(); + + /** + * Gets the array of known percentile percents. + */ + double[] getPercents(); + + /** + * Gets the array of known percentile values. + */ + double[] getPercentiles(); + +} // DataDistributionMBean diff --git a/ribbon-loadbalancer/src/main/java/internal/com/netflix/stats/distribution/DataPublisher.java b/ribbon-loadbalancer/src/main/java/internal/com/netflix/stats/distribution/DataPublisher.java new file mode 100644 index 000000000..304403032 --- /dev/null +++ b/ribbon-loadbalancer/src/main/java/internal/com/netflix/stats/distribution/DataPublisher.java @@ -0,0 +1,144 @@ +/* +* +* Copyright 2013 Netflix, Inc. +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* +*/ +package internal.com.netflix.stats.distribution; + +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; + + +/** + * An object that periodically updates a {@link DataAccumulator}, + * swapping between the buffers. + * + * @author netlixoss $ + * @version $Revision: $ + */ +public class DataPublisher { + + private static final String THREAD_NAME = "DataPublisher"; + private static final boolean DAEMON_THREADS = true; + private static ScheduledExecutorService sharedExecutor = null; + + private final DataAccumulator accumulator; + private final long delayMillis; + private Future> future = null; + + /** + * Creates a new {@code DataPublisher}. + * When created it is not running; it is up to the caller to call {@link #start}. + * + * @param accumulator the DataAccumulator to periodically publish + * @param delayMillis the number of milliseconds between publish events + */ + public DataPublisher(DataAccumulator accumulator, + long delayMillis) { + this.accumulator = accumulator; + this.delayMillis = delayMillis; + } + + /** + * Gets the {@code DataAccumulator} that is managed by this publisher. + */ + public DataAccumulator getDataAccumulator() { + return accumulator; + } + + /** + * Is the {@code DataPublisher} scheduled to run? + */ + public synchronized boolean isRunning() { + return (future != null); + } + + /* + * Scheduling data publication + */ + + /** + * Starts the {@code DataPublisher}. + * The method {@link DataAccumulator#publish} will be called approximately + * every {@code delayMillis} milliseconds. + * If the publisher has already been started, does nothing. + * + * @see #stop + */ + public synchronized void start() { + if (future == null) { + Runnable task = new Runnable() { + public void run() { + try { + accumulator.publish(); + } catch (Exception e) { + handleException(e); + } + } + }; + future = getExecutor().scheduleWithFixedDelay(task, + delayMillis, delayMillis, + TimeUnit.MILLISECONDS); + } + } + + /** + * Gets the {@link ScheduledExecutorService} to use to run the task to periodically + * update the {@code DataAccumulator}. + * The default uses a global executor pool for all {@code DataPublisher}s. + * Subclasses are free to override this if desired, for example to use + * a per-publisher executor pool. + */ + protected synchronized ScheduledExecutorService getExecutor() { + if (sharedExecutor == null) { + sharedExecutor = Executors.newScheduledThreadPool(1, new PublishThreadFactory()); + } + return sharedExecutor; + } + + private static final class PublishThreadFactory implements ThreadFactory { + PublishThreadFactory() { } + public Thread newThread(Runnable r) { + Thread t = new Thread(r, THREAD_NAME); + t.setDaemon(DAEMON_THREADS); + return t; + } + } + + /** + * Stops publishing new data. + * + * @see #start + */ + public synchronized void stop() { + if (future != null) { + future.cancel(false); + future = null; + } + } + + /** + * Called if an attempt to publish data throws an exception. + * The default does nothing. + * Subclasses are free to override this. + */ + protected void handleException(Exception e) { + // Do nothing, for now + } + +} // DataPublisher diff --git a/ribbon-loadbalancer/src/main/java/internal/com/netflix/stats/distribution/Distribution.java b/ribbon-loadbalancer/src/main/java/internal/com/netflix/stats/distribution/Distribution.java new file mode 100644 index 000000000..2c77875fa --- /dev/null +++ b/ribbon-loadbalancer/src/main/java/internal/com/netflix/stats/distribution/Distribution.java @@ -0,0 +1,163 @@ +/* +* +* Copyright 2013 Netflix, Inc. +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* +*/ +package internal.com.netflix.stats.distribution; + + +/** + * Accumulator of statistics about a distribution of + * observed values that are produced incrementally. + *
+ * Note that the implementation is not synchronized, + * and simultaneous updates may produce incorrect results. + * In most cases these incorrect results will be unimportant, + * but applications that care should synchronize carefully + * to ensure consistent results. + *
+ * Note that this implements {@link DistributionMBean} and so can be
+ * registered as an MBean and accessed via JMX if desired.
+ *
+ * @author netflixoss $
+ * @version $Revision: $
+ */
+public class Distribution implements DistributionMBean, DataCollector {
+
+ private long numValues;
+ private double sumValues;
+ private double sumSquareValues;
+ private double minValue;
+ private double maxValue;
+
+ /*
+ * Constructors
+ */
+
+ /**
+ * Creates a new initially empty Distribution.
+ */
+ public Distribution() {
+ numValues = 0L;
+ sumValues = 0.0;
+ sumSquareValues = 0.0;
+ minValue = 0.0;
+ maxValue = 0.0;
+ }
+
+ /*
+ * Accumulating new values
+ */
+
+ /** {@inheritDoc} */
+ public void noteValue(double val) {
+ numValues++;
+ sumValues += val;
+ sumSquareValues += val * val;
+ if (numValues == 1) {
+ minValue = val;
+ maxValue = val;
+ } else if (val < minValue) {
+ minValue = val;
+ } else if (val > maxValue) {
+ maxValue = val;
+ }
+ }
+
+ /** {@inheritDoc} */
+ public void clear() {
+ numValues = 0L;
+ sumValues = 0.0;
+ sumSquareValues = 0.0;
+ minValue = 0.0;
+ maxValue = 0.0;
+ }
+
+ /*
+ * Accessors
+ */
+
+ /** {@inheritDoc} */
+ public long getNumValues() {
+ return numValues;
+ }
+
+ /** {@inheritDoc} */
+ public double getMean() {
+ if (numValues < 1) {
+ return 0.0;
+ } else {
+ return sumValues / numValues;
+ }
+ }
+
+ /** {@inheritDoc} */
+ public double getVariance() {
+ if (numValues < 2) {
+ return 0.0;
+ } else if (sumValues == 0.0) {
+ return 0.0;
+ } else {
+ double mean = getMean();
+ return (sumSquareValues / numValues) - mean * mean;
+ }
+ }
+
+ /** {@inheritDoc} */
+ public double getStdDev() {
+ return Math.sqrt(getVariance());
+ }
+
+ /** {@inheritDoc} */
+ public double getMinimum() {
+ return minValue;
+ }
+
+ /** {@inheritDoc} */
+ public double getMaximum() {
+ return maxValue;
+ }
+
+ /**
+ * Add another {@link Distribution}'s values to this one.
+ *
+ * @param anotherDistribution
+ * the other {@link Distribution} instance
+ */
+ public void add(Distribution anotherDistribution) {
+ if (anotherDistribution != null) {
+ numValues += anotherDistribution.numValues;
+ sumValues += anotherDistribution.sumValues;
+ sumSquareValues += anotherDistribution.sumSquareValues;
+ minValue = (minValue < anotherDistribution.minValue) ? minValue
+ : anotherDistribution.minValue;
+ maxValue = (maxValue > anotherDistribution.maxValue) ? maxValue
+ : anotherDistribution.maxValue;
+ }
+ }
+
+ @Override
+ public String toString() {
+ return new StringBuilder()
+ .append("{Distribution:")
+ .append("N=").append(getNumValues())
+ .append(": ").append(getMinimum())
+ .append("..").append(getMean())
+ .append("..").append(getMaximum())
+ .append("}")
+ .toString();
+ }
+
+} // Distribution
diff --git a/ribbon-loadbalancer/src/main/java/internal/com/netflix/stats/distribution/DistributionMBean.java b/ribbon-loadbalancer/src/main/java/internal/com/netflix/stats/distribution/DistributionMBean.java
new file mode 100644
index 000000000..07daffc7f
--- /dev/null
+++ b/ribbon-loadbalancer/src/main/java/internal/com/netflix/stats/distribution/DistributionMBean.java
@@ -0,0 +1,68 @@
+/*
+*
+* Copyright 2013 Netflix, Inc.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*
+*/
+package internal.com.netflix.stats.distribution;
+
+
+/**
+ * Abstract MBean interface for objects that describe the general
+ * characteristics of a distribution of (double) values.
+ * This interface supports the standard MBean management interface,
+ * so implementing classes will support JMX monitoring.
+ *
+ * @author netflixoss $
+ * @version $Revision: $
+ */
+public interface DistributionMBean {
+
+ /**
+ * Clears out the distribution, resetting it to its initial state.
+ */
+ void clear();
+
+ /**
+ * Get the number of values in the distribution.
+ */
+ long getNumValues();
+
+ /**
+ * Get the average value in the distribtion.
+ */
+ double getMean();
+
+ /**
+ * Get the variance (the square of the standard deviation)
+ * of values in the distribution.
+ */
+ double getVariance();
+
+ /**
+ * Get the standard deviation of values in the distribution.
+ */
+ double getStdDev();
+
+ /**
+ * Get the minimum value found in the distribution.
+ */
+ double getMinimum();
+
+ /**
+ * Get the maximum value found in the distribution.
+ */
+ double getMaximum();
+
+} // DistributionMBean
diff --git a/ribbon-loadbalancer/src/main/java/internal/com/netflix/util/MeasuredRate.java b/ribbon-loadbalancer/src/main/java/internal/com/netflix/util/MeasuredRate.java
new file mode 100644
index 000000000..c49c74ae5
--- /dev/null
+++ b/ribbon-loadbalancer/src/main/java/internal/com/netflix/util/MeasuredRate.java
@@ -0,0 +1,104 @@
+/*
+ *
+ * Copyright 2013 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package internal.com.netflix.util;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Utility class for getting a count per last X milliseconds
+ *
+ * @author stonse
+ * @author gkim
+ *
+ */
+public class MeasuredRate {
+ private final AtomicLong _lastBucket = new AtomicLong(0);
+ private final AtomicLong _currentBucket = new AtomicLong(0);
+ private final long _sampleInterval;
+ private volatile long _threshold;
+
+ /**
+ * @param sampleInterval in milliseconds
+ */
+ public MeasuredRate(long sampleInterval){
+ _sampleInterval = sampleInterval;
+
+ _threshold = System.currentTimeMillis() + sampleInterval;
+ }
+
+ /**
+ * @return the count in the last sample interval
+ */
+ public long getCount() {
+ checkAndResetWindow();
+ return _lastBucket.get();
+ }
+
+ /**
+ * @return the count in the current sample interval which will be incomplete.
+ * If you are looking for accurate counts/interval - use {@link MeasuredRate#getCount()}
+ * instead.
+ */
+ public long getCurrentCount() {
+ checkAndResetWindow();
+ return _currentBucket.get();
+ }
+
+ /**
+ * Increments the count in the current sample interval. If the current
+ * interval has exceeded, assigns the current count to the
+ * last bucket and zeros out the current bucket
+ */
+ public void increment() {
+ checkAndResetWindow();
+ _currentBucket.incrementAndGet();
+ }
+
+ private void checkAndResetWindow() {
+ long now = System.currentTimeMillis();
+ if(_threshold < now) {
+ _lastBucket.set(_currentBucket.get());
+ _currentBucket.set(0);
+ _threshold = now + _sampleInterval;
+ }
+ }
+
+ public String toString(){
+ StringBuilder sb = new StringBuilder();
+ sb.append("count:" + getCount());
+ sb.append("currentCount:" + getCurrentCount());
+ return sb.toString();
+ }
+
+ public static void main(String args[]){
+ MeasuredRate mr = new MeasuredRate(500);
+
+ for(int i=0; i<1000; i++){
+ mr.increment();
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+
+ System.out.println("mr:" + mr);
+
+ }
+}
diff --git a/ribbon-loadbalancer/src/main/java/internal/com/netflix/util/Pair.java b/ribbon-loadbalancer/src/main/java/internal/com/netflix/util/Pair.java
new file mode 100644
index 000000000..ff23ae5d7
--- /dev/null
+++ b/ribbon-loadbalancer/src/main/java/internal/com/netflix/util/Pair.java
@@ -0,0 +1,119 @@
+/*
+ *
+ * Copyright 2013 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package internal.com.netflix.util;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+
+/**
+ * A simple class that holds a pair of values.
+ * This may be useful for methods that care to
+ * return two values (instead of just one).
+ */
+public class Pair