Skip to content

Commit

Permalink
UBERF-9017: Reduce createTable calls (#7550)
Browse files Browse the repository at this point in the history
Signed-off-by: Andrey Sobolev <[email protected]>
  • Loading branch information
haiodo authored Dec 25, 2024
1 parent 118ede7 commit 84020b3
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 13 deletions.
2 changes: 1 addition & 1 deletion dev/tool/src/db.ts
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ async function moveWorkspace (
tables = tables.filter((t) => include.has(t))
}

await createTables(new MeasureMetricsContext('', {}), pgClient, tables)
await createTables(new MeasureMetricsContext('', {}), pgClient, '', tables)
const token = generateToken(systemAccountEmail, wsId)
const endpoint = await getTransactorEndpoint(token, 'external')
const connection = (await connect(endpoint, wsId, undefined, {
Expand Down
13 changes: 11 additions & 2 deletions server/postgres/src/storage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import core, {
type ModelDb,
type ObjQueryType,
type Projection,
RateLimiter,
type Ref,
type ReverseLookups,
type SessionData,
Expand Down Expand Up @@ -1506,13 +1507,18 @@ interface OperationBulk {
mixins: TxMixin<Doc, Doc>[]
}

const initRateLimit = new RateLimiter(1)

class PostgresAdapter extends PostgresAdapterBase {
async init (ctx: MeasureContext, domains?: string[], excludeDomains?: string[]): Promise<void> {
let resultDomains = domains ?? this.hierarchy.domains()
if (excludeDomains !== undefined) {
resultDomains = resultDomains.filter((it) => !excludeDomains.includes(it))
}
await createTables(ctx, this.client, resultDomains)
const url = this.refClient.url()
await initRateLimit.exec(async () => {
await createTables(ctx, this.client, url, resultDomains)
})
this._helper.domains = new Set(resultDomains as Domain[])
}

Expand Down Expand Up @@ -1789,7 +1795,10 @@ class PostgresAdapter extends PostgresAdapterBase {
class PostgresTxAdapter extends PostgresAdapterBase implements TxAdapter {
async init (ctx: MeasureContext, domains?: string[], excludeDomains?: string[]): Promise<void> {
const resultDomains = domains ?? [DOMAIN_TX, DOMAIN_MODEL_TX]
await createTables(ctx, this.client, resultDomains)
await initRateLimit.exec(async () => {
const url = this.refClient.url()
await createTables(ctx, this.client, url, resultDomains)
})
this._helper.domains = new Set(resultDomains as Domain[])
}

Expand Down
34 changes: 24 additions & 10 deletions server/postgres/src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,13 @@ export const NumericTypes = [
core.class.Collection
]

export async function createTables (ctx: MeasureContext, client: postgres.Sql, domains: string[]): Promise<void> {
const filtered = domains.filter((d) => !loadedDomains.has(d))
export async function createTables (
ctx: MeasureContext,
client: postgres.Sql,
url: string,
domains: string[]
): Promise<void> {
const filtered = domains.filter((d) => !loadedDomains.has(url + translateDomain(d)))
if (filtered.length === 0) {
return
}
Expand All @@ -90,17 +95,15 @@ export async function createTables (ctx: MeasureContext, client: postgres.Sql, d
const exists = new Set(tables.map((it) => it.table_name))

await retryTxn(client, async (client) => {
await ctx.with('load-schemas', {}, () =>
getTableSchema(
client,
mapped.filter((it) => exists.has(it))
)
)
const domainsToLoad = mapped.filter((it) => exists.has(it))
if (domainsToLoad.length > 0) {
await ctx.with('load-schemas', {}, () => getTableSchema(client, domainsToLoad))
}
for (const domain of mapped) {
if (!exists.has(domain)) {
await ctx.with('create-table', {}, () => createTable(client, domain))
}
loadedDomains.add(domain)
loadedDomains.add(url + domain)
}
})
}
Expand Down Expand Up @@ -188,20 +191,27 @@ export async function shutdown (): Promise<void> {
export interface PostgresClientReference {
getClient: () => Promise<postgres.Sql>
close: () => void

url: () => string
}

class PostgresClientReferenceImpl {
count: number
client: postgres.Sql | Promise<postgres.Sql>

constructor (
readonly connectionString: string,
client: postgres.Sql | Promise<postgres.Sql>,
readonly onclose: () => void
) {
this.count = 0
this.client = client
}

url (): string {
return this.connectionString
}

async getClient (): Promise<postgres.Sql> {
if (this.client instanceof Promise) {
this.client = await this.client
Expand Down Expand Up @@ -233,6 +243,10 @@ export class ClientRef implements PostgresClientReference {
clientRefs.set(this.id, this)
}

url (): string {
return this.client.url()
}

closed = false
async getClient (): Promise<postgres.Sql> {
if (!this.closed) {
Expand Down Expand Up @@ -274,7 +288,7 @@ export function getDBClient (connectionString: string, database?: string): Postg
...extraOptions
})

existing = new PostgresClientReferenceImpl(sql, () => {
existing = new PostgresClientReferenceImpl(connectionString, sql, () => {
connections.delete(key)
})
connections.set(key, existing)
Expand Down

0 comments on commit 84020b3

Please sign in to comment.