feat: add support for storing and validating GBFS files in GCP storage #695

Merged · 23 commits · Aug 20, 2024

4 changes: 2 additions & 2 deletions .github/workflows/api-deployer.yml
@@ -215,10 +215,10 @@ jobs:

- name: Build & Publish Docker Image
run: |
# We want to generate the image even if it's the same commit that has been tagged. So use the version
# (coming from the tag) in the docker image tag (if the Docker tag does not change, it won't be uploaded)
DOCKER_IMAGE_VERSION=$EXTRACTED_VERSION.$FEED_API_IMAGE_VERSION
scripts/docker-build-push.sh -project_id $PROJECT_ID -repo_name feeds-$ENVIRONMENT -service feed-api -region $REGION -version $DOCKER_IMAGE_VERSION

terraform-deploy:
runs-on: ubuntu-latest
3 changes: 3 additions & 0 deletions api/src/scripts/populate_db_gbfs.py
@@ -2,6 +2,7 @@

import pandas as pd
import pytz
import pycountry

from database.database import generate_unique_id, configure_polymorphic_mappers
from database_gen.sqlacodegen_models import Gbfsfeed, Location, Gbfsversion, Externalid
@@ -92,9 +93,11 @@ def populate_db(self):
country_code = self.get_safe_value(row, "Country Code", "")
municipality = self.get_safe_value(row, "Location", "")
location_id = self.get_location_id(country_code, None, municipality)
country = pycountry.countries.get(alpha_2=country_code) if country_code else None
location = self.db.session.get(Location, location_id) or Location(
id=location_id,
country_code=country_code,
country=country.name if country else None,
municipality=municipality,
)
gbfs_feed.locations.clear()
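For context on the `pycountry` lookup added above (not part of the diff): it resolves an ISO 3166-1 alpha-2 code to a country name, falling back to `None` when the code is empty or unknown. A minimal sketch with illustrative codes:

```python
import pycountry

# A known alpha-2 code resolves to a country record with a human-readable name.
country = pycountry.countries.get(alpha_2="FR")
print(country.name if country else None)  # France

# Unknown codes return None, so Location.country stays None in that case.
print(pycountry.countries.get(alpha_2="ZZ"))  # None
```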
2 changes: 1 addition & 1 deletion functions-python/batch_process_dataset/src/main.py
@@ -75,7 +75,7 @@ def __init__(
self.execution_id = execution_id
self.authentication_type = authentication_type
self.api_key_parameter_name = api_key_parameter_name
self.date = datetime.now().strftime("%Y%m%d%H%S")
self.date = datetime.now().strftime("%Y%m%d%H%M")
feeds_credentials = ast.literal_eval(os.getenv("FEED_CREDENTIALS", "{}"))
self.feed_credentials = feeds_credentials.get(self.feed_stable_id, None)
self.public_hosted_datasets_url = public_hosted_datasets_url
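For context on the format-string fix in this file: `%S` is seconds, so the previous format skipped minutes entirely. A quick illustration (timestamp chosen arbitrarily):

```python
from datetime import datetime

ts = datetime(2024, 8, 20, 14, 35, 7)
print(ts.strftime("%Y%m%d%H%S"))  # "202408201407" -> hour followed by SECONDS; minutes were dropped
print(ts.strftime("%Y%m%d%H%M"))  # "202408201435" -> hour followed by MINUTES, as intended
```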
21 changes: 20 additions & 1 deletion functions-python/dataset_service/main.py
@@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#

import logging
import uuid
from datetime import datetime
from enum import Enum
@@ -43,6 +43,7 @@ class Status(Enum):
class PipelineStage(Enum):
DATASET_PROCESSING = "DATASET_PROCESSING"
LOCATION_EXTRACTION = "LOCATION_EXTRACTION"
GBFS_VALIDATION = "GBFS_VALIDATION"


# Dataset trace class to store the trace of a dataset
@@ -72,11 +73,29 @@ class BatchExecution:
batch_execution_collection: Final[str] = "batch_execution"


class MaxExecutionsReachedError(Exception):
pass


# Dataset trace service with CRUD operations for the dataset trace
class DatasetTraceService:
def __init__(self, client: Client = None):
self.client = datastore.Client() if client is None else client

def validate_and_save(self, dataset_trace: DatasetTrace, max_executions: int = 1):
if dataset_trace.execution_id is None or dataset_trace.stable_id is None:
raise ValueError("Execution ID and Stable ID are required.")
trace = self.get_by_execution_and_stable_ids(
dataset_trace.execution_id, dataset_trace.stable_id
)
executions = len(trace) if trace else 0
logging.info(f"[{dataset_trace.stable_id}] Executions: {executions}")
if executions > 0 and executions >= max_executions:
raise MaxExecutionsReachedError(
f"Maximum executions reached for {dataset_trace.stable_id}."
)
self.save(dataset_trace)

# Save the dataset trace
def save(self, dataset_trace: DatasetTrace):
entity = self._dataset_trace_to_entity(dataset_trace)
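For context, a minimal sketch (not part of the diff) of how a caller is expected to use the new `validate_and_save` guard, mirroring the test below and the `extract_location` usage later in this PR; the IDs and import path are placeholders:

```python
from datetime import datetime

# Import path may differ depending on how the functions package is laid out.
from dataset_service.main import (
    DatasetTrace,
    DatasetTraceService,
    MaxExecutionsReachedError,
    Status,
)

service = DatasetTraceService()
trace = DatasetTrace(
    stable_id="example-stable-id",   # placeholder
    execution_id="example-exec-id",  # placeholder
    status=Status.PUBLISHED,
    timestamp=datetime.now(),
)
try:
    # Raises ValueError if either ID is missing, and MaxExecutionsReachedError once
    # max_executions traces already exist for this (execution_id, stable_id) pair.
    service.validate_and_save(trace, max_executions=1)
except MaxExecutionsReachedError:
    pass  # Skip processing; this feed was already handled for this execution.
```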
21 changes: 21 additions & 0 deletions functions-python/dataset_service/tests/test_dataset_service.py
@@ -21,6 +21,27 @@ def test_save_dataset_trace(self, mock_datastore_client):
service.save(dataset_trace)
mock_datastore_client.put.assert_called_once()

@patch("google.cloud.datastore.Client")
def test_validate_and_save_exception(self, mock_datastore_client):
service = DatasetTraceService(mock_datastore_client)
dataset_trace = DatasetTrace(
stable_id="123", status=Status.PUBLISHED, timestamp=datetime.now()
)
with self.assertRaises(ValueError):
service.validate_and_save(dataset_trace, 1)

@patch("google.cloud.datastore.Client")
def test_validate_and_save(self, mock_datastore_client):
service = DatasetTraceService(mock_datastore_client)
dataset_trace = DatasetTrace(
stable_id="123",
execution_id="123",
status=Status.PUBLISHED,
timestamp=datetime.now(),
)
service.validate_and_save(dataset_trace, 1)
mock_datastore_client.put.assert_called_once()

@patch("google.cloud.datastore.Client")
def test_get_dataset_trace_by_id(self, mock_datastore_client):
mock_datastore_client.get.return_value = {
29 changes: 12 additions & 17 deletions functions-python/extract_location/src/main.py
@@ -17,9 +17,11 @@
DatasetTrace,
Status,
PipelineStage,
MaxExecutionsReachedError,
)
from helpers.database import start_db_session
from helpers.logger import Logger
from helpers.parser import jsonify_pubsub
from .bounding_box.bounding_box_extractor import (
create_polygon_wkt_element,
update_dataset_bounding_box,
@@ -61,11 +63,8 @@ def extract_location_pubsub(cloud_event: CloudEvent):
logging.info(f"Function triggered with Pub/Sub event data: {data}")

# Extract the Pub/Sub message data
try:
message_data = data["message"]["data"]
message_json = json.loads(base64.b64decode(message_data).decode("utf-8"))
except Exception as e:
logging.error(f"Error parsing message data: {e}")
message_json = jsonify_pubsub(data)
if message_json is None:
return "Invalid Pub/Sub message data."

logging.info(f"Parsed message data: {message_json}")
@@ -88,17 +87,6 @@ def extract_location_pubsub(cloud_event: CloudEvent):
execution_id = str(uuid.uuid4())
logging.info(f"[{dataset_id}] Generated execution ID: {execution_id}")
trace_service = DatasetTraceService()
trace = trace_service.get_by_execution_and_stable_ids(execution_id, stable_id)
logging.info(f"[{dataset_id}] Trace: {trace}")
executions = len(trace) if trace else 0
print(f"[{dataset_id}] Executions: {executions}")
print(trace_service.get_by_execution_and_stable_ids(execution_id, stable_id))
logging.info(f"[{dataset_id}] Executions: {executions}")
if executions > 0 and executions >= maximum_executions:
logging.warning(
f"[{dataset_id}] Maximum executions reached. Skipping processing."
)
return f"Maximum executions reached for {dataset_id}."
trace_id = str(uuid.uuid4())
error = None
# Saving trace before starting in case we run into memory problems or uncatchable errors
@@ -112,7 +100,14 @@ def extract_location_pubsub(cloud_event: CloudEvent):
dataset_id=dataset_id,
pipeline_stage=PipelineStage.LOCATION_EXTRACTION,
)
trace_service.save(trace)
try:
trace_service.validate_and_save(trace, maximum_executions)
except ValueError as e:
logging.error(f"[{dataset_id}] Error while saving trace: {e}")
return f"Error while saving trace: {e}"
except MaxExecutionsReachedError as e:
logging.warning(f"[{dataset_id}] {e}")
return f"{e}"
try:
logging.info(f"[{dataset_id}] accessing url: {url}")
try:
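The `helpers.parser.jsonify_pubsub` helper is not included in this diff; judging from the inline parsing it replaces, its expected behavior is roughly the following sketch:

```python
import base64
import json
import logging


def jsonify_pubsub(data):
    """Decode a Pub/Sub event payload into a dict, returning None on failure.

    Sketch inferred from the inline parsing removed in extract_location/src/main.py;
    the real helper lives in helpers/parser.py.
    """
    try:
        message_data = data["message"]["data"]
        return json.loads(base64.b64decode(message_data).decode("utf-8"))
    except Exception as e:
        logging.error(f"Error parsing message data: {e}")
        return None
```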
10 changes: 10 additions & 0 deletions functions-python/gbfs_validator/.coveragerc
@@ -0,0 +1,10 @@
[run]
omit =
*/test*/*
*/helpers/*
*/database_gen/*
*/dataset_service/*

[report]
exclude_lines =
if __name__ == .__main__.:
59 changes: 59 additions & 0 deletions functions-python/gbfs_validator/README.md
@@ -0,0 +1,59 @@
# GBFS Validator Pipeline

This pipeline consists of two functions that work together to validate GBFS feeds:

1. **`gbfs-validator-batch`**: This function is triggered over HTTP by a Cloud Scheduler job.
2. **`gbfs-validator-pubsub`**: This function is triggered by a Pub/Sub message.

### Pipeline Overview

- **`gbfs-validator-batch`**: This function checks all GBFS feeds in the database and publishes a message to the Pub/Sub topic for each feed to initiate its validation.
- **`gbfs-validator-pubsub`**: This function is triggered by the Pub/Sub message generated by the batch function. It handles the validation of the individual feed.

### Message Format

The message published by the batch function to the Pub/Sub topic follows this format:

```json
{
"message": {
"data": {
"execution_id": "execution_id",
"stable_id": "stable_id",
"feed_id": "id",
"url": "auto_discovery_url",
"latest_version": "version"
}
}
}
```
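
As a rough illustration (not part of this PR's code), publishing one such per-feed message with the `google-cloud-pubsub` client might look like the sketch below. Note that the `message.data` envelope shown above is added by Pub/Sub delivery; the publisher sends only the inner payload. Topic and project names come from the environment variables documented later in this README.

```python
import json
import os

from google.cloud import pubsub_v1

# Hypothetical publisher-side sketch; PROJECT_ID and PUBSUB_TOPIC_NAME are the
# environment variables documented for the batch function.
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(os.getenv("PROJECT_ID"), os.getenv("PUBSUB_TOPIC_NAME"))

# Inner payload matching the documented message format; values are placeholders.
payload = {
    "execution_id": "execution_id",
    "stable_id": "stable_id",
    "feed_id": "id",
    "url": "auto_discovery_url",
    "latest_version": "version",
}

# Pub/Sub message bodies are bytes; subscribers receive them base64-encoded under message.data.
future = publisher.publish(topic_path, data=json.dumps(payload).encode("utf-8"))
print(f"Published message ID: {future.result()}")
```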

### Functionality Details

- **`gbfs-validator-batch`**: Triggered per execution ID, this function iterates over all GBFS feeds, preparing and publishing individual messages to the Pub/Sub topic.
- **`gbfs-validator-pubsub`**: Triggered per feed, this function performs the following steps:
   1. **Store the feed snapshot in GCP**: Uploads all related files to the specified Cloud Storage bucket and updates the `gbfs.json` file to point to the newly uploaded files.
   2. **Validate the feed**: Runs the GBFS validator on the feed snapshot.
   3. **Update the database**: Updates the database with the snapshot information and validation report details.

## Function Configuration

### Batch Function Environment Variables

The `gbfs-validator-batch` function requires the following environment variables:

- **`PUBSUB_TOPIC_NAME`**: The name of the Pub/Sub topic where messages will be published.
- **`PROJECT_ID`**: The Google Cloud Project ID used to construct the full topic path.
- **`FEEDS_DATABASE_URL`**: The database connection string for accessing the GBFS feeds.

### Pub/Sub Function Environment Variables

The `gbfs-validator-pubsub` function requires the following environment variables:

- **`BUCKET_NAME`**: The name of the Cloud Storage bucket where the GBFS snapshots will be stored. Defaults to `"mobilitydata-gbfs-snapshots-dev"` if not set.
- **`FEEDS_DATABASE_URL`**: The database connection string for accessing the GBFS feeds.
- **`MAXIMUM_EXECUTIONS`**: The maximum number of times the same feed may be processed for a given execution before further attempts are skipped. Defaults to `1` if not set.

## Local Development

For local development, these functions should be developed and tested according to standard practices for GCP serverless functions. Refer to the main [README.md](../README.md) file for general instructions on setting up the development environment.
20 changes: 20 additions & 0 deletions functions-python/gbfs_validator/function_config.json
@@ -0,0 +1,20 @@
{
"name": "gbfs-validator",
"description": "Validate GBFS feeds",
"entry_point": "gbfs_validator",
"timeout": 540,
"memory": "2Gi",
"trigger_http": false,
"include_folders": ["database_gen", "helpers", "dataset_service"],
"environment_variables": [],
"secret_environment_variables": [
{
"key": "FEEDS_DATABASE_URL"
}
],
"ingress_settings": "ALLOW_INTERNAL_AND_GCLB",
"max_instance_request_concurrency": 1,
"max_instance_count": 5,
"min_instance_count": 0,
"available_cpu": 1
}
15 changes: 15 additions & 0 deletions functions-python/gbfs_validator/requirements.txt
@@ -0,0 +1,15 @@
functions-framework==3.*
google-cloud-storage
google-cloud-pubsub
google-cloud-logging
google-api-core
google-cloud-firestore
google-cloud-datastore
psycopg2-binary==2.9.6
aiohttp
asyncio
urllib3~=2.1.0
SQLAlchemy==2.0.23
geoalchemy2==0.14.7
requests~=2.31.0
cloudevents~=1.10.1
4 changes: 4 additions & 0 deletions functions-python/gbfs_validator/requirements_dev.txt
@@ -0,0 +1,4 @@
Faker
pytest~=7.4.3
urllib3-mock
requests-mock