
Commit c06a322

Update logging
aaronfriedman6 committed Apr 23, 2024
1 parent 5c5fa26 commit c06a322
Showing 4 changed files with 14 additions and 6 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
@@ -1,4 +1,4 @@
-## 2024-04-18 -- v1.0.0
+## 2024-04-24 -- v1.0.0
### Added
- Perform recovery queries on past thirty days of missing data

3 changes: 2 additions & 1 deletion README.md
@@ -49,4 +49,5 @@ Every variable not marked as optional below is required for the poller to run. T
| `LAST_POLL_DATE` (optional) | If `IGNORE_CACHE` is `True`, the starting state. The first date to be queried will be the day *after* this date. If `IGNORE_CACHE` is `False`, this field is not read. |
| `LAST_END_DATE` (optional) | The most recent date to query for. If this is left blank, it will be yesterday. |
| `IGNORE_CACHE` (optional) | Whether fetching and setting the state from S3 should *not* be done. If this is `True`, the `LAST_POLL_DATE` will be used for the initial state. |
-| `IGNORE_KINESIS` (optional) | Whether sending the encoded records to Kinesis should *not* be done |
+| `IGNORE_KINESIS` (optional) | Whether sending the encoded records to Kinesis should *not* be done |
+| `IGNORE_UPDATE` (optional) | Whether marking old records as stale in Redshift should *not* be done |
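These ignore flags are read as plain strings from the environment, and (as the `pipeline_controller.py` change in this commit shows for `IGNORE_CACHE` and `IGNORE_UPDATE`) a flag only takes effect when it is exactly the string `True`. A minimal sketch of that pattern; applying it to `IGNORE_KINESIS` here is an assumption rather than a quote from the repo:

```python
import os

# Sketch of the flag parsing visible in lib/pipeline_controller.py for
# IGNORE_CACHE / IGNORE_UPDATE; extending it to IGNORE_KINESIS is an assumption.
ignore_update = os.environ.get("IGNORE_UPDATE", False) == "True"
ignore_kinesis = os.environ.get("IGNORE_KINESIS", False) == "True"

# Only the exact string "True" enables a flag; "true", "1", or an unset
# variable all leave it disabled.
```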
1 change: 1 addition & 0 deletions config/devel.yaml
@@ -10,6 +10,7 @@ PLAINTEXT_VARIABLES:
END_DATE: 2024-03-27
IGNORE_CACHE: True
IGNORE_KINESIS: True
+IGNORE_UPDATE: True
ENCRYPTED_VARIABLES:
REDSHIFT_DB_HOST: AQECAHh7ea2tyZ6phZgT4B9BDKwguhlFtRC6hgt+7HbmeFsrsgAAAKIwgZ8GCSqGSIb3DQEHBqCBkTCBjgIBADCBiAYJKoZIhvcNAQcBMB4GCWCGSAFlAwQBLjARBAzfC7it2NYa1YnFy+4CARCAW1NrqGfSfPptEF4epEXl8Hr6ntrJcZ0DE1GeGt2c2mMnMt7hcim39NTgxHvi5DIldF4J2UJzgG0KBs5UGRU9dyjrQu5zLfdB8Kv50d1lHfvqM2aDQAK0SQ188BY=
REDSHIFT_DB_USER: AQECAHh7ea2tyZ6phZgT4B9BDKwguhlFtRC6hgt+7HbmeFsrsgAAAG0wawYJKoZIhvcNAQcGoF4wXAIBADBXBgkqhkiG9w0BBwEwHgYJYIZIAWUDBAEuMBEEDDqSUOKwGRhPNnJwhQIBEIAqJ/XgIWvG3fz4FnBPSzL640Apqd6SeIJ+KoU5N6Ka9RjN9eyBSBtU+WZJ
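How this config file is consumed is not part of the diff; purely as a hedged illustration, a local runner could export the `PLAINTEXT_VARIABLES` block into the environment roughly as below. The loader is an assumption, not the repo's actual code, and the `ENCRYPTED_VARIABLES` block would need to be decrypted separately, which is left out here.

```python
import os

import yaml  # PyYAML

# Hypothetical loader, assuming config/devel.yaml is structured as shown above.
with open("config/devel.yaml") as f:
    config = yaml.safe_load(f)

for key, value in config.get("PLAINTEXT_VARIABLES", {}).items():
    # Environment variables are strings, so IGNORE_UPDATE: True becomes "True",
    # which matches the `== "True"` checks in pipeline_controller.py.
    os.environ[key] = str(value)
```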
14 changes: 10 additions & 4 deletions lib/pipeline_controller.py
@@ -50,6 +50,7 @@ def __init__(self):
        self.redshift_hours_table = "location_hours" + redshift_suffix
        self.redshift_branch_codes_table = "branch_codes_map" + redshift_suffix

+        self.ignore_update = os.environ.get("IGNORE_UPDATE", False) == "True"
        self.ignore_cache = os.environ.get("IGNORE_CACHE", False) == "True"
        if not self.ignore_cache:
            self.s3_client = S3Client(
@@ -208,15 +209,20 @@ def _process_recovered_data(self, recovered_data, known_data_dict):
        )

        # Mark old rows for successfully recovered data as stale
-        update_query = build_redshift_update_query(
-            self.redshift_visits_table, ",".join(stale_ids)
-        )
-        self.redshift_client.execute_transaction([(update_query, None)])
+        if stale_ids:
+            self.logger.info(f"Updating {len(stale_ids)} stale records")
+            update_query = build_redshift_update_query(
+                self.redshift_visits_table, ",".join(stale_ids)
+            )
+            if not self.ignore_update:
+                self.redshift_client.execute_transaction([(update_query, None)])

        if results:
            encoded_records = self.avro_encoder.encode_batch(results)
            if not self.ignore_kinesis:
                self.kinesis_client.send_records(encoded_records)
+        else:
+            self.logger.info("No recovered data found")

    def _get_poll_date(self, batch_num):
        """Retrieves the last poll date from the S3 cache or the config"""
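The helper `build_redshift_update_query` is not shown in this diff, so the exact SQL it produces is unknown. Purely as a hypothetical sketch (the `is_stale` column name and the comma-separated id list are assumptions), the query guarded by the new `ignore_update` flag might look something like:

```python
# Hypothetical sketch only -- the real build_redshift_update_query lives
# elsewhere in this repo, and its column/id names are not shown in this commit.
def build_redshift_update_query(table, ids):
    """Return an UPDATE statement flagging the given visit ids as stale."""
    return f"UPDATE {table} SET is_stale = True WHERE id IN ({ids})"

# Mirroring the new control flow: the query is always built (and the count
# logged), but it is only executed when IGNORE_UPDATE is not set to "True".
# print(build_redshift_update_query("location_visits", "101,102,103"))
```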
