diff --git a/runner/src/service/runner-server.ts b/runner/src/service/runner-server.ts index 39a201c1d..402f808d3 100644 --- a/runner/src/service/runner-server.ts +++ b/runner/src/service/runner-server.ts @@ -1,7 +1,8 @@ import * as grpc from '@grpc/grpc-js'; import * as protoLoader from '@grpc/proto-loader'; -import RunnerService from './runner-service'; +import getRunnerService from './runner-service'; import { type ProtoGrpcType } from '../generated/runner'; +import StreamHandler from '../stream-handler/stream-handler'; const PROTO_PATH = 'protos/runner.proto'; @@ -12,7 +13,7 @@ export default function startServer (): grpc.Server { ) as unknown) as ProtoGrpcType; const server = new grpc.Server(); - server.addService(runnerProto.spec.Runner.service, RunnerService); + server.addService(runnerProto.spec.Runner.service, getRunnerService(StreamHandler)); const credentials = grpc.ServerCredentials; server.bindAsync( diff --git a/runner/src/service/runner-service.test.ts b/runner/src/service/runner-service.test.ts new file mode 100644 index 000000000..375fede79 --- /dev/null +++ b/runner/src/service/runner-service.test.ts @@ -0,0 +1,24 @@ +import type StreamHandler from '../stream-handler/stream-handler'; +import getRunnerService from './runner-service'; + +describe('Runner gRPC Service', () => { + let genericStreamHandlerType: typeof StreamHandler; + beforeEach(() => { + genericStreamHandlerType = jest.fn().mockImplementation(() => { + return { updateIndexerConfig: jest.fn() }; + }); + }); + + it('starts a stream', () => { + const service = getRunnerService(genericStreamHandlerType); + const mockCallback = jest.fn() as unknown as any; + const request = { + request: { + streamId: 'test-stream-id', + } + } as unknown as any; + service.StartStream(request, mockCallback); + // expect(genericStreamHandlerType).toHaveBeenCalledWith(undefined, undefined); + expect(mockCallback).toHaveBeenCalledWith({}, { streamId: 'test-stream-id' }); + }); +}); diff --git a/runner/src/service/runner-service.ts b/runner/src/service/runner-service.ts index b3becfa83..56746ba6c 100644 --- a/runner/src/service/runner-service.ts +++ b/runner/src/service/runner-service.ts @@ -8,91 +8,101 @@ import { type StopStreamRequest } from '../generated/spec/StopStreamRequest'; import { type StopStreamResponse } from '../generated/spec/StopStreamResponse'; import { type ListStreamsRequest } from '../generated/spec/ListStreamsRequest'; import { type ListStreamsResponse } from '../generated/spec/ListStreamsResponse'; -import StreamHandler from '../stream-handler'; +import type StreamHandler from '../stream-handler'; import * as grpc from '@grpc/grpc-js'; import assert from 'assert'; type StreamHandlers = Map; const streamHandlers: StreamHandlers = new Map(); -const RunnerService: RunnerHandlers = { - StartStream (call: ServerUnaryCall, callback: sendUnaryData): void { - console.log('StartStream called', call.request); - // Validate request - const validationResult = validateStartStreamRequest(call.request); - if (validationResult !== null) { - callback(validationResult, null); - return; - } - - // Handle request - try { - assert(call.request.streamId !== undefined && call.request.redisStream !== undefined, 'Validation failed to catch invalid start request'); - const streamHandler = new StreamHandler(call.request.redisStream); // TODO: Supply validated IndexerConfig - streamHandlers.set(call.request.streamId, streamHandler); - callback(null, { streamId: call.request.streamId }); - } catch (error) { - callback(handleInternalError(error), null); - } - }, - - UpdateStream (call: ServerUnaryCall, callback: sendUnaryData): void { - console.log('UpdateStream called', call.request); - // Validate request - const validationResult = validateUpdateStreamRequest(call.request); - if (validationResult !== null) { - callback(validationResult, null); - return; - } - - // Handle request - try { - assert(call.request.streamId !== undefined && call.request.indexerConfig !== undefined, 'Validation failed to catch invalid update request'); - const config = JSON.parse(call.request.indexerConfig); - streamHandlers.get(call.request.streamId)?.updateIndexerConfig({ - account_id: config.account_id, - function_name: config.function_name, - code: config.code, - schema: config.schema +function getRunnerService (StreamHandlerType: typeof StreamHandler): RunnerHandlers { + const RunnerService: RunnerHandlers = { + StartStream (call: ServerUnaryCall, callback: sendUnaryData): void { + console.log('StartStream called', call.request); + // Validate request + const validationResult = validateStartStreamRequest(call.request); + if (validationResult !== null) { + callback(validationResult, null); + return; + } + + // Handle request + try { + assert(call.request.streamId !== undefined && call.request.redisStream !== undefined && call.request.indexerConfig !== undefined, + 'Validation failed to catch invalid start request'); + const config = JSON.parse(call.request.indexerConfig); + const streamHandler = new StreamHandlerType(call.request.redisStream, { + account_id: config.account_id, + function_name: config.function_name, + code: config.code, + schema: config.schema + }); + streamHandlers.set(call.request.streamId, streamHandler); + callback(null, { streamId: call.request.streamId }); + } catch (error) { + callback(handleInternalError(error), null); + } + }, + + UpdateStream (call: ServerUnaryCall, callback: sendUnaryData): void { + console.log('UpdateStream called', call.request); + // Validate request + const validationResult = validateUpdateStreamRequest(call.request); + if (validationResult !== null) { + callback(validationResult, null); + return; + } + + // Handle request + try { + assert(call.request.streamId !== undefined && call.request.indexerConfig !== undefined, 'Validation failed to catch invalid update request'); + const config = JSON.parse(call.request.indexerConfig); + streamHandlers.get(call.request.streamId)?.updateIndexerConfig({ + account_id: config.account_id, + function_name: config.function_name, + code: config.code, + schema: config.schema + }); + callback(null, { streamId: call.request.streamId }); + } catch (error) { + callback(handleInternalError(error), null); + } + }, + + StopStream (call: ServerUnaryCall, callback: sendUnaryData): void { + console.log('StopStream called', call.request); + // Validate request + const validationResult = validateStopStreamRequest(call.request); + if (validationResult !== null) { + callback(validationResult, null); + return; + } + + // Handle request + assert(call.request.streamId !== undefined, 'Validation failed to catch invalid stop request'); + const streamId: string = call.request.streamId; + streamHandlers.get(streamId)?.stop() + .then(() => { + callback(null, { streamId }); + streamHandlers.delete(streamId); + }).catch(error => { + const grpcError = handleInternalError(error); + callback(grpcError, null); + }); + }, + + ListStreams (call: ServerUnaryCall, callback: sendUnaryData): void { + console.log('ListStreams called', call.request); + // TODO: Return more information than just streamId + callback(null, { + streams: Object.keys(streamHandlers).map(stream => { + return { streamId: stream }; + }) }); - callback(null, { streamId: call.request.streamId }); - } catch (error) { - callback(handleInternalError(error), null); - } - }, - - StopStream (call: ServerUnaryCall, callback: sendUnaryData): void { - console.log('StopStream called', call.request); - // Validate request - const validationResult = validateStopStreamRequest(call.request); - if (validationResult !== null) { - callback(validationResult, null); - return; } - - // Handle request - assert(call.request.streamId !== undefined, 'Validation failed to catch invalid stop request'); - const streamId: string = call.request.streamId; - streamHandlers.get(streamId)?.stop() - .then(() => { - callback(null, { streamId }); - streamHandlers.delete(streamId); - }).catch(error => { - const grpcError = handleInternalError(error); - callback(grpcError, null); - }); - }, - - ListStreams (call: ServerUnaryCall, callback: sendUnaryData): void { - console.log('ListStreams called', call.request); - // TODO: Return more information than just streamId - callback(null, { - streams: Object.keys(streamHandlers).map(stream => { - return { streamId: stream }; - }) - }); - } -}; + }; + return RunnerService; +} function handleInternalError (error: unknown): any { let errorMessage = 'An unknown error occurred'; @@ -218,4 +228,4 @@ function validateStopStreamRequest (request: StartStreamRequest): any | null { return null; } -export default RunnerService; +export default getRunnerService;