Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

lsp: single client + sequential requests #133

Merged
merged 5 commits into from
Nov 14, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions src/clients/lsp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,21 @@ const SERVER_DECOMPRESS_PATH: string = path.join(os.tmpdir(), "mz", "bin", "mz-l
/// The final server binary path.
const SERVER_PATH: string = path.join(__dirname, "bin", "mz-lsp-server");


/// Represents the structure a client uses to understand
export interface ExecuteCommandParseStatement {
/// The sql content in the statement
sql: string,
/// The type of statement.
/// Represents the String version of [Statement].
kind: string,
}

/// Represents the response from the parse command.
interface ExecuteCommandParseResponse {
statements: Array<ExecuteCommandParseStatement>
}

/// This class implements the Language Server Protocol (LSP) client for Materialize.
/// The LSP is downloaded for an endpoint an it is out of the bundle. Binaries are heavy-weight
/// and is preferable to download on the first activation.
Expand Down Expand Up @@ -280,4 +295,31 @@ export default class LspClient {
stop() {
this.client && this.client.stop();
}

/**
* Sends a request to the LSP server to execute the parse command.
* The parse command returns the list of statements in an array,
* including their corresponding SQL and type (e.g., select, create_table, etc.).
*
* For more information about commands: https://microsoft.github.io/language-server-protocol/specifications/lsp/3.17/specification/#workspace_executeCommand
*/
async parseSql(sql: string): Promise<Array<ExecuteCommandParseStatement>> {
if (this.client) {
console.log("[LSP]", "Setting on request handler.");

// Setup the handler.
this.client.onRequest("workspace/executeCommand", (...params) => {
console.log("[LSP]", "Response params: ", params);
});

// Send request
const { statements } = await this.client.sendRequest("workspace/executeCommand", { command: "parse", arguments: [
sql
]}) as ExecuteCommandParseResponse;

return statements;
} else {
throw new Error("Client is not yet available.");
}
}
}
77 changes: 47 additions & 30 deletions src/clients/sql.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import { Pool, QueryResult } from "pg";
import { Pool, PoolClient, PoolConfig, QueryResult } from "pg";
import AdminClient from "./admin";
import CloudClient from "./cloud";
import { Context, EventType } from "../context";
import { Profile } from "../context/config";

export default class SqlClient {
private pool: Promise<Pool>;
private privateClient: Promise<PoolClient>;
private adminClient: AdminClient;
private cloudClient: CloudClient;
private context: Context;
Expand Down Expand Up @@ -45,6 +46,19 @@ export default class SqlClient {

asyncOp();
});

this.privateClient = new Promise((res, rej) => {
const asyncOp = async () => {
try {
const pool = await this.pool;
this.privateClient = pool.connect();
} catch (err) {
console.error("[SqlClient]", "Error awaiting the pool: ", err);
}
};

asyncOp();
});
}

async connectErr() {
Expand Down Expand Up @@ -74,7 +88,7 @@ export default class SqlClient {
return connectionOptions.join(" ");
}

private async buildPoolConfig() {
private async buildPoolConfig(): Promise<PoolConfig> {
console.log("[SqlClient]", "Loading host.");
const hostPromise = this.cloudClient?.getHost(this.profile.region);
console.log("[SqlClient]", "Loading user email.");
Expand All @@ -94,46 +108,49 @@ export default class SqlClient {
password: await this.context.getAppPassword(),
// Disable SSL for tests
ssl: (host && host.startsWith("localhost")) ? false : true,
keepAlive: true
};
}

async query(statement: string, values?: Array<any>): Promise<QueryResult<any>> {
/**
* Internal queries are intended for exploring cases.
* Like quering the catalog, or information about Materialize.
* Queries goes to the pool, and no client is kept.
* @param statement
* @param values
* @returns query results
*/
async internalQuery(statement: string, values?: Array<any>): Promise<QueryResult<any>> {
const pool = await this.pool;
const results = await pool.query(statement, values);

return results;
}

async* cursorQuery(statement: string): AsyncGenerator<QueryResult> {
const pool = await this.pool;
const client = await pool.connect();

try {
const batchSize = 100; // Number of rows to fetch in each batch

await client.query("BEGIN");
await client.query(`DECLARE c CURSOR FOR ${statement}`);
let finish = false;

// Run the query
while (!finish) {
let results: QueryResult = await client.query(`FETCH ${batchSize} c;`);
const { rowCount } = results;
/**
* Private queries are intended for the user. A private query reuses always the same client.
* In this way, it functions like a shell, processing one statement after another.
* @param statement
* @param values
* @returns query results
*/
async privateQuery(statement: string, values?: Array<any>): Promise<QueryResult<any>> {
const client = await this.privateClient;
const results = await client.query(statement, values);

if (rowCount === 0) {
finish = true;
}
return results;
}

yield results;
}
} finally {
try {
await client.query("COMMIT;");
} catch (err) {
console.error("[SqlClient]", "Error commiting transaction.", err);
}
// Release the client and pool resources
client.release();
/**
* Shut down cleanly the pool.
*/
async end() {
try {
const pool = await this.pool;
await pool.end();
} catch (err) {
console.error("[SqlClient]", "Error ending the pool: ", err);
}
}
}
36 changes: 25 additions & 11 deletions src/context/context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { AdminClient, CloudClient, SqlClient } from "../clients";
import { Config } from "./config";
import { MaterializeObject, MaterializeSchemaObject } from "../providers/schema";
import AppPassword from "./appPassword";
import LspClient from "../clients/lsp";
import LspClient, { ExecuteCommandParseStatement } from "../clients/lsp";
import { Errors, ExtensionError } from "../utilities/error";

export enum EventType {
Expand Down Expand Up @@ -91,6 +91,10 @@ export class Context extends EventEmitter {
} else if (!profile) {
throw new Error(Errors.unconfiguredProfile);
} else {
// Clean the previous [SqlClient] connection.
if (this.sqlClient) {
this.sqlClient.end();
}
this.sqlClient = new SqlClient(this.adminClient, this.cloudClient, profile, this);
this.sqlClient.connectErr().catch((err) => {
console.error("[Context]", "Sql Client connect err: ", err);
Expand All @@ -100,12 +104,12 @@ export class Context extends EventEmitter {
// Set environment
if (!this.environment) {
const environmentPromises = [
this.query("SHOW CLUSTER;"),
this.query("SHOW DATABASE;"),
this.query("SHOW SCHEMA;"),
this.query(`SELECT id, name, owner_id as "ownerId" FROM mz_clusters;`),
this.query(`SELECT id, name, owner_id as "ownerId" FROM mz_databases;`),
this.query(`SELECT id, name, database_id as "databaseId", owner_id as "ownerId" FROM mz_schemas`),
this.internalQuery("SHOW CLUSTER;"),
this.internalQuery("SHOW DATABASE;"),
this.internalQuery("SHOW SCHEMA;"),
this.internalQuery(`SELECT id, name, owner_id as "ownerId" FROM mz_clusters;`),
this.internalQuery(`SELECT id, name, owner_id as "ownerId" FROM mz_databases;`),
this.internalQuery(`SELECT id, name, database_id as "databaseId", owner_id as "ownerId" FROM mz_schemas`),
];

try {
Expand Down Expand Up @@ -140,8 +144,8 @@ export class Context extends EventEmitter {
// TODO: Improve this code.
console.log("[Context]", "Reloading schema.");
const schemaPromises = [
this.query("SHOW SCHEMA;"),
this.query(`SELECT id, name, database_id as "databaseId", owner_id as "ownerId" FROM mz_schemas`)
this.internalQuery("SHOW SCHEMA;"),
this.internalQuery(`SELECT id, name, database_id as "databaseId", owner_id as "ownerId" FROM mz_schemas`)
];
const [
{ rows: [{ schema }] },
Expand Down Expand Up @@ -205,10 +209,16 @@ export class Context extends EventEmitter {
this.lspClient.stop();
}

async query(text: string, vals?: Array<any>) {
async internalQuery(text: string, vals?: Array<any>) {
const client = await this.getSqlClient();

return await client.query(text, vals);
return await client.internalQuery(text, vals);
}

async privateQuery(text: string, vals?: Array<any>) {
const client = await this.getSqlClient();

return await client.privateQuery(text, vals);
}

getClusters(): MaterializeObject[] | undefined {
Expand Down Expand Up @@ -259,6 +269,10 @@ export class Context extends EventEmitter {
await this.loadContext();
}

async parseSql(sql: string): Promise<Array<ExecuteCommandParseStatement>> {
return this.lspClient.parseSql(sql);
}

handleErr(err: Error) {
if (err instanceof ExtensionError) {
this.emit("event", { type: EventType.error, message: err.message });
Expand Down
108 changes: 70 additions & 38 deletions src/extension.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,55 +62,87 @@ export function activate(vsContext: vscode.ExtensionContext) {
const selection = activeEditor.selection;
const textSelected = activeEditor.document.getText(selection).trim();
const query = textSelected ? textSelected : document.getText();

console.log("[RunSQLCommand]", "Running query: ", query);
const fileName = document.fileName;

// Identify the query to not overlap results.
// When a user press many times the run query button
// the results from one query can overlap the results
// from another. We only want to display the last results.
const id = randomUUID();
context.emit("event", { type: EventType.newQuery, data: { id } });

// Benchmark
const startTime = Date.now();
try {
const results = await context.query(query);
const endTime = Date.now();
const elapsedTime = endTime - startTime;

console.log("[RunSQLCommand]", "Results: ", results);
console.log("[RunSQLCommand]", "Emitting results.");

if (Array.isArray(results)) {
context.emit("event", { type: EventType.queryResults, data: { ...results[0], elapsedTime, id } });
} else {
context.emit("event", { type: EventType.queryResults, data: { ...results, elapsedTime, id } });
// Clean the results by emitting a newQuery event.
context.emit("event", { type: EventType.newQuery, data: { id } });

try {
const statements = await context.parseSql(query);

console.log("[RunSQLCommand]", "Running statements: ", statements);

const lastStatement = statements[statements.length - 1];
for (const statement of statements) {
console.log("[RunSQLCommand]", "Running statement: ", statement);

// Benchmark
const startTime = Date.now();
try {
const results = await context.privateQuery(statement.sql);
const endTime = Date.now();
const elapsedTime = endTime - startTime;

console.log("[RunSQLCommand]", "Results: ", results);
console.log("[RunSQLCommand]", "Emitting results.");

// Only display the results from the last statement.
if (lastStatement === statement) {
if (Array.isArray(results)) {
context.emit("event", { type: EventType.queryResults, data: { ...results[0], elapsedTime, id } });
} else {
context.emit("event", { type: EventType.queryResults, data: { ...results, elapsedTime, id } });
}
}
activityLogProvider.addLog({
status: "success",
latency: elapsedTime, // assuming elapsedTime holds the time taken for the query to execute
sql: statement.sql
});
} catch (error: any) {
console.log("[RunSQLCommand]", error.toString());
console.log("[RunSQLCommand]", JSON.stringify(error));
const endTime = Date.now();
const elapsedTime = endTime - startTime;

activityLogProvider.addLog({
status: "failure",
latency: elapsedTime, // assuming elapsedTime holds the time taken before the error was caught
sql: statement.sql
});

context.emit("event", { type: EventType.queryResults, data: { id, rows: [], fields: [], error: {
message: error.toString(),
position: error.position,
query,
}, elapsedTime }});
break;
} finally {
resultsProvider._view?.show();
}
}
} catch (err) {
context.emit("event", { type: EventType.queryResults, data: { id, rows: [], fields: [], error: {
message: "Syntax errors are present. For more information, please refer to the \"Problems\" tab.",
position: undefined,
query,
}, elapsedTime: undefined }});

console.error("[RunSQLCommand]", "Error running statement: ", err);
}
activityLogProvider.addLog({
status: "success",
latency: elapsedTime, // assuming elapsedTime holds the time taken for the query to execute
sql: query
});
} catch (error: any) {
console.log("[RunSQLCommand]", error.toString());
console.log("[RunSQLCommand]", JSON.stringify(error));
const endTime = Date.now();
const elapsedTime = endTime - startTime;

activityLogProvider.addLog({
status: "failure",
latency: elapsedTime, // assuming elapsedTime holds the time taken before the error was caught
sql: query
});

} catch (err) {
context.emit("event", { type: EventType.queryResults, data: { id, rows: [], fields: [], error: {
message: error.toString(),
position: error.position,
message: "Error connecting to Materialize.",
position: undefined,
query,
}, elapsedTime }});
} finally {
resultsProvider._view?.show();
}, elapsedTime: undefined }});
}
});
});
Expand Down
2 changes: 1 addition & 1 deletion src/providers/schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ export default class DatabaseTreeProvider implements vscode.TreeDataProvider<Nod
}

private async query(text: string, vals?: Array<any>): Promise<Array<any>> {
const { rows } = await this.context.query(text, vals);
const { rows } = await this.context.internalQuery(text, vals);

return rows;
}
Expand Down