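"""github-stats.py

AWS Glue job that pulls traffic statistics (referrers, paths, views, and
clones) for a list of GitHub repositories and writes the raw JSON responses
to S3, organized by endpoint, repo, and date.

Expects three Glue job arguments:
    sekret -- name of a Secrets Manager secret whose SecretString is JSON
              with a "github_pat" key (a GitHub personal access token)
    bucket -- destination S3 bucket
    prefix -- S3 key prefix under which results are written

A sketch of an invocation (the job name and argument values are assumptions):
    aws glue start-job-run --job-name github-stats \
        --arguments '{"--sekret":"my-secret","--bucket":"my-bucket","--prefix":"github"}'
"""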
import datetime
import json
import os
import sys

import boto3
import requests
from awsglue.utils import getResolvedOptions

# Resolve the arguments passed to this Glue job
JOB_ARGS = getResolvedOptions(sys.argv, ["sekret", "bucket", "prefix"])
"""
Helper functions to retrieve environment variables
"""
def get_region():
return os.environ.get("AWS_DEFAULT_REGION")
def get_job_arg(name):
return JOB_ARGS.get(name)


def get_secret():
    """Fetch the GitHub personal access token from Secrets Manager.

    The secret named by the "sekret" job argument must contain a JSON
    SecretString with a "github_pat" key.
    """
    session = boto3.session.Session()
    client = session.client("secretsmanager", region_name=get_region())
    response = client.get_secret_value(SecretId=get_job_arg("sekret"))
    return json.loads(response["SecretString"])["github_pat"]


def save_results(bucket, key, value):
    """Save the provided dictionary to an S3 bucket and key as JSON."""
    s3 = boto3.resource("s3")
    obj = s3.Object(bucket, key)
    obj.put(Body=json.dumps(value))
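

# Repositories whose traffic stats will be collected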
github_repos = [
    "dacort/athena-query-stats",
    "awslabs/athena-glue-service-logs",
    "awslabs/athena-adobe-datafeed-splitter",
    "dacort/demo-code",
    "dacort/sample-code",
    "dacort/syslog-to-athena",
    "dacort/damons-data-lake",
    "dacort/metabase-athena-driver",
]
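
# GitHub repository traffic API endpoints: top referrers, top paths,
# daily views, and daily clones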
traffic_endpoints = ["popular/referrers", "popular/paths", "views", "clones"]

# Fetch the token once rather than calling Secrets Manager on every request
headers = {"Authorization": "token " + get_secret()}
today = datetime.datetime.now().strftime("%Y-%m-%d")

for repo in github_repos:
    for endpoint in traffic_endpoints:
        url = "https://api.github.com/repos/%s/traffic/%s" % (repo, endpoint)
        r = requests.get(url, headers=headers)
        if r.ok:
            events = r.json()
            # Only write out the file if we got any useful data
            if events:
                s3_key = "%s/%s/%s/%s.json" % (
                    get_job_arg("prefix"),
                    endpoint.replace("popular/", "traffic/"),
                    repo,
                    today,
                )
                save_results(get_job_arg("bucket"), s3_key, events)
        else:
            # TODO: More robust error handling :)
            print("Request for %s failed with status %d" % (url, r.status_code))