Skip to content

Commit

Permalink
Parallel Authentication Fix (apache#2551)
Browse files Browse the repository at this point in the history
* fix: failing authentication when multiple initially requests are executed concurrently

* Fix nits in PR for TINKERPOP-3061 regarding sasl authentication.

---------

Co-authored-by: Tiến Nguyễn Khắc <[email protected]>
  • Loading branch information
kenhuuu and tien authored Apr 9, 2024
1 parent ae2860d commit 22db8cf
Show file tree
Hide file tree
Showing 6 changed files with 264 additions and 111 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ image::https://raw.githubusercontent.com/apache/tinkerpop/master/docs/static/ima
* Deprecated `ltrim()` and `rTrim()` in favor of `l_trim()` and `r_trim` in Python.
* Fixed bug in `onCreate` for `mergeV()` where use of the `Cardinality` functions was not properly handled.
* Fixed multiple concurrent initially requests caused authentication to fail.
[[release-3-7-1]]
=== TinkerPop 3.7.1 (November 20, 2023)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,12 @@
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.apache.tinkerpop.gremlin.util.message.RequestMessage;
import org.apache.tinkerpop.gremlin.util.message.ResponseMessage;
import org.apache.tinkerpop.gremlin.util.message.ResponseStatusCode;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
Expand All @@ -49,7 +51,7 @@ public AbstractClient(final String threadPattern) {

@Override
public void submit(final RequestMessage requestMessage, final Consumer<ResponseMessage> callback) throws Exception {
callbackResponseHandler.callback = callback;
callbackResponseHandler.callbackByRequestId.put(requestMessage.getRequestId(), callback);
writeAndFlush(requestMessage);
}

Expand All @@ -65,7 +67,7 @@ public List<ResponseMessage> submit(final RequestMessage requestMessage) throws
public CompletableFuture<List<ResponseMessage>> submitAsync(final RequestMessage requestMessage) throws Exception {
final List<ResponseMessage> results = new ArrayList<>();
final CompletableFuture<List<ResponseMessage>> f = new CompletableFuture<>();
callbackResponseHandler.callback = response -> {
callbackResponseHandler.callbackByRequestId.put(requestMessage.getRequestId(), response -> {
if (f.isDone())
throw new RuntimeException("A terminating message was already encountered - no more messages should have been received");

Expand All @@ -75,19 +77,19 @@ public CompletableFuture<List<ResponseMessage>> submitAsync(final RequestMessage
if (response.getStatus().getCode().isFinalResponse()) {
f.complete(results);
}
};
});

writeAndFlush(requestMessage);

return f;
}

static class CallbackResponseHandler extends SimpleChannelInboundHandler<ResponseMessage> {
public Consumer<ResponseMessage> callback;
public Map<UUID, Consumer<ResponseMessage>> callbackByRequestId = new HashMap<>();

@Override
protected void channelRead0(final ChannelHandlerContext channelHandlerContext, final ResponseMessage response) throws Exception {
callback.accept(response);
callbackByRequestId.get(response.getRequestId()).accept(response);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,19 @@ describe('DriverRemoteConnection', function () {
});
});

it('should be able to send multiple requests concurrently with valid credentials and parse the response', async function () {
connection = helper.getSecureConnectionWithPlainTextSaslAuthenticator(null, 'stephen', 'password');

const submissions = await Promise.all(
Array.from({ length: 10 }).map(() => connection.submit(new Bytecode().addStep('V', []).addStep('tail', []))),
);

submissions.forEach((response) => {
assert.ok(response);
assert.ok(response.traversers);
});
});

it('should send the request with invalid credentials and parse the response error', function () {
connection = helper.getSecureConnectionWithPlainTextSaslAuthenticator(null, 'Bob', 'password');

Expand Down
Loading

0 comments on commit 22db8cf

Please sign in to comment.