Skip to content

Commit

Permalink
feat: Runner gRPC endpoint (#446)
Browse files Browse the repository at this point in the history
As part of implementing a new control plane to oversee QueryApi
resources, Runner needed endpoints with which control plane can send
commands. This PR adds code to create a new gRPC server with endpoints
to start, stop, or list Runner executors. This provides the control
plane the ability to fully control Runner, and removes the need for
bilateral decision making, which was a problem of the previous design.

The changes will be changed further as needs become more concrete,
closer to integration/release.
  • Loading branch information
darunrs committed Dec 21, 2023
1 parent e63024a commit 38ea895
Show file tree
Hide file tree
Showing 12 changed files with 2,671 additions and 283 deletions.
1 change: 1 addition & 0 deletions runner/.gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
**/dist
/node_modules
src/generated/
2,287 changes: 2,011 additions & 276 deletions runner/package-lock.json

Large diffs are not rendered by default.

8 changes: 6 additions & 2 deletions runner/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,12 @@
"node": "18.17"
},
"scripts": {
"build": "rm -rf ./dist && tsc",
"build": "rm -rf ./dist && npm run codegen && tsc",
"codegen": "rm -rf ./src/generated && proto-loader-gen-types --longs=String --enums=String --defaults --oneofs --grpcLib=@grpc/grpc-js --outDir=src/generated/ protos/runner.proto",
"start": "npm run build && node dist/index.js",
"start:dev": "ts-node ./src/index.ts",
"start:docker": "node dist/index.js",
"test": "node --experimental-vm-modules ./node_modules/.bin/jest",
"test": "npm run codegen && node --experimental-vm-modules ./node_modules/.bin/jest",
"lint": "eslint -c .eslintrc.js"
},
"keywords": [],
Expand Down Expand Up @@ -43,8 +44,11 @@
},
"dependencies": {
"@aws-sdk/client-s3": "^3.414.0",
"@grpc/grpc-js": "^1.9.12",
"@grpc/proto-loader": "^0.7.10",
"@near-lake/primitives": "^0.1.0",
"express": "^4.18.2",
"long": "^5.2.3",
"node-fetch": "^2.6.11",
"node-sql-parser": "^4.10.0",
"pg": "^8.11.1",
Expand Down
56 changes: 56 additions & 0 deletions runner/protos/runner.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
syntax = "proto3";
package spec;

service Runner {
// Starts a new Runner executor
rpc StartExecutor (StartExecutorRequest) returns (StartExecutorResponse);

// Stops an existing Runner executor
rpc StopExecutor (StopExecutorRequest) returns (StopExecutorResponse);

// Lists all Runner executor
rpc ListExecutors (ListExecutorsRequest) returns (ListExecutorsResponse);
}

// Start Executor Request
message StartExecutorRequest {
string executor_id = 1;
string redis_stream = 2;
string account_id = 3;
string function_name = 4;
string code = 5;
string schema = 6;
}

// Start Executor Response
message StartExecutorResponse {
string executor_id = 1;
}

// Stop Executor Request
message StopExecutorRequest {
string executor_id = 1;
}

// Stop Executor Response
message StopExecutorResponse {
string executor_id = 1;
}

// List Executor Request
message ListExecutorsRequest {
}

// List Executor Response
message ListExecutorsResponse {
// List of all executors, including stopped or crashed ones
repeated ExecutorInfo executors = 1;
}

// Information about a single BlockExecutor instance.
message ExecutorInfo {
string executor_id = 1;
string account_id = 2;
string function_name = 3;
string status = 4;
}
4 changes: 2 additions & 2 deletions runner/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@ import { startServer as startMetricsServer } from './metrics';
import RedisClient from './redis-client';
import StreamHandler from './stream-handler';

const STREAM_HANDLER_THROTTLE_MS = 500;

const redisClient = new RedisClient();

startMetricsServer().catch((err) => {
console.error('Failed to start metrics server', err);
});

const STREAM_HANDLER_THROTTLE_MS = 500;

type StreamHandlers = Record<string, StreamHandler>;

void (async function main () {
Expand Down
1 change: 1 addition & 0 deletions runner/src/server/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export { default } from './runner-server';
15 changes: 15 additions & 0 deletions runner/src/server/runner-client.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import * as grpc from '@grpc/grpc-js';
import * as protoLoader from '@grpc/proto-loader';
import { type ProtoGrpcType } from '../generated/runner';
import { type RunnerClient } from '../generated/spec/Runner';

// TODO: Replace this client with a Rust client

const PROTO_PATH = 'protos/runner.proto';

const packageDefinition = protoLoader.loadSync(PROTO_PATH);
const runner = (grpc.loadPackageDefinition(packageDefinition) as unknown) as ProtoGrpcType;
// TODO: Use secure creds with env variable Port number
const runnerClient: RunnerClient = new runner.spec.Runner('localhost:50007', grpc.credentials.createInsecure());

export default runnerClient;
32 changes: 32 additions & 0 deletions runner/src/server/runner-server.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import * as grpc from '@grpc/grpc-js';
import * as protoLoader from '@grpc/proto-loader';
import getRunnerService from './runner-service';
import { type ProtoGrpcType } from '../generated/runner';
import StreamHandler from '../stream-handler/stream-handler';

const PROTO_PATH = 'protos/runner.proto';

export default function startRunnerServer (): grpc.Server {
const packageDefinition = protoLoader.loadSync(PROTO_PATH);
const runnerProto = (grpc.loadPackageDefinition(
packageDefinition
) as unknown) as ProtoGrpcType;

const server = new grpc.Server();
server.addService(runnerProto.spec.Runner.service, getRunnerService(StreamHandler));
const credentials = grpc.ServerCredentials;

server.bindAsync(
'0.0.0.0:50007', // TODO: Read port from ENV
credentials.createInsecure(), // TODO: Use secure credentials with allow for Coordinator
(err: Error | null, port: number) => {
if (err) {
console.error(`Server error: ${err.message}`);
} else {
console.log(`Server bound on port: ${port}`);
server.start();
}
}
);
return server;
}
Loading

0 comments on commit 38ea895

Please sign in to comment.