-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathquery.py
67 lines (49 loc) · 1.99 KB
/
query.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
from models import *
from sqlalchemy import func
import os
from dotenv import load_dotenv
# Load variables from a .env file into the process environment at import
# time, so the os.getenv('POSTGRES_DB_*') lookups in this module can see them.
load_dotenv()
class PostgresORM:
    """Namespace of database helpers built on the models from ``models``.

    None of the helpers reads instance or class state, so they are declared
    as static methods.  They can be called as ``PostgresORM.helper(...)`` or
    through an instance.
    """

    @staticmethod
    def get_postgres_uri() -> str:
        """Build the PostgreSQL connection URI from environment variables.

        Reads ``POSTGRES_DB_HOST``, ``POSTGRES_DB_NAME``, ``POSTGRES_DB_USER``
        and ``POSTGRES_DB_PASS``.  The user and password are percent-encoded
        so that reserved characters such as ``@``, ``:``, ``/`` or ``%``
        cannot corrupt the structure of the URI.

        Returns:
            A ``postgresql://user:password@host/dbname`` string.  A variable
            that is not set is rendered as the literal text ``None``.
        """
        # Imported locally so this helper does not depend on a new
        # module-level import.
        from urllib.parse import quote

        db_host = os.getenv('POSTGRES_DB_HOST')
        db_name = os.getenv('POSTGRES_DB_NAME')
        # str() keeps the rendering of an unset variable ("None") unchanged;
        # safe='' makes sure '/' is escaped as well.
        # NOTE(review): if a deployment already stores a pre-escaped password
        # in POSTGRES_DB_PASS it would now be escaped twice - confirm.
        db_user = quote(str(os.getenv('POSTGRES_DB_USER')), safe='')
        db_pass = quote(str(os.getenv('POSTGRES_DB_PASS')), safe='')
        return f'postgresql://{db_user}:{db_pass}@{db_host}/{db_name}'

    @staticmethod
    def get_issue_query() -> list:
        """Return one row per ``DmpOrg`` with its issues aggregated as JSON.

        Each row exposes ``org_id``, ``org_name`` and ``issues``, where
        ``issues`` is a ``json_agg`` of ``{'id': ..., 'name': ...}`` objects
        built from ``DmpIssue.id`` and ``DmpIssue.title``.  Rows are ordered
        by ``DmpOrg.id``.
        """
        # NOTE(review): because this is an outer join, an org without issues
        # presumably aggregates to a single object whose id/name are null
        # rather than to an empty list - confirm how callers handle that.
        issues_json = func.json_agg(
            func.json_build_object(
                'id', DmpIssue.id,
                'name', DmpIssue.title,
            )
        ).label('issues')

        return (
            db.session.query(
                DmpOrg.id.label('org_id'),
                DmpOrg.name.label('org_name'),
                issues_json,
            )
            .outerjoin(DmpIssue, DmpOrg.id == DmpIssue.org_id)
            .group_by(DmpOrg.id)
            .order_by(DmpOrg.id)
            .all()
        )

    @staticmethod
    def get_issue_owner(name) -> list:
        """Return every ``DmpOrg`` row whose ``name`` equals *name*.

        Unlike the other lookups, the model objects are returned as-is and
        are not converted with ``to_dict()``.
        """
        return DmpOrg.query.filter_by(name=name).all()

    @staticmethod
    def get_actual_owner_query(owner) -> list:
        """Return ``to_dict()`` of every issue whose ``repo_owner`` contains *owner*.

        The match is a SQL ``LIKE '%<owner>%'`` substring search.
        """
        # NOTE(review): '%' and '_' inside *owner* act as LIKE wildcards here;
        # they are not escaped.
        matches = DmpIssue.query.filter(
            DmpIssue.repo_owner.like(f'%{owner}%')
        ).all()
        return [issue.to_dict() for issue in matches]

    @staticmethod
    def get_dmp_issues(issue_id) -> list:
        """Return ``to_dict()`` of every ``DmpIssue`` whose ``id`` is *issue_id*."""
        issues = DmpIssue.query.filter_by(id=issue_id).all()
        return [issue.to_dict() for issue in issues]

    @staticmethod
    def get_dmp_issue_updates(dmp_issue_id) -> list:
        """Return ``to_dict()`` of every ``DmpIssueUpdate`` with ``dmp_id == dmp_issue_id``."""
        updates = DmpIssueUpdate.query.filter_by(dmp_id=dmp_issue_id).all()
        return [update.to_dict() for update in updates]

    @staticmethod
    def get_pr_data(dmp_issue_id) -> list:
        """Return ``to_dict()`` of every ``Prupdates`` row with ``dmp_id == dmp_issue_id``."""
        pr_updates = Prupdates.query.filter_by(dmp_id=dmp_issue_id).all()
        return [pr_update.to_dict() for pr_update in pr_updates]