-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathns.py
141 lines (130 loc) · 3.91 KB
/
ns.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
import weakref
from util import *
import logging
# Logger shared by every cache / namespace helper in this module.
logger = logging.getLogger('namespace')

# Root objects, keyed by the name they were registered under
# (filled in by register_root, emptied by unregister_root).
root_namespace = {}

# identifier -> weakref.ref(obj); the cache never keeps objects alive itself.
cache = {}

# head_ident(identifier) -> set of cached identifiers sharing that head.
cache_names = {}

# Eight slots holding strong references to recently used objects;
# None marks an unused slot.
lru_cache = [None] * 8
def cache_object(obj):
    """Record ``obj`` in the identifier cache, the LRU list and the name index.

    ``cache`` only stores a weak reference to the object; the strong
    reference that keeps recently used objects alive is the one held by
    the LRU list (via ``lru_add``).

    Args:
        obj: object exposing an ``ident()`` method. The identifier is
            passed through ``assert_ident`` before anything is stored.
    """
    ident = obj.ident()
    assert_ident(ident)
    logger.debug(f'cache: {ident}')

    # A weak reference whose target is already gone is just a stale entry,
    # not a conflicting object: calling it yields None.  Only report a
    # clash when a *live*, different object owns the identifier.
    previous_ref = cache.get(ident)
    previous = previous_ref() if previous_ref is not None else None
    if previous is not None and previous is not obj:
        logger.error(f'(cache) different object with same identifier {ident}!')
        # todo: raise

    # add to cache (replaces any previous entry for this identifier)
    cache[ident] = weakref.ref(obj)
    # add to lru
    lru_add(obj)
    # add to names
    name = head_ident(ident)
    cache_names.setdefault(name, set()).add(ident)
def lru_add(obj):
    """Move ``obj`` to the most-recently-used end of ``lru_cache``.

    The list keeps its length. Exactly one slot is vacated to make room:

    * the slot already holding ``obj``, if there is one;
    * otherwise the last empty (``None``) slot;
    * otherwise slot 0, i.e. the least recently used entry is evicted.

    The scan covers every slot, including index 0. Skipping index 0 would
    let an object sitting there be appended a second time whenever an
    empty slot exists further along the list, and ``lru_remove`` clears
    only the first occurrence.

    Args:
        obj: the object to mark as most recently used.
    """
    slot = 0
    for index, held in enumerate(lru_cache):
        if held is obj:
            slot = index
            break
        if held is None:
            # keep scanning: obj itself may still appear later
            slot = index
    # Vacate the chosen slot and append, mutating the shared list in place.
    del lru_cache[slot]
    lru_cache.append(obj)
def uncache_object(obj, byGC=False):
    """Remove ``obj`` from the identifier cache, the LRU list and the name index.

    Args:
        obj: object exposing ``ident()``. If it has an ``on_uncache``
            attribute, that hook is called before any entry is removed.
        byGC: only changes the debug log line (adds a ``(GC)`` tag).

    Raises:
        KeyError: if the identifier is not present in ``cache``, or its
            head name is not present in ``cache_names``.
    """
    ident = obj.ident()
    assert_ident(ident)
    logger.debug(f'uncache{"(GC)" if byGC else ""}: {ident}')

    if ident in cache:
        if cache[ident]() is not obj:
            # Logged only; the entry is still removed below.
            logger.error(f'(uncache) different object with same identifier {ident}!')
            # todo: raise

    if hasattr(obj, 'on_uncache'):
        obj.on_uncache()

    # forget the weak reference
    del cache[ident]
    # release the strong reference held by the LRU list
    lru_remove(obj)
    # drop the identifier from its name bucket
    cache_names[head_ident(ident)].discard(ident)
def lru_remove(obj):
    """Clear the first ``lru_cache`` slot that holds ``obj``.

    The slot is set to ``None`` rather than removed, so the list keeps
    its length. Nothing happens if ``obj`` is not in the list.
    """
    for slot, held in enumerate(lru_cache):
        if held is obj:
            lru_cache[slot] = None
            return
def cacheable(klass):
    """Class decorator that hooks instances of ``klass`` into the object cache.

    After decoration:

    * ``__init__`` runs the original initializer, then ``cache_object(self)``.
    * ``__del__`` uncaches the instance if ``cache`` still maps its
      identifier to this very object, then runs the class's original
      ``__del__`` when one existed.
    * ``klass.uncache`` is bound to ``uncache_object`` so instances can
      be uncached explicitly.

    Args:
        klass: class whose instances expose an ``ident()`` method.

    Returns:
        The same class object, modified in place.
    """
    import functools  # local import keeps this block self-contained

    original_init = klass.__init__

    # wraps() preserves the initializer's name, docstring and signature
    # for introspection.
    @functools.wraps(original_init)
    def new_init(self, *args, **kw):
        original_init(self, *args, **kw)
        cache_object(self)

    klass.__init__ = new_init

    original_del = getattr(klass, '__del__', None)

    def new_del(self, *args, **kw):
        try:
            ident = self.ident()
            if ident in cache and cache[ident]() is self:
                uncache_object(self, byGC=True)
        finally:
            # The class's own finalizer must run even when the cache
            # bookkeeping above raises (e.g. ident() failing on a
            # half-initialized instance); the error still propagates.
            if original_del is not None:
                original_del(self, *args, **kw)

    if original_del is not None:
        new_del = functools.wraps(original_del)(new_del)

    klass.__del__ = new_del
    klass.uncache = uncache_object
    return klass
def query_object(ident, suppress_error=True):
    """Look up an object by identifier, first in the cache, then via the roots.

    A live cached object is returned directly and marked as recently used
    with ``lru_add``. Otherwise the identifier is split with
    ``split_ident``; the first part selects an entry of ``root_namespace``
    and any remaining parts are handed to that root's ``query`` method.

    Args:
        ident: identifier to resolve (passed through ``assert_ident``).
        suppress_error: when true, an exception raised during the lookup
            is logged at debug level and ``None`` is returned; when
            false, it is logged and re-raised.

    Returns:
        The resolved object, whatever the root's ``query`` returns, or
        ``None`` when a lookup error was suppressed.
    """
    assert_ident(ident)
    logger.debug(f'query_object: {ident}')
    try:
        if ident in cache:
            obj = cache[ident]()
            # A dead weak reference yields None: fall through to the roots.
            if obj is not None:
                lru_add(obj)
                return obj
        ids = split_ident(ident)
        ns = root_namespace[ids[0]]
        if len(ids) == 1:
            return ns
        return ns.query(ids[1:])
    except Exception:
        # Deliberately not a bare ``except``: KeyboardInterrupt and
        # SystemExit must never be swallowed by suppress_error.
        logger.debug('query_object', exc_info=True)
        if suppress_error:
            return None
        raise
def _(ident):
    """Strict shorthand for ``query_object``: return the object or raise.

    Lookup errors from ``query_object`` propagate unchanged.

    Raises:
        ValueError: if the lookup completes but yields ``None``.
    """
    found = query_object(ident, suppress_error=False)
    if found is not None:
        return found
    raise ValueError('Object not found')
def register_root(obj):
    """Install ``obj`` in ``root_namespace`` under the name ``obj.ident()``.

    A root already registered under that name is replaced; the clash is
    only logged as an error. If ``obj`` has an ``on_register_root``
    attribute, it is called once the object is registered.

    Args:
        obj: root object; its identifier is passed through ``assert_name``.
    """
    name = obj.ident()
    assert_name(name)
    logger.info(f'register_root: {name}')

    already_registered = name in root_namespace
    if already_registered:
        logger.error('duplicated root object!')
        # todo: raise

    root_namespace[name] = obj

    # optional notification hook
    if hasattr(obj, 'on_register_root'):
        obj.on_register_root()
def unregister_root(obj):
    """Remove a root from ``root_namespace`` and flush its cached objects.

    If ``obj`` has an ``on_unregister_root`` attribute, it is called
    before the root is removed. Every identifier recorded in
    ``cache_names`` under the root's name is then dropped from the cache:
    live objects go through ``uncache_object``, and entries whose weak
    reference is already dead are deleted directly so they do not linger
    in ``cache``.

    Args:
        obj: root object; its identifier is passed through ``assert_name``.

    Raises:
        KeyError: if no root is registered under ``obj.ident()``.
    """
    name = obj.ident()
    assert_name(name)
    logger.info(f'unregister_root: {name}')
    if name in root_namespace and root_namespace[name] is not obj:
        # Logged only; the registered root is still removed below.
        logger.error('wrong root object!')
        # todo: raise
    if hasattr(obj, 'on_unregister_root'):
        obj.on_unregister_root()
    del root_namespace[name]

    # remove names
    if name in cache_names:
        # Iterate over a snapshot: uncache_object() discards identifiers
        # from this very set while it is being walked.
        for ident in list(cache_names[name]):
            ref = cache.get(ident)
            cached = ref() if ref is not None else None
            if cached is not None:
                uncache_object(cached)
            else:
                # Dead (or missing) entry: purge it, otherwise it would
                # stay in ``cache`` with no name bucket pointing at it.
                cache.pop(ident, None)
        del cache_names[name]