Skip to content

Commit

Permalink
refactor(js/core): refactored flow to be an action (#1433)
Browse files Browse the repository at this point in the history
  • Loading branch information
pavelgj authored Dec 5, 2024
1 parent 4b85cb1 commit 19b7b40
Show file tree
Hide file tree
Showing 35 changed files with 408 additions and 664 deletions.
13 changes: 5 additions & 8 deletions genkit-tools/cli/src/commands/flow-batch-run.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
* limitations under the License.
*/

import { FlowInvokeEnvelopeMessage } from '@genkit-ai/tools-common';
import { logger } from '@genkit-ai/tools-common/utils';
import { Command } from 'commander';
import { readFile, writeFile } from 'fs/promises';
Expand Down Expand Up @@ -59,13 +58,11 @@ export const flowBatchRun = new Command('flow:batchRun')
logger.info(`Running '/flow/${flowName}'...`);
let response = await manager.runAction({
key: `/flow/${flowName}`,
input: {
start: {
input: data,
labels: options.label ? { batchRun: options.label } : undefined,
auth: options.auth ? JSON.parse(options.auth) : undefined,
},
} as FlowInvokeEnvelopeMessage,
input: data,
context: options.auth ? JSON.parse(options.auth) : undefined,
telemetryLabels: options.label
? { batchRun: options.label }
: undefined,
});
logger.info(
'Result:\n' + JSON.stringify(response.result, undefined, ' ')
Expand Down
9 changes: 2 additions & 7 deletions genkit-tools/cli/src/commands/flow-run.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
* limitations under the License.
*/

import { FlowInvokeEnvelopeMessage } from '@genkit-ai/tools-common';
import { logger } from '@genkit-ai/tools-common/utils';
import { Command } from 'commander';
import { writeFile } from 'fs/promises';
Expand Down Expand Up @@ -50,12 +49,8 @@ export const flowRun = new Command('flow:run')
await manager.runAction(
{
key: `/flow/${flowName}`,
input: {
start: {
input: data ? JSON.parse(data) : undefined,
},
auth: options.auth ? JSON.parse(options.auth) : undefined,
} as FlowInvokeEnvelopeMessage,
input: data ? JSON.parse(data) : undefined,
context: options.auth ? JSON.parse(options.auth) : undefined,
},
options.stream
? (chunk) => console.log(JSON.stringify(chunk, undefined, ' '))
Expand Down
10 changes: 2 additions & 8 deletions genkit-tools/common/src/eval/evaluate.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import {
EvalKeyAugments,
EvalRun,
EvalRunKey,
FlowActionInputSchema,
GenerateRequest,
GenerateRequestSchema,
GenerateResponseSchema,
Expand Down Expand Up @@ -257,15 +256,10 @@ async function runFlowAction(params: {
const { manager, actionRef, testCase, auth } = { ...params };
let state: InferenceRunState;
try {
const flowInput = FlowActionInputSchema.parse({
start: {
input: testCase.input,
},
auth: auth ? JSON.parse(auth) : undefined,
});
const runActionResponse = await manager.runAction({
key: actionRef,
input: flowInput,
input: testCase.input,
context: auth ? JSON.parse(auth) : undefined,
});
state = {
...testCase,
Expand Down
23 changes: 23 additions & 0 deletions genkit-tools/common/src/manager/manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import {

const STREAM_DELIMITER = '\n';
const HEALTH_CHECK_INTERVAL = 5000;
export const GENKIT_REFLECTION_API_SPEC_VERSION = 1;

interface RuntimeManagerOptions {
/** URL of the telemetry server. */
Expand Down Expand Up @@ -278,6 +279,7 @@ export class RuntimeManager {
try {
await axios.post(`${runtime.reflectionServerUrl}/api/notify`, {
telemetryServerUrl: this.telemetryServerUrl,
reflectionApiSpecVersion: GENKIT_REFLECTION_API_SPEC_VERSION,
});
} catch (error) {
logger.error(`Failed to notify runtime ${runtime.id}: ${error}`);
Expand Down Expand Up @@ -326,6 +328,27 @@ export class RuntimeManager {
if (isValidRuntimeInfo(runtimeInfo)) {
const fileName = path.basename(filePath);
if (await checkServerHealth(runtimeInfo.reflectionServerUrl)) {
if (
runtimeInfo.reflectionApiSpecVersion !=
GENKIT_REFLECTION_API_SPEC_VERSION
) {
if (
!runtimeInfo.reflectionApiSpecVersion ||
runtimeInfo.reflectionApiSpecVersion <
GENKIT_REFLECTION_API_SPEC_VERSION
) {
logger.warn(
'Genkit CLI is newer than runtime library. Some feature may not be supported. ' +
'Consider upgrading your runtime library version (debug info: expected ' +
`${GENKIT_REFLECTION_API_SPEC_VERSION}, got ${runtimeInfo.reflectionApiSpecVersion}).`
);
} else {
logger.error(
'Genkit CLI version is outdated. Please update `genkit-cli` to the latest version.'
);
process.exit(1);
}
}
this.filenameToRuntimeMap[fileName] = runtimeInfo;
this.idToFileMap[runtimeInfo.id] = fileName;
this.eventEmitter.emit(RuntimeEvent.ADD, runtimeInfo);
Expand Down
4 changes: 4 additions & 0 deletions genkit-tools/common/src/manager/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ export interface RuntimeInfo {
timestamp: string;
/** Display name for the project, typically basename of the root folder */
projectName?: string;
/** Genkit runtime library version. Ex: nodejs/0.9.5 or go/0.2.0 */
genkitVersion?: string;
/** Reflection API specification version. Ex: 1 */
reflectionApiSpecVersion?: number;
}

export enum RuntimeEvent {
Expand Down
4 changes: 2 additions & 2 deletions genkit-tools/common/src/server/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,15 +64,15 @@ export function startServer(manager: RuntimeManager, port: number) {
});

app.post('/api/streamAction', bodyParser.json(), async (req, res) => {
const { key, input } = req.body;
const { key, input, context } = req.body;
res.writeHead(200, {
'Access-Control-Allow-Origin': '*',
'Access-Control-Allow-Headers': 'Content-Type',
'Content-Type': 'text/plain',
'Transfer-Encoding': 'chunked',
});

const result = await manager.runAction({ key, input }, (chunk) => {
const result = await manager.runAction({ key, input, context }, (chunk) => {
res.write(JSON.stringify(chunk) + '\n');
});
res.write(JSON.stringify(result));
Expand Down
8 changes: 8 additions & 0 deletions genkit-tools/common/src/types/apis.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,14 @@ export const RunActionRequestSchema = z.object({
.any()
.optional()
.describe('An input with the type that this action expects.'),
context: z
.any()
.optional()
.describe('Additional runtime context data (ex. auth context data).'),
telemetryLabels: z
.record(z.string(), z.string())
.optional()
.describe('Labels to be applied to telemetry data.'),
});

export type RunActionRequest = z.infer<typeof RunActionRequestSchema>;
Expand Down
100 changes: 0 additions & 100 deletions genkit-tools/common/src/types/flow.ts

This file was deleted.

1 change: 0 additions & 1 deletion genkit-tools/common/src/types/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ export * from './apis';
export * from './env';
export * from './eval';
export * from './evaluators';
export * from './flow';
export * from './model';
export * from './prompt';
export * from './retrievers';
Expand Down
1 change: 0 additions & 1 deletion genkit-tools/common/tests/utils/trace.ts
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,6 @@ export class MockTrace {
let baseFlowSpan = { ...this.BASE_FLOW_SPAN };
baseFlowSpan.attributes['genkit:input'] = JSON.stringify(flowInput);
baseFlowSpan.attributes['genkit:output'] = JSON.stringify(flowOutput);
baseFlowSpan.attributes['genkit:metadata:flow:state'] = baseFlowState;

let wrapperActionSpan = { ...this.WRAPPER_ACTION_SPAN };
wrapperActionSpan.attributes['genkit:input'] = JSON.stringify({
Expand Down
Loading

0 comments on commit 19b7b40

Please sign in to comment.