-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathaggregator.py
133 lines (118 loc) · 5.34 KB
/
aggregator.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
import boto3
import pandas as pd
from datetime import date, timedelta
from agg_webcam import aggregate as agg_webcam
from agg_webcam_customvision import aggregate as agg_webcam_customvision
from agg_hystreet import aggregate as agg_hystreet
# from agg_gmap_transit_score import aggregate as agg_gmap_transit_score
from agg_fahrrad import aggregate as agg_fahrrad
from agg_airquality import aggregate as agg_airquality
from agg_lemgo_digital import aggregate as agg_lemgo_digital
from agg_mdm import aggregate as agg_mdm
# from agg_tomtom import aggregate as agg_tomtom
# import json
# import settings
import os
def _read_settings(environ, defaults: dict) -> dict:
    """Return a copy of ``defaults`` overridden by matching keys in ``environ``.

    Args:
        environ: Mapping of environment variables (normally ``os.environ``).
        defaults: Mapping of setting name to its fallback value.

    Returns:
        A new dict with exactly the keys of ``defaults``. Values found in
        ``environ`` are returned as stored there; no type conversion is done.
    """
    return {key: environ.get(key, fallback) for key, fallback in defaults.items()}


def _parse_sources(selector: str) -> list:
    """Split a ``;``-separated selector into a list of source names.

    Whitespace around each name is stripped and empty entries are dropped, so
    ``"mdm; webcam;"`` selects ``mdm`` and ``webcam``.
    """
    return [name.strip() for name in selector.split(";") if name.strip()]


def _join_source(result, aggregate, day, index_column, start_message,
                 error_message, empty_message=None):
    """Run one aggregator and outer-join its rows onto ``result``.

    Args:
        result: DataFrame accumulated so far for ``day``.
        aggregate: Callable taking ``day`` and returning data accepted by
            ``pd.DataFrame``.
        day: The date being aggregated.
        index_column: Column of the aggregator output used as the join index.
        start_message: Line printed before the aggregator runs.
        error_message: Line printed, followed by the exception, on failure.
        empty_message: If given, an aggregator result equal to ``[]`` prints
            this line and skips the join instead of being treated as an error.

    Returns:
        The joined DataFrame, or ``result`` unchanged when the aggregator
        returned no data or raised.
    """
    print(start_message)
    try:
        rows = aggregate(day)
        if empty_message is not None and rows == []:
            print(empty_message)
            return result
        frame = pd.DataFrame(rows).set_index(index_column)
        return result.join(frame, how="outer")
    except Exception as error:
        # Deliberately best-effort: one failing source must not stop the others.
        print(error_message)
        print(error)
        return result


def main():
    """Aggregate the selected sources for each day of the configured window.

    Settings are taken from the environment, falling back to defaults:
    ``TIMERANGE`` (number of days, default 30), ``SOURCE_SELECTOR``
    (``;``-separated source names, default ``mdm``) and ``OFFSET`` (number of
    most recent days to skip, default 0).
    """
    settings = _read_settings(
        os.environ,
        {"TIMERANGE": 30, "SOURCE_SELECTOR": "mdm", "OFFSET": 0},
    )
    list_sources = _parse_sources(settings["SOURCE_SELECTOR"])

    # How far back do you want to aggregate data?
    days = int(settings["TIMERANGE"])
    offset = int(settings["OFFSET"])

    # Only the commented-out upload step at the end of this file uses the
    # client; it is still created here so that step can be re-enabled as is.
    s3_client = boto3.client('s3')

    # (selector, print separator first, start message, error message,
    #  aggregator, index column, message for an empty result or None)
    joined_sources = (
        ("lemgo", False, "start lemgo...", "Error Lemgo:",
         agg_lemgo_digital, "landkreis", None),
        ("webcam", True, "start webcams...", "Error Webcam",
         agg_webcam, "landkreis", None),
        ("webcam-customvision", True, "start webcams customvision...",
         "Error Webcam customvision", agg_webcam_customvision, "landkreis", None),
        ("hystreet", True, "start hystreet...", "Error Hystreet",
         agg_hystreet, "landkreis", None),
        ("fahrrad", True, "start fahrrad...", "Error Fahrrad",
         agg_fahrrad, "landkreis", None),
        ("airquality", True, "start airquality...", "Error Airquality",
         agg_airquality, "ags", "airquality: No data"),
    )

    print(f"\nAggregate the last {days} days.")

    # Read the current date once: re-reading it per iteration would repeat one
    # day and drop the oldest one if the run crosses midnight.
    today = date.today()
    for days_back in range(offset, days + offset):
        date_obj = today - timedelta(days=days_back)
        print("\n##########################")
        print('### START ', date_obj, "\n")

        # NOTE: list_result is only consumed by the commented-out upload step.
        list_result = pd.DataFrame(columns=['landkreis']).set_index("landkreis")

        for (selector, with_separator, start_message, error_message,
             aggregate, index_column, empty_message) in joined_sources:
            if selector not in list_sources:
                continue
            if with_separator:
                print("--------------")
            list_result = _join_source(
                list_result, aggregate, date_obj, index_column,
                start_message, error_message, empty_message,
            )

        # mdm is only invoked; its return value is not joined into list_result.
        if 'mdm' in list_sources:
            print("--------------")
            print("start mdm...")
            try:
                agg_mdm(date_obj)
            except Exception as error:
                print("Error mdm")
                print(error)


if __name__ == "__main__":
    main()
# print("--------------")
# print("write output...")
# list_result["date"] = str(date_obj)
# list_result.index = list_result.index.astype(int).astype(str).str.zfill(5)
# #list_result.to_csv("test.csv")
#
# #list_result
# dict_results = list_result.T.to_dict()
# #dict
# # s3_client.put_object(Bucket='sdd-s3-basebucket', Key="aggdata/live", Body=json.dumps(dict))
# response = s3_client.put_object(Bucket=settings.BUCKET, Key='aggdata/{}/{}/{}'.format(str(date_obj.year).zfill(4), str(date_obj.month).zfill(2),str(date_obj.day).zfill(2)), Body=json.dumps(dict_results))
# print("s3_client.put_object response:", response)
# print('\n### END ',date_obj,"")
# print("##########################\n")