-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathagg_webcam_customvision.py
111 lines (95 loc) · 3.88 KB
/
agg_webcam_customvision.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
import pandas as pd
import datetime
from datetime import date, timedelta
# compatibility with ipython
#os.chdir(os.path.dirname(__file__))
import json
import boto3
from coords_to_kreis import coords_convert, get_ags
import re
import settings
from push_to_influxdb import push_to_influxdb
from convert_df_to_influxdb import convert_df_to_influxdb
def convert_lat_lon_to_float(data):
try:
data["lat"] = data["Lat"].astype(float)
data["lon"] = data["Lon"].astype(float)
data = data.drop(columns=["Lon", "Lat"])
except:
print("convertlatlonerror")
pass
return data
def aggregate(date_obj=datetime.date.today()):
s3_client = boto3.client('s3')
data = pd.DataFrame()
list_keys = [""]
for hour in range(0,23):
try:
key = 'webcamdaten/{}/{}/{}/{}webcamdaten-customvision.json'.format(str(date_obj.year).zfill(4), str(date_obj.month).zfill(2), str(date_obj.day).zfill(2), str(hour).zfill(2))
response = s3_client.get_object(Bucket=settings.BUCKET, Key=key)
body = response["Body"].read()
json_body = json.loads(body)
#maybe better... if k in list_keys
json_body = [{k: v for k, v in d.items() if k != "pred"} for d in json_body]
df = pd.DataFrame(json_body)
df["date_check"] = date_obj
df["hour_check"] = hour
df["timestamp_check"] = str(datetime.datetime(year=date_obj.year, month=date_obj.month, day=date_obj.day, hour=hour))
data = data.append(df)
except Exception as e:
print(e,key)
pass
if data.empty:
print(f"WARNING: No data returned from S3 for {str(date_obj)}!")
return []
# Some cams do not update the image for several hours, e.g. at night.
# Remove entries with identical hash, keep first. Keep all data without hashes.
if "hash" in data.columns:
data_without_hashes = data[data["hash"].isna()]
data_with_hashes = data[~data["hash"].isna()]
data_with_hashes = data_with_hashes.drop_duplicates(subset="hash", keep="first")
data = pd.concat([data_without_hashes, data_with_hashes])
data = convert_lat_lon_to_float(data)
data = get_ags(data) # data is now a GeoDataFrame
data.columns = [col.lower() for col in data.columns]
data["personenzahl"] = data["personenzahl"].astype(float, errors="raise")
data["measurement"] = "webcam-customvision"
# cannot use "id" as unique identifier, because e.g. id=35 was assigned to multiple
# cameras in the past by accident. Workaround: use compound _id made up from id and ags.
data["_id"] = data.apply(lambda x: str(x["id"])+"_"+str(x["ags"]), 1)
if "orgin" not in data.columns:
data["origin"] = data["url"]
data = data.rename(columns={"stand": "time",
"state": "bundesland",
"districttype": "districtType"})
list_webcam_fields = ["personenzahl", "lat", "lon"]
list_webcam_tags = [
'_id',
'name',
'ags',
'bundesland',
'landkreis',
'districtType',
'origin']
json_out = convert_df_to_influxdb(data, list_webcam_fields, list_webcam_tags)
push_to_influxdb(json_out)
# print(json_out)
# prepare output for aggregator
data = pd.DataFrame(data.groupby("ags").mean()) # aggregate per day
data = data.reset_index()
list_results = []
for index, row in data.iterrows():
landkreis = row['ags']
webcam_score = row["personenzahl"]
data_index = {
"landkreis": landkreis,
'webcam_customvision_score': webcam_score
}
list_results.append(data_index)
return list_results
if __name__ == '__main__':
# for testing
for i in range(0, 3):
date = date.today() - timedelta(days = i)
list_results = aggregate(date)
print(list_results)