-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdatabase.py
More file actions
151 lines (128 loc) · 4.47 KB
/
Copy pathdatabase.py
File metadata and controls
151 lines (128 loc) · 4.47 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
import psycopg2
from env import DB_NAME, DB_USER, DB_PASSWORD, DB_HOST, DB_PORT
from sqlOperations import Sql
# Module-wide query builder shared by every PostgreSQLDatabase method below.
# NOTE(review): the "user" argument presumably names the target table (it
# matches the table created in init_table) — confirm against sqlOperations.Sql.
sql = Sql("user")
class DatabaseError(Exception):
    """Raised when a connection to the PostgreSQL server cannot be opened."""
class PostgreSQLDatabase:
    """Small psycopg2-backed helper that runs the queries built by ``sql``.

    Connection settings are copied from the ``env`` module constants when the
    object is created. Every public operation opens its own connection, runs
    inside one transaction, and closes the connection when it is done.
    """

    def __init__(self):
        self.db_name = DB_NAME
        self.db_user = DB_USER
        self.db_password = DB_PASSWORD
        self.db_host = DB_HOST
        self.db_port = DB_PORT

    def connect(self):
        """Open and return a new psycopg2 connection.

        The caller owns the returned connection and must close it.

        Raises:
            DatabaseError: if the connection cannot be established; the
                underlying exception is attached as ``__cause__``.
        """
        try:
            connection = psycopg2.connect(
                database=self.db_name,
                user=self.db_user,
                password=self.db_password,
                host=self.db_host,
                port=self.db_port,
            )
        except Exception as err:
            # Keep the original failure attached so the root cause (bad
            # credentials, unreachable host, ...) is visible in tracebacks.
            raise DatabaseError(
                f"could not connect to PostgreSQL: {err}"
            ) from err
        return connection

    def _run(self, statements, fetch=False):
        """Execute ``(query, params)`` pairs in one transaction.

        A fresh connection is opened for the call and is always closed
        afterwards, whether or not the statements succeed.

        Args:
            statements: iterable of ``(query, params)`` pairs; ``params`` may
                be ``None`` for a query without placeholders.
            fetch: when true, return ``cursor.fetchall()`` for the last
                statement; otherwise return ``None``.
        """
        connection = self.connect()
        try:
            # psycopg2's connection context manager commits on success and
            # rolls back on error, but does NOT close the connection.
            with connection:
                cursor = connection.cursor()
                try:
                    for query, params in statements:
                        cursor.execute(query, params)
                    return cursor.fetchall() if fetch else None
                finally:
                    cursor.close()
        finally:
            connection.close()

    def init_table(self):
        """Create the ``"user"`` table if it does not exist yet."""
        query = """
            CREATE TABLE IF NOT EXISTS "user" (
            user_id VARCHAR(255),
            server_id VARCHAR(255),
            status INTEGER,
            in_time REAL,
            total_time REAL,
            weekly_time REAL,
            daily_time REAL,
            last_updated DATE,
            PRIMARY KEY (user_id, server_id)
            );
            """
        self._run([(query, None)])

    def insert(self, fields, values):
        """Run ``sql.insert(fields)`` with ``values`` as parameters."""
        self._run([(sql.insert(fields), values)])

    def select(self, fields, constraint_key, constraint_value):
        """Run ``sql.select(fields, constraint_key)`` and return all rows.

        ``constraint_value`` is passed as the query parameters; when it is
        ``None`` the query is executed without parameters.
        """
        query = sql.select(fields, constraint_key)
        return self._run([(query, constraint_value)], fetch=True)

    def delete(self, constraint_key, constraint_value):
        """Run ``sql.delete(constraint_key)`` with ``constraint_value``."""
        self._run([(sql.delete(constraint_key), constraint_value)])

    def update(self, fields, values, constraint_key, constraint_value):
        """Run ``sql.update(fields, constraint_key)`` for a single target.

        The parameters are ``values`` followed by ``constraint_value``, so
        both must be sequences of the same type (they are concatenated).
        """
        query = sql.update(fields, constraint_key)
        self._run([(query, values + constraint_value)])

    def update_many(self, fields, values_list, constraint_key,
                    constraint_value_list):
        """Run the same update once per entry of ``values_list``.

        Entry ``i`` of ``values_list`` is paired with entry ``i`` of
        ``constraint_value_list``. All updates share one transaction, so a
        failure part-way through rolls every one of them back.
        """
        query = sql.update(fields, constraint_key)
        # Generator: the pairs are built lazily inside the transaction, so an
        # IndexError from a too-short constraint list also triggers rollback.
        statements = (
            (query, values + constraint_value_list[i])
            for i, values in enumerate(values_list)
        )
        self._run(statements)

    def drop_table(self):
        """Run the statement returned by ``sql.drop_table()``."""
        self._run([(sql.drop_table(), None)])
def list_tables(connection):
    """Return ``(table_schema, table_name)`` rows for the ``public`` schema.

    Rows come from ``information_schema.tables`` ordered by table name.

    Args:
        connection: an open DB-API connection (``cursor()`` / ``commit()``).

    Returns:
        The list produced by ``cursor.fetchall()``.
    """
    query = """
        SELECT table_schema, table_name
        FROM information_schema.tables
        WHERE (table_schema = 'public')
        ORDER BY table_name;
        """
    cursor = connection.cursor()
    try:
        cursor.execute(query)
        result = cursor.fetchall()
    finally:
        # Release the cursor even when the query fails.
        cursor.close()
    # Commit only after the rows have been read, ending the transaction the
    # SELECT was run in.
    connection.commit()
    return result
def list_fields(connection, name):
    """Return the column names of table ``name`` as a Python list.

    Names are read from ``INFORMATION_SCHEMA.COLUMNS`` and ordered by
    ``ordinal_position`` so they line up with the table's column order.

    Args:
        connection: an open DB-API connection.
        name: table name, passed as a bound query parameter.
    """
    query = """
        SELECT column_name FROM
        INFORMATION_SCHEMA.COLUMNS
        WHERE TABLE_NAME = %s
        ORDER BY ordinal_position;
        """
    cursor = connection.cursor()
    try:
        cursor.execute(query, (name, ))
        result = cursor.fetchall()
    finally:
        # Release the cursor even when the query fails.
        cursor.close()
    # Each row is a 1-tuple holding the column name.
    return [field[0] for field in result]
def _quote_ident(name):
    """Return ``name`` as a double-quoted SQL identifier, doubling any ``"``."""
    return '"' + str(name).replace('"', '""') + '"'


def list_schema(connection, path="DB_SCHEMA.txt"):
    """Dump every public table (columns and all rows) to a text file.

    For each table returned by ``list_tables`` except ``pg_stat_statements``,
    writes a ``===TABLE: <name>===`` header, a ``Columns: [...]`` line and
    then one line per row.

    Args:
        connection: an open DB-API connection.
        path: output file; it is overwritten. Defaults to ``DB_SCHEMA.txt``.
    """
    table_names = list_tables(connection)
    with open(path, "w", encoding="utf-8") as f:
        cur = connection.cursor()
        try:
            for schema, table in table_names:
                if table == 'pg_stat_statements':
                    continue
                # Identifiers cannot be bound as parameters, so they are
                # quoted by hand; qualifying with the schema avoids relying
                # on the session's search_path.
                query = (
                    f"SELECT * FROM {_quote_ident(schema)}."
                    f"{_quote_ident(table)};"
                )
                cur.execute(query)
                result = cur.fetchall()
                f.write(f"===TABLE: {table}===\n")
                f.write(f"Columns: {list_fields(connection, table)}\n")
                f.writelines(f"{r}\n" for r in result)
        finally:
            # Release the cursor even when a query fails.
            cur.close()
if __name__ == "__main__":
    # Script entry point: dump the current database contents to a text file.
    db = PostgreSQLDatabase()
    # Column names of the "user" table; not referenced below.
    all_fields = ['user_id', 'server_id', 'status', 'in_time',
                  'total_time', 'weekly_time', 'daily_time', 'last_updated']
    c = db.connect()
    try:
        # The psycopg2 `with` block only ends the transaction; the connection
        # itself has to be closed explicitly.
        with c:
            list_schema(c)
    finally:
        c.close()