-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmodel.py
102 lines (80 loc) · 3.02 KB
/
model.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
import torch
import torch.nn as nn
from transformers import AutoConfig, AutoModel
class MeanPooling(nn.Module):
def __init__(self):
super(MeanPooling, self).__init__()
def forward(self, last_hidden_state, attention_mask):
input_mask_expanded = (
attention_mask.unsqueeze(-1).expand(last_hidden_state.size()).float()
)
sum_embeddings = torch.sum(last_hidden_state * input_mask_expanded, 1)
sum_mask = input_mask_expanded.sum(1)
sum_mask = torch.clamp(sum_mask, min=1e-9)
mean_embeddings = sum_embeddings / sum_mask
return mean_embeddings
class Encoder(nn.Module):
"""인코더 모델
.. note::
projection_dim 적정값 찾기
"""
def __init__(
self,
model_name_or_path="microsoft/mdeberta-v3-base",
projection_dim=128,
hidden_dim=768,
use_grad_ckpt=False,
norm_repres=False,
):
super(Encoder, self).__init__()
self.norm_repres = norm_repres
config = AutoConfig.from_pretrained(model_name_or_path)
self.model = AutoModel.from_pretrained(model_name_or_path, config=config)
if use_grad_ckpt:
self.model.gradient_checkpointing_enable()
self.projection_dim = projection_dim
if projection_dim > 0:
self.projection = nn.Linear(hidden_dim, projection_dim)
self.mean_pooler = MeanPooling()
def forward(self, input_ids, attention_mask):
#: batch x seq_len x hidden_dim
x = self.model(input_ids, attention_mask)
if self.projection_dim > 0:
repres = self.projection(
self.mean_pooler(x.last_hidden_state, attention_mask)
)
else:
repres = self.mean_pooler(x.last_hidden_state, attention_mask)
if self.norm_repres:
repres = nn.functional.normalize(repres, p=2, dim=-1)
return repres
class BiEncoder(nn.Module):
"""바이 인코더 모델"""
def __init__(self, topic_encoder: Encoder, content_encoder: Encoder):
super().__init__()
self.topic_encoder = topic_encoder
self.content_encoder = content_encoder
def forward(
self,
topic_ids: torch.Tensor,
topic_attention_mask: torch.Tensor,
content_ids: torch.Tensor,
content_attention_mask: torch.Tensor,
return_repres: bool = False,
):
topic_repres = self.topic_encoder.forward(
input_ids=topic_ids,
attention_mask=topic_attention_mask,
)
content_repres = self.content_encoder.forward(
input_ids=content_ids,
attention_mask=content_attention_mask,
)
score = self.calculate_score(topic_repres, content_repres)
if return_repres:
return score, topic_repres, content_repres
return score
def calculate_score(
self, topic_repres: torch.Tensor, content_repres: torch.Tensor
) -> torch.Tensor:
return torch.matmul(topic_repres, content_repres.transpose(0, 1))