Skip to content

Commit

Permalink
feat: add offline mode, fix in-process connection edge cases (#708)
Browse files Browse the repository at this point in the history
Signed-off-by: Michael Beemer <[email protected]>
  • Loading branch information
beeme1mr authored Jan 8, 2024
1 parent 35ae705 commit 3d56225
Show file tree
Hide file tree
Showing 15 changed files with 522 additions and 245 deletions.
18 changes: 15 additions & 3 deletions libs/providers/flagd/README.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
# Server-Side flagd Provider

Flagd is a simple daemon for evaluating feature flags.
It is designed to conform to OpenFeature schema for flag definitions.
This provider is designed to use flagd's [evaluation protocol](https://github.com/open-feature/schemas/blob/main/protobuf/schema/v1/schema.proto), or locally evaluate flags defined in a flagd [flag definition](https://github.com/open-feature/schemas/blob/main/json/flagd-definitions.json).
This repository and package provides the client code for interacting with it via the OpenFeature server-side JavaScript SDK.

## Installation
Expand Down Expand Up @@ -45,7 +44,7 @@ Below are examples of usage patterns.

This is the default mode of operation of the provider.
In this mode, FlagdProvider communicates with flagd via the gRPC protocol.
Flag evaluations take place remotely at the connected [flagd](https://flagd.dev/) instance.
Flag evaluations take place remotely on the connected [flagd](https://flagd.dev/) instance.

```ts
OpenFeature.setProvider(new FlagdProvider())
Expand Down Expand Up @@ -74,6 +73,19 @@ Flag configurations for evaluation are obtained via gRPC protocol using [sync pr

In the above example, the provider expects a flag sync service implementation to be available at `localhost:8013` (default host and port).

In-process resolver can also work in an offline mode.
To enable this mode, you should provide a valid flag configuration file with the option `offlineFlagSourcePath`.

```
OpenFeature.setProvider(new FlagdProvider({
resolverType: 'in-process',
offlineFlagSourcePath: './flags.json',
}))
```

Offline mode uses `fs.watchFile` and polls every 5 seconds for changes to the file.
This mode is useful for local development, test cases, and for offline applications.

### Supported Events

The flagd provider emits `PROVIDER_READY`, `PROVIDER_ERROR` and `PROVIDER_CONFIGURATION_CHANGED` events.
Expand Down
16 changes: 8 additions & 8 deletions libs/providers/flagd/jest.config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,16 @@
export default {
displayName: 'providers-flagd',
preset: '../../../jest.preset.js',
globals: {
'ts-jest': {
tsconfig: '<rootDir>/tsconfig.spec.json',
},
},
transform: {
'^.+\\.[tj]s$': 'ts-jest',
'^.+\\.[tj]s$': [
'ts-jest',
{
tsconfig: '<rootDir>/tsconfig.spec.json',
},
],
},
moduleFileExtensions: ['ts', 'js', 'html'],
// ignore e2e path
testPathIgnorePatterns: ["/e2e/"],
coverageDirectory: '../../../coverage/libs/providers/flagd'
testPathIgnorePatterns: ['/e2e/'],
coverageDirectory: '../../../coverage/libs/providers/flagd',
};
14 changes: 8 additions & 6 deletions libs/providers/flagd/src/e2e/step-definitions/flagd.spec.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { OpenFeature, ProviderEvents } from '@openfeature/server-sdk';
import { OpenFeature, ProviderEvents, EventDetails } from '@openfeature/server-sdk';
import { defineFeature, loadFeature } from 'jest-cucumber';

// load the feature file.
Expand Down Expand Up @@ -38,22 +38,24 @@ defineFeature(feature, (test) => {

test('Flag change event', ({ given, when, and, then }) => {
let ran = false;
let eventDetails: EventDetails<ProviderEvents> | undefined;

aFlagProviderIsSet(given);
when('a PROVIDER_CONFIGURATION_CHANGED handler is added', () => {
client.addHandler(ProviderEvents.ConfigurationChanged, async () => {
client.addHandler(ProviderEvents.ConfigurationChanged, async (details) => {
ran = true;
eventDetails = details;
});
});
and(/^a flag with key "(.*)" is modified$/, async () => {
// this happens every 1s in the associated container, so wait 2s
await new Promise((resolve) => setTimeout(resolve, 2000));
// this happens every 1s in the associated container, so wait 3s
await new Promise((resolve) => setTimeout(resolve, 3000));
});
then('the PROVIDER_CONFIGURATION_CHANGED handler must run', () => {
expect(ran).toBeTruthy();
});
and(/^the event details must indicate "(.*)" was altered$/, () => {
// not supported
and(/^the event details must indicate "(.*)" was altered$/, (flagName) => {
expect(eventDetails?.flagsChanged).toContain(flagName);
});
});

Expand Down
6 changes: 6 additions & 0 deletions libs/providers/flagd/src/lib/configuration.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,12 @@ export interface Config {
*/
resolverType?: ResolverType;

/**
* File source of flags to be used by offline mode.
* Setting this enables the offline mode of the in-process provider.
*/
offlineFlagSourcePath?: string;

/**
* Selector to be used with flag sync gRPC contract.
*/
Expand Down
7 changes: 4 additions & 3 deletions libs/providers/flagd/src/lib/flagd-provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ export class FlagdProvider implements Provider {
})
.catch((err) => {
this._status = ProviderStatus.ERROR;
this.logger?.error(`${this.metadata.name}: error during initialization: ${err.message}, ${err.stack}`);
this.logger?.error(`${this.metadata.name}: error during initialization: ${err.message}`);
this.logger?.debug(err);
throw err;
});
}
Expand Down Expand Up @@ -127,9 +128,9 @@ export class FlagdProvider implements Provider {
this._events.emit(ProviderEvents.Ready);
}

private handleError(): void {
private handleError(message: string): void {
this._status = ProviderStatus.ERROR;
this._events.emit(ProviderEvents.Error);
this._events.emit(ProviderEvents.Error, { message });
}

private handleChanged(flagsChanged: string[]): void {
Expand Down
24 changes: 13 additions & 11 deletions libs/providers/flagd/src/lib/service/grpc/grpc-service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ export class GRPCService implements Service {
connect(
reconnectCallback: () => void,
changedCallback: (flagsChanged: string[]) => void,
disconnectCallback: () => void,
disconnectCallback: (message: string) => void,
): Promise<void> {
return new Promise((resolve, reject) =>
this.listen(reconnectCallback, changedCallback, disconnectCallback, resolve, reject),
Expand Down Expand Up @@ -148,7 +148,7 @@ export class GRPCService implements Service {
private listen(
reconnectCallback: () => void,
changedCallback: (flagsChanged: string[]) => void,
disconnectCallback: () => void,
disconnectCallback: (message: string) => void,
resolveConnect?: () => void,
rejectConnect?: (reason: Error) => void,
) {
Expand Down Expand Up @@ -185,12 +185,14 @@ export class GRPCService implements Service {
if (data && typeof data === 'object' && 'flags' in data && data?.['flags']) {
const flagChangeMessage = data as FlagChangeMessage;
const flagsChanged: string[] = Object.keys(flagChangeMessage.flags || []);
// remove each changed key from cache
flagsChanged.forEach((key) => {
if (this._cache?.delete(key)) {
this.logger?.debug(`${FlagdProvider.name}: evicted key: ${key} from cache.`);
}
});
if (this._cacheEnabled) {
// remove each changed key from cache
flagsChanged.forEach((key) => {
if (this._cache?.delete(key)) {
this.logger?.debug(`${FlagdProvider.name}: evicted key: ${key} from cache.`);
}
});
}
changedCallback(flagsChanged);
}
}
Expand All @@ -199,7 +201,7 @@ export class GRPCService implements Service {
private reconnect(
reconnectCallback: () => void,
changedCallback: (flagsChanged: string[]) => void,
disconnectCallback: () => void,
disconnectCallback: (message: string) => void,
) {
const channel = this._client.getChannel();
channel.watchConnectivityState(channel.getConnectivityState(true), Infinity, () => {
Expand All @@ -210,9 +212,9 @@ export class GRPCService implements Service {
private handleError(
reconnectCallback: () => void,
changedCallback: (flagsChanged: string[]) => void,
disconnectCallback: () => void,
disconnectCallback: (message: string) => void,
) {
disconnectCallback();
disconnectCallback('streaming connection error, will attempt reconnect...');
this.logger?.error(`${FlagdProvider.name}: streaming connection error, will attempt reconnect...`);
this._cache?.clear();
this.reconnect(reconnectCallback, changedCallback, disconnectCallback);
Expand Down
28 changes: 25 additions & 3 deletions libs/providers/flagd/src/lib/service/in-process/data-fetch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,34 @@
* Contract of in-process resolver's data fetcher
*/
export interface DataFetch {
/**
* Connects the data fetcher
*/
connect(
dataFillCallback: (flags: string) => void,
/**
* Callback that runs when data is received from the source
* @param flags The flags from the source
* @returns The flags that have changed
*/
dataCallback: (flags: string) => string[],
/**
* Callback that runs when the connection is re-established
*/
reconnectCallback: () => void,
/**
* Callback that runs when flags have changed
* @param flagsChanged The flags that have changed
*/
changedCallback: (flagsChanged: string[]) => void,
disconnectCallback: () => void,
/**
* Callback that runs when the connection is disconnected
* @param message The reason for the disconnection
*/
disconnectCallback: (message: string) => void,
): Promise<void>;

disconnect(): void;
/**
* Disconnects the data fetcher
*/
disconnect(): Promise<void>;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
import fs from 'fs';
import { FileFetch } from './file-fetch';
import { FlagdCore } from '@openfeature/flagd-core';
import { Logger } from '@openfeature/core';

jest.mock('fs', () => ({
...jest.requireActual('fs'),
promises: {
readFile: jest.fn(),
},
}));

const dataFillCallbackMock = jest.fn();
const reconnectCallbackMock = jest.fn();
const changedCallbackMock = jest.fn();
const loggerMock: Logger = {
debug: jest.fn(),
info: jest.fn(),
warn: jest.fn(),
error: jest.fn(),
};

describe('FileFetch', () => {
let flagdCore: FlagdCore;
let fileFetch: FileFetch;
let dataFillCallback: (flags: string) => string[];

beforeEach(() => {
flagdCore = new FlagdCore();
fileFetch = new FileFetch('./flags.json', loggerMock);
dataFillCallback = (flags: string) => {
return flagdCore.setConfigurations(flags);
};
});

afterEach(() => {
jest.resetAllMocks();
});

it('should connect to the file and setup the watcher', async () => {
const flags = '{"flags":{"flag":{"state":"ENABLED","variants":{"on":true,"off":false},"defaultVariant":"off"}}}';
mockReadFile(flags);
const watchMock = jest.fn();

fs.watchFile = watchMock as jest.MockedFunction<typeof fs.watchFile>;

await fileFetch.connect(dataFillCallbackMock, reconnectCallbackMock, changedCallbackMock);

expect(dataFillCallbackMock).toHaveBeenCalledWith(flags);
expect(watchMock).toHaveBeenCalledWith('./flags.json', expect.any(Function));
});

it('should throw because of invalid json', async () => {
const flags = 'this is not JSON';
mockReadFile(flags);
const watchSpy = jest.spyOn(fs, 'watchFile');

await expect(fileFetch.connect(dataFillCallback, reconnectCallbackMock, changedCallbackMock)).rejects.toThrow();
expect(watchSpy).not.toHaveBeenCalled();
});

it('should throw an error if the file is not found', async () => {
const mockReadFile = fs.promises.readFile as jest.MockedFunction<typeof fs.promises.readFile>;
mockReadFile.mockRejectedValue({ code: 'ENOENT' });

await expect(fileFetch.connect(dataFillCallbackMock, reconnectCallbackMock, changedCallbackMock)).rejects.toThrow(
'File not found: ./flags.json',
);
});

it('should throw an error if the file is not accessible', async () => {
const mockReadFile = fs.promises.readFile as jest.MockedFunction<typeof fs.promises.readFile>;
mockReadFile.mockRejectedValue({ code: 'EACCES' });

await expect(fileFetch.connect(dataFillCallbackMock, reconnectCallbackMock, changedCallbackMock)).rejects.toThrow(
'File not accessible: ./flags.json',
);
});

it('should close the watcher on disconnect', async () => {
const watchSpy = jest.spyOn(fs, 'watchFile');
const unwatchSpy = jest.spyOn(fs, 'unwatchFile');

await fileFetch.connect(dataFillCallbackMock, reconnectCallbackMock, changedCallbackMock);
await fileFetch.disconnect();

expect(watchSpy).toHaveBeenCalled();
expect(unwatchSpy).toHaveBeenCalledWith('./flags.json');
});

describe('on file change', () => {
it('should call changedCallback with the changed flags', async () => {
const flags = '{"flags":{"flag":{"state":"ENABLED","variants":{"on":true,"off":false},"defaultVariant":"off"}}}';
const changedFlags =
'{"flags":{"flag":{"state":"ENABLED","variants":{"on":true,"off":false},"defaultVariant":"on"}}}';
const mockReadFile = fs.promises.readFile as jest.MockedFunction<typeof fs.promises.readFile>;
mockReadFile.mockResolvedValueOnce(flags);
const watchMock = jest.fn();
fs.watchFile = watchMock as jest.MockedFunction<typeof fs.watchFile>;

await fileFetch.connect(dataFillCallback, reconnectCallbackMock, changedCallbackMock);
mockReadFile.mockResolvedValueOnce(changedFlags);
// Manually call the callback that is passed to fs.watchFile;
await watchMock.mock.calls[0][1]();

expect(changedCallbackMock).toHaveBeenCalledWith(['flag']);
});

it('should call skip changedCallback because no flag has changed', async () => {
const flags = '{"flags":{"flag":{"state":"ENABLED","variants":{"on":true,"off":false},"defaultVariant":"off"}}}';
const mockReadFile = fs.promises.readFile as jest.MockedFunction<typeof fs.promises.readFile>;
mockReadFile.mockResolvedValue(flags);
const watchMock = jest.fn();
fs.watchFile = watchMock as jest.MockedFunction<typeof fs.watchFile>;

await fileFetch.connect(dataFillCallback, reconnectCallbackMock, changedCallbackMock);
// Manually call the callback that is passed to fs.watchFile;
await watchMock.mock.calls[0][1]();

expect(changedCallbackMock).not.toHaveBeenCalled();
});

it('should log an error if the file could not be read', async () => {
const flags = '{"flags":{"flag":{"state":"ENABLED","variants":{"on":true,"off":false},"defaultVariant":"off"}}}';
const mockReadFile = fs.promises.readFile as jest.MockedFunction<typeof fs.promises.readFile>;
mockReadFile.mockResolvedValue(flags);
const watchMock = jest.fn();
fs.watchFile = watchMock as jest.MockedFunction<typeof fs.watchFile>;

await fileFetch.connect(dataFillCallback, reconnectCallbackMock, changedCallbackMock);
mockReadFile.mockRejectedValueOnce(new Error('Error reading file'));
// Manually call the callback that is passed to fs.watchFile;
await watchMock.mock.calls[0][1]();

expect(changedCallbackMock).not.toHaveBeenCalled();
expect(loggerMock.error).toHaveBeenCalled();
});
});
});

// Helper function to mock fs.promise.readFile
function mockReadFile(flags: string): void {
const mockReadFile = fs.promises.readFile as jest.MockedFunction<typeof fs.promises.readFile>;
mockReadFile.mockResolvedValue(flags);
}
Loading

0 comments on commit 3d56225

Please sign in to comment.