Commit

Added a snowflake integration

Gilbert09 committed Jun 5, 2024
1 parent 38614ea commit 527d568
Showing 15 changed files with 347 additions and 42 deletions.
40 changes: 40 additions & 0 deletions frontend/public/snowflake-logo.svg
63 changes: 62 additions & 1 deletion frontend/src/scenes/data-warehouse/new/sourceWizardLogic.tsx
@@ -11,6 +11,7 @@ import {
Breadcrumb,
ExternalDataSourceCreatePayload,
ExternalDataSourceSyncSchema,
ExternalDataSourceType,
PipelineTab,
SourceConfig,
SourceFieldConfig,
@@ -22,7 +23,7 @@ import type { sourceWizardLogicType } from './sourceWizardLogicType'

export const getHubspotRedirectUri = (): string => `${window.location.origin}/data-warehouse/hubspot/redirect`

export const SOURCE_DETAILS: Record<string, SourceConfig> = {
export const SOURCE_DETAILS: Record<ExternalDataSourceType, SourceConfig> = {
Stripe: {
name: 'Stripe',
caption: (
@@ -186,6 +187,66 @@ export const SOURCE_DETAILS: Record<string, SourceConfig> = {
},
],
},
Snowflake: {
name: 'Snowflake',
caption: (
<>
Enter your Snowflake credentials to automatically pull your Snowflake data into the PostHog Data
warehouse.
</>
),
fields: [
{
name: 'account_id',
label: 'Account ID',
type: 'text',
required: true,
placeholder: '',
},
{
name: 'database',
label: 'Database',
type: 'text',
required: true,
placeholder: 'snowflake_sample_data',
},
{
name: 'warehouse',
label: 'Warehouse',
type: 'text',
required: true,
placeholder: 'compute_warehouse',
},
{
name: 'user',
label: 'User',
type: 'text',
required: true,
placeholder: 'user',
},
{
name: 'password',
label: 'Password',
type: 'password',
required: true,
placeholder: '',
},
{
name: 'role',
label: 'Role (optional)',
type: 'text',
required: false,
placeholder: 'ACCOUNTADMIN',
},
{
name: 'schema',
label: 'Schema',
type: 'text',
required: true,
placeholder: 'public',
},
],
},
Zendesk: {
name: 'Zendesk',
caption: (

@@ -7,6 +7,7 @@ import { More } from 'lib/lemon-ui/LemonButton/More'
import hubspotLogo from 'public/hubspot-logo.svg'
import postgresLogo from 'public/postgres-logo.svg'
import posthogLogo from 'public/posthog-icon.svg'
import snowflakeLogo from 'public/snowflake-logo.svg'
import stripeLogo from 'public/stripe-logo.svg'
import zendeskLogo from 'public/zendesk-logo.svg'
import { urls } from 'scenes/urls'
@@ -228,6 +229,7 @@ export function RenderDataWarehouseSourceIcon({
Hubspot: hubspotLogo,
Zendesk: zendeskLogo,
Postgres: postgresLogo,
Snowflake: snowflakeLogo,
}[type]

return (
2 changes: 1 addition & 1 deletion frontend/src/types.ts
@@ -3707,7 +3707,7 @@ export interface DataWarehouseViewLink {
created_at?: string | null
}

export const externalDataSources = ['Stripe', 'Hubspot', 'Postgres', 'Zendesk', 'Manual'] as const
export const externalDataSources = ['Stripe', 'Hubspot', 'Postgres', 'Zendesk', 'Snowflake', 'Manual'] as const

export type ExternalDataSourceType = (typeof externalDataSources)[number]

39 changes: 26 additions & 13 deletions posthog/temporal/data_imports/external_data_job.py
@@ -126,23 +126,36 @@ async def run(self, inputs: ExternalDataWorkflowInputs):

# create external data job and trigger activity
create_external_data_job_inputs = CreateExternalDataJobModelActivityInputs(
team_id=inputs.team_id, schema_id=inputs.external_data_schema_id, source_id=inputs.external_data_source_id
team_id=inputs.team_id,
schema_id=inputs.external_data_schema_id,
source_id=inputs.external_data_source_id,
)

run_id, incremental = await workflow.execute_activity(
create_external_data_job_model_activity,
create_external_data_job_inputs,
start_to_close_timeout=dt.timedelta(minutes=1),
retry_policy=RetryPolicy(
initial_interval=dt.timedelta(seconds=10),
maximum_interval=dt.timedelta(seconds=60),
maximum_attempts=0,
non_retryable_error_types=["NotNullViolation", "IntegrityError"],
),
)
try:
# TODO: split out the creation of the external data job model from the schema fetching to separate out exception handling
run_id, incremental = await workflow.execute_activity(
create_external_data_job_model_activity,
create_external_data_job_inputs,
start_to_close_timeout=dt.timedelta(minutes=1),
retry_policy=RetryPolicy(
initial_interval=dt.timedelta(seconds=10),
maximum_interval=dt.timedelta(seconds=60),
maximum_attempts=3,
non_retryable_error_types=["NotNullViolation", "IntegrityError", "BaseSSHTunnelForwarderError"],
),
)
except Exception as e:
logger.error(
f"External data job failed on create_external_data_job_model_activity for {inputs.external_data_source_id} with error: {e}"
)
raise e

update_inputs = UpdateExternalDataJobStatusInputs(
id=run_id, run_id=run_id, status=ExternalDataJob.Status.COMPLETED, latest_error=None, team_id=inputs.team_id
id=run_id,
run_id=run_id,
status=ExternalDataJob.Status.COMPLETED,
latest_error=None,
team_id=inputs.team_id,
)

try:
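The retry-policy change above stops the job-model activity from retrying forever: in Temporal's Python SDK, maximum_attempts=0 means unlimited retries, whereas the new value caps it at three attempts and marks SSH-tunnel failures as non-retryable. A minimal standalone sketch of the same policy, assuming the temporalio package (illustrative, not the committed code):

import datetime as dt
from temporalio.common import RetryPolicy

# Mirrors the policy used in the workflow above.
retry_policy = RetryPolicy(
    initial_interval=dt.timedelta(seconds=10),   # wait 10s before the first retry
    maximum_interval=dt.timedelta(seconds=60),   # back off to at most 60s between attempts
    maximum_attempts=3,                          # previously 0, i.e. retry indefinitely
    non_retryable_error_types=[
        "NotNullViolation",
        "IntegrityError",
        "BaseSSHTunnelForwarderError",           # newly treated as fatal rather than retried
    ],
)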
25 changes: 25 additions & 0 deletions posthog/temporal/data_imports/pipelines/postgres/__init__.py
@@ -37,6 +37,31 @@ def postgres_source(
return db_source


def snowflake_source(
account_id: str,
user: str,
password: str,
database: str,
warehouse: str,
schema: str,
table_names: list[str],
role: Optional[str] = None,
) -> DltSource:
account_id = quote(account_id)
user = quote(user)
password = quote(password)
database = quote(database)
warehouse = quote(warehouse)
role = quote(role) if role else None

credentials = ConnectionStringCredentials(
f"snowflake://{user}:{password}@{account_id}/{database}/{schema}?warehouse={warehouse}{f'&role={role}' if role else ''}"
)
db_source = sql_database(credentials, schema=schema, table_names=table_names)

return db_source


@dlt.source(max_table_nesting=0)
def sql_database(
credentials: Union[ConnectionStringCredentials, Engine, str] = dlt.secrets.value,
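For reference, snowflake_source URL-quotes each credential and assembles a snowflake:// connection string that the dlt sql_database helper consumes. A rough usage sketch with made-up credentials (values are illustrative, not from the commit):

# Illustrative values only.
source = snowflake_source(
    account_id="xy12345.us-east-1",
    user="posthog_reader",
    password="s3cret",            # quote()d internally, so special characters are safe
    database="SNOWFLAKE_SAMPLE_DATA",
    warehouse="COMPUTE_WH",
    schema="TPCH_SF1",
    table_names=["customer", "orders"],
    role="ACCOUNTADMIN",          # optional; appended as &role=... when provided
)
# Resulting URI (roughly):
# snowflake://posthog_reader:s3cret@xy12345.us-east-1/SNOWFLAKE_SAMPLE_DATA/TPCH_SF1?warehouse=COMPUTE_WH&role=ACCOUNTADMIN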
2 changes: 2 additions & 0 deletions posthog/temporal/data_imports/pipelines/schemas.py
@@ -13,11 +13,13 @@
list(BASE_ENDPOINTS) + [resource for resource, endpoint_url, data_key, cursor_paginated in SUPPORT_ENDPOINTS]
),
ExternalDataSource.Type.POSTGRES: (),
ExternalDataSource.Type.SNOWFLAKE: (),
}

PIPELINE_TYPE_INCREMENTAL_ENDPOINTS_MAPPING = {
ExternalDataSource.Type.STRIPE: STRIPE_INCREMENTAL_ENDPOINTS,
ExternalDataSource.Type.HUBSPOT: (),
ExternalDataSource.Type.ZENDESK: (),
ExternalDataSource.Type.POSTGRES: (),
ExternalDataSource.Type.SNOWFLAKE: (),
}

@@ -11,7 +11,11 @@
create_external_data_job,
)
from posthog.warehouse.models import sync_old_schemas_with_new_schemas, ExternalDataSource, aget_schema_by_id
from posthog.warehouse.models.external_data_schema import ExternalDataSchema, get_postgres_schemas
from posthog.warehouse.models.external_data_schema import (
ExternalDataSchema,
get_postgres_schemas,
get_snowflake_schemas,
)
from posthog.temporal.common.logger import bind_temporal_worker_logger
from posthog.warehouse.models.ssh_tunnel import SSHTunnel

@@ -69,6 +73,18 @@ async def create_external_data_job_model_activity(inputs: CreateExternalDataJobM
schemas_to_sync = await sync_to_async(get_postgres_schemas)(
host, port, database, user, password, db_schema, ssh_tunnel
)
elif source.source_type == ExternalDataSource.Type.SNOWFLAKE:
account_id = source.job_inputs.get("account_id")
user = source.job_inputs.get("user")
password = source.job_inputs.get("password")
database = source.job_inputs.get("database")
warehouse = source.job_inputs.get("warehouse")
sf_schema = source.job_inputs.get("schema")
role = source.job_inputs.get("role")

schemas_to_sync = await sync_to_async(get_snowflake_schemas)(
account_id, database, warehouse, user, password, sf_schema, role
)
else:
schemas_to_sync = list(PIPELINE_TYPE_SCHEMA_DEFAULT_MAPPING.get(source.source_type, ()))

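The branch above calls get_snowflake_schemas, whose implementation (in posthog/warehouse/models/external_data_schema.py) is not included in this diff. A plausible minimal sketch of what such a helper could look like, assuming the snowflake-connector-python client and an information_schema lookup — an assumption, not the committed implementation:

from typing import Optional

import snowflake.connector


def get_snowflake_schemas(
    account_id: str,
    database: str,
    warehouse: str,
    user: str,
    password: str,
    schema: str,
    role: Optional[str] = None,
) -> list[str]:
    # Connect directly to Snowflake and list the tables in the configured schema.
    connect_kwargs = {
        "account": account_id,
        "user": user,
        "password": password,
        "warehouse": warehouse,
        "database": database,
        "schema": schema,
    }
    if role:
        connect_kwargs["role"] = role

    connection = snowflake.connector.connect(**connect_kwargs)
    try:
        cursor = connection.cursor()
        cursor.execute(
            "SELECT table_name FROM information_schema.tables WHERE table_schema ILIKE %s",
            (schema,),
        )
        return [row[0] for row in cursor.fetchall()]
    finally:
        connection.close()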
23 changes: 23 additions & 0 deletions posthog/temporal/data_imports/workflow_activities/import_data.py
@@ -164,6 +164,29 @@ async def import_data_activity(inputs: ImportDataActivityInputs) -> tuple[TSchem
table_names=endpoints,
)

return await _run(job_inputs=job_inputs, source=source, logger=logger, inputs=inputs, schema=schema)
elif model.pipeline.source_type == ExternalDataSource.Type.SNOWFLAKE:
from posthog.temporal.data_imports.pipelines.postgres import snowflake_source

account_id = model.pipeline.job_inputs.get("account_id")
user = model.pipeline.job_inputs.get("user")
password = model.pipeline.job_inputs.get("password")
database = model.pipeline.job_inputs.get("database")
warehouse = model.pipeline.job_inputs.get("warehouse")
sf_schema = model.pipeline.job_inputs.get("schema")
role = model.pipeline.job_inputs.get("role")

source = snowflake_source(
account_id=account_id,
user=user,
password=password,
database=database,
schema=sf_schema,
warehouse=warehouse,
role=role,
table_names=endpoints,
)

return await _run(job_inputs=job_inputs, source=source, logger=logger, inputs=inputs, schema=schema)

elif model.pipeline.source_type == ExternalDataSource.Type.ZENDESK: