Add alerts_twilio script, and prepare alerts_gcs script for future usage in a Windmill Flow #59

Open
wants to merge 14 commits into
base: main
54 changes: 54 additions & 0 deletions c_twilio_message_template.resource-type.json
@@ -0,0 +1,54 @@
{
  "type": "object",
  "order": [
    "account_sid",
    "auth_token",
    "messaging_service_sid",
    "content_sid",
    "origin_number",
    "recipients"
  ],
  "$schema": "https://json-schema.org/draft/2020-12/schema",
  "required": [
    "account_sid",
    "auth_token",
    "content_sid",
    "origin_number",
    "recipients"
  ],
  "properties": {
    "account_sid": {
      "type": "string",
      "default": "",
      "description": "The 34-character SID that identifies your Twilio account."
    },
    "auth_token": {
      "type": "string",
      "default": "",
      "description": "The token used to authenticate Twilio API requests."
    },
    "messaging_service_sid": {
      "type": "string",
      "default": "",
      "description": "(Optional) The SID of a messaging service, which is a container that bundles messaging functionality for a specific use case (such as WhatsApp). It can be found in the Messaging Services menu; each service has its own SID. Including it lets you retrieve usage statistics for the service."
    },
    "content_sid": {
      "type": "string",
      "default": "",
      "description": "The SID for the message content template."
    },
    "origin_number": {
      "type": "string",
      "default": "",
      "description": "The phone number from which messages will originate. This number must first be approved and authenticated in the Twilio UI before it can be activated and used. If you are sending from a WhatsApp number, prefix the phone number with 'whatsapp:'."
    },
    "recipients": {
      "type": "array",
      "items": {
        "type": "string"
      },
      "default": [],
      "description": "The list of phone numbers to which alerts will be sent. If you are sending to WhatsApp numbers, prefix each phone number with 'whatsapp:'."
    }
  }
}
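As a rough illustration of how a script might check a resource against the required fields of this resource type, here is a minimal sketch. `REQUIRED_FIELDS` and `missing_fields` are hypothetical names that are not part of this PR, the example values are placeholders, and the sketch assumes the recipient list is stored under the `recipients` property, as in the `properties` block.

```python
# Hypothetical helper, not part of this PR: report which required
# fields of a c_twilio_message_template resource are missing or empty.
REQUIRED_FIELDS = (
    "account_sid",
    "auth_token",
    "content_sid",
    "origin_number",
    "recipients",  # assumption: recipients live under this key
)


def missing_fields(resource: dict) -> list:
    """Return the required field names that are absent or empty."""
    return [name for name in REQUIRED_FIELDS if not resource.get(name)]


# Placeholder values only; real SIDs and tokens come from the Twilio console.
example_resource = {
    "account_sid": "ACxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx",
    "auth_token": "your-auth-token",
    "content_sid": "HXxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx",
    "origin_number": "whatsapp:+15550000000",
    "recipients": ["whatsapp:+15550000001"],
}

print(missing_fields(example_resource))
```

An empty list means every required field is present; the optional messaging service SID is deliberately not checked.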
42 changes: 23 additions & 19 deletions f/connectors/alerts/README.md
@@ -1,8 +1,8 @@
# Google Cloud Alerts Change Detection Integration
# `alerts_gcs`: Google Cloud Alerts Change Detection Integration

This script fetches change detection alerts and images from a storage bucket on Google Cloud Platform. The script transforms the data for SQL compatibility and stores it in a PostgreSQL database. Additionally, it saves before-and-after images -- as TIF and JPEG -- to a specified directory.

## API Queries
## GCP API Queries

Change Detection alerts can be stored in a Google Cloud storage bucket.

@@ -15,31 +15,35 @@ Google Cloud storage has a built-in API solution, [GCS JSON API](https://cloud.g
Change detection alert files are currently stored on GCP in this format:

**Vector:**
```
<territory_id>/vector/<year_detec>/<month_detec>/alert_<id>.geojson
```

**Raster:**

```
<territory_id>/raster/<year_detec>/<month_detec>/<sat_detec_prefix>_T0_<id>.tif
<territory_id>/raster/<year_detec>/<month_detec>/<sat_detec_prefix>_T1_<id>.tif
<territory_id>/raster/<year_detec>/<month_detec>/<sat_viz_prefix>_T0_<id>.tif
<territory_id>/raster/<year_detec>/<month_detec>/<sat_viz_prefix>_T1_<id>.tif
```

### Warehouse

**Vector:**
```
<territory_id>/<year_detec>/<month_detec>/<alert_id>/alert_<id>.geojson
```

**Raster:**
Currently, we assume that each change detection alert has exactly four raster images: a 'before' and an 'after' image for detection, and a 'before' and an 'after' image for visualization. Each image is saved in both TIFF and JPEG format as follows:

```
<territory_id>/<year_detec>/<month_detec>/<alert_id>/images/<sat_viz_prefix>_T0_<id>.tif
<territory_id>/<year_detec>/<month_detec>/<alert_id>/images/<sat_viz_prefix>_T0_<id>.jpg
...
```
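
The relationship between the GCP layout and the warehouse layout described above can be sketched as a small path-mapping function. This is only an illustration and not the script's own implementation: `to_warehouse_path` is a hypothetical name, and the sketch assumes the alert ID is the last underscore-separated token of the file name.

```python
from pathlib import PurePosixPath


def to_warehouse_path(blob_name: str) -> str:
    """Map a GCS blob name to a warehouse-relative path (illustrative only)."""
    parts = PurePosixPath(blob_name).parts
    if len(parts) != 5:
        raise ValueError(f"Unexpected blob name layout: {blob_name!r}")
    territory_id, kind, year, month, filename = parts

    # Assumption: the alert ID is the final "_"-separated token of the stem,
    # e.g. "alert_123" -> "123" and "S2_T0_123" -> "123".
    alert_id = PurePosixPath(filename).stem.rsplit("_", 1)[-1]

    if kind == "vector":
        return f"{territory_id}/{year}/{month}/{alert_id}/{filename}"
    if kind == "raster":
        return f"{territory_id}/{year}/{month}/{alert_id}/images/{filename}"
    raise ValueError(f"Unknown file kind {kind!r} in {blob_name!r}")


print(to_warehouse_path("100/vector/2024/01/alert_123.geojson"))
print(to_warehouse_path("100/raster/2024/01/S2_T0_123.tif"))
```

The territory, year, month, and file names in the usage lines are made-up examples; JPEG copies would follow the same pattern with a `.jpg` extension.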

# `alerts_twilio`: Send a Twilio Message

This script leverages Twilio to send a WhatsApp message to recipients with a summary of the latest processed alerts. Below is the message template, with values from an `alerts_statistics` object:

```javascript
`${total_alerts} new change detection alert(s) have been published on your alerts dashboard for the date of ${month_year}. The following activities have been detected in your region: ${description_alerts}. Visit your alerts dashboard here: https://explorer.${territory_name}.guardianconnector.net/alerts/alerts`
```
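
For readers who want to preview the final text, the template above can be rendered locally in Python. This is a minimal sketch: `render_alert_message` is a hypothetical helper that is not part of the script (the real message is assembled by Twilio from the content template), and the example values are invented.

```python
def render_alert_message(alerts_statistics: dict, territory_name: str) -> str:
    """Fill the message template with values from alerts_statistics."""
    dashboard_url = (
        f"https://explorer.{territory_name}.guardianconnector.net/alerts/alerts"
    )
    return (
        f"{alerts_statistics['total_alerts']} new change detection alert(s) "
        f"have been published on your alerts dashboard for the date of "
        f"{alerts_statistics['month_year']}. The following activities have "
        f"been detected in your region: "
        f"{alerts_statistics['description_alerts']}. "
        f"Visit your alerts dashboard here: {dashboard_url}"
    )


# Invented example values, shaped like the alerts_statistics object.
example_statistics = {
    "total_alerts": "5",
    "month_year": "1/2024",
    "description_alerts": "illegal logging",
}
message = render_alert_message(example_statistics, "example")
print(message)
```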
89 changes: 71 additions & 18 deletions f/connectors/alerts/alerts_gcs.py
@@ -55,7 +55,12 @@ def main(
)

return _main(
storage_client, alerts_bucket, territory_id, db, db_table_name, destination_path
storage_client,
alerts_bucket,
territory_id,
db,
db_table_name,
destination_path,
)


@@ -99,16 +104,20 @@ def _main(

convert_tiffs_to_jpg(tiff_files)

prepared_alerts_metadata = prepare_alerts_metadata(alerts_metadata, territory_id)
prepared_alerts_metadata, alerts_statistics = prepare_alerts_metadata(
alerts_metadata, territory_id
)

prepared_alerts_data = prepare_alerts_data(destination_path, geojson_files)

db_writer = AlertsDBWriter(conninfo(db), db_table_name)
db_writer.handle_output(prepared_alerts_data, prepared_alerts_metadata)
logger.info(
f"Alerts data successfully written to database table: [{db_table_name}]"
new_alerts_data = db_writer.handle_output(
prepared_alerts_data, prepared_alerts_metadata
)

if new_alerts_data:
return alerts_statistics


def _get_rel_filepath(local_file_path, territory_id):
"""Generate the relative file path for a file based on its blob name and local path.
@@ -188,9 +197,9 @@ def sync_gcs_to_local(
prefix = f"{territory_id}/"
files_to_download = set(blob.name for blob in bucket.list_blobs(prefix=prefix))

assert (
len(files_to_download) > 0
), f"No files found to download in bucket '{bucket_name}' with that prefix."
assert len(files_to_download) > 0, (
f"No files found to download in bucket '{bucket_name}' with that prefix."
)

logger.info(
f"Found {len(files_to_download)} files to download from bucket '{bucket_name}'."
@@ -312,6 +321,12 @@ def prepare_alerts_metadata(alerts_metadata, territory_id):
a unique UUID for the metadata based on the content hash and includes a
placeholder geolocation.

The alert statistics dictionary is generated on the assumption that the
first row of the filtered DataFrame (after sorting by year and month in
descending order) represents the most recent alert. In other words, the
latest alerts posted by the provider are assumed to always be for the
latest month and year in the dataset.

Parameters
----------
alerts_metadata : str
@@ -321,17 +336,33 @@ def prepare_alerts_metadata(alerts_metadata, territory_id):

Returns
-------
list of dict
prepared_alerts_metadata : list of dict
A list of dictionaries representing the filtered and processed alerts
metadata, including additional columns for geolocation, metadata UUID,
and alert source.
alerts_statistics : dict
A dictionary containing alert statistics: total alerts, month/year,
and description of alerts.
"""
# Convert CSV string to DataFrame
df = pd.read_csv(StringIO(alerts_metadata))

# Filter DataFrame based on territory_id
filtered_df = df.loc[df["territory_id"] == territory_id]

# Group the DataFrame by month and year, and get the last row for each group
# (filtered_df is already restricted to territory_id above, so no second filter is needed)
filtered_df = filtered_df.groupby(["month", "year"]).last().reset_index()

# Sort the DataFrame by year and month in descending order
filtered_df = filtered_df.sort_values(
by=["year", "month"], ascending=[False, False]
)

# Hash each row into a unique UUID; this will be used as the primary key for the metadata table
# The hash is based on the most important columns for the metadata table, so that changes in other columns do not affect the hash
filtered_df["metadata_uuid"] = pd.util.hash_pandas_object(
@@ -356,7 +387,15 @@ def prepare_alerts_metadata(alerts_metadata, territory_id):

logger.info("Successfully prepared alerts metadata.")

return prepared_alerts_metadata
# Generate alert statistics
latest_row = filtered_df.iloc[0]
alerts_statistics = {
"total_alerts": str(latest_row["total_alerts"]),
"month_year": f"{latest_row['month']}/{latest_row['year']}",
"description_alerts": latest_row["description_alerts"].replace("_", " "),
}

return prepared_alerts_metadata, alerts_statistics


def prepare_alerts_data(local_directory, geojson_files):
@@ -563,8 +602,11 @@ def handle_output(self, alerts, alerts_metadata):
conn = self._get_conn()
cursor = conn.cursor()

inserted_count = 0
updated_count = 0
new_alerts_data = False
data_inserted_count = 0
data_updated_count = 0
metadata_inserted_count = 0
metadata_updated_count = 0

try:
if alerts:
@@ -675,8 +717,8 @@ def handle_output(self, alerts, alerts_metadata):
result_inserted_count, result_updated_count = (
self._safe_insert(cursor, table_name, columns, values)
)
inserted_count += result_inserted_count
updated_count += result_updated_count
data_inserted_count += result_inserted_count
data_updated_count += result_updated_count

except Exception:
logger.exception(
@@ -732,9 +774,14 @@ def handle_output(self, alerts, alerts_metadata):
data_source,
]

self._safe_insert(
cursor, f"{table_name}__metadata", columns, values
result_inserted_count, result_updated_count = self._safe_insert(
cursor,
f"{table_name}__metadata",
columns,
values,
)
metadata_inserted_count += result_inserted_count
metadata_updated_count += result_updated_count

conn.commit()
except Exception:
@@ -745,7 +792,13 @@ def handle_output(self, alerts, alerts_metadata):
raise

finally:
logger.info(f"Total alert rows inserted: {inserted_count}")
logger.info(f"Total alert rows updated: {updated_count}")
logger.info(f"Total alert rows inserted: {data_inserted_count}")
logger.info(f"Total alert rows updated: {data_updated_count}")
logger.info(f"Total metadata rows inserted: {metadata_inserted_count}")
logger.info(f"Total metadata rows updated: {metadata_updated_count}")
cursor.close()
conn.close()

if data_inserted_count > 0 or metadata_inserted_count > 0:
new_alerts_data = True
return new_alerts_data
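
The statistics step in `prepare_alerts_metadata` (pick the most recent month and year, then build a small dictionary) can be sketched without pandas as follows. This is an illustrative simplification, not the code in this PR: `build_alerts_statistics` is a hypothetical name, and the rows are invented examples that assume `month` and `year` are numeric.

```python
def build_alerts_statistics(rows: list) -> dict:
    """Build the statistics dictionary from the most recent (year, month) row."""
    # Assumption: each row is a dict with numeric "month" and "year" values.
    latest = max(rows, key=lambda row: (int(row["year"]), int(row["month"])))
    return {
        "total_alerts": str(latest["total_alerts"]),
        "month_year": f"{latest['month']}/{latest['year']}",
        # Underscores in the provider's description become spaces.
        "description_alerts": latest["description_alerts"].replace("_", " "),
    }


example_rows = [
    {"month": 12, "year": 2023, "total_alerts": 3, "description_alerts": "gold_mining"},
    {"month": 1, "year": 2024, "total_alerts": 5, "description_alerts": "illegal_logging"},
]
print(build_alerts_statistics(example_rows))
```

Sorting on the `(year, month)` pair matters here: sorting on month alone would rank December 2023 above January 2024.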
70 changes: 70 additions & 0 deletions f/connectors/alerts/alerts_twilio.py
@@ -0,0 +1,70 @@
# twilio~=9.4

import json
import logging

from twilio.rest import Client as TwilioClient

# type names that refer to Windmill Resources
c_twilio_message_template = dict

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def main(
    alerts_statistics: dict,
    territory_name: str,
    twilio: c_twilio_message_template,
):
    send_twilio_message(twilio, alerts_statistics, territory_name)


def send_twilio_message(twilio, alerts_statistics, territory_name):
    """
    Send a Twilio message (SMS or WhatsApp) with alerts processing completion details.

    The message template is defined in the Twilio console, and is structured as follows:
    {{1}} new change detection alert(s) have been published on your alerts dashboard for
    the date of {{2}}. The following activities have been detected in your region: {{3}}.
    Visit your alerts dashboard here: {{4}}

    In the content_variables below, the placeholders {{1}}, {{2}}, and {{3}} are replaced
    with the corresponding values from the alerts_statistics dictionary, and {{4}} is
    replaced with the dashboard URL built from territory_name.

    Parameters
    ----------
    twilio : dict
        A dictionary containing Twilio configuration parameters, including
        account credentials, messaging service details, and recipient phone
        numbers.
    alerts_statistics : dict
        A dictionary containing statistics about the processed alerts, such as
        the total number of alerts, month and year, and a description.
    territory_name : str
        The slug of the territory for which alerts are being processed.
    """
    client = TwilioClient(twilio["account_sid"], twilio["auth_token"])

    logger.info(
        f"Sending Twilio messages to {len(twilio.get('recipients', []))} recipients."
    )

    # Send a message to each recipient
    for recipient in twilio["recipients"]:
        client.messages.create(
            content_sid=twilio.get("content_sid"),
            content_variables=json.dumps(
                {
                    "1": alerts_statistics.get("total_alerts"),
                    "2": alerts_statistics.get("month_year"),
                    "3": alerts_statistics.get("description_alerts"),
                    "4": f"https://explorer.{territory_name}.guardianconnector.net/alerts/alerts",
                }
            ),
            messaging_service_sid=twilio.get("messaging_service_sid"),
            to=recipient,
            from_=twilio["origin_number"],
        )

    logger.info("Twilio messages sent successfully.")
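
To inspect the `content_variables` payload without calling Twilio, the JSON string can be built on its own. The sketch below is not part of this PR: `build_content_variables` is a hypothetical helper, and coercing values to strings (with an empty string for missing ones) is a defensive assumption rather than something the script does.

```python
import json


def build_content_variables(alerts_statistics: dict, territory_name: str) -> str:
    """Return the JSON string for the template placeholders {{1}} to {{4}}."""
    variables = {
        "1": alerts_statistics.get("total_alerts"),
        "2": alerts_statistics.get("month_year"),
        "3": alerts_statistics.get("description_alerts"),
        "4": f"https://explorer.{territory_name}.guardianconnector.net/alerts/alerts",
    }
    # Assumption: stringify every value and replace missing ones with "",
    # so the payload never contains null or non-string values.
    return json.dumps(
        {key: "" if value is None else str(value) for key, value in variables.items()}
    )


payload = build_content_variables(
    {"total_alerts": 5, "month_year": "1/2024"}, "example"
)
print(payload)
```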
1 change: 1 addition & 0 deletions f/connectors/alerts/alerts_twilio.script.lock
@@ -0,0 +1 @@
twilio==9.4.1