[devrel] atproto demo ingestion (#26038)
## Summary & Motivation

Scaffolds project for atproto dashboard demo.

```
dagster project scaffold --name project_atproto_dashboard
```

Adds ingestion for Bluesky; snapshots starter pack and user feeds.
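The core pattern: authenticate with the atproto client, pull feeds, and write timestamped JSON snapshots to object storage (the downstream dbt models parse the timestamp out of the filename). A minimal sketch of that pattern — the handle, path, and local-file stand-in are illustrative, not this PR's exact code:

```python
# Illustrative sketch of the snapshot pattern, not the PR's exact code.
import os
from datetime import datetime, timezone

from atproto import Client

client = Client()
client.login(os.environ["BSKY_LOGIN"], os.environ["BSKY_APP_PASSWORD"])

# Fetch one actor's feed; the project snapshots one feed per DID.
feed = client.get_author_feed(actor="someone.bsky.social")

# Timestamped key matching the date/hour/minute layout that the dbt
# models later parse out of the filename.
key = datetime.now(timezone.utc).strftime(
    "atproto_actor_feed_snapshot/%Y-%m-%d/%H/%M/example.json"
)

# The project writes to S3-compatible storage; a local file stands in here.
os.makedirs(os.path.dirname(key), exist_ok=True)
with open(key, "w") as f:
    f.write("\n".join(post.model_dump_json() for post in feed.feed))
```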

See upstream PRs that were squashed for context.


---------

Co-authored-by: Alex Noonan <[email protected]>
cmpadden and C00ldudeNoonan authored Dec 17, 2024
1 parent 8b83349 commit 452713c
Showing 38 changed files with 946 additions and 61 deletions.
17 changes: 17 additions & 0 deletions examples/project_atproto_dashboard/.env.example
@@ -0,0 +1,17 @@
AWS_ENDPOINT_URL=
AWS_ACCESS_KEY_ID=
AWS_SECRET_ACCESS_KEY=
AWS_BUCKET_NAME=
AWS_ACCOUNT_ID=

MOTHERDUCK_TOKEN=

BSKY_LOGIN=
BSKY_APP_PASSWORD=

DBT_TARGET=

AZURE_POWERBI_CLIENT_ID=
AZURE_POWERBI_CLIENT_SECRET=
AZURE_POWERBI_TENANT_ID=
AZURE_POWERBI_WORKSPACE_ID=
5 changes: 5 additions & 0 deletions examples/project_atproto_dashboard/.gitignore
@@ -0,0 +1,5 @@
tmp*/
storage/
schedules/
history/
atproto-session.txt
52 changes: 52 additions & 0 deletions examples/project_atproto_dashboard/README.md
@@ -0,0 +1,52 @@
# project_atproto_dashboard

An end-to-end demonstration of ingesting data from the ATProto API, modeling it with dbt, and presenting it with Power BI.

![Architecture Diagram](./architecture-diagram.png)

![Project asset lineage](./lineage.svg)

## Features used

1. Ingestion of data-related Bluesky posts (see the sketch after this list)
   - Dynamic partitions
   - Declarative automation
   - Concurrency limits
2. Modeling data using _dbt_
3. Presenting data in a dashboard
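
A minimal sketch of how the ingestion features above combine in Dagster — the asset name and body are illustrative rather than this project's exact code (the concurrency limit is applied globally via `dagster.yaml`, shown later in this diff, rather than on the asset):

```python
# Illustrative: a dynamically partitioned asset with declarative automation.
from dagster import (
    AssetExecutionContext,
    AutomationCondition,
    DynamicPartitionsDefinition,
    asset,
)

# One partition per Bluesky DID discovered from the starter pack.
atproto_did_partitions = DynamicPartitionsDefinition(name="atproto_did")


@asset(
    partitions_def=atproto_did_partitions,
    automation_condition=AutomationCondition.eager(),
)
def actor_feed_snapshot(context: AssetExecutionContext) -> None:
    did = context.partition_key
    context.log.info("Snapshotting feed for %s", did)
    # ...fetch the feed for `did` and persist it to object storage...
```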

## Getting started

### Environment Setup

Ensure the following environment variables have been populated in your `.env` file. Start by copying the template.

```
cp .env.example .env
```

And then populate the fields.
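
For reference, environment variables like these are typically wired into Dagster resources with `EnvVar`. A sketch assuming the `dagster-aws` `S3Resource`, not necessarily this project's exact wiring:

```python
# Sketch: reading .env-provided credentials into a Dagster resource.
from dagster import Definitions, EnvVar
from dagster_aws.s3 import S3Resource

defs = Definitions(
    assets=[],  # the project's assets would be listed here
    resources={
        "s3": S3Resource(
            endpoint_url=EnvVar("AWS_ENDPOINT_URL"),
            aws_access_key_id=EnvVar("AWS_ACCESS_KEY_ID"),
            aws_secret_access_key=EnvVar("AWS_SECRET_ACCESS_KEY"),
        )
    },
)
```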

### Development

Install the project dependencies:

pip install -e ".[dev]"

Start Dagster:

DAGSTER_HOME=$(pwd) dagster dev

### Unit testing

Tests are in the `project_atproto_dashboard_tests` directory and can be run with `pytest`:

pytest project_atproto_dashboard_tests
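
For example, a minimal smoke test — assuming the scaffolded `definitions` module exposes a `defs` object, as `dagster project scaffold` generates — might look like:

```python
# Sketch: a smoke test that the Dagster definitions load without error.
from project_atproto_dashboard.definitions import defs


def test_definitions_load():
    assert defs is not None
```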

## Resources

- https://docs.bsky.app/docs/tutorials/viewing-feeds
- https://docs.bsky.app/docs/advanced-guides/rate-limits
- https://atproto.blue/en/latest/atproto_client/auth.html#session-string
- https://tenacity.readthedocs.io/en/latest/#waiting-before-retrying
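
The last two links matter together: Bluesky rate-limits session creation, so the project persists an exported session string (`atproto-session.txt`, per the `.gitignore`) and retries with backoff. A rough sketch of that combination — function names are illustrative:

```python
# Sketch: reuse a saved atproto session and retry reads with backoff.
import os

from atproto import Client
from tenacity import retry, stop_after_attempt, wait_exponential

SESSION_FILE = "atproto-session.txt"


def get_client() -> Client:
    client = Client()
    if os.path.exists(SESSION_FILE):
        with open(SESSION_FILE) as f:
            client.login(session_string=f.read())
    else:
        client.login(os.environ["BSKY_LOGIN"], os.environ["BSKY_APP_PASSWORD"])
        with open(SESSION_FILE, "w") as f:
            f.write(client.export_session_string())
    return client


@retry(wait=wait_exponential(multiplier=1, max=60), stop=stop_after_attempt(5))
def fetch_feed(client: Client, actor: str):
    return client.get_author_feed(actor=actor)
```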
6 changes: 6 additions & 0 deletions examples/project_atproto_dashboard/dagster.yaml
@@ -0,0 +1,6 @@
run_coordinator:
  module: dagster.core.run_coordinator
  class: QueuedRunCoordinator

concurrency:
  default_op_concurrency_limit: 1
Empty file.
2 changes: 2 additions & 0 deletions examples/project_atproto_dashboard/dbt_project/.sqlfluff
@@ -0,0 +1,2 @@
[sqlfluff:rules:capitalisation.keywords]
capitalisation_policy = upper
13 changes: 13 additions & 0 deletions examples/project_atproto_dashboard/dbt_project/dbt_project.yml
@@ -0,0 +1,13 @@
name: "dbt_project"
version: "1.0.0"
config-version: 2

profile: "bluesky"

target-path: "target"
clean-targets:
  - "target"
  - "dbt_packages"

models:
  +materialized: table
@@ -0,0 +1,14 @@
WITH final AS (
SELECT
date_trunc('day', created_at) AS post_date,
count(DISTINCT post_text) AS unique_posts,
count(DISTINCT author_handle) AS active_authors,
sum(likes) AS total_likes,
sum(replies) AS total_comments,
sum(quotes) AS total_quotes
FROM {{ ref("latest_feed") }}
GROUP BY date_trunc('day', created_at)
ORDER BY date_trunc('day', created_at) DESC
)

SELECT * FROM final
@@ -0,0 +1,105 @@
WITH max_profile_data AS (
SELECT
json_extract_string(json, '$.subject.did') AS profile_did,
max(
strptime(
regexp_extract(
filename,
'dagster-demo/atproto_starter_pack_snapshot/(\d{4}-\d{2}-\d{2}/\d{2}/\d{2})',
1
),
'%Y-%m-%d/%H/%M'
)
) AS max_extracted_timestamp
FROM {{ ref("stg_profiles") }}
GROUP BY
json_extract_string(json, '$.subject.did')
),

profiles AS (
SELECT
json_extract_string(json, '$.subject.handle') AS handle_subject,
json_extract_string(json, '$.subject.did') AS profile_did,
json_extract_string(json, '$.subject.avatar') AS profile_avatar,
json_extract_string(json, '$.subject.display_name')
AS profile_display_name,
json_extract_string(json, '$.subject.created_at')
AS profile_created_date,
json_extract_string(json, '$.subject.description')
AS profile_description
FROM {{ ref("stg_profiles") }} stg_prof
JOIN max_profile_data
ON
json_extract_string(stg_prof.json, '$.subject.did')
= max_profile_data.profile_did
AND strptime(
regexp_extract(
stg_prof.filename,
'dagster-demo/atproto_starter_pack_snapshot/(\d{4}-\d{2}-\d{2}/\d{2}/\d{2})',
1
),
'%Y-%m-%d/%H/%M'
)
= max_profile_data.max_extracted_timestamp
),

user_aggregates AS (
SELECT
replace(author_handle, '"', '') AS author_handle,
count(*) AS num_posts,
avg(cast(lf.likes AS int)) AS average_likes,
sum(cast(lf.likes AS int)) AS total_likes,
sum(cast(lf.replies AS int)) AS total_replies,
sum(cast(lf.likes AS int)) / count(*) AS total_likes_by_num_of_posts,
round(
count(*)
/ count(DISTINCT date_trunc('day', cast(created_at AS timestamp))),
2
) AS avg_posts_per_day,
ntile(100)
OVER (
ORDER BY sum(cast(lf.likes AS int))
)
AS likes_percentile,
ntile(100)
OVER (
ORDER BY sum(cast(lf.replies AS int))
)
AS replies_percentile,
ntile(100) OVER (
ORDER BY count(*)
) AS posts_percentile,
(ntile(100) OVER (
ORDER BY sum(cast(lf.likes AS int))) + ntile(100) OVER (
ORDER BY sum(cast(lf.replies AS int))) + ntile(100) OVER (
ORDER BY count(*)
))
/ 3.0 AS avg_score
FROM {{ ref("latest_feed") }} lf
GROUP BY replace(author_handle, '"', '')
),

final AS (
SELECT DISTINCT
profiles.handle_subject AS profile_handle,
profiles.profile_did,
profiles.profile_display_name,
profiles.profile_avatar,
profiles.profile_created_date,
profiles.profile_description,
user_aggregates.num_posts,
user_aggregates.average_likes,
user_aggregates.total_likes,
user_aggregates.total_replies,
user_aggregates.total_likes_by_num_of_posts,
user_aggregates.avg_posts_per_day,
user_aggregates.likes_percentile,
user_aggregates.replies_percentile,
user_aggregates.posts_percentile,
user_aggregates.avg_score
FROM profiles
LEFT JOIN user_aggregates
ON user_aggregates.author_handle = profiles.handle_subject
)

SELECT * FROM final
@@ -0,0 +1,45 @@
WITH date_spine AS (
SELECT CAST(range AS DATE) AS date_key
FROM RANGE(
(SELECT MIN(created_at) FROM {{ ref("latest_feed") }}),
CURRENT_DATE(),
INTERVAL 1 DAY
)
)

SELECT
date_key AS date_key,
DAYOFYEAR(date_key) AS day_of_year,
WEEKOFYEAR(date_key) AS week_of_year,
DAYOFWEEK(date_key) AS day_of_week,
ISODOW(date_key) AS iso_day_of_week,
DAYNAME(date_key) AS day_name,
DATE_TRUNC('week', date_key) AS first_day_of_week,
DATE_TRUNC('week', date_key) + 6 AS last_day_of_week,
YEAR(date_key) || RIGHT('0' || MONTH(date_key), 2) AS month_key,
MONTH(date_key) AS month_of_year,
DAYOFMONTH(date_key) AS day_of_month,
LEFT(MONTHNAME(date_key), 3) AS month_name_short,
MONTHNAME(date_key) AS month_name,
DATE_TRUNC('month', date_key) AS first_day_of_month,
LAST_DAY(date_key) AS last_day_of_month,
CAST(YEAR(date_key) || QUARTER(date_key) AS INT) AS quarter_key,
QUARTER(date_key) AS quarter_of_year,
CAST(date_key - DATE_TRUNC('Quarter', date_key) + 1 AS INT)
AS day_of_quarter,
('Q' || QUARTER(date_key)) AS quarter_desc_short,
('Quarter ' || QUARTER(date_key)) AS quarter_desc,
DATE_TRUNC('quarter', date_key) AS first_day_of_quarter,
LAST_DAY(DATE_TRUNC('quarter', date_key) + INTERVAL 2 MONTH)
AS last_day_of_quarter,
CAST(YEAR(date_key) AS INT) AS year_key,
DATE_TRUNC('Year', date_key) AS first_day_of_year,
DATE_TRUNC('Year', date_key) - 1 + INTERVAL 1 YEAR AS last_day_of_year,
ROW_NUMBER()
OVER (
PARTITION BY YEAR(date_key), MONTH(date_key), DAYOFWEEK(date_key)
ORDER BY date_key
)
AS ordinal_weekday_of_month
FROM date_spine
WHERE CAST(YEAR(date_key) AS INT) >= 2020
@@ -0,0 +1,57 @@
WITH max_update AS (
SELECT
max(
strptime(
regexp_extract(
filename,
'dagster-demo/atproto_actor_feed_snapshot/(\d{4}-\d{2}-\d{2}/\d{2}/\d{2})',
1
),
'%Y-%m-%d/%H/%M'
)
) AS max_extracted_timestamp,
regexp_extract(filename, 'did:(.*?)\.json') AS profile_id
FROM {{ ref("stg_feed_snapshots") }}
GROUP BY
regexp_extract(filename, 'did:(.*?)\.json')
),

final AS (
SELECT
json_extract_string(sfs.json, '$.post.author.handle') AS author_handle,
json_extract_string(sfs.json, '$.post.author.did') AS author_id,
cast(sfs.json.post.like_count AS int) AS likes,
cast(sfs.json.post.quote_count AS int) AS quotes,
cast(sfs.json.post.reply_count AS int) AS replies,
json_extract_string(sfs.json, '$.post.record.text') AS post_text,
sfs.json.post.record.embed,
json_extract_string(
sfs.json, '$.post.record.embed.external.description'
) AS external_embed_description,
json_extract_string(sfs.json, '$.post.record.embed.external.uri')
AS external_embed_link,
sfs.json.post.record.embed.external.thumb AS external_embed_thumbnail,
cast(sfs.json.post.record.created_at AS timestamp) AS created_at,
CASE
    WHEN json_extract_string(
        sfs.json.post.record.embed, '$.images[0].image.ref.link'
    ) IS NULL THEN NULL
    ELSE concat(
        'https://cdn.bsky.app/img/feed_thumbnail/plain/',
        json_extract_string(sfs.json, '$.post.author.did'),
        '/',
        json_extract_string(
            sfs.json.post.record.embed, '$.images[0].image.ref.link'
        ),
        '@jpeg'
    )
END AS image_url,
max_update.max_extracted_timestamp,
max_update.profile_id
FROM {{ ref("stg_feed_snapshots") }} sfs
JOIN max_update
ON
max_update.profile_id
= regexp_extract(sfs.filename, 'did:(.*?)\.json')
AND max_update.max_extracted_timestamp
= strptime(
regexp_extract(
sfs.filename,
'dagster-demo/atproto_actor_feed_snapshot/(\d{4}-\d{2}-\d{2}/\d{2}/\d{2})',
1
),
'%Y-%m-%d/%H/%M'
)
)

SELECT * FROM final
@@ -0,0 +1,18 @@
version: 2

models:
  - name: all_profiles
    description: "Table of all profiles that posts are collected from, with high-level statistics."
    columns:
      - name: profile_handle
        data_tests:
          - unique
          - not_null
  - name: latest_feed
    description: "The latest feed of posts."
  - name: activity_over_time
    description: "Daily post activity over time."
  - name: top_daily_posts
    description: "Top posts ranked for a given day."
  - name: top_external_links
    description: "Top external content shared in the community, grouped by type."
@@ -0,0 +1,46 @@
WITH distinct_posts AS (
SELECT DISTINCT ON (author_handle, post_text, date_trunc('day', created_at))
author_handle,
post_text,
likes,
quotes,
replies,
image_url,
external_embed_link,
external_embed_thumbnail,
external_embed_description,
created_at
FROM {{ ref("latest_feed") }}
),

scored_posts AS (
SELECT
*,
(likes * 0.2) + (quotes * 0.4) + (replies * 0.4) AS engagement_score,
date_trunc('day', created_at) AS post_date,
row_number() OVER (
PARTITION BY date_trunc('day', created_at)
ORDER BY (likes * 0.2) + (quotes * 0.4) + (replies * 0.4) DESC
) AS daily_rank
FROM distinct_posts
),

final AS (
SELECT
post_date,
author_handle,
post_text,
likes,
quotes,
replies,
image_url,
external_embed_link,
external_embed_thumbnail,
external_embed_description,
round(engagement_score, 2) AS engagement_score,
daily_rank
FROM scored_posts
WHERE daily_rank <= 10
)

SELECT * FROM final