Skip to content

Commit

Permalink
feat: export events tsv directly to postgres instance (#2048)
Browse files Browse the repository at this point in the history
* fix: export events tsv directly to postgres instance

* fix: remove unused function

* chore: option to export events to either local file or client

* chore: try new docker path

* chore: divide remote and local paths

* fix: try relative path for mkdir

* ci: try chmod

* ci: run mkdir first

* ci: try with sudo

* fix: file paths

---------

Co-authored-by: Matthew Little <[email protected]>
  • Loading branch information
rafaelcr and zone117x committed Aug 20, 2024
1 parent f6e50f6 commit 53e7a5b
Show file tree
Hide file tree
Showing 8 changed files with 89 additions and 64 deletions.
3 changes: 3 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,9 @@ jobs:
- name: Setup integration environment
run: |
sudo ufw disable
mkdir -p src/tests-event-replay/.tmp/local/
sudo chown 999:999 src/tests-event-replay/.tmp/local/
sudo chmod -R 777 src/tests-event-replay/.tmp/local/
docker compose -f docker/docker-compose.dev.postgres.yml up -d
npm run devenv:logs -- --no-color &> docker-compose-logs.txt &
Expand Down
1 change: 0 additions & 1 deletion docker/docker-compose.dev.bitcoind.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
version: "3"
services:
bitcoind:
image: "blockstack/bitcoind:v0.20.99.0"
Expand Down
5 changes: 3 additions & 2 deletions docker/docker-compose.dev.postgres.yml
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
version: '3.7'
services:
postgres:
image: "postgres:14"
image: "postgres:15"
ports:
- "5490:5432"
environment:
POSTGRES_USER: postgres
POSTGRES_PASSWORD: postgres
POSTGRES_DB: stacks_blockchain_api
POSTGRES_PORT: 5432
volumes:
- ../src/tests-event-replay/.tmp/local/:/root/
1 change: 0 additions & 1 deletion docker/docker-compose.dev.stacks-blockchain.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
version: '3.7'
services:
stacks-blockchain:
image: 'hirosystems/stacks-api-e2e:stacks3.0-0a2c0e2'
Expand Down
22 changes: 0 additions & 22 deletions src/event-replay/connection-legacy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -150,25 +150,3 @@ function getPgClientConfig<TGetPoolConfig extends boolean = false>({
return clientConfig;
}
}

/**
 * Creates a postgres pool client connection. If the connection fails due to a transient error, it is retried until successful.
 * You'd expect that the pg lib to handle this, but it doesn't, see https://github.com/brianc/node-postgres/issues/1789
 *
 * @param pool - pg connection pool to draw the client from
 * @returns a connected pool client; the caller is responsible for releasing it
 */
export async function connectWithRetry(pool: Pool): Promise<PoolClient> {
  let attempt = 0;
  // Loop forever: transient connection errors are logged and retried after a
  // 1 second pause; any non-transient error is propagated to the caller.
  while (true) {
    attempt++;
    try {
      return await pool.connect();
    } catch (error: any) {
      const pgConnectionError = isPgConnectionError(error);
      if (!pgConnectionError) {
        throw error;
      }
      logger.warn(`${pgConnectionError}, will retry, attempt #${attempt}`);
      await timeout(1000);
    }
  }
}
24 changes: 16 additions & 8 deletions src/event-replay/event-replay.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,16 +38,24 @@ export async function exportEventsAsTsv(
if (!filePath) {
throw new Error(`A file path should be specified with the --file option`);
}
const resolvedFilePath = path.resolve(filePath);
if (fs.existsSync(resolvedFilePath) && overwriteFile !== true) {
throw new Error(
`A file already exists at ${resolvedFilePath}. Add --overwrite-file to truncate an existing file`
);
const isLocal = filePath.startsWith('local:');
if (isLocal) {
filePath = filePath.replace(/^local:/, '');
if (!path.isAbsolute(filePath)) {
throw new Error(`The file path must be absolute`);
}
} else {
const resolvedFilePath = path.resolve(filePath);
if (fs.existsSync(resolvedFilePath) && overwriteFile !== true) {
throw new Error(
`A file already exists at ${resolvedFilePath}. Add --overwrite-file to truncate an existing file`
);
}
}
console.log(`Export event data to file: ${resolvedFilePath}`);
const writeStream = fs.createWriteStream(resolvedFilePath);

console.log(`Exporting event data to ${filePath}`);
console.log(`Export started...`);
await exportRawEventRequests(writeStream);
await exportRawEventRequests(filePath, isLocal);
console.log('Export successful.');
}

Expand Down
52 changes: 30 additions & 22 deletions src/event-replay/event-requests.ts
Original file line number Diff line number Diff line change
@@ -1,30 +1,38 @@
import { pipelineAsync } from '../helpers';
import { Readable, Writable } from 'stream';
import { pipeline } from 'node:stream/promises';
import { Readable } from 'stream';
import { DbRawEventRequest } from '../datastore/common';
import { PgServer } from '../datastore/connection';
import { connectPgPool, connectWithRetry } from './connection-legacy';
import { getConnectionArgs, getConnectionConfig, PgServer } from '../datastore/connection';
import { connectPgPool } from './connection-legacy';
import * as pgCopyStreams from 'pg-copy-streams';
import * as PgCursor from 'pg-cursor';
import { connectPostgres } from '@hirosystems/api-toolkit';
import { createWriteStream } from 'node:fs';

export async function exportRawEventRequests(targetStream: Writable): Promise<void> {
const pool = await connectPgPool({
usageName: 'export-raw-events',
pgServer: PgServer.primary,
export async function exportRawEventRequests(filePath: string, local: boolean): Promise<void> {
const sql = await connectPostgres({
usageName: `export-events`,
connectionArgs: getConnectionArgs(PgServer.primary),
connectionConfig: getConnectionConfig(PgServer.primary),
});
const client = await connectWithRetry(pool);
try {
const copyQuery = pgCopyStreams.to(
`
COPY (SELECT id, receive_timestamp, event_path, payload FROM event_observer_requests ORDER BY id ASC)
TO STDOUT ENCODING 'UTF8'
`
);
const queryStream = client.query(copyQuery);
await pipelineAsync(queryStream, targetStream);
} finally {
client.release();
await pool.end();
const copyQuery = sql`
COPY (
SELECT id, receive_timestamp, event_path, payload
FROM event_observer_requests
ORDER BY id ASC
)`;
if (local) {
await sql`${copyQuery}
TO '${sql.unsafe(filePath)}'
WITH (FORMAT TEXT, DELIMITER E'\t', ENCODING 'UTF8')
`;
} else {
const readableStream = await sql`${copyQuery}
TO STDOUT
WITH (FORMAT TEXT, DELIMITER E'\t', ENCODING 'UTF8')
`.readable();
await pipeline(readableStream, createWriteStream(filePath));
}
await sql.end();
}

export async function* getRawEventRequests(
Expand Down Expand Up @@ -61,7 +69,7 @@ export async function* getRawEventRequests(
`);
onStatusUpdate?.('Importing raw event requests into temporary table...');
const importStream = client.query(pgCopyStreams.from(`COPY temp_raw_tsv FROM STDIN`));
await pipelineAsync(readStream, importStream);
await pipeline(readStream, importStream);
onStatusUpdate?.('Removing any duplicate raw event requests...');
await client.query(`
INSERT INTO temp_event_observer_requests
Expand Down
45 changes: 37 additions & 8 deletions src/tests-event-replay/import-export-tests.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { ChainID } from '@stacks/transactions';
import * as fs from 'fs';
import * as path from 'path';
import { getRawEventRequests } from '../event-replay/event-requests';
import { PgWriteStore } from '../datastore/pg-write-store';
import { exportEventsAsTsv, importEventsFromTsv } from '../event-replay/event-replay';
Expand All @@ -25,7 +26,7 @@ describe('import/export tests', () => {
await db?.close();
});

test('event import and export cycle', async () => {
test('event import and export cycle - remote', async () => {
// Import from mocknet TSV
await importEventsFromTsv('src/tests-event-replay/tsv/mocknet.tsv', 'archival', true, true);
const chainTip = await db.getChainTip(db.sql);
Expand All @@ -38,14 +39,42 @@ describe('import/export tests', () => {
);

// Export into temp TSV
const tmpDir = 'src/tests-event-replay/.tmp';
const tmpDir = 'src/tests-event-replay/.tmp/remote';
fs.mkdirSync(tmpDir, { recursive: true });
await exportEventsAsTsv(`${tmpDir}/export.tsv`);

// Re-import with exported TSV and check that chain tip matches.
try {
fs.mkdirSync(tmpDir);
} catch (error: any) {
if (error.code != 'EEXIST') throw error;
await importEventsFromTsv(`${tmpDir}/export.tsv`, 'archival', true, true);
const newChainTip = await db.getChainTip(db.sql);
expect(newChainTip.block_height).toBe(28);
expect(newChainTip.index_block_hash).toBe(
'0x76cd67a65c0dfd5ea450bb9efe30da89fa125bfc077c953802f718353283a533'
);
expect(newChainTip.block_hash).toBe(
'0x7682af212d3c1ef62613412f9b5a727269b4548f14eca2e3f941f7ad8b3c11b2'
);
} finally {
fs.rmSync(`${tmpDir}/export.tsv`);
}
const tmpTsvPath = `${tmpDir}/export.tsv`;
await exportEventsAsTsv(tmpTsvPath, true);
});

test('event import and export cycle - local', async () => {
// Import from mocknet TSV
await importEventsFromTsv('src/tests-event-replay/tsv/mocknet.tsv', 'archival', true, true);
const chainTip = await db.getChainTip(db.sql);
expect(chainTip.block_height).toBe(28);
expect(chainTip.index_block_hash).toBe(
'0x76cd67a65c0dfd5ea450bb9efe30da89fa125bfc077c953802f718353283a533'
);
expect(chainTip.block_hash).toBe(
'0x7682af212d3c1ef62613412f9b5a727269b4548f14eca2e3f941f7ad8b3c11b2'
);

// Export into temp TSV
const tmpDir = 'src/tests-event-replay/.tmp/local';
fs.mkdirSync(tmpDir, { recursive: true });
await exportEventsAsTsv('local:/root/export.tsv');

// Re-import with exported TSV and check that chain tip matches.
try {
Expand All @@ -59,7 +88,7 @@ describe('import/export tests', () => {
'0x7682af212d3c1ef62613412f9b5a727269b4548f14eca2e3f941f7ad8b3c11b2'
);
} finally {
fs.rmSync(tmpDir, { force: true, recursive: true });
fs.rmSync(`${tmpDir}/export.tsv`);
}
});

Expand Down

0 comments on commit 53e7a5b

Please sign in to comment.