-
Notifications
You must be signed in to change notification settings - Fork 1
/
secrets.py
97 lines (78 loc) · 2.84 KB
/
secrets.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
import datetime
import os
import posixpath

import async_hvac
from aiohttp import web
from marshmallow import Schema, fields, validate
from webargs.aiohttpparser import use_args

import settings
# Vault path under which each per-secret token stores its payload.
# Built with posixpath rather than os.path: this is an API path, not a
# filesystem path, so the separator must be "/" whatever the host OS is.
CUBBYHOLE_PATH = posixpath.join(settings.VAULT_SECRET_BASE, "secret")
class SecretField(Schema):
    """Schema for a single name/value entry inside a secret.

    Both ``name`` and ``value`` are strings of at most 100 characters.
    """

    class Meta:
        # NOTE(review): marshmallow 2.x option; presumably makes validation
        # failures raise instead of being returned -- confirm against the
        # pinned marshmallow version.
        strict = True

    name = fields.Str(
        validate=validate.Length(error="String is too long", max=100),
    )
    value = fields.Str(
        validate=validate.Length(error="String is too long", max=100),
    )
class Secret(Schema):
    """Schema for the payload accepted when creating a new secret.

    ``name`` is a string of at most 100 characters and ``fields`` is a list
    of ``SecretField`` entries.  Both keys are required: ``new_cubbyhole``
    indexes ``secret["name"]`` and ``secret["fields"]`` directly, so a
    payload missing either one must be rejected during validation rather
    than surfacing later as a ``KeyError``.
    """

    # NOTE: ``name`` has to be declared before ``fields``.  The attribute
    # below rebinds ``fields`` inside this class body, after which the
    # marshmallow ``fields`` module is no longer reachable by that name here.
    name = fields.Str(
        required=True,
        validate=validate.Length(max=100, error="String is too long"),
    )
    fields = fields.Nested(SecretField, many=True, required=True)

    class Meta:
        # NOTE(review): marshmallow 2.x option; presumably makes validation
        # failures raise instead of being returned -- confirm against the
        # pinned marshmallow version.
        strict = True
async def new_cubbyhole(secret, master_client):
    """
    Create a new token with a lifetime of a week, and with it store the secret
    in a cubbyhole.

    :param secret: mapping with a ``"name"`` string and a ``"fields"``
        iterable of mapping-like entries.
    :param master_client: Vault client used to create the new token.
    :return: the response of ``master_client.create_token``; callers read
        ``["auth"]["client_token"]`` and ``["auth"]["lease_duration"]``.
    :raises KeyError: if ``secret`` lacks ``"name"`` or ``"fields"``.
    """
    lease = f"{7 * 24}h"  # one week, i.e. "168h"

    # Build the payload before talking to Vault so that a malformed secret
    # fails here instead of after a token has already been minted.
    name = secret["name"]
    payload = [dict(field) for field in secret["fields"]]

    token = await master_client.create_token(
        policies=["single-secure-share"], lease=lease, meta={"name": name}
    )
    client = async_hvac.AsyncClient(
        settings.VAULT_ADDR, token=token["auth"]["client_token"]
    )
    try:
        await client.write(CUBBYHOLE_PATH, lease=lease, fields=payload)
    finally:
        # Always release the per-secret client, even when the write fails.
        await client.close()
    return token
@use_args(Secret)
async def new_secret(request, secret: Secret):
    """Store the submitted secret and describe how to retrieve it.

    Delegates storage to ``new_cubbyhole`` using the ``"vault_master"`` entry
    of ``request.config_dict``, then answers with JSON containing:

    * ``url``: URL of the ``show_secret`` route for the new token,
    * ``token``: the client token itself,
    * ``expiration``: ISO-8601 timestamp of "now" plus the lease duration.
    """
    vault_master = request.config_dict["vault_master"]
    token = await new_cubbyhole(secret, vault_master)

    show_route = request.app.router["show_secret"]
    auth = token["auth"]
    client_token = auth["client_token"]
    show_url = str(show_route.url_for(token=client_token))

    # NOTE(review): datetime.now() is naive local time, so the emitted
    # timestamp carries no UTC offset.
    expires_at = datetime.datetime.now() + datetime.timedelta(
        seconds=auth["lease_duration"]
    )

    body = {
        "url": show_url,
        "token": client_token,
        "expiration": expires_at.isoformat(),
    }
    return web.json_response(body)
@use_args({"token": fields.Str(required=True, location="match_info")})
async def show_secret(request, kwargs):
    """Return the result of ``lookup_token()`` for the URL's token as JSON.

    A Vault client is opened with the token taken from the route's match
    info.  If Vault answers ``Forbidden`` the handler raises
    ``web.HTTPNotFound``.  The client is closed in every case.
    """
    vault = async_hvac.AsyncClient(settings.VAULT_ADDR, token=kwargs["token"])
    try:
        token_info = await vault.lookup_token()
    except async_hvac.exceptions.Forbidden:
        raise web.HTTPNotFound()
    else:
        return web.json_response(token_info)
    finally:
        await vault.close()
@use_args({"token": fields.Str(required=True, location="match_info")})
async def show_secret_contents(request, kwargs):
    """Return what is stored at ``CUBBYHOLE_PATH`` for the URL's token as JSON.

    A Vault client is opened with the token taken from the route's match
    info and used to read ``CUBBYHOLE_PATH``.  If Vault answers
    ``Forbidden`` the handler raises ``web.HTTPNotFound``.  The client is
    closed in every case.
    """
    vault = async_hvac.AsyncClient(settings.VAULT_ADDR, token=kwargs["token"])
    try:
        contents = await vault.read(CUBBYHOLE_PATH)
    except async_hvac.exceptions.Forbidden:
        raise web.HTTPNotFound()
    else:
        return web.json_response(contents)
    finally:
        await vault.close()
# Route table: create a secret, inspect its token, read its contents.
# The "show_secret" name is what new_secret uses to build the returned URL.
routes = [
    web.post("/new", new_secret),
    web.get("/show/{token}", show_secret, name="show_secret"),
    web.get("/show/{token}/contents", show_secret_contents),
]

# Application object exposing the routes above.
app = web.Application()
app.add_routes(routes)