Skip to content

Commit

Permalink
feat(js/plugins/express): added new express plugin for various expres…
Browse files Browse the repository at this point in the history
…s integrations (#1434)
  • Loading branch information
pavelgj authored Dec 10, 2024
1 parent cab3bd2 commit ec0c95d
Show file tree
Hide file tree
Showing 14 changed files with 948 additions and 29 deletions.
33 changes: 22 additions & 11 deletions js/core/src/flow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,11 @@ export interface StreamingFlowConfig<
streamSchema?: S;
}

export interface FlowCallOptions {
export interface FlowCallOptions<S> {
/** @deprecated use {@link context} instead. */
withLocalAuthContext?: unknown;
context?: unknown;
onChunk?: StreamingCallback<S>;
}

/**
Expand All @@ -95,9 +96,12 @@ export interface CallableFlow<
O extends z.ZodTypeAny = z.ZodTypeAny,
S extends z.ZodTypeAny = z.ZodTypeAny,
> {
(input?: z.infer<I>, opts?: FlowCallOptions): Promise<z.infer<O>>;
(input?: z.infer<I>, opts?: FlowCallOptions<z.infer<S>>): Promise<z.infer<O>>;

stream(input?: z.infer<I>, opts?: FlowCallOptions): StreamingResponse<O, S>;
stream(
input?: z.infer<I>,
opts?: FlowCallOptions<z.infer<S>>
): StreamingResponse<O, S>;

flow: Flow<I, O, S>;
}
Expand All @@ -111,7 +115,10 @@ export interface StreamableFlow<
O extends z.ZodTypeAny = z.ZodTypeAny,
S extends z.ZodTypeAny = z.ZodTypeAny,
> {
(input?: z.infer<I>, opts?: FlowCallOptions): StreamingResponse<O, S>;
(
input?: z.infer<I>,
opts?: FlowCallOptions<z.infer<S>>
): StreamingResponse<O, S>;
flow: Flow<I, O, S>;
}

Expand Down Expand Up @@ -176,7 +183,7 @@ export class Flow<
async invoke(
input: unknown,
opts: {
streamingCallback?: StreamingCallback<z.infer<S>>;
onChunk?: StreamingCallback<z.infer<S>>;
labels?: Record<string, string>;
context?: unknown;
}
Expand All @@ -185,14 +192,17 @@ export class Flow<
return await this.action.run(input, {
context: opts.context,
telemetryLabels: opts.labels,
onChunk: opts.streamingCallback ?? (() => {}),
onChunk: opts.onChunk ?? (() => {}),
});
}

/**
* Runs the flow. This is used when calling a flow from another flow.
*/
async run(payload?: z.infer<I>, opts?: FlowCallOptions): Promise<z.infer<O>> {
async run(
payload?: z.infer<I>,
opts?: FlowCallOptions<z.infer<S>>
): Promise<z.infer<O>> {
const input = this.inputSchema ? this.inputSchema.parse(payload) : payload;
await this.authPolicy?.(opts?.withLocalAuthContext, payload);

Expand All @@ -204,6 +214,7 @@ export class Flow<

const result = await this.invoke(input, {
context: opts?.context || opts?.withLocalAuthContext,
onChunk: opts?.onChunk,
});
return result.result;
}
Expand All @@ -213,7 +224,7 @@ export class Flow<
*/
stream(
payload?: z.infer<I>,
opts?: FlowCallOptions
opts?: Omit<FlowCallOptions<z.infer<S>>, 'onChunk'>
): StreamingResponse<O, S> {
let chunkStreamController: ReadableStreamController<z.infer<S>>;
const chunkStream = new ReadableStream<z.infer<S>>({
Expand All @@ -233,7 +244,7 @@ export class Flow<
this.invoke(
this.inputSchema ? this.inputSchema.parse(payload) : payload,
{
streamingCallback: ((chunk: z.infer<S>) => {
onChunk: ((chunk: z.infer<S>) => {
chunkStreamController.enqueue(chunk);
}) as S extends z.ZodVoid
? undefined
Expand Down Expand Up @@ -293,7 +304,7 @@ export class Flow<
});
try {
const result = await this.invoke(input, {
streamingCallback: (chunk: z.infer<S>) => {
onChunk: (chunk: z.infer<S>) => {
response.write(
'data: ' + JSON.stringify({ message: chunk }) + streamDelimiter
);
Expand Down Expand Up @@ -484,7 +495,7 @@ export function defineFlow<
(callableFlow as CallableFlow<I, O, S>).flow = flow;
(callableFlow as CallableFlow<I, O, S>).stream = (
input: z.infer<I>,
opts: FlowCallOptions
opts: Omit<FlowCallOptions<z.infer<S>>, 'onChunk'>
): StreamingResponse<O, S> => {
return flow.stream(input, opts);
};
Expand Down
24 changes: 18 additions & 6 deletions js/genkit/src/client/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,18 @@ const __flowStreamDelimiter = '\n\n';
* console.log(await response.output);
* ```
*/
export function streamFlow({
export function streamFlow<O = any, S = any>({
url,
input,
headers,
}: {
url: string;
input?: any;
headers?: Record<string, string>;
}) {
}): {
output(): Promise<O>;
stream(): AsyncIterable<S>;
} {
let chunkStreamController: ReadableStreamDefaultController | undefined =
undefined;
const chunkStream = new ReadableStream({
Expand Down Expand Up @@ -97,8 +100,7 @@ async function __flowRunEnvelope({
streamingCallback: (chunk: any) => void;
headers?: Record<string, string>;
}) {
let response;
response = await fetch(url, {
const response = await fetch(url, {
method: 'POST',
body: JSON.stringify({
data: input,
Expand All @@ -109,6 +111,11 @@ async function __flowRunEnvelope({
...headers,
},
});
if (response.status !== 200) {
throw new Error(
`Server returned: ${response.status}: ${await response.text()}`
);
}
if (!response.body) {
throw new Error('Response body is empty');
}
Expand Down Expand Up @@ -163,15 +170,15 @@ async function __flowRunEnvelope({
* console.log(await response);
* ```
*/
export async function runFlow({
export async function runFlow<O = any>({
url,
input,
headers,
}: {
url: string;
input?: any;
headers?: Record<string, string>;
}) {
}): Promise<O> {
const response = await fetch(url, {
method: 'POST',
body: JSON.stringify({
Expand All @@ -182,6 +189,11 @@ export async function runFlow({
...headers,
},
});
if (response.status !== 200) {
throw new Error(
`Server returned: ${response.status}: ${await response.text()}`
);
}
const wrappedDesult = await response.json();
return wrappedDesult.result;
}
6 changes: 3 additions & 3 deletions js/genkit/src/genkit.ts
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ export class Genkit {
/** Flow server. May be null if the flow server is not enabled in configuration or not started. */
private flowServer: FlowServer | null = null;
/** List of flows that have been registered in this instance. */
private registeredFlows: Flow<any, any, any>[] = [];
readonly flows: Flow<any, any, any>[] = [];

constructor(options?: GenkitOptions) {
this.options = options || {};
Expand Down Expand Up @@ -208,7 +208,7 @@ export class Genkit {
fn: FlowFn<I, O, S>
): CallableFlow<I, O, S> {
const flow = defineFlow(this.registry, config, fn);
this.registeredFlows.push(flow.flow);
this.flows.push(flow.flow);
return flow;
}

Expand All @@ -230,7 +230,7 @@ export class Genkit {
typeof config === 'string' ? { name: config } : config,
fn
);
this.registeredFlows.push(flow.flow);
this.flows.push(flow.flow);
return flow;
}

Expand Down
93 changes: 93 additions & 0 deletions js/plugins/express/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
# Genkit Express Plugin

This plugin provides utilities for conveninetly exposing Genkit flows and actions via Express HTTP server as REST APIs.

```ts
import { handler } from '@genkit-ai/express';
import express from 'express';

const simpleFlow = ai.defineFlow(
'simpleFlow',
async (input, streamingCallback) => {
const { text } = await ai.generate({
model: gemini15Flash,
prompt: input,
streamingCallback,
});
return text;
}
);

const app = express();
app.use(express.json());

app.post('/simpleFlow', handler(simpleFlow));

app.listen(8080);
```

You can also set auth policies:

```ts
// middleware for handling auth headers.
const authMiddleware = async (req, resp, next) => {
// parse auth headers and convert to auth object.
(req as RequestWithAuth).auth = {
user:
req.header('authorization') === 'open sesame' ? 'Ali Baba' : '40 thieves',
};
next();
};

app.post(
'/simpleFlow',
authMiddleware,
handler(simpleFlow, {
authPolicy: ({ auth }) => {
if (auth.user !== 'Ali Baba') {
throw new Error('not authorized');
}
},
})
);
```

Flows and actions exposed using the `handler` function can be accessed using `genkit/client` library:

```ts
import { runFlow, streamFlow } from 'genkit/client';

const result = await runFlow({
url: `http://localhost:${port}/simpleFlow`,
input: 'say hello',
});

console.log(result); // hello

// set auth headers (when using auth policies)
const result = await runFlow({
url: `http://localhost:${port}/simpleFlow`,
headers: {
Authorization: 'open sesame',
},
input: 'say hello',
});

console.log(result); // hello

// and streamed
const result = streamFlow({
url: `http://localhost:${port}/simpleFlow`,
input: 'say hello',
});
for await (const chunk of result.stream()) {
console.log(chunk);
}
console.log(await result.output());
```

The sources for this package are in the main [Genkit](https://github.com/firebase/genkit) repo. Please file issues and pull requests against that repo.

Usage information and reference details can be found in [Genkit documentation](https://firebase.google.com/docs/genkit).

License: Apache 2.0
54 changes: 54 additions & 0 deletions js/plugins/express/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
{
"name": "@genkit-ai/express",
"description": "Genkit AI framework plugin for Express server",
"keywords": [
"genkit",
"genkit-plugin",
"langchain",
"ai",
"genai",
"generative-ai"
],
"version": "0.9.6",
"type": "commonjs",
"scripts": {
"check": "tsc",
"compile": "tsup-node",
"build:clean": "rimraf ./lib",
"build": "npm-run-all build:clean check compile",
"build:watch": "tsup-node --watch",
"test": "node --import tsx --test tests/*_test.ts",
"test:watch": "node --import tsx --watch --test tests/*_test.ts"
},
"repository": {
"type": "git",
"url": "https://github.com/firebase/genkit.git",
"directory": "js/plugins/express"
},
"author": "genkit",
"license": "Apache-2.0",
"dependencies": {},
"peerDependencies": {
"genkit": "workspace:*",
"express": "^4.21.1"
},
"devDependencies": {
"get-port": "^5.1.0",
"@types/express": "^4.17.21",
"@types/node": "^20.11.16",
"npm-run-all": "^4.1.5",
"rimraf": "^6.0.1",
"tsup": "^8.3.5",
"tsx": "^4.19.2",
"typescript": "^4.9.0"
},
"types": "./lib/index.d.ts",
"exports": {
".": {
"require": "./lib/index.js",
"default": "./lib/index.js",
"import": "./lib/index.mjs",
"types": "./lib/index.d.ts"
}
}
}
Loading

0 comments on commit ec0c95d

Please sign in to comment.