Skip to content

Commit

Permalink
feat: run in batches or regular run
Browse files Browse the repository at this point in the history
  • Loading branch information
njuguna-n committed Oct 4, 2024
1 parent e1e3b9c commit a520e30
Show file tree
Hide file tree
Showing 6 changed files with 138 additions and 16 deletions.
19 changes: 5 additions & 14 deletions dbt/dbt-run.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,14 +187,10 @@ def get_max_timestamp():
return cur.fetchone()[0]

def run_dbt_in_batches():
print("Running dbt in batches")
last_processed_timestamp = get_last_processed_timestamp()
batch_size = int(os.getenv("DBT_BATCH_SIZE") or 10000)
# Start the timer
start_time = time.time()

while True:
print(f"Starting new batch with timestamp: {last_processed_timestamp}")
update_dbt_deps()
result = subprocess.run([
"dbt", "run",
Expand All @@ -213,20 +209,12 @@ def run_dbt_in_batches():
max_timestamp = get_max_timestamp()

if max_timestamp == last_processed_timestamp:
print("No batches to process")
# End the timer
end_time = time.time()

# Calculate the duration
duration = end_time - start_time
print(f"Time taken to process batches: {duration:.2f} seconds")
time.sleep(int(os.getenv("DATAEMON_INTERVAL") or 5))
break
continue

last_processed_timestamp = max_timestamp

def run_dbt():
print("Starting regular dbt run")
while True:
update_models()
run_incremental_models()
Expand All @@ -235,4 +223,7 @@ def run_dbt():
if __name__ == "__main__":
print("Starting dbt run")
setup()
run_dbt_in_batches()
if os.getenv("RUN_DBT_IN_BATCHES"):
run_dbt_in_batches()
else:
run_dbt()
1 change: 1 addition & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,5 @@ services:
- DATAEMON_INTERVAL=${DATAEMON_INTERVAL}
- DBT_PACKAGE_TARBALL_URL=${DBT_PACKAGE_TARBALL_URL}
- DBT_BATCH_SIZE=${DBT_BATCH_SIZE}
- RUN_DBT_IN_BATCHES=${RUN_DBT_IN_BATCHES}
- PYTHONUNBUFFERED=1
5 changes: 4 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,13 @@
"main": "",
"scripts": {
"postinstall": "cd couch2pg && npm ci",
"test:e2e": "npm run test:e2e-data && npm run test:e2e-containers && mocha tests/**/*.spec.js --timeout 50000; npm run test:e2e-stop-containers ",
"test:e2e": "npm run test:e2e-regular && npm run test:e2e-batch",
"test:e2e-regular": "npm run test:e2e-data && npm run test:e2e-containers && mocha tests/e2e-test.spec.js --timeout 50000; npm run test:e2e-stop-containers",
"test:e2e-batch": "npm run test:e2e-data && npm run test:e2e-batch-containers && mocha tests/**/*.spec.js --timeout 50000; npm run test:e2e-stop-containers",
"lint": "eslint --color --cache .",
"test:e2e-stop-containers": "docker compose --env-file ./tests/.e2e-env -f docker-compose.yml -f docker-compose.couchdb.yml -f docker-compose.postgres.yml down -v",
"test:e2e-containers": "docker compose --env-file ./tests/.e2e-env -f docker-compose.yml -f docker-compose.couchdb.yml -f docker-compose.postgres.yml -f tests/dbt/docker-compose.yml up -d --build --force-recreate",
"test:e2e-batch-containers": "docker compose --env-file ./tests/.e2e-batch-env -f docker-compose.yml -f docker-compose.couchdb.yml -f docker-compose.postgres.yml -f tests/dbt/docker-compose.yml up -d --build --force-recreate",
"test:e2e-data": "cd tests/data && rm -rf ./json_docs && cht csv-to-docs",
"test": "cd couch2pg && npm run test"
},
Expand Down
20 changes: 20 additions & 0 deletions tests/.e2e-batch-env
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
POSTGRES_USER="postgres"
POSTGRES_PASSWORD="postgres"
POSTGRES_DB="data"
POSTGRES_TABLE="medic"
POSTGRES_SCHEMA="v1"
DBT_POSTGRES_USER="postgres"
DBT_POSTGRES_PASSWORD="postgres"
DBT_POSTGRES_SCHEMA="dbt"
DBT_POSTGRES_HOST="postgres"
DBT_PACKAGE_TARBALL_URL="http://dbt-package/dbt/package.tar.gz"
DATAEMON_INTERVAL=0
COUCHDB_USER="medic"
COUCHDB_PASSWORD="password"
COUCHDB_DBS="medic,medic-sentinel"
COUCHDB_HOST="host.docker.internal"
COUCHDB_PORT=5984
COUCHDB_SECURE=false
POSTGRES_HOST=postgres
DBT_BATCH_SIZE=500
RUN_DBT_IN_BATCHES=true
1 change: 0 additions & 1 deletion tests/.e2e-env
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,3 @@ COUCHDB_HOST="host.docker.internal"
COUCHDB_PORT=5984
COUCHDB_SECURE=false
POSTGRES_HOST=postgres
DBT_BATCH_SIZE=500
108 changes: 108 additions & 0 deletions tests/e2e-batch-test.spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
import chai from 'chai';
import chaiExclude from 'chai-exclude';
import { rootConnect } from './utils/postgres-utils.js';
import {
  importAllDocs,
  docs,
  reports,
  contacts,
  persons,
} from './utils/couchdb-utils.js';

// Register the .excluding() assertion helper exactly once.
// (It was previously registered twice, which is redundant.)
chai.use(chaiExclude);

// NOTE(review): `expect` is used throughout this spec but was never bound
// from chai — presumably it was provided as a global by the test-runner
// config. Binding it here makes the file self-contained either way;
// confirm against the mocha setup.
const { expect } = chai;

// Schema/table names come from the same env file the containers are
// started with (tests/.e2e-batch-env).
const {
  POSTGRES_SCHEMA,
  DBT_POSTGRES_SCHEMA: pgSchema,
  POSTGRES_TABLE,
} = process.env;

// Fully-qualified name of the raw couch2pg landing table.
const PGTABLE = `${POSTGRES_SCHEMA}.${POSTGRES_TABLE}`;

// Resolve after the given number of seconds (fractional values allowed).
const delay = (seconds) => new Promise((resolve) => {
  setTimeout(resolve, seconds * 1000);
});

// Poll Postgres once per second until the dbt `reports` and `contacts`
// models contain one row per seeded CouchDB doc. Query errors (e.g. the
// models not existing yet) are treated as "not ready". Throws after
// `retry` attempts (default 30).
const waitForDbt = async (pgClient, retry = 30) => {
  while (retry > 0) {
    try {
      const reportRows = await pgClient.query(`SELECT * FROM ${pgSchema}.reports`);
      const contactRows = await pgClient.query(`SELECT * FROM ${pgSchema}.contacts`);
      const caughtUp = reportRows.rows.length === reports().length &&
        contactRows.rows.length === contacts().length;
      if (caughtUp) {
        return;
      }
    } catch {
      // tables may not exist yet — keep polling
    }
    await delay(1);
    retry -= 1;
  }
  throw new Error('DBT models missing records after 30s');
};

// End-to-end assertions for the batched dbt run: after seeding CouchDB and
// waiting for couch2pg + dbt to populate Postgres, verify row counts and
// field-level mappings in the dbt model tables.
// NOTE(review): `expect` is not bound anywhere in this file — presumably a
// global provided by the test-runner config; verify.
describe('Main workflow Test Suite', () => {
  let client;

  // Seed CouchDB, connect to Postgres as root, then block until the dbt
  // models have caught up with the seeded data (up to ~30s).
  before(async () => {
    console.log('Importing docs');
    await importAllDocs();
    client = await rootConnect();
    console.log('Waiting for DBT');
    await waitForDbt(client);
  });

  // Close the Postgres connection; optional chaining guards the case where
  // `before` failed before `client` was assigned.
  after(async () => await client?.end());

  describe('Initial Sync', () => {
    // NOTE(review): `docs` is read as an array (`docs.length`) while
    // `contacts`/`reports`/`persons` are called as functions — confirm the
    // couchdb-utils exports really differ in shape.
    it('should have data in postgres medic table', async () => {
      const couchdbTableResult = await client.query(`SELECT * FROM ${PGTABLE}`);
      expect(couchdbTableResult.rows.length).to.equal(docs.length);
    });

    it('should have data in postgres contacts table', async () => {
      const contactsTableResult = await client.query(`SELECT * FROM ${pgSchema}.contacts`);
      expect(contactsTableResult.rows.length).to.equal(contacts().length);
    });

    it('should have data in postgres reports table', async () => {
      const reportsTableResult = await client.query(`SELECT * FROM ${pgSchema}.reports`);
      expect(reportsTableResult.rows.length).to.equal(reports().length);
    });

    it('should have data in postgres persons table', async () => {
      const personsTableResult = await client.query(`SELECT * FROM ${pgSchema}.persons`);
      expect(personsTableResult.rows.length).to.equal(persons().length);
    });

    // Spot-check that one contact row carries the expected mapped columns.
    it('should have the expected data in a record in contact table', async () => {
      const contact = contacts().at(0);
      const contactTableResult = await client.query(`SELECT * FROM ${pgSchema}.contacts where uuid=$1`, [contact._id]);
      expect(contactTableResult.rows.length).to.equal(1);
      expect(contactTableResult.rows[0]).to.deep.include({
        parent_uuid: contact.parent._id,
        name: contact.name,
        contact_type: contact.type,
        phone: contact.phone
      });
    });

    it('should have the expected data in a record in person table', async () => {
      const person = persons().at(0);
      const personTableResult = await client.query(`SELECT * FROM ${pgSchema}.persons where uuid=$1`, [person._id]);
      expect(personTableResult.rows.length).to.equal(1);
      expect(personTableResult.rows[0].date_of_birth).to.equal(person.date_of_birth);
      expect(personTableResult.rows[0].sex).to.equal(person.sex);
    });

    // The reports model keeps the whole source doc in `doc`; `_rev`/`_id`
    // are excluded from the comparison because they differ per import.
    it('should have the expected data in a record in reports table', async () => {
      const report = reports().at(0);
      const reportTableResult = await client.query(`SELECT * FROM ${pgSchema}.reports where uuid=$1`, [report._id]);
      expect(reportTableResult.rows.length).to.equal(1);
      expect(reportTableResult.rows[0].doc).excluding(['_rev', '_id']).to.deep.equal(report);
      expect(reportTableResult.rows[0].form).to.equal(report.form);
      expect(reportTableResult.rows[0].patient_id).to.equal(report.patient_id);
      expect(reportTableResult.rows[0].contact_id).to.equal(report.contact._id);
      expect(reportTableResult.rows[0].fields).to.deep.equal(report.fields);
    });
  });
});

0 comments on commit a520e30

Please sign in to comment.