-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathorm.py
227 lines (196 loc) · 7.28 KB
/
orm.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
import asyncio, logging
import aiomysql
def log(sql, args=()):
logging.info('SOL: %s' % sql)
async def create_pool(loop, **kw):
logging.info('create database connection pool...')
global __pool
__pool = await aiomysql.create_pool(
host = kw.get('host', 'localhost'),
port = kw.get('port', 3306),
user = kw['user'],
password = kw['password'],
db = kw['db'],
charset = kw.get('charset', 'utf8'),
autocommit = kw.get('autocommit', True), # 自动提交模式
maxsize = kw.get('maxsize', 10),
minsize = kw.get('minsize', 1),
loop = loop
)
async def select(sql, args, size = None):
log(sql, args)
global __pool
async with __pool.get() as conn:
async with conn.cursor(aiomysql.DictCursor) as cur:
await cur.execute(sql.replace('?', '%s'), args or ())
if size:
rs = await cur.fetchmany(size)
else:
rs = await cur.fetchall()
logging.info('rows returned: %s' % len(rs))
return rs
async def execute(sql, args, autocommit=True):
log(sql)
async with __pool.get() as conn:
if not autocommit:
await conn.begin()
try:
async with conn.cursor(aiomysql.DictCursor) as cur:
await cur.execute(sql.replace('?', '%s'), args)
affected = cur.rowcount
if not autocommit:
await conn.commit()
except BaseException as e:
if not autocommit:
await conn.rollback() # 回滚到更改前的状态
raise
return affected
def create_args_string(num : int) -> str:
L = []
for n in range(num):
L.append('?')
return ','.join(L)
class Field():
def __init__(self, name, column_type, primary_key, default):
self.name = name
self.column_type = column_type
self.primary_key = primary_key
self.default = default
def __str__(self):
return "<%s, %s : %s>" % (self.__class__.__name__, self.column_type, self.name)
class StringField(Field):
def __init__(self, name = None, primary_key = False, default = None, ddl = 'varchar(100)'):
super().__init__(name, ddl, primary_key, default)
class BooleanField(Field):
def __init__(self, name = None, defalut = False):
super().__init__(name, 'boolean', False, defalut)
class IntegerField(Field):
def __init__(self, name = None, primary_key = False, default = 0):
super().__init__(name, 'bigint', primary_key, default)
class FloatField(Field):
def __init__(self, name = None, primary_key = False, default = 0.0):
super().__init__(name, 'real', primary_key, default)
class TextField(Field):
def __init__(self, name = None, default = None):
super().__init__(name, 'text', False, default)
class MediumBlobField(Field):
def __init__(self, name = None, primary_key = False, default = None):
super().__init__(name, 'mediumblob', primary_key, default)
class ModelMetaclass(type):
def __new__(cls, name, bases, attrs):
if name == 'Model':
return type.__new__(cls, name, bases, attrs)
tableName = attrs.get('__table__', None) or name
logging.info('found model : %s (table : %s)' % (name, tableName))
mappings = dict()
fields = []
primaryKey = None
for k, v in attrs.items():
if (isinstance(v, Field)):
logging.info(' found mapping: %s ==> %s' % (k, v))
mappings[k] = v
if v.primary_key:
if primaryKey:
raise Exception('Duplicate primary key for field %s' % k)
primaryKey = k
else:
fields.append(k)
if not primaryKey:
raise Exception('Primary key not found.')
for k in mappings.keys():
attrs.pop(k)
escaped_field = list(map(lambda f : '`%s`' % f, fields))
attrs['__mappings__'] = mappings
attrs['__table__'] = tableName
attrs['__primary_key__'] = primaryKey
attrs['__fields__'] = fields
attrs['__select__'] = 'select `%s`, %s from %s' % (primaryKey, ','.join(escaped_field), tableName)
attrs['__insert__'] = 'insert into `%s` (%s, `%s`) values (%s)' % (tableName, ','.join(escaped_field), primaryKey, create_args_string(len(escaped_field) + 1))
attrs['__delete__'] = 'delete from `%s` where `%s` = ?' % (tableName, primaryKey)
attrs['__update__'] = 'update `%s` set %s where `%s` = ?' % (tableName, ', '.join(map(lambda f: '`%s`=?' % (mappings.get(f).name or f), fields)), primaryKey)
return type.__new__(cls, name, bases, attrs)
class Model(dict, metaclass=ModelMetaclass):
def __init__(self, **kw):
super(Model, self).__init__(**kw)
def __getattr__(self, key):
try:
return self[key]
except KeyError:
raise AttributeError("'Model' object has no attribute '%s'" % key)
def __setattr__(self, key, value):
self[key] = value
def getValue(self, key):
return getattr(self, key, None)
def getValueOrDefalut(self, key):
value = getattr(self, key, None)
if value is None:
field = self.__mappings__[key]
if field.default is not None:
value = field.default() if callable(field.default) else field.default
logging.debug('using default value for %s: %s' % (key, str(value)))
setattr(self, key, value)
return value
@classmethod
async def findAll(cls, where = None, args = None, **kw):
sql = [cls.__select__]
if where:
sql.append('where')
sql.append(where)
if args is None:
args = []
orderBy = kw.get('orderBy', None)
if orderBy:
sql.append('order by')
sql.append(orderBy)
limit = kw.get('limit', None)
if limit is not None:
sql.append('limit')
if isinstance(limit, int):
sql.append('?')
args.append(limit)
elif isinstance(limit, tuple) and len(limit) == 2:
sql.append('?,?')
args.extend(limit)
else:
raise ValueError('Invalid limit value: %s' % str(limit))
rs = await select(' '.join(sql), args)
return [cls(**r) for r in rs]
@classmethod
async def findNumber(cls, selectField, where=None, args=None):
sql = ['select %s _num_ from `%s`' % (selectField, cls.__table__)]
if where:
sql.append('where')
sql.append(where)
rs = await select(' '.join(sql), args, 1)
if len(rs) == 0:
return None
return rs[0]['_num_']
# find object by primary key
@classmethod
async def find(cls, pk):
rs = await select('%s where `%s`=?' % (cls.__select__, cls.__primary_key__), [pk], 1)
if len(rs) == 0:
return None
return cls(**rs[0])
# find object by using the 'find-like' in sql
@classmethod
async def find_use_like(cls, column, what):
rs = await select('%s where `%s` like ?' % (cls.__select__, column), ['%' + what + '%'])
return [cls(**r) for r in rs]
async def save(self):
args = list(map(self.getValueOrDefalut, self.__fields__))
args.append(self.getValueOrDefalut(self.__primary_key__))
rows = await execute(self.__insert__, args)
if rows != 1:
logging.warn('failed to insert record: affected rows: %s' % rows)
async def update(self):
args = list(map(self.getValue, self.__fields__))
args.append(self.getValue(self.__primary_key__))
rows = await execute(self.__update__, args)
if rows != 1:
logging.warn('failed to update by primary key: affected rows: %s' % rows)
async def remove(self):
args = [self.getValue(self.__primary_key__)]
rows = await execute(self.__delete__, args)
if rows != 1:
logging.warn('failed to remove by primary key: affected rows: %s' % rows)