forked from hnu202409060624/python
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
78 lines
3.4 KiB
78 lines
3.4 KiB
import torch
|
|
from torch import nn
|
|
from attention import MultiHeadAttention
|
|
import config
|
|
from Feed_Forward import PoswiseFeedForwardNet
|
|
from torch_geometric.nn import GCNConv
|
|
import math
|
|
|
|
def get_attn_pad_mask(seq_q, seq_k):
    """Build a boolean attention mask that flags the zero entries of ``seq_k``.

    Args:
        seq_q: 2-D tensor; only its second dimension (the query length)
            is used.
        seq_k: 2-D tensor; entries equal to 0 are treated as PAD.

    Returns:
        Bool tensor of shape ``(batch_size, len_q, len_k)``, where
        ``batch_size`` and ``len_k`` come from ``seq_k``. An entry is True
        where the corresponding key position is PAD (i.e. is masked).
    """
    # Unpacking requires both inputs to be exactly 2-D.
    _, query_len = seq_q.size()
    n_batch, key_len = seq_k.size()

    # Zero is the PAD token; True marks a position to be masked.
    key_is_pad = seq_k.data == 0

    # Insert a query axis, then broadcast it across every query position.
    return key_is_pad[:, None, :].expand(n_batch, query_len, key_len)
|
|
|
|
|
|
def get_sinusoid_encoding_table(max_len, d_model):
    """Build a ``[max_len, d_model]`` table of sine/cosine position codes.

    For every even column ``i``:

    * column ``i``     holds ``sin(pos / 10000 ** (2 * i / d_model))``
    * column ``i + 1`` holds ``cos(pos / 10000 ** (2 * (i + 1) / d_model))``

    When ``d_model`` is odd the last column has no cosine partner; it is
    filled with the sine term only (previously this raised ``IndexError``).

    NOTE(review): this is not the textbook Transformer formula. There the
    exponent is ``i / d_model`` for even column ``i`` and the cosine shares
    its sine's frequency. The existing values are kept on purpose so that
    callers and trained checkpoints see the same table — confirm whether
    the deviation is intended before changing it.

    Args:
        max_len: Number of positions (rows) in the table.
        d_model: Encoding width (columns) of the table.

    Returns:
        ``torch.Tensor`` of shape ``(max_len, d_model)``.
    """
    position_enc = torch.zeros(max_len, d_model)

    for i in range(0, d_model, 2):
        # The divisors depend only on the column, so compute them once per
        # column pair instead of once per (position, column).
        sin_divisor = 10000 ** (2 * i / d_model)
        cos_divisor = 10000 ** ((2 * (i + 1)) / d_model)
        # For odd d_model the final even column has no i + 1 neighbour.
        has_cos_column = i + 1 < d_model

        for pos in range(max_len):
            position_enc[pos, i] = math.sin(pos / sin_divisor)
            if has_cos_column:
                position_enc[pos, i + 1] = math.cos(pos / cos_divisor)

    return position_enc
|
|
|
|
|
|
class EncoderLayer(nn.Module):
    """One encoder layer made of a graph branch and an attention branch.

    Graph branch: ``enc_inputs`` goes through three ``GCNConv`` layers, each
    followed by the same ``enc_feed_forward2`` module.

    Attention branch: ``enc2`` goes through four ``MultiHeadAttention``
    modules in sequence, used as self-attention (query, key and value are
    all ``enc2``), each followed by the same ``enc_feed_forward1`` module.
    """

    def __init__(self):
        super().__init__()

        # All three graph convolutions share one configuration.
        dim = config.embedding_dim
        conv_options = dict(normalize=True, bias=config.bias, aggr='mean')
        self.conv = GCNConv(dim, dim, **conv_options)
        self.conv1 = GCNConv(dim, dim, **conv_options)
        self.conv2 = GCNConv(dim, dim, **conv_options)

        # Feed-forward for the attention branch (1) and the graph branch (2).
        self.enc_feed_forward1 = PoswiseFeedForwardNet()
        self.enc_feed_forward2 = PoswiseFeedForwardNet()

        self.Model_list = nn.ModuleList([MultiHeadAttention() for _ in range(4)])

    def forward(self, enc_inputs, enc2, enc_self_attn_mask, edge_index):
        """Run both branches and return their outputs.

        Args:
            enc_inputs: Input features for the graph branch.
            enc2: Input for the attention branch.
            enc_self_attn_mask: Mask forwarded to every attention module.
            edge_index: Graph connectivity forwarded to every ``GCNConv``.

        Returns:
            Tuple ``(enc_outputs, enc2, attn)``: the graph-branch output,
            the attention-branch output, and the attention value returned
            by the last attention module.
        """
        # Graph branch: conv -> shared feed-forward, three times.
        graph_hidden = enc_inputs
        for graph_conv in (self.conv, self.conv1, self.conv2):
            graph_hidden = graph_conv(graph_hidden, edge_index)
            graph_hidden = self.enc_feed_forward2(graph_hidden)

        # Attention branch: self-attention -> shared feed-forward per module.
        attn = 0
        for attention_block in self.Model_list:
            enc2, attn = attention_block(enc2, enc2, enc2, enc_self_attn_mask)
            enc2 = self.enc_feed_forward1(enc2)

        return graph_hidden, enc2, attn
|
|
class Encoder(nn.Module):
    """Stack of ``EncoderLayer`` modules fed by two embedding tables.

    ``enc_inputs`` is embedded with ``embedding`` and drives the graph
    branch of every layer; ``enc`` is embedded with ``embedding1`` and
    drives the attention branch.
    """

    def __init__(self):
        super().__init__()
        self.embedding = nn.Embedding(config.vocab_size, config.embedding_dim)
        self.embedding1 = nn.Embedding(config.sm_size, config.embedding_dim)
        # `attention`, `pos_ffn` and `dropout` are not used by forward()
        # below. They are kept so the set of registered submodules (and
        # therefore any previously saved checkpoint) stays unchanged.
        self.attention = MultiHeadAttention()
        self.pos_ffn = PoswiseFeedForwardNet()
        self.layers = nn.ModuleList(
            [EncoderLayer() for _ in range(config.Encoder_n_layers)]
        )
        self.dropout = nn.Dropout(config.dropout)

    def forward(self, enc_inputs, edge_index, enc):
        """Embed both inputs and run them through every encoder layer.

        Args:
            enc_inputs: Index tensor looked up in ``embedding``.
            edge_index: Graph connectivity; a leading size-1 dimension, if
                present, is squeezed away before use.
            enc: Index tensor looked up in ``embedding1``.

        Returns:
            Tuple ``(enc_outputs, enc)``: the graph-branch and
            attention-branch outputs of the last layer.
        """
        enc = self.embedding1(enc)
        enc_outputs = self.embedding(enc_inputs)

        # NOTE(review): the pad mask is built from the *embedded* tensor,
        # not from the original ids, so it flags embedding values that are
        # exactly 0 — confirm this is what MultiHeadAttention expects.
        mask_source = enc.squeeze(0)
        enc_self_attn_mask = get_attn_pad_mask(mask_source, mask_source)

        edge_index = edge_index.squeeze(0)
        # Drop a leading size-1 dimension if there is one, then add one back.
        enc_outputs = enc_outputs.squeeze(0).unsqueeze(0)

        for layer in self.layers:
            # Per-layer attention maps are not part of the return value, so
            # they are discarded instead of being accumulated in a list.
            enc_outputs, enc, _ = layer(
                enc_outputs, enc, enc_self_attn_mask, edge_index
            )

        return enc_outputs, enc
|
|
|