-
Notifications
You must be signed in to change notification settings - Fork 1
/
routes.py
198 lines (164 loc) · 5.63 KB
/
routes.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
import time
import uuid
from typing import Optional
from fastapi import APIRouter, Depends, Request, Form
from fastapi.responses import RedirectResponse
from fastapi_oauth.common.context import OAuthContext
from fastapi_oauth.common.urls import quote
from oauthlib.oauth2 import OAuth2Error
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from starlette import status
from werkzeug.security import gen_salt
from dep import get_current_user, get_session, extract_session_in_request
from models import User, OAuthClient
from oauth2 import AUTHORIZATION, require_scope
from schema import CreateClientRequest
from session import SESSIONS, SESSION_KEY
from template import TEMPLATE
# Router on which the endpoints below are registered via @router.get / @router.post
router = APIRouter()
def split_by_crlf(s):
    """Split a multi-line text value into a list of clean, non-empty entries.

    Args:
        s: Text holding one entry per line (any line ending recognised by
            ``str.splitlines``), or ``None``/empty when nothing was supplied.

    Returns:
        The entries in their original order, with surrounding whitespace
        removed and blank or whitespace-only lines dropped. ``None`` and
        ``''`` both yield an empty list.
    """
    if not s:
        return []
    # Strip every line so stray spaces never end up inside a stored value
    stripped_lines = (line.strip() for line in s.splitlines())
    return [line for line in stripped_lines if line]
@router.get('/')
async def home(
    request: Request,
    current_user: User = Depends(get_current_user),
    session: AsyncSession = Depends(get_session)
):
    """Render the home page.

    When a user is logged in, the OAuth clients whose ``user_id`` matches
    that user are queried and handed to the template; otherwise the
    template receives an empty client list.
    """
    clients = []
    if current_user:
        owned_clients_query = select(OAuthClient).filter_by(user_id=current_user.id)
        query_result = await session.execute(owned_clients_query)
        clients = query_result.scalars()

    page_context = {
        'request': request,
        'user': current_user,
        'clients': clients,
    }
    return TEMPLATE.TemplateResponse(
        name='home.html.jinja2',
        context=page_context
    )
def _is_safe_redirect_target(target, request):
    """Tell whether ``target`` is an acceptable post-login redirect location.

    Accepted targets are site-relative paths (starting with a single ``/``)
    and absolute http(s) URLs whose host matches the host of ``request``.
    When ``request`` is ``None`` only site-relative paths are accepted.
    """
    # Function-scope import so the file's import block does not need to change
    from urllib.parse import urlsplit

    # Backslashes and control characters are classic ways to smuggle a
    # foreign host past URL parsing, so such targets are refused outright.
    if '\\' in target or any(ch < ' ' for ch in target):
        return False
    try:
        parts = urlsplit(target)
    except ValueError:
        return False
    if not parts.scheme and not parts.netloc:
        return target.startswith('/')
    if request is None or parts.scheme not in ('http', 'https'):
        return False
    return parts.netloc.lower() == request.url.netloc.lower()


@router.post('/')
async def home_post(
    *,
    next: Optional[str] = None,
    username: str = Form(),
    session: AsyncSession = Depends(get_session),
    request: Request = None
):
    """Log a user in by email, creating the account on first use.

    Args:
        next: Optional location to return to after login. The name shadows
            the builtin but is kept because it is the public query
            parameter name.
        username: Email address submitted by the login form.
        session: Database session used to look up or create the user.
        request: Current request, used to verify that ``next`` points back
            to this host. The ``None`` default only keeps direct calls
            without a request working.

    Returns:
        A 302 redirect to ``next`` when it is a safe same-site target,
        otherwise to ``/``. The response carries the new session cookie.
    """
    user = (await session.scalars(select(User).filter_by(email=username))).first()
    if not user:
        # Unknown email: create the user so the login can proceed
        user = User(email=username)
        session.add(user)
        await session.commit()
        await session.refresh(user)

    new_session = uuid.uuid4().hex
    SESSIONS[new_session] = user.id

    # ``next`` is caller-controlled; redirecting to it blindly would be an
    # open redirect, so anything that does not point back here goes to '/'.
    if next and _is_safe_redirect_target(next, request):
        destination = next
    else:
        destination = '/'
    response = RedirectResponse(destination, status_code=status.HTTP_302_FOUND)
    # HttpOnly keeps the session identifier out of reach of page scripts
    response.set_cookie(SESSION_KEY, new_session, httponly=True)
    return response
@router.get('/logout')
def logout(
    session_id: str = Depends(extract_session_in_request)
):
    """Log the caller out and send them back to the home page.

    Args:
        session_id: Session identifier extracted from the request; may be
            empty when the caller has no session.

    Returns:
        A 302 redirect to ``/`` that also expires the session cookie.
    """
    if session_id and session_id in SESSIONS:
        del SESSIONS[session_id]
    response = RedirectResponse('/', status_code=status.HTTP_302_FOUND)
    # Expire the cookie too, so the browser stops sending a dead session id
    response.delete_cookie(SESSION_KEY)
    return response
@router.get('/create_client')
async def create_client_index(
    request: Request,
    current_user: User = Depends(get_current_user),
):
    """Show the client-creation form, or redirect to ``/`` when not logged in."""
    if current_user:
        form_context = {'request': request}
        return TEMPLATE.TemplateResponse(
            'create_client.html.jinja2',
            context=form_context
        )
    # Nobody is logged in: send the visitor to the home page
    return RedirectResponse('/', status_code=status.HTTP_302_FOUND)
@router.post('/create_client')
async def create_client(
    *,
    current_user: User = Depends(get_current_user),
    session: AsyncSession = Depends(get_session),
    client_info: CreateClientRequest = Depends()
):
    """Create an OAuth client owned by the logged-in user.

    The multi-line ``grant_types``, ``redirect_uris`` and ``response_types``
    fields of ``client_info`` are split into lists, the resulting attributes
    are stored as the client's metadata, and a secret is generated unless
    ``token_endpoint_auth_method`` is ``'none'``.

    Returns:
        A 302 redirect to ``/``, both when nobody is logged in and after
        the client has been committed.
    """
    if not current_user:
        return RedirectResponse('/', status_code=status.HTTP_302_FOUND)

    client = OAuthClient(
        client_id=gen_salt(24),
        client_id_issued_at=int(time.time()),
        user_id=current_user.id,
    )

    # Turn each one-entry-per-line field into a list, in place
    for field_name in ('grant_types', 'redirect_uris', 'response_types'):
        raw_value = getattr(client_info, field_name)
        setattr(client_info, field_name, split_by_crlf(raw_value))
    client.set_client_metadata(vars(client_info))

    needs_no_secret = client_info.token_endpoint_auth_method == 'none'
    client.client_secret = '' if needs_no_secret else gen_salt(48)

    session.add(client)
    await session.commit()
    return RedirectResponse('/', status_code=status.HTTP_302_FOUND)
@router.get('/oauth/authorize')
async def authorize(
    *,
    context: OAuthContext = Depends(AUTHORIZATION.get_oauth_context),
    request: Request
):
    """Show the consent page for an authorization request.

    A visitor without a session user is redirected to ``/`` with the current
    URL passed as ``next``. If obtaining the consent grant raises
    ``OAuth2Error``, the error's ``error`` value is returned instead of the
    page.
    """
    session_user = context.user_from_session
    if not session_user:
        # Not logged in on the auth server yet: go to the login page first
        login_url = f"/?next={quote(str(request.url))}"
        return RedirectResponse(login_url)

    try:
        grant = await AUTHORIZATION.get_consent_grant(context=context)
    except OAuth2Error as error:
        return error.error
    else:
        consent_context = {
            'request': request,
            'user': context.user_from_session,
            'grant': grant,
        }
        return TEMPLATE.TemplateResponse(
            name='authorize.html.jinja2',
            context=consent_context
        )
@router.post('/oauth/authorize')
async def authorize(
    *,
    request: Request,
    context: OAuthContext = Depends(AUTHORIZATION.get_oauth_context),
    confirm: bool = Form(False)
):
    """Handle the submitted consent form of an authorization request.

    Args:
        request: Current request; its URL is passed on as ``next`` when the
            visitor has to log in first.
        context: OAuth context resolved for this request.
        confirm: Whether the user approved the request on the consent form.

    Returns:
        A 303 redirect to the login page when no session user is present,
        otherwise the authorization response. The session user is passed as
        ``grant_user`` only when ``confirm`` is true.
    """
    if not context.user_from_session:
        # 303 makes the browser fetch the login page with GET. The default
        # 307 would replay this POST (and its consent form body) against '/',
        # which expects a login form instead.
        return RedirectResponse(
            f"/?next={quote(str(request.url))}",
            status_code=status.HTTP_303_SEE_OTHER
        )

    # No grant user means the request was denied
    grant_user = context.user_from_session if confirm else None
    return await AUTHORIZATION.create_authorization_response(
        context=context,
        grant_user=grant_user
    )
@router.post('/oauth/token')
async def issue_token(
    context: OAuthContext = Depends(AUTHORIZATION.get_oauth_context),
):
    """Return the token response that ``AUTHORIZATION`` builds for this request."""
    token_response = await AUTHORIZATION.create_token_response(context=context)
    return token_response
@router.post('/oauth/revoke')
async def revoke_token(
    context: OAuthContext = Depends(AUTHORIZATION.get_oauth_context)
):
    """Return the response of the ``'revocation'`` endpoint for this request."""
    endpoint_name = 'revocation'
    revocation_response = await AUTHORIZATION.create_endpoint_response(
        endpoint_name,
        context=context
    )
    return revocation_response
@router.get('/api/me')
@require_scope('profile')  # TODO: take the dependency from the token, not the session
async def api_me(
    context: OAuthContext = Depends(AUTHORIZATION.get_oauth_context)
):
    """Return the user attached to the request's token (``context.user_from_token``)."""
    return context.user_from_token