-
Notifications
You must be signed in to change notification settings - Fork 2
/
server-example.py
103 lines (89 loc) · 3.54 KB
/
server-example.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
import os
from dotenv import load_dotenv
from orbis_python import OrbisDB
from flask import Flask, request
import json
# Flask application object that the route decorators below attach to.
app = Flask(__name__)

# Populate the process environment from ./.env before the settings are read.
load_dotenv(dotenv_path='.env', override=True)

# Identifiers handed to OrbisDB by the route handlers.
ENV_ID = os.environ.get("ENV_ID")
TABLE_ID = os.environ.get("TABLE_ID")
CONTEXT_ID = os.environ.get("CONTEXT_ID")

# One seed per agent; each is used as an OrbisDB controller private key.
AGENT_ONE_SEED = os.environ.get("AGENT_ONE_SEED")
AGENT_TWO_SEED = os.environ.get("AGENT_TWO_SEED")
AGENT_THREE_SEED = os.environ.get("AGENT_THREE_SEED")

# Endpoints passed to OrbisDB as c_endpoint / o_endpoint.
CERAMIC_ENDPOINT = os.environ.get("CERAMIC_ENDPOINT")
ORBIS_ENDPOINT = os.environ.get("ORBIS_ENDPOINT")

# Maps the ``agent`` query-string value to the seed of that agent.
switcher = dict(
    agent_one=AGENT_ONE_SEED,
    agent_two=AGENT_TWO_SEED,
    agent_three=AGENT_THREE_SEED,
)
# GET http://127.0.0.1:5000?agent=agent_two
@app.route('/')
def get():
    """Return the DID of the selected agent as JSON.

    Query parameters:
        agent: key of ``switcher`` selecting the seed that is passed to
            OrbisDB as ``controller_private_key``.

    Returns:
        A JSON response containing ``orbis.ceramic_client.did.id``, or a
        400 JSON error when ``agent`` is missing, unknown, or has no seed
        configured in the environment.
    """
    agent = request.args.get('agent')
    seed = switcher.get(agent)
    if not seed:
        # Without this guard a missing/unknown agent (or an unset *_SEED
        # variable) is forwarded to OrbisDB as controller_private_key=None.
        # NOTE(review): OrbisDB presumably cannot work without a key - confirm.
        return app.response_class(
            json.dumps({"error": "unknown or unconfigured agent", "agent": agent}),
            status=400,
            mimetype="application/json",
        )

    orbis = OrbisDB(
        c_endpoint=CERAMIC_ENDPOINT,
        o_endpoint=ORBIS_ENDPOINT,
        context_stream=CONTEXT_ID,
        table_stream=TABLE_ID,
        controller_private_key=seed,
    )
    # json.dumps yields a plain str, which Flask would serve as text/html;
    # wrap it so the payload is labelled as JSON.
    return app.response_class(
        json.dumps(orbis.ceramic_client.did.id),
        mimetype="application/json",
    )
# POST http://127.0.0.1:5000/create_document?agent=agent_three
# {"page": "/home", "address": "0x8071f6F971B438f7c0EA72C950430EE7655faBCe", "customer_user_id": 3, "timestamp": "2024-09-25T15:06:14.957719+00:00"}
@app.route('/create_document', methods=['POST'])
def create_document():
    """Add the JSON request body as a row through the selected agent.

    Query parameters:
        agent: key of ``switcher`` selecting the seed that is passed to
            OrbisDB as ``controller_private_key``.

    Request body:
        A JSON object; it is handed unchanged to ``orbis.add_row``.

    Returns:
        A JSON response with the ``json.dumps``-serialized result of
        ``add_row``, or a 400 JSON error when ``agent`` is missing, unknown
        or unconfigured, or when the body is not a JSON object.
    """
    agent = request.args.get('agent')
    seed = switcher.get(agent)
    if not seed:
        # Without this guard a missing/unknown agent (or an unset *_SEED
        # variable) is forwarded to OrbisDB as controller_private_key=None.
        # NOTE(review): OrbisDB presumably cannot work without a key - confirm.
        return app.response_class(
            json.dumps({"error": "unknown or unconfigured agent", "agent": agent}),
            status=400,
            mimetype="application/json",
        )

    content = request.json
    if not isinstance(content, dict):
        # The documented payload is a JSON object; reject null/list/scalar
        # bodies here instead of passing them on to add_row.
        return app.response_class(
            json.dumps({"error": "request body must be a JSON object"}),
            status=400,
            mimetype="application/json",
        )

    orbis = OrbisDB(
        c_endpoint=CERAMIC_ENDPOINT,
        o_endpoint=ORBIS_ENDPOINT,
        context_stream=CONTEXT_ID,
        table_stream=TABLE_ID,
        controller_private_key=seed,
    )
    doc = orbis.add_row(content)
    # Return the stringified add_row result, labelled as JSON rather than
    # Flask's default text/html for plain strings.
    return app.response_class(json.dumps(doc), mimetype="application/json")
# GET http://127.0.0.1:5000/get?agent=agent_one
@app.route('/get', methods=['GET'])
def get_documents():
    """Read documents for ``ENV_ID`` through the selected agent.

    Query parameters:
        agent: key of ``switcher`` selecting the seed that is passed to
            OrbisDB as ``controller_private_key``.

    Returns:
        Whatever ``orbis.read(ENV_ID)`` returns, or a 400 JSON error when
        ``agent`` is missing, unknown, or has no seed configured.
    """
    agent = request.args.get('agent')
    seed = switcher.get(agent)
    if not seed:
        # Without this guard a missing/unknown agent (or an unset *_SEED
        # variable) is forwarded to OrbisDB as controller_private_key=None.
        # NOTE(review): OrbisDB presumably cannot work without a key - confirm.
        return app.response_class(
            json.dumps({"error": "unknown or unconfigured agent", "agent": agent}),
            status=400,
            mimetype="application/json",
        )

    orbis = OrbisDB(
        c_endpoint=CERAMIC_ENDPOINT,
        o_endpoint=ORBIS_ENDPOINT,
        context_stream=CONTEXT_ID,
        table_stream=TABLE_ID,
        controller_private_key=seed,
    )
    return orbis.read(ENV_ID)
# TODO: the code below still needs to be updated.
# GET http://127.0.0.1:5000/filter?customer_user_id=3&agent=agent_two
@app.route('/filter')
def get_filtered_documents():
    """Return the documents for ``ENV_ID`` that match the query-string filters.

    Query parameters:
        agent: key of ``switcher`` selecting the seed that is passed to
            OrbisDB as ``controller_private_key``.
        any other key: used as a filter; values are passed on as the strings
            received in the query string.

    Returns:
        Whatever ``orbis.filter(ENV_ID, filters)`` returns, or a 400 JSON
        error when ``agent`` is missing, unknown, or has no seed configured.
    """
    agent = request.args.get('agent')
    seed = switcher.get(agent)
    if not seed:
        # Without this guard a missing/unknown agent (or an unset *_SEED
        # variable) is forwarded to OrbisDB as controller_private_key=None.
        # NOTE(review): OrbisDB presumably cannot work without a key - confirm.
        return app.response_class(
            json.dumps({"error": "unknown or unconfigured agent", "agent": agent}),
            status=400,
            mimetype="application/json",
        )

    orbis = OrbisDB(
        c_endpoint=CERAMIC_ENDPOINT,
        o_endpoint=ORBIS_ENDPOINT,
        context_stream=CONTEXT_ID,
        table_stream=TABLE_ID,
        controller_private_key=seed,
    )
    # Every query parameter except ``agent`` is a filter. The local is named
    # ``filters`` so it does not shadow the ``filter`` builtin.
    filters = {
        key: value
        for key, value in request.args.items()
        if key != 'agent'
    }
    return orbis.filter(ENV_ID, filters)
# PATCH http://127.0.0.1:5000/update_document?agent=agent_two
# payload example: { "filters": {"customer_user_id": 3 }, "content": {"customer_user_id": 4} }
@app.route('/update_document', methods=['PATCH'])
def update_document():
    """Update the rows matching ``filters`` with ``content``.

    Query parameters:
        agent: key of ``switcher`` selecting the seed that is passed to
            OrbisDB as ``controller_private_key``.

    Request body:
        A JSON object whose ``filters`` and ``content`` entries are passed to
        ``orbis.update_rows``; a missing entry is passed as ``None``.

    Returns:
        Whatever ``orbis.update_rows(ENV_ID, filters, content)`` returns, or a
        400 JSON error when ``agent`` is missing, unknown or unconfigured, or
        when the body is not a JSON object.
    """
    agent = request.args.get('agent')
    seed = switcher.get(agent)
    if not seed:
        # Without this guard a missing/unknown agent (or an unset *_SEED
        # variable) is forwarded to OrbisDB as controller_private_key=None.
        # NOTE(review): OrbisDB presumably cannot work without a key - confirm.
        return app.response_class(
            json.dumps({"error": "unknown or unconfigured agent", "agent": agent}),
            status=400,
            mimetype="application/json",
        )

    payload = request.json
    if not isinstance(payload, dict):
        # A null/list/scalar body has no .get(); answer with a client error
        # instead of failing with AttributeError.
        return app.response_class(
            json.dumps({"error": "request body must be a JSON object"}),
            status=400,
            mimetype="application/json",
        )
    content = payload.get('content')
    filters = payload.get('filters')

    orbis = OrbisDB(
        c_endpoint=CERAMIC_ENDPOINT,
        o_endpoint=ORBIS_ENDPOINT,
        context_stream=CONTEXT_ID,
        table_stream=TABLE_ID,
        controller_private_key=seed,
    )
    return orbis.update_rows(ENV_ID, filters, content)
if __name__ == "__main__":
    # Script entry point: start Flask's built-in server with debug mode on.
    # Intended for local experimentation with the example routes.
    app.run(debug=True)