Skip to content

Commit

Permalink
Fix onConnect handler (#7265)
Browse files Browse the repository at this point in the history
Signed-off-by: Denis Bykhov <[email protected]>
  • Loading branch information
BykhovDenis authored Dec 5, 2024
1 parent d13c80b commit ecf4064
Show file tree
Hide file tree
Showing 8 changed files with 21 additions and 17 deletions.
16 changes: 8 additions & 8 deletions packages/core/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ export interface ClientConnection extends Storage, FulltextStorage, BackupClient
isConnected: () => boolean

close: () => Promise<void>
onConnect?: (event: ClientConnectEvent) => Promise<void>
onConnect?: (event: ClientConnectEvent, data: any) => Promise<void>

// If hash is passed, will return LoadModelResponse
loadModel: (last: Timestamp, hash?: string) => Promise<Tx[] | LoadModelResponse>
Expand Down Expand Up @@ -264,11 +264,11 @@ export async function createClient (
txHandler(...txBuffer)
txBuffer = undefined

const oldOnConnect: ((event: ClientConnectEvent) => Promise<void>) | undefined = conn.onConnect
conn.onConnect = async (event) => {
const oldOnConnect: ((event: ClientConnectEvent, data: any) => Promise<void>) | undefined = conn.onConnect
conn.onConnect = async (event, data) => {
console.log('Client: onConnect', event)
if (event === ClientConnectEvent.Maintenance) {
await oldOnConnect?.(ClientConnectEvent.Maintenance)
await oldOnConnect?.(ClientConnectEvent.Maintenance, data)
return
}
// Find all new transactions and apply
Expand All @@ -282,15 +282,15 @@ export async function createClient (
model = new ModelDb(hierarchy)

await ctx.with('build-model', {}, (ctx) => buildModel(ctx, loadModelResponse, modelFilter, hierarchy, model))
await oldOnConnect?.(ClientConnectEvent.Upgraded)
await oldOnConnect?.(ClientConnectEvent.Upgraded, data)

// No need to fetch more stuff since upgrade was happened.
return
}

if (event === ClientConnectEvent.Connected) {
// No need to do anything here since we connected.
await oldOnConnect?.(event)
await oldOnConnect?.(event, data)
return
}

Expand Down Expand Up @@ -318,10 +318,10 @@ export async function createClient (
if (atxes.length < transactionThreshold && !needFullRefresh) {
console.log('applying input transactions', atxes.length)
txHandler(...atxes)
await oldOnConnect?.(ClientConnectEvent.Reconnected)
await oldOnConnect?.(ClientConnectEvent.Reconnected, data)
} else {
// We need to trigger full refresh on queries, etc.
await oldOnConnect?.(ClientConnectEvent.Refresh)
await oldOnConnect?.(ClientConnectEvent.Refresh, data)
}
}

Expand Down
8 changes: 6 additions & 2 deletions plugins/client-resources/src/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ class Connection implements ClientConnection {

private helloRecieved: boolean = false

onConnect?: (event: ClientConnectEvent, data: any) => Promise<void>

rpcHandler = new RPCHandler()

constructor (
Expand Down Expand Up @@ -130,6 +132,8 @@ class Connection implements ClientConnection {
this.sessionId = generateId()
}

this.onConnect = opt?.onConnect

this.scheduleOpen(this.ctx, false)
}

Expand Down Expand Up @@ -261,7 +265,7 @@ class Connection implements ClientConnection {
if (resp.id === -1) {
this.delay = 0
if (resp.result?.state === 'upgrading') {
void this.opt?.onConnect?.(ClientConnectEvent.Maintenance, resp.result.stats)
void this.onConnect?.(ClientConnectEvent.Maintenance, resp.result.stats)
this.upgrading = true
this.delay = 3
return
Expand Down Expand Up @@ -302,7 +306,7 @@ class Connection implements ClientConnection {
v.reconnect?.()
}

void this.opt?.onConnect?.(
void this.onConnect?.(
(resp as HelloResponse).reconnect === true ? ClientConnectEvent.Reconnected : ClientConnectEvent.Connected,
this.sessionId
)
Expand Down
4 changes: 2 additions & 2 deletions plugins/client-resources/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -118,14 +118,14 @@ export default async () => {
reject(new Error(`Connection timeout, and no connection established to ${endpoint}`))
}
}, connectTimeout)
newOpt.onConnect = (event) => {
newOpt.onConnect = async (event, data) => {
// Any event is fine, it means server is alive.
clearTimeout(connectTO)
resolve()
}
})
}
const clientConnection = connect(url, upgradeHandler, tokenPayload.workspace, tokenPayload.email, newOpt)
const clientConnection = connect(url, upgradeHandler, tokenPayload.workspace, tokenPayload.email, opt)
if (connectPromise !== undefined) {
await connectPromise
}
Expand Down
2 changes: 1 addition & 1 deletion plugins/client/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ export interface ClientFactoryOptions {
onUpgrade?: () => void
onUnauthorized?: () => void
onArchived?: () => void
onConnect?: (event: ClientConnectEvent, data: any) => void
onConnect?: (event: ClientConnectEvent, data: any) => Promise<void>
ctx?: MeasureContext
onDialTimeout?: () => void | Promise<void>
}
Expand Down
2 changes: 1 addition & 1 deletion plugins/guest-resources/src/connect.ts
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ export async function connect (title: string): Promise<Client | undefined> {
})
},
// We need to refresh all active live queries and clear old queries.
onConnect: (event: ClientConnectEvent, data: any) => {
onConnect: async (event: ClientConnectEvent, data: any) => {
console.log('WorkbenchClient: onConnect', event)
try {
if (event === ClientConnectEvent.Connected) {
Expand Down
2 changes: 1 addition & 1 deletion plugins/workbench-resources/src/connect.ts
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ export async function connect (title: string): Promise<Client | undefined> {
})
},
// We need to refresh all active live queries and clear old queries.
onConnect: (event: ClientConnectEvent, data: any) => {
onConnect: async (event: ClientConnectEvent, data: any): Promise<void> => {
console.log('WorkbenchClient: onConnect', event)
if (event === ClientConnectEvent.Maintenance) {
if (data != null && data.total !== 0) {
Expand Down
2 changes: 1 addition & 1 deletion services/github/pod-github/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import config from './config'
export async function createPlatformClient (
workspace: string,
timeout: number,
reconnect?: (event: ClientConnectEvent) => void
reconnect?: (event: ClientConnectEvent, data: any) => Promise<void>
): Promise<Client> {
setMetadata(client.metadata.ClientSocketFactory, (url) => {
return new WebSocket(url, {
Expand Down
2 changes: 1 addition & 1 deletion services/github/pod-github/src/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1560,7 +1560,7 @@ export class GithubWorker implements IntegrationManager {
ctx.info('Connecting to', { workspace: workspace.workspaceUrl, workspaceId: workspace.workspaceName })
let client: Client | undefined
try {
client = await createPlatformClient(workspace.name, 30000, (event: ClientConnectEvent) => {
client = await createPlatformClient(workspace.name, 30000, async (event: ClientConnectEvent) => {
reconnect(workspace.name, event)
})

Expand Down

0 comments on commit ecf4064

Please sign in to comment.