-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathprobability.py
More file actions
280 lines (215 loc) · 10.8 KB
/
probability.py
File metadata and controls
280 lines (215 loc) · 10.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
import re
import copy
from itertools import permutations
# Define a probability distribution class
class Probability:
'''
condition은 get_new_probability로 우선 fraction 형태로 처리되고 가능한 경우 simplify에서 _var|_cond 로 변경됨
'''
def __init__(self, var=set(), cond=set(), do=set(), recursive=False, children=set(), sumset=set(), fraction=False,
divisor=None, scope: set = set()):
self._var = var # Random variables
self._cond = cond # Conditions
self._do = do # Do operator
self._recursive = recursive # Recursive flag
self._children = children # Set of children
self._sumset = sumset # Summation set
self._fraction = fraction # Fraction flag
self._divisor = divisor # Divisor
# 기본적으로 scope를 var과 cond로 설정
if not scope:
scope = self._var | self._cond
self._scope = scope
def copy(self):
new_P = copy.deepcopy(self)
return new_P
@property
def attributes(self):
'''Function that shows all attributes of the probability distribution.'''
out = {}
out["var"] = self._var
out["cond"] = self._cond
out["do"] = self._do
out["recursive"] = self._recursive
out["sumset"] = self._sumset
out["fraction"] = self._fraction
out["scope"] = self._scope
if self._recursive:
out["children"] = [child.attributes for child in self._children]
else:
out["children"] = self._children
if self._fraction:
out["divisor"] = self._divisor.attributes
else:
out["divisor"] = self._divisor
return out
def getFreeVariables(self) -> set:
'''Function that returns the free variables of the distribution.'''
# Probability가 정의된 random variables return : (vars - sumset) & scope
free = set()
if not self._recursive:
free = free.union(self._var)
else:
for prob in self._children:
free = free.union(prob.getFreeVariables())
# summation할 변수는 제외.
free = free - self._sumset
# 분모에 있는 변수들도 추가
if self._fraction:
free = free.union(self._divisor.getFreeVariables())
# P가 정의된 변수들만 해당(cond에 있는 변수가 상수인 경우 제외)
free = free & self._scope
return free
def simplify(self):
# for loop 돌면서 children 만드는 경우 children이 1개면 불필요하게 nested 됨
# 밖으로 꺼내주고 이미 get_new_probability에서 simplify해서 추가적인 정리 필요 없음
if self._recursive and len(self._children)==1:
child = next(iter(self._children))
self.__dict__ = child.__dict__ # child의 정보를 self에 복사
return
# get_new_probability에서 simplify 를 해서 안해도 될 것 같음
if self._divisor:
self._divisor.simplify()
# 한번이라도 simplify가 되었다면 다시 탐색
# self._sumset, self._recursive, self._fraction 의 유무에 따라 simplify 방법 달라짐
flag = True
while flag:
flag = False
# sumset도 없고, children도 없고, fraction도 없다면
if not self._sumset and not self._recursive and not self._fraction:
return
# sumset이 있는데, children이 없고, fraction도 없다면
# sum_c P(a, c) = P(a)
elif self._sumset and not self._recursive and not self._fraction:
sum_variables = self._sumset & self._var
self._sumset = self._sumset - sum_variables
self._var = self._var - sum_variables
flag = True
# children있으면서, fraction 있는 경우
elif not self._recursive and self._fraction:
# P(x, y) / P() = P(x, y)
if not self._divisor._var:
self._divisor = None
self._fraction = False
flag = True
# 만약 분모의 condition이 없고 divisor의 V가 분자 V의 부분집합이면 분모 제거
# P(x, y) / P(y) = P(x|y)
elif not self._divisor._cond and self._divisor._var<=self._var:
self._var = self._var - self._divisor._var
self._cond = self._cond | self._divisor._var
self._divisor = None
self._fraction = False
flag = True
# children이 있는 경우 child끼리 합칠 수 있는지 확인
elif self._recursive:
for prob1, prob2 in permutations(self._children, 2):
# 모두 children이 없다면
if not prob1._recursive and not prob2._recursive:
# P(Y|X,Z)P(X|Z) = P(Y,X|Z), P(Y|X)P(X) = P(Y,X)
if prob1._cond == prob2._var | prob2._cond:
removable = prob2
prob1._var = prob1._var | prob2._var
prob1._cond = prob1._cond - prob2._var
self._children -= {removable} # 합쳐져서 없어진 것 제거 (prob2)
# # 만약 children이 하나가 남으면 recursive False로 하고 올려줌
# if len(self._children) == 1:
# prob = next(iter(self._children))
# self.__dict__ = prob.__dict__
flag = True # 또 다른 simplify를 위해서 while문 돌아야 함
if flag: # 일단 하나 simplify 했으면 넘어감
break
# sum_c P(a, c|x, y, z) P(x | a, y) = P(a | x, y, z) P(x | a, y)
# 대신 모든 children이 recursive가 없어야 함. 있는 경우 그 안에 self.sumset에 해당하는 변수가 있을 수 있어 함부로 지울 수 없음
if self._sumset and self._recursive and not any(child._recursive for child in self._children):
vars, conds = set(), set()
for child in self._children:
vars |= child._var
conds |= child._cond
for child in self._children:
if removable := (child._var - conds) & self._sumset:
child._var -= removable
self._sumset -= removable
if not child._var:
self._children -= {child}
flag = True
break
def __lt__(self, other):
# 1. _cond가 있는 객체는 없는 객체보다 후순위
if self._sumset and not other._sumset:
return False
elif not self._sumset and other._sumset:
return True
if self._cond and not other._cond:
return False
elif not self._cond and other._cond:
return True
# 2. _var의 개수가 클수록 후순위
if len(self._var) < len(other._var):
return True
elif len(self._var) > len(other._var):
return False
else:
# 3. _var 개수가 같다면 _var의 원소 중 가장 작은 것을 포함한 객체를 더 선순위
# 4. _var을 정렬할 때는 먼저 알파벳 순서로 정렬하고 알파벳이 같으면 숫자 순으로 정렬. 숫자가 없는 것은 0으로 간주
sorted_self_var = sorted(self._var, key=lambda x: (x.strip('0123456789'), int(''.join(filter(str.isdigit, x)) or '0')))
sorted_other_var = sorted(other._var, key=lambda x: (x.strip('0123456789'), int(''.join(filter(str.isdigit, x)) or '0')))
return sorted_self_var < sorted_other_var
@staticmethod # 출력할 때 W1을 w_1로 출력하기 위한 함수
def underscore(input_set: frozenset):
# Regular expression to find digits
input_set = set(input_set)
pattern = r'(\d+)'
# Add an underscore before each number in each string of the set
modified_set = {re.sub(pattern, r'_\1', item) for item in input_set}
return modified_set
def printLatex(self, tab=0):
'''Function that returns a string in LaTeX syntax of the probability distribution.'''
out = ""
if self._fraction:
out += '\\left(\\frac{'
if self._sumset:
if tab == 0:
out += '\sum_{' + ', '.join(sorted(self.underscore(self._sumset))).lower() + '}'
else:
out += '\\left(\sum_{' + ', '.join(sorted(self.underscore(self._sumset))).lower() + '}'
if not self._recursive:
if self._var:
if self._do:
out += 'P_{'+ ', '.join(sorted(self.underscore(self._do))).lower() +'}(' + ', '.join(sorted(self.underscore(self._var))).lower()
else:
out += 'P(' + ', '.join(sorted(self.underscore(self._var))).lower()
if self._cond:
out += '|' + ', '.join(sorted(self.underscore(self._cond))).lower()
out += ')'
else: # useless?
out += '1'
else:
for prob in sorted(self._children):
out += prob.printLatex(tab=tab + 1)
if self._sumset and tab:
out += '\\right)'
if self._fraction:
out += '}{'
out += self._divisor.printLatex()
out += '}\\right)'
return out
def get_new_probability(P, var, cond={}):
'''
ID 알고리즘 line 6, 7에서 cond에는 있지만 S'에 속하지 않는 변수는 Freevariables에서 제거해야 함
'''
P_out = P.copy()
if len(cond) == 0:
P_out._sumset = P_out._sumset | (P.getFreeVariables() - var)
P_out.simplify()
else:
P_denom = P.copy()
P_out._sumset = P_out._sumset | (P.getFreeVariables() - (cond | var))
P_out.simplify() # 분자 simplify
P_out._fraction = True
P_denom._sumset = P_denom._sumset | (P.getFreeVariables()- cond)
P_denom.simplify() # 분모 simplify
P_out._divisor = P_denom
P_out.simplify()
return P_out
if __name__ == "__main__":
pass