-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathutils.py
112 lines (100 loc) · 3.04 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
import os
from datetime import datetime
import requests
from db import membersdb
from models import Member
from otypes import MemberType
inter_communication_secret = os.getenv("INTER_COMMUNICATION_SECRET")
def non_deleted_members(member_input) -> MemberType:
"""
Function to return non-deleted members for a particular cid, uid
Only to be used in admin functions,
as it returns both approved/non-approved members.
"""
updated_sample = membersdb.find_one(
{
"$and": [
{"cid": member_input["cid"]},
{"uid": member_input["uid"]},
]
},
{"_id": 0},
)
if updated_sample is None:
raise Exception("No such Record")
roles = []
for i in updated_sample["roles"]:
if i["deleted"] is True:
continue
roles.append(i)
updated_sample["roles"] = roles
return MemberType.from_pydantic(Member.model_validate(updated_sample))
def unique_roles_id(uid, cid):
"""
Function to give unique ids for each of the role in roles list
"""
pipeline = [
{
"$set": {
"roles": {
"$map": {
"input": {"$range": [0, {"$size": "$roles"}]},
"in": {
"$mergeObjects": [
{"$arrayElemAt": ["$roles", "$$this"]},
{
"rid": {
"$toString": {
"$add": [
{"$toLong": datetime.now()},
"$$this",
]
}
}
},
]
},
}
}
}
}
]
membersdb.update_one(
{
"$and": [
{"cid": cid},
{"uid": uid},
]
},
pipeline,
)
def getUser(uid, cookies=None):
"""
Function to get a particular user details
"""
try:
query = """
query GetUserProfile($userInput: UserInput!) {
userProfile(userInput: $userInput) {
firstName
lastName
email
rollno
}
}
"""
variable = {"userInput": {"uid": uid}}
if cookies:
request = requests.post(
"http://gateway/graphql",
json={"query": query, "variables": variable},
cookies=cookies,
)
else:
request = requests.post(
"http://gateway/graphql",
json={"query": query, "variables": variable},
)
return request.json()["data"]["userProfile"]
except Exception:
return None