导航

这篇文章是基于 Pytorch 的 Transformer 简易版复现教程。

本博客的 Transformer 系列文章共计四篇,导航如下:

前言

该教程代码初始来源于Jeff Jung,我阅读了一些博客和视频后做了大量的注释和修改,更加方便阅读和复现。

代码中为了加快可读性和运行速度,并没有用到大型的数据集,而是手动输入了两对中文→英语的句子,还有每个字的索引也是手动硬编码上去的,主要是为了降低代码执行速度和阅读难度,哪怕用普通的笔记本CPU也能在1分钟以内完成,从而方便读者把重点放到模型实现的部分!

# ======================================
"""
code by Tae Hwan Jung(Jeff Jung) @graykode, Derek Miller @dmmiller612, modify by shwei
Reference: https://github.com/jadore801120/attention-is-all-you-need-pytorch
https://github.com/JayParks/transformer
"""
# ====================================================================================================

数据预处理

import math
import torch
import numpy as np
import torch.nn as nn
import torch.optim as optim
import torch.utils.data as Data

device = 'cpu'
# device = 'cuda'

# transformer epochs
epochs = 100
# epochs = 1000

# 这里我没有用什么大型的数据集,而是手动输入了两对中文→英语的句子
# 还有每个字的索引也是我手动硬编码上去的,主要是为了降低代码阅读难度
# S: Symbol that shows starting of decoding input
# E: Symbol that shows starting of decoding output
# P: Symbol that will fill in blank sequence if current batch data size is shorter than time steps

# 训练集
sentences = [
# 中文和英语的单词个数不要求相同
# enc_input dec_input dec_output
['我 有 一 个 女 朋 友', 'S i have a girl friend . ', 'i have a girl friend . E'],
['我 有 零 个 好 朋 友', 'S i have zero good friend .', 'i have zero good friend . E']
]

# 中文和英语的单词要分开建立词库
# Padding Should be Zero
src_vocab = {
'P': 0, '我': 1, '有': 2, '一': 3, '个': 4, '好': 5, '朋': 6, '友': 7, '零': 8, '女': 9}
src_idx2word = {
i: w for i, w in enumerate(src_vocab)}
src_vocab_size = len(src_vocab)

tgt_vocab = {
'P': 0, 'i': 1, 'have': 2, 'a': 3, 'good': 4, 'friend': 5, 'zero': 6, 'girl': 7, 'S': 8, 'E': 9, '.': 10}
idx2word = {
i: w for i, w in enumerate(tgt_vocab)}
tgt_vocab_size = len(tgt_vocab)

src_len = 8 # (源句子的长度)enc_input max sequence length
tgt_len = 7 # dec_input(=dec_output) max sequence length

# Transformer Parameters
d_model = 512 # Embedding Size(token embedding和position编码的维度)
d_ff = 2048 # FeedForward dimension (两次线性层中的隐藏层 512->2048->512,线性层是用来做特征提取的),当然最后会再接一个projection层
d_k = d_v = 64 # dimension of K(=Q), V(Q和K的维度需要相同,这里为了方便让K=V)
n_layers = 6 # number of Encoder of Decoder Layer(Block的个数)
n_heads = 8 # number of heads in Multi-Head Attention(有几套头)


# ==============================================================================================
# 数据构建


def make_data(sentences):
"""把单词序列转换为数字序列"""
enc_inputs, dec_inputs, dec_outputs = [], [], []
for i in range(len(sentences)):
enc_input = [[src_vocab[n] for n in sentences[i][0].split()]] # [[1, 2, 3, 4, 0], [1, 2, 3, 5, 0]]
dec_input = [[tgt_vocab[n] for n in sentences[i][1].split()]] # [[6, 1, 2, 3, 4, 8], [6, 1, 2, 3, 5, 8]]
dec_output = [[tgt_vocab[n] for n in sentences[i][2].split()]] # [[1, 2, 3, 4, 8, 7], [1, 2, 3, 5, 8, 7]]

enc_inputs.extend(enc_input)
dec_inputs.extend(dec_input)
dec_outputs.extend(dec_output)

return torch.LongTensor(enc_inputs), torch.LongTensor(dec_inputs), torch.LongTensor(dec_outputs)


enc_inputs, dec_inputs, dec_outputs = make_data(sentences)


class MyDataSet(Data.Dataset):
"""自定义DataLoader"""

def __init__(self, enc_inputs, dec_inputs, dec_outputs):
super(MyDataSet, self).__init__()
self.enc_inputs = enc_inputs
self.dec_inputs = dec_inputs
self.dec_outputs = dec_outputs

def __len__(self):
return self.enc_inputs.shape[0]

def __getitem__(self, idx):
return self.enc_inputs[idx], self.dec_inputs[idx], self.dec_outputs[idx]


loader = Data.DataLoader(MyDataSet(enc_inputs, dec_inputs, dec_outputs), 2, True)
# 这行代码创建了一个PyTorch数据加载器,用于在训练机器学习模型时加载数据。DataLoader是PyTorch的核心库torch.utils.data中的函数。它的作用是将
# 数据集(此处为MyDataSet类的实例,传入的enc_inputs、dec_inputs和dec_outputs作为数据)拆分成小批次以提高加载效率。
# 具体参数说明:
# batch_size: 2,每批加载的样本数。
# shuffle: True,是否打乱数据顺序。

上面都比较简单,下面开始涉及到模型就比较复杂了,因此我会将模型拆分成以下几个部分进行讲解

  • Positional Encoding
  • Pad Mask(序列本身固定长度,不够长的序列需要填充(pad),也就是'P')
  • Subsequence Mask(Decoder input 不能看到未来时刻单词信息,因此需要 mask)
  • ScaledDotProductAttention
  • Multi-Head Attention
  • FeedForward Layer
  • Encoder Layer
  • Encoder
  • Decoder Layer
  • Decoder
  • Transformer

关于代码中的注释,如果值为 src_len 或者 tgt_len 的,我一定会写清楚,但是有些函数或者类,Encoder 和 Decoder 都有可能调用,因此就不能确定究竟是 src_len 还是 tgt_len,对于不确定的,我会记作 seq_len

模型构建

Positional Encoding

class PositionalEncoding(nn.Module):
def __init__(self, d_model, dropout=0.1, max_len=5000):
super(PositionalEncoding, self).__init__()
self.dropout = nn.Dropout(p=dropout)
"""
在dropout函数中,参数p是指丢弃元素的概率。它决定了输入张量的元素在丢弃操作中被设置为零的比率。
例如,如果p=0.1,那么10%的元素将在丢弃操作中被设置为零。
"""
pe = torch.zeros(max_len, d_model)
position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
pe[:, 0::2] = torch.sin(position * div_term)
pe[:, 1::2] = torch.cos(position * div_term)
pe = pe.unsqueeze(0).transpose(0, 1)
self.register_buffer('pe', pe)
"""
通过调用self.register_buffer('pe', pe),我们将这个张量注册为模型的一个可学习参数,
这意味着这个张量在模型训练过程中不需要更新其梯度,即不需要在损失函数的计算中考虑这个张量的梯度。
"""
def forward(self, x):
"""
x: [seq_len, batch_size, d_model]
"""
x = x + self.pe[:x.size(0), :]
return self.dropout(x)

Pad Mask

由于在 Encoder 和 Decoder 中都需要进行 mask 操作,因此就无法确定这个函数的参数中 seq_len 的值,如果是在 Encoder 中调用的,seq_len 就等于 src_len;如果是在 Decoder 中调用的,seq_len 就有可能等于 src_len,也有可能等于 tgt_len(因为 Decoder 有两次 mask)

pad mask的作用:在对value向量加权平均的时候,可以让pad对应的alpha_ij=0,这样注意力就不会考虑到pad向量。

这个函数最核心的一句代码是 seq_k.data.eq(0),这句的作用是返回一个大小和 seq_k 一样的 tensor,只不过里面的值只有 True 和 False。如果 seq_k 某个位置的值等于 0,那么对应位置就是 True,否则即为 False。举个例子,输入为 seq_data = [1, 2, 3, 4, 0]seq_data.data.eq(0) 就会返回 [False, False, False, False, True]

def get_attn_pad_mask(seq_q, seq_k):
# pad mask的作用:在对value向量加权平均的时候,可以让pad对应的alpha_ij=0,这样注意力就不会考虑到pad向量
"""这里的q,k表示的是两个序列(跟注意力机制的q,k没有关系),例如encoder_inputs (x1,x2,..xm)和encoder_inputs (x1,x2..xm)
encoder和decoder都可能调用这个函数,所以seq_len视情况而定
seq_q: [batch_size, seq_len]
seq_k: [batch_size, seq_len]
seq_len could be src_len or it could be tgt_len
seq_len in seq_q and seq_len in seq_k maybe not equal
"""
batch_size, len_q = seq_q.size() # 这个seq_q只是用来expand维度的
batch_size, len_k = seq_k.size()
# eq(zero) is PAD token
# 例如:seq_k = [[1,2,3,4,0], [1,2,3,5,0]]
pad_attn_mask = seq_k.data.eq(0).unsqueeze(1) # [batch_size, 1, len_k], True is masked
return pad_attn_mask.expand(batch_size, len_q, len_k) # [batch_size, len_q, len_k] 构成一个立方体(batch_size个这样的矩阵)

Subsequence Mask

Subsequence Mask 只有 Decoder 会用到,主要作用是屏蔽未来时刻单词的信息。

这段代码实现了获得一个注意力子序列掩码。它使用Numpy函数np.triu生成一个上三角矩阵,并用np.ones初始化这个矩阵。attn_shape变量储存了这个矩阵的形状,它是一个三维数组,分别是batch_size、tgt_len、tgt_len。然后,np.triu将这个矩阵初始化为上三角形,并通过参数k=1使对角线上的元素为0。最后,将这个矩阵转换为PyTorch tensor,并返回该张量。

def get_attn_subsequence_mask(seq):
"""建议打印出来看看是什么样的输出(一目了然)
seq: [batch_size, tgt_len]
"""
attn_shape = [seq.size(0), seq.size(1), seq.size(1)]
# attn_shape: [batch_size, tgt_len, tgt_len]
subsequence_mask = np.triu(np.ones(attn_shape), k=1) # 生成一个上三角矩阵
subsequence_mask = torch.from_numpy(subsequence_mask).byte()
return subsequence_mask # [batch_size, tgt_len, tgt_len]

ScaledDotProductAttention

"Scaled Dot-Product Attention" 是一种用于自注意力机制的注意力机制方法,通过将输入矩阵Q、K和V与自身转置进行点积运算,得到关于每一个词的注意力分数。

使用Q、K、V三个变量作为输入,经过一系列矩阵运算后得到context和attn,其中context是计算出的注意力张量,attn是对应的注意力稀疏矩阵,并返回这两个结果。

具体地,在forward函数中,使用Q、K做矩阵乘法得到scores矩阵,scores中的每一个元素都是对应Q中词与K中词的相似程度。

下一步,通过使用mask矩阵对scores中的元素进行赋值,将与mask矩阵中值为1的元素相对应的scores元素赋值为-1e9,使其不被softmax计算。

最后,使用softmax对scores最后一维(也就是v)做软归一化,得到注意力稀疏矩阵attn。最后,使用attn矩阵对V做矩阵乘法,得到context矩阵,其中每一行对应一个词的向量表示。

(matmul函数是矩阵乘法,它返回两个矩阵的点积,即将两个矩阵对应元素相乘并相加。它对应的矩阵乘法操作是:C = A * B,其中C是乘积矩阵,A是左矩阵,B是右矩阵。)

class ScaledDotProductAttention(nn.Module):
def __init__(self):
super(ScaledDotProductAttention, self).__init__()
def forward(self, Q, K, V, attn_mask):
"""
Q: [batch_size, n_heads, len_q, d_k]
K: [batch_size, n_heads, len_k, d_k]
V: [batch_size, n_heads, len_v(=len_k), d_v]
attn_mask: [batch_size, n_heads, seq_len, seq_len]
说明:在encoder-decoder的Attention层中len_q(q1,..qt)和len_k(k1,...km)可能不同
"""
scores = torch.matmul(Q, K.transpose(-1, -2)) / np.sqrt(d_k) # scores : [batch_size, n_heads, len_q, len_k]
# mask矩阵填充scores(用-1e9填充scores中与attn_mask中值为1位置相对应的元素)
scores.masked_fill_(attn_mask, -1e9) # Fills elements of self tensor with value where mask is True.

attn = nn.Softmax(dim=-1)(scores) # 对最后一个维度(v)做softmax
# scores : [batch_size, n_heads, len_q, len_k] * V: [batch_size, n_heads, len_v(=len_k), d_v]
context = torch.matmul(attn, V) # context: [batch_size, n_heads, len_q, d_v]
# context:[[z1,z2,...],[...]]向量, attn注意力稀疏矩阵(用于可视化的)
return context, attn

MultiHeadAttention

完整代码中一定会有三处地方调用 MultiHeadAttention(),Encoder Layer 调用一次,传入的 input_Qinput_Kinput_V 全部都是 enc_inputs;Decoder Layer 中两次调用,第一次传入的全是 dec_inputs,第二次传入的分别是 dec_outputsenc_outputsenc_outputs

class MultiHeadAttention(nn.Module):
"""这个Attention类可以实现:
Encoder的Self-Attention
Decoder的Masked Self-Attention
Encoder-Decoder的Attention
输入:seq_len x d_model
输出:seq_len x d_model
"""
def __init__(self):
super(MultiHeadAttention, self).__init__()
self.W_Q = nn.Linear(d_model, d_k * n_heads, bias=False) # q,k必须维度相同,不然无法做点积
self.W_K = nn.Linear(d_model, d_k * n_heads, bias=False)
self.W_V = nn.Linear(d_model, d_v * n_heads, bias=False)
# 这个全连接层可以保证多头attention的输出仍然是seq_len x d_model
self.fc = nn.Linear(n_heads * d_v, d_model, bias=False)
"""
nn.Linear 函数是 PyTorch 模型中的一种全连接层 (fully connected layer) 的实现。
它的作用是对输入数据进行线性变换,即 y = Wx + b,其中 W 是线性变换的系数矩阵,b 是偏移量,x 是输入数据。
torch.nn.Linear(in_features, # 输入的神经元个数
out_features, # 输出神经元个数
bias=True # 是否包含偏置
)
"""
def forward(self, input_Q, input_K, input_V, attn_mask):
"""
input_Q: [batch_size, len_q, d_model]
input_K: [batch_size, len_k, d_model]
input_V: [batch_size, len_v(=len_k), d_model]
attn_mask: [batch_size, seq_len, seq_len]
"""
residual, batch_size = input_Q, input_Q.size(0)
# 下面的多头的参数矩阵是放在一起做线性变换的,然后再拆成多个头,这是工程实现的技巧
# B: batch_size, S:seq_len, D: dim
# (B, S, D) -proj-> (B, S, D_new) -split-> (B, S, Head, W) -trans-> (B, Head, S, W)
# 线性变换 拆成多头

# Q: [batch_size, n_heads, len_q, d_k]
Q = self.W_Q(input_Q).view(batch_size, -1, n_heads, d_k).transpose(1, 2)
# K: [batch_size, n_heads, len_k, d_k] # K和V的长度一定相同,维度可以不同
K = self.W_K(input_K).view(batch_size, -1, n_heads, d_k).transpose(1, 2)
# V: [batch_size, n_heads, len_v(=len_k), d_v]
V = self.W_V(input_V).view(batch_size, -1, n_heads, d_v).transpose(1, 2)

# 因为是多头,所以mask矩阵要扩充成4维的
# attn_mask: [batch_size, seq_len, seq_len] -> [batch_size, n_heads, seq_len, seq_len]
attn_mask = attn_mask.unsqueeze(1).repeat(1, n_heads, 1, 1)

# context: [batch_size, n_heads, len_q, d_v], attn: [batch_size, n_heads, len_q, len_k]
context, attn = ScaledDotProductAttention()(Q, K, V, attn_mask)
# 下面将不同头的输出向量拼接在一起
# context: [batch_size, n_heads, len_q, d_v] -> [batch_size, len_q, n_heads * d_v]
context = context.transpose(1, 2).reshape(batch_size, -1, n_heads * d_v)

# 这个全连接层可以保证多头attention的输出仍然是seq_len x d_model
output = self.fc(context) # [batch_size, len_q, d_model]
return nn.LayerNorm(d_model).to(device)(output + residual), attn

FeedForward Layer

这段代码非常简单,就是做两次线性变换,残差连接后再跟一个 Layer Norm。用于实现Transformer模型中的前馈网络。

该网络由两个全连接层(nn.Linear)和一个 ReLU 激活函数(nn.ReLU)组成。第一个全连接层将输入从 d_model 维度转换到 d_ff 维度,第二个全连接层将输入从 d_ff 维度转换回 d_model 维度。

class PoswiseFeedForwardNet(nn.Module):
def __init__(self):
super(PoswiseFeedForwardNet, self).__init__()
self.fc = nn.Sequential(
nn.Linear(d_model, d_ff, bias=False),
nn.ReLU(),
nn.Linear(d_ff, d_model, bias=False)
)
def forward(self, inputs):
"""
inputs: [batch_size, seq_len, d_model]
"""
residual = inputs
output = self.fc(inputs)
return nn.LayerNorm(d_model).to(device)(output + residual) # [batch_size, seq_len, d_model]

Encoder Layer & Encoder

class EncoderLayer(nn.Module):
def __init__(self):
super(EncoderLayer, self).__init__()
self.enc_self_attn = MultiHeadAttention()
self.pos_ffn = PoswiseFeedForwardNet()

def forward(self, enc_inputs, enc_self_attn_mask):
"""
enc_inputs: [batch_size, src_len, d_model]
enc_self_attn_mask: [batch_size, src_len, src_len] mask矩阵(pad mask or sequence mask)
"""
# enc_outputs: [batch_size, src_len, d_model], attn: [batch_size, n_heads, src_len, src_len]
# 第一个enc_inputs * W_Q = Q
# 第二个enc_inputs * W_K = K
# 第三个enc_inputs * W_V = V
enc_outputs, attn = self.enc_self_attn(enc_inputs, enc_inputs, enc_inputs,
enc_self_attn_mask) # enc_inputs to same Q,K,V(未线性变换前)
enc_outputs = self.pos_ffn(enc_outputs)
# enc_outputs: [batch_size, src_len, d_model]
return enc_outputs, attn

nn.ModuleList() 列表里面存了 n_layers 个 Encoder Layer。由于我们控制好了 Encoder Layer 的输入和输出维度相同,所以可以直接用个 for 循环以嵌套的方式,将上一次 Encoder Layer 的输出作为下一次 Encoder Layer 的输入。

n_layers个(本文为6个)EncoderLayer组件逐个拼起来,就是一个完整的Encoder。

class Encoder(nn.Module):
def __init__(self):
super(Encoder, self).__init__()
self.src_emb = nn.Embedding(src_vocab_size, d_model) # token Embedding
"""
Embedding解释
例如:如果你有一个词语表(vocabulary),其中包含了3个词语:"dog", "cat", "bird"。并且你指定了src_vocab_size = 3
和d_model = 5,那么这个Embedding层就可以将每一个词语表示成一个5维的实数向量,比如:"dog"
可以表示为[0.1, 0.2, 0.3, 0.4, 0.5]。
"""
self.pos_emb = PositionalEncoding(d_model) # Transformer中位置编码时固定的,不需要学习
self.layers = nn.ModuleList([EncoderLayer() for _ in range(n_layers)])

def forward(self, enc_inputs):
"""
enc_inputs: [batch_size, src_len]
"""
enc_outputs = self.src_emb(enc_inputs) # [batch_size, src_len, d_model]
enc_outputs = self.pos_emb(enc_outputs.transpose(0, 1)).transpose(0, 1) # [batch_size, src_len, d_model]
# Encoder输入序列的pad mask矩阵
enc_self_attn_mask = get_attn_pad_mask(enc_inputs, enc_inputs) # [batch_size, src_len, src_len]
enc_self_attns = [] # 在计算中不需要用到,它主要用来保存你接下来返回的attention的值(这个主要是为了你画热力图等,用来看各个词之间的关系
for layer in self.layers: # for循环访问nn.ModuleList对象
# 上一个block的输出enc_outputs作为当前block的输入
# enc_outputs: [batch_size, src_len, d_model], enc_self_attn: [batch_size, n_heads, src_len, src_len]
enc_outputs, enc_self_attn = layer(enc_outputs,
enc_self_attn_mask) # 传入的enc_outputs其实是input,传入mask矩阵是因为你要做self attention
enc_self_attns.append(enc_self_attn) # 这个只是为了可视化
return enc_outputs, enc_self_attns

Decoder Layer & Decoder

在 Decoder Layer 中会调用两次 MultiHeadAttention,第一次是计算 Decoder Input 的 self-attention,得到输出 dec_outputs。然后将 dec_outputs 作为生成 Q 的元素,enc_outputs 作为生成 K 和 V 的元素,再调用一次 MultiHeadAttention,得到的是 Encoder 和 Decoder Layer 之间的 context vector。最后将 dec_outptus 做一次维度变换,然后返回。

n_layers个(本文为6个)DecoderLayer组件逐个拼起来,就是一个完整的Decoder。

class DecoderLayer(nn.Module):
def __init__(self):
super(DecoderLayer, self).__init__()
self.dec_self_attn = MultiHeadAttention()
self.dec_enc_attn = MultiHeadAttention()
self.pos_ffn = PoswiseFeedForwardNet()

def forward(self, dec_inputs, enc_outputs, dec_self_attn_mask, dec_enc_attn_mask):
"""
dec_inputs: [batch_size, tgt_len, d_model]
enc_outputs: [batch_size, src_len, d_model]
dec_self_attn_mask: [batch_size, tgt_len, tgt_len]
dec_enc_attn_mask: [batch_size, tgt_len, src_len]
"""
# dec_outputs: [batch_size, tgt_len, d_model], dec_self_attn: [batch_size, n_heads, tgt_len, tgt_len]
dec_outputs, dec_self_attn = self.dec_self_attn(dec_inputs, dec_inputs, dec_inputs,
dec_self_attn_mask) # 这里的Q,K,V全是Decoder自己的输入
# dec_outputs: [batch_size, tgt_len, d_model], dec_enc_attn: [batch_size, h_heads, tgt_len, src_len]
dec_outputs, dec_enc_attn = self.dec_enc_attn(dec_outputs, enc_outputs, enc_outputs,
dec_enc_attn_mask) # Attention层的Q(来自decoder) 和 K,V(来自encoder)
dec_outputs = self.pos_ffn(dec_outputs) # [batch_size, tgt_len, d_model]
return dec_outputs, dec_self_attn, dec_enc_attn # dec_self_attn, dec_enc_attn这两个是为了可视化的

Decoder 中不仅要把 "pad"mask 掉,还要 mask 未来时刻的信息,因此就有了下面这三行代码,其中 torch.gt(a, value) 的意思是,将 a 中各个位置上的元素和 value 比较,若大于 value,则该位置取 1,否则取 0

class Decoder(nn.Module):
def __init__(self):
super(Decoder, self).__init__()
self.tgt_emb = nn.Embedding(tgt_vocab_size, d_model) # Decoder输入的embed词表
self.pos_emb = PositionalEncoding(d_model)
self.layers = nn.ModuleList([DecoderLayer() for _ in range(n_layers)]) # Decoder的blocks

def forward(self, dec_inputs, enc_inputs, enc_outputs):
"""
dec_inputs: [batch_size, tgt_len]
enc_inputs: [batch_size, src_len]
enc_outputs: [batch_size, src_len, d_model] # 用在Encoder-Decoder Attention层
"""
dec_outputs = self.tgt_emb(dec_inputs) # [batch_size, tgt_len, d_model]
dec_outputs = self.pos_emb(dec_outputs.transpose(0, 1)).transpose(0, 1).to(
device) # [batch_size, tgt_len, d_model]
# Decoder输入序列的pad mask矩阵(这个例子中decoder是没有加pad的,实际应用中都是有pad填充的)
dec_self_attn_pad_mask = get_attn_pad_mask(dec_inputs, dec_inputs).to(device) # [batch_size, tgt_len, tgt_len]
# Masked Self_Attention:当前时刻是看不到未来的信息的
dec_self_attn_subsequence_mask = get_attn_subsequence_mask(dec_inputs).to(
device) # [batch_size, tgt_len, tgt_len]

# Decoder中把两种mask矩阵相加(既屏蔽了pad的信息,也屏蔽了未来时刻的信息)
dec_self_attn_mask = torch.gt((dec_self_attn_pad_mask + dec_self_attn_subsequence_mask),
0).to(device) # [batch_size, tgt_len, tgt_len]; torch.gt比较两个矩阵的元素,大于则返回1,否则返回0

# 这个mask主要用于encoder-decoder attention层
# get_attn_pad_mask主要是enc_inputs的pad mask矩阵(因为enc是处理K,V的,求Attention时是用v1,v2,..vm去加权的,要把pad对应的v_i的相关系数设为0,这样注意力就不会关注pad向量)
# dec_inputs只是提供expand的size的
dec_enc_attn_mask = get_attn_pad_mask(dec_inputs, enc_inputs) # [batc_size, tgt_len, src_len]

dec_self_attns, dec_enc_attns = [], []
for layer in self.layers:
# dec_outputs: [batch_size, tgt_len, d_model], dec_self_attn: [batch_size, n_heads, tgt_len, tgt_len], dec_enc_attn: [batch_size, h_heads, tgt_len, src_len]
# Decoder的Block是上一个Block的输出dec_outputs(变化)和Encoder网络的输出enc_outputs(固定)
dec_outputs, dec_self_attn, dec_enc_attn = layer(dec_outputs, enc_outputs, dec_self_attn_mask,
dec_enc_attn_mask)
dec_self_attns.append(dec_self_attn)
dec_enc_attns.append(dec_enc_attn)
# dec_outputs: [batch_size, tgt_len, d_model]
return dec_outputs, dec_self_attns, dec_enc_attns

Transformer

这段代码实现了Transformer类,用到的三种架构--前面定义过的Encoder和Decoder,以及下面新定义的投影层Projection(projection实现的是decoder后面的linear,之所以没有实现softmax是因为后续的贪婪解码器替代了softmax层的工作,直接得到概率最大值的词表索引并输出)。

在输入经过Encoder网络和Decoder网络处理后,得到的输出分别是enc_outputs和dec_outputs。最后再经过一个投影层,将dec_outputs映射成dec_logits,表示每个单词的词概率分布。返回 dec_logits, enc_self_attns, dec_self_attns, dec_enc_attns。

class Transformer(nn.Module):
def __init__(self):
super(Transformer, self).__init__()
# Transformer类继承自PyTorch的nn.Module类。
# 通过在Transformer的构造函数中调用super(Transformer, self).init(),
# 可以调用nn.Module的构造函数,以便初始化nn.Module的一些内部状态,以及设置Transformer类对象的一些公共属性。
self.encoder = Encoder().to(device)
self.decoder = Decoder().to(device)
self.projection = nn.Linear(d_model, tgt_vocab_size, bias=False).to(device)
def forward(self, enc_inputs, dec_inputs):
"""Transformers的输入:两个序列
enc_inputs: [batch_size, src_len]
dec_inputs: [batch_size, tgt_len]
"""
# tensor to store decoder outputs
# outputs = torch.zeros(batch_size, tgt_len, tgt_vocab_size).to(self.device)

# enc_outputs: [batch_size, src_len, d_model], enc_self_attns: [n_layers, batch_size, n_heads, src_len, src_len]
# 经过Encoder网络后,得到的输出还是[batch_size, src_len, d_model]
enc_outputs, enc_self_attns = self.encoder(enc_inputs)
# dec_outputs: [batch_size, tgt_len, d_model], dec_self_attns: [n_layers, batch_size, n_heads, tgt_len, tgt_len], dec_enc_attn: [n_layers, batch_size, tgt_len, src_len]
dec_outputs, dec_self_attns, dec_enc_attns = self.decoder(dec_inputs, enc_inputs, enc_outputs)
# dec_outputs: [batch_size, tgt_len, d_model] -> dec_logits: [batch_size, tgt_len, tgt_vocab_size]
dec_logits = self.projection(dec_outputs)
return dec_logits.view(-1, dec_logits.size(-1)), enc_self_attns, dec_self_attns, dec_enc_attns

view函数说明

view 方法是 PyTorch 中 tensor 的一种 reshape 操作。它将一个 tensor 的 shape 变成给定的形状。

在这段代码中, dec_logits.view(-1, dec_logits.size(-1)) 表示将 dec_logits tensor 从原来的 shape 变成了一个新的 shape,其中第一维是 -1,这意味着该维的长度是自动计算的(其他维度的长度已经确定了),第二维是 dec_logits.size(-1),这是一个数字,代表 dec_logits tensor 的最后一维的长度。

forward函数说明:

通过进行一次前向传播的操作(比如 output = model(input))时,PyTorch 内部会对模型中每一个模块(包括 Encoder 和 EncoderLayer)中的 forward 函数进行调用,以计算输出结果。

如果不手动定义 forward 函数,那么模型将不会被调用。因此,forward 函数是必须被定义的,用于计算模型的前向传播过程。

模型调用 & 损失函数 & 优化器

这段代码调用了Transformer模型,并设置了交叉熵损失函数(将ignore_index参数设置为0),优化器使用随机梯度下降(SGD)算法。(优化器将使用model.parameters()作为参数进行优化,学习率为1e-3,动量为0.99)。

ignore_index参数被设置为0,这样损失计算将忽略任何索引为0的输入,这通常是为NLP模型中的padding token保留的。

使用Adam算法对于较小的数据量效果很差

model = Transformer().to(device)
# 这里的损失函数里面设置了一个参数 ignore_index=0,因为 "pad" 这个单词的索引为 0,这样设置以后,就不会计算 "pad" 的损失(因为本来 "pad" 也没有意义,不需要计算)
criterion = nn.CrossEntropyLoss(ignore_index=0)
optimizer = optim.SGD(model.parameters(), lr=1e-3, momentum=0.99)
# optimizer = optim.Adam(model.parameters(), lr=1e-9) # 用adam的话效果不好

训练

最后三行代码是在进行一次反向传播迭代的操作。

分三步执行: optimizer.zero_grad():对梯度进行初始化,因为pytorch的梯度是累加的,所以每次计算前需要把梯度归零。 loss.backward():计算当前损失函数的梯度,并且完成反向传播。 optimizer.step():执行优化器的更新操作,根据梯度对模型参数进行更新。

总的来说,这三步代码是完成一次机器学习模型的参数优化的核心过程。

for epoch in range(epochs):
for enc_inputs, dec_inputs, dec_outputs in loader:
"""
enc_inputs: [batch_size, src_len]
dec_inputs: [batch_size, tgt_len]
dec_outputs: [batch_size, tgt_len]
"""
enc_inputs, dec_inputs, dec_outputs = enc_inputs.to(device), dec_inputs.to(device), dec_outputs.to(device)
# outputs: [batch_size * tgt_len, tgt_vocab_size]
outputs, enc_self_attns, dec_self_attns, dec_enc_attns = model(enc_inputs, dec_inputs)
loss = criterion(outputs, dec_outputs.view(-1)) # dec_outputs.view(-1):[batch_size * tgt_len * tgt_vocab_size]
print('Epoch:', '%04d' % (epoch + 1), 'loss =', '{:.6f}'.format(loss))

optimizer.zero_grad()
loss.backward()
optimizer.step()

训练输出如下:

# OUTPUT
Epoch: 0001 loss = 2.752963
Epoch: 0002 loss = 2.625792
Epoch: 0003 loss = 2.508290
Epoch: 0004 loss = 2.260287
Epoch: 0005 loss = 2.026300
Epoch: 0006 loss = 1.715278
Epoch: 0007 loss = 1.559423
Epoch: 0008 loss = 1.339594
Epoch: 0009 loss = 1.082600
Epoch: 0010 loss = 0.945365
Epoch: 0011 loss = 0.758650
Epoch: 0012 loss = 0.581669
Epoch: 0013 loss = 0.461126
Epoch: 0014 loss = 0.340352
Epoch: 0015 loss = 0.252243
Epoch: 0016 loss = 0.190971
Epoch: 0017 loss = 0.161172
Epoch: 0018 loss = 0.130554
Epoch: 0019 loss = 0.101027
Epoch: 0020 loss = 0.093038
Epoch: 0021 loss = 0.079294
Epoch: 0022 loss = 0.070799
Epoch: 0023 loss = 0.062795
Epoch: 0024 loss = 0.044259
Epoch: 0025 loss = 0.056274
Epoch: 0026 loss = 0.033928
Epoch: 0027 loss = 0.037328
Epoch: 0028 loss = 0.035663
Epoch: 0029 loss = 0.032550
Epoch: 0030 loss = 0.029425
Epoch: 0031 loss = 0.028057
Epoch: 0032 loss = 0.024588
Epoch: 0033 loss = 0.019545
Epoch: 0034 loss = 0.025953
Epoch: 0035 loss = 0.018335
Epoch: 0036 loss = 0.028104
Epoch: 0037 loss = 0.015952
Epoch: 0038 loss = 0.014356
Epoch: 0039 loss = 0.015536
Epoch: 0040 loss = 0.013210
Epoch: 0041 loss = 0.015791
Epoch: 0042 loss = 0.013085
Epoch: 0043 loss = 0.011149
Epoch: 0044 loss = 0.009110
Epoch: 0045 loss = 0.007416
Epoch: 0046 loss = 0.005960
Epoch: 0047 loss = 0.006156
Epoch: 0048 loss = 0.004907
Epoch: 0049 loss = 0.004867
Epoch: 0050 loss = 0.005042
Epoch: 0051 loss = 0.005796
Epoch: 0052 loss = 0.005398
Epoch: 0053 loss = 0.004669
Epoch: 0054 loss = 0.004401
Epoch: 0055 loss = 0.003372
Epoch: 0056 loss = 0.002630
Epoch: 0057 loss = 0.002565
Epoch: 0058 loss = 0.002309
Epoch: 0059 loss = 0.003040
Epoch: 0060 loss = 0.002470
Epoch: 0061 loss = 0.002096
Epoch: 0062 loss = 0.002189
Epoch: 0063 loss = 0.002061
Epoch: 0064 loss = 0.001174
Epoch: 0065 loss = 0.001599
Epoch: 0066 loss = 0.001527
Epoch: 0067 loss = 0.001685
Epoch: 0068 loss = 0.001565
Epoch: 0069 loss = 0.001718
Epoch: 0070 loss = 0.001291
Epoch: 0071 loss = 0.001259
Epoch: 0072 loss = 0.001222
Epoch: 0073 loss = 0.001179
Epoch: 0074 loss = 0.000965
Epoch: 0075 loss = 0.001888
Epoch: 0076 loss = 0.001052
Epoch: 0077 loss = 0.000888
Epoch: 0078 loss = 0.001349
Epoch: 0079 loss = 0.000916
Epoch: 0080 loss = 0.001315
Epoch: 0081 loss = 0.001191
Epoch: 0082 loss = 0.001341
Epoch: 0083 loss = 0.001674
Epoch: 0084 loss = 0.001122
Epoch: 0085 loss = 0.001133
Epoch: 0086 loss = 0.000839
Epoch: 0087 loss = 0.001059
Epoch: 0088 loss = 0.001204
Epoch: 0089 loss = 0.001092
Epoch: 0090 loss = 0.000943
Epoch: 0091 loss = 0.000699
Epoch: 0092 loss = 0.001015
Epoch: 0093 loss = 0.000730
Epoch: 0094 loss = 0.000795
Epoch: 0095 loss = 0.000926
Epoch: 0096 loss = 0.000948
Epoch: 0097 loss = 0.000945
Epoch: 0098 loss = 0.000730
Epoch: 0099 loss = 0.000747
Epoch: 0100 loss = 0.000749

测试

这段代码是一个贪心解码器(greedy decoder)的实现,其作用是在给定编码输入(enc_input)和起始符号(start_symbol)的情况下,根据给定的模型(model)预测出目标序列(greedy_dec_predict)。

首先,编码器(encoder)对编码输入(enc_input)进行处理,生成编码输出(enc_outputs)和注意力权值(enc_self_attns)。

然后初始化解码器(decoder)的输入(dec_input)为一个空的tensor。

接着,在没有到达终止符的情况下,不断执行以下步骤:

  1. 将解码器的输入(dec_input)拼接上当前的符号(next_symbol)。
  2. 解码器(decoder)对拼接后的输入(dec_input)、编码输入(enc_input)和编码输出(enc_outputs)进行处理,生成解码输出(dec_outputs)。
  3. 投影层(projection)将解码输出(dec_outputs)映射到词表上,生成预测概率分布(projected)。
  4. 根据预测概率分布(projected),选择概率最大的下一个词,并将其作为下一个符号(next_symbol)。
  5. 如果下一个符号是终止符,终止循环。

最后,返回除开初始符号以外的预测的目标序列(greedy_dec_predict)。

def greedy_decoder(model, enc_input, start_symbol):
"""贪心编码
For simplicity, a Greedy Decoder is Beam search when K=1. This is necessary for inference as we don't know the
target sequence input. Therefore we try to generate the target input word by word, then feed it into the transformer.
Starting Reference: http://nlp.seas.harvard.edu/2018/04/03/attention.html#greedy-decoding
:param model: Transformer Model
:param enc_input: The encoder input
:param start_symbol: The start symbol. In this example it is 'S' which corresponds to index 8
:return: The target input
"""
enc_outputs, enc_self_attns = model.encoder(enc_input)
dec_input = torch.zeros(1, 0).type_as(enc_input.data) # 初始化一个空的tensor: tensor([], size=(1, 0), dtype=torch.int64)
terminal = False
next_symbol = start_symbol
while not terminal:
# 预测阶段:dec_input序列会一点点变长(每次添加一个新预测出来的单词)
dec_input = torch.cat([dec_input.to(device), torch.tensor([[next_symbol]], dtype=enc_input.dtype).to(device)],
-1)
dec_outputs, _, _ = model.decoder(dec_input, enc_input, enc_outputs)
projected = model.projection(dec_outputs)
prob = projected.squeeze(0).max(dim=-1, keepdim=False)[1]
# 这行代码替代了softmax层的工作,直接得到概率最大值的词表索引
# 1. 从 tensor 中删除所有维度大小为1的维:projected.squeeze(0);
# 2. 通过 dim=-1 参数,在最后一维(即维度的索引为 -1)上,找到最大的值的索引:max(dim=-1);
# 3. 通过 keepdim=False 参数,将最后一维的维度删除,同时返回结果的最大值的索引:[1]。
# 因此,该行代码得到prob 变量的结果,存储了 projected 处理后得到的最大值的索引。

# 增量更新(我们希望重复单词预测结果是一样的)
# 我们在预测时会选择性忽略重复的预测的词,只摘取最新预测的单词拼接到输入序列中
next_word = prob.data[-1] # 拿出当前预测的单词(数字)。我们用x_t对应的输出z_t去预测下一个单词的概率,不用z_1,z_2..z_{t-1}
next_symbol = next_word
if next_symbol == tgt_vocab["E"]:
terminal = True
# print(next_word)

# greedy_dec_predict = torch.cat(
# [dec_input.to(device), torch.tensor([[next_symbol]], dtype=enc_input.dtype).to(device)],
# -1)
greedy_dec_predict = dec_input[:, 1:]
return greedy_dec_predict

这段代码实现了一个简单的预测(因为数据量较小,因此测试集选用训练集中的一句)

  • 测试集(希望transformer能达到的效果)
  • 输入:"我 有 一 个 女 朋 友"
  • 输出:"i have a girl friend"

过程如下:

  1. 定义了一个句子列表sentences,其中包含一个中文句子和对应的空的英文句子。
  2. 使用make_data函数处理句子列表,获得编码句子、解码句子的输入和输出。
  3. 创建一个数据加载器test_loader,用于加载处理后的句子数据。
  4. 使用next函数从数据加载器中读取一个批次的数据,并将其分别赋给编码句子的输入。
  5. 对于每个编码句子,调用greedy_decoder函数,通过训练好的Transformer模型,将其翻译成英文句子。
  6. 最后,输出编码句子和对应的解码句子,以及它们的中英文词语对应关系。
# ==========================================================================================
# 预测阶段
# 测试集
sentences = [
# enc_input dec_input dec_output
['我 有 一 个 女 朋 友', '', '']
]

enc_inputs, dec_inputs, dec_outputs = make_data(sentences)
test_loader = Data.DataLoader(MyDataSet(enc_inputs, dec_inputs, dec_outputs), 2, True)
enc_inputs, _, _ = next(iter(test_loader))
print()
print("="*30)
print("利用训练好的Transformer模型将中文句子'我 有 一 个 女 朋 友' 翻译成英文句子: ")
for i in range(len(enc_inputs)):
greedy_dec_predict = greedy_decoder(model, enc_inputs[i].view(1, -1).to(device), start_symbol=tgt_vocab["S"])
print(enc_inputs[i], '->', greedy_dec_predict.squeeze())
print([src_idx2word[t.item()] for t in enc_inputs[i]], '->',
[idx2word[n.item()] for n in greedy_dec_predict.squeeze()])
==============================
利用训练好的Transformer模型将中文句子'我 有 一 个 女 朋 友' 翻译成英文句子: 
tensor([1, 2, 3, 4, 9, 6, 7]) -> tensor([ 1,  2,  3,  7,  5, 10])
['我', '有', '一', '个', '女', '朋', '友'] -> ['i', 'have', 'a', 'girl', 'friend', '.']

next 函数说明:

"next" 函数用于返回迭代器的下一个项目,即从迭代器中获取下一个数据项。在这段代码中,使用 next(iter(test_loader)) 获取第一个批次的数据,赋值给 enc_inputs, _ , _ 三个变量。

因为数据量较小,如果测试集选用新的句子,那么结果就不尽人意

# ==========================================================================================
# 预测阶段
# 测试集
sentences = [
# enc_input dec_input dec_output
['我 有 一 个 好 朋 友', '', '']
]

enc_inputs, dec_inputs, dec_outputs = make_data(sentences)
test_loader = Data.DataLoader(MyDataSet(enc_inputs, dec_inputs, dec_outputs), 2, True)
enc_inputs, _, _ = next(iter(test_loader))
print()
print("="*30)
print("利用训练好的Transformer模型将中文句子'我 有 一 个 好 朋 友' 翻译成英文句子: ")
for i in range(len(enc_inputs)):
greedy_dec_predict = greedy_decoder(model, enc_inputs[i].view(1, -1).to(device), start_symbol=tgt_vocab["S"])
print(enc_inputs[i], '->', greedy_dec_predict.squeeze())
print([src_idx2word[t.item()] for t in enc_inputs[i]], '->',
[idx2word[n.item()] for n in greedy_dec_predict.squeeze()])
==============================
利用训练好的Transformer模型将中文句子'我 有 一 个 好 朋 友' 翻译成英文句子: 
tensor([1, 2, 3, 4, 5, 6, 7]) -> tensor([ 1,  2,  3,  7,  5, 10])
['我', '有', '一', '个', '好', '朋', '友'] -> ['i', 'have', 'a', 'girl', 'friend', '.']

参考链接

本文详细代码

  1. Colab:

    https://github.com/serika-onoe/transformer_reproduction/blob/main/Transformer_Pytorch.ipynb

  2. Github:

    https://github.com/serika-onoe/transformer_reproduction

参考网站

  1. Transformer 的 PyTorch 实现:

    https://wmathor.com/index.php/archives/1455/

  2. 手把手教你用Pytorch代码实现Transformer模型(超详细的代码解读):

    https://blog.csdn.net/qq_43827595/article/details/120394042

完整代码实现