Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Server continuations #3030

Open
wants to merge 17 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions fdb-relational-grpc/fdb-relational-grpc.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ dependencies {
runtimeOnly("org.apache.logging.log4j:log4j-slf4j-impl:${log4jVersion}")
implementation("com.google.protobuf:protobuf-java:${protobufVersion}")
testImplementation("org.junit.jupiter:junit-jupiter-api:${junitVersion}")
testImplementation("org.junit.jupiter:junit-jupiter-params:${junitVersion}")
testRuntimeOnly("org.junit.jupiter:junit-jupiter-engine:${junitVersion}")
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@ public boolean next() throws SQLException {
return ++rowIndex < rows;
}

public boolean hasNext() {
return rowIndex < rows;
}

@Override
public boolean isClosed() throws SQLException {
return this.closed;
Expand Down Expand Up @@ -265,9 +269,10 @@ public RelationalResultSetMetaData getMetaData() throws SQLException {
@Nonnull
@SpotBugsSuppressWarnings("NP") // TODO: Will need to fix null handling
public Continuation getContinuation() throws SQLException {
// Not implemented. We need to thread through the continuation from the query response, but for now,
// returning "null" is enough for the existing tests to pass.
return null;
if (hasNext()) {
throw new SQLException("Continuation can only be returned for the last row");
}
return new RelationalRpcContinuation(delegate.getContinuation());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* RelationalRpcContinuation.java
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2015-2025 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.apple.foundationdb.relational.jdbc;

import com.apple.foundationdb.relational.api.Continuation;
import com.apple.foundationdb.relational.jdbc.grpc.v1.RpcContinuation;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.Objects;

public class RelationalRpcContinuation implements Continuation {
public static final int CURRENT_VERSION = 1;

@Nonnull
private final RpcContinuation proto;

public RelationalRpcContinuation(@Nonnull RpcContinuation proto) {
this.proto = proto;
}

@Override
public byte[] serialize() {
// Serialize the INTERNAL STATE. This is done since on the client side, when the query is modified to include an
// encoded version of the continuation as a string, we have to use the INTERNAL flavor of the continuation to have
// it properly deserialized by the server. The server only accepts the internal continuation as part of the query.
ScottDugas marked this conversation as resolved.
Show resolved Hide resolved
return proto.getInternalState().toByteArray();
}

@Nullable
@Override
public byte[] getExecutionState() {
if (proto.hasInternalState()) {
return proto.getInternalState().toByteArray();
} else {
return null;
}
}

@Override
public Reason getReason() {
if (proto.hasReason()) {
return TypeConversion.toReason(proto.getReason());
} else {
return null;
}
}

@Override
public boolean atBeginning() {
return proto.getAtBeginning();
}

@Override
public boolean atEnd() {
return proto.getAtEnd();
}

@Nonnull
public RpcContinuation getProto() {
return proto;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof RelationalRpcContinuation)) {
return false;
}
RelationalRpcContinuation that = (RelationalRpcContinuation) o;
return Objects.equals(proto, that.proto);
}

@Override
public int hashCode() {
return Objects.hashCode(proto);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.apple.foundationdb.annotation.API;

import com.apple.foundationdb.relational.api.ArrayMetaData;
import com.apple.foundationdb.relational.api.Continuation;
import com.apple.foundationdb.relational.api.StructMetaData;
import com.apple.foundationdb.relational.api.RelationalArray;
import com.apple.foundationdb.relational.api.RelationalResultSet;
Expand All @@ -33,15 +34,16 @@
import com.apple.foundationdb.relational.jdbc.grpc.v1.KeySet;
import com.apple.foundationdb.relational.jdbc.grpc.v1.KeySetValue;
import com.apple.foundationdb.relational.jdbc.grpc.v1.ResultSet;
import com.apple.foundationdb.relational.jdbc.grpc.v1.RpcContinuationReason;
import com.apple.foundationdb.relational.jdbc.grpc.v1.ResultSetMetadata;
import com.apple.foundationdb.relational.jdbc.grpc.v1.RpcContinuation;
import com.apple.foundationdb.relational.jdbc.grpc.v1.column.Array;
import com.apple.foundationdb.relational.jdbc.grpc.v1.column.Column;
import com.apple.foundationdb.relational.jdbc.grpc.v1.column.ColumnMetadata;
import com.apple.foundationdb.relational.jdbc.grpc.v1.column.ListColumn;
import com.apple.foundationdb.relational.jdbc.grpc.v1.column.ListColumnMetadata;
import com.apple.foundationdb.relational.jdbc.grpc.v1.column.Struct;
import com.apple.foundationdb.relational.util.PositionalIndex;

import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.ByteString;

Expand Down Expand Up @@ -324,6 +326,61 @@ public static ResultSet toProtobuf(RelationalResultSet relationalResultSet) thro
}
resultSetBuilder.addRow(toRow(relationalResultSet));
}
// Set the continuation after all the rows have been traversed
Continuation existingContinuation = relationalResultSet.getContinuation();
RpcContinuation rpcContinuation = toContinuation(existingContinuation);
resultSetBuilder.setContinuation(rpcContinuation);

return resultSetBuilder.build();
}

private static RpcContinuation toContinuation(@Nonnull Continuation existingContinuation) {
RpcContinuation.Builder builder = RpcContinuation.newBuilder()
.setVersion(RelationalRpcContinuation.CURRENT_VERSION)
.setAtBeginning(existingContinuation.atBeginning())
.setAtEnd(existingContinuation.atEnd());
// Here, we serialize the entire continuation - this will make it easier to recreate the original once
// we send it back
byte[] state = existingContinuation.serialize();
if (state != null) {
builder.setInternalState(ByteString.copyFrom(state));
}
Continuation.Reason reason = existingContinuation.getReason();
if (reason != null) {
builder.setReason(toReason(reason));
}
return builder.build();
}

public static RpcContinuationReason toReason(Continuation.Reason reason) {
if (reason == null) {
return null;
}
switch (reason) {
case TRANSACTION_LIMIT_REACHED:
return RpcContinuationReason.TRANSACTION_LIMIT_REACHED;
case QUERY_EXECUTION_LIMIT_REACHED:
return RpcContinuationReason.QUERY_EXECUTION_LIMIT_REACHED;
case CURSOR_AFTER_LAST:
return RpcContinuationReason.CURSOR_AFTER_LAST;
default:
throw new IllegalStateException("Unrecognized continuation reason: " + reason);
}
}

public static Continuation.Reason toReason(RpcContinuationReason reason) {
if (reason == null) {
return null;
}
switch (reason) {
case TRANSACTION_LIMIT_REACHED:
return Continuation.Reason.TRANSACTION_LIMIT_REACHED;
case QUERY_EXECUTION_LIMIT_REACHED:
return Continuation.Reason.QUERY_EXECUTION_LIMIT_REACHED;
case CURSOR_AFTER_LAST:
return Continuation.Reason.CURSOR_AFTER_LAST;
default:
throw new IllegalStateException("Unrecognized continuation reason: " + reason);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* continuation.proto
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2015-2025 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

// Continuation support for the GRPC client-server implementation.
// This is modeled after the internal continuation.
// The model has its own "reason" so that it can communicate that at the API level, as well as a wrapper around any
// internal state it needs to communicate to the server in order to recreate the internal continuation instance.
syntax = "proto3";
package grpc.relational.jdbc.v1;
option java_multiple_files = true;
// Put the generated classes into grpc subpackage so I can exclude
// these classes from checkstyle and spotbug checks.
option java_package = "com.apple.foundationdb.relational.jdbc.grpc.v1";
option java_outer_classname = "RpcContinuationProto";

enum RpcContinuationReason {
// Reached a transaction limit, such as byte scan limit, row scan limit or time limit.
TRANSACTION_LIMIT_REACHED = 0;
// Reached a query execution limit, such as the maximum number of rows allowed in a result set.
QUERY_EXECUTION_LIMIT_REACHED = 1;
// All rows were returned.
CURSOR_AFTER_LAST = 2;
}

message RpcContinuation {
int32 version = 1; // Version of the continuation model; can be used to denote support for continuation features over time
optional bytes internal_state = 2; // Wrapper around any server-side continuation returned
optional RpcContinuationReason reason = 3;
bool atBeginning = 4;
bool atEnd = 5;
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ option java_package = "com.apple.foundationdb.relational.jdbc.grpc.v1";
option java_outer_classname = "ResultSetProto";

import "grpc/relational/jdbc/v1/column.proto";
import "grpc/relational/jdbc/v1/continuation.proto";

// Results from Direct Access API Get or Scan or from sql execute.
message ResultSet {
Expand All @@ -41,6 +42,10 @@ message ResultSet {
// Each element in `rows` is a row whose format is defined by metadata. The ith element
// in each row matches the ith field in ResultSetMetadata.
repeated Struct row = 2;

// Optional continuation that can continue the query from the point the current results ended.
// A result set is non-null for the RelationalResultSet API but is not always supported (exception thrown when not).
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you mean "continuation"?

Suggested change
// A result set is non-null for the RelationalResultSet API but is not always supported (exception thrown when not).
// A continuation is non-null for the RelationalResultSet API but is not always supported (exception thrown when not).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

optional RpcContinuation continuation = 3;
}

// Metadata about a [ResultSet][grpc.relational.jdbc.v1.ResultSet]
Expand Down
Loading