Skip to content

Commit

Permalink
Merge pull request #7 from github/secret_alerts
Browse files Browse the repository at this point in the history
Support for secret scanning alerts
  • Loading branch information
zbazztian authored Nov 11, 2021
2 parents 5c49637 + e9a9a3c commit 3552cf2
Show file tree
Hide file tree
Showing 5 changed files with 211 additions and 103 deletions.
145 changes: 122 additions & 23 deletions ghlib.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,17 +145,21 @@ def create_hook(
self.repo_id, url, secret, active, events, insecure_ssl, content_type
)

def get_alerts(self, state=None):
def get_key(self):
return util.make_key(self.repo_id)

def alerts_helper(self, api_segment, state=None):
if state:
state = "&state=" + state
else:
state = ""

try:
resp = requests.get(
"{api_url}/repos/{repo_id}/code-scanning/alerts?per_page={results_per_page}{state}".format(
"{api_url}/repos/{repo_id}/{api_segment}/alerts?per_page={results_per_page}{state}".format(
api_url=self.gh.url,
repo_id=self.repo_id,
api_segment=api_segment,
state=state,
results_per_page=RESULTS_PER_PAGE,
),
Expand All @@ -167,7 +171,7 @@ def get_alerts(self, state=None):
resp.raise_for_status()

for a in resp.json():
yield GHAlert(self, a)
yield a

nextpage = resp.links.get("next", {}).get("url", None)
if not nextpage:
Expand All @@ -188,6 +192,32 @@ def get_alerts(self, state=None):
# propagate everything else
raise

def get_info(self):
resp = requests.get(
"{api_url}/repos/{repo_id}".format(
api_url=self.gh.url, repo_id=self.repo_id
),
headers=self.gh.default_headers(),
timeout=util.REQUEST_TIMEOUT,
)
resp.raise_for_status()
return resp.json()

def isprivate(self):
return self.get_info()["private"]

def get_alerts(self, state=None):
for a in self.alerts_helper("code-scanning", state):
yield Alert(self, a)

def get_secrets(self, state=None):
# secret scanning alerts are only accessible on private repositories, so
# we return an empty list on public ones
if not self.isprivate():
return
for a in self.alerts_helper("secret-scanning", state):
yield Secret(self, a)

def get_alert(self, alert_num):
resp = requests.get(
"{api_url}/repos/{repo_id}/code-scanning/alerts/{alert_num}".format(
Expand All @@ -198,7 +228,7 @@ def get_alert(self, alert_num):
)
try:
resp.raise_for_status()
return GHAlert(self, resp.json())
return Alert(self, resp.json())
except HTTPError as httpe:
if httpe.response.status_code == 404:
# A 404 suggests that the alert doesn't exist
Expand All @@ -208,41 +238,74 @@ def get_alert(self, alert_num):
raise


class GHAlert:
class AlertBase:
def __init__(self, github_repo, json):
self.github_repo = github_repo
self.gh = github_repo.gh
self.json = json

def get_state(self):
return self.json["state"] == "open"

def get_type(self):
return type(self).__name__

def number(self):
return int(self.json["number"])

def get_state(self):
return parse_alert_state(self.json["state"])
def short_desc(self):
raise NotImplementedError

def adjust_state(self, state):
if state:
self.update("open")
else:
self.update("dismissed")
def long_desc(self):
raise NotImplementedError

def hyperlink(self):
return self.json["html_url"]

def can_transition(self):
return True

def is_fixed(self):
return self.json["state"] == "fixed"
def get_key(self):
raise NotImplementedError

def update(self, alert_state):
if self.json["state"] == alert_state:
def adjust_state(self, target_state):
if self.get_state() == target_state:
return

action = "Reopening" if parse_alert_state(alert_state) else "Closing"
logger.info(
'{action} alert {alert_num} of repository "{repo_id}".'.format(
action=action, alert_num=self.number(), repo_id=self.github_repo.repo_id
'{action} {atype} {alert_num} of repository "{repo_id}".'.format(
atype=self.get_type(),
action="Reopening" if target_state else "Closing",
alert_num=self.number(),
repo_id=self.github_repo.repo_id,
)
)
self.do_adjust_state(target_state)


class Alert(AlertBase):
def __init__(self, github_repo, json):
AlertBase.__init__(self, github_repo, json)

def can_transition(self):
return self.json["state"] != "fixed"

def long_desc(self):
return self.json["rule"]["description"]

def short_desc(self):
return self.json["rule"]["id"]

def get_key(self):
return util.make_key(self.github_repo.repo_id + "/" + str(self.number()))

def do_adjust_state(self, target_state):
state = "open"
reason = ""
if alert_state == "dismissed":
if not target_state:
state = "dismissed"
reason = ', "dismissed_reason": "won\'t fix"'
data = '{{"state": "{state}"{reason}}}'.format(state=alert_state, reason=reason)
data = '{{"state": "{state}"{reason}}}'.format(state=state, reason=reason)
resp = requests.patch(
"{api_url}/repos/{repo_id}/code-scanning/alerts/{alert_num}".format(
api_url=self.gh.url,
Expand All @@ -256,5 +319,41 @@ def update(self, alert_state):
resp.raise_for_status()


def parse_alert_state(state_string):
return state_string not in ["dismissed", "fixed"]
class Secret(AlertBase):
def __init__(self, github_repo, json):
AlertBase.__init__(self, github_repo, json)

def can_transition(self):
return True

def long_desc(self):
return self.json["secret_type"]

def short_desc(self):
return self.long_desc()

def get_key(self):
return util.make_key(
self.github_repo.repo_id + "/" + self.get_type() + "/" + str(self.number())
)

def do_adjust_state(self, target_state):
state = "open"
resolution = ""
if not target_state:
state = "resolved"
resolution = ', "resolution": "wont_fix"'
data = '{{"state": "{state}"{resolution}}}'.format(
state=state, resolution=resolution
)
resp = requests.patch(
"{api_url}/repos/{repo_id}/secret-scanning/alerts/{alert_num}".format(
api_url=self.gh.url,
repo_id=self.github_repo.repo_id,
alert_num=self.number(),
),
data=data,
headers=self.gh.default_headers(),
timeout=util.REQUEST_TIMEOUT,
)
resp.raise_for_status()
66 changes: 46 additions & 20 deletions jiralib.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,22 @@
CREATE_EVENT = "jira:issue_created"
DELETE_EVENT = "jira:issue_deleted"

TITLE_PREFIX = "[Code Scanning Alert]:"

TITLE_PREFIXES = {
"Alert": "[Code Scanning Alert]:",
"Secret": "[Secret Scanning Alert]:",
}

DESC_TEMPLATE = """
{rule_desc}
{long_desc}
{alert_url}
----
This issue was automatically generated from a GitHub alert, and will be automatically resolved once the underlying problem is fixed.
DO NOT MODIFY DESCRIPTION BELOW LINE.
REPOSITORY_NAME={repo_id}
ALERT_TYPE={alert_type}
ALERT_NUMBER={alert_num}
REPOSITORY_KEY={repo_key}
ALERT_KEY={alert_key}
Expand Down Expand Up @@ -155,19 +160,35 @@ def save_repo_state(self, repo_id, state, issue_key="-"):
if a.filename == repo_id_to_fname(repo_id):
self.j.delete_attachment(a.id)

def create_issue(self, repo_id, rule_id, rule_desc, alert_url, alert_num):
# attach the new state file
self.jira.attach_file(
i.key, repo_id_to_fname(repo_id), util.state_to_json(state)
)

def create_issue(
self,
repo_id,
short_desc,
long_desc,
alert_url,
alert_type,
alert_num,
repo_key,
alert_key,
):
raw = self.j.create_issue(
project=self.projectkey,
summary="{prefix} {rule} in {repo}".format(
prefix=TITLE_PREFIX, rule=rule_id, repo=repo_id
summary="{prefix} {short_desc} in {repo}".format(
prefix=TITLE_PREFIXES[alert_type], short_desc=short_desc, repo=repo_id
),
description=DESC_TEMPLATE.format(
rule_desc=rule_desc,
long_desc=long_desc,
alert_url=alert_url,
repo_id=repo_id,
alert_type=alert_type,
alert_num=alert_num,
repo_key=util.make_key(repo_id),
alert_key=util.make_alert_key(repo_id, alert_num),
repo_key=repo_key,
alert_key=alert_key,
),
issuetype={"name": "Bug"},
labels=self.labels,
Expand All @@ -177,14 +198,18 @@ def create_issue(self, repo_id, rule_id, rule_desc, alert_url, alert_num):
issue_key=raw.key, alert_num=alert_num, repo_id=repo_id
)
)
logger.info(
"Created issue {issue_key} for {alert_type} {alert_num} in {repo_id}.".format(
issue_key=raw.key,
alert_type=alert_type,
alert_num=alert_num,
repo_id=repo_id,
)
)

return JiraIssue(self, raw)

def fetch_issues(self, repo_id, alert_num=None):
if alert_num is None:
key = util.make_key(repo_id)
else:
key = util.make_alert_key(repo_id, alert_num)
def fetch_issues(self, key):
issue_search = 'project={jira_project} and description ~ "{key}"'.format(
jira_project='"{}"'.format(self.projectkey), key=key
)
Expand Down Expand Up @@ -295,7 +320,14 @@ def parse_alert_info(desc):
if m is None:
return failed
repo_id = m.group(1)

m = re.search("ALERT_TYPE=(.*)$", desc, re.MULTILINE)
if m is None:
alert_type = None
else:
alert_type = m.group(1)
m = re.search("ALERT_NUMBER=(.*)$", desc, re.MULTILINE)

if m is None:
return failed
alert_num = int(m.group(1))
Expand All @@ -308,13 +340,7 @@ def parse_alert_info(desc):
return failed
alert_key = m.group(1)

# consistency checks:
if repo_key != util.make_key(repo_id) or alert_key != util.make_alert_key(
repo_id, alert_num
):
return failed

return repo_id, alert_num, repo_key, alert_key
return repo_id, alert_num, repo_key, alert_key, alert_type


def repo_id_to_fname(repo_id):
Expand Down
4 changes: 2 additions & 2 deletions server.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ def jira_webhook():
payload = json.loads(request.data.decode("utf-8"))
event = payload["webhookEvent"]
desc = payload["issue"]["fields"]["description"]
repo_id, alert_id, _, _ = jiralib.parse_alert_info(desc)
repo_id, _, _, _, _ = jiralib.parse_alert_info(desc)

app.logger.debug('Received JIRA webhook for event "{event}"'.format(event=event))

Expand Down Expand Up @@ -143,7 +143,7 @@ def github_webhook():
alert_url = alert.get("html_url")
alert_num = alert.get("number")
rule_id = alert.get("rule").get("id")
rule_desc = alert.get("rule").get("id")
rule_desc = alert.get("rule").get("description")

# TODO: We might want to do the following asynchronously, as it could
# take time to do a full sync on a repo with many alerts / issues
Expand Down
Loading

0 comments on commit 3552cf2

Please sign in to comment.