[TOC]

不妨构想一下,我们现在要训练一个模型来识别MNIST中的数字。我们已经知道如何用PyTorch来进行梯度下降,但是,只依赖于PyTorch的这一个特性的话,写出来的代码会非常的冗长复杂。因此,我们将在下面的讲解中逐渐优化代码结构,让代码越来越高级、抽象。不过,我们先从最简单的内容开始。

误差函数

L1和L2误差

abs_loss = F.l1_loss(image_of_3.float(),mean7)
mean_squared_error = F.mse_loss(image_of_3,mean7).sqrt()

交叉熵

详细见第一部分“softmax数值稳定性“。

二元交叉熵

F.binary_cross_entropy and its module equivalent nn.BCELoss calculate cross entropy on a one-hot-encoded target, but do not include the initial sigmoid. Normally, for one-hot-encoded targets you’ll want F.binary_cross_entropy_with_logits (or nn.BCEWithLogitsLoss), which do both sigmoid and binary cross entropy in a single function.

def binary_cross_entropy(inputs, targets):
    inputs = inputs.sigmoid()
    return -torch.where(targets==1, inputs, 1-inputs).log().mean()

有时候我们想用图像的多标签分类,所以不能采用softmax作为交叉熵的先决函数,而是改成sigmoid。

线性层的纯Python实现

def lin(x, w, b): return x@w + b
def relu(x): return x.clamp_min(0.)
def lin_grad(inp, out, w, b):
    # grad of matmul with respect to input
    inp.g = out.g @ w.t()
    w.g = (inp.unsqueeze(-1) * out.g.unsqueeze(1)).sum(0)
    b.g = out.g.sum(0)
def forward_and_backward(inp, targ):
    # forward pass:
    l1 = lin(inp, w1, b1)
    l2 = relu(l1)
    out = lin(l2, w2, b2)
    diff = out[:,0]-targ
    loss = diff.pow(2).mean()
    
    # backward pass:
    out.g = 2.*diff[:,None] / inp.shape[0]
    lin_grad(l2, out, w2, b2)
    l1.g = (l1>0).float() * l2.g
    lin_grad(inp, l1, w1, b1)

这玩意真的不好讲明白,记住这个实现方案能够成功运行就好。我不打算在这方面多花费精力来研究透彻了……

如何用纯Python实现自动梯度计算?

其实神经网络的梯度计算没有那么难,我们真正需要解决的问题就是寻找到每一个权重中的元素对误差函数的导数。

因此,我们并不需要考虑矩阵乘法、线性代数那么多,只需要考虑标量的导数。对此,Andrew Karpathy的解决方案是:

class Value:
    """ stores a single scalar value and its gradient """
    def __init__(self, data, _children=(), _op=''):
        self.data,self.grad,self._op = data,0,_op
        self._backward = lambda: None
        self._prev = set(_children)
    def __add__(self, other):
        other = other if isinstance(other, Value) else Value(other)
        out = Value(self.data + other.data, (self, other), '+')
        def _backward():
            self.grad += out.grad
            other.grad += out.grad
        out._backward = _backward
        return out
    def __mul__(self, other):
        other = other if isinstance(other, Value) else Value(other)
        out = Value(self.data * other.data, (self, other), '*')
        def _backward():
            self.grad += other.data * out.grad
            other.grad += self.data * out.grad
        out._backward = _backward
        return out
    def __pow__(self, other):
        assert isinstance(other, (int, float)), "only supporting int/float powers for now"
        out = Value(self.data**other, (self,), f'**{other}')
        def _backward():
            self.grad += (other * self.data**(other-1)) * out.grad
        out._backward = _backward
        return out
    def relu(self):
        out = Value(0 if self.data < 0 else self.data, (self,), 'ReLU')
        def _backward():
            self.grad += (out.data > 0) * out.grad
        out._backward = _backward
        return out
    def backward(self):
        topo = []
        visited = set()
        def build_topo(v):
            if v not in visited:
                visited.add(v)
                for child in v._prev:
                    build_topo(child)
                topo.append(v)
        build_topo(self)
        self.grad = 1
        for v in reversed(topo):
            v._backward()
    def __neg__(self): return self * -1
    def __radd__(self, other): return self + other
    def __sub__(self, other): return self + (-other)
    def __rsub__(self, other): return other + (-self)
    def __rmul__(self, other): return self * other
    def __truediv__(self, other): return self * other**-1
    def __rtruediv__(self, other): return other * self**-1
    def __repr__(self): return f"Value(data={self.data}, grad={self.grad})"

这就实现了标量求导的功能。然后我们据此用神经元把多个标量联系在一起,再用神经层把多个神经元联系在一起,再用多个层实现一个完整的神经网络……这个网络的梯度就可以被这样的程序直接自动搞定了!

PyTorch基本概念:Module,ModuleList,Sequential和Optimizer

Module算是所有PyTorch模型里面的父类。它可以存储一些关于模型的信息,然后自动检测里面的参数。

m1 = nn.Module()
m1.foo = nn.Linear(3,4)

可以用list(m1.named_children())获得都有哪些层,然后m1.parameters()获得所有权重矩阵(但返回的形式是一个生成器)。

这样方便的地方在于,如果我们把所有的神经网络层都统一放到Module类里面,或者新建一个继承Module的子类,就可以很方便的在同一层循环对所有的参数一块用梯度更新。

layers = [nn.Linear(m,nh), nn.ReLU(), nn.Linear(nh,10)]
class SequentialModel(nn.Module):
    def __init__(self, layers):
        super().__init__()
        self.layers = nn.ModuleList(layers)
    def forward(self, x):
        for l in self.layers: x = l(x)
        return x
model = nn.Sequential(nn.Linear(m,nh), nn.ReLU(), nn.Linear(nh,10))

ModuleList使用起来也很简单,只是把神经网络的多个层整理到了一起。而Sequential则是我们上面定义的SequentialModel的完美替代,就是相当于自动实现了我们需要的forward方法。

class Optimizer():
    def __init__(self, params, lr=0.5): self.params,self.lr=list(params),lr
    def step(self):
        with torch.no_grad():
            for p in self.params: p -= p.grad * self.lr
    def zero_grad(self):
        for p in self.params: p.grad.data.zero_()

Optimizer做的,则是负责存储某个模型的所有参数,然后通过step()统一更新这些参数,然后通过zero_grad()将这些梯度清零。

使用reduce设计的前向传播

这里介绍functools.reduce方法。

reduce(lambda acc, x: acc + x, [1, 2, 3, 4], 0)

想要理解这个函数,必须一步一步跟着它的算法走。最开始,传入函数acc=0, x=1。运算之后,acc=1,同时迭代到数组的下一项,x=2。进行函数运算之后,新的acc=3。再把x移动到数组的下一项……最后得到结果10。它的优雅之处是用一行代码代替了一整个for循环。如果只给reduce提供了两个参数,那么数组的第一项成为acc,第二项成为x,开始迭代。

from functools import reduce
layers = [nn.Linear(m,nh), nn.ReLU(), nn.Linear(nh,10)]
class Model(nn.Module):
    def __init__(self, layers):
        super().__init__()
        self.layers = layers
        for i,l in enumerate(self.layers): self.add_module(f'layer_{i}', l)
    def forward(self, x): return reduce(lambda val,layer: layer(val), self.layers, x)

再看这个换到神经网络的例子:最开始val=x,用第一层神经网络处理x得到中间结果;然后中间结果成为第二个val,被self.layers中的第二个层处理,以此类推。把这种中间值不断迭代的for循环省略成了一行。

从Dataset到DataLoaders

目前,我们的训练代码可能长这样:

for epoch in range(epochs):
    for i in range(0, n, bs):
        s = slice(i, min(n,i+bs))
        xb,yb = x_train[s],y_train[s]
        preds = model(xb)
        loss = loss_func(preds, yb)
        loss.backward()
        opt.step()
        opt.zero_grad()
    report(loss, preds, yb)

并没有什么技术含量,而且隔时间久的话,再看上去很让人头疼。对此,我们引入第一个抽象结构:Dataset。与其使用xb,yb = x_train[s],y_train[s],不如直接使用xb, yb = train_ds[s]。可以看出,我们做出的改动并不多,对应的代码量也非常小,写在下面:

class Dataset():
    def __init__(self, x, y): self.x,self.y = x,y
    def __len__(self): return len(self.x)
    def __getitem__(self, i): return self.x[i],self.y[i]

还有一件必须要简化的事情,那就是我们不希望自己来手动分出各种批次,希望把for i in range(0, n, bs)...改成for xb, yb in train_dl。这样可以减轻读程序的负担。DataLoader接受Datasetbatch_size作为输入,在被迭代的时候,自动处理切片。

class DataLoader():
    def __init__(self, ds, bs): self.ds,self.bs = ds,bs
    def __iter__(self):
        for i in range(0, len(self.ds), self.bs): yield self.ds[i:i+self.bs]

到目前为止,原先训练的代码可以被简化为:

def fit():
    for epoch in range(epochs):
        for xb,yb in train_dl:
            preds = model(xb)
            loss = loss_func(preds, yb)
            loss.backward()
            opt.step()
            opt.zero_grad()
        report(loss, preds, yb)

看上去清爽不少。但是我们后面还有更多的任务要做——架构基本弄好了,剩下的就是添加功能了!

这其中的功能之一就是:随机取样。为了能和前面的DataLoader比较好融合,这里定义的Sampler返回的是对应的数据集中的索引。其实用语言不太好解释Sampler做的事情,但是代码一看就懂。

from itertools import islice
class Sampler():
    def __init__(self, ds, shuffle=False): self.n,self.shuffle = len(ds),shuffle
    def __iter__(self):
        res = list(range(self.n))
        if self.shuffle: random.shuffle(res)
        return iter(res)
ss = Sampler(train_ds, shuffle=True)
list(islice(ss, 5)) # [44358, 3365, 28345, 30533, 5284]

我之前提到过islice的用法,但是没有细讲。它无非就是产生一个迭代器,但是迭代器针对的是数组的一部分,而不是整个数组。参数的含义和range基本都是一样的。

接下来我们实现BatchSampler

import fastcore.all as fc
class BatchSampler():
    def __init__(self, sampler, bs, drop_last=False): fc.store_attr()
    def __iter__(self): yield from fc.chunked(iter(self.sampler), self.bs, drop_last=self.drop_last)

借讲解这段代码的机会引入好几个新的知识点!

  • fc.store_attr替我们完成了设置变量的工作。
  • 使用yield from iterable可以节省一些情况的代码量,等价于for i in iterable: yield i
  • fc.chunked就是把数据分成一块一块的,并且可以接受每一批数据的大小。

紧接着我们修改DataLoader的代码。

def collate(b):
    xs,ys = zip(*b)
    return torch.stack(xs),torch.stack(ys)
class DataLoader():
    def __init__(self, ds, batchs, collate_fn=collate): fc.store_attr()
    def __iter__(self): yield from (self.collate_fn(self.ds[i] for i in b) for b in self.batchs)
train_samp = BatchSampler(Sampler(train_ds, shuffle=True ), bs)
valid_samp = BatchSampler(Sampler(valid_ds, shuffle=False), bs)
train_dl = DataLoader(train_ds, batchs=train_samp)
valid_dl = DataLoader(valid_ds, batchs=valid_samp)

这里我引入collate函数。我们具体看DataLoader怎么使用:self.batchs是一个传入的Sampler迭代器,我们进行yield from操作,那么就是依次返回self.collate_fn(self.ds[i] for i in b)。记住BatchSampler返回的是对应的数据编号的索引,因此我们需要根据编号来进行查询,这就是self.ds[i] for i in b做的事情;而这样虽然能够简化代码,但是我们会得到许多(x,y)的数据对,为了把x和y之间分开,我们在外部定义了collate

这样,我们就可以在实际写程序的时候用for xb, yb in train_dl: ...的语句了,非常方便!

PyTorch里面的Sampler和DataLoader

import torch.multiprocessing as mp
class DataLoader():
    def __init__(self, ds, batchs, n_workers=1, collate_fn=collate): fc.store_attr()
    def __iter__(self):
        with mp.Pool(self.n_workers) as ex: yield from ex.map(self.ds.__getitem__, iter(self.batchs))

这就是DataLoader通过并发操作来加速获取数据的原理,PyTorch针对这一点做了很多优化,确保加载数据的速度足够快。这里的并发具体发生在__getitem__方法同时调用上。

具体到PyTorch里如何写各种Sampler,没有什么好讲的,我把代码直接摆出来:

from torch.utils.data import DataLoader, SequentialSampler, RandomSampler, BatchSampler
train_samp = BatchSampler(RandomSampler(train_ds),     bs, drop_last=False)
valid_samp = BatchSampler(SequentialSampler(valid_ds), bs, drop_last=False)
train_dl = DataLoader(train_ds, batch_sampler=train_samp, collate_fn=collate)
valid_dl = DataLoader(valid_ds, batch_sampler=valid_samp, collate_fn=collate)

以上是比较繁琐的写法,PyTorch比较聪明,在下面几种写法中(尤其注意第二种),会自动给我们套上数据集对应的Sampler,而且有多线程功能。

train_dl = DataLoader(train_ds, bs, sampler=RandomSampler(train_ds), collate_fn=collate)
valid_dl = DataLoader(valid_ds, bs, sampler=SequentialSampler(valid_ds), collate_fn=collate)

train_dl = DataLoader(train_ds, bs, shuffle=True, drop_last=True, num_workers=2)
valid_dl = DataLoader(valid_ds, bs, shuffle=False, num_workers=2)

# 因为Torch知道怎么整合数据,所以可以省去collate_fn参数
train_dl = DataLoader(train_ds, sampler=train_samp)
valid_dl = DataLoader(valid_ds, sampler=valid_samp)

因为我们还有测试集,所以DataLoaders应运而生!

def get_dls(train_ds, valid_ds, bs, **kwargs):
    return (DataLoader(train_ds, batch_size=bs, shuffle=True, **kwargs),
            DataLoader(valid_ds, batch_size=bs*2, **kwargs))
class DataLoaders:
    def __init__(self, *dls): self.train,self.valid = dls[:2]

    @classmethod
    def from_dd(cls, dd, batch_size, as_tuple=True, **kwargs):
        f = collate_dict(dd['train'])
        return cls(*get_dls(*dd.values(), bs=batch_size, collate_fn=f, **kwargs))

这一段python代码也有很多能深入研究的地方。

  • classmethod可以允许我们使用DataLoaders.from_dd(...)的方法调用这个函数,而DataLoaders这个类被传入了参数cls中,允许我们变相地构造一个新的DataLoaders对象。
  • 记住collate_dict得到的是一个函数,它可以接受包含多个字典的列表,并且将其整理成张量的形式。因为这个Dataloaders类是针对HuggingFace数据集的,需要一些特殊的整合函数,对应的知识在后面一小节会细讲。

HuggingFace Datasets使用

我们以后还会经常从Huggingface上下载数据集使用,所以对这种结构的数据处理也是需要掌握的。下面我们以fashion_mnist的加载为例演示datasets这个模块的用法。首先是加载数据集,我们可以通过load_dataset_builder加载数据集的一些元数据,也可以直接通过load_dataset下载数据集。因为网络因素和load_dataset会自动打开每一个图片文件,代码实际运行起来可能比较慢。

from datasets import load_dataset,load_dataset_builder
name = "fashion_mnist"
ds_builder = load_dataset_builder(name)
dsd = load_dataset(name)

在加载完数据集之后,我们可以在ds_builder.info中获得一些关于数据的有用的信息:

  • ds_builder.info.description是对这个数据集的文字描述。但是不知道为什么,fashion_mnist的数据集描述是空的。Jeremy在课程里面演示的时候,里面有详细的文字说明。
  • ds_builder.info.features获得数据集的输入、输出的数据类型。
{'image': Image(mode=None, decode=True, id=None),
 'label': ClassLabel(names=['T - shirt / top', 'Trouser', 'Pullover', 'Dress', 'Coat', 'Sandal', 'Shirt', 'Sneaker', 'Bag', 'Ankle boot'], id=None)}
  • ds_builder.info.splits告诉我们关于测试集的信息,比如Validation Set的大小是多少。在Huggingface的数据集中,数据已经被分好类了,不需要我们再去选择哪些用于训练,哪些用于测试。
{'train': SplitInfo(name='train', num_bytes=31304707, num_examples=60000, shard_lengths=None, dataset_name='fashion_mnist'),
 'test': SplitInfo(name='test', num_bytes=5235160, num_examples=10000, shard_lengths=None, dataset_name='fashion_mnist')}

常用功能

在加载完数据集之后,我们得到一个类似于字典的东西,可以直接通过查询来获得训练集和测试集:

dsd
# DatasetDict({
#    train: Dataset({
#        features: ['image', 'label'],
#        num_rows: 60000
#    })
#    test: Dataset({
#        features: ['image', 'label'],
#        num_rows: 10000
#    })
# })
train,test = dsd['train'],dsd['test']
train[0]
# {'image': <PIL.PngImagePlugin.PngImageFile image mode=L size=28x28>,
#  'label': 9}

我们用x,y = ds_builder.info.features获得输入和输出的标签,虽然等号的右边是一个字典,但是解包时我们只考虑字典的键,所以可以运行。同时,需要注意的是虽然train可以接受索引,但是它的运作方式并不等价于Python的列表。train[:5]返回的是一个字典,里面包含了堆叠好的数据。

x,y = ds_builder.info.features
x,y # ('image', 'label')
xb = train[:5][x]
yb = train[:5][y]

标签转化为文本:

featy = train.features[y]
featy.int2str(yb) # ['Ankle boot', 'T - shirt / top', 'T - shirt / top', 'Dress', 'T - shirt / top']

整合数据

我之所以把datasets的使用说明放在了第二部分而不是“数据准备“的第一部分,正是因为出现了collate_fn这样的高级知识。这个函数本身做的事情很好看懂,把一个装满字典的列表变成了一个装着数据堆叠而成的张量的大字典。最开始有点让我困惑的是,如果整合函数返回的是字典的话,PyTorch怎么处理这个数据结构呢?答案其实就是不处理!用法上,b = next(iter(dl))直接返回的就是一个字典,然后我们通过b[x].shape,b[y]这样的操作才能获取里面的内容。

def collate_fn(b):
    return {x:torch.stack([TF.to_tensor(o[x]) for o in b]),
            y:tensor([o[y] for o in b])}
dl = DataLoader(train, collate_fn=collate_fn, batch_size=16)

你也可以看出来,不管数据结构怎么变,我只需要改变整合函数的细节就可以创建正确的DataLoader。但是这样做很繁琐,我们只需要对数据加一层Transformation,就可以用PyTorch默认的整合函数节省代码量!这就引出了下面的修改:

def transforms(b):
    b[x] = [TF.to_tensor(o) for o in b[x]]
    return b
tds = train.with_transform(transforms)
dl = DataLoader(tds, batch_size=16)

Transforms函数接受的是一批完整的数据Batch,然后希望我们返回修改之后的数据。这里的任务是把PIL形式的图片对象转换成28x28的张量。然后因为PyTorch已经知道如何整合字典类型的数据,我们就不需要再传入自定义的整合函数,于是就方便了很多!进行转换的语句是train.with_transform

可是这个return b显得很冗余,明明我们直接按位操作的语句更简洁,于是这段代码还可以用装饰器进一步简化,最终只需要在transform函数中进行按位操作:

def inplace(f):
    def _f(b):
        f(b)
        return b
    return _f
@inplace
def transformi(b): b[x] = [torch.flatten(TF.to_tensor(o)) for o in b[x]]
tdsf = train.with_transform(transformi)

如果我们想让DataLoader在返回数据的时候不是字典的形式,那么就需要对整合函数做进一步的修改:

from operator import itemgetter
def collate_dict(ds):
    get = itemgetter(*ds.features)
    def _f(b): return get(default_collate(b))
    return _f
dlf = DataLoader(tdsf, batch_size=4, collate_fn=collate_dict(tdsf))

这里的itemgetter就相当于先把要查询的键(这里就是train和valid两个字符串)规定好,然后再把传入的字典作为参数,直接在调用的时候把值取出来。同时,operator库中还有一个类似工作方式的函数:attrgetter,以后也会用到。

自建框架

这算是很高级的一部分内容了,我们的目的是建立一个灵活的深度学习框架,在最大化自由度的基础上,能够简化训练模型的所有流程。回想一下,如果我们想训练一个模型,真正必须要有的,就是神经网络、DataLoaders、优化器、损失函数这几样东西。我们可以据此建立一个抽象的框架,只接受这四个参数,然后就可以实现模型训练。

这就是我们想要的Learner,但是光能够按部就班的训练模型还不够,为了让训练过程灵活、可以检测训练中的种种数据,我们还要创建一系列的回调类。不过,我们先从Learner的源代码开始说起。

Learner类

我们后面马上就会了解到,正是因为支持回调,Learner的功能才会如此强大。下面定义了一个with_cbs类修饰器,和普通的函数形式的修饰器是一样的。当修饰Learner类里的某个方法的时候,with_cbs先被括号里的参数初始化,然后被调用__call__方法原封不动的替换了被修饰函数的功能。可以看出,这个修饰器基本的功能是:在原先的方法调用前,执行对应的回调;执行完之后,再来回调;如果遇到异常让训练中断,仍然会调用cleanup来做清理工作。

class with_cbs:
    def __init__(self, nm): self.nm = nm
    def __call__(self, f):
        def _f(o, *args, **kwargs):
            try:
                o.callback(f'before_{self.nm}')
                f(o, *args, **kwargs)
                o.callback(f'after_{self.nm}')
            except globals()[f'Cancel{self.nm.title()}Exception']: pass
            finally: o.callback(f'cleanup_{self.nm}')
        return _f

然后我们来看如何实现Learner。乍一看,这个代码晦涩难懂,从fit开始一个函数调用另一个函数……但是总体思路很清晰,而且也没有什么复杂的语法点,所以我反而不用详细探究每一行代码的功能。如果细心的话,会注意到整个Learner类都没有负责梯度下降和优化参数的部分,反而多了一堆predict(), get_loss(), backward()之类的未定义的方法。继续追踪下去,会发现它们最终被传送到了回调函数里面。

所以说,在定义完Learner之后,我们要做的就是设计那些合适的回调函数了。

class Learner():
    def __init__(self, model, dls=(0,), loss_func=F.mse_loss, lr=0.1, cbs=None, opt_func=optim.SGD):
        cbs = fc.L(cbs)
        fc.store_attr()
    @with_cbs('batch')
    def _one_batch(self):
        self.predict()
        self.callback('after_predict')
        self.get_loss()
        self.callback('after_loss')
        if self.training:
            self.backward()
            self.callback('after_backward')
            self.step()
            self.callback('after_step')
            self.zero_grad()
    @with_cbs('epoch')
    def _one_epoch(self):
        for self.iter,self.batch in enumerate(self.dl): self._one_batch()
    def one_epoch(self, training):
        self.model.train(training)
        self.dl = self.dls.train if training else self.dls.valid
        self._one_epoch()
    @with_cbs('fit')
    def _fit(self, train, valid):
        for self.epoch in self.epochs:
            if train: self.one_epoch(True)
            if valid: torch.no_grad()(self.one_epoch)(False)
    def fit(self, n_epochs=1, train=True, valid=True, cbs=None, lr=None):
        cbs = fc.L(cbs)
        # `add_cb` and `rm_cb` were added in lesson 18
        for cb in cbs: self.cbs.append(cb)
        try:
            self.n_epochs = n_epochs
            self.epochs = range(n_epochs)
            if lr is None: lr = self.lr
            if self.opt_func: self.opt = self.opt_func(self.model.parameters(), lr)
            self._fit(train, valid)
        finally:
            for cb in cbs: self.cbs.remove(cb)
    def __getattr__(self, name):
        if name in ('predict','get_loss','backward','step','zero_grad'): return partial(self.callback, name)
        raise AttributeError(name)
    def callback(self, method_nm): run_cbs(self.cbs, method_nm, self)
    @property
    def training(self): return self.model.training

回调

class CancelFitException(Exception): pass
class CancelBatchException(Exception): pass
class CancelEpochException(Exception): pass
class Callback(): order = 0
def run_cbs(cbs, method_nm, learn=None):
    for cb in sorted(cbs, key=attrgetter('order')):
        method = getattr(cb, method_nm, None)
        if method is not None: method(learn)

可以看出来,在实际Learner回调的时候,run_cbs会按照一定顺序遍历所有的回调类,并且如果对应的类定义了对应的函数,就会运行那个函数!最上面定义的三个异常类型,实际上给了回调函数直接终止训练过程的能力,这也是增加灵活性的一部分。还有一个大窟窿需要补上,那就是补全Learner里的模型优化环节,为此我们定义下面的TrainCB

class TrainCB(Callback):
    def __init__(self, n_inp=1): self.n_inp = n_inp
    def predict(self, learn): learn.preds = learn.model(*learn.batch[:self.n_inp])
    def get_loss(self, learn): learn.loss = learn.loss_func(learn.preds, *learn.batch[self.n_inp:])
    def backward(self, learn): learn.loss.backward()
    def step(self, learn): learn.opt.step()
    def zero_grad(self, learn): learn.opt.zero_grad()

Metrics Callback

在训练过程中,一个epoch会包含许多批次的数据,然后每个批次都有对应的损失需要计算。而我们最重要的肯定是一个批次的总体误差——而衡量这些误差的主要方法就是loss的平均值和预测的准确程度。为了方便编写代码,有一个专门的库帮助我们完成这样的任务,那就是torcheval。使用起来非常简单,只需要用到三个方法:update()增加数据,compute()计算结果,reset()刷新。

from torcheval.metrics import MulticlassAccuracy,Mean
metric = MulticlassAccuracy()
metric.update(tensor([0, 2, 1, 3]), tensor([0, 1, 2, 3]))
metric.compute() # tensor(0.50)
metric.reset()

但是torcheval处理GPU上的数据会很成问题,所以我们需要一个函数来实现把显卡上的数据转换成存储在CPU上的数据:

def to_cpu(x):
    if isinstance(x, Mapping): return {k:to_cpu(v) for k,v in x.items()}
    if isinstance(x, list): return [to_cpu(o) for o in x]
    if isinstance(x, tuple): return tuple(to_cpu(list(x)))
    res = x.detach().cpu()
    return res.float() if res.dtype==torch.float16 else res

然后我们定义MetricsCB!我们最后创建Callback的形式是metrics = MetricsCB(accuracy=MulticlassAccuracy())。知道这些之后,下面的代码就没有什么好说的了,就是记录loss和你额外规定的metrics,在对应的时间节点添加数据和计算结果。

from torcheval.metrics import MulticlassAccuracy,Mean
class MetricsCB(Callback):
    def __init__(self, *ms, **metrics):
        for o in ms: metrics[type(o).__name__] = o
        self.metrics = metrics
        self.all_metrics = copy(metrics)
        self.all_metrics['loss'] = self.loss = Mean()
    def _log(self, d): print(d)
    def before_fit(self, learn): learn.metrics = self
    def before_epoch(self, learn): [o.reset() for o in self.all_metrics.values()]
    def after_epoch(self, learn):
        log = {k:f'{v.compute():.3f}' for k,v in self.all_metrics.items()}
        log['epoch'] = learn.epoch
        log['train'] = 'train' if learn.model.training else 'eval'
        self._log(log)
    def after_batch(self, learn):
        x,y,*_ = to_cpu(learn.batch)
        for m in self.metrics.values(): m.update(to_cpu(learn.preds), y)
        self.loss.update(to_cpu(learn.loss), weight=len(x))

为了进一步方便数据存储位置的转换,我们定义DeviceCB。如果直接传入DeviceCB()作为回调函数,那么它会默认把所有数据都转换成def_device,也就是cudamps之类的。

class DeviceCB(Callback):
    def __init__(self, device=def_device): fc.store_attr()
    def before_fit(self, learn):
        if hasattr(learn.model, 'to'): learn.model.to(self.device)
    def before_batch(self, learn): learn.batch = to_device(learn.batch, device=self.device)

ProgressBar Callback

在训练过程中,我们需要用到进度条,恰好fastprogress库可以帮我们方便的实现这一点。

from fastprogress.fastprogress import *
from time import sleep
for i in (mb:=master_bar(range(10))):
    for j in mb.progress(range(100)):
        sleep(0.01)
        mb.child.comment = f'second bar stat'
    mb.main_bar.comment = f'first bar stat'
    mb.write(f'Finished loop {i}.')
    
import numpy as np
for i in mb:=master_bar(range(10), names=['cos', 'sin']):
    for j in mb.progress(range(100)):
        if j%10 == 0:
            k = 100 * i + j
            x = np.arange(0, 2*k*np.pi/1000, 0.01)
            y1, y2 = np.cos(x), np.sin(x)
            graphs = [[x,y1], [x,y2]]
            x_bounds = [0, 2*np.pi]
            y_bounds = [-1,1]
            mb.update_graph(graphs, x_bounds, y_bounds)
            mb.child.comment = f'second bar stat'
    mb.main_bar.comment = f'first bar stat'
    mb.write(f'Finished loop {i}.')

语言很难解释,但是结合动图一看就会了。master_bar负责显示训练到了第几epoch,然后下面的progress是小进度条展示每一个epoch训练的进度。我们还可以在进度条旁边放注释,显示中间结果。除此之外,我还展示了第二个例子,那就是你可以随着进度条画一个实时的图像,用到的关键方法是master_bar里面添加names参数,以及update_graph里面紧跟图像坐标和作图范围。

有了上面这些基础知识作铺垫,我们再来看ProgressCB

class ProgressCB(Callback):
    order = MetricsCB.order+1
    def __init__(self, plot=False): self.plot = plot
    def before_fit(self, learn):
        learn.epochs = self.mbar = master_bar(learn.epochs)
        self.first = True
        if hasattr(learn, 'metrics'): learn.metrics._log = self._log
        self.losses = []
        self.val_losses = []
    def _log(self, d):
        if self.first:
            self.mbar.write(list(d), table=True)
            self.first = False
        self.mbar.write(list(d.values()), table=True)
    def before_epoch(self, learn): learn.dl = progress_bar(learn.dl, leave=False, parent=self.mbar)
    def after_batch(self, learn):
        learn.dl.comment = f'{learn.loss:.3f}'
        if self.plot and hasattr(learn, 'metrics') and learn.training:
            self.losses.append(learn.loss.item())
            if self.val_losses: self.mbar.update_graph([[fc.L.range(self.losses), self.losses],[fc.L.range(learn.epoch).map(lambda x: (x+1)*len(learn.dls.train)), self.val_losses]])
            else: self.mbar.update_graph([[fc.L.range(self.losses), self.losses],[[],[]]])    
    def after_epoch(self, learn): 
        if not learn.training:
            if self.plot and hasattr(learn, 'metrics'): 
                self.val_losses.append(learn.metrics.all_metrics['loss'].compute())
                self.mbar.update_graph([[fc.L.range(self.losses), self.losses],[fc.L.range(learn.epoch+1).map(lambda x: (x+1)*len(learn.dls.train)), self.val_losses]])
  • 为什么order要比MetricsCB的优先级要加上1?注意,这里的order越大,优先级其实越低,也就是说,我们通过这行代码,让进度条的显示一定在计算完本批次的误差数据之后再进行。
  • 因此if hasattr(learn, 'metrics'):这行代码就很好理解了,只要设置了Metrics,就一定要重载输出的方法。
  • 很显然learn.dl是一个迭代器,而且有着确定的长度,leave参数决定进度条本身迭代完成之后是否留在屏幕上;我们可以使用write方法来在屏幕上输出内容,比如一个表格就可以用_log的方法很容易显示出来。
  • 明明没有看见_log被调用,怎么回事?其实答案藏在另一个回调函数中:MetricsCB!d参数是一个字典,包含我们的进度条需要显示的所有信息,因为这些都已经在MetricsCBafter_batch方法中定义了。记住所有的torcheval类的使用方法:update(pred, y)之后再用compute就可以吐出我们想要的值。
  • 这里的ProgessCB还实现了一个有用的功能,那就是画出实时的误差图像。在after_batch中可以实现,这需要我们额外单开一个列表记录每一个批次下的误差,但是各个回调函数之间既可以相互联系,又可以平行运行,因此实现起来非常方便省力。
  • 在更新图像的部分,虽然表达式特别长,但是表达了很简单的事情:learn.dls.trainlen操作得到的不是训练样本的个数,而是总共批次的个数,这样我们既能够画出训练集的误差,也能画出验证集的误差!

修改Learner的行为

class TrainLearner(Learner):
    def predict(self): self.preds = self.model(self.batch[0])
    def get_loss(self): self.loss = self.loss_func(self.preds, self.batch[1])
    def backward(self): self.loss.backward()
    def step(self): self.opt.step()
    def zero_grad(self): self.opt.zero_grad()
class MomentumLearner(TrainLearner):
    def __init__(self, model, dls, loss_func, lr=None, cbs=None, opt_func=optim.SGD, mom=0.85):
        self.mom = mom
        super().__init__(model, dls, loss_func, lr, cbs, opt_func)
    def zero_grad(self):
        with torch.no_grad():
            for p in self.model.parameters(): p.grad *= self.mom

我们不再用回调的方式去实现具体的训练过程,而是直接重载Learner类。

这样的好处是,修改具体的训练行为变得非常容易,比如说我们可以立刻实现带有动量的梯度下降——在清除梯度的时候只是将其缩小。这可以让模型更快的收敛。(当然,这不是非常标准的做法,但是达到的功能是类似的)。再强调一点细节:之所以要在修改梯度的时候套上with torch.no_grad(),是因为梯度本身是一个张量,我们不希望这个乘法操作影响之后的梯度运算。

寻找最佳学习率:LRFinder

原理是,我们从一个非常小的学习率开始,对数据一批一批的训练,然后每一步都把学习率调高到原先的1.3倍。我们的loss一般来说会先下降,然后学习率过大之后loss会快速上升,如果上升到记录的最小loss的三倍,我们就停止训练。

from torch.optim.lr_scheduler import ExponentialLR
class LRFinderCB(Callback):
    def __init__(self, gamma=1.3, max_mult=3): fc.store_attr()    
    def before_fit(self, learn):
        self.sched = ExponentialLR(learn.opt, self.gamma)
        self.lrs,self.losses = [],[]
        self.min = math.inf
    def after_batch(self, learn):
        if not learn.training: raise CancelEpochException()
        self.lrs.append(learn.opt.param_groups[0]['lr'])
        loss = to_cpu(learn.loss)
        self.losses.append(loss)
        if loss < self.min: self.min = loss
        if math.isnan(loss) or (loss > self.min*self.max_mult):
            raise CancelFitException()
        self.sched.step()
    def cleanup_fit(self, learn):
        plt.plot(self.lrs, self.losses)
        plt.xscale('log')
@fc.patch
def lr_find(self:Learner, gamma=1.3, max_mult=3, start_lr=1e-5, max_epochs=10):
    self.fit(max_epochs, lr=start_lr, cbs=LRFinderCB(gamma=gamma, max_mult=max_mult))

代码很简单,没有什么特别新的东西;然后我们就可以画图来寻找最佳的学习率了!

监视训练过程中的异常

这一部分的内容是为了应对卷积神经网络的缺陷而产生的。比如说对于fashion_mnist数据集,我们使用如下的简单模型进行训练:

def conv(ni, nf, ks=3, act=True):
    res = nn.Conv2d(ni, nf, stride=2, kernel_size=ks, padding=ks//2)
    if act: res = nn.Sequential(res, nn.ReLU())
    return res
def cnn_layers():
    return [
        conv(1 ,8, ks=5),        #14x14
        conv(8 ,16),             #7x7
        conv(16,32),             #4x4
        conv(32,64),             #2x2
        conv(64,10, act=False),  #1x1
        nn.Flatten()]

看上去很正常,但是我们会发现训练的准确度很低。开始模型的损失确实在下降,但是会在过程中的某一瞬间突然爆炸,然后再也没有恢复回来。通过手工的一些操作记录每一层的激活值(展示方法会和后面的代码重复,我就不在这里列出了),可以发现:每一层的激活值无论是平均值还是标准差最后都非常接近于零,而且在训练的时候,激活值的平均数和标准差都会出现病态的峰值。我们可以通过一些系统化的方法来获得这些模型内部的数据,从而对症下药,优化模型!

我们借助的工具就是PyTorch提供的register_forward_hook。可以直接像下面这样写:

model = nn.Sequential(*cnn_layers())
act_means = [[] for _ in model]
act_stds  = [[] for _ in model]
def append_stats(i, mod, inp, outp):
    act_means[i].append(to_cpu(outp).mean())
    act_stds [i].append(to_cpu(outp).std())
for i,m in enumerate(model): m.register_forward_hook(partial(append_stats, i))

这样我们给神经网络的每一层都连接了一个钩子,在前向传播的过程中,钩子被调用,我们得以存储激活值的有关数据。但是有一个小问题:当我们删除模型的时候,钩子没有被删除,因此会一直占用内存。我们需要在训练结束之后把钩子解绑,用hook.remove()就能做到。为了解决这个问题,我们重构代码,定义了Hook类。

class Hook():
    def __init__(self, m, f): self.hook = m.register_forward_hook(partial(f, self))
    def remove(self): self.hook.remove()
    def __del__(self): self.remove()
def append_stats(hook, mod, inp, outp):
    if not hasattr(hook,'stats'): hook.stats = ([],[],[])
    acts = to_cpu(outp)
    hook.stats[0].append(acts.mean())
    hook.stats[1].append(acts.std())
    hook.stats[2].append(acts.abs().histc(40,0,10))
hooks = [Hook(l, append_stats) for l in model[:5].children()]

在模型的每个前向传播过程完成后,PyTorch都会调用我们注册的Hook,并且提供模型本身、输入值和输出值三个参数。我们实际关心的就是输出值,特别是它们的均值和方差——关于第三个存储的数据我们稍后再详细说明。

  • 特别说明:创建Hook时的参数m并不一定是整个模型本身,而是模型的某一层!这样我们就可以同时检测模型中所有层的激活值情况。

为了更方便地管理创建的所有Hooks,我们再建立一个专门的类!可以看出它会自动为模型的每一层添加一个函数f对应的hook,然后使用形式是一个上下文管理器。

class Hooks(list):
    def __init__(self, ms, f): super().__init__([Hook(m, f) for m in ms])
    def __enter__(self, *args): return self
    def __exit__ (self, *args): self.remove()
    def __del__(self): self.remove()
    def __delitem__(self, i):
        self[i].remove()
        super().__delitem__(i)
    def remove(self):
        for h in self: h.remove()
with Hooks(model, append_stats) as hooks:
    fit(model)
    fig,axs = plt.subplots(1,2, figsize=(10,4))
    for h in hooks:
        for i in 0,1: axs[i].plot(h.stats[i])
    plt.legend(range(6));

但是这仍然需要我们在训练模型的外部继续编写代码,不利于封装。为了弥补这个缺点,我们把Hooks写成一个Learner的回调:

class HooksCallback(Callback):
    def __init__(self, hookfunc, mod_filter=fc.noop, on_train=True, on_valid=False, mods=None):
        fc.store_attr()
        super().__init__()    
    def before_fit(self, learn):
        if self.mods: mods=self.mods
        else: mods = fc.filter_ex(learn.model.modules(), self.mod_filter)
        self.hooks = Hooks(mods, partial(self._hookfunc, learn))
    def _hookfunc(self, learn, *args, **kwargs):
        if (self.on_train and learn.training) or (self.on_valid and not learn.training): self.hookfunc(*args, **kwargs)
    def after_fit(self, learn): self.hooks.remove()
    def __iter__(self): return iter(self.hooks)
    def __len__(self): return len(self.hooks)
hc = HooksCallback(append_stats, mod_filter=fc.risinstance(nn.Conv2d))
  • fc.filter_ex干的事情和内置的filter差不多,就是只保留那些mod_filter返回值为True的元素。
  • learn.model.modules()返回的不只是我们定义的那么多Sequential对象,而是真的一层一层的把所有的嵌套的PyTorch模组都剥开,然后让mod_filter来判断(如果是conv则为真)。

使用方法是,把HooksCallback加入Learner的回调之后,在训练完成之后就可以直接使用hc.stats来访问具体的训练数据。非常方便!

最后,我们检测训练过程中数值分布的变化,方法是画直方图。torch.histc就是一个画直方图的函数,参数依次是分组的个数,最小值和最大值。为了可视化,我们把它竖直堆叠,产生一个彩色图片:

>>> torch.histc(torch.tensor([1., 2, 1]), bins=4, min=0, max=3)
    tensor([ 0.,  2.,  1.,  0.])
def get_hist(h): return torch.stack(h.stats[2]).t().float().log1p()
fig,axes = get_grid(len(hc), figsize=(11,5))
for ax,h in zip(axes.flat, hc):
    show_image(get_hist(h), ax, origin='lower')

可以从实际的图中看出,接近于零的激活值在后面的层中占了绝大多数。那很糟糕了。

我们再把刚才的功能封装成一个新的回调:

class ActivationStats(HooksCallback):
    def __init__(self, mod_filter=fc.noop): super().__init__(append_stats, mod_filter)
    def color_dim(self, figsize=(11,5)):
        fig,axes = get_grid(len(self), figsize=figsize)
        for ax,h in zip(axes.flat, self):
            show_image(get_hist(h), ax, origin='lower')
    def dead_chart(self, figsize=(11,5)):
        fig,axes = get_grid(len(self), figsize=figsize)
        for ax,h in zip(axes.flatten(), self):
            ax.plot(get_min(h))
            ax.set_ylim(0,1)
    def plot_stats(self, figsize=(10,4)):
        fig,axs = plt.subplots(1,2, figsize=figsize)
        for h in self:
            for i in 0,1: axs[i].plot(h.stats[i])
        axs[0].set_title('Means')
        axs[1].set_title('Stdevs')
        plt.legend(fc.L.range(self))

初始化与正则化

我们发现激活值在训练过程中根本就没有保持均值为0、方差为1的稳定分布,有非常大的波动。同时,我们观察到下面的数值爆炸和数值消失现象:

x = torch.randn(200, 100)
for i in range(50): x = x @ torch.randn(100,100) # 全变成nan

x = torch.randn(200, 100)
for i in range(50): x = x @ (torch.randn(100,100) * 0.01) # 全变成0

同样遵循标准正态分布,为什么矩阵多相乘几次就会在数值稳定性上出问题?这是因为矩阵乘法会计算很多向量的点乘,如果向量的长度大,那么最后的结果就会变的很大,缩放不当就会爆炸或者全部变成零。为了应对这个问题,我们在进行矩阵连续乘法的时候,应该遵循Xavier提出的方法:给权重矩阵乘以$\sqrt{\frac{1}{n}}$的因子。n代表输入的数目,在上面的例子中是100,那么因子就应该是0.1。

x = torch.randn(200, 100)
for i in range(50): x = x @ (torch.randn(100,100) * 0.1)

果然,这么操作下来,最终的数值就维持了稳定。但是Xavier初始化也有局限性——我们经常使用ReLU作为激活函数,而它会把所有负数都变成零,这样最后的激活值就不可能平均值接近零了。需要新的初始化方法!

Kaiming Normal初始化

和Xavier提出的公式很相似,它就是把权重的每一项都乘以$\sqrt{\frac{2}{n}}$。PyTorch已经帮我们照顾好了细节,所以只需要调用定义好的函数即可。

def init_weights(m):
    if isinstance(m, (nn.Conv1d,nn.Conv2d,nn.Conv3d)): init.kaiming_normal_(m.weight)
model.apply(init_weights);

这里的model.apply可能很有用,让我们按位操作模型里面的各个层的权重。

Kaiming Uniform, Xavier Uniform初始化

有时候我们也可以让模型的初始权重是均匀分布的。如果是ReLU类激活函数,就使用Kaiming Uniform,本质是让元素的反差变成2/n,如果是Sigmoid、Tanh这类函数,就是用Xavier。直接实现的代码如下:

def xavier_uniform(tensor):
    fan_in = tensor.shape[1]  # input connections
    fan_out = tensor.shape[0]  # output connections
    bound = math.sqrt(6.0 / (fan_in + fan_out))
    tensor.uniform_(-bound, bound)
def kaiming_uniform(tensor):
    fan_in = tensor.shape[1]  # input connections
    bound = math.sqrt(6.0 / fan_in)
    tensor.uniform_(-bound, bound)

公式看上去和上面有不小的差异,但其实本质没有变——只是对方差的操作不同,因为两个权重服从的分布不同!

经过这番操作,模型的表现已经有了一些改善,比如训练误差已经呈现出了下降的趋势,而且在寻找学习率的时候出现的误差图像形状也正常了很多。但是我们可以做得更好,不让模型在训练初期出现尖锐的峰值!同时我们也观察到,尽管我们做了这些初始化的努力,激活值仍然没有做到均值为零、方差为一。所以我们采用的第二个方法是:对每一批次的输入进行正则化,直接操作让它的均值为零,方差为1。怎么实现?当然是再定义一个回调!

class BatchTransformCB(Callback):
    def __init__(self, tfm, on_train=True, on_val=True): fc.store_attr()
    def before_batch(self, learn):
        if (self.on_train and learn.training) or (self.on_val and not learn.training):
            learn.batch = self.tfm(learn.batch)
def _norm(b): return (b[0]-xmean)/xstd,b[1]
norm = BatchTransformCB(_norm)

模型的表现进一步改善了,准确率提升到了83%左右,但是初期峰值仍然存在,同时大部分神经元的值都接近零。

调整ReLU函数:GeneralReLU

这可能是我们采用了ReLU函数出了问题,因为激活值经过它的处理之后,均值不可能保持为零,那么我们对它稍作修改,使其整体下移,并且在小于零的时候斜率变为-0.1而不是0。

class GeneralRelu(nn.Module):
    def __init__(self, leak=None, sub=None, maxv=None):
        super().__init__()
        self.leak,self.sub,self.maxv = leak,sub,maxv
    def forward(self, x): 
        x = F.leaky_relu(x,self.leak) if self.leak is not None else F.relu(x)
        if self.sub is not None: x -= self.sub
        if self.maxv is not None: x.clamp_max_(self.maxv)
        return x

同时我们还需要改很多的其他东西。我们想要更方便地生成一个简单的卷积神经网络,同时允许自定义激活函数,于是就有了act参数;Kaiming初始化原先只针对正常的ReLU,如果采用leaky relu,我们只需要添加一个参数,就是其leaky的程度。

def conv(ni, nf, ks=3, stride=2, act=nn.ReLU):
    res = nn.Conv2d(ni, nf, stride=stride, kernel_size=ks, padding=ks//2)
    if act: res = nn.Sequential(res, act())
    return res
def get_model(act=nn.ReLU, nfs=None):
    if nfs is None: nfs = [1,8,16,32,64]
    layers = [conv(nfs[i], nfs[i+1], act=act) for i in range(len(nfs)-1)]
    return nn.Sequential(*layers, conv(nfs[-1],10, act=None), nn.Flatten()).to(def_device)
def init_weights(m, leaky=0.):
    if isinstance(m, (nn.Conv1d,nn.Conv2d,nn.Conv3d)): init.kaiming_normal_(m.weight, a=leaky)
act_gr = partial(GeneralRelu, leak=0.1, sub=0.4)
astats = ActivationStats(fc.risinstance(GeneralRelu))
cbs = [DeviceCB(), metrics, ProgressCB(plot=True), astats]
iw = partial(init_weights, leaky=0.1)
model = get_model(act_gr).apply(iw)
learn = MomentumLearner(model, dls, F.cross_entropy, lr=0.2, cbs=cbs)
learn.fit(3)

这次终于有了明显的成果:损失曲线不再有尖锐的峰,而是比较平缓的下降,而大部分神经元的激活值都在合理范围内,最后激活值的方差虽然没有到1,但是也维持在了0.8左右,识别准确度到达了87%,进步显著!

但是我们可以更加极端一些,用强制的手段把每个层的激活值输出都变成标准正态分布,那就需要引出下面的方法。

LSUV

这是Layer-wise Sequential Unit-Variance的缩写。具体的过程就是,我们给模型一批次的数据让它进行前向传播,然后每经过一层,就调整那一层的权重和偏置,通过不断的微小调整让均值为零、方差为一。然后再到下一层,以此类推。实现这个的最好方式其实是钩子,在下面的代码中展示:

def _lsuv_stats(hook, mod, inp, outp):
    acts = to_cpu(outp)
    hook.mean = acts.mean()
    hook.std = acts.std()
def lsuv_init(model, m, m_in, xb):
    h = Hook(m, _lsuv_stats)
    with torch.no_grad():
        while model(xb) is not None and (abs(h.std-1)>1e-3 or abs(h.mean)>1e-3):
            m_in.bias -= h.mean
            m_in.weight.data /= h.std
    h.remove()
model = get_model(act_gr)
relus = [o for o in model.modules() if isinstance(o, GeneralRelu)]
convs = [o for o in model.modules() if isinstance(o, nn.Conv2d)]
for ms in zip(relus,convs): lsuv_init(model, *ms, xb.to(def_device))
learn = MomentumLearner(model, dls, F.cross_entropy, lr=0.2, cbs=cbs)
learn.fit(3)
  • 我们先调用model(xb)来触动钩子,让h.stdh.mean有定义;
  • m和m_in传入的分别是激活函数和卷积层,我们需要修改的是卷积层的权重和偏置,让紧跟其后的激活函数输出的激活值分布是正确的。很显然由于激活函数非线性的影响,不可能更新权重和偏置一步到位,所以要用while循环。

效果仍然很好,准确率在86%左右。其实,LSUV的思想已经很接近于下面要介绍的正则化了。

Layer Normalization

如果我们能够让模型自己决定激活值的分布,那会怎么样?这就是我们赋予LayerNorm的权利。如果传进去一张图片,模型可以自己决定输入在中间层的分布并且改变分布。

class LayerNorm(nn.Module):
    def __init__(self, dummy, eps=1e-5):
        super().__init__()
        self.eps = eps
        self.mult = nn.Parameter(tensor(1.))
        self.add  = nn.Parameter(tensor(0.))
    def forward(self, x):
        m = x.mean((1,2,3), keepdim=True) # NCHW
        v = x.var ((1,2,3), keepdim=True)
        x = (x-m) / ((v+self.eps).sqrt())
        return x*self.mult + self.add

我们先对整层进行正则化,然后我们再使用multadd来让输出进行灵活调整。而且我们只需要把这个当成额外的一层插入进原先的架构中就行!当然,对应的convget_model也需要修改,我就不再粘贴代码了。

Batch Normalization

相当于刚才的升级版本。不同之处是,我们对每一个channel都有不同的变换系数multadd。同时,我们采用移动平均。

class BatchNorm(nn.Module):
    def __init__(self, nf, mom=0.1, eps=1e-5):
        super().__init__()
        # NB: pytorch bn mom is opposite of what you'd expect
        self.mom,self.eps = mom,eps
        self.mults = nn.Parameter(torch.ones (nf,1,1))
        self.adds  = nn.Parameter(torch.zeros(nf,1,1))
        self.register_buffer('vars',  torch.ones(1,nf,1,1))
        self.register_buffer('means', torch.zeros(1,nf,1,1))        
    def update_stats(self, x):
        m = x.mean((0,2,3), keepdim=True)
        v = x.var ((0,2,3), keepdim=True)
        self.means.lerp_(m, self.mom)
        self.vars.lerp_ (v, self.mom)
        return m,v        
    def forward(self, x):
        if self.training:
            with torch.no_grad(): m,v = self.update_stats(x)
        else: m,v = self.means,self.vars
        x = (x-m) / (v+self.eps).sqrt()
        return x*self.mults + self.adds

使用这个方法可以让模型达到87%准确率!

我们为什么在除了channels的所有维度上做平均:我们让上一个维度的每个Kernel都有自己对应的线性变化因子,这样不管是检测哪一种模式,信号都不会有在数值范围上的大变化,从而做到使训练过程更加顺畅。因此我们也知道,因为BatchNorm有神经网络需要学习的参数,我们必须在定义BatchNorm层的时候明确Channels的数量,这和池化层有根本的不同。

Each feature (or channel) represents a different “detector” or pattern the network is learning. For example, in an image, one channel might detect edges, another might detect textures.

By normalizing each feature independently across all other dimensions (batch, spatial positions, time steps), you’re asking: “What’s the typical activation level for this detector across all the data?”

This keeps each detector’s output in a consistent range, preventing some features from dominating while others fade away. It’s like making sure all team members speak at a similar volume so everyone can be heard.

The key insight: features are what you’re learning, so you normalize them separately. Everything else (batch items, positions, time) are just different examples or contexts where that feature appears.

使用移动平均方法计算数据的平均值和标准差的意义:在BatchNorm中,我们使用lerp方法来记录经历了数个Batch之后的数据均值和标准差。我们需要这样做是因为BatchNorm的特性:既然是在每个Batch中取平均,那么最后进行Inference的时候使用的平均值数据和训练过程中动态计算(不加指数平均操作)的数据产生随机的误差——而我们需要避免这种误差。

除了应用在图片处理之外,BatchNorm也有对应全连接层或者嵌入层的1d版本——道理是一样的,就是在Batch维度上取平均。

Instance Normalization

在刚才,我们平均了(0,2,3)维度,但是如果我们不希望一张图片正则化之后的结果被其他图片的像素值影响,比如在图片风格转移的任务类型中,我们就可以只在(2,3)维度做平均,对每个图片分别计算均值和方差来进行正则化!

特别注意的是,在这种正则化的方法中,我们不需要计算移动平均值和标准差,因为我们在训练过程中计算mean和var的方式和在Inference过程中完全相同,因为这时的mean和var只依赖于图片本身。

class InstanceNorm2d(nn.Module):
    def __init__(self, num_features, eps=1e-5, affine=True):
        super().__init__()
        self.eps = eps
        self.affine = affine
        if affine:
            self.gamma = nn.Parameter(torch.ones(1, num_features, 1, 1))
            self.beta = nn.Parameter(torch.zeros(1, num_features, 1, 1))
        
    def forward(self, x):
        # x shape: (batch, channels, height, width)
        # Compute mean and var for each instance and channel
        mean = x.mean(dim=(2, 3), keepdim=True)
        var = ((x - mean) ** 2).mean(dim=(2, 3), keepdim=True)
        # Normalize
        x_norm = (x - mean) / torch.sqrt(var + self.eps)
        # Apply scale and shift if affine
        if self.affine:
            x_norm = self.gamma * x_norm + self.beta
        return x_norm

其中,affine的作用是决定是否给模型自主控制输入分布的能力。如果这个值为真,我们就定义伸缩系数gamma和beta,否则就不定义。

WeightNorm和SpectralNorm

这两个正则化方法并不是作为一个层单独插入神经网络,而是用PyTorch的Wrapper直接处理已有的卷积神经网络:

from torch.nn.utils import weight_norm, spectral_norm
# Apply to a conv layer
conv = nn.Conv2d(3, 64, 3)
conv_wn = weight_norm(conv)
conv_sn = spectral_norm(conv)

WeightNorm背后的原理是,把参数重新计算为:

weight = g * (v / ||v||)

g是一个标量,然后分别优化g和v。这样做的好处是,神经网络的激活值不会受到影响,但是我们把权重的更新分为了两部分,而且这两部分的更新都更加平滑,模型可以更快的收敛到一个损失低的地方。

SpectralNorm本质是让权重除以自己最大的奇异值,

weight = W / sigma(W)

这种正则化方法做的本质上是保持神经网络层的一致连续性——在输入中很小的变化也只能对应输出中很小的变化,不会因为某一层的权重因素而让信号被过度放大。在对抗网络中,我们对Discriminator用SpectralNorm修饰,目的是不要让它在训练的前期对自己的结果过度自信,慢慢地优化到最合适的权重。

实际应用:

Weight Normalization is used in:

  • Recurrent networks (RNNs, LSTMs) where it helps with gradient flow
  • WaveNet and other audio generation models
  • Generally when you want faster convergence than BatchNorm provides

Spectral Normalization is most popular in:

  • GANs (Generative Adversarial Networks) - especially in the discriminator to stabilize training
  • Any network where you need to control Lipschitz continuity
  • Style transfer and image generation tasks

Spectral norm became particularly famous after the “Spectral Normalization for GANs” paper showed it significantly improved GAN training stability.

优化器

我们可以通过调整SGD更改梯度和更新参数的方式来让模型更快收敛。

SGD

就是一个最基本的类,顺便实现了一下Weight Decay。

class SGD:
    def __init__(self, params, lr, wd=0.):
        params = list(params)
        fc.store_attr()
        self.i = 0
    def step(self):
        with torch.no_grad():
            for p in self.params:
                self.reg_step(p)
                self.opt_step(p)
        self.i +=1
    def opt_step(self, p): p -= p.grad * self.lr
    def reg_step(self, p):
        if self.wd != 0: p *= 1 - self.lr*self.wd
    def zero_grad(self):
        for p in self.params: p.grad.data.zero_()

Momentum

我们在上一节实现的“带动量的梯度下降”其实并不准确,真正的动量计算应该是取移动平均值。

class Momentum(SGD):
    def __init__(self, params, lr, wd=0., mom=0.9):
        super().__init__(params, lr=lr, wd=wd)
        self.mom=mom
    def opt_step(self, p):
        if not hasattr(p, 'grad_avg'): p.grad_avg = torch.zeros_like(p.grad)
        p.grad_avg = p.grad_avg*self.mom + p.grad*(1-self.mom)
        p -= self.lr * p.grad_avg

它的效果就好于单纯的梯度下降,并且允许我们使用更大的学习率。

RMSProp

这里不再取梯度的平均值,而是梯度的平方数的平均值!它的核心理念是给不同的参数不同的学习率。如果一个参数的梯度很大,那么学习率就会变的比较小,而如果梯度很小,那么学习率就会相对大一些。

class RMSProp(SGD):
    def __init__(self, params, lr, wd=0., sqr_mom=0.99, eps=1e-5):
        super().__init__(params, lr=lr, wd=wd)
        self.sqr_mom,self.eps = sqr_mom,eps
    def opt_step(self, p):
        if not hasattr(p, 'sqr_avg'): p.sqr_avg = p.grad**2
        p.sqr_avg = p.sqr_avg*self.sqr_mom + p.grad**2*(1-self.sqr_mom)
        p -= self.lr * p.grad/(p.sqr_avg.sqrt() + self.eps)

Adam

相当于刚才两种优化器的结合版本,就是把最后的p.grad也替换成移动平均的版本。同时为了应对最开始数值丢失,我们也计算了unbiased_avgunbiased_sqr_avg防止开始的学习率过大。

class Adam(SGD):
    def __init__(self, params, lr, wd=0., beta1=0.9, beta2=0.99, eps=1e-5):
        super().__init__(params, lr=lr, wd=wd)
        self.beta1,self.beta2,self.eps = beta1,beta2,eps
    def opt_step(self, p):
        if not hasattr(p, 'avg'): p.avg = torch.zeros_like(p.grad.data)
        if not hasattr(p, 'sqr_avg'): p.sqr_avg = torch.zeros_like(p.grad.data)
        p.avg = self.beta1*p.avg + (1-self.beta1)*p.grad
        unbias_avg = p.avg / (1 - (self.beta1**(self.i+1)))
        p.sqr_avg = self.beta2*p.sqr_avg + (1-self.beta2)*(p.grad**2)
        unbias_sqr_avg = p.sqr_avg / (1 - (self.beta2**(self.i+1)))
        p -= self.lr * unbias_avg / (unbias_sqr_avg + self.eps).sqrt()

学习率规划

随着训练过程的进行,模型需要的权重调整越来越小,因此我们也希望学习率在训练后期也会相应的变小。一种实现的方式就是使用torchlr_scheduler

from torch.optim import lr_scheduler
sched = lr_scheduler.CosineAnnealingLR(opt, 100)

然后,我们可以通过sched.base_lrs获得最开始的学习率,sched.get_last_lr()获得最新的学习率。

def sched_lrs(sched, steps):
    lrs = [sched.get_last_lr()]
    for i in range(steps):
        sched.optimizer.step()
        sched.step()
        lrs.append(sched.get_last_lr())
    plt.plot(lrs)

这段代码会画出一个余弦函数的图案,从step=0处的最高点到step=100处的最低点。更进一步,我们可以据此实现一个学习率规划的回调程序,来实时控制训练过程中的各种超参数,来达到最佳的训练效果。

class BaseSchedCB(Callback):
    def __init__(self, sched): self.sched = sched
    def before_fit(self, learn): self.schedo = self.sched(learn.opt)
    def _step(self, learn):
        if learn.training: self.schedo.step()
class BatchSchedCB(BaseSchedCB):
    def after_batch(self, learn): self._step(learn)
class EpochSchedCB(BaseSchedCB):
    def after_epoch(self, learn): self._step(learn)

这里要传入的sched参数就是pytorch提供的各种LRScheduler,每当调用一次step(),学习率就会发生相应的改变。我们定义这么多类,就是为了提高框架的灵活性,方便后续功能的添加,比如我们可以轻松决定模型是在每个Batch后更新学习率还是每个Epoch后才更新。

同时,我们还要一个回调函数来记录训练过程中这些超参数的变化,因此,RecorderCB应运而生:

class RecorderCB(Callback):
    def __init__(self, **d): self.d = d
    def before_fit(self, learn):
        self.recs = {k:[] for k in self.d}
        self.pg = learn.opt.param_groups[0]    
    def after_batch(self, learn):
        if not learn.training: return
        for k,v in self.d.items():
            self.recs[k].append(v(self))
    def plot(self):
        for k,v in self.recs.items():
            plt.plot(v, label=k)
            plt.legend()
            plt.show()
def _lr(cb): return cb.pg['lr']
sched = partial(lr_scheduler.CosineAnnealingLR, T_max=tmax)
rec = RecorderCB(lr=_lr)
xtra = [BatchSchedCB(sched),rec]
learn = TrainLearner(model, dls, F.cross_entropy, lr=2e-2, cbs=cbs+xtra, opt_func=optim.AdamW)

这段代码可能因为背景信息太少而显得比较生硬。

  • lr_scheduler.CosineAnnealingLR就是我们刚才提到的一个典型的学习率规划类,按照余弦函数的形状来让学习率递减。因为余弦函数不是单调函数,所以需要提前告知训练的批次数,在第一个批次的时候学习率最大,最后一个学习率最小。
  • learn.opt.param_groups[0]是我们直接通过优化器来获得学习率的一种方式。对于比较大的模型,可能会有多组参数,因为我们可能还会进行迁移学习,不同部分需要不同的学习率。
  • d是一个字典,包含了我们要记录的数据名称,如lr,还有我们获取这种数据的方式,定义在_lr中。
  • 最后还额外定义了一个画图的方法,比较简单,就不解释了。AdamW是Adam+Weight Decay的优化器。

1Cycle

我们可以利用上面搭好的框架直接实现fit_1_cycle

def _beta1(cb): return cb.pg['betas'][0]
rec = RecorderCB(lr=_lr, mom=_beta1)
sched = partial(lr_scheduler.OneCycleLR, max_lr=lr, total_steps=tmax)
xtra = [BatchSchedCB(sched), rec]
learn = TrainLearner(model, dls, F.cross_entropy, lr=lr, cbs=cbs+xtra, opt_func=optim.AdamW)

当然,我把没有变化的代码省略了。此时再调用rec.plot()时就会出现两幅图,一个是学习率的图像,一个是动量的图像。

在训练过程中,学习率先从小变大然后又变小,动量从大到小最后又变大。

PyTorch保存模型

mdl_path = Path('models')
mdl_path.mkdir(exist_ok=True)
torch.save(learn.model, mdl_path/'data_aug.pkl')