issue-scan · executable file · 179 lines (146 loc) · 5.88 KB
#!/usr/bin/env python3
# This file is part of Cockpit.
#
# Copyright (C) 2017 Red Hat, Inc.
#
# Cockpit is free software; you can redistribute it and/or modify it
# under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation; either version 2.1 of the License, or
# (at your option) any later version.
#
# Cockpit is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with Cockpit; If not, see <http://www.gnu.org/licenses/>.
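
# Example invocations (repo name and AMQP host below are illustrative):
#
#   ./issue-scan --repo cockpit-project/bots --human-readable
#   ./issue-scan --amqp amqp.example.com:5671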

import argparse
import json
import logging
import shlex
import sys
import time
from collections.abc import Collection, Sequence

import pika

from lib import ALLOWLIST, testmap
from lib.aio.jsonutil import JsonObject, get_dict, get_int, get_str
from lib.jobqueue import QueueEntry
from task import distributed_queue, github, labels_of_pull

# RHEL tasks have to be done inside Red Hat network
REDHAT_TASKS = [
    "rhel",
    "redhat"
]

sys.dont_write_bytecode = True
logging.basicConfig(level=logging.INFO)


def main() -> int:
    parser = argparse.ArgumentParser(description="Scan issues for image-refresh tasks")
    parser.add_argument("-v", "--human-readable", "--verbose", action="store_true", default=False,
                        help="Print human readable output")
    parser.add_argument('--amqp', default=None,
                        help='The host:port of the AMQP server to publish to (format host:port)')
    parser.add_argument('--issues-data', default=None,
                        help='issues or pull request event GitHub JSON data to evaluate')
    parser.add_argument('--repo', default=None,
                        help='Repository to scan and checkout.')
    opts = parser.parse_args()

    for result in scan(opts.issues_data, opts.repo):
        if opts.amqp:
            with distributed_queue.DistributedQueue(opts.amqp, queues=['rhel', 'public']) as dq:
                queue_task(dq, result)
        elif opts.human_readable:
            print(result['human'])
        else:
            print(json.dumps(result))

    return 0


def contains_any(string: str, matches: Collection[str]) -> bool:
    for match in matches:
        if match in string:
            return True
    return False


# Map all checkable work items to fixtures
def tasks_for_issues(issues_data: str | None, opts_repo: str | None) -> Sequence[tuple[str, JsonObject, str]]:
    issues: Sequence[JsonObject]

    if issues_data:
        event = json.loads(issues_data)
        repo = event["repository"]["full_name"]
        issue = event.get("issue") or event.get("pull_request")
        if 'bot' not in labels_of_pull(issue):
            return []
        api = github.GitHub(repo=repo)
        issues = [issue]
    else:
        api = github.GitHub(repo=opts_repo)
        issues = api.issues(state="open")

    results = []
    for issue in issues:
        if get_str(issue, "title").strip().startswith("WIP"):
            continue

        login = get_str(get_dict(issue, "user", {}), "login", None)
        if login not in ALLOWLIST:
            continue

        #
        # We only consider the first unchecked item per issue.
        #
        # The bots assume the list needs to be done in order: if the
        # first unchecked item is not something the bots can do, the
        # issue is ignored (below, in output_task).
        #
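        # For example, an issue body with GitHub task-list items such as
        #   - [x] image-refresh fedora-41
        #   - [ ] image-refresh fedora-42
        # would yield only "image-refresh fedora-42" here (image names are illustrative).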
        checklist = github.Checklist(get_str(issue, "body", None))
        for item, checked in checklist.items.items():
            if not checked:
                results.append((item, issue, api.repo))
                break

    return results


def output_task(command: str, issue: JsonObject, repo: str) -> QueueEntry | None:
    name, _, context = command.partition(" ")
    if name != "image-refresh" or not repo.endswith("/bots"):
        return None

    number = get_int(issue, "number", None)
    if number is None:
        return None
    number = int(number)

    context = shlex.quote(context.strip())
    api = github.GitHub(repo=repo)

    # `--issues-data` can also receive pull_request events; in that case
    # "pull_request" won't be present in the object, but "commits" will be
    if "pull_request" in issue or "commits" in issue:
        ref = f"pull/{number}/head"
        sha = api.get(f"git/ref/{ref}")["object"]["sha"]
        pull = number
    else:
        ref = testmap.get_default_branch(repo)
        sha = api.get(f"git/ref/heads/{ref}")["object"]["sha"]
        pull = None

    current = time.strftime('%Y%m%d-%H%M%S')
    slug = f'{name}-{context}-{sha[:8]}-{current}'
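    # e.g. "image-refresh-fedora-42-1a2b3c4d-20250101-120000" (context, sha, and timestamp illustrative)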
    return {
        'job': {
            'repo': repo,
            'sha': sha,
            'pull': pull,
            'slug': slug,
            'context': f'{name}/{context}',
            'command': (f'./{name}', '--verbose', f'--issue={number}', context),
            'secrets': ('github-token', 'image-upload'),
        },
        'human': f"issue-{number} {name} {context} {ref}",
    }


def queue_task(dq: distributed_queue.DistributedQueue, body: QueueEntry) -> None:
    queue = 'rhel' if contains_any(body['job']['context'], REDHAT_TASKS) else 'public'
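    # e.g. a context of "image-refresh/rhel-9-x" (name illustrative) is routed to the
    # internal 'rhel' queue; anything else goes to 'public'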
    dq.channel.basic_publish('', queue, json.dumps(body),
                             properties=pika.BasicProperties(priority=distributed_queue.MAX_PRIORITY))
    logging.info("Published issue task: %r", body)


# Default scan behavior run for each task
def scan(issues_data: str | None, opts_repo: str | None) -> Sequence[QueueEntry]:
    results = []

    # Now go through each fixture
    for (command, issue, repo) in tasks_for_issues(issues_data, opts_repo):
        result = output_task(command, issue, repo)
        if result is not None:
            results.append(result)

    return results


if __name__ == '__main__':
    sys.exit(main())