diff --git a/docs/api/covidcast_meta.md b/docs/api/covidcast_meta.md index d8121b10b..f7339808c 100644 --- a/docs/api/covidcast_meta.md +++ b/docs/api/covidcast_meta.md @@ -38,6 +38,7 @@ None required. | `epidata[].max_value` | maximum value | float | | `epidata[].mean_value` | mean of value | float | | `epidata[].stdev_value` | standard deviation of value | float | +| `epidata[].min_issue` | earliest date data was issued (e.g., 20200710) | integer | | `epidata[].max_issue` | most recent date data was issued (e.g., 20200710) | integer | | `epidata[].min_lag` | smallest lag from observation to issue, in `time_type` units | integer | | `epidata[].max_lag` | largest lag from observation to issue, in `time_type` units | integer | diff --git a/integrations/acquisition/covidcast/test_covidcast_meta_caching.py b/integrations/acquisition/covidcast/test_covidcast_meta_caching.py index 18dec6577..81d3be001 100644 --- a/integrations/acquisition/covidcast/test_covidcast_meta_caching.py +++ b/integrations/acquisition/covidcast/test_covidcast_meta_caching.py @@ -97,8 +97,9 @@ def test_caching(self): 'last_update': 789, 'min_value': 1, 'max_value': 1, - 'mean_value': 1, - 'stdev_value': 0, + 'mean_value': 1.0, + 'stdev_value': 0.0, + 'min_issue': 20200422, 'max_issue': 20200423, 'min_lag': 0, 'max_lag': 1, diff --git a/integrations/client/test_delphi_epidata.py b/integrations/client/test_delphi_epidata.py index f7f6551fe..db6483830 100644 --- a/integrations/client/test_delphi_epidata.py +++ b/integrations/client/test_delphi_epidata.py @@ -355,6 +355,7 @@ def test_covidcast_meta(self): 'mean_value': 6.5, 'stdev_value': 0.5, 'last_update': 345, + 'min_issue': 20200414, 'max_issue': 20200416, 'min_lag': 1, 'max_lag': 2, diff --git a/integrations/server/test_covidcast_meta.py b/integrations/server/test_covidcast_meta.py index a45981e15..e06499ec0 100644 --- a/integrations/server/test_covidcast_meta.py +++ b/integrations/server/test_covidcast_meta.py @@ -47,7 +47,11 @@ def test_round_trip(self): # insert dummy data and accumulate expected results (in sort order) template = ''' insert into covidcast values - (0, "%s", "%s", "%s", "%s", %d, "%s", 123, %d, 0, 0, 456, 0, %d, 0, 1, %d) + (0, "%s", "%s", "%s", "%s", %d, "%s", + 123, + %d, 0, 0, + 456, 0, + %d, 0, 1, %d) ''' expected = [] for src in ('src1', 'src2'): @@ -67,6 +71,7 @@ def test_round_trip(self): 'mean_value': 15, 'stdev_value': 5, 'last_update': 123, + 'min_issue': 1, 'max_issue': 2, 'min_lag': 0, 'max_lag': 0, @@ -116,6 +121,7 @@ def test_filter(self): 'mean_value': 15, 'stdev_value': 5, 'last_update': 123, + 'min_issue': 1, 'max_issue': 2, 'min_lag': 0, 'max_lag': 0, @@ -235,6 +241,7 @@ def test_suppress_work_in_progress(self): 'mean_value': 15, 'stdev_value': 5, 'last_update': 123, + 'min_issue': 1, 'max_issue': 2, 'min_lag': 0, 'max_lag': 0, @@ -256,3 +263,52 @@ def test_suppress_work_in_progress(self): 'epidata': expected, 'message': 'success', }) + + + def test_min_issue(self): + """Test proper computation of min issue in a complex setup.""" + + def add_row(src, sig, time_type, geo_type, time_value, geo_value, value, issue, is_latest = True): + + template = ''' + insert into covidcast( +`id`, `source`, `signal`, `time_type`, `geo_type`, `time_value`, `geo_value`, +`value_updated_timestamp`, `value`, `stderr`, `sample_size`, `direction_updated_timestamp`, `direction`, +`issue`, `lag`, `is_latest_issue`, `is_wip`) + values + (0, "%s", "%s", "%s", "%s", %d, "%s", + 19700101, %d, 0, 0, 19700101, 0, + %d, 0, %d, 0) + ''' + self.cur.execute(template % (src, sig, time_type, geo_type, time_value, geo_value, value, issue, is_latest)) + + expected = [] + for src in ('src1', 'src2'): + for sig in ('sig1', 'sig2'): + expected.append(dict(data_source=src, signal=sig, min_issue=20200502, max_issue=20200505, mean_value=15)) + + for time_value in [20200101, 20200102]: + for geo_value, value in zip(('geo1', 'geo2'), (1, 2)): + # add some old issue rows which won't influence the mean + add_row(src, sig, 'day', 'county', time_value, geo_value, value, 20200502, False) + + for time_value in range(20200101, 20200105): + for geo_value, value in zip(('geo1', 'geo2'), (10, 20)): + # add a latest issue row + add_row(src, sig, 'day', 'county', time_value, geo_value, value, 20200505, True) + + self.cnx.commit() + update_cache(args=None) + + # make the request + response = requests.get(BASE_URL, params=dict(source='covidcast_meta', fields="data_source,signal,min_issue,max_issue,mean_value")) + response.raise_for_status() + response = response.json() + + # assert that the right data came back + self.maxDiff = None + self.assertEqual(response, { + 'result': 1, + 'epidata': expected, + 'message': 'success', + }) diff --git a/src/acquisition/covidcast/database.py b/src/acquisition/covidcast/database.py index 587c90f40..eb7355654 100644 --- a/src/acquisition/covidcast/database.py +++ b/src/acquisition/covidcast/database.py @@ -163,12 +163,12 @@ def insert_or_update_batch(self, cc_rows, batch_size=2**20, commit_partial=False SET `is_latest_issue`=0 ''' set_is_latest_issue_sql = f''' - UPDATE + UPDATE ( SELECT `source`, `signal`, `time_type`, `geo_type`, `time_value`, `geo_value`, MAX(`issue`) AS `issue` FROM ( - SELECT DISTINCT `source`, `signal`, `time_type`, `geo_type`, `time_value`, `geo_value` + SELECT DISTINCT `source`, `signal`, `time_type`, `geo_type`, `time_value`, `geo_value` FROM `{tmp_table_name}` ) AS TMP LEFT JOIN `covidcast` @@ -177,7 +177,7 @@ def insert_or_update_batch(self, cc_rows, batch_size=2**20, commit_partial=False ) AS TMP LEFT JOIN `covidcast` USING (`source`, `signal`, `time_type`, `geo_type`, `time_value`, `geo_value`, `issue`) - SET `is_latest_issue`=1 + SET `is_latest_issue`=1 ''' @@ -268,68 +268,69 @@ def get_covidcast_meta(self): sql = 'SELECT `source`, `signal` FROM covidcast WHERE NOT `is_wip` GROUP BY `source`, `signal` ORDER BY `source` ASC, `signal` ASC;' self._cursor.execute(sql) - for source, signal in [ss for ss in self._cursor]: #NOTE: this obfuscation protects the integrity of the cursor; using the cursor as a generator will cause contention w/ subsequent queries + signals = [ss for ss in self._cursor] #NOTE: this obfuscation protects the integrity of the cursor; using the cursor as a generator will cause contention w/ subsequent queries + for source, signal in signals: + + # calculate the min issues per combination + sql = ''' + SELECT + `time_type`, + `geo_type`, + MIN(`issue`) as `min_issue` + FROM + `covidcast` + WHERE + `source` = %s AND + `signal` = %s + GROUP BY + `time_type`, + `geo_type` + ''' + self._cursor.execute(sql, (source, signal)) + min_issue_lookup = {(x[0], x[1]): x[2] for x in self._cursor} + # calculate statistics for the latest issue entries sql = ''' SELECT - t.`source` AS `data_source`, - t.`signal`, - t.`time_type`, - t.`geo_type`, - MIN(t.`time_value`) AS `min_time`, - MAX(t.`time_value`) AS `max_time`, - COUNT(DISTINCT t.`geo_value`) AS `num_locations`, + `source` AS `data_source`, + `signal`, + `time_type`, + `geo_type`, + MIN(`time_value`) AS `min_time`, + MAX(`time_value`) AS `max_time`, + COUNT(DISTINCT `geo_value`) AS `num_locations`, MIN(`value`) AS `min_value`, MAX(`value`) AS `max_value`, ROUND(AVG(`value`),7) AS `mean_value`, ROUND(STD(`value`),7) AS `stdev_value`, MAX(`value_updated_timestamp`) AS `last_update`, + MIN(`issue`) as `min_issue`, MAX(`issue`) as `max_issue`, MIN(`lag`) as `min_lag`, MAX(`lag`) as `max_lag` FROM - `covidcast` t - JOIN - ( - SELECT - max(`issue`) `max_issue`, - `time_type`, - `time_value`, - `source`, - `signal`, - `geo_type`, - `geo_value` - FROM - `covidcast` - WHERE - `source` = %s AND - `signal` = %s - GROUP BY - `time_value`, - `time_type`, - `geo_type`, - `geo_value` - ) x - ON - x.`max_issue` = t.`issue` AND - x.`time_type` = t.`time_type` AND - x.`time_value` = t.`time_value` AND - x.`source` = t.`source` AND - x.`signal` = t.`signal` AND - x.`geo_type` = t.`geo_type` AND - x.`geo_value` = t.`geo_value` + `covidcast` + WHERE + `source` = %s AND + `signal` = %s AND + `is_latest_issue` is TRUE GROUP BY - t.`time_type`, - t.`geo_type` + `time_type`, + `geo_type` ORDER BY - t.`time_type` ASC, - t.`geo_type` ASC + `time_type` ASC, + `geo_type` ASC ''' self._cursor.execute(sql, (source, signal)) - meta.extend(list(dict(zip(self._cursor.column_names,x)) for x in self._cursor)) + for x in self._cursor: + entry = dict(zip(self._cursor.column_names, x)) + # merge in the min issue + key = (entry['time_type'], entry['geo_type']) + entry['min_issue'] = min_issue_lookup.get(key, entry['max_issue']) + meta.append(entry) return meta - + def update_covidcast_meta_cache(self, metadata): """Updates the `covidcast_meta_cache` table."""