-
Notifications
You must be signed in to change notification settings - Fork 0
/
redact.py
222 lines (180 loc) · 7.48 KB
/
redact.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
#!/usr/bin/env python
"""
Redact the sensitive info from the YAML and TF config files.
"""
from collections import deque
from glob import glob
from os import makedirs
from os.path import abspath, commonpath, dirname, isfile, join, relpath, splitext
from typing import Optional, Tuple
from urllib.parse import urlparse
import click
import tiktoken
from inflect import engine
from marko import Markdown
from marko.element import Element
from marko.inline import Link
# Minimum number of characters a value must have before it can count as a secret
# (compared with `len(value)` in secret_minimum_requirement).
SECRET_LENGTH_REQUIREMENT = 10
# `no` method of an inflect engine; called below as plural(word, count) to build
# the count phrases shown in the progress output.
plural = engine().no
# tiktoken encoding for the "gpt-4" model; value_looks_random compares the number
# of tokens a value encodes to against its character length.
tokenizer = tiktoken.encoding_for_model("gpt-4")
def as_file_destination(dest: str, source: str, base: str) -> Optional[str]:
    """
    Compute a destination path to a file, relative to a base directory.

    Destinations that are not plain local paths (anything with a URL scheme or a
    network location) are rejected. A trailing ``?query`` or ``#fragment`` is
    ignored, so ``values.yaml#L10`` refers to ``values.yaml``; a destination that
    consists only of a fragment/query points back into the referring document
    and is rejected as well.

    :param dest: The destination path to convert, e.g. '../some/path/to/file'
    :param source: The path of the referring file, relative to `base`; `dest` is
        resolved against the directory of this file
    :param base: The base directory for all the directories, e.g. '/home'
    :return: Converted file path relative to `base`; None if the destination is not
        a local file path or the file is outside the base directory
    """
    try:
        url = urlparse(dest)
    except ValueError:
        # Not parseable as a URL at all: fall through and treat it as a path.
        url = None
    if url is not None and (url.scheme or url.netloc):
        return None

    # Drop the fragment first, then the query; neither is part of the file name.
    path = dest.partition("#")[0].partition("?")[0]
    if not path:
        return None

    # Normalise the base so that a relative base or a trailing separator does not
    # make the containment check below fail (or raise) spuriously.
    base = abspath(base)
    resolved = abspath(join(base, dirname(source), path))
    try:
        if commonpath([resolved, base]) != base:
            return None
    except ValueError:
        # Raised e.g. when the two paths live on different drives.
        return None
    return relpath(resolved, base)
def secret_minimum_requirement(value):
    """
    Tell whether a value satisfies the baseline shape of a secret.

    :param value: The value to check for the secret minimum requirement.
    :return: True if the value contains at least one digit and is at least
        SECRET_LENGTH_REQUIREMENT characters long, False otherwise.
    """
    contains_digit = False
    for character in value:
        if character.isdigit():
            contains_digit = True
            break
    if not contains_digit:
        return False
    return len(value) >= SECRET_LENGTH_REQUIREMENT
def value_looks_random(value):
    """
    Check if the given value appears to be random using the tokenization algorithm.

    A value containing spaces is split on single spaces and looks random as soon
    as any one part does. A single part looks random when either

    - it meets the secret minimum requirement and encodes to more than 0.6
      tokens per character, or
    - it encodes to more than 0.45 tokens per character and ``len(value) * 0.45``
      exceeds 12 (i.e. it is at least 27 characters long).

    :param value: The value to be checked.
    :return: True if the value appears to be random, False otherwise.
    """
    if " " in value:
        return any(value_looks_random(part) for part in value.split(" "))

    length = len(value)
    # Tokenize once. `disallowed_special=()` makes text that happens to spell a
    # special token (e.g. "<|endoftext|>") encode as ordinary text instead of
    # raising ValueError.
    token_count = len(tokenizer.encode(value, disallowed_special=()))

    short_but_dense = secret_minimum_requirement(value) and token_count > length * 0.6
    long_and_fairly_dense = token_count > length * 0.45 > 12
    return short_but_dense or long_and_fairly_dense
def is_a_secret(key, value):
    """Decide whether a key/value pair holds a secret.

    When the key (compared case-insensitively) contains "token" or "password",
    the value only has to meet the secret minimum requirement. For any other key
    the decision is delegated to value_looks_random.

    :param key: The key the value was assigned to.
    :param value: The value to classify.
    :return: True if the value is considered a secret, False otherwise.
    """
    lowered_key = key.lower()
    key_hints_at_secret = "token" in lowered_key or "password" in lowered_key
    if key_hints_at_secret:
        return secret_minimum_requirement(value)
    return value_looks_random(value)
def redact_text(text, file_ext) -> Tuple[str, int]:
    """
    Replace everything that looks like a secret value in `text` with "REDACTED".

    The text is processed line by line. Lines whose first non-blank character is
    "#" are copied unchanged. Every other line is split into key and value at the
    first ":" (YAML: '.yaml' / '.yml', case-insensitive) or "=" (anything else),
    falling back to the other separator when the preferred one is absent. The
    value, without a trailing "# comment" and surrounding quotes/commas/spaces,
    is passed to is_a_secret together with the key.

    A redacted line consists of the key, the separator, one space if the value
    started with one, the word REDACTED and the original line terminator; quotes
    and trailing comments of the original value are not kept.

    :param text: The input text to be redacted. It can be a multiline string.
    :param file_ext: The file extension for the format of the input text (e.g., '.yaml', '.txt').
    :return: A tuple containing the redacted text (str) and the count of redacted lines (int).
    """
    is_yaml = (file_ext or "").lower() in (".yaml", ".yml")
    default_sep = ":" if is_yaml else "="

    count_redacted = 0
    pieces = []
    for line in text.splitlines(keepends=True):
        # Separate the content from whatever terminator splitlines recognised
        # ("\n", "\r\n", "\r", ...) so a redacted line keeps it untouched.
        content = line.splitlines()[0]
        ending = line[len(content):]

        # Full-line comments are never redacted.
        if "#" in content and not content.partition("#")[0].strip():
            pieces.append(line)
            continue

        sep = default_sep
        if sep not in content and "=" in content:
            sep = "="
        if sep not in content and ":" in content:
            sep = ":"

        key, found_sep, value = content.partition(sep)
        candidate = value.partition("#")[0].strip(" \n\"',")
        if key and found_sep and value and is_a_secret(key, candidate):
            padding = " " if value[0] == " " else ""
            pieces.append(key + found_sep + padding + "REDACTED" + ending)
            count_redacted += 1
        else:
            pieces.append(line)

    return "".join(pieces), count_redacted
def create_and_write(out_dir, filename, text):
    """
    Write `text` to `filename` below `out_dir`, creating missing directories.

    :param out_dir: The directory to create the file in.
    :param filename: The name of the file to create; may contain sub-directories.
    :param text: The text to write to the file (encoded as UTF-8).
    """
    target_path = join(out_dir, filename)
    parent_dir = dirname(target_path)
    # The parent directory may already exist from an earlier file.
    makedirs(parent_dir, exist_ok=True)
    with open(target_path, "wt", encoding="utf-8") as handle:
        handle.write(text)
class ProcessingMessage:
    """Context manager that prints "Processing <file>... " on entry and a coloured
    "done" or "error" on exit, depending on whether the body raised.

    Exceptions are not suppressed; they propagate after the status is printed.
    """

    def __init__(self, file_name):
        self.file_name = file_name
        display_name = click.format_filename(file_name)
        self.formatted_file_name = click.style(display_name, "white", bold=True)

    def __enter__(self):
        # No newline: the body and __exit__ append to the same output line.
        click.echo(f"Processing {self.formatted_file_name}... ", nl=False)
        return self

    def __exit__(self, exception_type, value, traceback):
        if exception_type:
            outcome = click.style("error", fg="red")
        else:
            outcome = click.style("done", "green")
        click.echo(outcome)
def _iter_links(document):
    """Yield every Link element of a parsed markdown document, breadth-first."""
    queue = deque([document])
    while queue:
        node = queue.popleft()
        children = getattr(node, "children", None)
        # Leaf elements keep their text as a plain string in `children`;
        # enqueueing it would walk the document text character by character.
        if children is not None and not isinstance(children, str):
            queue.extend(children)
        if isinstance(node, Link):
            yield node


@click.command()
@click.argument("in_dir")
@click.argument("out_dir")
def redact(in_dir, out_dir):
    """Copy markdown and YAML/TF files referenced there, but redact the secrets
    :param in_dir: Input directory path (relative to current working directory)
    :param out_dir: Output directory path (will be created if it does not exist)
    """
    # NOTE: click shows the docstring above as the command's --help text, so it
    # is user-facing output and is intentionally left as is.
    markdown = Markdown()
    referenced_files = set()
    in_dir = abspath(in_dir)
    out_dir = abspath(out_dir)

    # Pass 1: copy every markdown file verbatim and collect the local files it
    # links to (paths relative to in_dir).
    for md_file in glob("**/*.md", recursive=True, root_dir=in_dir):
        with ProcessingMessage(md_file):
            with open(join(in_dir, md_file), "rt", encoding="utf-8") as input_stream:
                text = input_stream.read()
            found_links = 0
            for link in _iter_links(markdown.parse(text)):
                if file_dest := as_file_destination(link.dest, md_file, in_dir):
                    found_links += 1
                    referenced_files.add(file_dest)
            click.echo(
                f"found {click.style(plural('link', found_links), 'blue', underline=True)}, ",
                nl=False,
            )
            create_and_write(out_dir, md_file, text)

    # Pass 2: copy each referenced file with its secrets redacted. Sorted so the
    # output order is stable between runs.
    for value_file in sorted(referenced_files):
        try:
            with ProcessingMessage(value_file):
                fullname = join(in_dir, value_file)
                # Links may point at directories or at files that do not exist.
                if isfile(fullname):
                    with open(fullname, "rt", encoding="utf-8") as input_stream:
                        text = input_stream.read()
                    out_text, found_secrets = redact_text(text, splitext(value_file)[1])
                    click.echo(
                        f"redacted {click.style(plural('secret', found_secrets), reverse=True)}, ",
                        nl=False,
                    )
                    create_and_write(out_dir, value_file, out_text)
        except FileNotFoundError:
            # Best effort: the file vanished between the isfile check and the
            # open. ProcessingMessage has already printed "error" for it.
            pass
# Run the command-line interface when this file is executed as a script.
if __name__ == "__main__":
    # `redact` is a click command: in_dir/out_dir are taken from the command line,
    # hence the call without arguments.
    # pylint: disable=no-value-for-parameter
    redact()