当采用 RNN 训练序列样本数据时,会面临序列样本数据长短不一的情况。比如做 NLP 任务、语音处理任务时,每个句子或语音序列的长度经常是不相同。难道要一个序列一个序列的喂给网络进行训练吗?这显然是行不通的。

为了更高效的进行 batch 处理,就需要对样本序列进行填充,保证各个样本长度相同,在 PyTorch 里面使用函数 pad_sequence 对序列进行填充。填充之后的样本序列,虽然长度相同了,但是序列里面可能填充了很多无效值 0 ,将填充值 0 喂给 RNN 进行 forward 计算,不仅浪费计算资源,最后得到的值可能还会存在误差。因此在将序列送给 RNN 进行处理之前,需要采用 pack_padded_sequence 进行压缩,压缩掉无效的填充值。序列经过 RNN 处理之后的输出仍然是压紧的序列,需要采用 pad_packed_sequence 把压紧的序列再填充回来,便于进行后续的处理。

1 torch.nn.utils.rnn.pad_sequence

1.函数形式

torch.nn.utils.rnn.pad_sequence(sequences, batch_first=False, padding_value=0.0)

2.函数功能

使用padding_value填充list中不等长的Tensor。

该函数在新的维度堆叠list中的Tensor,并将list中不等长的Tensor填充到相同长度。

B为batch size,T为最长的sequences的长度,L为序列的长度。

3.函数参数

  • sequences:list[Tensor],不定长度的Tensor列表
  • batch_first:bool,如果为True,则输出为B \times T \times *,否则则为T \times B \times *
  • padding_value:float,填充的值,默认为0

4.函数输出

如果batch_first参数为False,则返回形状为T \times B \times *的Tensor;如果设置为True,则返回形状为B \times T \times *的Tensor。

5.使用示例

该函数主要用来对样本进行填充,填充值一般为 0 。我们在训练网络时,一般会采用一个一个 mini-batch 的方式,将训练样本数据喂给网络。在 PyTorch 里面数据都是以 Tensor 的形式存在,一个 mini-batch 实际上就是一个高维的 Tensor ,每个序列数据的长度必须相同才能组成一个 Tensor 。为了使网络可以处理 mini-batch 形式的数据,就必须对序列样本进行填充,保证一个 mini-batch 里面的数据长度是相同的。

在 PyTorch 里面一般是使用 DataLoader 进行数据加载,返回 mini-batch 形式的数据,再将此数据喂给网络进行训练。我们一般会自定义一个 collate_fn 函数,完成对数据的填充。

import torch
from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pad_sequence, pack_padded_sequence, pack_sequence, pad_packed_sequence


class MyData(Dataset):
    def __init__(self, data):
        self.data = data

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        length = len(self.data[idx])
        return self.data[idx]


def collate_fn(data):
    data.sort(key=lambda x: len(x), reverse=True)
    seq_len = [s.size(0) for s in data]  # 获取数据真实的长度
    data = pad_sequence(data, batch_first=True)
    return data


if __name__ == '__main__':
    a = torch.tensor([1, 2, 3, 4])  # sentence 1
    b = torch.tensor([5, 6, 7])  # sentence 2
    c = torch.tensor([7, 8])  # sentence 3
    d = torch.tensor([9])  # sentence 4

    train_x = [a, b, c, d]

    data = MyData(train_x)
    data_loader = DataLoader(data, batch_size=2, shuffle=True, collate_fn=collate_fn)

    batch_x = iter(data_loader)
    for i in range(2):
        print(batch_x.next())

输出

tensor([[7, 8],
        [9, 0]])
tensor([[1, 2, 3, 4],
        [5, 6, 7, 0]])

从上述代码输出的结果我们可以看见,对于一个mini-batch中的两个不等长的序列,会对较短的序列进行填充0,使得较短的序列填充的后的长度与较长的序列长度一致。

2 torch.nn.utils.rnn.pack_padded_sequence

1.函数形式

torch.nn.utils.rnn.pack_padded_sequence(input, lengths, batch_first=False, enforce_sorted=True)

2.函数功能

压缩填充之后的不定长度的Tensor。

如果batch_first为False,则input的大小为T \times B \times *,如果batch_first为True,则input的大小为B \times T \times *,其中T为序列中最长Tensor的长度(等于lengths[0]的长度),B为batch size,*为任意数量的维度(包括0)。

如果设置enforce_sorted=True,则输入序列应该按长度降序排列,input[:,0]应该为最长序列,input[:,B-1]应该为最短序列。在导出onnx时必须要设置enforce_sorted=True

3.函数参数

  • input:Tensor,填充的一个batch的可变长度序列
  • lengths:Tensor或者list(int),batch中每一个序列的填充前的真实长度
  • batch_first:bool,如果设置为True,则input应该为B \times T \times *,如果设置为False,则input的大小为T \times B \times *
  • enforce_sorted:bool,如果设置为True,则输入应该为按长度降序排列的序列,如果为False,则无需排序。

4.函数输出

返回一个PackedSeqence对象。

5.使用示例

import torch
from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pad_sequence, pack_padded_sequence, pack_sequence, pad_packed_sequence


class MyData(Dataset):
    def __init__(self, data):
        self.data = data

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        length = len(self.data[idx])
        return self.data[idx]


def collate_fn(data):
    data.sort(key=lambda x: len(x), reverse=True)
    seq_len = [s.size(0) for s in data]  # 获取数据真实的长度
    data = pad_sequence(data, batch_first=True)
    print(data)
    data = pack_padded_sequence(data, seq_len, batch_first=True)
    return data


if __name__ == '__main__':
    a = torch.tensor([1, 2, 3, 4])  # sentence 1
    b = torch.tensor([5, 6, 7])  # sentence 2
    c = torch.tensor([7, 8])  # sentence 3
    d = torch.tensor([9])  # sentence 4

    train_x = [a, b, c, d]

    data = MyData(train_x)
    data_loader = DataLoader(data, batch_size=2, shuffle=True, collate_fn=collate_fn)

    batch_x = iter(data_loader)
    for i in range(2):
        print(batch_x.next())

输出

tensor([[7, 8],
        [9, 0]])
PackedSequence(data=tensor([7, 9, 8]), batch_sizes=tensor([2, 1]), sorted_indices=None, unsorted_indices=None)
tensor([[1, 2, 3, 4],
        [5, 6, 7, 0]])
PackedSequence(data=tensor([1, 5, 2, 6, 3, 7, 4]), batch_sizes=tensor([2, 2, 2, 1]), sorted_indices=None, unsorted_indices=None)

3 torch.nn.utils.rnn.pack_sequence

1.函数形式

torch.nn.utils.rnn.pack_sequence(sequences, enforce_sorted=True)

2.函数功能

压缩不定长度的序列Tensor。

如果设置enforce_sorted=True,则输入序列应该按长度降序排列,input[:,0]应该为最长序列,input[:,B-1]应该为最短序列。在导出onnx时必须要设置enforce_sorted=True

pack_sequence的功能就等价于pad_sequencepack_padded_sequence两个函数的组合体,将不定长序列进行填充以及将填充之后的序列进行压缩。

3.函数参数

  • sequences:list[Tensor],按序列长度降序排列的序列列表
  • enforce_sorted:bool,默认值为False。如果设置为True,则检查input是否包含按长度降序排列的序列,如果设置为False,则不需要检查。

4.函数输出

返回一个PackedSeqence对象。

5.使用示例

import torch
from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pad_sequence, pack_padded_sequence, pack_sequence, pad_packed_sequence


class MyData(Dataset):
    def __init__(self, data):
        self.data = data

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        length = len(self.data[idx])
        return self.data[idx]


def collate_fn(data):
    data.sort(key=lambda x: len(x), reverse=True)
    seq_len = [s.size(0) for s in data]  # 获取数据真实的长度
    # data = pad_sequence(data, batch_first=True)
    # data = pack_padded_sequence(data, seq_len, batch_first=True)
    data = pack_sequence(data)
    return data


if __name__ == '__main__':
    a = torch.tensor([1, 2, 3, 4])  # sentence 1
    b = torch.tensor([5, 6, 7])  # sentence 2
    c = torch.tensor([7, 8])  # sentence 3
    d = torch.tensor([9])  # sentence 4

    train_x = [a, b, c, d]

    data = MyData(train_x)
    data_loader = DataLoader(data, batch_size=2, shuffle=True, collate_fn=collate_fn)

    batch_x = iter(data_loader)
    for i in range(2):
        print(batch_x.next())

输出

PackedSequence(data=tensor([7, 9, 8]), batch_sizes=tensor([2, 1]), sorted_indices=None, unsorted_indices=None)
PackedSequence(data=tensor([1, 5, 2, 6, 3, 7, 4]), batch_sizes=tensor([2, 2, 2, 1]), sorted_indices=None, unsorted_indices=None)

从输出的结果上看,此函数pack_sequence与第2节的输出结果一致,说明pack_sequence的功能其实就是pad_sequencepack_padded_sequence两个函数功能的组合体。

4 torch.nn.utils.rnn.pad_packed_sequence

1.函数形式

torch.nn.utils.rnn.pad_packed_sequence(sequence, batch_first=False, padding_value=0.0, total_length=None)

2.函数功能

填充一个batch的可变长度序列。

此函数pad_packed_sequencepack_padded_sequence函数的逆操作。

如果batch_first为False,则返回的Tensor的形状为T \times B \times *,如果batch_first为True,则返回Tensor的形状为B \times T \times *,其中B为batch size,T为最长序列的长度。

3.函数参数

  • sequence:PackedSequence对象,需要填充的一个batch的可变长度序列
  • batch_first:bool,如果batch_first为False,则返回的Tensor的形状为T \times B \times *,如果batch_first为True,则返回Tensor的形状为B \times T \times *
  • padding_value:float,默认值为0.0。填充元素的值。
  • total_length:int。如果为None,输出将会被填充到长度total_length。如果total_length小于序列中的最大序列长度,将会排除ValueError。

4.函数输出

该函数返回包含填充序列Tensor元祖,以及包含当前batch中每个序列长度Tensor列表。

5.使用示例

import torch
from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pad_sequence, pack_padded_sequence, pack_sequence, pad_packed_sequence


class MyData(Dataset):
    def __init__(self, data):
        self.data = data

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        length = len(self.data[idx])
        return self.data[idx]


def collate_fn(data):
    data.sort(key=lambda x: len(x), reverse=True)
    seq_len = [s.size(0) for s in data]  # 获取数据真实的长度
    data = pad_sequence(data, batch_first=True)
    print(data)

    data = pack_padded_sequence(data, seq_len, batch_first=True)

    data_unpacked,lens_unpacked = pad_packed_sequence(data,batch_first=True)
    print(data_unpacked)
    print(lens_unpacked)

    return data


if __name__ == '__main__':
    a = torch.tensor([1, 2, 3, 4])  # sentence 1
    b = torch.tensor([5, 6, 7])  # sentence 2
    c = torch.tensor([7, 8])  # sentence 3
    d = torch.tensor([9])  # sentence 4

    train_x = [a, b, c, d]

    data = MyData(train_x)
    data_loader = DataLoader(data, batch_size=2, shuffle=True, collate_fn=collate_fn)

    batch_x = iter(data_loader)
    for i in range(2):
        batch_x.next()

输出

tensor([[5, 6, 7],
        [9, 0, 0]])
tensor([[5, 6, 7],
        [9, 0, 0]])
tensor([3, 1])
tensor([[1, 2, 3, 4],
        [7, 8, 0, 0]])
tensor([[1, 2, 3, 4],
        [7, 8, 0, 0]])
tensor([4, 2])

参考链接