forked from OCA/server-tools
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathpurge_tables.py
More file actions
141 lines (125 loc) · 4.98 KB
/
purge_tables.py
File metadata and controls
141 lines (125 loc) · 4.98 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
# Copyright 2014-2016 Therp BV <http://therp.nl>
# Copyright 2021 Camptocamp <https://camptocamp.com>
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html).
# pylint: disable=consider-merging-classes-inherited
from psycopg2.extensions import AsIs
from odoo import _, api, fields, models
from odoo.exceptions import UserError
from ..identifier_adapter import IdentifierAdapter
_TABLE_TYPE_SELECTION = [
("base", "SQL Table"),
("view", "SQL View"),
]
class CleanupPurgeLineTable(models.TransientModel):
_inherit = "cleanup.purge.line"
_name = "cleanup.purge.line.table"
_description = "Cleanup Purge Line Table"
wizard_id = fields.Many2one(
"cleanup.purge.wizard.table", "Purge Wizard", readonly=True
)
table_type = fields.Selection(selection=_TABLE_TYPE_SELECTION)
def purge(self):
"""
Unlink tables upon manual confirmation.
"""
if self:
objs = self
else:
objs = self.env["cleanup.purge.line.table"].browse(
self._context.get("active_ids")
)
tables = objs.mapped("name")
for line in objs:
if line.purged:
continue
# Retrieve constraints on the tables to be dropped
# This query is referenced in numerous places
# on the Internet but credits probably go to Tom Lane
# in this post http://www.postgresql.org/\
# message-id/22895.1226088573@sss.pgh.pa.us
# Only using the constraint name and the source table,
# but I'm leaving the rest in for easier debugging
self.env.cr.execute(
"""
SELECT conname, confrelid::regclass, af.attname AS fcol,
conrelid::regclass, a.attname AS col
FROM pg_attribute af, pg_attribute a,
(SELECT conname, conrelid, confrelid,conkey[i] AS conkey,
confkey[i] AS confkey
FROM (select conname, conrelid, confrelid, conkey,
confkey, generate_series(1,array_upper(conkey,1)) AS i
FROM pg_constraint WHERE contype = 'f') ss) ss2
WHERE af.attnum = confkey AND af.attrelid = confrelid AND
a.attnum = conkey AND a.attrelid = conrelid
AND confrelid::regclass = '%s'::regclass;
""",
(IdentifierAdapter(line.name, quote=False),),
)
for constraint in self.env.cr.fetchall():
if constraint[3] in tables:
self.logger.info(
"Dropping constraint %s on table %s (to be dropped)",
constraint[0],
constraint[3],
)
self.env.cr.execute(
"ALTER TABLE %s DROP CONSTRAINT %s",
(
IdentifierAdapter(constraint[3]),
IdentifierAdapter(constraint[0]),
),
)
if line.table_type == "base":
_sql_type = "TABLE"
elif line.table_type == "view":
_sql_type = "VIEW"
self.logger.info("Dropping %s %s", _sql_type, line.name)
self.env.cr.execute(
"DROP %s %s", (AsIs(_sql_type), IdentifierAdapter(line.name))
)
line.write({"purged": True})
return True
class CleanupPurgeWizardTable(models.TransientModel):
_inherit = "cleanup.purge.wizard"
_name = "cleanup.purge.wizard.table"
_description = "Purge tables"
@api.model
def find(self):
"""
Search for tables and views that cannot be instantiated.
"""
known_tables = []
for model in self.env["ir.model"].search([]):
if model.model not in self.env:
continue
model_pool = self.env[model.model]
known_tables.append(model_pool._table)
known_tables += [
column.relation
for column in model_pool._fields.values()
if column.type == "many2many"
and (column.compute is None or column.store)
and column.relation
]
self.env.cr.execute(
"""
SELECT table_name, table_type FROM information_schema.tables
WHERE table_schema = 'public'
AND table_type in ('BASE TABLE', 'VIEW')
AND table_name NOT IN %s""",
(tuple(known_tables),),
)
res = [
(
0,
0,
{"name": row[0], "table_type": "view" if row[1] == "VIEW" else "base"},
)
for row in self.env.cr.fetchall()
]
if not res:
raise UserError(_("No orphaned tables found"))
return res
purge_line_ids = fields.One2many(
"cleanup.purge.line.table", "wizard_id", "Tables to purge"
)