Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upgrade conformance tests to v1.0.3 #1208

Merged
merged 20 commits into from
Sep 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion packages/connect-conformance/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
"connectconformance": "bin/connectconformance.cjs"
},
"scripts": {
"generate": "buf generate buf.build/connectrpc/conformance:v1.0.2",
"generate": "buf generate buf.build/connectrpc/conformance:v1.0.3",
"postgenerate": "license-header src/gen",
"prebuild": "rm -rf ./dist/*",
"build": "npm run build:cjs && npm run build:esm",
Expand Down
221 changes: 85 additions & 136 deletions packages/connect-conformance/src/callback-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,26 +12,26 @@
// See the License for the specific language governing permissions and
// limitations under the License.

import { createCallbackClient, ConnectError } from "@connectrpc/connect";
import { createCallbackClient, ConnectError, Code } from "@connectrpc/connect";
import type { CallbackClient, Transport } from "@connectrpc/connect";
import {
ClientCompatRequest,
ClientResponseResult,
} from "./gen/connectrpc/conformance/v1/client_compat_pb.js";
import {
UnaryRequest,
Header as ConformanceHeader,
ServerStreamRequest,
ConformancePayload,
UnimplementedRequest,
IdempotentUnaryRequest,
} from "./gen/connectrpc/conformance/v1/service_pb.js";
import {
convertToProtoError,
convertToProtoHeaders,
appendProtoHeaders,
wait,
getCancelTiming,
getRequestHeaders,
getSingleRequestMessage,
setClientErrorResult,
} from "./protocol.js";
import { ConformanceService } from "./gen/connectrpc/conformance/v1/service_connect.js";

Expand Down Expand Up @@ -59,189 +59,138 @@ export function invokeWithCallbackClient(

async function unary(
client: ConformanceClient,
req: ClientCompatRequest,
compatRequest: ClientCompatRequest,
idempotent: boolean = false,
) {
if (req.requestMessages.length !== 1) {
throw new Error("Unary method requires exactly one request message");
}
const msg = req.requestMessages[0];
const uReq = idempotent ? new IdempotentUnaryRequest() : new UnaryRequest();
if (!msg.unpackTo(uReq)) {
throw new Error("Could not unpack request message to unary request");
}
const reqHeader = new Headers();
appendProtoHeaders(reqHeader, req.requestHeaders);
let error: ConnectError | undefined = undefined;
let resHeaders: ConformanceHeader[] = [];
let resTrailers: ConformanceHeader[] = [];
const payloads: ConformancePayload[] = [];

let call = client.unary;
if (idempotent) {
call = client.idempotentUnary;
}

await wait(req.requestDelayMs);
await wait(compatRequest.requestDelayMs);
const result = new ClientResponseResult();
return new Promise<ClientResponseResult>((resolve) => {
call(
uReq,
(err, uRes) => {
const call = idempotent ? client.idempotentUnary : client.unary;
let clientCancelled = false;
const clientCancelFn = call(
getSingleRequestMessage(
compatRequest,
idempotent ? IdempotentUnaryRequest : UnaryRequest,
),
(err, response) => {
// Callback clients swallow client triggered cancellations and never
// call the callback. This will trigger the global error handler and
// fail the process.
if (clientCancelled) {
throw new Error("Aborted requests should not trigger the callback");
}
if (err !== undefined) {
error = ConnectError.from(err);
// We can't distinguish between headers and trailers here, so we just
// add the metadata to both.
//
// But if the headers are already set, we don't need to overwrite them.
resHeaders =
resHeaders.length === 0
? convertToProtoHeaders(error.metadata)
: resHeaders;
resTrailers = convertToProtoHeaders(error.metadata);
setClientErrorResult(result, err);
} else {
payloads.push(uRes.payload!);
result.payloads.push(response.payload!);
}
resolve(
new ClientResponseResult({
payloads: payloads,
responseHeaders: resHeaders,
responseTrailers: resTrailers,
error: convertToProtoError(error),
}),
);
resolve(result);
},
{
headers: reqHeader,
headers: getRequestHeaders(compatRequest),
onHeader(headers) {
resHeaders = convertToProtoHeaders(headers);
result.responseHeaders = convertToProtoHeaders(headers);
},
onTrailer(trailers) {
resTrailers = convertToProtoHeaders(trailers);
result.responseTrailers = convertToProtoHeaders(trailers);
},
},
);
const { afterCloseSendMs } = getCancelTiming(compatRequest);
if (afterCloseSendMs >= 0) {
setTimeout(() => {
clientCancelled = true;
clientCancelFn();
// Callback clients swallow client triggered cancellations and never
// call the callback. We report a fake error to the test runner to let
// it know that the call was cancelled.
result.error = convertToProtoError(
new ConnectError("client cancelled", Code.Canceled),
);
resolve(result);
}, afterCloseSendMs);
}
});
}

async function serverStream(
client: ConformanceClient,
req: ClientCompatRequest,
compatRequest: ClientCompatRequest,
) {
if (req.requestMessages.length !== 1) {
throw new Error("ServerStream method requires exactly one request message");
}
const msg = req.requestMessages[0];
const uReq = new ServerStreamRequest();
if (!msg.unpackTo(uReq)) {
throw new Error(
"Could not unpack request message to server stream request",
);
}
const reqHeader = new Headers();
appendProtoHeaders(reqHeader, req.requestHeaders);
let error: ConnectError | undefined = undefined;
let resHeaders: ConformanceHeader[] = [];
let resTrailers: ConformanceHeader[] = [];
const payloads: ConformancePayload[] = [];
const cancelTiming = getCancelTiming(req);
let count = 0;

await wait(req.requestDelayMs);
const cancelTiming = getCancelTiming(compatRequest);
await wait(compatRequest.requestDelayMs);
const result = new ClientResponseResult();
return new Promise<ClientResponseResult>((resolve) => {
const cancelFn = client.serverStream(
uReq,
(uResp) => {
if (cancelTiming.afterNumResponses === 0) {
cancelFn();
}
payloads.push(uResp.payload!);
count++;
if (count === cancelTiming.afterNumResponses) {
cancelFn();
let clientCancelled = false;
const clientCancelFn = client.serverStream(
getSingleRequestMessage(compatRequest, ServerStreamRequest),
(response) => {
result.payloads.push(response.payload!);
if (result.payloads.length === cancelTiming.afterNumResponses) {
clientCancelled = true;
clientCancelFn();
}
},
(err) => {
// Callback clients call the closeCallback without an error for client
// triggered cancellation. We report a fake error to the test runner to let
// it know that the call was cancelled.
if (clientCancelled) {
if (err !== undefined) {
throw new Error(
"Aborted requests should not trigger the closeCallback with an error",
);
}
result.error = convertToProtoError(
new ConnectError("client cancelled", Code.Canceled),
);
}
if (err !== undefined) {
error = ConnectError.from(err);
// We can't distinguish between headers and trailers here, so we just
// add the metadata to both.
//
// But if the headers are already set, we don't need to overwrite them.
resHeaders =
resHeaders.length === 0
? convertToProtoHeaders(error.metadata)
: resHeaders;
resTrailers = convertToProtoHeaders(error.metadata);
setClientErrorResult(result, err);
}
resolve(
new ClientResponseResult({
responseHeaders: resHeaders,
responseTrailers: resTrailers,
payloads: payloads,
error: convertToProtoError(error),
}),
);
resolve(result);
},
{
headers: reqHeader,
headers: getRequestHeaders(compatRequest),
onHeader(headers) {
resHeaders = convertToProtoHeaders(headers);
result.responseHeaders = convertToProtoHeaders(headers);
},
onTrailer(trailers) {
resTrailers = convertToProtoHeaders(trailers);
result.responseTrailers = convertToProtoHeaders(trailers);
},
},
);
if (cancelTiming.afterCloseSendMs >= 0) {
setTimeout(() => {
clientCancelled = true;
clientCancelFn();
}, cancelTiming.afterCloseSendMs);
}
});
}

async function unimplemented(
client: ConformanceClient,
req: ClientCompatRequest,
compatRequest: ClientCompatRequest,
) {
const msg = req.requestMessages[0];
const unReq = new UnimplementedRequest();
if (!msg.unpackTo(unReq)) {
throw new Error("Could not unpack request message to unary request");
}
const reqHeader = new Headers();
appendProtoHeaders(reqHeader, req.requestHeaders);
let error: ConnectError | undefined = undefined;
let resHeaders: ConformanceHeader[] = [];
let resTrailers: ConformanceHeader[] = [];

const result = new ClientResponseResult();
return new Promise<ClientResponseResult>((resolve) => {
client.unimplemented(
unReq,
getSingleRequestMessage(compatRequest, UnimplementedRequest),
// eslint-disable-next-line @typescript-eslint/no-unused-vars
(err, _) => {
if (err !== undefined) {
error = ConnectError.from(err);
// We can't distinguish between headers and trailers here, so we just
// add the metadata to both.
//
// But if the headers are already set, we don't need to overwrite them.
resHeaders =
resHeaders.length === 0
? convertToProtoHeaders(error.metadata)
: resHeaders;
resTrailers = convertToProtoHeaders(error.metadata);
setClientErrorResult(result, err);
}
resolve(
new ClientResponseResult({
responseHeaders: resHeaders,
responseTrailers: resTrailers,
error: convertToProtoError(error),
}),
);
resolve(result);
},
{
headers: reqHeader,
headers: getRequestHeaders(compatRequest),
onHeader(headers) {
resHeaders = convertToProtoHeaders(headers);
result.responseHeaders = convertToProtoHeaders(headers);
},
onTrailer(trailers) {
resTrailers = convertToProtoHeaders(trailers);
result.responseTrailers = convertToProtoHeaders(trailers);
},
},
);
Expand Down
30 changes: 17 additions & 13 deletions packages/connect-conformance/src/conformance.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,20 +29,18 @@ import { execFileSync } from "node:child_process";
import { fetch } from "undici";
import { scripts } from "../package.json";

// Extract conformance runner version from the `generate` script
const [, version] = /conformance:(v\d+\.\d+\.\d+)/.exec(scripts.generate) ?? [
"?",
];

const downloadUrl = `https://github.com/connectrpc/conformance/releases/download/${version}`;

export async function run() {
const { archive, bin } = getArtifactNameForEnv();
const tempDir = getTempDir();
// Extract conformance runner version from the `generate` script
const [, version] = /conformance:(v\d+\.\d+\.\d+)/.exec(scripts.generate) ?? [
"?",
];
const { archive, bin } = getArtifactNameForEnv(version);
const tempDir = getTempDir(version);
const binPath = joinPath(tempDir, bin);
if (!existsSync(binPath)) {
const downloadUrl = `https://github.com/connectrpc/conformance/releases/download/${version}/${archive}`;
const archivePath = joinPath(tempDir, archive);
await download(`${downloadUrl}/${archive}`, archivePath);
await download(downloadUrl, archivePath);
await extractBin(archivePath, binPath);
}
execFileSync(binPath, process.argv.slice(2), {
Expand Down Expand Up @@ -101,15 +99,21 @@ async function extractBin(archivePath: string, binPath: string) {
);
}

function getTempDir() {
const tempDir = joinPath(process.env["TEMP"] ?? os.tmpdir(), "conformance");
function getTempDir(version: string) {
const tempDir = joinPath(
process.env["TEMP"] ?? os.tmpdir(),
`conformance-${version}`,
);
if (!existsSync(tempDir)) {
mkdirSync(tempDir, { recursive: true });
}
return tempDir;
}

function getArtifactNameForEnv(): { archive: string; bin: string } {
function getArtifactNameForEnv(version: string): {
archive: string;
bin: string;
} {
let build = "";
let ext = ".tar.gz";
let bin = "connectconformance";
Expand Down
Loading