写给程序员的机器学习入门 (八) - 卷积神经网络 (CNN) - 图片分类和验证码识别(三)


风晓
风晓 2023-12-31 10:01:03 49716 赞同 0 反对 0
分类: 资源
接上一篇写给程序员的机器学习入门 (八) - 卷积神经网络 (CNN) - 图片分类和验证码识别(二)

以下是使用 ResNet-18 进行训练的代码:

import os
import sys
import torch
import gzip
import itertools
import random
import numpy
import json
from PIL import Image
from torch import nn
from matplotlib import pyplot

# 分析目标的图片大小,全部图片都会先缩放到这个大小
IMAGE_SIZE = (32, 32)
# 分析目标的图片所在的文件夹
IMAGE_DIR = "./cifar"
# 包含所有图片标签的文本文件
IMAGE_LABELS_PATH = "./cifar/labels.txt"

class BasicBlock(nn.Module):
    """ResNet 使用的基础块"""
    expansion = 1 # 定义这个块的实际出通道是 channels_out 的几倍,这里的实现固定是一倍
    def __init__(self, channels_in, channels_out, stride):
        super().__init__()
        # 生成 3x3 的卷积层
        # 处理间隔 stride = 1 时,输出的长宽会等于输入的长宽,例如 (32-3+2)//1+1 == 32
        # 处理间隔 stride = 2 时,输出的长宽会等于输入的长宽的一半,例如 (32-3+2)//2+1 == 16
        # 此外 resnet 的 3x3 卷积层不使用偏移值 bias
        self.conv1 = nn.Sequential(
            nn.Conv2d(channels_in, channels_out, kernel_size=3, stride=stride, padding=1, bias=False),
            nn.BatchNorm2d(channels_out))
        # 再定义一个让输出和输入维度相同的 3x3 卷积层
        self.conv2 = nn.Sequential(
            nn.Conv2d(channels_out, channels_out, kernel_size=3, stride=1, padding=1, bias=False),
            nn.BatchNorm2d(channels_out))
        # 让原始输入和输出相加的时候,需要维度一致,如果维度不一致则需要整合
        self.identity = nn.Sequential()
        if stride != 1 or channels_in != channels_out * self.expansion:
            self.identity = nn.Sequential(
                nn.Conv2d(channels_in, channels_out * self.expansion, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(channels_out * self.expansion))

    def forward(self, x):
        # x => conv1 => relu => conv2 => + => relu
        # |                              ^
        # |==============================|
        tmp = self.conv1(x)
        tmp = nn.functional.relu(tmp)
        tmp = self.conv2(tmp)
        tmp += self.identity(x)
        y = nn.functional.relu(tmp)
        return y

class MyModel(nn.Module):
    """图片分类 (ResNet-18)"""
    def __init__(self, num_labels, block_type = BasicBlock):
        super().__init__()
        # 记录上一层的出通道数量
        self.previous_channels_out = 64
        # 把 3 通道转换到 64 通道,长宽不变
        self.conv1 = nn.Sequential(
            nn.Conv2d(3, self.previous_channels_out, kernel_size=3, stride=1, padding=1, bias=False),
            nn.BatchNorm2d(self.previous_channels_out))
        # ResNet 使用的各个层
        self.layer1 = self._make_layer(block_type, channels_out=64, num_blocks=2, stride=1)
        self.layer2 = self._make_layer(block_type, channels_out=128, num_blocks=2, stride=2)
        self.layer3 = self._make_layer(block_type, channels_out=256, num_blocks=2, stride=2)
        self.layer4 = self._make_layer(block_type, channels_out=512, num_blocks=2, stride=2)
        # 把最后一层的长宽转换为 1x1 的池化层,Adaptive 表示会自动检测原有长宽
        # 例如 B,512,4,4 的矩阵会转换为 B,512,1,1,每个通道的单个值会是原有 16 个值的平均
        self.avgPool = nn.AdaptiveAvgPool2d((1, 1))
        # 全连接层,只使用单层线性模型
        self.fc_model = nn.Linear(512 * block_type.expansion, num_labels)

    def _make_layer(self, block_type, channels_out, num_blocks, stride):
        blocks = []
        # 添加第一个块
        blocks.append(block_type(self.previous_channels_out, channels_out, stride))
        self.previous_channels_out = channels_out * block_type.expansion
        # 添加剩余的块,剩余的块固定处理间隔为 1,不会改变长宽
        for _ in range(num_blocks-1):
            blocks.append(block_type(self.previous_channels_out, self.previous_channels_out, 1))
            self.previous_channels_out *= block_type.expansion
        return nn.Sequential(*blocks)

    def forward(self, x):
        # 转换出通道到 64
        tmp = self.conv1(x)
        tmp = nn.functional.relu(tmp)
        # 应用 ResNet 的各个层
        tmp = self.layer1(tmp)
        tmp = self.layer2(tmp)
        tmp = self.layer3(tmp)
        tmp = self.layer4(tmp)
        # 转换长宽到 1x1
        tmp = self.avgPool(tmp)
        # 扁平化,维度会变为 B,512
        tmp = tmp.view(tmp.shape[0], -1)
        # 应用全连接层
        y = self.fc_model(tmp)
        return y

def save_tensor(tensor, path):
    """保存 tensor 对象到文件"""
    torch.save(tensor, gzip.GzipFile(path, "wb"))

def load_tensor(path):
    """从文件读取 tensor 对象"""
    return torch.load(gzip.GzipFile(path, "rb"))

def image_to_tensor(img):
    """转换图片对象到 tensor 对象"""
    in_img = img.resize(IMAGE_SIZE)
    arr = numpy.asarray(in_img)
    t = torch.from_numpy(arr)
    t = t.transpose(0, 2) # 转换维度 H,W,C 到 C,W,H
    t = t / 255.0 # 正规化数值使得范围在 0 ~ 1
    return t

def load_image_labels():
    """读取图片分类列表"""
    return list(filter(None, open(IMAGE_LABELS_PATH).read().split()))

def prepare_save_batch(batch, tensor_in, tensor_out):
    """准备训练 - 保存单个批次的数据"""
    # 切分训练集 (80%),验证集 (10%) 和测试集 (10%)
    random_indices = torch.randperm(tensor_in.shape[0])
    training_indices = random_indices[:int(len(random_indices)*0.8)]
    validating_indices = random_indices[int(len(random_indices)*0.8):int(len(random_indices)*0.9):]
    testing_indices = random_indices[int(len(random_indices)*0.9):]
    training_set = (tensor_in[training_indices], tensor_out[training_indices])
    validating_set = (tensor_in[validating_indices], tensor_out[validating_indices])
    testing_set = (tensor_in[testing_indices], tensor_out[testing_indices])

    # 保存到硬盘
    save_tensor(training_set, f"data/training_set.{batch}.pt")
    save_tensor(validating_set, f"data/validating_set.{batch}.pt")
    save_tensor(testing_set, f"data/testing_set.{batch}.pt")
    print(f"batch {batch} saved")

def prepare():
    """准备训练"""
    # 数据集转换到 tensor 以后会保存在 data 文件夹下
    if not os.path.isdir("data"):
        os.makedirs("data")

    # 准备图片分类到序号的索引
    labels_to_index = { label: index for index, label in enumerate(load_image_labels()) }

    # 查找所有图片
    image_paths = []
    for root, dirs, files in os.walk(IMAGE_DIR):
        for filename in files:
            path = os.path.join(root, filename)
            if not path.endswith(".png"):
                continue
            # 分类名称在文件名中,例如
            # 2598_cat.png => cat
            label = filename.split(".")[0].split("_")[1]
            label_index = labels_to_index.get(label)
            if label_index is None:
                continue
            image_paths.append((path, label_index))

    # 打乱图片顺序
    random.shuffle(image_paths)

    # 分批读取和保存图片
    batch_size = 1000
    for batch in range(0, len(image_paths) // batch_size):
        image_tensors = []
        image_labels = []
        for path, label_index in image_paths[batch*batch_size:(batch+1)*batch_size]:
            with Image.open(path) as img:
                t = image_to_tensor(img)
                image_tensors.append(t)
            image_labels.append(label_index)
        tensor_in = torch.stack(image_tensors) # 维度: B,C,W,H
        tensor_out = torch.tensor(image_labels) # 维度: B
        prepare_save_batch(batch, tensor_in, tensor_out)

def train():
    """开始训练"""
    # 创建模型实例
    num_labels = len(load_image_labels())
    model = MyModel(num_labels)

    # 创建损失计算器
    # 计算单分类输出最好使用 CrossEntropyLoss, 多分类输出最好使用 BCELoss
    # 使用 CrossEntropyLoss 时实际输出应该为标签索引值,不需要转换为 onehot
    loss_function = torch.nn.CrossEntropyLoss()

    # 创建参数调整器
    optimizer = torch.optim.Adam(model.parameters())

    # 记录训练集和验证集的正确率变化
    training_accuracy_history = []
    validating_accuracy_history = []

    # 记录最高的验证集正确率
    validating_accuracy_highest = -1
    validating_accuracy_highest_epoch = 0

    # 读取批次的工具函数
    def read_batches(base_path):
        for batch in itertools.count():
            path = f"{base_path}.{batch}.pt"
            if not os.path.isfile(path):
                break
            yield load_tensor(path)

    # 计算正确率的工具函数
    def calc_accuracy(actual, predicted):
        # 把最大的值当作正确分类,然后比对有多少个分类相等
        predicted_labels = predicted.argmax(dim=1)
        acc = (actual == predicted_labels).sum().item() / actual.shape[0]
        return acc

    # 划分输入和输出的工具函数
    def split_batch_xy(batch, begin=None, end=None):
        # shape = batch_size, channels, width, height
        batch_x = batch[0][begin:end]
        # shape = batch_size
        batch_y = batch[1][begin:end]
        return batch_x, batch_y

    # 开始训练过程
    for epoch in range(1, 10000):
        print(f"epoch: {epoch}")

        # 根据训练集训练并修改参数
        # 切换模型到训练模式,将会启用自动微分,批次正规化 (BatchNorm) 与 Dropout
        model.train()
        training_accuracy_list = []
        for batch_index, batch in enumerate(read_batches("data/training_set")):
            # 切分小批次,有助于泛化模型
            training_batch_accuracy_list = []
            for index in range(0, batch[0].shape[0], 100):
                # 划分输入和输出
                batch_x, batch_y = split_batch_xy(batch, index, index+100)
                # 计算预测值
                predicted = model(batch_x)
                # 计算损失
                loss = loss_function(predicted, batch_y)
                # 从损失自动微分求导函数值
                loss.backward()
                # 使用参数调整器调整参数
                optimizer.step()
                # 清空导函数值
                optimizer.zero_grad()
                # 记录这一个批次的正确率,torch.no_grad 代表临时禁用自动微分功能
                with torch.no_grad():
                    training_batch_accuracy_list.append(calc_accuracy(batch_y, predicted))
            # 输出批次正确率
            training_batch_accuracy = sum(training_batch_accuracy_list) / len(training_batch_accuracy_list)
            training_accuracy_list.append(training_batch_accuracy)
            print(f"epoch: {epoch}, batch: {batch_index}: batch accuracy: {training_batch_accuracy}")
        training_accuracy = sum(training_accuracy_list) / len(training_accuracy_list)
        training_accuracy_history.append(training_accuracy)
        print(f"training accuracy: {training_accuracy}")

        # 检查验证集
        # 切换模型到验证模式,将会禁用自动微分,批次正规化 (BatchNorm) 与 Dropout
        model.eval()
        validating_accuracy_list = []
        for batch in read_batches("data/validating_set"):
            batch_x, batch_y = split_batch_xy(batch)
            predicted = model(batch_x)
            validating_accuracy_list.append(calc_accuracy(batch_y, predicted))
        validating_accuracy = sum(validating_accuracy_list) / len(validating_accuracy_list)
        validating_accuracy_history.append(validating_accuracy)
        print(f"validating accuracy: {validating_accuracy}")

        # 记录最高的验证集正确率与当时的模型状态,判断是否在 20 次训练后仍然没有刷新记录
        if validating_accuracy > validating_accuracy_highest:
            validating_accuracy_highest = validating_accuracy
            validating_accuracy_highest_epoch = epoch
            save_tensor(model.state_dict(), "model.pt")
            print("highest validating accuracy updated")
        elif epoch - validating_accuracy_highest_epoch > 20:
            # 在 20 次训练后仍然没有刷新记录,结束训练
            print("stop training because highest validating accuracy not updated in 20 epoches")
            break

    # 使用达到最高正确率时的模型状态
    print(f"highest validating accuracy: {validating_accuracy_highest}",
        f"from epoch {validating_accuracy_highest_epoch}")
    model.load_state_dict(load_tensor("model.pt"))

    # 检查测试集
    testing_accuracy_list = []
    for batch in read_batches("data/testing_set"):
        batch_x, batch_y = split_batch_xy(batch)
        predicted = model(batch_x)
        testing_accuracy_list.append(calc_accuracy(batch_y, predicted))
    testing_accuracy = sum(testing_accuracy_list) / len(testing_accuracy_list)
    print(f"testing accuracy: {testing_accuracy}")

    # 显示训练集和验证集的正确率变化
    pyplot.plot(training_accuracy_history, label="training")
    pyplot.plot(validating_accuracy_history, label="validing")
    pyplot.ylim(0, 1)
    pyplot.legend()
    pyplot.show()

def eval_model():
    """使用训练好的模型"""
    # 创建模型实例,加载训练好的状态,然后切换到验证模式
    labels = load_image_labels()
    num_labels = len(labels)
    model = MyModel(num_labels)
    model.load_state_dict(load_tensor("model.pt"))
    model.eval()

    # 询问图片路径,并显示可能的分类一览
    while True:
        try:
            # 构建输入
            image_path = input("Image path: ")
            if not image_path:
                continue
            with Image.open(image_path) as img:
                tensor_in = image_to_tensor(img).unsqueeze(0) # 维度 C,W,H => 1,C,W,H
            # 预测输出
            tensor_out = model(tensor_in)
            # 转换到各个分类对应的概率
            tensor_out = nn.functional.softmax(tensor_out, dim=1)
            # 显示按概率排序后的分类一览
            rates = (t.item() for t in tensor_out[0])
            label_with_rates = list(zip(labels, rates))
            label_with_rates.sort(key=lambda p:-p[1])
            for label, rate in label_with_rates[:5]:
                rate = rate * 100
                print(f"{label}: {rate:0.2f}%")
            print()
        except Exception as e:
            print("error:", e)

def main():
    """主函数"""
    if len(sys.argv) < 2:
        print(f"Please run: {sys.argv[0]} prepare|train|eval")
        exit()

    # 给随机数生成器分配一个初始值,使得每次运行都可以生成相同的随机数
    # 这是为了让过程可重现,你也可以选择不这样做
    random.seed(0)
    torch.random.manual_seed(0)

    # 根据命令行参数选择操作
    operation = sys.argv[1]
    if operation == "prepare":
        prepare()
    elif operation == "train":
        train()
    elif operation == "eval":
        eval_model()
    else:
        raise ValueError(f"Unsupported operation: {operation}")

if __name__ == "__main__":
    main()

最终输出结果如下,可以看到训练集正确率达到了 99%,验证集正确率达到了 85%,测试集正确率达到了 84%,比起上面的 LeNet 模型改进了很多吧🤗。

training accuracy: 0.9972708333333337
validating accuracy: 0.8373333333333337
stop training because highest validating accuracy not updated in 20 epoches
highest validating accuracy: 0.8521666666666667 from epoch 38
testing accuracy: 0.8464999999999996

随便在网上找的猫狗图片:

输出结果如下,不错吧:

Image path: BlogArchive/ml-08/cat.jpg
cat: 100.00%
dog: 0.00%
frog: 0.00%
deer: 0.00%
horse: 0.00%

Image path: BlogArchive/ml-08/dog.jpg
dog: 100.00%
bird: 0.00%
deer: 0.00%
frog: 0.00%
horse: 0.00%

pytorch 有专门用于处理视觉信息的 torchvision,其中包含了 ResNet 的实现,也就是说其实我们不用自己去写🤒,如果你有兴趣可以参考里面的实现代码,再试试 ResNet-50 等层数更多的模型是否可以带来更好的效果。

AI 鉴黄

相信很多人都看过 AI 鉴黄的新闻🥴🤭🥺,如果你想自己实现一个,可以从 nsfw_data_scraper 下载图片资源然后使用上面介绍的方法训练,识别起来会比 cifar 简单很多。因为实际只需要两个标签(1 黄色图片,0 正常图片),所以也可以使用单个值代表结果,然后用 sigmoid 代替 softmax。此外你也可以在 github 上搜索 nsfw 找到现成的模型。

使用 CNN 实现验证码识别 (ResNet-18)

最后再给出一个实用的例子。很多网站为了防机器人操作会使用验证码机制,传统的验证码会显示一张包含数字字母的图片,然后让用户填写里面的内容再对比是否正确,来判断用户是普通人还是机器人,这样的验证码可以用本篇介绍的 CNN 模型识别出来😈。

首先我们来选一个生成验证码的类库,github 上搜索 captcha c# 里面难度相对比较高的是 Hei.Captcha,这篇就使用 CNN 模型识别这个类库生成的验证码。(我的 zkweb 里面也有生成验证码的模块,但难度比较低所以就不用了)

以下步骤和代码会生成十万张用于训练和测试使用的验证码图片:

mkdir generate-captcha
cd generate-captcha
dotnet new console
dotnet add package Hei.Captcha
mkdir output
mkdir fonts
cd fonts
wget https://github.com/gebiWangshushu/Hei.Captcha/blob/master/Demo/fonts/Candara.ttf?raw=true
wget https://github.com/gebiWangshushu/Hei.Captcha/blob/master/Demo/fonts/STCAIYUN.ttf?raw=true
wget https://github.com/gebiWangshushu/Hei.Captcha/blob/master/Demo/fonts/impact.ttf?raw=true
wget https://github.com/gebiWangshushu/Hei.Captcha/blob/master/Demo/fonts/monbaiti.ttf?raw=true
cd ..
# 添加程序代码
dotnet run -c Release
using System;
using System.IO;
using Hei.Captcha;

namespace generate_captcha
{
    class Program
    {
        static void Main(string[] args)
        {
            var helper = new SecurityCodeHelper();
            var iterations = 100000;
            for (var x = 0; x < iterations; ++x)
            {
                var code = helper.GetRandomEnDigitalText(4);
                var bytes = helper.GetEnDigitalCodeByte(code);
                File.WriteAllBytes($"output/{x:D5}-{code}.png", bytes);
                if (x % 100 == 0)
                    Console.WriteLine($"{x}/{iterations}");
            }
        }
    }
}

以下是生成的验证码图片例子,变形旋转干扰线动态背景色该有的都有😠:

接下来我们想想应该用什么数据结构来表达验证码。在图片识别的例子中有十个分类,我们用了 onehot 编码,即使用长度为 10 的 tensor 对象来表示结果,正确的分类为 1,不正确的分类为 0。换成验证码以后,可以用长度为 36 的 tensor 对象来表示 1 位验证码 (26 个英文数字 + 10 个字母,假设验证码不分大小写),如果有多位则可以 36 * 位数的 tensor 对象来表达多位验证码。以下函数可以把验证码转换为对应的 tensor 对象:

# 字母数字列表
ALPHA_NUMS = "abcdefghijklmnopqrstuvwxyz0123456789"
ALPHA_NUMS_MAP = { c: index for index, c in enumerate(ALPHA_NUMS) }
# 验证码位数
DIGITS = 4
# 标签数量,字母数字混合*位数
NUM_LABELS = len(ALPHA_NUMS)*DIGITS

def code_to_tensor(code):
    """转换验证码到 tensor 对象,使用 onehot 编码"""
    t = torch.zeros((NUM_LABELS,))
    code = code.lower() # 验证码不分大小写
    for index, c in enumerate(code):
        p = ALPHA_NUMS_MAP[c]
        t[index*len(ALPHA_NUMS)+p] = 1
    return t

转换例子如下:

>>> code_to_tensor("abcd")
tensor([1., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 1., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 0., 1., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 0., 0., 1., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.])
>>> code_to_tensor("a123")
tensor([1., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 0., 0., 0., 0., 0., 0., 0., 0., 1., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 1., 0., 0., 0., 0., 0., 0., 0.,
        0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 1., 0., 0., 0., 0., 0., 0.])

反过来也一样,我们可以把 tensor 的长度按 36 分为多组,然后求每一组最大的值所在的索引,再根据该索引找到对应的字母或者数字,就可以把 tensor 对象转换回验证码:

def tensor_to_code(tensor):
    """转换 tensor 对象到验证码"""
    tensor = tensor.reshape(DIGITS, len(ALPHA_NUMS))
    indices = tensor.max(dim=1).indices
    code = "".join(ALPHA_NUMS[index] for index in indices)
    return code

接下来就可以用前面介绍过的 ResNet-18 模型进行训练了😎,相比前面的图片分类,这份代码有以下几点不同:

  • 因为是多分类,损失计算器应该使用 BCELoss 代替 CrossEntropyLoss
  • BCELoss 要求模型输出值范围在 0 ~ 1 之间,所以需要在模型内部添加控制函数 (CrossEntropyLoss 这么做会影响训练效果,但 BCELoss 不会)
  • 因为每一组都只有一个值是正确的,用 softmax 效果会比 sigmoid 要好 (普通的多分类问题会使用 sigmoid)
import os
import sys
import torch
import gzip
import itertools
import random
import numpy
import json
from PIL import Image
from torch import nn
from matplotlib import pyplot

# 分析目标的图片大小,全部图片都会先缩放到这个大小
# 验证码原图是 120x50
IMAGE_SIZE = (56, 24)
# 分析目标的图片所在的文件夹
IMAGE_DIR = "./generate-captcha/output/"
# 字母数字列表
ALPHA_NUMS = "abcdefghijklmnopqrstuvwxyz0123456789"
ALPHA_NUMS_MAP = { c: index for index, c in enumerate(ALPHA_NUMS) }
# 验证码位数
DIGITS = 4
# 标签数量,字母数字混合*位数
NUM_LABELS = len(ALPHA_NUMS)*DIGITS

class BasicBlock(nn.Module):
    """ResNet 使用的基础块"""
    expansion = 1 # 定义这个块的实际出通道是 channels_out 的几倍,这里的实现固定是一倍
    def __init__(self, channels_in, channels_out, stride):
        super().__init__()
        # 生成 3x3 的卷积层
        # 处理间隔 stride = 1 时,输出的长宽会等于输入的长宽,例如 (32-3+2)//1+1 == 32
        # 处理间隔 stride = 2 时,输出的长宽会等于输入的长宽的一半,例如 (32-3+2)//2+1 == 16
        # 此外 resnet 的 3x3 卷积层不使用偏移值 bias
        self.conv1 = nn.Sequential(
            nn.Conv2d(channels_in, channels_out, kernel_size=3, stride=stride, padding=1, bias=False),
            nn.BatchNorm2d(channels_out))
        # 再定义一个让输出和输入维度相同的 3x3 卷积层
        self.conv2 = nn.Sequential(
            nn.Conv2d(channels_out, channels_out, kernel_size=3, stride=1, padding=1, bias=False),
            nn.BatchNorm2d(channels_out))
        # 让原始输入和输出相加的时候,需要维度一致,如果维度不一致则需要整合
        self.identity = nn.Sequential()
        if stride != 1 or channels_in != channels_out * self.expansion:
            self.identity = nn.Sequential(
                nn.Conv2d(channels_in, channels_out * self.expansion, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(channels_out * self.expansion))

    def forward(self, x):
        # x => conv1 => relu => conv2 => + => relu
        # |                              ^
        # |==============================|
        tmp = self.conv1(x)
        tmp = nn.functional.relu(tmp)
        tmp = self.conv2(tmp)
        tmp += self.identity(x)
        y = nn.functional.relu(tmp)
        return y

class MyModel(nn.Module):
    """识别验证码 (ResNet-18)"""
    def __init__(self, block_type = BasicBlock):
        super().__init__()
        # 记录上一层的出通道数量
        self.previous_channels_out = 64
        # 把 3 通道转换到 64 通道,长宽不变
        self.conv1 = nn.Sequential(
            nn.Conv2d(3, self.previous_channels_out, kernel_size=3, stride=1, padding=1, bias=False),
            nn.BatchNorm2d(self.previous_channels_out))
        # ResNet 使用的各个层
        self.layer1 = self._make_layer(block_type, channels_out=64, num_blocks=2, stride=1)
        self.layer2 = self._make_layer(block_type, channels_out=128, num_blocks=2, stride=2)
        self.layer3 = self._make_layer(block_type, channels_out=256, num_blocks=2, stride=2)
        self.layer4 = self._make_layer(block_type, channels_out=512, num_blocks=2, stride=2)
        # 把最后一层的长宽转换为 1x1 的池化层,Adaptive 表示会自动检测原有长宽
        # 例如 B,512,4,4 的矩阵会转换为 B,512,1,1,每个通道的单个值会是原有 16 个值的平均
        self.avgPool = nn.AdaptiveAvgPool2d((1, 1))
        # 全连接层,只使用单层线性模型
        self.fc_model = nn.Linear(512 * block_type.expansion, NUM_LABELS)
        # 控制输出在 0 ~ 1 之间,BCELoss 需要
        # 因为每组只应该有一个值为真,使用 softmax 效果会比 sigmoid 好
        self.softmax = nn.Softmax(dim=2)

    def _make_layer(self, block_type, channels_out, num_blocks, stride):
        blocks = []
        # 添加第一个块
        blocks.append(block_type(self.previous_channels_out, channels_out, stride))
        self.previous_channels_out = channels_out * block_type.expansion
        # 添加剩余的块,剩余的块固定处理间隔为 1,不会改变长宽
        for _ in range(num_blocks-1):
            blocks.append(block_type(self.previous_channels_out, self.previous_channels_out, 1))
            self.previous_channels_out *= block_type.expansion
        return nn.Sequential(*blocks)

    def forward(self, x):
        # 转换出通道到 64
        tmp = self.conv1(x)
        tmp = nn.functional.relu(tmp)
        # 应用 ResNet 的各个层
        tmp = self.layer1(tmp)
        tmp = self.layer2(tmp)
        tmp = self.layer3(tmp)
        tmp = self.layer4(tmp)
        # 转换长宽到 1x1
        tmp = self.avgPool(tmp)
        # 扁平化,维度会变为 B,512
        tmp = tmp.view(tmp.shape[0], -1)
        # 应用全连接层
        tmp = self.fc_model(tmp)
        # 划分每个字符对应的组,之后维度为 batch_size, digits, alpha_nums
        tmp = tmp.reshape(tmp.shape[0], DIGITS, len(ALPHA_NUMS))
        # 应用 softmax 到每一组
        tmp = self.softmax(tmp)
        # 重新扁平化,之后维度为 batch_size, num_labels
        y = tmp.reshape(tmp.shape[0], NUM_LABELS)
        return y

def save_tensor(tensor, path):
    """保存 tensor 对象到文件"""
    torch.save(tensor, gzip.GzipFile(path, "wb"))

def load_tensor(path):
    """从文件读取 tensor 对象"""
    return torch.load(gzip.GzipFile(path, "rb"))

def image_to_tensor(img):
    """转换图片对象到 tensor 对象"""
    in_img = img.resize(IMAGE_SIZE)
    in_img = in_img.convert("RGB") # 转换图片模式到 RGB
    arr = numpy.asarray(in_img)
    t = torch.from_numpy(arr)
    t = t.transpose(0, 2) # 转换维度 H,W,C 到 C,W,H
    t = t / 255.0 # 正规化数值使得范围在 0 ~ 1
    return t

def code_to_tensor(code):
    """转换验证码到 tensor 对象,使用 onehot 编码"""
    t = torch.zeros((NUM_LABELS,))
    code = code.lower() # 验证码不分大小写
    for index, c in enumerate(code):
        p = ALPHA_NUMS_MAP[c]
        t[index*len(ALPHA_NUMS)+p] = 1
    return t

def tensor_to_code(tensor):
    """转换 tensor 对象到验证码"""
    tensor = tensor.reshape(DIGITS, len(ALPHA_NUMS))
    indices = tensor.max(dim=1).indices
    code = "".join(ALPHA_NUMS[index] for index in indices)
    return code

def prepare_save_batch(batch, tensor_in, tensor_out):
    """准备训练 - 保存单个批次的数据"""
    # 切分训练集 (80%),验证集 (10%) 和测试集 (10%)
    random_indices = torch.randperm(tensor_in.shape[0])
    training_indices = random_indices[:int(len(random_indices)*0.8)]
    validating_indices = random_indices[int(len(random_indices)*0.8):int(len(random_indices)*0.9):]
    testing_indices = random_indices[int(len(random_indices)*0.9):]
    training_set = (tensor_in[training_indices], tensor_out[training_indices])
    validating_set = (tensor_in[validating_indices], tensor_out[validating_indices])
    testing_set = (tensor_in[testing_indices], tensor_out[testing_indices])

    # 保存到硬盘
    save_tensor(training_set, f"data/training_set.{batch}.pt")
    save_tensor(validating_set, f"data/validating_set.{batch}.pt")
    save_tensor(testing_set, f"data/testing_set.{batch}.pt")
    print(f"batch {batch} saved")

def prepare():
    """准备训练"""
    # 数据集转换到 tensor 以后会保存在 data 文件夹下
    if not os.path.isdir("data"):
        os.makedirs("data")

    # 查找所有图片
    image_paths = []
    for root, dirs, files in os.walk(IMAGE_DIR):
        for filename in files:
            path = os.path.join(root, filename)
            if not path.endswith(".png"):
                continue
            # 验证码在文件名中,例如
            # 00000-R865.png => R865
            code = filename.split(".")[0].split("-")[1]
            image_paths.append((path, code))

    # 打乱图片顺序
    random.shuffle(image_paths)

    # 分批读取和保存图片
    batch_size = 1000
    for batch in range(0, len(image_paths) // batch_size):
        image_tensors = []
        image_labels = []
        for path, code in image_paths[batch*batch_size:(batch+1)*batch_size]:
            with Image.open(path) as img:
                image_tensors.append(image_to_tensor(img))
            image_labels.append(code_to_tensor(code))
        tensor_in = torch.stack(image_tensors) # 维度: B,C,W,H
        tensor_out = torch.stack(image_labels) # 维度: B,N
        prepare_save_batch(batch, tensor_in, tensor_out)

def train():
    """开始训练"""
    # 创建模型实例
    model = MyModel()

    # 创建损失计算器
    # 计算多分类输出最好使用 BCELoss
    loss_function = torch.nn.BCELoss()

    # 创建参数调整器
    optimizer = torch.optim.Adam(model.parameters())

    # 记录训练集和验证集的正确率变化
    training_accuracy_history = []
    validating_accuracy_history = []

    # 记录最高的验证集正确率
    validating_accuracy_highest = -1
    validating_accuracy_highest_epoch = 0

    # 读取批次的工具函数
    def read_batches(base_path):
        for batch in itertools.count():
            path = f"{base_path}.{batch}.pt"
            if not os.path.isfile(path):
                break
            yield load_tensor(path)

    # 计算正确率的工具函数
    def calc_accuracy(actual, predicted):
        # 把每一位的最大值当作正确字符,然后比对有多少个字符相等
        actual_indices = actual.reshape(actual.shape[0], DIGITS, len(ALPHA_NUMS)).max(dim=2).indices
        predicted_indices = predicted.reshape(predicted.shape[0], DIGITS, len(ALPHA_NUMS)).max(dim=2).indices
        matched = (actual_indices - predicted_indices).abs().sum(dim=1) == 0
        acc = matched.sum().item() / actual.shape[0]
        return acc
 
    # 划分输入和输出的工具函数
    def split_batch_xy(batch, begin=None, end=None):
        # shape = batch_size, channels, width, height
        batch_x = batch[0][begin:end]
        # shape = batch_size, num_labels
        batch_y = batch[1][begin:end]
        return batch_x, batch_y

    # 开始训练过程
    for epoch in range(1, 10000):
        print(f"epoch: {epoch}")

        # 根据训练集训练并修改参数
        # 切换模型到训练模式,将会启用自动微分,批次正规化 (BatchNorm) 与 Dropout
        model.train()
        training_accuracy_list = []
        for batch_index, batch in enumerate(read_batches("data/training_set")):
            # 切分小批次,有助于泛化模型
            training_batch_accuracy_list = []
            for index in range(0, batch[0].shape[0], 100):
                # 划分输入和输出
                batch_x, batch_y = split_batch_xy(batch, index, index+100)
                # 计算预测值
                predicted = model(batch_x)
                # 计算损失
                loss = loss_function(predicted, batch_y)
                # 从损失自动微分求导函数值
                loss.backward()
                # 使用参数调整器调整参数
                optimizer.step()
                # 清空导函数值
                optimizer.zero_grad()
                # 记录这一个批次的正确率,torch.no_grad 代表临时禁用自动微分功能
                with torch.no_grad():
                    training_batch_accuracy_list.append(calc_accuracy(batch_y, predicted))
            # 输出批次正确率
            training_batch_accuracy = sum(training_batch_accuracy_list) / len(training_batch_accuracy_list)
            training_accuracy_list.append(training_batch_accuracy)
            print(f"epoch: {epoch}, batch: {batch_index}: batch accuracy: {training_batch_accuracy}")
        training_accuracy = sum(training_accuracy_list) / len(training_accuracy_list)
        training_accuracy_history.append(training_accuracy)
        print(f"training accuracy: {training_accuracy}")

        # 检查验证集
        # 切换模型到验证模式,将会禁用自动微分,批次正规化 (BatchNorm) 与 Dropout
        model.eval()
        validating_accuracy_list = []
        for batch in read_batches("data/validating_set"):
            batch_x, batch_y = split_batch_xy(batch)
            predicted = model(batch_x)
            validating_accuracy_list.append(calc_accuracy(batch_y, predicted))
        validating_accuracy = sum(validating_accuracy_list) / len(validating_accuracy_list)
        validating_accuracy_history.append(validating_accuracy)
        print(f"validating accuracy: {validating_accuracy}")

        # 记录最高的验证集正确率与当时的模型状态,判断是否在 20 次训练后仍然没有刷新记录
        if validating_accuracy > validating_accuracy_highest:
            validating_accuracy_highest = validating_accuracy
            validating_accuracy_highest_epoch = epoch
            save_tensor(model.state_dict(), "model.pt")
            print("highest validating accuracy updated")
        elif epoch - validating_accuracy_highest_epoch > 20:
            # 在 20 次训练后仍然没有刷新记录,结束训练
            print("stop training because highest validating accuracy not updated in 20 epoches")
            break

    # 使用达到最高正确率时的模型状态
    print(f"highest validating accuracy: {validating_accuracy_highest}",
        f"from epoch {validating_accuracy_highest_epoch}")
    model.load_state_dict(load_tensor("model.pt"))

    # 检查测试集
    testing_accuracy_list = []
    for batch in read_batches("data/testing_set"):
        batch_x, batch_y = split_batch_xy(batch)
        predicted = model(batch_x)
        testing_accuracy_list.append(calc_accuracy(batch_y, predicted))
    testing_accuracy = sum(testing_accuracy_list) / len(testing_accuracy_list)
    print(f"testing accuracy: {testing_accuracy}")

    # 显示训练集和验证集的正确率变化
    pyplot.plot(training_accuracy_history, label="training")
    pyplot.plot(validating_accuracy_history, label="validing")
    pyplot.ylim(0, 1)
    pyplot.legend()
    pyplot.show()

def eval_model():
    """使用训练好的模型"""
    # 创建模型实例,加载训练好的状态,然后切换到验证模式
    model = MyModel()
    model.load_state_dict(load_tensor("model.pt"))
    model.eval()

    # 询问图片路径,并显示可能的分类一览
    while True:
        try:
            # 构建输入
            image_path = input("Image path: ")
            if not image_path:
                continue
            with Image.open(image_path) as img:
                tensor_in = image_to_tensor(img).unsqueeze(0) # 维度 C,W,H => 1,C,W,H
            # 预测输出
            tensor_out = model(tensor_in)
            # 转换到验证码
            code = tensor_to_code(tensor_out[0])
            print(f"code: {code}")
            print()
        except Exception as e:
            print("error:", e)

def main():
    """主函数"""
    if len(sys.argv) < 2:
        print(f"Please run: {sys.argv[0]} prepare|train|eval")
        exit()

    # 给随机数生成器分配一个初始值,使得每次运行都可以生成相同的随机数
    # 这是为了让过程可重现,你也可以选择不这样做
    random.seed(0)
    torch.random.manual_seed(0)

    # 根据命令行参数选择操作
    operation = sys.argv[1]
    if operation == "prepare":
        prepare()
    elif operation == "train":
        train()
    elif operation == "eval":
        eval_model()
    else:
        raise ValueError(f"Unsupported operation: {operation}")

if __name__ == "__main__":
    main()

因为训练需要大量时间而我机器只有 CPU 可以用,所以这次我就只训练到 epoch 23 🤢,训练结果如下。可以看到训练集正确率达到了 98%,验证集正确率达到了 91%,已经是实用的级别了。

epoch: 23, batch: 98: batch accuracy: 0.99125
epoch: 23, batch: 99: batch accuracy: 0.9862500000000001
training accuracy: 0.9849874999999997
validating accuracy: 0.9103000000000003
highest validating accuracy updated

使用训练好的模型识别验证码,你可以对比上面的图片看看是不是识别对了 (第二张的 P 看起来很像 D 🤒):

$ python3 example.py eval
Image path: BlogArchive/ml-08/captcha-1.png
code: 8ca6

Image path: BlogArchive/ml-08/captcha-2.png
code: tp8s

Image path: BlogArchive/ml-08/captcha-3.png
code: k225

注意这里介绍出来的模型只能识别这一种验证码,其他不同种类的验证码需要分别训练和生成模型,做打码平台的话会先识别验证码种类再使用该种类对应的模型识别验证码内容。如果你的目标只是单种验证码,那么用这篇文章介绍的方法应该可以帮你节省调打码平台的钱 🤠。如果你机器有好显卡,也可以试试用更高级的模型提升正确率。

此外,有很多人问我现在流行的滑动验证码如何破解,其实破解这种验证码只需要做简单的图片分析,例如这里这里都没有使用机器学习。但滑动验证码一般会配合浏览器指纹和鼠标轨迹采集一起使用,后台会根据大量数据分析用户是普通人还是机器人,所以破解几次很简单,但一直破解下去则会有很大几率被检测出来。

使用 torchvision 里面的 resnet 模型

在前文我们看到了怎么组合卷积层和池化层自己实现 LeNet 和 ResNet-18,我们还可以使用 torchvision 中现成的模型,以下是修改识别验证码的模型到 torchvision 提供的 ResNet 实现的代码:

# 文件开头引用 torchvision 库
import torchvision

# 替换原有代码中的 MyModel 类,BasicBlock 可以删掉
class MyModel(nn.Module):
    """识别验证码 (ResNet-18)"""
    def __init__(self):
        super().__init__()
        # Resnet 的实现
        self.resnet = torchvision.models.resnet18(num_classes=NUM_LABELS)
        # 控制输出在 0 ~ 1 之间,BCELoss 需要
        # 因为每组只应该有一个值为真,使用 softmax 效果会比 sigmoid 好
        self.softmax = nn.Softmax(dim=2)

    def forward(self, x):
        # 应用 ResNet
        tmp = self.resnet(x)
        # 划分每个字符对应的组,之后维度为 batch_size, digits, alpha_nums
        tmp = tmp.reshape(tmp.shape[0], DIGITS, len(ALPHA_NUMS))
        # 应用 softmax 到每一组
        tmp = self.softmax(tmp)
        # 重新扁平化,之后维度为 batch_size, num_labels
        y = tmp.reshape(tmp.shape[0], NUM_LABELS)
        return y

是不是简单了很多?如果我们想使用 ResNet-50 可以把 resnet18 改为 resnet50 即可切换。虽然使用现成的模型方便,但了解下它们的原理和计算方式总是有好处的😇。

如果您发现该资源为电子书等存在侵权的资源或对该资源描述不正确等,可点击“私信”按钮向作者进行反馈;如作者无回复可进行平台仲裁,我们会在第一时间进行处理!

评价 0 条
风晓L1
粉丝 1 资源 2038 + 关注 私信
最近热门资源
银河麒麟桌面操作系统备份用户数据  127
统信桌面专业版【全盘安装UOS系统】介绍  122
银河麒麟桌面操作系统安装佳能打印机驱动方法  114
银河麒麟桌面操作系统 V10-SP1用户密码修改  105
最近下载排行榜
银河麒麟桌面操作系统备份用户数据 0
统信桌面专业版【全盘安装UOS系统】介绍 0
银河麒麟桌面操作系统安装佳能打印机驱动方法 0
银河麒麟桌面操作系统 V10-SP1用户密码修改 0
作者收入月榜
1

prtyaa 收益393.62元

2

zlj141319 收益218元

3

1843880570 收益214.2元

4

IT-feng 收益209.03元

5

风晓 收益208.24元

6

777 收益172.71元

7

Fhawking 收益106.6元

8

信创来了 收益105.84元

9

克里斯蒂亚诺诺 收益91.08元

10

技术-小陈 收益79.5元

请使用微信扫码

加入交流群

请使用微信扫一扫!