Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Runner gRPC endpoint #446

Merged
merged 18 commits into from
Dec 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions runner/.gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
**/dist
/node_modules
src/generated/
2,287 changes: 2,011 additions & 276 deletions runner/package-lock.json

Large diffs are not rendered by default.

8 changes: 6 additions & 2 deletions runner/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,12 @@
"node": "18.17"
},
"scripts": {
"build": "rm -rf ./dist && tsc",
"build": "rm -rf ./dist && npm run codegen && tsc",
"codegen": "rm -rf ./src/generated && proto-loader-gen-types --longs=String --enums=String --defaults --oneofs --grpcLib=@grpc/grpc-js --outDir=src/generated/ protos/runner.proto",
"start": "npm run build && node dist/index.js",
"start:dev": "ts-node ./src/index.ts",
"start:docker": "node dist/index.js",
"test": "node --experimental-vm-modules ./node_modules/.bin/jest",
"test": "npm run codegen && node --experimental-vm-modules ./node_modules/.bin/jest",
darunrs marked this conversation as resolved.
Show resolved Hide resolved
"lint": "eslint -c .eslintrc.js"
},
"keywords": [],
Expand Down Expand Up @@ -43,8 +44,11 @@
},
"dependencies": {
"@aws-sdk/client-s3": "^3.414.0",
"@grpc/grpc-js": "^1.9.12",
"@grpc/proto-loader": "^0.7.10",
"@near-lake/primitives": "^0.1.0",
"express": "^4.18.2",
"long": "^5.2.3",
"node-fetch": "^2.6.11",
"node-sql-parser": "^4.10.0",
"pg": "^8.11.1",
Expand Down
56 changes: 56 additions & 0 deletions runner/protos/runner.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
syntax = "proto3";
package spec;

service Runner {
// Starts a new Runner executor
rpc StartExecutor (StartExecutorRequest) returns (StartExecutorResponse);

// Stops an existing Runner executor
rpc StopExecutor (StopExecutorRequest) returns (StopExecutorResponse);

// Lists all Runner executor
rpc ListExecutors (ListExecutorsRequest) returns (ListExecutorsResponse);
}

// Start Executor Request
message StartExecutorRequest {
string executor_id = 1;
string redis_stream = 2;
string account_id = 3;
string function_name = 4;
string code = 5;
string schema = 6;
}

// Start Executor Response
message StartExecutorResponse {
string executor_id = 1;
}

// Stop Executor Request
message StopExecutorRequest {
string executor_id = 1;
}

// Stop Executor Response
message StopExecutorResponse {
string executor_id = 1;
}

// List Executor Request
message ListExecutorsRequest {
}

// List Executor Response
message ListExecutorsResponse {
// List of all executors, including stopped or crashed ones
repeated ExecutorInfo executors = 1;
}

// Information about a single BlockExecutor instance.
message ExecutorInfo {
string executor_id = 1;
string account_id = 2;
string function_name = 3;
string status = 4;
}
4 changes: 2 additions & 2 deletions runner/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@ import { startServer as startMetricsServer } from './metrics';
import RedisClient from './redis-client';
import StreamHandler from './stream-handler';

const STREAM_HANDLER_THROTTLE_MS = 500;

const redisClient = new RedisClient();

startMetricsServer().catch((err) => {
console.error('Failed to start metrics server', err);
});

const STREAM_HANDLER_THROTTLE_MS = 500;

type StreamHandlers = Record<string, StreamHandler>;

void (async function main () {
Expand Down
1 change: 1 addition & 0 deletions runner/src/server/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export { default } from './runner-server';
15 changes: 15 additions & 0 deletions runner/src/server/runner-client.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import * as grpc from '@grpc/grpc-js';
import * as protoLoader from '@grpc/proto-loader';
import { type ProtoGrpcType } from '../generated/runner';
import { type RunnerClient } from '../generated/spec/Runner';

// TODO: Replace this client with a Rust client

const PROTO_PATH = 'protos/runner.proto';

const packageDefinition = protoLoader.loadSync(PROTO_PATH);
const runner = (grpc.loadPackageDefinition(packageDefinition) as unknown) as ProtoGrpcType;
// TODO: Use secure creds with env variable Port number
const runnerClient: RunnerClient = new runner.spec.Runner('localhost:50007', grpc.credentials.createInsecure());

export default runnerClient;
32 changes: 32 additions & 0 deletions runner/src/server/runner-server.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import * as grpc from '@grpc/grpc-js';
import * as protoLoader from '@grpc/proto-loader';
import getRunnerService from './runner-service';
import { type ProtoGrpcType } from '../generated/runner';
import StreamHandler from '../stream-handler/stream-handler';

const PROTO_PATH = 'protos/runner.proto';

export default function startRunnerServer (): grpc.Server {
const packageDefinition = protoLoader.loadSync(PROTO_PATH);
const runnerProto = (grpc.loadPackageDefinition(
packageDefinition
) as unknown) as ProtoGrpcType;

const server = new grpc.Server();
server.addService(runnerProto.spec.Runner.service, getRunnerService(StreamHandler));
const credentials = grpc.ServerCredentials;

server.bindAsync(
'0.0.0.0:50007', // TODO: Read port from ENV
credentials.createInsecure(), // TODO: Use secure credentials with allow for Coordinator
(err: Error | null, port: number) => {
if (err) {
console.error(`Server error: ${err.message}`);
} else {
console.log(`Server bound on port: ${port}`);
server.start();
}
}
);
return server;
}
Loading