diff --git a/packages/shared/lib/services/connection.service.ts b/packages/shared/lib/services/connection.service.ts index ae580e186da..13b3f7712c4 100644 --- a/packages/shared/lib/services/connection.service.ts +++ b/packages/shared/lib/services/connection.service.ts @@ -647,7 +647,7 @@ class ConnectionService { if (connection?.credentials?.type === 'OAUTH2' || connection?.credentials?.type === 'APP' || connection?.credentials?.type === 'OAUTH2_CC') { const { success, error, response } = await this.refreshCredentialsIfNeeded({ - connection, + oldConnection: connection, providerConfig: config, template: template as ProviderTemplateOAuth2, environment_id: environment.id, @@ -811,23 +811,23 @@ class ConnectionService { } private async refreshCredentialsIfNeeded({ - connection, + oldConnection, providerConfig, template, environment_id, instantRefresh = false }: { - connection: Connection; + oldConnection: Connection; providerConfig: ProviderConfig; template: ProviderTemplateOAuth2; environment_id: number; instantRefresh?: boolean; }): Promise> { - const connectionId = connection.connection_id; - const credentials = connection.credentials as OAuth2Credentials; - const providerConfigKey = connection.provider_config_key; + const connectionId = oldConnection.connection_id; + const credentials = oldConnection.credentials as OAuth2Credentials; + const providerConfigKey = oldConnection.provider_config_key; - const shouldRefresh = await this.shouldRefreshCredentials(connection, credentials, providerConfig, template, instantRefresh); + const shouldRefresh = await this.shouldRefreshCredentials(oldConnection, credentials, providerConfig, template, instantRefresh); if (shouldRefresh) { await telemetry.log(LogTypes.AUTH_TOKEN_REFRESH_START, 'Token refresh is being started', LogActionEnum.AUTH, { @@ -836,7 +836,7 @@ class ConnectionService { providerConfigKey, provider: providerConfig.provider }); - // We must ensure that only one refresh is running at a time accross all instances. + // We must ensure that only one refresh is running at a time across all instances. // Using a simple redis entry as a lock with a TTL to ensure it is always released. // NOTES: // - This is not a distributed lock and will not work in a multi-redis environment. @@ -845,10 +845,22 @@ class ConnectionService { const lockKey = `lock:refresh:${environment_id}:${providerConfigKey}:${connectionId}`; try { const ttlInMs = 10000; - const acquitistionTimeoutMs = ttlInMs * 1.2; // giving some extra time for the lock to be released - await this.locking.tryAcquire(lockKey, ttlInMs, acquitistionTimeoutMs); + const acquisitionTimeoutMs = ttlInMs * 1.2; // giving some extra time for the lock to be released + const { tries } = await this.locking.tryAcquire(lockKey, ttlInMs, acquisitionTimeoutMs); + + let freshConnection = oldConnection; + if (tries > 0) { + // In this case, an other refresh was running so we want to be sure we have a brand new refresh_token + // Ultimately, this can also mean this refresh is now useless and we could return, but since we are not sure we proceed again + const res = await this.getConnection(connectionId, providerConfig.unique_key, oldConnection.environment_id); + if (res.error || !res.response) { + const error = new NangoError('unknown_connection'); + return { success: false, error, response: null }; + } + freshConnection = res.response; + } - const { success, error, response: newCredentials } = await this.getNewCredentials(connection, providerConfig, template); + const { success, error, response: newCredentials } = await this.getNewCredentials(freshConnection, providerConfig, template); if (!success || !newCredentials) { await telemetry.log(LogTypes.AUTH_TOKEN_REFRESH_FAILURE, `Token refresh failed, ${error?.message}`, LogActionEnum.AUTH, { environmentId: String(environment_id), @@ -860,8 +872,9 @@ class ConnectionService { return { success, error, response: null }; } - connection.credentials = newCredentials; - await this.updateConnection(connection); + + freshConnection.credentials = newCredentials; + await this.updateConnection(freshConnection); await telemetry.log(LogTypes.AUTH_TOKEN_REFRESH_SUCCESS, 'Token refresh was successful', LogActionEnum.AUTH, { environmentId: String(environment_id), @@ -871,12 +884,12 @@ class ConnectionService { }); return { success: true, error: null, response: { refreshed: shouldRefresh, credentials: newCredentials } }; - } catch (e: any) { - const errorMessage = e.message || 'Unknown error'; + } catch (err: any) { + const errorMessage = err.message || 'Unknown error'; const errorDetails = { message: errorMessage, - name: e.name || 'Error', - stack: e.stack || 'No stack trace' + name: err.name || 'Error', + stack: err.stack || 'No stack trace' }; const errorString = JSON.stringify(errorDetails); @@ -893,7 +906,7 @@ class ConnectionService { return { success: false, error, response: null }; } finally { - this.locking.release(lockKey); + await this.locking.release(lockKey); } } diff --git a/packages/shared/lib/utils/lock/locking.ts b/packages/shared/lib/utils/lock/locking.ts index 395428612ec..3f03cc08a85 100644 --- a/packages/shared/lib/utils/lock/locking.ts +++ b/packages/shared/lib/utils/lock/locking.ts @@ -8,19 +8,22 @@ export class Locking { this.store = store; } - public async tryAcquire(key: string, ttlInMs: number, acquisitionTimeoutMs: number): Promise { + public async tryAcquire(key: string, ttlInMs: number, acquisitionTimeoutMs: number): Promise<{ tries: number }> { if (ttlInMs <= 0) { throw new Error(`lock's TTL must be greater than 0`); } if (acquisitionTimeoutMs <= 0) { throw new Error(`acquisitionTimeoutMs must be greater than 0`); } + const start = Date.now(); + let tries = 0; while (Date.now() - start < acquisitionTimeoutMs) { try { await this.acquire(key, ttlInMs); - return; + return { tries }; } catch { + tries += 1; await new Promise((resolve) => setTimeout(resolve, 50)); } }