-
-
Notifications
You must be signed in to change notification settings - Fork 17
/
Copy pathpostgresql.py
74 lines (53 loc) · 2.04 KB
/
postgresql.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
import logging
import os
from typing import Dict, Iterable, Tuple
import psycopg2
from .interfaces import DatabaseInterface
def get_database_name():
return os.environ["POSTGRES_DB"]
def get_database_user():
return os.environ["POSTGRES_USER"]
def get_database_password():
return os.environ["POSTGRES_PASSWORD"]
def get_database_host():
return os.environ["POSTGRES_HOST"]
def get_database_port():
return os.environ["POSTGRES_PORT"]
def create_database_interface() -> DatabaseInterface:
return PostgreSQL(
get_database_host(),
get_database_name(),
get_database_user(),
get_database_password(),
get_database_port(),
)
class PostgreSQL(DatabaseInterface):
def __init__(self, host, database, user, password, port):
self._connection = psycopg2.connect(
dbname=database, user=user, password=password, host=host, port=port
)
def _commit_changes(self, command: str, data: Dict = {}) -> None:
with self._connection.cursor() as cursor:
logging.debug(f"Making change: {cursor.query}")
cursor.execute(command, data)
self._connection.commit()
def select(self, command: str) -> Iterable[Tuple]:
with self._connection.cursor() as cursor:
cursor.execute(command)
logging.debug(f"Starting query: {cursor.query}")
for entry in cursor:
logging.debug(entry)
yield entry
logging.debug(f"Finished query: {cursor.query}")
def insert(self, command: str, data: Dict = {}):
logging.debug("Inserting:")
self._commit_changes(command, data)
logging.debug("Finished inserting")
def update(self, command: str, data: Dict = {}):
logging.debug("Updating:")
self._commit_changes(command, data)
logging.debug("Finished updating")
def delete(self, command: str, data: Dict = {}):
logging.debug("Deleting:")
self._commit_changes(command, data)
logging.debug("Finished deleting")