Skip to content

Commit

Permalink
sql: divide between private and internal queries
Browse files Browse the repository at this point in the history
  • Loading branch information
joacoc committed Nov 3, 2023
1 parent da5ed55 commit 3b5ef2f
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 61 deletions.
84 changes: 45 additions & 39 deletions src/clients/sql.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import { Pool, QueryResult } from "pg";
import { Pool, PoolClient, PoolConfig, QueryResult } from "pg";
import AdminClient from "./admin";
import CloudClient from "./cloud";
import { Context, EventType } from "../context";
import { Profile } from "../context/config";

export default class SqlClient {
private pool: Promise<Pool>;
private privateClient: Promise<PoolClient>;
private adminClient: AdminClient;
private cloudClient: CloudClient;
private context: Context;
Expand Down Expand Up @@ -45,6 +46,19 @@ export default class SqlClient {

asyncOp();
});

this.privateClient = new Promise((res, rej) => {
const asyncOp = async () => {
try {
const pool = await this.pool;
this.privateClient = pool.connect();
} catch (err) {
console.error("[SqlClient]", "Error awaiting the pool: ", err);
}
};

asyncOp();
});
}

async connectErr() {
Expand Down Expand Up @@ -74,7 +88,7 @@ export default class SqlClient {
return connectionOptions.join(" ");
}

private async buildPoolConfig() {
private async buildPoolConfig(): Promise<PoolConfig> {
console.log("[SqlClient]", "Loading host.");
const hostPromise = this.cloudClient?.getHost(this.profile.region);
console.log("[SqlClient]", "Loading user email.");
Expand All @@ -94,57 +108,49 @@ export default class SqlClient {
password: await this.context.getAppPassword(),
// Disable SSL for tests
ssl: (host && host.startsWith("localhost")) ? false : true,
keepAlive: true
};
}

/**
* Returns a client from the pool.
* The request must call `done()` to free the client after using it..
* Internal queries are intended for exploring cases.
* Like quering the catalog, or information about Materialize.
* Queries goes to the pool, and no client is kept.
* @param statement
* @param values
* @returns query results
*/
async poolClient() {
const pool = await this.pool;
const client = await pool.connect();

return client;
}

async query(statement: string, values?: Array<any>): Promise<QueryResult<any>> {
async internalQuery(statement: string, values?: Array<any>): Promise<QueryResult<any>> {
const pool = await this.pool;
const results = await pool.query(statement, values);

return results;
}

async* cursorQuery(statement: string): AsyncGenerator<QueryResult> {
const pool = await this.pool;
const client = await pool.connect();

try {
const batchSize = 100; // Number of rows to fetch in each batch

await client.query("BEGIN");
await client.query(`DECLARE c CURSOR FOR ${statement}`);
let finish = false;

// Run the query
while (!finish) {
let results: QueryResult = await client.query(`FETCH ${batchSize} c;`);
const { rowCount } = results;
/**
* Private queries are intended for the user. A private query reuses always the same client.
* In this way, it functions like a shell, processing one statement after another.
* @param statement
* @param values
* @returns query results
*/
async privateQuery(statement: string, values?: Array<any>): Promise<QueryResult<any>> {
const client = await this.privateClient;
const results = await client.query(statement, values);

if (rowCount === 0) {
finish = true;
}
return results;
}

yield results;
}
} finally {
try {
await client.query("COMMIT;");
} catch (err) {
console.error("[SqlClient]", "Error commiting transaction.", err);
}
// Release the client and pool resources
client.release();
/**
* Shut down cleanly the pool.
*/
async end() {
try {
const pool = await this.pool;
await pool.end();
} catch (err) {
console.error("[SqlClient]", "Error ending the pool: ", err);
}
}
}
35 changes: 17 additions & 18 deletions src/context/context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import { MaterializeObject, MaterializeSchemaObject } from "../providers/schema"
import AppPassword from "./appPassword";
import LspClient, { ExecuteCommandParseStatement } from "../clients/lsp";
import { Errors, ExtensionError } from "../utilities/error";
import { PoolClient } from "pg";

export enum EventType {
newProfiles,
Expand Down Expand Up @@ -92,6 +91,10 @@ export class Context extends EventEmitter {
} else if (!profile) {
throw new Error(Errors.unconfiguredProfile);
} else {
// Clean the previous [SqlClient] connection.
if (this.sqlClient) {
this.sqlClient.end();
}
this.sqlClient = new SqlClient(this.adminClient, this.cloudClient, profile, this);
this.sqlClient.connectErr().catch((err) => {
console.error("[Context]", "Sql Client connect err: ", err);
Expand All @@ -101,12 +104,12 @@ export class Context extends EventEmitter {
// Set environment
if (!this.environment) {
const environmentPromises = [
this.query("SHOW CLUSTER;"),
this.query("SHOW DATABASE;"),
this.query("SHOW SCHEMA;"),
this.query(`SELECT id, name, owner_id as "ownerId" FROM mz_clusters;`),
this.query(`SELECT id, name, owner_id as "ownerId" FROM mz_databases;`),
this.query(`SELECT id, name, database_id as "databaseId", owner_id as "ownerId" FROM mz_schemas`),
this.internalQuery("SHOW CLUSTER;"),
this.internalQuery("SHOW DATABASE;"),
this.internalQuery("SHOW SCHEMA;"),
this.internalQuery(`SELECT id, name, owner_id as "ownerId" FROM mz_clusters;`),
this.internalQuery(`SELECT id, name, owner_id as "ownerId" FROM mz_databases;`),
this.internalQuery(`SELECT id, name, database_id as "databaseId", owner_id as "ownerId" FROM mz_schemas`),
];

try {
Expand Down Expand Up @@ -141,8 +144,8 @@ export class Context extends EventEmitter {
// TODO: Improve this code.
console.log("[Context]", "Reloading schema.");
const schemaPromises = [
this.query("SHOW SCHEMA;"),
this.query(`SELECT id, name, database_id as "databaseId", owner_id as "ownerId" FROM mz_schemas`)
this.internalQuery("SHOW SCHEMA;"),
this.internalQuery(`SELECT id, name, database_id as "databaseId", owner_id as "ownerId" FROM mz_schemas`)
];
const [
{ rows: [{ schema }] },
Expand Down Expand Up @@ -206,20 +209,16 @@ export class Context extends EventEmitter {
this.lspClient.stop();
}

async query(text: string, vals?: Array<any>) {
async internalQuery(text: string, vals?: Array<any>) {
const client = await this.getSqlClient();

return await client.query(text, vals);
return await client.internalQuery(text, vals);
}

/**
* This method is NOT recommended to use.
* Make sure to understand clients from the pool lifecycle.
* @returns a client from the pool.
*/
async poolClient(): Promise<PoolClient> {
async privateQuery(text: string, vals?: Array<any>) {
const client = await this.getSqlClient();
return await client.poolClient();

return await client.privateQuery(text, vals);
}

getClusters(): MaterializeObject[] | undefined {
Expand Down
5 changes: 2 additions & 3 deletions src/extension.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ export function activate(vsContext: vscode.ExtensionContext) {
const selection = activeEditor.selection;
const textSelected = activeEditor.document.getText(selection).trim();
const query = textSelected ? textSelected : document.getText();
const fileName = document.fileName;

// Identify the query to not overlap results.
// When a user press many times the run query button
Expand All @@ -73,7 +74,6 @@ export function activate(vsContext: vscode.ExtensionContext) {
// Clean the results by emitting a newQuery event.
context.emit("event", { type: EventType.newQuery, data: { id } });

const poolClient = await context.poolClient();
try {
const statements = await context.parseSql(query);

Expand All @@ -86,7 +86,7 @@ export function activate(vsContext: vscode.ExtensionContext) {
// Benchmark
const startTime = Date.now();
try {
const results = await poolClient.query(statement.sql);
const results = await context.privateQuery(statement.sql);
const endTime = Date.now();
const elapsedTime = endTime - startTime;

Expand Down Expand Up @@ -136,7 +136,6 @@ export function activate(vsContext: vscode.ExtensionContext) {
}, elapsedTime: undefined }});

console.error("[RunSQLCommand]", "Error running statement: ", err);
poolClient.release();
}
} catch (err) {
context.emit("event", { type: EventType.queryResults, data: { id, rows: [], fields: [], error: {
Expand Down
2 changes: 1 addition & 1 deletion src/providers/schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ export default class DatabaseTreeProvider implements vscode.TreeDataProvider<Nod
}

private async query(text: string, vals?: Array<any>): Promise<Array<any>> {
const { rows } = await this.context.query(text, vals);
const { rows } = await this.context.internalQuery(text, vals);

return rows;
}
Expand Down

0 comments on commit 3b5ef2f

Please sign in to comment.