-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathutils.py
136 lines (107 loc) · 4.55 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
import os
import numpy as np
import torch
import torch.nn.functional as F
cuda = True if torch.cuda.is_available() else False
def cal_sample_weight(labels, num_class, use_sample_weight=True):
if not use_sample_weight:
return np.ones(len(labels)) / len(labels)
count = np.zeros(num_class)
for i in range(num_class):
count[i] = np.sum(labels == i)
sample_weight = np.zeros(labels.shape)
for i in range(num_class):
sample_weight[np.where(labels == i)[0]] = count[i] / np.sum(count)
return sample_weight
def one_hot_tensor(y, num_dim):
y_onehot = torch.zeros(y.shape[0], num_dim)
y_onehot.scatter_(1, y.view(-1, 1), 1)
return y_onehot
def cosine_distance_torch(x1, x2=None, eps=1e-8):
x2 = x1 if x2 is None else x2
w1 = x1.norm(p=2, dim=1, keepdim=True)
w2 = w1 if x2 is x1 else x2.norm(p=2, dim=1, keepdim=True)
return 1 - torch.mm(x1, x2.t()) / (w1 * w2.t()).clamp(min=eps)
def to_sparse(x):
x_typename = torch.typename(x).split('.')[-1]
sparse_tensortype = getattr(torch.sparse, x_typename)
indices = torch.nonzero(x)
if len(indices.shape) == 0: # if all elements are zeros
return sparse_tensortype(*x.shape)
indices = indices.t()
values = x[tuple(indices[i] for i in range(indices.shape[0]))]
return sparse_tensortype(indices, values, x.size())
def cal_adj_mat_parameter(edge_per_node, data, metric="cosine"):
assert metric == "cosine", "Only cosine distance implemented"
dist = cosine_distance_torch(data, data)
parameter = torch.sort(dist.reshape(-1, )).values[edge_per_node * data.shape[0]]
return np.ndarray.item(parameter.data.cpu().numpy())
def graph_from_dist_tensor(dist, parameter, self_dist=True):
if self_dist:
assert dist.shape[0] == dist.shape[1], "Input is not pairwise dist matrix"
g = (dist <= parameter).float()
if self_dist:
diag_idx = np.diag_indices(g.shape[0])
g[diag_idx[0], diag_idx[1]] = 0
return g
def gen_adj_mat_tensor(data, parameter, metric="cosine"):
assert metric == "cosine", "Only cosine distance implemented"
dist = cosine_distance_torch(data, data)
g = graph_from_dist_tensor(dist, parameter, self_dist=True)
if metric == "cosine":
adj = 1 - dist
else:
raise NotImplementedError
adj = adj * g
adj_T = adj.transpose(0, 1)
I = torch.eye(adj.shape[0])
if cuda:
I = I.cuda()
adj = adj + adj_T * (adj_T > adj).float() - adj * (adj_T > adj).float()
adj = F.normalize(adj + I, p=1)
adj = to_sparse(adj)
return adj
def gen_test_adj_mat_tensor(data, trte_idx, parameter, metric="cosine"):
assert metric == "cosine", "Only cosine distance implemented"
adj = torch.zeros((data.shape[0], data.shape[0]))
if cuda:
adj = adj.cuda()
num_tr = len(trte_idx["tr"])
dist_tr2te = cosine_distance_torch(data[trte_idx["tr"]], data[trte_idx["te"]])
g_tr2te = graph_from_dist_tensor(dist_tr2te, parameter, self_dist=False)
if metric == "cosine":
adj[:num_tr, num_tr:] = 1 - dist_tr2te
else:
raise NotImplementedError
adj[:num_tr, num_tr:] = adj[:num_tr, num_tr:] * g_tr2te
dist_te2tr = cosine_distance_torch(data[trte_idx["te"]], data[trte_idx["tr"]])
g_te2tr = graph_from_dist_tensor(dist_te2tr, parameter, self_dist=False)
if metric == "cosine":
adj[num_tr:, :num_tr] = 1 - dist_te2tr
else:
raise NotImplementedError
adj[num_tr:, :num_tr] = adj[num_tr:, :num_tr] * g_te2tr # retain selected edges
adj_T = adj.transpose(0, 1)
I = torch.eye(adj.shape[0])
if cuda:
I = I.cuda()
adj = adj + adj_T * (adj_T > adj).float() - adj * (adj_T > adj).float()
adj = F.normalize(adj + I, p=1)
adj = to_sparse(adj)
return adj
def save_model_dict(folder, model_dict):
if not os.path.exists(folder):
os.makedirs(folder)
for module in model_dict:
torch.save(model_dict[module].state_dict(), os.path.join(folder, module + ".pth"))
def load_model_dict(folder, model_dict):
for module in model_dict:
if os.path.exists(os.path.join(folder, module + ".pth")):
# print("Module {:} loaded!".format(module))
model_dict[module].load_state_dict(torch.load(os.path.join(folder, module + ".pth"),
map_location="cuda:{:}".format(torch.cuda.current_device())))
else:
print("WARNING: Module {:} from model_dict is not loaded!".format(module))
if cuda:
model_dict[module].cuda()
return model_dict