This repository has been archived by the owner on Oct 22, 2024. It is now read-only.

fix: retry peer starting #13

Closed
wants to merge 2 commits into from
4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion package.json
@@ -1,6 +1,6 @@
{
"name": "inter-scheme-proxy-adapter",
"version": "1.1.0-snapshot.2",
"version": "1.1.2-snapshot.1",
"private": true,
"description": "Schemes Proxy implementation (ISPA)",
"author": "Eugen Klymniuk (geka-evk)",
115 changes: 50 additions & 65 deletions src/domain/InterSchemeProxyAdapter.ts
@@ -30,7 +30,11 @@ import config from '../config';
const { checkPeerJwsInterval, pm4mlEnabled } = config.get();

export class InterSchemeProxyAdapter implements IProxyAdapter {
- private peerJwsRefreshLoopTimer: NodeJS.Timeout | undefined;
+ private peerJwsRefreshLoopTimer: {
+   A: NodeJS.Timeout | undefined;
+   B: NodeJS.Timeout | undefined;
+ } = { A: undefined, B: undefined };

constructor(private readonly deps: ISPADeps) {
this.handleProxyRequest = this.handleProxyRequest.bind(this);
}
@@ -50,13 +54,7 @@ export class InterSchemeProxyAdapter implements IProxyAdapter {
}

async start(): Promise<void> {
- if (pm4mlEnabled) {
-   await this.getAccessTokens();
-   await this.initControlAgents();
-   await this.loadInitialCerts();
-   this.startPeerJwsRefreshLoop();
-   this.deps.logger.debug('certs and token are ready.');
- }
+ this.startPeer();

Collaborator:
I think we need `await` here. Otherwise, we start listening to incoming requests without being able to process them (we don't have the required state: valid certs for TLS and an access token).
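For illustration, a minimal sketch of the awaited variant (assuming `startPeer` is made async and only resolves once both peers have certs and a token):

```ts
// Sketch only, not what this PR currently does: block start() until the
// peer state is ready, then start the HTTP servers.
async start(): Promise<void> {
  await this.startPeer(); // assumes startPeer() returns a Promise
  this.deps.logger.debug('Starting httpServers...');
  // ...start httpServerA / httpServerB as below...
}
```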

this.deps.logger.debug('Starting httpServers...');

const [isAStarted, isBStarted] = await Promise.all([
@@ -67,71 +65,58 @@ export class InterSchemeProxyAdapter implements IProxyAdapter {
this.deps.logger.info('ISPA is started', { isAStarted, isBStarted });
}

+ private timeoutA: NodeJS.Timeout | undefined;
+ private timeoutB: NodeJS.Timeout | undefined;
+ private retryAgents: boolean = false;
+
+ startPeer() {
+   const init = async (which: 'A' | 'B') => {
+     this.deps.logger.info('Starting peer', { peer: which });
+     const agent = this.deps[`controlAgent${which}`];
+     const emit = (event: ServerStateEvent) => this.deps[`httpServer${which}`].emit(INTERNAL_EVENTS.serverState, event);
+     const peerAgent = this.deps[`controlAgent${which === 'A' ? 'B' : 'A'}`];
+     try {
+       await this.deps[`authClient${which}`].startAccessTokenUpdates((accessToken: string) => emit({ accessToken }));
+       await agent.init({
+         onCert: (certs: ICACerts) => emit({ certs }),
+         onPeerJWS: (peerJWS: ICAPeerJWSCert[]) => peerAgent.sendPeerJWS(peerJWS),
+       });
+       emit({ certs: await agent.loadCerts() });
+
+       // @note: This is a fail safe measure to ensure that the peer JWS certs
+       // are optimistically retrieved, just in case the websocket event is missed.
+       this.peerJwsRefreshLoopTimer[which] = setInterval(() => agent.triggerFetchPeerJws(), checkPeerJwsInterval);
+       this.deps.logger.info('Certs and token are ready.', { peer: which });
+
+     } catch (error) {
+       if (this.retryAgents) this[`timeout${which}`] = setTimeout(() => init(which), 60000);

geka-evk (Collaborator) commented on Jul 22, 2024:
To be honest, I don't like this approach. Let me try to explain why.

In case of a failure in the init function, you suggest starting ISPA anyway, right?
Here I see 3 problems:

  1. For at least about a minute, all incoming requests will fail, because we don't have the proper internal state (no certs or access token). What is the point of starting the server if it's "not ready"?
  2. Things get worse if the issue in init is permanent (e.g. misconfiguration): the server will be stuck in the retry loop forever. All incoming requests will fail, but we won't see it. It's a zombie process - it seems to be working, but can't do anything.
  3. Let's imagine we made some changes and unintentionally introduced an issue that causes init to fail. With the current approach, k8s will try to spin up a new container, see that it fails, and won't delete the previous version. But with your approach we could easily introduce a "regression" of the service.

Also, we need to keep in mind that for the proxy flow to work, both proxy servers (A and B) must be up and running with the proper internal state.

To sum up my point - we shouldn't accept incoming requests unless both proxy servers have the required internal state (certs and access token). And of course, we should have monitoring and alerting on all pod statuses, so that a CrashLoopBackOff triggers a notification and we react accordingly.

Collaborator (Author):
The problem is with Argo CD, which will stop any subsequent deployments if the proxy is crashing. This means we cannot fully deploy the regional hub until all buffer schemes are working. Also, if some buffer scheme is down, this may create problems in the regional hub. Not starting the proxy will also result in an error, but a generic one, like "service not available" or similar. It may be better to start the proxy and respond with a better error that includes details about exactly what is missing in the proxy. We can put a limit on the retries to avoid the potential zombie case.

"we shouldn't accept incoming requests unless both proxy servers have the required internal state" - isn't it better to accept them and return a proper error, instead of crash looping where it is not very clear what the problem is?

geka-evk (Collaborator) commented on Jul 22, 2024:
I shared my point. If the proxy doesn't have the proper state (certs or accessToken), it clearly won't be able to serve any incoming requests. If so, the whole proxy flow across the regional and buffer schemes is not working. I don't see why we should let a "broken" proxy handle incoming requests and produce tons of error logs. Instead, we have to see problems with the proxy immediately (CrashLoopBackOff means the service has serious problems, please react ASAP), not wait a minute (or more) and rely on the assumption that the issue will resolve "on its own".

Sorry, but it seems to me we're trying to solve one small issue (with ArgoCD) by introducing a more critical one (see my 3 points in the initial comment).

+       this.deps.logger.error('Failed to start peer', { error, peer: which });
+     }
+   };
+   if (pm4mlEnabled) {
+     this.retryAgents = true;
+     init('A');
+     init('B');

Comment on lines +98 to +99
Collaborator:
await?
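E.g., a hedged sketch of what awaiting both calls could look like (assuming `startPeer` becomes async):

```ts
// Sketch only: await both peers so init failures propagate to the caller.
async startPeer(): Promise<void> {
  const init = async (which: 'A' | 'B') => { /* ...the closure defined earlier in this method... */ };
  if (!pm4mlEnabled) return;
  this.retryAgents = true;
  await Promise.all([init('A'), init('B')]);
}
```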

+   }
+ }
+
async stop(): Promise<void> {
+   this.retryAgents = false;
+   this.timeoutA && clearTimeout(this.timeoutA);
+   this.timeoutA = undefined;
+   this.timeoutB && clearTimeout(this.timeoutB);
+   this.timeoutB = undefined;
+   this.peerJwsRefreshLoopTimer.A && clearInterval(this.peerJwsRefreshLoopTimer.A);
+   this.peerJwsRefreshLoopTimer.A = undefined;
+   this.peerJwsRefreshLoopTimer.B && clearInterval(this.peerJwsRefreshLoopTimer.B);
+   this.peerJwsRefreshLoopTimer.B = undefined;
this.deps.authClientA.stopUpdates();
this.deps.authClientB.stopUpdates();
-   this.stopPeerJwsRefreshLoop();
// prettier-ignore
const [isAStopped, isBStopped] = await Promise.all([
this.deps.httpServerA.stop(),
this.deps.httpServerB.stop(),
]);
this.deps.logger.info('ISPA is stopped', { isAStopped, isBStopped });
}

- private emitStateEventServerA(event: ServerStateEvent) {
-   this.deps.httpServerA.emit(INTERNAL_EVENTS.serverState, event);
- }
-
- private emitStateEventServerB(event: ServerStateEvent) {
-   this.deps.httpServerB.emit(INTERNAL_EVENTS.serverState, event);
- }
-
- private async getAccessTokens() {
-   const emitNewTokenA = (accessToken: string) => this.emitStateEventServerA({ accessToken });
-   const emitNewTokenB = (accessToken: string) => this.emitStateEventServerB({ accessToken });
-
-   await Promise.all([
-     this.deps.authClientA.startAccessTokenUpdates(emitNewTokenA),
-     this.deps.authClientB.startAccessTokenUpdates(emitNewTokenB),
-   ]);
- }
-
- private async initControlAgents() {
-   const { controlAgentA, controlAgentB } = this.deps;
-
-   await Promise.all([
-     controlAgentA.init({
-       onCert: (certs: ICACerts) => this.emitStateEventServerA({ certs }),
-       onPeerJWS: (peerJWS: ICAPeerJWSCert[]) => this.deps.controlAgentB.sendPeerJWS(peerJWS),
-     }),
-     controlAgentB.init({
-       onCert: (certs: ICACerts) => this.emitStateEventServerB({ certs }),
-       onPeerJWS: (peerJWS: ICAPeerJWSCert[]) => this.deps.controlAgentA.sendPeerJWS(peerJWS),
-     }),
-   ]);
- }
-
- private async loadInitialCerts() {
-   const [certsA, certsB] = await Promise.all([
-     this.deps.controlAgentA.loadCerts(),
-     this.deps.controlAgentB.loadCerts(),
-   ]);
-
-   this.emitStateEventServerA({ certs: certsA });
-   this.emitStateEventServerB({ certs: certsB });
- }
-
- // @note: This is a fail safe measure to ensure that the peer JWS certs
- // are optimistically retrieved, just in case the websocket event is missed.
- private startPeerJwsRefreshLoop() {
-   this.peerJwsRefreshLoopTimer = setInterval(() => {
-     this.deps.controlAgentA.triggerFetchPeerJws();
-     this.deps.controlAgentB.triggerFetchPeerJws();
-   }, checkPeerJwsInterval);
- }
-
- private async stopPeerJwsRefreshLoop() {
-   clearInterval(this.peerJwsRefreshLoopTimer);
- }
}
2 changes: 1 addition & 1 deletion src/domain/types.ts
@@ -97,7 +97,7 @@ export interface IHttpServer {
}

export interface IAuthClient {
- startAccessTokenUpdates: (cb: (token: string) => void) => void;
+ startAccessTokenUpdates: (cb: (token: string) => void) => Promise<void>;
stopUpdates: () => void;
getOidcToken: () => Promise<OIDCToken>;
}
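
The switch to `Promise<void>` lets callers wait for the first token before proceeding; a small illustrative sketch (the function name is hypothetical):

```ts
// With the new signature, the caller can block until the initial access
// token has been delivered before loading certs or serving traffic.
async function bringUpAuth(authClient: IAuthClient, onToken: (token: string) => void): Promise<void> {
  await authClient.startAccessTokenUpdates(onToken); // assumed to resolve once the first token has been fetched
  // ...safe to continue start-up here...
}
```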