Skip to content

Commit

Permalink
Update service code for mocking
Browse files Browse the repository at this point in the history
  • Loading branch information
darunrs committed Dec 16, 2023
1 parent 0d3885f commit 8c55f73
Show file tree
Hide file tree
Showing 3 changed files with 115 additions and 80 deletions.
5 changes: 3 additions & 2 deletions runner/src/service/runner-server.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import * as grpc from '@grpc/grpc-js';
import * as protoLoader from '@grpc/proto-loader';
import RunnerService from './runner-service';
import getRunnerService from './runner-service';
import { type ProtoGrpcType } from '../generated/runner';
import StreamHandler from '../stream-handler/stream-handler';

const PROTO_PATH = 'protos/runner.proto';

Expand All @@ -12,7 +13,7 @@ export default function startServer (): grpc.Server {
) as unknown) as ProtoGrpcType;

const server = new grpc.Server();
server.addService(runnerProto.spec.Runner.service, RunnerService);
server.addService(runnerProto.spec.Runner.service, getRunnerService(StreamHandler));
const credentials = grpc.ServerCredentials;

server.bindAsync(
Expand Down
24 changes: 24 additions & 0 deletions runner/src/service/runner-service.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import type StreamHandler from '../stream-handler/stream-handler';
import getRunnerService from './runner-service';

describe('Runner gRPC Service', () => {
let genericStreamHandlerType: typeof StreamHandler;
beforeEach(() => {
genericStreamHandlerType = jest.fn().mockImplementation(() => {
return { updateIndexerConfig: jest.fn() };
});
});

it('starts a stream', () => {
const service = getRunnerService(genericStreamHandlerType);
const mockCallback = jest.fn() as unknown as any;
const request = {
request: {
streamId: 'test-stream-id',
}
} as unknown as any;
service.StartStream(request, mockCallback);
// expect(genericStreamHandlerType).toHaveBeenCalledWith(undefined, undefined);
expect(mockCallback).toHaveBeenCalledWith({}, { streamId: 'test-stream-id' });
});
});
166 changes: 88 additions & 78 deletions runner/src/service/runner-service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,91 +8,101 @@ import { type StopStreamRequest } from '../generated/spec/StopStreamRequest';
import { type StopStreamResponse } from '../generated/spec/StopStreamResponse';
import { type ListStreamsRequest } from '../generated/spec/ListStreamsRequest';
import { type ListStreamsResponse } from '../generated/spec/ListStreamsResponse';
import StreamHandler from '../stream-handler';
import type StreamHandler from '../stream-handler';
import * as grpc from '@grpc/grpc-js';
import assert from 'assert';

type StreamHandlers = Map<string, StreamHandler>;
const streamHandlers: StreamHandlers = new Map();

const RunnerService: RunnerHandlers = {
StartStream (call: ServerUnaryCall<StartStreamRequest, StartStreamResponse>, callback: sendUnaryData<StartStreamResponse>): void {
console.log('StartStream called', call.request);
// Validate request
const validationResult = validateStartStreamRequest(call.request);
if (validationResult !== null) {
callback(validationResult, null);
return;
}

// Handle request
try {
assert(call.request.streamId !== undefined && call.request.redisStream !== undefined, 'Validation failed to catch invalid start request');
const streamHandler = new StreamHandler(call.request.redisStream); // TODO: Supply validated IndexerConfig
streamHandlers.set(call.request.streamId, streamHandler);
callback(null, { streamId: call.request.streamId });
} catch (error) {
callback(handleInternalError(error), null);
}
},

UpdateStream (call: ServerUnaryCall<UpdateStreamRequest, UpdateStreamResponse>, callback: sendUnaryData<UpdateStreamResponse>): void {
console.log('UpdateStream called', call.request);
// Validate request
const validationResult = validateUpdateStreamRequest(call.request);
if (validationResult !== null) {
callback(validationResult, null);
return;
}

// Handle request
try {
assert(call.request.streamId !== undefined && call.request.indexerConfig !== undefined, 'Validation failed to catch invalid update request');
const config = JSON.parse(call.request.indexerConfig);
streamHandlers.get(call.request.streamId)?.updateIndexerConfig({
account_id: config.account_id,
function_name: config.function_name,
code: config.code,
schema: config.schema
function getRunnerService (StreamHandlerType: typeof StreamHandler): RunnerHandlers {
const RunnerService: RunnerHandlers = {
StartStream (call: ServerUnaryCall<StartStreamRequest, StartStreamResponse>, callback: sendUnaryData<StartStreamResponse>): void {
console.log('StartStream called', call.request);
// Validate request
const validationResult = validateStartStreamRequest(call.request);
if (validationResult !== null) {
callback(validationResult, null);
return;
}

// Handle request
try {
assert(call.request.streamId !== undefined && call.request.redisStream !== undefined && call.request.indexerConfig !== undefined,
'Validation failed to catch invalid start request');
const config = JSON.parse(call.request.indexerConfig);
const streamHandler = new StreamHandlerType(call.request.redisStream, {
account_id: config.account_id,
function_name: config.function_name,
code: config.code,
schema: config.schema
});
streamHandlers.set(call.request.streamId, streamHandler);
callback(null, { streamId: call.request.streamId });
} catch (error) {
callback(handleInternalError(error), null);
}
},

UpdateStream (call: ServerUnaryCall<UpdateStreamRequest, UpdateStreamResponse>, callback: sendUnaryData<UpdateStreamResponse>): void {
console.log('UpdateStream called', call.request);
// Validate request
const validationResult = validateUpdateStreamRequest(call.request);
if (validationResult !== null) {
callback(validationResult, null);
return;
}

// Handle request
try {
assert(call.request.streamId !== undefined && call.request.indexerConfig !== undefined, 'Validation failed to catch invalid update request');
const config = JSON.parse(call.request.indexerConfig);
streamHandlers.get(call.request.streamId)?.updateIndexerConfig({
account_id: config.account_id,
function_name: config.function_name,
code: config.code,
schema: config.schema
});
callback(null, { streamId: call.request.streamId });
} catch (error) {
callback(handleInternalError(error), null);
}
},

StopStream (call: ServerUnaryCall<StopStreamRequest, StopStreamResponse>, callback: sendUnaryData<StopStreamResponse>): void {
console.log('StopStream called', call.request);
// Validate request
const validationResult = validateStopStreamRequest(call.request);
if (validationResult !== null) {
callback(validationResult, null);
return;
}

// Handle request
assert(call.request.streamId !== undefined, 'Validation failed to catch invalid stop request');
const streamId: string = call.request.streamId;
streamHandlers.get(streamId)?.stop()
.then(() => {
callback(null, { streamId });
streamHandlers.delete(streamId);
}).catch(error => {
const grpcError = handleInternalError(error);
callback(grpcError, null);
});
},

ListStreams (call: ServerUnaryCall<ListStreamsRequest, ListStreamsResponse>, callback: sendUnaryData<ListStreamsResponse>): void {
console.log('ListStreams called', call.request);
// TODO: Return more information than just streamId
callback(null, {
streams: Object.keys(streamHandlers).map(stream => {
return { streamId: stream };
})
});
callback(null, { streamId: call.request.streamId });
} catch (error) {
callback(handleInternalError(error), null);
}
},

StopStream (call: ServerUnaryCall<StopStreamRequest, StopStreamResponse>, callback: sendUnaryData<StopStreamResponse>): void {
console.log('StopStream called', call.request);
// Validate request
const validationResult = validateStopStreamRequest(call.request);
if (validationResult !== null) {
callback(validationResult, null);
return;
}

// Handle request
assert(call.request.streamId !== undefined, 'Validation failed to catch invalid stop request');
const streamId: string = call.request.streamId;
streamHandlers.get(streamId)?.stop()
.then(() => {
callback(null, { streamId });
streamHandlers.delete(streamId);
}).catch(error => {
const grpcError = handleInternalError(error);
callback(grpcError, null);
});
},

ListStreams (call: ServerUnaryCall<ListStreamsRequest, ListStreamsResponse>, callback: sendUnaryData<ListStreamsResponse>): void {
console.log('ListStreams called', call.request);
// TODO: Return more information than just streamId
callback(null, {
streams: Object.keys(streamHandlers).map(stream => {
return { streamId: stream };
})
});
}
};
};
return RunnerService;
}

function handleInternalError (error: unknown): any {
let errorMessage = 'An unknown error occurred';
Expand Down Expand Up @@ -218,4 +228,4 @@ function validateStopStreamRequest (request: StartStreamRequest): any | null {
return null;
}

export default RunnerService;
export default getRunnerService;

0 comments on commit 8c55f73

Please sign in to comment.