Skip to content

Commit

Permalink
Merge pull request #4 from shipunyc/master
Browse files Browse the repository at this point in the history
Import changes from TFS.
  • Loading branch information
shipunyc committed Dec 19, 2014
2 parents fccb88f + ae3dabd commit dd0fb95
Show file tree
Hide file tree
Showing 4 changed files with 137 additions and 113 deletions.
66 changes: 0 additions & 66 deletions src/com/microsoft/azure/documentdb/ConnectionPolicy.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,9 @@
public final class ConnectionPolicy {

private static final int DEFAULT_MAX_CONNECTIONS = 20;
private static final int DEFAULT_MAX_CONCURRENT_CALLS_PER_CONNECTION = 50;
private static final int DEFAULT_REQUEST_TIMEOUT = 60;
// defaultMediaRequestTimeout is based upon the blob client timeout and the retry policy.
private static final int DEFAULT_MEDIA_REQUEST_TIMEOUT = 300;
private static final int DEFAULT_MAX_CONCURRENT_FANOUT_REQUESTS = 32;
private static final int DEFAULT_MAX_POOL_SIZE = 100;
private static final int DEFAULT_IDLE_CONNECTION_TIMEOUT = 60;

Expand All @@ -28,8 +26,6 @@ public ConnectionPolicy() {
this.requestTimeout = ConnectionPolicy.DEFAULT_REQUEST_TIMEOUT;
this.mediaRequestTimeout = ConnectionPolicy.DEFAULT_MEDIA_REQUEST_TIMEOUT;
this.connectionMode = ConnectionMode.Gateway;
this.maxCallsPerConnection = ConnectionPolicy.DEFAULT_MAX_CONCURRENT_CALLS_PER_CONNECTION;
this.maxConcurrentFanoutRequests = DEFAULT_MAX_CONCURRENT_FANOUT_REQUESTS;
this.mediaReadMode = MediaReadMode.Buffered;
this.maxPoolSize = DEFAULT_MAX_POOL_SIZE;
this.idleConnectionTimeout = DEFAULT_IDLE_CONNECTION_TIMEOUT;
Expand Down Expand Up @@ -57,48 +53,6 @@ public void setMaxConnections(int maxConnections) {
this.maxConnections = maxConnections;
}

private int maxCallsPerConnection;

/**
* Gets the number of maximum simultaneous calls permitted on a single data connection. Currently used only for
* Protocol.Tcp.
*
* @return the max calls per connection.
*/
public int getMaxCallsPerConnection() {
return this.maxCallsPerConnection;
}

/**
* Sets the number of maximum simultaneous calls permitted on a single data connection. Currently used only for
* Protocol.Tcp.
*
* @param maxCallsPerConnection the max calls per connection.
*/
public void setMaxCallsPerConnection(int maxCallsPerConnection) {
this.maxCallsPerConnection = maxCallsPerConnection;
}

private int maxConcurrentFanoutRequests;

/**
* Gets the maximum number of concurrent fanout requests.
*
* @return the maximum number of concurrent fanout requests.
*/
public int getMaxConcurrentFanoutRequest() {
return this.maxConcurrentFanoutRequests;
}

/**
* Sets the maximum number of concurrent fanout requests.
*
* @param maxConcurrentFanoutRequests the max concurrent fanout requests.
*/
public void setMaxConcurrentFanoutRequest(int maxConcurrentFanoutRequests) {
this.maxConcurrentFanoutRequests = maxConcurrentFanoutRequests;
}

private int requestTimeout;

/**
Expand Down Expand Up @@ -179,26 +133,6 @@ public void setMediaReadMode(MediaReadMode mediaReadMode) {
this.mediaReadMode = mediaReadMode;
}

private String connectBindingConfigName;

/**
* Gets the connection bindign config name. Ignored if ConnectionMode is Gateway or ConnectionProtocol is not TCP.
*
* @return the connect binding config name.
*/
public String getConnectBindingConfigName() {
return this.connectBindingConfigName;
}

/**
* Sets the connection binding config name. Ignored if ConnectionMode is Gateway or ConnectionProtocol is not TCP.
*
* @param connectBindingConfigName the connect binding config name.
*/
public void setConnectBindingConfigName(String connectBindingConfigName) {
this.connectBindingConfigName = connectBindingConfigName;
}

private int maxPoolSize;

/**
Expand Down
15 changes: 8 additions & 7 deletions src/com/microsoft/azure/documentdb/FeedResponse.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
public final class FeedResponse<T extends Resource> {

private QueryIterable<T> inner;
private Map<String, String> responseHeaders;
private Map<String, Long> usageHeaders;
private Map<String, Long> quotaHeaders;

Expand Down Expand Up @@ -167,7 +166,6 @@ public long getUserDefinedFunctionsUsage() {

FeedResponse(QueryIterable<T> result) {
this.inner = result;
this.responseHeaders = result.getResponseHeaders();
this.usageHeaders = new HashMap<String, Long>();
this.quotaHeaders = new HashMap<String, Long>();
}
Expand All @@ -179,7 +177,8 @@ public long getUserDefinedFunctionsUsage() {
* @return the max resource quota.
*/
public String getMaxResourceQuota() {
return FeedResponse.getValueOrNull(this.inner.getResponseHeaders(), HttpConstants.HttpHeaders.MAX_RESOURCE_QUOTA);
return FeedResponse.getValueOrNull(this.inner.getResponseHeaders(),
HttpConstants.HttpHeaders.MAX_RESOURCE_QUOTA);
}

/**
Expand All @@ -188,7 +187,8 @@ public String getMaxResourceQuota() {
* @return the current resource quota usage.
*/
public String getCurrentResourceQuotaUsage() {
return FeedResponse.getValueOrNull(this.inner.getResponseHeaders(), HttpConstants.HttpHeaders.CURRENT_RESOURCE_QUOTA_USAGE);
return FeedResponse.getValueOrNull(this.inner.getResponseHeaders(),
HttpConstants.HttpHeaders.CURRENT_RESOURCE_QUOTA_USAGE);
}

/**
Expand All @@ -197,7 +197,8 @@ public String getCurrentResourceQuotaUsage() {
* @return the request charge.
*/
public double getRequestCharge() {
String value = this.responseHeaders.get(HttpConstants.HttpHeaders.REQUEST_CHARGE);
String value = FeedResponse.getValueOrNull(this.inner.getResponseHeaders(),
HttpConstants.HttpHeaders.REQUEST_CHARGE);
if (StringUtils.isEmpty(value)) {
return 0;
}
Expand Down Expand Up @@ -237,7 +238,7 @@ public String getSessionToken() {
* @return the response headers.
*/
public Map<String, String> getResponseHeaders() {
return this.responseHeaders;
return this.inner.getResponseHeaders();
}

/**
Expand Down Expand Up @@ -323,7 +324,7 @@ private void populateQuotaHeader(String headerMaxQuota,

private static String getValueOrNull(Map<String, String> map, String key) {
if (map != null) {
return map.get(key);
return map.get(key);
}
return null;
}
Expand Down
73 changes: 43 additions & 30 deletions src/com/microsoft/azure/documentdb/QueryIterable.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,13 @@ public class QueryIterable<T extends Resource> implements Iterable<T> {
private boolean hasStarted = false;
private List<T> items = new ArrayList<T>();
private Map<String, String> responseHeaders;
private int currentIndex = 0;
private boolean hasNext = true;

QueryIterable(DocumentClient client,
DocumentServiceRequest request,
ReadType readType,
Class<T> classT) {
DocumentServiceRequest request,
ReadType readType,
Class<T> classT) {
this.client = client;
this.retryPolicy = new ResourceThrottleRetryPolicy(
client.getRetryPolicy().getMaxRetryAttemptsOnQuery());
Expand All @@ -39,15 +41,6 @@ public class QueryIterable<T extends Resource> implements Iterable<T> {
this.reset();
}

private void reset() {
if (this.request != null && this.request.getHeaders() != null) {
String continuationToken = this.request.getHeaders().get(HttpConstants.HttpHeaders.CONTINUATION);
if (!StringUtils.isBlank(continuationToken)) {
this.continuation = continuationToken;
}
}
}

/**
* Gets the response headers.
*
Expand Down Expand Up @@ -75,14 +68,12 @@ String getContinuation() {
public Iterator<T> iterator() {
Iterator<T> it = new Iterator<T>() {

private int currentIndex = 0;
private boolean hasNext = true;

private BackoffRetryUtilityDelegate delegate = new BackoffRetryUtilityDelegate() {

@Override
public void apply() throws Exception {
if (fetchNextBlock() <= 0) {
List<T> results = fetchNextBlock();
if (results == null || results.size() <= 0) {
hasNext = false;
}
}
Expand All @@ -95,11 +86,11 @@ public void apply() throws Exception {
*/
@Override
public boolean hasNext() {
if (this.currentIndex >= items.size() && this.hasNext) {
if (currentIndex >= items.size() && hasNext) {
BackoffRetryUtility.execute(this.delegate, retryPolicy);
}

return this.hasNext;
return hasNext;
}

/**
Expand All @@ -109,12 +100,12 @@ public boolean hasNext() {
*/
@Override
public T next() {
if (this.currentIndex >= items.size() && this.hasNext) {
if (currentIndex >= items.size() && hasNext) {
BackoffRetryUtility.execute(this.delegate, retryPolicy);
}

if (!this.hasNext) return null;
return items.get(this.currentIndex++);
if (!hasNext) return null;
return items.get(currentIndex++);
}

/**
Expand All @@ -123,8 +114,8 @@ public T next() {
@Override
public void remove() {
if (!hasNext()) throw new NoSuchElementException();
if (this.currentIndex < items.size() - 1) {
items.remove(this.currentIndex);
if (currentIndex < items.size()) {
items.remove(currentIndex);
}
}

Expand All @@ -145,16 +136,39 @@ public List<T> toList() {
return list;
}

private int fetchNextBlock()
/**
* Resets the iterable.
*/
public void reset() {
this.hasStarted = false;
this.continuation = null;
this.items = new ArrayList<T>();
this.currentIndex = 0;
this.hasNext = true;
if (this.request != null && this.request.getHeaders() != null) {
String continuationToken = this.request.getHeaders().get(HttpConstants.HttpHeaders.CONTINUATION);
if (!StringUtils.isBlank(continuationToken)) {
this.continuation = continuationToken;
}
}
}

/**
* Fetch the next block of query results.
*
* @return the list of fetched resources.
* @throws DocumentClientException
*/
public List<T> fetchNextBlock()
throws DocumentClientException {
DocumentServiceResponse response = null;
List<T> fetchedItems = null;

while (!this.isNullEmptyOrFalse(this.continuation) ||
!this.hasStarted) {
while (!this.isNullEmptyOrFalse(this.continuation) || !this.hasStarted) {
if (!this.isNullEmptyOrFalse(this.continuation)) {
request.getHeaders().put(HttpConstants.HttpHeaders.CONTINUATION,
this.continuation);
request.getHeaders().put(HttpConstants.HttpHeaders.CONTINUATION, this.continuation);
} else {
request.getHeaders().remove(HttpConstants.HttpHeaders.CONTINUATION);
}

if (this.readType == ReadType.Feed) {
Expand All @@ -181,11 +195,10 @@ private int fetchNextBlock()
}
}

return fetchedItems != null ? fetchedItems.size() : 0;
return fetchedItems;
}

private boolean isNullEmptyOrFalse(String s) {
return s == null || s.isEmpty() || s == "false" || s == "False";
}

}
Loading

0 comments on commit dd0fb95

Please sign in to comment.