这份文档是我自2024年开始学AI来所习得知识的精华。2026年1月,我打算扩写一下这些笔记,让我的解释更加详细和可读性,优化笔记的结构,这样我对各种深度学习中概念的印象也会更加深刻。在扩写之前,总字数就已经破了两万,转化成PDF文档的笔记有足足六十多页。扩写之后,我希望这能够成为一个维护良好的深度学习教程,其他人也能从这份笔记中有所收获。

[TOC]

使用模型

我把先前放在这里的Random Forest相关的内容都存到了另一个专用的文件里,因为内容太多了!放在一个文件里就太大了!

Multilabel Image Classification

和正常的图像分类是一个套路,这里只列出datablock的定义方法。

dblock = DataBlock(blocks=(ImageBlock, MultiCategoryBlock),
                   get_items = get_image_files, 
                   get_y = lambda o: [parent_label(o)],
                   item_tfms=RandomResizedCrop(224, min_scale=0.5),
                   batch_tfms=aug_transforms(),
                   splitter=RandomSplitter(valid_pct=0.2))
dls = dblock.dataloaders(path)
learn = vision_learner(dls, resnet18, metrics=partial(accuracy_multi, thresh=0.5))
learn.fine_tune(3)

注意几个小点:

  • get_y需要传递一个列表;
  • metrics参数需要使用partial的accuracy_multi函数,并且规定阈值。

Image Regression

biwi = DataBlock(
    blocks=(ImageBlock, PointBlock),
    get_items=get_image_files,
    get_y=get_ctr,
    splitter=FuncSplitter(lambda o: o.parent.name=='13'),
    batch_tfms=aug_transforms(size=(240,320)), 
)

注意get_ctr返回的是形如tensor([350.4915, 262.9643])的点。

我们需要指定输出的是图像上的点,因为图像在做增强变换时,对应的点的位置也会发生变化。fastai也会对该点的坐标进行变换。

dls = biwi.dataloaders(path)
learn = vision_learner(dls, resnet18, y_range=(-1,1))
learn.fine_tune(3, 1e-2)

图像处理的高级方法

  1. 正则化:使用预训练模型的时候要特别注意。使用方法:在图像变换的参数batch_tfms要加入Normalize参数。
batch_tfms=[*aug_transforms(size=size, min_scale=0.75),
                               Normalize.from_stats(*imagenet_stats)])
  1. Progressive Resizing:训练模型的时候中途易辙,逐渐改用更大的图像大小进行fine_tune,从而达到训练目的。
dls = get_dls(128, 128)
learn = Learner(dls, xresnet50(n_out=dls.c), loss_func=CrossEntropyLossFlat(), 
                metrics=accuracy)
learn.fit_one_cycle(4, 3e-3)
# 换dataloaders
learn.dls = get_dls(64, 224)
learn.fine_tune(5, 1e-3)
  1. Test time augmentation:对validation set的图像也做增强处理并取平均值。
preds,targs = learn.tta()
accuracy(preds, targs).item()

一般来说,这样做可以提高准确率。

  1. Mixup:把不同图像混合。使模型在更小的数据集上具有更强的泛化能力。代价是,需要更长的训练时间。
model = xresnet50()
learn = Learner(dls, model, loss_func=CrossEntropyLossFlat(), metrics=accuracy, cbs=Mixup)
learn.fit_one_cycle(5, 3e-3)
  1. Label Smoothing:让最终预测的结果不是0和1,而是比零略大、比一略小的数字,增强模型的泛化能力。只需改动损失函数。
model = xresnet50()
learn = Learner(dls, model, loss_func=LabelSmoothingCrossEntropy(), metrics=accuracy)
learn.fit_one_cycle(5, 3e-3)
  1. Gradient Accumulation:如果使用的预训练模型比较大,而且batch_size选的比较大的话,可能会用尽GPU的内存而报错。这里采用一个方法,就是先对小的batch_size进行训练,并且把几个小的batch的梯度叠加,就可以得到与原来结果等效、而且不占用多少内存的新方法。
import gc
def report_gpu():
    print("Used: ", torch.cuda.memory_allocated()/(1024*1024*1024), 'GB')
    gc.collect()
    torch.cuda.empty_cache()

这是我们用来检查GPU占用内存多少的函数,并且附带了清理功能;

def train(arch, size, item=Resize(480, method='squish'), accum=1, finetune=True, epochs=12):
    dls = ImageDataLoaders.from_folder(trn_path, valid_pct=0.2, item_tfms=item,
        batch_tfms=aug_transforms(size=size, min_scale=0.75), bs=64//accum)
    cbs = GradientAccumulation(64) if accum else []
    learn = vision_learner(dls, arch, metrics=error_rate, cbs=cbs).to_fp16()
    if finetune:
        learn.fine_tune(epochs, 0.01)
        return learn.tta(dl=dls.test_dl(tst_files))
    else:
        learn.unfreeze()
        learn.fit_one_cycle(epochs, 0.01)

这是我们用来训练的函数。只需要传入GradientAccumulation作为callback的参数即可。

用一个模型干两件事

第一步,定义DataBlock。在paddy disease的例子中,我们既想预测水稻的疾病类型,又想预测水稻的品种。注意get_y需要变成一个列表,与两个对应的CategoryBlock相对应。

dls = DataBlock(
    blocks=(ImageBlock,CategoryBlock,CategoryBlock),
    n_inp=1,
    get_items=get_image_files,
    get_y = [parent_label,get_variety],
    splitter=RandomSplitter(0.2, seed=42),
    item_tfms=Resize(192, method='squish'),
    batch_tfms=aug_transforms(size=128, min_scale=0.75)
).dataloaders(trn_path)

第二步,我们需要对模型的评估函数进行修改,因为它现在接收的参数不是两个,而是三个!注意,我们现在所做的改动是:只是在模型的输入中增加了关于品种的信息,并没有给模型任何拟合品种的要求。由于现在需要有多个输出,fastai不再知道需要多少个神经元在输出层,必须手动规定n_out参数

def disease_err(inp,disease,variety): return error_rate(inp,disease)
def disease_loss(inp,disease,variety): return F.cross_entropy(inp,disease)
arch = 'convnext_small_in22k'
learn = vision_learner(dls, arch, loss_func=disease_loss, metrics=disease_err, n_out=10).to_fp16()
lr = 0.01
learn.fine_tune(5, lr)

第三步,进一步修改,让模型真正做到multi-target:我们把n_out参数修改为20,那么在所有的损失函数和评估标准函数中,接受的输入都具有20列,所以需要产生相应的变化。代码还是很好理解的,都是字面意思。注意到fastai接受的metrics可以是列表,损失函数直接把两项分别的损失相加即可。

learn = vision_learner(dls, arch, n_out=20).to_fp16()
def disease_loss(inp,disease,variety): return F.cross_entropy(inp[:,:10],disease)
def variety_loss(inp,disease,variety): return F.cross_entropy(inp[:,10:],variety)
def combine_loss(inp,disease,variety): return disease_loss(inp,disease,variety)+variety_loss(inp,disease,variety)
def disease_err(inp,disease,variety): return error_rate(inp[:,:10],disease)
def variety_err(inp,disease,variety): return error_rate(inp[:,10:],variety)
err_metrics = (disease_err,variety_err)
all_metrics = err_metrics+(disease_loss,variety_loss)

第四步,创建最终的learner,大功告成:

learn = vision_learner(dls, arch, loss_func=combine_loss, metrics=all_metrics, n_out=20).to_fp16()
learn.fine_tune(5, lr)

Collaborative Filtering

第一步是根据对应的DataFrame创建DataLoaders:需要注意的是,这样的dataframe必须包含两列,一个叫user,一个叫item。如果没有列名字叫item,那就需要传入参数item_name

from fastai.collab import *
from fastai.tabular.all import *
dls = CollabDataLoaders.from_df(ratings, item_name='title', bs=64)
dls.show_batch()

第二步是建立对应的PyTorch类,完成模型构建。对于最初级的collaborative filtering,我们只需要对用户和电影定义一定长度的latent factors,然后计算得分就是二者的点乘即可。于是这就是我们最简单的一个模型,fastai会自动帮我们找到合适的误差函数:

class DotProduct(Module):
    def __init__(self, n_users, n_movies, n_factors):
        self.user_factors = Embedding(n_users, n_factors)
        self.movie_factors = Embedding(n_movies, n_factors)
        
    def forward(self, x):
        users = self.user_factors(x[:,0])
        movies = self.movie_factors(x[:,1])
        return (users * movies).sum(dim=1)

因为我之前从来没有用过pytorch的相关架构,所以解释一下这里面的各种新鲜东西的含义。

  • Module理论上来说应该属于torch.nn.Module,但fastai做了相关的优化,所以这个Module其实是fastai库里的。
  • 必须要用Embedding,这样pytorch才能把设置的参数识别为模型需要的、要修改的参数。

注:Embedding是嵌入层,本质上就是一个巨大的表格,我们传入的就是要查询的索引,然后Pytorch会用自己优化过的方法帮助我们快速查询,中间不存在矩阵乘法操作,然后梯度计算也大大简化。

  • forward方法是模型在训练的时候要调用的方法,通俗来说,就是模型在forward里面产生最终结果。这个传入的参数x,对应着下面的变量:
x,y = dls.one_batch()
# x的前五项:
# tensor([[ 13, 422],
#        [472, 116],
#        [804, 590],
#        [747, 935],
#        [363, 716]])

也就是第一行对应着user_id,第二行对应着movie_id。我们做的,就是查询这些用户对应的向量和电影对应的向量,点乘,返回结果。

前面也科普过sum的用法,这里就很好理解了。

第三步,创建初步的learner:

model = DotProduct(n_users, n_movies, 50)
learn = Learner(dls, model, loss_func=MSELossFlat())
learn.fit_one_cycle(5, 5e-3)

就可以进行训练!

修改1:修改预测范围

class DotProduct(Module):
    def __init__(self, n_users, n_movies, n_factors, y_range=(0,5.5)):
        self.user_factors = Embedding(n_users, n_factors)
        self.movie_factors = Embedding(n_movies, n_factors)
        self.y_range = y_range
        
    def forward(self, x):
        users = self.user_factors(x[:,0])
        movies = self.movie_factors(x[:,1])
        return sigmoid_range((users * movies).sum(dim=1), *self.y_range)
model = DotProduct(n_users, n_movies, 50)
learn = Learner(dls, model, loss_func=MSELossFlat())
learn.fit_one_cycle(5, 5e-3)

注意到sigmoid的渐近特性,我们的分数不可能超过5,但其实是有很多人给电影打五分的!所以我们略微上调y_range的上限,使用sigmoid_range处理可以很好地限制结果的范围。但是实际上这样做的效果是让模型的训练过程变慢了,需要更多的循环才能达到最佳效果。

好像降低了模型的表现,建议和其他改进搭配使用。

修改2:添加bias

class DotProductBias(Module):
    def __init__(self, n_users, n_movies, n_factors, y_range=(0,5.5)):
        self.user_factors = Embedding(n_users, n_factors)
        self.user_bias = Embedding(n_users, 1)
        self.movie_factors = Embedding(n_movies, n_factors)
        self.movie_bias = Embedding(n_movies, 1)
        self.y_range = y_range
        
    def forward(self, x):
        users = self.user_factors(x[:,0])
        movies = self.movie_factors(x[:,1])
        res = (users * movies).sum(dim=1, keepdim=True)
        res += self.user_bias(x[:,0]) + self.movie_bias(x[:,1])
        return sigmoid_range(res, *self.y_range)
model = DotProductBias(n_users, n_movies, 50)
learn = Learner(dls, model, loss_func=MSELossFlat())
learn.fit_one_cycle(5, 5e-3)

我们增加了偏移量,让拟合的曲线具有更多的灵活性。但是对模型表现影响不大,甚至降低了表现,容易出现过拟合

修改3:weight decay

这是唯一能够显著改善模型表现的举措!

model = DotProductBias(n_users, n_movies, 50)
learn = Learner(dls, model, loss_func=MSELossFlat())
learn.fit_one_cycle(5, 5e-3, wd=0.1)

修改起来也是最简单的,只需要加入wd参数即可。

它的主要作用是防止模型过拟合,控制参数的大小在一定范围内,这样能够引导模型走上正确的道路,达到更低的误差率。

解读训练结果

movie_bias = learn.model.movie_bias.squeeze()
idxs = movie_bias.argsort()[:5]
[dls.classes['title'][i] for i in idxs]
  • argsort()很方便,不是把数值从小到大排列,而是在这种顺序下给你对应索引的排列!descending=True参数则是降序排列。

  • dls.classes['title']可以获得对应的电影名称。但这是普通列表,不能使用张量方法。

learn = collab_learner(dls, n_factors=50, y_range=(0, 5.5))

如上,可以通过更简单的方法进行训练。

找到最相似的电影:

movie_factors = learn.model.i_weight.weight
idx = dls.classes['title'].o2i['Silence of the Lambs, The (1991)']
distances = nn.CosineSimilarity(dim=1)(movie_factors, movie_factors[idx][None])
idx = distances.argsort(descending=True)[1]
dls.classes['title'][idx]

多层神经网络

class CollabNN(Module):
    def __init__(self, user_sz, item_sz, y_range=(0,5.5), n_act=100):
        self.user_factors = Embedding(*user_sz)
        self.item_factors = Embedding(*item_sz)
        self.layers = nn.Sequential(
            nn.Linear(user_sz[1]+item_sz[1], n_act),
            nn.ReLU(),
            nn.Linear(n_act, 1))
        self.y_range = y_range
        
    def forward(self, x):
        embs = self.user_factors(x[:,0]),self.item_factors(x[:,1])
        x = self.layers(torch.cat(embs, dim=1))
        return sigmoid_range(x, *self.y_range)
    
embs = get_emb_sz(dls)
model = CollabNN(*embs)
learn = Learner(dls, model, loss_func=MSELossFlat())
learn.fit_one_cycle(5, 5e-3, wd=0.01)

或者:

learn = collab_learner(dls, use_nn=True, y_range=(0, 5.5), layers=[100,50])
learn.fit_one_cycle(5, 5e-3, wd=0.1)

只需要设置参数use_nn=True!一般来说这样模型的表现较差,但是可以与其他类型的数据结合。

Natural Language Processing

先在这里打个预防针:自然语言处理特别吃计算资源。我能够真正细讲的,都是训练模型之前的部分,训练模型耗费的时间实在是太长了。

直观感受一下,imdb有十万个文本文件。

第一步:Tokenization

我还是把寻找数据的步骤列出来吧。

path = untar_data(URLs.IMDB)
files = get_text_files(path, folders = ['train', 'test', 'unsup'])
txt = files[0].open().read(); txt[:75]
# 'Once again Mr. Costner has dragged out a movie for far longer than necessar'

如果打开文件的时间特别长,那……就说明此时不适合训练模型。

在windows系统下,multiprocessing被禁用了,所以打开文件的速度很慢。

然后我们使用spacy = WordTokenizer()来创建这个Tokenizer。我注意到这里面可以接受lang参数,但是fastai并没有增加对中文的支持(貌似是bug)。

注意,在接下来的处理过程中,spacy就可以当作函数使用,但是接受的参数是列表形式,不是你要tokenize的字符串!所以要这样使用:

iterator = spacy([txt])
list(iterator)
# [(#187)['Once','again','Mr.','Costner','has','dragged','out','a','movie','for','far','longer','than','necessary','.','Aside','from','the','terrific','sea'...]]

所以,我们让toks = first(spacy([txt])),来获得这个列表里面的第一组(也是唯一一组)数据。

小妙招:我们想让L数据类型展示前30项数据,可以使用coll_repr(toks, max_n=30)

为了增强spacy的功能,我们再套一层tokenizer:tkn = Tokenizer(spacy)。此时我们就可以把tkn当成一个普通的函数来处理了,接受一个字符串作为输入,一个列表作为输出。

tkn = Tokenizer(spacy)
print(coll_repr(tkn(txt), 31))
# (#207) ['xxbos','xxmaj','once','again','xxmaj','mr','.','xxmaj','costner','has','dragged','out','a','movie','for','far','longer','than','necessary','.','xxmaj','aside','from','the','terrific','sea','rescue','sequences',',','of','which'...]
  • xxbos:: Indicates the beginning of a text (here, a review)
  • xxmaj:: Indicates the next word begins with a capital (since we lowercased everything)
  • xxunk:: Indicates the word is unknown
Subword Tokenization

它的特点就是词汇量可以被事先规定好。我们取出前两千个文本文件,然后传入subword tokenizer里面,看一看不同的词汇量对与分词有什么影响。

txts = L(o.open(encoding='utf-8').read() for o in files[:2000])
def subword(sz):
    sp = SubwordTokenizer(vocab_sz=sz)
    sp.setup(txts)
    return ' '.join(first(sp([txt]))[:40])

词汇量越大,就会有更多独立的单词被分出来成为一个实体。

第二步:Numericalize

toks200 = txts[:200].map(tkn)
toks200[0]
# (#207) ['xxbos','xxmaj','once','again','xxmaj','mr','.','xxmaj','costner','has','dragged','out','a','movie','for','far','longer','than','necessary','.'...]

我们先多准备一点文本材料。注意txts是两千个文本文件的集合,它具有能够map的特性,就是把传入的函数对每个元素都处理一遍,得到新的列表。

num = Numericalize()
num.setup(toks200)
coll_repr(num.vocab,20)
# "(#1968) ['xxunk','xxpad','xxbos','xxeos','xxfld','xxrep','xxwrep','xxup','xxmaj','the','.',',','a','and','of','to','is','it','i','in'...]"

然后我们直接把toks200塞进Numericalize对象里面,就可以得到一个语料库。现在,num可以被我们当成一个函数,传给它一个已经被tokenize的对象,它就可以把语料进一步转换为数字。如nums = num(toks)[:20]; nums就会返回TensorText([ 2, 8, 349, 183, 8, 1177, 10, 8, 1178, 60, 1455, 62, 12, 25, 28, 189, 957, 93, 958, 10])别忘了,toks就是我们把一小段txt的文字tokenize的结果,是一个列表。

第三步:Batch

这部分的原理,我没有理解透彻,所以不能细说。

之后再补充吧。

第四步:语言模型构建

get_imdb = partial(get_text_files, folders=['train', 'test', 'unsup'])

dls_lm = DataBlock(
    blocks=TextBlock.from_folder(path, is_lm=True),
    get_items=get_imdb, splitter=RandomSplitter(0.1)
).dataloaders(path, path=path, bs=128, seq_len=80)

这样传入可以获得针对的优化。

learn = language_model_learner(
    dls_lm, AWD_LSTM, drop_mult=0.3, 
    metrics=[accuracy, Perplexity()]).to_fp16()
learn.fit_one_cycle(1, 2e-2)
learn.unfreeze()
learn.fit_one_cycle(10, 2e-3)

第五步:语言分类器

dls_clas = DataBlock(
    blocks=(TextBlock.from_folder(path, vocab=dls_lm.vocab),CategoryBlock),
    get_y = parent_label,
    get_items=partial(get_text_files, folders=['train', 'test']),
    splitter=GrandparentSplitter(valid_name='test')
).dataloaders(path, path=path, bs=128, seq_len=72)
learn = text_classifier_learner(dls_clas, AWD_LSTM, drop_mult=0.5, 
                                metrics=accuracy).to_fp16()
learn = learn.load_encoder('finetuned')
learn.fit_one_cycle(1, 2e-2)
learn.freeze_to(-2)
learn.fit_one_cycle(1, slice(1e-2/(2.6**4),1e-2))
learn.freeze_to(-3)
learn.fit_one_cycle(1, slice(5e-3/(2.6**4),5e-3))
learn.unfreeze()
learn.fit_one_cycle(2, slice(1e-3/(2.6**4),1e-3))

之所以使用GrandparentSplitter,是因为有neg/pos的文件夹,再外面才是train/test

我们既使用discriminative learning rates,又引入了一个新技巧:逐渐解冻技术。

随着训练的深入,我们允许模型改动更加底层的权重,来达到最优表现。

CNN,卷积神经网络

这方面的基本知识我先跳过,只讲一些我不太熟悉的新知识。

通道

多通道输入:假如图片是3x28x28的大小,那么我们管这个叫做多通道的输入。因此,我们定义的卷积核应该是3x5x5或者类似的大小,总之需要是三维的。我们让每一层的卷积核和每一层的图片进行点乘,然后把结果相加。这就是我们应对多通道输入的办法。

多通道输出:我们把卷积核的形状变成四维的,也就是多个卷积核叠加在一起。

1x1卷积层

起到点乘压缩通道数的作用。

池化(Pooling)

汇聚层的输出通道数和输入通道数相同。

自适应池化(Adaptive Pooling)

PyTorch有一个特性:

pool2d = nn.AdaptiveAvgPool2d(1)
img = torch.randn(2, 3, 8, 8) 
pool2d(img).shape # torch.Size([2, 3, 1, 1])

这段代码的含义是,我希望经过这个池化层处理后的数据具有1x1的形状,然后让PyTorch去自己决定池化的窗口大小之类的,非常智能。需要注意的是,也许乍一看这种动态处理数据的方法显得很奇怪,但是我们必须明确:池化层不包含任何神经网络需要学习的参数,它只是一种对数据的固定操作,用于总结神经网络的信号罢了!因此,卷积神经网络才能处理各种各样不同大小的图片,还做到保持一致的准确率。

class AdaptiveConcatPool2d(Module):
    "Layer that concats `AdaptiveAvgPool2d` and `AdaptiveMaxPool2d`"
    def __init__(self, size=None):
        self.size = size or 1
        self.ap = nn.AdaptiveAvgPool2d(self.size)
        self.mp = nn.AdaptiveMaxPool2d(self.size)
    def forward(self, x): return torch.cat([self.mp(x), self.ap(x)], 1)

在这里塞进去一些零碎的知识:torch.cat。首先我们注意到AvgPool和MaxPool产生的结果形状相同,都是(Batch, Channels, *Size)。然后我们让torch在dim=1上进行拼接,那么Channels就会变成原来的2倍而其他维度不变罢了!

创建这个类的意义显然在于增加后面全连接层能够利用的有效信息。

AlexNet的架构

net = nn.Sequential(
    # 这里使用一个11*11的更大窗口来捕捉对象。
    # 同时,步幅为4,以减少输出的高度和宽度。
    # 另外,输出通道的数目远大于LeNet
    nn.Conv2d(1, 96, kernel_size=11, stride=4, padding=1), nn.ReLU(),
    nn.MaxPool2d(kernel_size=3, stride=2),
    # 减小卷积窗口,使用填充为2来使得输入与输出的高和宽一致,且增大输出通道数
    nn.Conv2d(96, 256, kernel_size=5, padding=2), nn.ReLU(),
    nn.MaxPool2d(kernel_size=3, stride=2),
    # 使用三个连续的卷积层和较小的卷积窗口。
    # 除了最后的卷积层,输出通道的数量进一步增加。
    # 在前两个卷积层之后,汇聚层不用于减少输入的高度和宽度
    nn.Conv2d(256, 384, kernel_size=3, padding=1), nn.ReLU(),
    nn.Conv2d(384, 384, kernel_size=3, padding=1), nn.ReLU(),
    nn.Conv2d(384, 256, kernel_size=3, padding=1), nn.ReLU(),
    nn.MaxPool2d(kernel_size=3, stride=2),
    nn.Flatten(),
    # 这里,全连接层的输出数量是LeNet中的好几倍。使用dropout层来减轻过拟合
    nn.Linear(6400, 4096), nn.ReLU(),
    nn.Dropout(p=0.5),
    nn.Linear(4096, 4096), nn.ReLU(),
    nn.Dropout(p=0.5),
    # 最后是输出层。由于这里使用Fashion-MNIST,所以用类别数为10,而非论文中的1000
    nn.Linear(4096, 10))

VGG块设计

我们把卷积层、池化层和激活函数拼在一起,组成一个块:

def vgg_block(num_convs, in_channels, out_channels):
    layers = []
    for _ in range(num_convs):
        layers.append(nn.Conv2d(in_channels, out_channels,
                                kernel_size=3, padding=1))
        layers.append(nn.ReLU())
        in_channels = out_channels
    layers.append(nn.MaxPool2d(kernel_size=2,stride=2))
    return nn.Sequential(*layers)

VGG blocks introduced a key architectural insight: using multiple small convolutional filters (3×3) stacked together is more effective than using one large filter.

Network in Network

善用1x1的核。这相当于在每个像素的通道上设置一个多层感知机!这样,我们就不会担心在多层感知机使用时丢失图片的空间位置信息。

def nin_block(in_channels, out_channels, kernel_size, strides, padding):
    return nn.Sequential(
        nn.Conv2d(in_channels, out_channels, kernel_size, strides, padding),
        nn.ReLU(),
        nn.Conv2d(out_channels, out_channels, kernel_size=1), nn.ReLU(),
        nn.Conv2d(out_channels, out_channels, kernel_size=1), nn.ReLU())

真正设置网络的时候可以完全取消后面的全连接层:

net = nn.Sequential(
    nin_block(1, 96, kernel_size=11, strides=4, padding=0),
    nn.MaxPool2d(3, stride=2),
    nin_block(96, 256, kernel_size=5, strides=1, padding=2),
    nn.MaxPool2d(3, stride=2),
    nin_block(256, 384, kernel_size=3, strides=1, padding=1),
    nn.MaxPool2d(3, stride=2),
    nn.Dropout(0.5),
    # 标签类别数是10
    nin_block(384, 10, kernel_size=3, strides=1, padding=1),
    nn.AdaptiveAvgPool2d((1, 1)),
    # 将四维的输出转成二维的输出,其形状为(批量大小,10)
    nn.Flatten())

并行联结网络:GoogLeNet

结论:有时候使用不同大小的卷积核是有利的。

class Inception(nn.Module):
    # c1--c4是每条路径的输出通道数
    def __init__(self, in_channels, c1, c2, c3, c4, **kwargs):
        super(Inception, self).__init__(**kwargs)
        # 线路1,单1x1卷积层
        self.p1_1 = nn.Conv2d(in_channels, c1, kernel_size=1)
        # 线路2,1x1卷积层后接3x3卷积层
        self.p2_1 = nn.Conv2d(in_channels, c2[0], kernel_size=1)
        self.p2_2 = nn.Conv2d(c2[0], c2[1], kernel_size=3, padding=1)
        # 线路3,1x1卷积层后接5x5卷积层
        self.p3_1 = nn.Conv2d(in_channels, c3[0], kernel_size=1)
        self.p3_2 = nn.Conv2d(c3[0], c3[1], kernel_size=5, padding=2)
        # 线路4,3x3最大汇聚层后接1x1卷积层
        self.p4_1 = nn.MaxPool2d(kernel_size=3, stride=1, padding=1)
        self.p4_2 = nn.Conv2d(in_channels, c4, kernel_size=1)

    def forward(self, x):
        p1 = F.relu(self.p1_1(x))
        p2 = F.relu(self.p2_2(F.relu(self.p2_1(x))))
        p3 = F.relu(self.p3_2(F.relu(self.p3_1(x))))
        p4 = F.relu(self.p4_2(self.p4_1(x)))
        # 在通道维度上连结输出
        return torch.cat((p1, p2, p3, p4), dim=1)

网络架构是并行的,这样可以让模型捕获到图片的不同细节并建立联系。

我们把Inception Block作为网络的一个组成部分,形成如下非常复杂的网络:

b1 = nn.Sequential(nn.Conv2d(1, 64, kernel_size=7, stride=2, padding=3),
                   nn.ReLU(),
                   nn.MaxPool2d(kernel_size=3, stride=2, padding=1))
b2 = nn.Sequential(nn.Conv2d(64, 64, kernel_size=1),
                   nn.ReLU(),
                   nn.Conv2d(64, 192, kernel_size=3, padding=1),
                   nn.ReLU(),
                   nn.MaxPool2d(kernel_size=3, stride=2, padding=1))
b3 = nn.Sequential(Inception(192, 64, (96, 128), (16, 32), 32),
                   Inception(256, 128, (128, 192), (32, 96), 64),
                   nn.MaxPool2d(kernel_size=3, stride=2, padding=1))
b4 = nn.Sequential(Inception(480, 192, (96, 208), (16, 48), 64),
                   Inception(512, 160, (112, 224), (24, 64), 64),
                   Inception(512, 128, (128, 256), (24, 64), 64),
                   Inception(512, 112, (144, 288), (32, 64), 64),
                   Inception(528, 256, (160, 320), (32, 128), 128),
                   nn.MaxPool2d(kernel_size=3, stride=2, padding=1))
b5 = nn.Sequential(Inception(832, 256, (160, 320), (32, 128), 128),
                   Inception(832, 384, (192, 384), (48, 128), 128),
                   nn.AdaptiveAvgPool2d((1,1)),
                   nn.Flatten())

net = nn.Sequential(b1, b2, b3, b4, b5, nn.Linear(1024, 10))

ResNet、模型参数检查和数据增强

我们在前面的“训练过程”部分已经提到了卷积神经网络的原理和优化过程,而Resnet是CNN的一种变体。它通过跳跃连接来增强模型在层数很多的情况下的表现,具体实现方法如下:

def conv(ni, nf, ks=3, stride=2, act=nn.ReLU, norm=None, bias=None):
    if bias is None: bias = not isinstance(norm, (nn.BatchNorm1d,nn.BatchNorm2d,nn.BatchNorm3d))
    layers = [nn.Conv2d(ni, nf, stride=stride, kernel_size=ks, padding=ks//2, bias=bias)]
    if norm: layers.append(norm(nf))
    if act: layers.append(act())
    return nn.Sequential(*layers)
def _conv_block(ni, nf, stride, act=act_gr, norm=None, ks=3):
    return nn.Sequential(conv(ni, nf, stride=1, act=act, norm=norm, ks=ks),
                         conv(nf, nf, stride=stride, act=None, norm=norm, ks=ks))
class ResBlock(nn.Module):
    def __init__(self, ni, nf, stride=1, ks=3, act=act_gr, norm=None):
        super().__init__()
        self.convs = _conv_block(ni, nf, stride, act=act, ks=ks, norm=norm)
        self.idconv = fc.noop if ni==nf else conv(ni, nf, ks=1, stride=1, act=None)
        self.pool = fc.noop if stride==1 else nn.AvgPool2d(2, ceil_mode=True)
        self.act = act()
    def forward(self, x): return self.act(self.convs(x) + self.idconv(self.pool(x)))

Resnet的一个基本单元前向传播的方式就是:输入先通过conv_block中的两层卷积神经网络,其中第一个卷积网络的跨度是1,这样保证了图像大小的稳定性;第二个网络的输入与输出频道数相等,总之,我们尝试用两个网络来代替原先的一个网络,在增加复杂度的情况下使模型体系的总体结构不变。

然后我们需要把输入本身加在两层卷积输出的结果上。如果卷积的输出与刚才的输入形状不同怎么办?有两种可能:频道数目不同,那么我们就通过1x1卷积来放缩;如果convs的跨度是2,我们就给idconv加一个池化层,让两个张量的形状相同就行!最后把这个结果用激活函数处理一下就好了。

模型改进与化简

然后这样创建一个模型:

# 之前的,没有ResBlock版本
def get_model(act=nn.ReLU, nfs=(8,16,32,64,128), norm=nn.BatchNorm2d):
    layers = [conv(1, 8, stride=1, act=act, norm=norm)]
    layers += [conv(nfs[i], nfs[i+1], act=act, norm=norm) for i in range(len(nfs)-1)]
    return nn.Sequential(*layers, conv(nfs[-1], 10, act=None, norm=norm, bias=True), 	         nn.Flatten()).to(def_device)

# 现在
def get_model(act=nn.ReLU, nfs=(8,16,32,64,128,256), norm=nn.BatchNorm2d):
    layers = [ResBlock(1, 8, stride=1, act=act, norm=norm)]
    layers += [ResBlock(nfs[i], nfs[i+1], act=act, norm=norm, stride=2) for i in range(len(nfs)-1)]
    layers += [nn.Flatten(), nn.Linear(nfs[-1], 10, bias=False), nn.BatchNorm1d(10)]
    return nn.Sequential(*layers).to(def_device)

因为这部分的代码比较具有迷惑性,所以就重点解读一下:

  • stride=1那就图像大小不变,stride=2则图像边长减半。最后到Flatten层时图像已经被压缩到1x1大小和256个通道。
  • 最后加上了BatchNorm1d来控制激活值的范围,并且代替上一个线性层的偏置参数,增加训练稳定性!

但是这种网络结构已经相当复杂了。我们可以用钩子来看一批次数据从中流动的情况。

def _print_shape(hook, mod, inp, outp): print(type(mod).__name__, inp[0].shape, outp.shape)
model = get_model()
learn = TrainLearner(model, dls, F.cross_entropy, cbs=[DeviceCB(), SingleBatchCB()])
with Hooks(model, _print_shape) as hooks: learn.fit(1, train=False)

有另一种更好的方法可以让模型的信息一目了然:

@fc.patch
def summary(self:Learner):
    res = '|Module|Input|Output|Num params|\n|--|--|--|--|\n'
    tot = 0
    def _f(hook, mod, inp, outp):
        nonlocal res,tot
        nparms = sum(o.numel() for o in mod.parameters())
        tot += nparms
        res += f'|{type(mod).__name__}|{tuple(inp[0].shape)}|{tuple(outp.shape)}|{nparms}|\n'
    with Hooks(self.model, _f) as hooks: self.fit(1, lr=1, train=False, cbs=SingleBatchCB())
    print("Tot params: ", tot)
    if fc.IN_NOTEBOOK:
        from IPython.display import Markdown
        return Markdown(res)
    else: print(res)
TrainLearner(get_model(), dls, F.cross_entropy, cbs=DeviceCB()).summary()
Module Input Output Num params
ResBlock (2048, 1, 28, 28) (2048, 8, 28, 28) 712
ResBlock (2048, 8, 28, 28) (2048, 16, 14, 14) 3696
ResBlock (2048, 16, 14, 14) (2048, 32, 7, 7) 14560
ResBlock (2048, 32, 7, 7) (2048, 64, 4, 4) 57792
ResBlock (2048, 64, 4, 4) (2048, 128, 2, 2) 230272
ResBlock (2048, 128, 2, 2) (2048, 256, 1, 1) 919296
Flatten (2048, 256, 1, 1) (2048, 256) 0
Linear (2048, 256) (2048, 10) 2560
BatchNorm1d (2048, 10) (2048, 10) 20

如何进一步提升模型的表现呢?可以增大模型的广度——把nfs改成(16,32,64,128,256,512)(当然,对应的get_model函数也需要改),但是这会大大增加模型的参数量,最后一个卷积层有将近四百万个参数。我们还可以在参数量不变的情况下增加计算量(通过设置stride=1),最后用池化层压缩信息。来看下一个版本的get_model

class GlobalAvgPool(nn.Module):
    def forward(self, x): return x.mean((-2,-1))
def get_model2(act=nn.ReLU, nfs=(16,32,64,128,256), norm=nn.BatchNorm2d):
    layers = [ResBlock(1, 16, ks=5, stride=1, act=act, norm=norm)]
    layers += [ResBlock(nfs[i], nfs[i+1], act=act, norm=norm, stride=2) for i in range(len(nfs)-1)]
    layers += [ResBlock(256, 512, act=act, norm=norm), GlobalAvgPool()]
    layers += [nn.Linear(512, 10, bias=False), nn.BatchNorm1d(10)]
    return nn.Sequential(*layers)

GlobalAvgPool把最后的2x2信息压缩成了1x1,所以可以直接通线性层。ResBlock默认的stride是1,所以模型的层数并没有变化。

同时,我们更新原先的summary()函数,让它可以获得每一层对应的计算量FLOPS。计算的原理是:对于线性层和BatchNorm层等,每个参数在计算时仅会被用到一次,所以用x.numel()就可以。但是卷积层的每个参数会被应用在所有的输出元素上——比如说stride=2,输入是14x14的图片,那么每个参数就会被用7*7=49次,这正好是输出张量的元素个数!

def _flops(x, h, w):
    if x.dim()<3: return x.numel()
    if x.dim()==4: return x.numel()*h*w
@fc.patch
def summary(self:Learner):
    res = '|Module|Input|Output|Num params|MFLOPS|\n|--|--|--|--|--|\n'
    totp,totf = 0,0
    def _f(hook, mod, inp, outp):
        nonlocal res,totp,totf
        nparms = sum(o.numel() for o in mod.parameters())
        totp += nparms
        *_,h,w = outp.shape
        flops = sum(_flops(o, h, w) for o in mod.parameters())/1e6
        totf += flops
        res += f'|{type(mod).__name__}|{tuple(inp[0].shape)}|{tuple(outp.shape)}|{nparms}|{flops:.1f}|\n'
    with Hooks(self.model, _f) as hooks: self.fit(1, lr=1, cbs=SingleBatchCB())
    print(f"Tot params: {totp}; MFLOPS: {totf:.1f}")
    if fc.IN_NOTEBOOK:
        from IPython.display import Markdown
        return Markdown(res)
    else: print(res)
Module Input Output Num params MFLOPS
ResBlock (1024, 1, 28, 28) (1024, 16, 28, 28) 6928 5.3
ResBlock (1024, 16, 28, 28) (1024, 32, 14, 14) 14560 2.8
ResBlock (1024, 32, 14, 14) (1024, 64, 7, 7) 57792 2.8
ResBlock (1024, 64, 7, 7) (1024, 128, 4, 4) 230272 3.7
ResBlock (1024, 128, 4, 4) (1024, 256, 2, 2) 919296 3.7
ResBlock (1024, 256, 2, 2) (1024, 512, 2, 2) 3673600 14.7
GlobalAvgPool (1024, 512, 2, 2) (1024, 512) 0 0.0
Linear (1024, 512) (1024, 10) 5120 0.0
BatchNorm1d (1024, 10) (1024, 10) 20 0.0

可以观察到,第一层和最后一个ResBlock是计算量最大的两层。如果我们能够精简这一部分的计算,不仅能够提高训练速度,而且表现很有可能不降反升。比如说,我们去掉那个原先的256到512的残差网络层,就会省掉很多参数。

第二个技巧,针对最开始:没必要一开始就使用残差网络。我们把ResBlock改成普通的conv,这样计算量减小很多!

def get_model4(act=nn.ReLU, nfs=(16,32,64,128,256), norm=nn.BatchNorm2d):
    layers = [conv(1, 16, ks=5, stride=1, act=act, norm=norm)]
    layers += [ResBlock(nfs[i], nfs[i+1], act=act, norm=norm, stride=2) for i in range(len(nfs)-1)]
    layers += [GlobalAvgPool(), nn.Linear(256, 10, bias=False), nn.BatchNorm1d(10)]
    return nn.Sequential(*layers)
Module Input Output Num params MFLOPS
Sequential (1024, 1, 28, 28) (1024, 16, 28, 28) 448 0.3
ResBlock (1024, 16, 28, 28) (1024, 32, 14, 14) 14560 2.8
ResBlock (1024, 32, 14, 14) (1024, 64, 7, 7) 57792 2.8
ResBlock (1024, 64, 7, 7) (1024, 128, 4, 4) 230272 3.7
ResBlock (1024, 128, 4, 4) (1024, 256, 2, 2) 919296 3.7
GlobalAvgPool (1024, 256, 2, 2) (1024, 256) 0 0.0
Linear (1024, 256) (1024, 10) 2560 0.0
BatchNorm1d (1024, 10) (1024, 10) 20 0.0

于是,参数量从4M下降到了1.2M,MFLOPS从30下降到了13,但是模型的实际表现并无差异。(92.8%准确率)

想继续提升表现就要使用数据增强。

数据增强

RandomCrop
from torchvision import transforms
def tfm_batch(b, tfm_x=fc.noop, tfm_y = fc.noop): return tfm_x(b[0]),tfm_y(b[1])
tfms = nn.Sequential(transforms.RandomCrop(28, padding=4),
                     transforms.RandomHorizontalFlip())
augcb = BatchTransformCB(partial(tfm_batch, tfm_x=tfms), on_val=False)
model = get_model()
learn = TrainLearner(model, dls, F.cross_entropy, lr=lr, cbs=[SingleBatchCB(), augcb])
learn.fit(1)
xb,yb = learn.batch
show_images(xb[:16], imsize=1.5)

这里RandomCrop做的是:先把图片边长拓展4,然后再随机截取28x28的区域。在learn.fit(1)之后,就可以直接获得这一个批次的数据。这个功能如此常见和有用,我们就把它进一步包装成learner的一个方法!

@fc.patch
@fc.delegates(show_images)
def show_image_batch(self:Learner, max_n=9, cbs=None, **kwargs):
    self.fit(1, cbs=[SingleBatchCB()]+fc.L(cbs))
    show_images(self.batch[0][:max_n], **kwargs)
Test Time Augmentation

为了实现这个,我们需要先捕获模型训练中的预测值:

class CapturePreds(Callback):
    def before_fit(self, learn): self.all_inps,self.all_preds,self.all_targs = [],[],[]
    def after_batch(self, learn):
        self.all_inps. append(to_cpu(learn.batch[0]))
        self.all_preds.append(to_cpu(learn.preds))
        self.all_targs.append(to_cpu(learn.batch[1]))
    def after_fit(self, learn):
        self.all_preds,self.all_targs,self.all_inps = map(torch.cat, 
                                                          [self.all_preds,self.all_targs,self.all_inps])
@fc.patch
def capture_preds(self: Learner, cbs=None, inps=False):
    cp = CapturePreds()
    self.fit(1, train=False, cbs=[cp]+fc.L(cbs))
    res = cp.all_preds,cp.all_targs
    if inps: res = res+(cp.all_inps,)
    return res

然后,我们把图片水平翻转后的激活值与原先的激活值取平均:

ttacb = BatchTransformCB(partial(tfm_batch, tfm_x=TF.hflip), on_val=True)
ap2, at = learn.capture_preds(cbs=[ttacb])
ap = torch.stack([ap1,ap2]).mean(0).argmax(1)
Random Erase/Copy

作为另一种数据增强的方法,我们可以让图像有一些噪声,提升模型泛化的能力。

def _rand_erase1(x, pct, xm, xs, mn, mx):
    szx = int(pct*x.shape[-2])
    szy = int(pct*x.shape[-1])
    stx = int(random.random()*(1-pct)*x.shape[-2])
    sty = int(random.random()*(1-pct)*x.shape[-1])
    init.normal_(x[:,:,stx:stx+szx,sty:sty+szy], mean=xm, std=xs)
    x.clamp_(mn, mx)
def rand_erase(x, pct=0.2, max_num = 4):
    xm,xs,mn,mx = x.mean(),x.std(),x.min(),x.max()
    num = random.randint(0, max_num)
    for i in range(num): _rand_erase1(x, pct, xm, xs, mn, mx)
    return x
class RandErase(nn.Module):
    def __init__(self, pct=0.2, max_num=4):
        super().__init__()
        self.pct,self.max_num = pct,max_num
    def forward(self, x): return rand_erase(x, self.pct, self.max_num)
tfms = nn.Sequential(transforms.RandomCrop(28, padding=1),
                     transforms.RandomHorizontalFlip(),
                     RandErase())
augcb = BatchTransformCB(partial(tfm_batch, tfm_x=tfms), on_val=False)

可以让模型的准确率提升到95%。但是噪声总是会让图片产生一些违和感,不如直接把图像的一块剪下来贴到另一块上。

def _rand_copy1(x, pct):
    szx = int(pct*x.shape[-2])
    szy = int(pct*x.shape[-1])
    stx1 = int(random.random()*(1-pct)*x.shape[-2])
    sty1 = int(random.random()*(1-pct)*x.shape[-1])
    stx2 = int(random.random()*(1-pct)*x.shape[-2])
    sty2 = int(random.random()*(1-pct)*x.shape[-1])
    x[:,:,stx1:stx1+szx,sty1:sty1+szy] = x[:,:,stx2:stx2+szx,sty2:sty2+szy]
def rand_copy(x, pct=0.2, max_num = 4):
    num = random.randint(0, max_num)
    for i in range(num): _rand_copy1(x, pct)
    return x
class RandCopy(nn.Module):
    def __init__(self, pct=0.2, max_num=4):
        super().__init__()
        self.pct,self.max_num = pct,max_num
    def forward(self, x): return rand_copy(x, self.pct, self.max_num)
Dropout

原理很简单,我直接放上代码:

def get_dropmodel(act=nn.ReLU, nfs=(16,32,64,128,256,512), norm=nn.BatchNorm2d, drop=0.0):
    layers = [ResBlock(1, 16, ks=5, stride=1, act=act, norm=norm), nn.Dropout2d(drop)]
    layers += [ResBlock(nfs[i], nfs[i+1], act=act, norm=norm, stride=2) for i in range(len(nfs)-1)]
    layers += [nn.Flatten(), Dropout(drop), nn.Linear(nfs[-1], 10, bias=False), nn.BatchNorm1d(10)]
    return nn.Sequential(*layers)

我们甚至可以在测试的时候使用Dropout,来判断模型对于样本的置信度。

class TTD_CB(Callback):
    def before_epoch(self, learn):
        learn.model.apply(lambda m: m.train() if isinstance(m, (nn.Dropout,nn.Dropout2d)) else None)

但是这种方法用的很少。

Stable Diffusion

这是一种用AI生成图片的算法。我目前并不是对这方面很精通,所以只能一步一步来,先把我会的所有东西都写在这里,最后在组织成一个完整的架构。

Unconditional DDPM

DDPM是Denoising Diffusion Probablistic Model的缩写。这个模型的作用是:给你一堆真实世界中的图片,这些图片像素的分布肯定遵循一定的规律。于是,我们给一个图片加上一定幅度的噪音之后,只要告诉模型噪音服从的分布,就可以让模型预测这个噪音,从而对图像“去噪”。DDPM背后的假设是,如果一开始给一个纯噪音,我们就认为这是一张图片层层加噪声(成为Forward Process)产生的——于是我们通过层层去除噪声(称为Reverse Process)的过程,就可以得到一张足够好的图片!

添加噪声

涉及的数学很多,我下面尽量讲明白。加入噪声时,新的图片相对于前一张图片都是一个正态分布:信号被弱化,然后加入了一部分噪声。 \(q(x_t|x_{t-1}) := \mathcal{N}(x_t; \sqrt{1-\beta_t}x_{t-1}, \beta_tI)\) 这个式子看上去有些麻烦,我们可以进行一小步改写,这和上面的式子是等价的,$\epsilon$是符合标准正态分布的噪声。 \(x_t = \sqrt{1-\beta_t}x_{t-1}+\sqrt{\beta_t}\epsilon_{t-1}\) 显然我们训练模型去识别这些噪声的时候,不可能对每个样本都一步一步的运行这个公式,这样太慢了!因为这个式子足够简单,我们可以直接找到$x_t$与$x_0$的关系。因为数学推导比较简单,我就省去过程,结果是:令$\alpha_t = 1-\beta_t$,最终有 \(x_t = \sqrt{\alpha_t\alpha_{t-1}...\alpha_1}x_0+\sqrt{1-\alpha_t\alpha_{t-1}...\alpha_1}\) 所以说,下面的代码就是上面逻辑的复刻。可以看出来$\sqrt{\beta_t}$是我们在每一步加入噪声的大小,这个噪声的方差是线性增加的。然后我们用alphabar来计算$\alpha$的累乘。

betamin,betamax,n_steps = 0.0001,0.02,1000
beta = torch.linspace(betamin, betamax, n_steps)
alpha = 1.-beta
alphabar = alpha.cumprod(dim=0)
sigma = beta.sqrt()

接下来在观察我们给图片加噪声的过程。对于一批图片$x_0$,我们给每一个图片生成1到1000的随机数timestep代表噪声的程度,然后按照timestep确定噪声规模,最后按照公式添加噪声,就这么些东西。

def noisify(x0, ):
    device = x0.device
    n = len(x0) # number of instances
    t = torch.randint(0, n_steps, (n,), dtype=torch.long)
    ε = torch.randn(x0.shape, device=device)
    ᾱ_t = [t].reshape(-1, 1, 1, 1).to(device)
    xt = ᾱ_t.sqrt()*x0 + (1-ᾱ_t).sqrt()*ε
    return (xt, t.to(device)), ε
贝叶斯公式

在理解怎么去除噪声之前,先看这个涉及贝叶斯公式的例子。

你想称量一只小狗的重量。你知道它的重量大约是10kg,但是不是特别确定——所以在你看来,狗的体重符合均值为10,方差为4的正态分布。然后你又把狗用仪器测量了体重,但是这个仪器也有点不准,它给出的得数也是一个正态分布,均值就是小狗的真实体重,但是有1的方差。

问题来了:你称量小狗之后,小狗的体重分布变成了什么?

清晰解题的关键在于用形式化的数学把问题表示出来。狗的体重$w \sim \mathcal{N}(10, 4) $,然后$P(read \ 12 w) \propto exp(-\frac{(w-12)^2}{2})$,也是一个正态分布。
那么根据贝叶斯公式,$P(w read \ 12) = \frac{P(read \ 12 w)P(w)}{P(read \ 12)}$,因为最后会统一归一化为1,所以不管常系数,只看分子的两个表达式相乘!两个exp内的表达式相加,得到另一个二次函数,配方之后得到$\frac{5}{8}!\left(w - \frac{58}{5}\right)^2 + \text{const}$,从而得到新的正态分布$w \text{read 12} \sim \mathcal{N}(11.6,\; 0.8)$。
降噪

如果我们知道$x_t$和$x_0$,那么我们就能得到$x_{t-1}$的分布。 \(q(\mathbf{x}_{t-1}|\mathbf{x}_t, \mathbf{x}_0) = \frac{q(\mathbf{x}_t|\mathbf{x}_{t-1},\mathbf{x}_0)\;\cdot\;q(\mathbf{x}_{t-1}|\mathbf{x}_0)}{q(\mathbf{x}_t|\mathbf{x}_0)}\) 由于分母和$x_{t-1}$无关,所以可以直接扔掉,最后统一归一化。对于分子,我们只需要把两个表达式相乘,和上面的例子一样,得到一个新的正态分布!把指数上的部分相加,表达式是: \(-\frac{(\mathbf{x}_t - \sqrt{\alpha_t}\,\mathbf{x}_{t-1})^2}{2\beta_t} - \frac{(\mathbf{x}_{t-1} - \sqrt{\bar{\alpha}_{t-1}}\,\mathbf{x}_0)^2}{2(1-\bar{\alpha}_{t-1})}\) 然后我们把这个新分布的方差和均值分开,可以得到下面两项: \(\tilde{\beta}_t = \frac{\beta_t(1-\bar{\alpha}_{t-1})}{1-\bar{\alpha}_t} \\ \tilde{\boldsymbol{\mu}}_t(\mathbf{x}_t, \mathbf{x}_0) = \frac{\sqrt{\alpha_t}(1-\bar{\alpha}_{t-1})}{1-\bar{\alpha}_t}\,\mathbf{x}_t + \frac{\sqrt{\bar{\alpha}_{t-1}}\,\beta_t}{1-\bar{\alpha}_t}\,\mathbf{x}_0\) 说白了,如果知道$x_0$,那么$x_{t-1}$就服从 \(q(\mathbf{x}_{t-1}|\mathbf{x}_t, \mathbf{x}_0) = \mathcal{N}(\mathbf{x}_{t-1};\; \tilde{\boldsymbol{\mu}}_t,\; \tilde{\beta}_t\mathbf{I})\) 这样的分布。但是关键在于我们并不知道什么是初始的图片。不过这个问题也可以解决——我们有一步从$x_0$到$x_t$的公式,因为噪声已经预测好了,所以可以直接带回去得到一个我们预测的$x_0$!

这个式子就是 \(\hat{\mathbf{x}}_0 = \frac{\mathbf{x}_t - \sqrt{1-\bar{\alpha}_t}\,\boldsymbol{\epsilon}_\theta}{\sqrt{\bar{\alpha}_t}}\) 所以对应的代码做的事情就是:计算原先的图片是什么样子的,然后把各种系数带进去计算上一步的图片是什么样子。做1000次这样的降噪,就可以得到一个清晰的图片!

@torch.no_grad()
def sample(model, sz, alpha, alphabar, sigma, n_steps):
    device = next(model.parameters()).device
    x_t = torch.randn(sz, device=device)
    preds = []
    for t in reversed(range(n_steps)):
        # x_t.shape[0] is the number of images to generate
        t_batch = torch.full((x_t.shape[0],), t, device=device, dtype=torch.long)
        # generate noise that is the same shape as x_t
        z = (torch.randn(x_t.shape) if t > 0 else torch.zeros(x_t.shape)).to(device)
        ᾱ_t1 = alphabar[t-1]  if t > 0 else torch.tensor(1)
        b̄_t = 1 - alphabar[t]
        b̄_t1 = 1 - ᾱ_t1
        x_0_hat = ((x_t - b̄_t.sqrt() * learn.model((x_t, t_batch)))/alphabar[t].sqrt()).clamp(-1,1)
        x_t = x_0_hat * ᾱ_t1.sqrt()*(1-alpha[t])/b̄_t + x_t * alpha[t].sqrt()*b̄_t1/b̄_t + sigma[t]*z
        preds.append(x_t.cpu())
    return preds

下面讲具体如何训练和获取图片。block_out_channels规定的是卷积神经网络在每一层的通道数。训练需要一个好的显卡,我用Macbook Air训练的话,一个Epoch需要七分钟左右,而且CPU温度会飙到90度,所以我就只训练了两个Epoch就停止了,结果是有的图片仍然基本全是噪声,但是效果已经显现出来了。

还有一个小的地方可以注意:在创建UNet模型的时候可以添加参数norm_num_groups=8,因为这个模型正则化使用的是GroupNorm。只需要注意这个分的组数一定要能够被block_out_channels的所有数整除。

from diffusers import UNet2DModel
model = UNet2DModel(in_channels=1, out_channels=1, block_out_channels=(32, 64, 128, 128))
lr = 4e-3
epochs = 5
tmax = epochs * len(dls.train)
sched = partial(lr_scheduler.OneCycleLR, max_lr=lr, total_steps=tmax)
ddpm_cb = DDPMCB(n_steps=1000, beta_min=0.0001, beta_max=0.02)
cbs = [ddpm_cb, DeviceCB(), ProgressCB(plot=True), MetricsCB(), BatchSchedCB(sched)]
learn = Learner(model, dls, nn.MSELoss(), lr=lr, cbs=cbs, opt_func=optim.Adam)
learn.fit(epochs)
samples = ddpm_cb.sample(learn.model, (16, 1, 32, 32))
show_images(-samples[-1], figsize=(5,5))

另外,为了提高模型表现,可以通过下面的方法初始化model。本质上就是先通过把各种层的权重变成零来让整个模型的结构很简单,然后在训练的过程中再慢慢加入复杂度。使用正交初始化可以保持信号的范数不变,从而阻止在前向传播的过程中出现爆炸或者消失现象。模型最开始会一直预测没有噪声。

def init_ddpm(model):
    for o in model.down_blocks:
        for p in o.resnets:
            p.conv2.weight.data.zero_()
            for p in fc.L(o.downsamplers): init.orthogonal_(p.conv.weight)
    for o in model.up_blocks:
        for p in o.resnets: p.conv2.weight.data.zero_()
    model.conv_out.weight.data.zero_()
init_ddpm(model)

然后我们可以把扩散过程变成一个MP4视频,非常方便。

%matplotlib auto
import matplotlib.animation as animation
from IPython.display import display, HTML

fig,ax = plt.subplots(figsize=(3,3))
def _show_i(i): return show_image(-samples[i][9], ax=ax, animated=True).get_images()
r = L.range(800,990, 5)+L.range(990,1000)+[999]*10
ims = r.map(_show_i)

animate = animation.ArtistAnimation(fig, ims, interval=50, blit=True, repeat_delay=3000)
display(HTML(animate.to_html5_video()))

Mixed Precision训练

上面的训练代码还可以进行改动,不用callback,而是拆分成许多小的函数,比如拆成noisify和sample,然后修改collate_fn就可以完美代替原先的代码。

def collate_ddpm(b): return noisify(default_collate(b)[xl], alphabar)
def dl_ddpm(ds): return DataLoader(ds, batch_size=bs, collate_fn=collate_ddpm, num_workers=4)

为了提高训练速度,在CUDA设备上,我们可以采取混精度训练的方式,把存储的权重从32位浮点数变成16位浮点数。如果想要从头实现的话,可以参考PyTorch官网上的代码范例,我们只需要照葫芦画瓢改一下就能实现。

class MixedPrecision(TrainCB):
    order = DeviceCB.order+10
    def before_fit(self, learn): self.scaler = torch.cuda.amp.GradScaler()
    def before_batch(self, learn):
        self.autocast = torch.autocast("cuda", dtype=torch.float16)
        self.autocast.__enter__()
    def after_loss(self, learn): self.autocast.__exit__(None, None, None)        
    def backward(self, learn): self.scaler.scale(learn.loss).backward()
    def step(self, learn):
        self.scaler.step(learn.opt)
        self.scaler.update()

然后训练的代码也是很简单的。

lr = 1e-2
epochs = 8
tmax = epochs * len(dls.train)
sched = partial(lr_scheduler.OneCycleLR, max_lr=lr, total_steps=tmax)
cbs = [DeviceCB(), MixedPrecision(), ProgressCB(plot=True), MetricsCB(), BatchSchedCB(sched)]
model = UNet(in_channels=1, out_channels=1, block_out_channels=(16, 32, 64, 128), norm_num_groups=8)
init_ddpm(model)
learn = Learner(model, dls, nn.MSELoss(), lr=lr, cbs=cbs, opt_func=opt_func)
learn.fit(epochs)

不过,自己从头定义MixedPrecision还是有点麻烦,我们可以使用现成的accelerate库来帮我们更加轻松的完成这件事!它会自动帮我们在恰当的时机调整精度,省的写很多代码。

from accelerate import Accelerator
class AccelerateCB(TrainCB):
    order = DeviceCB.order+10
    def __init__(self, n_inp=1, mixed_precision="fp16"):
        super().__init__(n_inp=n_inp)
        self.acc = Accelerator(mixed_precision=mixed_precision)        
    def before_fit(self, learn):
        learn.model,learn.opt,learn.dls.train,learn.dls.valid = self.acc.prepare(
            learn.model, learn.opt, learn.dls.train, learn.dls.valid)
    def backward(self, learn): self.acc.backward(learn.loss)

改变训练细节

  • 在加噪声的过程中,如果噪声是线性增加的,那么信号会减少的非常快。我们可以直接定义信号的保留幅度是一个余弦函数的形状,这样大部分的时间里,图像都会保留一定的信息,有利于提高生成图像的质量。
def abar(t, T): return (t/T*math.pi/2).cos()**2
def cos_sched(n_steps=1000):
    ts = torch.linspace(0, n_steps-1, n_steps)
    ab = abar(ts,n_steps)
    alp = ab/abar(ts-1,n_steps)
    return SimpleNamespace(a=alp, abar=ab, sig=(1-alp).sqrt())
  • 在对比余弦规划和线性噪声规划的图像之后,我们可以考虑把线性噪声规划的最大值改成0.01而不是0.02,这样可以让两种图像更加贴近,斜率更加平滑,也助于提升训练效果。
  • 初始化图片数据的时候,可以考虑减小像素值分布的范围,这样能够提升模型表现!
@inplace
def transformi(b): b[xl] = [F.pad(TF.to_tensor(o), (2,2,2,2))-0.5 for o in b[xl]]

上传训练数据到Weights & Biases

这是一个免费的平台可以让我们存储自己训练的数据,由于我没有办法下载cifar数据集,所以也没有真正运行过这段代码测试效果,不过先摆在这里作为参考。我们只需要复写之前的MetricsCB类,就可以很轻松的添加这样的功能。

import wandb
class WandBCB(MetricsCB):
    order=100
    def __init__(self, config, *ms, project='ddpm_cifar10', **metrics):
        fc.store_attr()
        super().__init__(*ms, **metrics)
    def before_fit(self, learn): wandb.init(project=self.project, config=self.config)
    def after_fit(self, learn): wandb.finish()
    def _log(self, d): 
        if self.train: 
            wandb.log({'train_'+m:float(d[m]) for m in self.all_metrics})
        else: 
            wandb.log({'val_'+m:float(d[m]) for m in self.all_metrics})
            wandb.log({'samples':self.sample_figure(learn)})
        print(d)        
    def sample_figure(self, learn):
        with torch.no_grad():
            samples = sample(learn.model, (16, 3, 32, 32))
        s = (samples[-1] + 0.5).clamp(0,1)
        plt.clf()
        fig, axs = get_grid(16)
        for im,ax in zip(s[:16], axs.flat): show_image(im, ax=ax)
        return fig
    def after_batch(self, learn):
        super().after_batch(learn) 
        wandb.log({'loss':learn.loss})

计算生成图像的仿真度:FID和KID

FIDFréchet Inception Distance的简称,它的核心思想是,我们已经有了一个非常擅长对Fashion MNIST数据集分类的模型,其准确率可以达到94.5%。那么它在最后几层肯定提取到了一些图片有用的特征和细节,所以我们对真实图片和生成图片各自取这一层的激活值,然后定量看它们分布的差异,差别越小,生成的图片就越好! \(\text{FID} = \|\mu_1 - \mu_2\|^2 + \text{tr}(\Sigma_1) + \text{tr}(\Sigma_2) - 2\,\text{tr}\!\left((\Sigma_1 \Sigma_2)^{1/2}\right)\) 唯一的难点就是计算矩阵的平方根——这个需要线性代数工具,自己用牛顿法实现太麻烦。

def _calc_stats(feats):
    feats = feats.squeeze()
    return feats.mean(0),feats.T.cov()

def _calc_fid(m1,c1,m2,c2):
#     csr = _sqrtm_newton_schulz(c1@c2)
    csr = tensor(linalg.sqrtm(c1@c2, 256).real)
    return (((m1-m2)**2).sum() + c1.trace() + c2.trace() - 2*csr.trace()).item()

FID的特点是会和我们的取样有关,样本越多,计算出的值就越准确。同时,这个FID只能供我们自己比较使用。

然后我们再看KID,它是Kernel Inception Distance。它会给出无偏估计,但是有很大的方差,所以可能不是很准确。 $$ k(\mathbf{a}, \mathbf{b}) = \left(\frac{\mathbf{a} \cdot \mathbf{b}}{d} + 1\right)^3 \

\text{MMD}^2 = \frac{1}{m(m-1)}\sum_{i \neq j} k(x_i, x_j) + \frac{1}{n(n-1)}\sum_{i \neq j} k(y_i, y_j) - \frac{2}{mn}\sum_{i,j} k(x_i, y_j) $$

def _squared_mmd(x, y):
    def k(a,b): return (a@b.transpose(-2,-1)/a.shape[-1]+1)**3
    m,n = x.shape[-2],y.shape[-2]
    kxx,kyy,kxy = k(x,x), k(y,y), k(x,y)
    kxx_sum = kxx.sum([-1,-2])-kxx.diagonal(0,-1,-2).sum(-1)
    kyy_sum = kyy.sum([-1,-2])-kyy.diagonal(0,-1,-2).sum(-1)
    kxy_sum = kxy.sum([-1,-2])
    return kxx_sum/m/(m-1) + kyy_sum/n/(n-1) - kxy_sum*2/m/n
def _calc_kid(x, y, maxs=50):
    xs,ys = x.shape[0],y.shape[0]
    n = max(math.ceil(min(xs/maxs, ys/maxs)), 4)
    mmd = 0.
    for i in range(n):
        cur_x = x[round(i*xs/n) : round((i+1)*xs/n)]
        cur_y = y[round(i*ys/n) : round((i+1)*ys/n)]
        mmd += _squared_mmd(cur_x, cur_y)
    return (mmd/n).item()

这个代码因为涉及很多数学公式,所以看上去比较难懂,但是这些细节都无关紧要。

我们最后再把这两个函数封装成一个类。

class ImageEval:
    def __init__(self, model, dls, cbs=None):
        self.learn = TrainLearner(model, dls, loss_func=fc.noop, cbs=cbs, opt_func=None)
        self.feats = self.learn.capture_preds()[0].float().cpu().squeeze()
        self.stats = _calc_stats(self.feats)
    def get_feats(self, samp):
        self.learn.dls = DataLoaders([],[(samp, tensor([0]))])
        return self.learn.capture_preds()[0].float().cpu().squeeze()
    def fid(self, samp): return _calc_fid(*self.stats, *_calc_stats(self.get_feats(samp)))
    def kid(self, samp): return _calc_kid(self.feats, self.get_feats(samp))

Skip Sampling

我们减少让模型预测噪声的次数,因为在相邻的timestep之间,噪声的预测变化幅度不会特别大,而调用模型是非常耗时间的。这样,我们调用模型的次数可以从1000次减少到不到400次,会大幅提升生成图像的速度,质量不会有明显差别。但是,如果调用模型太少的话,生成的图片就不会有很多细节。

@torch.no_grad()
def sample_skip(model, sz):
    ps = next(model.parameters())
    x_t = torch.randn(sz).to(ps)
    preds = []
    for t in reversed(range(n_steps)):
        t_batch = torch.full((x_t.shape[0],), t, device=ps.device, dtype=torch.long)
        z = (torch.randn(x_t.shape) if t > 0 else torch.zeros(x_t.shape)).to(ps)
        ᾱ_t1 = alphabar[t-1]  if t > 0 else torch.tensor(1)
        b̄_t = 1-alphabar[t]
        b̄_t1 = 1-ᾱ_t1
        if t%3==0 or t<50: noise = model((x_t, t_batch))
        x_0_hat = ((x_t - b̄_t.sqrt() * noise)/alphabar[t].sqrt())
        x_t = x_0_hat * ᾱ_t1.sqrt()*(1-alpha[t])/b̄_t + x_t * alpha[t].sqrt()*b̄_t1/b̄_t + sigma[t]*z
        preds.append(x_t.cpu().float())
    return preds

DDIM

这是Denoising Diffusion Implistic Models的缩写。使用模型预测图片中的噪声没有什么新意,你也能看出来,关键在于如何一步一步去除噪声得到完整的图片。DDIM创新的地方是通过一些新的数学公式可以在保持质量的情况下实现大幅更快速度的采样。我们先看如何用Huggingface的自带功能实现DDPM取样。

sched = DDPMScheduler(beta_end=0.01)
x_t = torch.randn(sz).cuda()
preds = []

for t in progress_bar(sched.timesteps):
    with torch.no_grad(): noise = model(x_t, t).sample
    x_t = sched.step(noise, t, x_t).prev_sample
    preds.append(x_t.float().cpu())

接下来是DDIM。上公式: \(x_{t-1} = \sqrt{\bar{\alpha}_{t-1}}\;\hat{x}_0 + \sqrt{1 - \bar{\alpha}_{t-1} - \sigma_t^2}\;\epsilon_\theta(x_t, t) + \sigma_t\,z\)

sched = DDIMScheduler(beta_end=0.01)
sched.set_timesteps(333)
def diff_sample(model, sz, sched, **kwargs):
    x_t = torch.randn(sz).cuda()
    preds = []
    for t in progress_bar(sched.timesteps):
        with torch.no_grad(): noise = model(x_t, t).sample
        x_t = sched.step(noise, t, x_t, **kwargs).prev_sample
        preds.append(x_t.float().cpu())
    return preds
preds = diff_sample(model, sz, sched, eta=1.)
s = (preds[-1]*2).clamp(-1,1)

这个eta可以控制生成图片的随机程度,和sigma_t成正比。在eta=0的时候,算法可以在50步生成一个还不错的图片,这就是DDIM的优点所在。

UNet

作为Stable Diffusion的另一个重要主题,我们以Super Resolution问题来展开。如果面对一堆低分辨率的图像,如何生成对应的高分辨率版本?可以考虑用AutoEncoder结构来应对这种输入是图片、输出也是图片的情况,但是单纯的AutoEncoder对原先图像的信息压缩太严重,会大大降低神经网络的理解能力,所以把跳步连接的原理用在AutoEncoder结构中。

class TinyUnet(nn.Module):
    def __init__(self, act=act_gr, nfs=(32,64,128,256,512,1024), norm=nn.BatchNorm2d):
        super().__init__()
        self.start = ResBlock(3, nfs[0], stride=1, act=act, norm=norm)
        self.dn = nn.ModuleList([ResBlock(nfs[i], nfs[i+1], act=act, norm=norm, stride=2)
                                 for i in range(len(nfs)-1)])
        self.up = nn.ModuleList([up_block(nfs[i], nfs[i-1], act=act, norm=norm)
                                 for i in range(len(nfs)-1,0,-1)])
        self.up += [ResBlock(nfs[0], 3, act=act, norm=norm)]
        self.end = ResBlock(3, 3, act=nn.Identity, norm=norm)

    def forward(self, x):
        layers = []
        layers.append(x)
        x = self.start(x)
        for l in self.dn:
            layers.append(x)
            x = l(x)
        n = len(layers)
        for i,l in enumerate(self.up):
            if i!=0: x += layers[n-i]
            x = l(x)
        return self.end(x+layers[0])

这是一个很复杂的卷积神经网络,为了降低训练难度,我们把最后几层权重设置成零。

Image Style Transfer

我们希望第二张图片能够融合第一张图片的风格,同时自己的内容大部分不变。在上手这个任务之前,我们需要先做一些准备工作。

利用Learner框架的灵活性

比如说看下面的例子:我们需要对一张随机生成的图片进行梯度下降,从而让它和我们给定的图片相等。因为没有所谓的数据集,我们只能定义一个来骗Learner来正常进行训练过程!

class LengthDataset():
    def __init__(self, length=1): self.length=length
    def __len__(self): return self.length
    def __getitem__(self, idx): return 0,0
def get_dummy_dls(length=100):
    return DataLoaders(DataLoader(LengthDataset(length), batch_size=1), # Train
                       DataLoader(LengthDataset(1), batch_size=1))      # Valid (length 1)
class TensorModel(nn.Module):
    def __init__(self, t):
        super().__init__()
        self.t = nn.Parameter(t.clone())
    def forward(self, x=0): return self.t
model = TensorModel(torch.rand_like(content_im))
show_image(model());

可以看到,我们定义了假的Dataset,DataLoaders和Model,它们实际上什么都干不了。但是,精髓在于修改训练过程。我们重新写了predictget_loss函数,通过精巧的代码设计,让随机排列的像素进行一百次梯度下降来逼近我们之前给定的图片。

class ImageOptCB(TrainCB):
    def predict(self, learn): learn.preds = learn.model()
    def get_loss(self, learn): learn.loss = learn.loss_func(learn.preds)
def loss_fn_mse(im):
    return F.mse_loss(im, content_im)
model = TensorModel(torch.rand_like(content_im))
cbs = [ImageOptCB(), ProgressCB(), MetricsCB(), DeviceCB()]
learn = Learner(model, get_dummy_dls(100), loss_fn_mse, 
                lr=1e-2, cbs=cbs, opt_func=torch.optim.Adam)
learn.fit(1)

同时,我们还可以写一个回调来记录训练过程中图像的变化。非常好懂。

class ImageLogCB(Callback):
    order = ProgressCB.order + 1
    def __init__(self, log_every=10): store_attr(); self.images=[]; self.i=0
    def after_batch(self, learn): 
        if self.i%self.log_every == 0: self.images.append(to_cpu(learn.preds.clip(0, 1)))
        self.i += 1
    def after_fit(self, learn): show_images(self.images)

截取图片特征+ImageNet正则化

我们刚才是比对两个图片的像素进行迭代的,如果想要做到风格转移,就需要抽取两张图像的“风格”向量,然后比对这两个的差别,同时再加权考虑上实际上像素之间的差别(这样才能保留原来图像的内容,并且融入新风格)。如果想提取一个图像的风格,其实也可以做到,那就是使用预训练的模型的后几层输出。在这个例子中,我们使用VGG卷积神经网络。

vgg16 = timm.create_model('vgg16', pretrained=True).to(def_device).features

细节:为什么最后要加.features?因为这个模型分为两部分:一个是features,一个是head。前面的负责特征提取,后面的是全连接层,负责把提取到的特征映射到ImageNet上的1000个图像类别去。因为我们这里只需要图片的特征,所以直接用了.features,抛弃了后面的部分。

使用预训练模型需要注意的是正则化。因为VGG模型是在ImageNet上训练的,所以为了和ImageNet训练集里的其他图片一样接受相同的正则化,我们就需要减去ImageNet图片的像素均值,再除以方差。

def normalize(im):
    imagenet_mean = tensor([0.485, 0.456, 0.406])[:,None,None].to(im.device)
    imagenet_std = tensor([0.229, 0.224, 0.225])[:,None,None].to(im.device)
    return (im - imagenet_mean) / imagenet_std

为了省事,我们也可以使用torchvision.transforms的正则化手段:

normalize = transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
normalize(content_im).min(), normalize(content_im).max()

然后我们就可以上手获取图片的特征了。在VGG模型的最后几层,原来图片的信息已经被大大压缩,模型能够记住的,就是一些关于图片特征的关键指标,所以我们就可以想办法截获这样的特征!我们下面选择的两层都是ReLU层,处理完后的图像大小分别是(512, 32, 32)和(512, 16, 16)。

def calc_features(imgs, target_layers=(18, 25)): 
    x = normalize(imgs)
    feats = []
    for i, layer in enumerate(vgg16[:max(target_layers)+1]):
        x = layer(x)
        if i in target_layers:
            feats.append(x.clone())
    return feats

Content Loss

于是我们就可以想办法比较两个图片最后的特征差别。

class ContentLossToTarget():
    def __init__(self, target_im, target_layers=(18, 25)):
        fc.store_attr()
        with torch.no_grad():
            self.target_features = calc_features(target_im, target_layers)
    def __call__(self, input_im): 
        return sum((f1-f2).pow(2).mean() for f1, f2 in 
               zip(calc_features(input_im, self.target_layers), self.target_features))
loss_function_perceptual = ContentLossToTarget(content_im)
model = TensorModel(torch.rand_like(content_im))
learn = Learner(model, get_dummy_dls(150), loss_function_perceptual, 
                lr=1e-2, cbs=cbs, opt_func=torch.optim.Adam)
learn.fit(1, cbs=[ImageLogCB(log_every=30)])        

很显然,如果我们从随机的图片开始,是无法收敛到原来的图片的,但是我们可以看出原来图片的轮廓之类的信息。如果选择神经网络前面的一些层,我们会得到一张不同的图片,因为前面的几层关注的是不同的特征。

Style Loss和Gram Matrix

我们在VGG网络的第18层抽取到了一个(512, 32, 32)的特征矩阵。第一个维度对应神经网络找到的特征,然后32x32代表特征出现的位置。但是我们理想中的Style Transfer中,具体的风格应该是和出现位置无关的。有一个非常有用的技巧可以消除空间信息。

第一步,Flatten。把空间维度展平,变成(512, 1024)的矩阵。

第二步,乘以自身的转置,得到(512,512)的协相关矩阵,里面有特征之间的相关程度信息,非常有用!这就是Gram Matrix。

在代码中,我们还有一个特殊的处理,因为要计算好几个不同层的风格矩阵,所以要除以每个矩阵的大小防止那些空间维度大的矩阵数值也会变的非常大,掩盖其他矩阵的信号。

def calc_grams(img, target_layers=(1, 6, 11, 18, 25)):
    return L(torch.einsum('chw, dhw -> cd', x, x) / (x.shape[-2]*x.shape[-1]) 
            for x in calc_features(img, target_layers))             # 'bchw, bdhw -> bcd' if batched
class StyleLossToTarget():
    def __init__(self, target_im, target_layers=(1, 6, 11, 18, 25)):
        fc.store_attr()
        with torch.no_grad(): self.target_grams = calc_grams(target_im, target_layers)
    def __call__(self, input_im): 
        return sum((f1-f2).pow(2).mean() for f1, f2 in 
               zip(calc_grams(input_im, self.target_layers), self.target_grams))
model = TensorModel(content_im) # Start from content image
style_loss = StyleLossToTarget(style_im)
content_loss = ContentLossToTarget(content_im)
def combined_loss(x):
    return style_loss(x) + content_loss(x)
learn = Learner(model, get_dummy_dls(150), combined_loss, lr=1e-2, cbs=cbs, opt_func=torch.optim.Adam)
learn.fit(1, cbs=[ImageLogCB(30)])

Neural Cellular Automata

可以参考这篇文章来获得直观认识:https://distill.pub/2020/growing-ca/

本质上,就是设计一个神经网络,对每个元素单独进行某些规则的操作,让系统整体呈现出某种性质,有点像生命游戏。废话不多说,我们直接上代码:定义一个128x128的网格,然后每个格子对应有四个元素表示自己当前的状态。我们直接把filters定义死,规定每个格子都能接受到四种周围格子的信息,这样可以降低训练难度。

num_channels = 4
hidden_n = 8
def make_grids(n, sz=128): return torch.zeros(n, num_channels, sz, sz).to(def_device)
filters = torch.stack([
    tensor([[0.0,0.0,0.0],[0.0,1.0,0.0],[0.0,0.0,0.0]]),
    tensor([[-1.0,0.0,1.0],[-2.0,0.0,2.0],[-1.0,0.0,1.0]]),
    tensor([[-1.0,0.0,1.0],[-2.0,0.0,2.0],[-1.0,0.0,1.0]]).T,
    tensor([[1.0,2.0,1.0],[2.0,-12,2.0],[1.0,2.0,1.0]])
]).to(def_device)

接下来是使用神经网络来想办法更新我们的网格世界。在直接上神经网络之前,我们要先把filters应用在网格上来告诉每个格子它周围的格子的信息,但是具体怎么操作呢?下面是详细的解释。

def perchannel_conv(x, filters):
    '''filters: [filter_n, h, w]'''
    b, ch, h, w = x.shape
    y = x.reshape(b*ch, 1, h, w)
    y = F.pad(y, [1, 1, 1, 1], 'circular') # << Note pad mode
    y = F.conv2d(y, filters[:,None])
    return y.reshape(b, -1, h, w)
  • 我们用x = make_grids(1)来作为参数传进这个函数,所以初始形状(1, 4, 128, 128)。
  • 接下来看reshape函数。需要牢记的是,所有的高维张量本质上都是在一维的内存中存储的,不同的维度的索引移动一格,在内存中的移动距离不一样。在这个例子里面,我们的数据在内存中应该是这样排布的:
[b0_ch0_pixels..., b0_ch1_pixels..., ..., b0_chN_pixels..., b1_ch0_pixels..., ...]

最开始在第一个维度上,我们走一步就相当于在内存中跨越$chw$个元素,而在reshape之后,我们就相当于把每个原先的channel对应pixels都当成一个不同的图片进行处理。此时第一维度的stride就会变成$hw$。

  • circular padding是用来应对卷积层在边缘如何进行计算的问题的。我们把最左边的像素复制到右边的边缘,其他边也类推,所以应用在右边缘的卷积核可以看到最左边的元素,不会丢失信息。第二个参数表示左右上下各填充多少个元素。
  • filters[:, None]把(4, 3, 3)的卷积核变成了(4, 1, 3, 3)。如果把冒号改成省略号,那么新增的维度才会被插入到最后,注意区分。这里再解释一下为什么要把每个通道分开进行卷积:因为其实每个像素对应的通道表示的是这个像素点独立的信息,我们不想通过PyTorch默认的卷积来把所有的通道结果加和。
  • 这样,因为每个state都对应四个滤镜,那么最后的通道数就会变成原来的四倍。

数据预处理好了,接下来看神经网络!我们的目的是对每一个格点根据它的状态信息用这个网络更新。所以说需要先把所以批次的所有格点展平到第一个维度,而原先的通道维度放到最后。卷积就需要利用1x1卷积层的技巧,它的本质其实也是线性层。

之所以最后一层没有偏置,是因为我们更新网络的方式是把神经网络的输出加到原来的状态上,如果开始bias不等于零,那么网络的激活值就不断向一个方向偏移,这不是我们想要的。

import einops
x = make_grids(1)
model_inputs = perchannel_conv(x, filters)
brain = nn.Sequential(
    nn.Linear(num_channels*4, hidden_n),
    nn.ReLU(),
    nn.Linear(hidden_n, num_channels, bias=False)
).to(def_device)
model_inputs_flat = einops.rearrange(model_inputs, 'b c h w -> (b h w) c') # (1*128*128, 16)
brain_preds = brain(model_inputs_flat).reshape(x.shape)

brain = nn.Sequential(
    nn.Conv2d(num_channels*4, hidden_n, 1),
    nn.ReLU(),
    nn.Conv2d(hidden_n, num_channels, 1, bias=False)
).to(def_device)
brain_preds = brain(model_inputs).reshape(x.shape)

最后,我们把所有功能整合成一个类。注意到三个细节:

  • 最后一层权重也被设置为零,简化神经网络。
  • 采用随机更新的方法来提高训练的灵活性。我使用了torch.rand,它生成的是均匀分布,所以我们加上0.5,整体的分布范围就是0.5到1.5,向下取整之后就会有一半的数据是1,一半的数据是0。
  • 定义了to_rgb函数,在把网格转换成图片的时候只用前三个通道,同时为了应对一开始图片是全黑的情况,我们格外加上0.5。
class SimpleCA(nn.Module):
    def __init__(self, zero_w2=True):
        super().__init__()
        self.w1 = nn.Conv2d(num_channels*4, hidden_n, 1)
        self.relu = nn.ReLU()
        self.w2 = nn.Conv2d(hidden_n, num_channels, 1, bias=False)
        if zero_w2: self.w2.weight.data.zero_()
    def forward(self, x, update_rate=0.5):
        y = perchannel_conv(x, filters) # Apply the filters
        y = self.w2(self.relu(self.w1(y))) # pass the result through our 'brain'
        b, c, h, w = y.shape
        update_mask = (torch.rand(b, 1, h, w).to(x.device)+update_rate).floor() # Random update
        return x+y*update_mask
    def to_rgb(self, x):
        return x[...,:3,:,:]+0.5

剩下的就是训练了。代码之所以这么长,是因为我们有额外的画图任务。

损失函数是模型运行一定时间之后生成图案和我们预想图案之间的Style Loss,同时为了防止梯度爆炸,我们在更新的时候会把梯度的大小正则化,这个技巧叫Gradient Normalization

class NCAProgressCB(ProgressCB):
    def after_batch(self, learn):
        learn.dl.comment = f'{learn.loss:.3f}'
        if not (hasattr(learn, 'metrics') and learn.training): return 
        self.losses.append(learn.loss.item())
        mbar = self.mbar
        if not hasattr(mbar, 'graph_fig'):
            mbar.graph_fig, mbar.graph_axs = plt.subplots(1, 2, figsize=(12, 3.5))
            mbar.graph_out = display(mbar.graph_fig, display_id=True)
        # Update preview image every 64 iters
        if (len(self.losses))%64 != 10: return         
        # Plot losses:
        mbar.graph_axs[0].clear()
        mbar.graph_axs[0].plot(self.losses, '.', alpha=0.3)
        mbar.graph_axs[0].set_yscale('log')
        mbar.graph_axs[0].set_ylim(tensor(self.losses).min(), self.losses[0])       
        # Show preview images:
        rgb = learn.model.to_rgb(learn.preds.detach()).clip(0, 1)
        show_image(torchvision.utils.make_grid(rgb), ax=mbar.graph_axs[1])        
        # Update graph
        mbar.graph_out.update(mbar.graph_fig)
class NCACB(TrainCB):
    order = DeviceCB.order+1
    def __init__(self, ca, style_img_tensor, style_loss_scale=0.1, size=256, 
                 step_n_min=32, step_n_max=96, batch_size=4):
        fc.store_attr()
        with torch.no_grad(): self.pool = make_grids(256, sz=size) # Set up a 'pool' of grids    
    def predict(self, learn):         
        # Pick some random samples from the pool
        batch_idx = torch.randint(0, len(self.pool), (self.batch_size,))
        x = self.pool[batch_idx]        
        # occasionally zero out some samples
        if torch.randint(8, (1,)) < 1: 
            x[:1] =  make_grids(1, sz=self.size)        
        # Apply the model a number of times
        for _ in range(torch.randint(self.step_n_min, self.step_n_max, (1,))):
            x = learn.model(x)        
        # Update pool
        with torch.no_grad(): self.pool[batch_idx] = x
        # and store preds
        learn.preds = x        
    def get_loss(self, learn): 
        style_loss = learn.loss_func(learn.model.to_rgb(self.learn.preds))
        overflow_loss = (learn.preds-learn.preds.clamp(-1.0, 1.0)).abs().sum()
        learn.loss = overflow_loss + style_loss*self.style_loss_scale        
    def backward(self, learn):
        learn.loss.backward()
        # Gradient normalization:
        for p in learn.model.parameters():
            p.grad /= (p.grad.norm()+1e-8) 
    def before_fit(self, learn): self.learn=learn 

最后生成效果图:

images = []
x = torch.randn(1, num_channels, 128, 128).to(def_device) * 0.1
for i in range(900):
    x = model(x)
    if i%100==0: images.append(model.to_rgb(x)[0].clip(0, 1))
show_images(images)

注意力机制

Self-Attention

这里介绍的是一种比较简单的自注意力机制,最开始用于GAN来感知图片各个地方之间的关系,这样生成的图片更具有逻辑上的一致性,显著提高了结果的质量。下面拆解一下这个注意力机制的原理。先放上源代码。

class SelfAttention(Module):
    "Self attention layer for `n_channels`."
    def __init__(self, n_channels):
        self.query,self.key,self.value = [self._conv(n_channels, c) for c in (n_channels//8,n_channels//8,n_channels)]
        self.gamma = nn.Parameter(tensor([0.]))

    def _conv(self,n_in,n_out):
        return ConvLayer(n_in, n_out, ks=1, ndim=1, norm_type=NormType.Spectral, act_cls=None, bias=False)

    def forward(self, x):
        #Notation from the paper.
        size = x.size()
        x = x.view(*size[:2],-1)
        f,g,h = self.query(x),self.key(x),self.value(x)
        beta = F.softmax(torch.bmm(f.transpose(1,2), g), dim=1)
        o = self.gamma * torch.bmm(h, beta) + x
        return o.view(*size).contiguous()

首先,明确一下输入的张量形状类似于(32,16,8,8)。说白了,就是递给模型一张图片。然后把这个图片的64像素压成1维的。

然后我们让这张图分别经过三个1维、卷积核大小为1的卷积神经网络处理,产生(32, 2, 64)的query、key和(32, 16, 64)的value。这种卷积核大小是1的卷积层,本质上就是一个对所有像素同时处理的全连接层,分别对每个像素都汇总了来自16个通道的信号,把它压缩到2个通道上去。value则没有进行丢失信息的处理。

接下来的操作就是注意力机制的精髓:我们通过转置把f的形状变成(32, 64, 2)再和g进行批次矩阵乘法,得到(32, 64, 64)的矩阵。我们用两个数据表示每一个像素“需要”的信息(query),然后再用两个数据表示每个像素可以“提供”的信息(key),然后对每一个像素对都求出图片上其他每一个位置的像素对它的影响和可以提供的信息,形成一个巨大的64x64表格。这就是Attention。然后我们不能直接把这个输出,因为会丢失信号。因此,我们对这个表格进行行维度上的softmax处理,然后让这个Attention Mask和value相乘,这样,新得到的每一个位置都是原来所有位置的加权平均值,也就是综合了Attention的信息!然后我们用gamma来自定义模型的注意力大小。

最后,我们返回伸缩回去的张量,这就是注意力机制!

Pooled Self-Attention

class PooledSelfAttention2d(Module):
    "Pooled self attention layer for 2d."
    def __init__(self, n_channels):
        self.n_channels = n_channels
        self.query,self.key,self.value = [self._conv(n_channels, c) for c in (n_channels//8,n_channels//8,n_channels//2)]
        self.out   = self._conv(n_channels//2, n_channels)
        self.gamma = nn.Parameter(tensor([0.]))

    def _conv(self,n_in,n_out):
        return ConvLayer(n_in, n_out, ks=1, norm_type=NormType.Spectral, act_cls=None, bias=False)

    def forward(self, x):
        n_ftrs = x.shape[2]*x.shape[3]
        f = self.query(x).view(-1, self.n_channels//8, n_ftrs)
        g = F.max_pool2d(self.key(x),   [2,2]).view(-1, self.n_channels//8, n_ftrs//4)
        h = F.max_pool2d(self.value(x), [2,2]).view(-1, self.n_channels//2, n_ftrs//4)
        beta = F.softmax(torch.bmm(f.transpose(1, 2), g), -1)
        o = self.out(torch.bmm(h, beta.transpose(1,2)).view(-1, self.n_channels//2, x.shape[2], x.shape[3]))
        return self.gamma * o + x

和上面的代码有很多相似的地方,但逻辑还是一样的难懂。总体思路是,这个是上面版本的一个优化,原来我们对每一个像素都要计算所有其他像素的注意力权重,但是实际上没有必要,我们只需要对其他地方进行一个MaxPool操作,就可以大大减少模型所需“注意力”的运算量,从而加速模型训练,让其能够生产更高分辨率的图片!

  • __init__方法中,我们就已经注意到了不同:self.value对应的channels除了2,并且多了一个把channels翻倍的self.out层。然后我们重点看features层。
  • n_ftrsnumber of features的缩写,代表图片中像素的数目。然后让输入x被query处理之后flatten掉。
  • 用2x2的池化层扫过self.key(x),这样features就变成了原来的1/4倍,同样flatten。然后对self.value(x)做相同处理,但是要注意的是self.value(x)最后的通道数是channels/2。
  • 用矩阵乘法产生一个Attention Mask,如果输入是(32, 16, 8, 8),那么这个Mask的形状就是(32, 64, 16),每个像素对应来自图片其他16个位置的信息。然后再经过一波矩阵乘法,得到torch.bmm(h, beta.transpose(1,2)的形状是(32, 8, 64),用out层处理之后,再进行变形,就可以得到一个和输入形状相同的结果,然后就是熟悉的乘以gamma的运算了。

Simple Self-Attention

class SimpleSelfAttention(Module):
    def __init__(self, n_in:int, ks=1, sym=False):
        self.sym,self.n_in = sym,n_in
        self.conv = _conv1d_spect(n_in, n_in, ks, padding=ks//2, bias=False)
        self.gamma = nn.Parameter(tensor([0.]))

    def forward(self,x):
        if self.sym:
            c = self.conv.weight.view(self.n_in,self.n_in)
            c = (c + c.t())/2
            self.conv.weight = c.view(self.n_in,self.n_in,1)

        size = x.size()
        x = x.view(*size[:2],-1)

        convx = self.conv(x)
        xxT = torch.bmm(x,x.permute(0,2,1).contiguous())
        o = torch.bmm(xxT, convx)
        o = self.gamma * o + x
        return o.view(*size).contiguous()

如果我们对模型的性能有严格的限制,可以考虑使用这个更简单版本的注意力机制。我们完全抛弃掉了Query, Key和Value,只是让输入X乘以自己的转置,再经卷积层处理后与原来的输入相加。

主要的道理和前面两个例子相同,我就不再详细描述了。不过Attention这块模型的本质我还是没有特别搞清楚,以后很可能还会回来继续补充我对Attention的理解。

FastAI中间层API

1. 从Transform类开始:

class NormalizeMean(Transform):
    def setups(self, items): self.mean = sum(items)/len(items)
    def encodes(self, x): return x-self.mean
    def decodes(self, x): return x+self.mean

注意我们进行方法改写的时候,一定要写setups,decodes和encodes,不要去掉s。因为fastai会自动为我们包装上其他的功能,中间再调用我们定义的函数!

2. 用Pipeline合并多个Transform

tfms = Pipeline([tok, num])
t = tfms(txts[0]); t[:20]
tfms.decode(t)[:100]

3. 用TfmdLists结合Pipeline与输入数据

原来我们只能用Pipeline处理一个输入。如果我们有一堆输入,都放在列表里,可以使用这个功能批量处理:

tls = TfmdLists(files, [Tokenizer.from_folder(path), Numericalize])
t = tls[0]; t[:20]
tls.decode(t)[:100]
tls.show(t)

而且TfmdLists还有一个特有的功能,就是可以接受splits参数:

cut = int(len(files)*0.8)
splits = [list(range(cut)), list(range(cut,len(files)))]
tls = TfmdLists(files, [Tokenizer.from_folder(path), Numericalize], 
                splits=splits)

然后就可以tls.valid[0][:20]这样用了。

4. 用Datasets将x、y传入不同的Pipeline

x_tfms = [Tokenizer.from_folder(path), Numericalize]
y_tfms = [parent_label, Categorize()]
dsets = Datasets(files, [x_tfms, y_tfms], splits=splits)
x,y = dsets.valid[0]
x[:20],y

还可以方便的把datasets转换成dataloaders:

dls = dsets.dataloaders(bs=64, dl_type=SortedDL, before_batch=pad_input)
  • after_item:: Applied on each item after grabbing it inside the dataset. This is the equivalent of item_tfms in DataBlock.
  • before_batch:: Applied on the list of items before they are collated. This is the ideal place to pad items to the same size.
  • after_batch:: Applied on the batch as a whole after its construction. This is the equivalent of batch_tfms in DataBlock.

FastAI高级函数

Dataset: A collection that returns a tuple of your independent and dependent variable for a single item

DataLoader: An iterator that provides a stream of mini-batches, where each mini-batch is a couple of a batch of independent variables and a batch of dependent variables

Datasets: An iterator that contains a training Dataset and a validation Dataset

DataLoaders: An object that contains a training DataLoader and a validation DataLoader

DataLoader和DataLoaders

定义

dl = DataLoader(range(15), batch_size=5, shuffle=True)
list(dl)
::
[tensor([ 0,  7,  4,  5, 11]),
 tensor([ 9,  3,  8, 14,  6]),
 tensor([12,  2,  1, 10, 13])]
::
ds = L(enumerate(string.ascii_lowercase))
dl = DataLoader(ds, batch_size=6, shuffle=True)
::
[(tensor([ 6, 14, 12, 15, 24, 11]), ('g', 'o', 'm', 'p', 'y', 'l')),
 (tensor([ 0, 16,  2, 18, 25, 21]), ('a', 'q', 'c', 's', 'z', 'v')),
 (tensor([ 8,  7, 19, 23,  1,  9]), ('i', 'h', 't', 'x', 'b', 'j')),
 (tensor([ 4, 13, 10,  5,  3, 17]), ('e', 'n', 'k', 'f', 'd', 'r')),
 (tensor([22, 20]), ('w', 'u'))]
::

  • 我们可以看出最基础的DataLoader的逻辑。给了一个列表,那就直接分组;如果列表的形式是被zip到一起的,那就当成dataset来处理,”题目“和”答案“分开。
  • bs参数可以设置每一批图像的个数多少。

获取数据

dl = DataLoader(dset, batch_size=256)
xb,yb = first(dl)
  • 我们可以通过first函数来查看第一组数据(准确来说是第一批)。
  • 或者:
x,y = dls.one_batch()

也可以帮助我们获得第一批次的训练数据。x、y的长度是之前在模型训练时,我们规定的batch_size的大小。

获取预测(第一种)

preds,_ = learn.get_preds(dl=[(x,y)])
preds[0] # 此为组内第一个数据的预测。

We can view the predictions (the activations of the final layer of our neural network) by using Learner.get_preds. This function takes either a dataset index (0 for train and 1 for valid) or an iterator of batches. Thus, we can pass it a simple list with our batch to get our predictions. It returns predictions and targets by default, but since we already have the targets, we can effectively ignore them by assigning to the special variable _.

learn.get_preds如果不提供任何参数,那就是默认返回validation set的预测与真实值。

获取预测(第二种)

is_bird,_,probs = learn.predict(PILImage.create('bird.jpg'))

中间那个下划线省略了,因为返回的东西是输入所对应的具体类别,如tensor(0),和我们想要的没有太大关系。

还可以添加参数with_decoded=True来获取想要的类别代码之类的。

创建针对测试数据的Dataloader

tst_files = get_image_files(path/'test_images').sorted()
tst_dl = dls.test_dl(tst_files)

直接给文件路径列表就行。

DataBlock

bears = DataBlock(
    blocks=(ImageBlock, CategoryBlock),
    get_items=get_image_files,
    splitter=RandomSplitter(valid_pct=0.2, seed=42),
    get_y=parent_label,
    item_tfms=Resize(128))
bears = bears.new(
    item_tfms=RandomResizedCrop(224, min_scale=0.5),
    batch_tfms=aug_transforms())
dls = bears.dataloaders(path)

基本属性

最需要注意的来自datablock的定义。

  • 参数blocks是一个元组,代表输入和输出的数据类型。
  • 使用get_image_files获取所有的图片文件作为输入。
  • splitter属性必不可少,Randomsplitter中可以设置训练集所占比例,对应参数valid_pct。其中,pct是percentage(百分比)的意思。
  • 使用get_y=parent_label,让其把每一张照片所在文件夹的名称作为其所对应的分类!对应我们刚才的数据存储方式。记住,get_items返回的是什么样的数据,什么样的数据就会被传入get_y。可以说,前者是获取题目,y是你根据题目来索引得到答案。get_image_files返回的是所有图像文件的路径,而这些路径可以被传入parent_label得到它们对应的分类。

还有一种用法:get_y=using_attr(RegexLabeller(r'(.+)_\d+.jpg$'), 'name'),可以让我们主动提取文件名中的某一部分(使用正则表达式)而非使用其父文件夹的名字。为什么后面的参数要写name?注意到前面提过获取一个path对象的文件名时,我们使用的fname.name

补充:item_tfms=[Resize(192, method=’squish’)]也是完全可以的。

dataset的定义方法

dblock = DataBlock()
dsets = dblock.datasets(df)
len(dsets.train),len(dsets.valid) # 4009, 1002

另一种定义方式就是从空的对象开始,然后来规定里面的dataset(数据集)。而这个数据集可以直接从pandas的dataframe转换过来,十分方便。而给到一个数据集之后,fastai是无从知道什么是训练数据、什么是测试数据,以及什么是输入、什么是输出。于是它便不做任何区分。x,y = dsets.train[0]确实会返回第一个数据,但是毕竟训练集和测试集都是随机分的,而且题目和答案都指向的dataframe中的同一行。

def get_x(r): return path/'train'/r['fname']
def get_y(r): return r['labels'].split(' ')
dblock = DataBlock(get_x = get_x, get_y = get_y)
dsets = dblock.datasets(df)
dsets.train[0]

所以我们可以进阶一下,定义了get_x和get_y函数,这两个函数负责处理df中的每一行数据,并且吐出了对应的”题目“和”答案“。在这里,题目是文件名指向的图片,而答案则是图片对应的标签。因为是多标签分类问题,所以需要进行split处理,变成一个列表。

注意:哪怕get_x和get_y所需要的函数都很短小,也不要使用lambda,因为之后在导出learner的时候,lambda函数是不支持序列化的!

dblock = DataBlock(blocks=(ImageBlock, MultiCategoryBlock),
                   get_x = get_x, get_y = get_y)
dsets = dblock.datasets(df)
dsets.train[0]

怎么让fastai知道这是图像多标签分类呢?block的类型在这里真正派上了用场!指定了文件名之后,因为规定是imageblock,fastai就会把这个图像文件打开并加载作为输入,而多标签的multicategoryblock则会产生一个长度为20的列表(对应物品的总种类),如果图片符合某一个种类,对应种类的索引就是1。

  • dsets.train.vocab可以获取所有标签的名称。
torch.where(dsets.train[0][1]==1.)

再讲一下这段代码。它获取了训练集的第一个数据,拿到了标签,然后对照一下看看哪一个标签是符合图片的。这会返回一个长度为20,里面只有True和False的列表,而想把true的地方都变成索引,就用到了torch.where,所以我们得到了(TensorMultiCategory([14, 18]),)把这个标签数据转换成文字也特别简单,只需要:

dsets.train.vocab[idxs]

就可以知道图片对应的具体是什么类别,(#2) ['person','train']

learner对象也可以轻松的获取vocab:learner.dls.vocab即可

手动操作训练集和测试集的分割

def splitter(df):
    train = df.index[~df['is_valid']].tolist()
    valid = df.index[df['is_valid']].tolist()
    return train,valid

dblock = DataBlock(blocks=(ImageBlock, MultiCategoryBlock),
                   splitter=splitter,
                   get_x=get_x, 
                   get_y=get_y)

dsets = dblock.datasets(df)
dsets.train[0]

根据刚才通过dataset定义的方法,其中的dataframe有一列是用来规定该数据是否应当在测试集里面。那么我们需要根据这个原则去手动分割数据集,这就必须自己定义splitter方法,而不是用fastai提供的randomsplitter。

试一下df.is_valid就可以知道返回的是一系列的True和False,但这是Series类型而不是张量,所以没有办法用torch.where获取True所在的索引。那么我们就用pandas自己提供的方法:df.index[df.is_valid]可以得到类似的效果。可以用df.index[df.is_valid].tolist()转换为普通列表即可。然后就有了刚才的分离函数,记住,传入分离函数的参数是整个数据集的dataframe,输出的是训练集、测试集各自分别的索引列表。

分割的另一种方法
splitter=FuncSplitter(lambda o: o.parent.name=='13')

这里的含义是让某一种数据(判定为True的)被归为测试集。

标准随机分割

image-20250613071357564

抛弃dataset

dblock = DataBlock(blocks=(ImageBlock, MultiCategoryBlock),
                   splitter=splitter,
                   get_x=get_x, 
                   get_y=get_y,
                   item_tfms = RandomResizedCrop(128, min_scale=0.35))
dls = dblock.dataloaders(df)

现在我们就可以不再用dataset做低级的测试了,稍加改动,把datasets改成dataloaders,就可以得到目标dataloaders了!

链接DataLoader

不过还要记住,datablock定义之后并没有加载数据,想要把datablock和数据连接起来,我们需要根据datablock加载我们已经存好数据的路径,即生成dataloaders保存真正的训练集和测试集。

调试DataBlock

如果创建dataloader失败,可以使用这个函数来进行调试:datablock.summary(path)。比如作为数据集的图片大小没有均一化,会得到如下提示信息:

Collating items in a batch
Error! It's not possible to collate your items in a batch
Could not collate the 0-th members of your tuples because got the following
shapes:
torch.Size([3, 500, 375]),torch.Size([3, 375, 500]),torch.Size([3, 333, 500]),
torch.Size([3, 375, 500])

Learner相关

保存和加载模型

learn.save('1epoch')

import torch.serialization
torch.serialization.add_safe_globals([L])

learn = learn.load('1epoch')

语言模型的保存

learn.save_encoder('finetuned')

这样不保存最后一层神经元,用来方便模型在之后精调,用于其他功能。

手动获得数据及其预测输出

learn = vision_learner(dls, resnet18)
x,y = to_cpu(dls.train.one_batch())
activs = learn.model(x)

数据清理(可能用不了)

interp = ClassificationInterpretation.from_learner(learn)
interp.plot_confusion_matrix()
interp.plot_top_losses(5, nrows=1)
cleaner = ImageClassifierCleaner(learn)
for idx in cleaner.delete(): cleaner.fns[idx].unlink()
for idx,cat in cleaner.change(): shutil.move(str(cleaner.fns[idx]), path/cat)

上面是模型的清理工作和对数据的分析。

learn.export()
path = Path()
print(path.ls(file_exts='.pkl'))
learn_inf = load_learner(path/'export.pkl')
print(learn_inf.dls.vocab)

以上是模型的导入导出。

the ResNet architecture that we are using in this chapter comes in variants with 18, 34, 50, 101, and 152 layers, pretrained on ImageNet.

learn = vision_learner(dls, resnet18, y_range=(-1,1))

注意到一个新参数,y_range。当涉及到坐标的运算和预测时,我们默认把坐标值缩放到-1和1之间。

获得训练历史数据

dls = DataLoaders(dl, valid_dl)
learn = Learner(dls, nn.Linear(28*28,1), opt_func=SGD,
                loss_func=mnist_loss, metrics=batch_accuracy)
learn.fit(10, lr=lr)
values = learn.recorder.values
learn.recorder.plot_loss()

这个recorder.values里面的值就对应着训练过程中展示给你的表格的值的顺序。

当然,训练时间不计算在内。

学习率精调

在不同深度的神经网络中使用不同的学习率。

learn = vision_learner(dls, resnet34, metrics=error_rate)
lr_min,lr_steep = learn.lr_find(suggest_funcs=(minimum, steep))
learn.fit_one_cycle(6, lr_max=1e-5)
learn.fit_one_cycle(12, lr_max=slice(1e-6,1e-4))

显示结果(Actual/Predicted)

learn.show_results(ds_idx=1, nrows=3, figsize=(6,8))

使用预训练模型和fp16

from fastai.callback.fp16 import *
learn = vision_learner(dls, resnet50, metrics=error_rate).to_fp16()
learn.fine_tune(6, freeze_epochs=3)

选择和切换预训练模型

import timm
timm.list_models('convnext*')
###
['convnext_base',
, 'convnext_base_384_in22ft1k',
, 'convnext_base_in22ft1k',
, 'convnext_base_in22k',
, 'convnext_large',
, 'convnext_large_384_in22ft1k',
, 'convnext_large_in22ft1k',
, 'convnext_large_in22k',
, 'convnext_nano_hnf',
, 'convnext_small',
, 'convnext_small_384_in22ft1k',
, 'convnext_small_in22ft1k',
, 'convnext_small_in22k',
, 'convnext_tiny',
, 'convnext_tiny_384_in22ft1k',
, 'convnext_tiny_hnf',
, 'convnext_tiny_hnfd',
, 'convnext_tiny_in22ft1k',
, 'convnext_tiny_in22k',
, 'convnext_xlarge_384_in22ft1k',
, 'convnext_xlarge_in22ft1k',
, 'convnext_xlarge_in22k']
###
learn = vision_learner(dls, 'convnext_tiny_in22k', metrics=error_rate).to_fp16()

其中,后缀带有”in22”的意味着对应的模型在22000个类别的图像上进行了训练,结果更加准确。

Best Practices

  • 编程原则:the best way to start is to try it against one example at first. 对于深度学习的每一个环节,先试图对于一小段数据进行处理,然后再把所有数据按同样方法处理一遍;然后把刚才用来实验的代码封装到函数中,而不是先封装成函数再测试。
  • 先训练模型,再根据训练结果清理数据(第二节课)

常见问题

POSIX-PATH

import pathlib
temp = pathlib.PosixPath
pathlib.PosixPath = pathlib.WindowsPath

not ‘KeyboardModifier‘

import matplotlib
matplotlib.use("TkAgg")

如何提交csv文件

from kaggle import api
comp = 'paddy-disease-classification'
api.competition_submit_cli('subm.csv', 'initial rn26d 128px', comp)

fastkaggle:如何下载kaggle比赛的数据集

使用fastkaggle就好了,可以非常简单地解决这些问题!因为未知原因,导入fastkaggle时候会爆出很多警告,什么包过期了,运行不了了……实际上都不影响fastkaggle的使用,我就把这些警告都禁用了。

import warnings
warnings.filterwarnings('ignore')
from fastkaggle import *

然后fastkaggle有两个比较好用的功能,一个是iskaggle变量,如果你没有在kaggle中编辑,那么它就是一个空字符串。

要Authentication的话需要设置环境变量KAGGLE_API_TOKEN。

第二个就是帮你下载比赛的文件:

setup_comp('titanic')

就会照着比赛名称把数据集下载下来,返回对应的文件夹路径。好用吧?

运行代码之前导入这些库

对于空白的python,运行这些命令:torch一条是对于windows系统!

目前来说这几条命令就可以自洽地安装基本所有深度学习需要用到的库。就是pytorch占空间非常大,有将近3个G。

pip install fastbook
pip install fastkaggle
pip3 install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu126 
pip install treeinterpreter
pip install waterfallcharts

然后是导入的库:

import os
from pathlib import Path
import torch, numpy as np, pandas as pd
from fastai.vision.all import *
from fastai.tabular.all import *
from fastai.text.all import *
from fastai.imports import *
from fastbook import *

清除IPython中占用的内存

def clean_ipython_hist():
    if not 'get_ipython' in globals(): return
    ip = get_ipython()
    user_ns = ip.user_ns
    ip.displayhook.flush()
    pc = ip.displayhook.prompt_count + 1
    for n in range(1, pc): user_ns.pop('_i'+repr(n),None)
    user_ns.update(dict(_i='',_ii='',_iii=''))
    hm = ip.history_manager
    hm.input_hist_parsed[:] = [''] * pc
    hm.input_hist_raw[:] = [''] * pc
    hm._i = hm._ii = hm._iii = hm._i00 =  ''
def clean_tb():
    if hasattr(sys, 'last_traceback'):
        traceback.clear_frames(sys.last_traceback)
        delattr(sys, 'last_traceback')
    if hasattr(sys, 'last_type'): delattr(sys, 'last_type')
    if hasattr(sys, 'last_value'): delattr(sys, 'last_value')
def clean_mem():
    clean_tb()
    clean_ipython_hist()
    gc.collect()
    torch.cuda.empty_cache()

只需要调用最后一个函数就行。

解决MPS问题

if "mps" == torch_device: os.environ['PYTORCH_ENABLE_MPS_FALLBACK'] = "1"

有些函数没有在MPS加速环境下被PyTorch实现,我们就只能用CPU版的FallBack版本。这就是这行代码做的事情,可以避免训练中断。

在Kaggle环境下使用MiniAI

加上下面三行命令:

!cp -r /kaggle/input/datasets/jasoncoderjia/miniai /tmp/miniai_pkg
!pip install /tmp/miniai_pkg
!pip install torcheval

Lesson 0

基于项目(很可能是BBZ)

重写notebook

twitter

博客 fastpages

kaggle

留下足迹