-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathTwinklyDb.py
158 lines (133 loc) · 5.77 KB
/
TwinklyDb.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
import psycopg2
from contextlib import closing
from psycopg2.extras import DictCursor
from psycopg2.sql import SQL
from psycopg2 import sql
import json
from os import environ
import init
def run_query(stmt):
dbname, user, password, host, port = environ['db_name'], environ['db_user'], environ['db_password'], environ['db_host'], int(environ['db_port'])
with closing(psycopg2.connect(dbname=dbname, user=user, password=password, host=host, port=port)) as conn:
with conn.cursor() as cursor:
cursor.execute(stmt)
conn.commit()
return cursor.fetchall()
def run_query_nofetch(stmt):
dbname, user, password, host, port = environ['db_name'], environ['db_user'], environ['db_password'], environ['db_host'], int(environ['db_port'])
with closing(psycopg2.connect(dbname=dbname, user=user, password=password, host=host, port=port)) as conn:
with conn.cursor() as cursor:
# ETO NE PRAVILNO NO poidet!
try:
cursor.execute(stmt)
conn.commit()
except Exception as e:
print(str(e))
class User:
column_names = ['chat_id', 'first_name', 'last_name', 'username', 'language','reg_date']
def __init__(self, chat_id ,first_name,last_name, username, user_id = None):
self.chat_id = chat_id
self.first_name = first_name
self.last_name = last_name
self.username = username
self.user_id = user_id
def __repr__(self):
return f'User: {self.username}, id: {self.user_id}, chat_id: {self.chat_id}'
#Add new user to database
@classmethod
def addNewUser(cls, chat_id, first_name, last_name, username, language):
# ADD language table
stmt = SQL('INSERT INTO "Users" ({}) VALUES ({}, current_date)').format(
SQL(',').join(map(sql.Identifier, cls.column_names)),
SQL(',').join(map(sql.Literal, [chat_id, first_name, last_name, username, language])),
)
run_query_nofetch(stmt)
#Get user by chat ID, return User object list
@classmethod
def getUser(cls, chat_id):
stmt = SQL('SELECT * FROM "Users" WHERE chat_id = {}').format(
SQL(',').join(map(sql.Literal, [chat_id])),
)
return [User(*el[1:], user_id = el[0]) for el in run_query(stmt)]
@classmethod
def setUserLang(cls, chat_id, lang):
stmt = SQL('UPDATE "Users" SET language = {} WHERE chat_id = {}').format(
sql.Literal(lang),sql.Literal(chat_id),
)
run_query_nofetch(stmt)
@classmethod
def getUserLang(cls, chat_id):
stmt = SQL('SELECT language FROM "Users" WHERE chat_id = {}').format(
sql.Literal(chat_id),
)
try:
result = run_query(stmt)[0][0]
return result
except Exception as e:
return 'en'
class Review:
column_names = ['chat_id', 'q_json', 'form_id', 'lat', 'lon', 'adr', 'place', 'comment', 'mark']
def __init__(self, chat_id, q_json, form_id, lat, lon, adr, place, comment, mark, review_id = None):
self.chat_id = chat_id
self.q_json = q_json
self.form_id = form_id
self.lat = lat
self.lon = lon
self.adr = adr
self.place = place
self.comment = comment
self.mark = mark
self.review_id = review_id
def __repr__(self):
return f'Review: {self.q_json}, by: {self.chat_id}, place: {self.place}, comment: {self.comment}, id: {self.review_id}'
@classmethod
def getReview(cls, review_id):
stmt = SQL('SELECT * FROM "Reviews" WHERE review_id = {}').format(
SQL(',').join(map(sql.Literal, [review_id])),
)
return [Review(*el[1:], review_id = el[0]) for el in run_query(stmt)]
@classmethod
def addReview(cls, *args, **kwargs):
stmt = SQL('INSERT INTO "Reviews" ({}, submit_time) VALUES ({}, current_timestamp)').format(
SQL(',').join(map(sql.Identifier, cls.column_names)),
SQL(',').join(map(sql.Literal, args)),
)
run_query_nofetch(stmt)
@classmethod
def getMark(cls, id):
stmt = SQL('SELECT AVG(mark) FROM "Reviews" WHERE adr = {}').format(sql.Literal(id))
result = run_query(stmt)
if len(result) == 0:
return 0
return(int(result[0][0]))
@classmethod
def getComments(cls, place_id,chat_id):
stmt = SQL('SELECT comment FROM "Reviews" WHERE adr = {} and chat_id != {} ORDER BY submit_time DESC LIMIT 3;').format(sql.Literal(place_id), sql.Literal(chat_id))
result = run_query(stmt)
if len(result) == 0:
return []
return([msg[0] for msg in result])
#EXAMPLES
# User.addNewUser(100500, 'Testname', 'TestLastName','chat','unmae')
# print(User.getUser(100500)[0].checkLocation())
# print(User.getUser(100500)[0].updateLocation(100, 500))
# print(User.getUser(100500))
# print(Form.getForm(1))
# print(Form.addForm('TestYForm', [1,2]))
# print(Question.getQuestion(1, 'RU'))
# print(Question.getQuestion(1, 'UA'))
# print(Question.addQuestion('Hellot there?'))
# print(Review.getReview(1))
# print (type(Review.getReview(1)[0].q_json))
# Review.addReview(100500, json.dumps({'1':0}), 1,1,1,'Tereshkovoy10', 'ONPU', 'cmnt', 10)
# Form example
# print(Question.getQuestions([1,2,3], 'UA'))
# form = Form.getForm(1)[0]
# questions = Question.getQuestions(form.questions, 'UA')
# for q in questions:
# print (q)
# print(Review.getMark('--'))
# print(Review.getComments('--', 100500))
# print(Review.isReviewEstimate('ChIJg8PGGlzUhg4RDMUagWtgV6E', 516233921))
# User.setUserLang(384341805, 'ru')
# print(User.getUserLang(384341805))