Plan to move away from the Prefect dbt core blocks & connection blocks
- The Prefect blocks don't actually add any value and have just increased complexity throughout the app. A lot of convoluted relationships need to be formed/used to work with blocks. The only use of a block is to run dbt CLI operations and airbyte syncs.
- Moving away makes it easier and more flexible for us to parameterize dbt CLI commands.
- It also sets us up to scale to multiple warehouses and multiple dbt targets. People will be able to create a development pipeline and a prod pipeline.
- The idea is that we will create block objects in memory, run them and clear them. We won't save them.
- Only `dbt cli profile` and `airbyte server` blocks will be created, to store warehouse information and airbyte server host details.
- We will also create `secret` block(s) in Prefect to store the GitHub token if an org wants to connect to a private repo.
- We can do away with the `airbyte server` block and take the creds from env, since all orgs use the same Airbyte server. Alternatively, we can create a single server block for the entire app.
- Run the git pull command through the `ShellOperation` class: `shell_op.run()`. To run a shell operation we need (see the sketch below):
  - commands
  - env
  - work_dir
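
A minimal sketch, assuming prefect-shell's `ShellOperation` (where the working-directory parameter is spelled `working_dir`); the block object lives only in memory and is never saved:

```python
from prefect import flow
from prefect_shell import ShellOperation


@flow
def git_pull(working_dir: str, env: dict | None = None):
    # build the block in memory, run it, and let it be garbage-collected
    shell_op = ShellOperation(
        commands=["git pull"],
        env=env or {},
        working_dir=working_dir,
    )
    return shell_op.run()
```
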
- Run dbt commands through the `DbtCoreOperation` class: `dbt_op.run()`. To run a core operation we need (see the sketch below):
  - commands
  - cli profile block name
  - project_dir
  - work_dir
  - profiles_dir
  - env, if any
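
A minimal sketch using prefect-dbt: load the saved `dbt cli profile` block by name and let the operation write `profiles.yml` from it:

```python
from prefect import flow
from prefect_dbt.cli import DbtCliProfile
from prefect_dbt.cli.commands import DbtCoreOperation


@flow
def run_dbt_command(
    command: str,             # e.g. "dbt run"
    cli_profile_block: str,   # name of the saved dbt cli profile block
    project_dir: str,
    profiles_dir: str,
    env: dict | None = None,
):
    dbt_cli_profile = DbtCliProfile.load(cli_profile_block)
    dbt_op = DbtCoreOperation(
        commands=[command],
        dbt_cli_profile=dbt_cli_profile,
        overwrite_profiles=True,  # regenerate profiles.yml from the block
        project_dir=project_dir,
        working_dir=project_dir,
        profiles_dir=profiles_dir,
        env=env or {},
    )
    return dbt_op.run()
```
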
- Run an airbyte sync by creating an object of the `AirbyteConnection` class and then calling the sync function `run_connection_sync(airbyte_connection)`. To run an airbyte sync we need (see the sketch below):
  - airbyte server block name
  - connection_id
  - timeout
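
A minimal sketch using prefect-airbyte; `run_connection_sync` is itself a flow, so it runs as a subflow here:

```python
from prefect import flow
from prefect_airbyte.connections import AirbyteConnection
from prefect_airbyte.flows import run_connection_sync
from prefect_airbyte.server import AirbyteServer


@flow
def sync_connection(server_block_name: str, connection_id: str, timeout: int = 15):
    airbyte_connection = AirbyteConnection(
        airbyte_server=AirbyteServer.load(server_block_name),
        connection_id=connection_id,
        timeout=timeout,
    )
    return run_connection_sync(airbyte_connection)
```
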
- Basically we will replace blocks with tasks that we run. We will now have task-lock logic.
- Each deployment will be running some task, which can be locked or not.
- `ddpui_orgprefectblock` will have very minimal use in the entire app. No block-lock logic here. We will store airbyte server, dbt cli profile and secret blocks here.
- We will have to store the name of the dbt cli profile block. Add a column in the `ddpui_orgdbt` table to store this (a minimal sketch follows). Whenever a block runs, we pass directory information & the cli profile block name to run with.
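
A minimal sketch of the new column, assuming the existing `OrgDbt` model maps to `ddpui_orgdbt`; the field name is hypothetical:

```python
from django.db import models


class OrgDbt(models.Model):
    # ... existing fields ...
    # hypothetical name for the column holding the dbt cli profile block's name
    dbt_cli_profile_block_name = models.CharField(max_length=255, null=True)
```
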
- Either we hardcode a list of commands or create a master table `ddpui_tasks` to store the commands/tasks available in our app. It will hold the master list of airbyte syncs, git pull & dbt core commands (a model sketch follows). The table will have the following columns:
  - id
  - slug
  - command
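
A sketch of the master table as a Django model; everything beyond the three columns listed above (field sizes, nullability) is an assumption:

```python
from django.db import models


class Task(models.Model):
    """Master list of tasks: airbyte syncs, git pull & dbt core commands"""

    slug = models.SlugField(max_length=50, unique=True)
    command = models.CharField(max_length=255, null=True)  # e.g. "dbt run"; null for an airbyte sync

    class Meta:
        db_table = "ddpui_tasks"
```
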
- Add a jsonb column `params` in `ddpui_dataflowblock` to store the deployment parameters of each deployment.
- `ddpui_dataflowblock` will go away and we will create a new table instead, `ddpui_dataflowtask`. Each deployment will now be mapped to a task from our master table.
- `ddpui_blocklock` becomes `ddpui_locktask` (sketched below). This will be the table that stores the locks while tasks are running or triggered. The Prefect webhook will now update this.
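
A hedged sketch of the lock table; every column besides the task reference (who locked it, the flow run the webhook reports on) is an assumption:

```python
from django.db import models


class TaskLock(models.Model):
    """One row per task that is currently triggered or running"""

    orgtask = models.ForeignKey("OrgTask", on_delete=models.CASCADE)  # the org's task being run
    locked_by = models.CharField(max_length=255, null=True)           # user or scheduler
    flow_run_id = models.CharField(max_length=36, null=True)          # updated by the prefect webhook
    locked_at = models.DateTimeField(auto_now_add=True)

    class Meta:
        db_table = "ddpui_locktask"
```
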
- The idea is to make sure things currently running don't break, since we won't be pushing UI changes but only backend changes in this release.
- Both the old block logic and the new task logic should work simultaneously until we release the UI changes.
- We won't touch the old APIs; we will version & create new ones. The same goes for the flows in Prefect. Everything will be deprecated once we move away from the blocks completely.
- The final step is to migrate the current setup. For this we will write Django management commands (scripts) that help us move away from the blocks.

| Before | After |
|---|---|
| Django: create airbyte workspace `POST /api/airbyte/workspace/` | Django: new api to create airbyte workspace `POST /api/airbyte/v1/workspace/` |
| Django: create organization `POST /api/organizations/` | Django: new api to create org `POST /api/v1/organizations/` |

| Before | After |
|---|---|
| Django: put airbyte destination `PUT /api/airbyte/destinations/{destination_id}/` | Django: new api to create airbyte destination `POST /api/airbyte/v1/destinations/{destination_id}/` |
| Proxy: update cli profile block `PUT /proxy/blocks/dbtcli/profile/` | |
| Django: api to update target config schema of cli profile block `PUT /api/dbt/schema/` | Django: new api to update target config schema of cli profile block `PUT /api/dbt/v1/schema/` |

| Before | After |
|---|---|
| Django: create airbyte connection `POST /api/airbyte/connections/` | Django: new api to create airbyte connection `POST /api/airbyte/v1/connections/` |
| Django: sync a connection via the button `POST /api/airbyte/connections/{connection_block_id}/sync/` | Django: reusing api to sync connection via deployment (already created in prefect_api) `POST /api/prefect/v1/flows/{deployment_id}/flow_run` |
| Get all connections `GET /api/airbyte/connections` | Get all connections `GET /api/airbyte/v1/connections` |
| Get a connection `POST /api/airbyte/connections/:connection_block_id` | Get a connection `POST /api/airbyte/v1/connections/:connection_id` |
| Reset a connection `POST /api/airbyte/connections/:connection_block_id/reset` | Reset a connection `POST /api/airbyte/v1/connections/:connection_id/reset` |
| Update a connection `PUT /api/airbyte/connections/:connection_block_id/update` | Update a connection `PUT /api/airbyte/v1/connections/:connection_id/update` |
| Delete a connection `DELETE /api/airbyte/connections/:connection_block_id` | Delete a connection `DELETE /api/airbyte/v1/connections/:connection_id` |

| Before | After |
|---|---|
| Django: create dbt blocks api `POST /api/prefect/blocks/dbt/` | Django: api to create org tasks & cli profile blocks `POST /api/prefect/tasks/transform/` |
| Django: get dbt blocks api `GET /api/prefect/blocks/dbt/` | Django: api to get org tasks `GET /api/prefect/tasks/transform/` |
| Proxy: create dbt core block api `/proxy/blocks/dbtcore/` | Proxy: new api in proxy to create dbt cli profile block `/proxy/blocks/dbtcli/profile/` |

| Before | After |
|---|---|
| Django: apis to run dbt core operations & git pull operations on transform can be deprecated `/api/prefect/flows/dbt_run/` & `/api/dbt/git_pull/` | Django: api to run tasks (dbt + shell operation (git pull)) `/api/prefect/tasks/{orgtask_id}/run/`. Proxy: new api to run tasks (dbt tasks) via updated flows `/proxy/v1/flows/dbtcore/run/`. Proxy: new api to run shell operations (a general shell op; will be used for git pull for now) via updated flows `/proxy/flows/shell/run/` |
| Django: api to run long tasks via deployment `/api/prefect/flows/{deployment_id}/flow_run` | Django: new api to run long-running tasks via deployment `/api/prefect/v1/flows/{deployment_id}/flow_run/` |

| Before | After |
|---|---|
| Django: create dataflow/pipeline `POST /api/prefect/flows/` | Django: new api to create dataflow/pipeline `POST /api/prefect/v1/flows/` |
| Get all pipelines `GET /api/prefect/flows/` | Get all pipelines `GET /api/prefect/v1/flows/` |
| Get a pipeline `GET /api/prefect/flows/{deployment_id}` | Get a pipeline `GET /api/prefect/v1/flows/{deployment_id}` |
| Update a pipeline `PUT /api/prefect/flows/{deployment_id}` | Update a pipeline `PUT /api/prefect/v1/flows/{deployment_id}` |
| Delete a pipeline `DELETE /api/prefect/flows/{deployment_id}` | Delete a pipeline `DELETE /api/prefect/v1/flows/{deployment_id}` |
| Proxy: update a deployment `PUT /api/proxy/deployments/{deployment_id}` | Proxy: update a deployment `PUT /api/proxy/v1/deployments/{deployment_id}` |

- New function in the prefect service to handle the locking logic: `lock_tasks_for_deployment` (a hedged sketch follows).
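
A hedged sketch of the helper; the model names (`DataflowOrgTask`, `TaskLock`), their module path, and the all-or-nothing locking rule are assumptions:

```python
from ddpui.models.tasks import DataflowOrgTask, TaskLock  # assumed module path


def lock_tasks_for_deployment(deployment_id: str, locked_by: str):
    """Lock every org task mapped to this deployment; fail if any is already locked"""
    orgtasks = [
        dft.orgtask
        for dft in DataflowOrgTask.objects.filter(dataflow__deployment_id=deployment_id)
    ]
    if TaskLock.objects.filter(orgtask__in=orgtasks).exists():
        raise ValueError("one or more tasks in this deployment are already locked")
    return [
        TaskLock.objects.create(orgtask=orgtask, locked_by=locked_by) for orgtask in orgtasks
    ]
```
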
- New API for the Prefect custom webhook: `POST /webhooks/v1/notification/`. We need to set this up in the Prefect UI (a sketch of the receiver follows).
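
A hedged sketch of the receiver using django-ninja; the payload shape (a flow run id plus its state) is an assumption about what the Prefect automation will send:

```python
import json

from ninja import NinjaAPI

from ddpui.models.tasks import TaskLock  # from the lock-table sketch above

webhook_api = NinjaAPI(urls_namespace="webhooks")


@webhook_api.post("/v1/notification/")
def post_notification(request):
    body = json.loads(request.body)
    flow_run_id, state = body["flow_run_id"], body["state"]  # assumed payload shape
    # release the lock once the flow run reaches a terminal state
    if state in ["Completed", "Failed", "Crashed", "Cancelled"]:
        TaskLock.objects.filter(flow_run_id=flow_run_id).delete()
    return {"success": True}
```
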
- Move/copy all airbyte server blocks from `orgprefectblock` to `orgprefectblockv1` (a sketch of the management command follows). No changes required on the Prefect side. Block IDs will remain the same.
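
A hedged sketch of the copy as a Django management command; the model names, import path and the block-type filter value are all assumptions:

```python
from django.core.management.base import BaseCommand

from ddpui.models.orgprefectblock import OrgPrefectBlock, OrgPrefectBlockv1  # assumed path


class Command(BaseCommand):
    help = "copy airbyte server blocks from orgprefectblock to orgprefectblockv1"

    def handle(self, *args, **options):
        for block in OrgPrefectBlock.objects.filter(block_type="airbyte server"):
            OrgPrefectBlockv1.objects.get_or_create(
                org=block.org,
                block_type=block.block_type,
                block_id=block.block_id,  # block ids remain the same
                block_name=block.block_name,
            )
```
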
- Move/copy all the airbyte connection deployments with the prefix `manual-sync..` from `orgdataflow` to `orgdataflowv1`. Block name, block type and block IDs remain the same.
- For each copied deployment above, we need to update the deployment parameters. We will make the deployment carry the old params plus the new params, so the new deployment parameters look like this (a sketch of the API call follows):

```python
{
    "airbyte_blocks": [],     # old
    "dbt_blocks": [],         # old
    "config": {"tasks": []},  # new task dictionary
}
```

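A hedged sketch of the parameter merge through Prefect's REST API (`PATCH /api/deployments/{id}` accepts a `parameters` field); the API URL is an assumption:

```python
import requests

PREFECT_API_URL = "http://localhost:4200/api"  # assumption


def merge_deployment_params(deployment_id: str, old_params: dict, tasks: list):
    new_params = {
        "airbyte_blocks": old_params.get("airbyte_blocks", []),  # old
        "dbt_blocks": old_params.get("dbt_blocks", []),          # old
        "config": {"tasks": tasks},                              # new task dictionary
    }
    res = requests.patch(
        f"{PREFECT_API_URL}/deployments/{deployment_id}",
        json={"parameters": new_params},
        timeout=30,
    )
    res.raise_for_status()
```
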
- Let the old architecture run for at least one day to make sure it's working.
- Update the columns `path` and `entrypoint` in the deployment table in the Prefect DB to point to the new flow in `prefect_flows.py` (a hedged sketch follows).
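
A hedged sketch of the fixup against Prefect's postgres database; the new `path` and `entrypoint` values are hypothetical placeholders:

```python
import psycopg2

conn = psycopg2.connect("dbname=prefect")  # assumption: prefect is backed by postgres
with conn, conn.cursor() as cur:
    cur.execute(
        "UPDATE deployment SET path = %s, entrypoint = %s",
        ("/path/to/flows", "prefect_flows.py:run_tasks_flow"),  # hypothetical values
    )
conn.close()
```
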
- Run and test the new architecture.