基于Pytorch深度学习——Transformer论文讲解/代码实现
Transformer的历史由来
Transformer模型是由Vaswani等人在2017年提出的,自其发布以来,已成为自然语言处理(NLP)领域中最具影响力的模型之一。Transformer模型最初是在论文《Attention Is All You Need》中介绍的,该论文提出了一种全新的基于注意力机制(Attention Mechanism)的深度学习架构,用于解决序列到序列(Seq2Seq)任务,比如机器翻译。
Attention is all you need
摘要
这一段主要为我们呈现的是,模型完全使用的是注意力机制,而没有用到任何传统的RNN和CNN架构,这一点也是本文的最大创新点
同时还为我们展现了本文提出的架构在具体Task的表现,表现效果好,训练速度较快
引言
这一段它主要讲的就是,传统的RNN,LSTM模型,无论怎么变换它都是一个连接一个的模型,也就是一种递归的结构,而本文提出的Transformer架构是不基于递归架构的
在这里也就是说,这个架构完全依赖于注意力机制来绘制输入和输出之间的全局依赖关系
模型
实际上Q,K,V就是对Input做的一个线性变换,换句话说就是矩阵相乘之后的结果
其中Wq,Wk,Wv都是可以用感知机来学习的
这个公式看起来很恐怖,直接来看图
这个图就是论文中多头注意力机制的原图,对V,K,Q的Linear层,文章说的是将它投影到较低的维度中,文章说他们发现这样更有益
接着进行刚才的注意力机制,h的意思是学习h次,我感觉有点多通道的意思
其实就看论文中的讲解,我还不是特别理解,我去网上查了一下相关的资料,查到了这张图片
我们可以有不同的Q,K,V,不同的Q可以负责不同种类的相关性
要获得不同的QKV可以通过感知机来学习
然后将两个生成的B接起来,和Wo进行相乘输出,对应前面concat的做法
代码
位置编码
class PositionalEncoding(nn.Module):def __init__(self, d_model, max_seq_length):"""初始化位置编码:param d_model: 词嵌入的维度:param max_seq_length: 序列的最大长度"""super(PositionalEncoding, self).__init__()# 初始化位置编码矩阵self.encoding = torch.zeros(max_seq_length, d_model)self.encoding.requires_grad = False# 生成位置矩阵, unsqueeze使得维度为(max_seq_length,1)position = torch.arange(0, max_seq_length).unsqueeze(1)div_term = torch.exp(torch.arange(0, d_model, 2) * -(math.log(10000.0) / d_model))# 正余弦部分self.encoding[:, 0::2] = torch.sin(position * div_term)self.encoding[:, 1::2] = torch.cos(position * div_term)def forward(self, x):"""前向传播函数:param x: 输入张量 (batch_size,seq_len,d_model):return: 加上位置编码的张量"""batch_size, seq_len, d_model = x.size()return x + self.encoding[:seq_len, :].to(x.device)
在我们的位置编码中,我们仿照原论文的思路进行书写:
也就是说,在偶数的位置采用一个正弦的位置编码;在奇数的时候采用一个余弦的位置编码
多头注意力机制
class MultiHeadAttention(nn.Module):def __init__(self, d_model, nhead):"""初始化多头注意力机制:param d_model: 词嵌入的维度:param nhead: 注意力头的数量"""super(MultiHeadAttention, self).__init__()# 确保 d_model 可以被 nhead 整除assert d_model % nhead == 0, "d_model must be divisible by nhead"self.d_model = d_modelself.nhead = nheadself.d_k = d_model // nhead# 定义线性变换层self.query = nn.Linear(d_model, d_model)self.key = nn.Linear(d_model, d_model)self.value = nn.Linear(d_model, d_model)self.out = nn.Linear(d_model, d_model)self.dropout = nn.Dropout(0.1)def attention(self, query, key, value, mask=None):"""计算注意力分数:param query: 查询张量,(batch_size,nhead,seq_len,d_k):param key: 键张量,(batch_size,nhead,seq_len,d_k):param value: 值张量,(batch_size,nhead,seq_len,d_k):param mask: 掩码张量,(batch_size,1,seq_len):return: 和value乘好的和注意力权重张量"""scores = torch.matmul(query, key.transpose(-2, -1)) / math.sqrt(self.d_k)if mask is not None:scores = scores.masked_fill(mask == 0, 1e-9)# 计算注意力权重p_attn = F.softmax(scores, dim=-1)p_attn = self.dropout(p_attn)return torch.matmul(p_attn, value), p_attndef forward(self, query, key, value, mask=None):"""前向传播函数。参数:query: 查询张量,形状为 (batch_size, seq_len, d_model)。key: 键张量,形状为 (batch_size, seq_len, d_model)。value : 值张量,形状为 (batch_size, seq_len, d_model)。mask : 掩码张量,形状为 (batch_size, 1, seq_len)。返回:torch.Tensor: 经过多头注意力机制后的张量,形状为 (batch_size, seq_len, d_model)。"""batch_size = query.size(0)# 线性变换并分头query = self.query(query).view(batch_size, -1, self.nhead, self.d_k).transpose(1, 2)key = self.key(key).view(batch_size, -1, self.nhead, self.d_k).transpose(1, 2)value = self.value(value).view(batch_size, -1, self.nhead, self.d_k).transpose(1, 2)# 计算注意力分数x, attn = self.attention(query, key, value, mask=mask)# 组合多头结果x = x.transpose(1, 2).contiguous().view(batch_size, -1, self.d_model)return self.out(x)
在多头注意力机制的实现中,我们需要注意的是对kqv三者的线性变换层,这一层在我们的代码书写中是非常容易漏掉的一层。
在论文中,这个对应了:
非线性变换层
class FeedForward(nn.Module):def __init__(self, d_model, dim_feedforward, dropout=0.1, *args, **kwargs):""":param d_model::param dim_feedforward::param dropout:"""super().__init__(*args, **kwargs)# 定义两层全连接网络self.linear1 = nn.Linear(d_model, dim_feedforward)self.dropout = nn.Dropout(dropout)self.linear2 = nn.Linear(dim_feedforward, d_model)def forward(self, x):"""前向传播函数:param x: 输入张量 (batch_size,seq_len,d_model):return: 经过前馈神经网络后的张量"""x = self.dropout(F.relu(self.linear1(x)))return self.linear2(x)
在这里我们的作用实际上主要是为了:
① 让数据从低维空间映射到高维空间,数据在高维空间可以更好的学习特征,同时学习的特征更加的丰富
② 让数据从高维空间重新映射为低维空间,可以保证输入和输出的维度一致性和对于信息的一个压缩能力
Encoder
class EncoderLayer(nn.Module):def __init__(self, d_model, nhead, dim_feedforward, dropout=0.1):"""初始化编码器:param d_model: 词嵌入的维度:param nhead: 注意力头的个数:param dim_feedforward: 前馈神经网络的隐藏层维度:param dropout: 概率"""super(EncoderLayer, self).__init__()self.self_attn = MultiHeadAttention(d_model, nhead)self.feed_forward = FeedForward(d_model, dim_feedforward, dropout)self.norm1 = nn.LayerNorm(d_model)self.norm2 = nn.LayerNorm(d_model)self.dropout1 = nn.Dropout(dropout)self.dropout2 = nn.Dropout(dropout)def forward(self, src, src_mask=None):"""前向传播:param src: 源输入(batch_size,seq_len,d_model):param src_mask: 源序列掩码(batch_size,1,seq_len):return: 编码层后的张量"""# 多头注意力机制src2 = self.self_attn(src, src, src, src_mask)# 残差连接和归一化src = src + self.dropout1(src2)src = self.norm1(src)# 前馈网络src2 = self.feed_forward(src)# 残差连接和归一化src = src + self.dropout2(src2)src = self.norm2(src)return src
在这里我们相当于是将上面实现的模块进行了组合拼接
Decoder
class DecoderLayer(nn.Module):def __init__(self, d_model, nhead, dim_feedforward, dropout=0.1):"""初始化解码层:param d_model: 词嵌入维度:param nhead: 注意力头的个数:param dim_feedforward: 前馈神经网络的隐藏层维度:param dropout: 概率"""super(DecoderLayer, self).__init__()# 自注意力机制self.self_attn = MultiHeadAttention(d_model, nhead)# 连接编码器输出self.multihead_attn = MultiHeadAttention(d_model, nhead)# 前馈神经网络self.feed_forward = FeedForward(d_model, dim_feedforward, dropout)# 定义层归一化层self.norm1 = nn.LayerNorm(d_model)self.norm2 = nn.LayerNorm(d_model)self.norm3 = nn.LayerNorm(d_model)# 定义 dropout 层self.dropout1 = nn.Dropout(dropout)self.dropout2 = nn.Dropout(dropout)self.dropout3 = nn.Dropout(dropout)def forward(self, tgt, memory, tgt_mask=None, memory_mask=None):"""前向传播函数:param tgt: 目标序列输入 (batch_size,seq_len,d_model):param memory: 编码器输出 (batch_size,src_seq_len,d_model):param tgt_mask: 目标序列掩码 (batch_size,seq_len,seq_len):param memory_mask: 编码器输出掩码 (batch_size,1,src_seq_len):return: 经过解码器层后的张量"""# 自注意力机制tgt2 = self.self_attn(tgt, tgt, tgt, tgt_mask)# 残差连接和层归一化tgt = tgt + self.dropout1(tgt2)tgt = self.norm1(tgt)# 交叉注意力机制tgt2 = self.multihead_attn(tgt, memory, memory, memory_mask)# 残差连接和层归一化tgt = tgt + self.dropout2(tgt2)tgt = self.norm2(tgt)# 前馈神经网络tgt2 = self.feed_forward(tgt)# 残差连接和层归一化tgt = tgt + self.dropout3(tgt2)tgt = self.norm3(tgt)return tgt
Transformer
class BaseTransformer(nn.Module):def __init__(self, d_model, nhead,num_encoder_layers, num_decoder_layers,dim_feedforward, max_seq_length,vocab_size, dropout=0.1):"""初始化Transformer模型。参数:d_model : 词嵌入的维度(即特征数)。nhead: 多头注意力机制中的头数。num_encoder_layers : 编码器的层数。num_decoder_layers : 解码器的层数。dim_feedforward : 前馈网络中的隐藏层维度。max_seq_length : 序列的最大长度。vocab_size : 词汇表的大小。dropout: dropout概率。"""super(BaseTransformer, self).__init__()self.d_model = d_model# 嵌入层self.encoder_embedding = nn.Embedding(vocab_size, d_model)self.decoder_embedding = nn.Embedding(vocab_size, d_model)# 位置编码self.position_encoding = PositionalEncoding(d_model, max_seq_length)# 编码器self.encoder_layers = nn.ModuleList([EncoderLayer(d_model, nhead, dim_feedforward, dropout) for _ in range(num_encoder_layers)])# 解码器self.decoder_layers = nn.ModuleList([DecoderLayer(d_model, nhead, dim_feedforward, dropout) for _ in range(num_decoder_layers)])# 输出层,将解码器的输出映射到词汇表大小self.fc_out = nn.Linear(d_model, vocab_size)def forward(self, src, tgt, src_mask=None, tgt_mask=None):"""前向传播函数。参数:src : 源序列输入,形状为 (batch_size, src_seq_length)。tgt : 目标序列输入,形状为 (batch_size, tgt_seq_length)。src_mask : 源序列的掩码,形状为 (batch_size, 1, src_seq_length)。tgt_mask : 目标序列的掩码,形状为 (batch_size, tgt_seq_length, tgt_seq_length)。返回:模型输出,形状为 (batch_size, tgt_seq_length, vocab_size)。"""# 获取源序列的嵌入表示并且进行位置编码src_embedding = self.encoder_embedding(src) * math.sqrt(self.d_model)src_embedding = self.position_encoding(src_embedding) # 获取目标序列的嵌入表示并进行位置编码tgt_embedding = self.decoder_embedding(tgt) * math.sqrt(self.d_model)tgt_embedding = self.position_encoding(tgt_embedding)# 编码器部分memory = src_embeddingfor layer in self.encoder_layers:memory = layer(memory, src_mask)# 解码器部分output = tgt_embeddingfor layer in self.decoder_layers:output = layer(output, memory, tgt_mask, src_mask)# 最后输出层output = self.fc_out(output)return outputdef make_src_mask(self, src):"""指示序列中哪些位置是填充:param src: 源输入序列 (batch_size,src_seq_length):return: 源序列的掩码 (batch_size,1,1,src_seq_length)"""scr_mask = (src != 0).unsqueeze(1).unsqueeze(2)return scr_maskdef make_future_mask(self, tgt):"""生成一个上三角矩阵,用于屏蔽解码器在未来的输入:param tgt: 目标序列:return: 上三角矩阵 (tgt_seq_length,tgt_seq_length)"""tgt_seq_length = tgt.size(1)future_mask = torch.triu(torch.ones((tgt_seq_length, tgt_seq_length)), diagonal=1).to(tgt.device)# 把矩阵里面是1的替换成负无穷,是0的替换成0return future_mask.masked_fill(future_mask == 1, float('-inf')).masked_fill(future_mask == 0, float(0.0))def train_step(self, optimizer, loss_fn, src, tgt, src_mask, tgt_mask):"""单步训练过程:param optimizer: 优化器:param loss_fn: 损失函数:param src: 源输入序列 (batch_size,src_seq_length):param tgt: 目标序列输入 (batch_size,tgt_seq_length):param src_mask: 源序列掩码 (batch_size,1,src_seq_length):param tgt_mask: 目标序列掩码 (batch_size,tgt_seq_length,tgt_seq_length):return: 当前训练步骤的损失"""self.train() # 训练模式# 前向传播output = self(src, tgt, src_mask, tgt_mask)output = output.view(-1, output.size(-1))tgt = tgt.view(-1)# 计算损失loss = loss_fn(output, tgt)optimizer.zero_grad()loss.backward()optimizer.step()return loss.item()def train_model(self, train_loader, optimizer, loss_fn, epochs):"""模型训练过程:param train_loader: 训练数据Dataloader:param optimizer: 优化器:param loss_fn: 损失函数:param epochs: 训练轮速:return: None"""device = GPU.get_device()self.to(device)for epoch in range(epochs):epoch_loss = 0for src, tgt in train_loader:src_mask = self.make_src_mask(src).to(device)tgt_mask = self.make_future_mask(tgt).to(device)src, tgt = src.to(device), tgt.to(device)loss = self.train_step(optimizer, loss_fn, src, tgt, src_mask, tgt_mask)epoch_loss += lossprint(f'Epoch {epoch + 1}/{epochs}, Loss: {epoch_loss / len(train_loader)}')def predict(self, src, max_length):"""模型预测过程:param src: 源序列输入 (batch_size,src_seq_length):param max_length: 生成序列的最大长度:return: 生成的目标序列(batch_size,max_length)"""device = GPU.get_device()self.to(device)self.eval()src_mask = self.make_src_mask(src).to(device)src = src.to(device)# 初始化目标序列tgt = torch.zeros((src.size(0), 1), dtype=torch.long, device=src.device)generated = tgtfor _ in range(max_length):tgt_mask = self.make_future_mask(generated).to(device)output = self(src, generated, src_mask, tgt_mask)next_token = output[:, -1, :].argmax(dim=-1, keepdim=True) # (batch_size,vocab_size)generated = torch.cat((generated, next_token), dim=1)if next_token.item() == 0: # 假设0是结束标志breakreturn generated[:, 1:]
在这里,我们把所有的模块进行组合,形成一个带有预测、训练和mask的BaseTransformer