-
Notifications
You must be signed in to change notification settings - Fork 1
/
db.py
77 lines (57 loc) · 2.88 KB
/
db.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
import os, sys
from typing import Any
from supabase import create_client, Client
from supabase.lib.client_options import ClientOptions
from abc import ABC, abstractmethod
from dotenv import load_dotenv
load_dotenv()
client_options = ClientOptions(postgrest_client_timeout=None)
url: str = os.getenv("SUPABASE_URL")
key: str = os.getenv("SUPABASE_KEY")
class SupabaseInterface():
_instance = None
def __init__(self):
# Initialize Supabase client upon first instantiation
if not SupabaseInterface._instance:
self.supabase_url =url
self.supabase_key =key
self.client: Client = create_client(self.supabase_url, self.supabase_key, options=client_options)
SupabaseInterface._instance = self
else:
SupabaseInterface._instance = self._instance
@staticmethod
def get_instance():
# Static method to retrieve the singleton instance
if not SupabaseInterface._instance:
# If no instance exists, create a new one
SupabaseInterface._instance = SupabaseInterface()
return SupabaseInterface._instance
def readAll(self, table):
data = self.client.table(f"{table}").select("*").execute()
return data.data
def add_data(self, data,table_name):
data = self.client.table(table_name).insert(data).execute()
return data.data
def update_data(self,data,table_name, match_column, match_value):
response = self.client.table(table_name).update(data).eq(match_column, match_value).execute()
return response.data
def multiple_update_data(self,data,table_name, match_column, match_value):
response = self.client.table(table_name).update(data).eq(match_column[0], match_value[0]).eq(match_column[1], match_value[1]).execute()
return response.data
def upsert_data(self,data,table_name):
response = self.client.table(table_name).upsert(data).execute()
return response.data
def add_data_filter(self, data, table_name):
# Construct the filter based on the provided column names and values
filter_data = {column: data[column] for column in ['dmp_id','issue_number','owner']}
# Check if the data already exists in the table based on the filter
existing_data = self.client.table(table_name).select("*").eq('dmp_id',data['dmp_id']).execute()
# If the data already exists, return without creating a new record
if existing_data.data:
return "Data already exists"
# If the data doesn't exist, insert it into the table
new_data = self.client.table(table_name).insert(data).execute()
return new_data.data
def get_dmp_issues(self):
response = self.client.table('dmp_issues').select('*, dmp_orgs(*)').execute()
return response.data