A Detailed Guide to Implementing MLPs in PyTorch
We will cover the following three topics:
- How to build neural network layers
- How to define the forward pass
- How to train a model
Data handling is not covered for now. Each part is followed by **exercises** to help you deepen your understanding through practice.
1. Building Neural Network Layers
In PyTorch, you build network layers mainly by subclassing `nn.Module` and defining the required layers in the `__init__` method. Below are some commonly used PyTorch building blocks and how to implement them:
1.1 Linear (Fully Connected) Layers
Component:
- `nn.Linear(in_features, out_features)`: defines a linear transformation with `in_features` input features and `out_features` output features.
Example:
import torch.nn as nn

class BasicMLP(nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        super(BasicMLP, self).__init__()
        self.fc1 = nn.Linear(input_size, hidden_size)  # first fully connected layer
        self.fc2 = nn.Linear(hidden_size, output_size)  # second fully connected layer

    def forward(self, x):
        # the forward pass is defined in the next part
        pass
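As a quick shape sanity check, here is a minimal sketch of how `nn.Linear` maps the last dimension from `in_features` to `out_features` (the batch of 32 flattened 28x28 inputs, i.e. 784 features, is an assumed example):

import torch
import torch.nn as nn

fc = nn.Linear(784, 256)   # 784 input features -> 256 output features
x = torch.randn(32, 784)   # assumed batch of 32 flattened 28x28 inputs
y = fc(x)
print(y.shape)             # torch.Size([32, 256])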
1.2 Convolutional Layers
Although we are focusing on MLPs for now, knowing about convolutional layers helps when extending the network architecture.
Component:
- `nn.Conv2d(in_channels, out_channels, kernel_size, stride=1, padding=0)`: a 2D convolutional layer.
Example:
import torch.nn as nn
import torch.nn.functional as F

class SimpleCNN(nn.Module):
    def __init__(self):
        super(SimpleCNN, self).__init__()
        self.conv1 = nn.Conv2d(1, 32, kernel_size=3, stride=1, padding=1)  # 1 input channel, 32 output channels, 3x3 kernel
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)

    def forward(self, x):
        x = self.pool(F.relu(self.conv1(x)))
        return x
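As a rough sanity check, the sketch below (assuming a single-channel 28x28 input, e.g. an MNIST-style image) shows how the shapes change: the padded 3x3 convolution keeps the spatial size at 28x28 while producing 32 channels, and the 2x2 max pooling halves it to 14x14.

import torch

model = SimpleCNN()
x = torch.randn(8, 1, 28, 28)  # assumed batch of 8 single-channel 28x28 images
out = model(x)
print(out.shape)               # torch.Size([8, 32, 14, 14])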
1.3 Activation Functions
Activation functions introduce non-linearity between layers. PyTorch provides a variety of activation functions, available through `torch.nn.functional`.
Common activation functions:
- ReLU: `F.relu(x)`
- Sigmoid: `torch.sigmoid(x)` (`F.sigmoid` is deprecated)
- Tanh: `torch.tanh(x)` (`F.tanh` is deprecated)
Example:
import torch.nn.functional as F

def forward(self, x):
    x = F.relu(self.fc1(x))  # ReLU activation
    x = self.fc2(x)
    return x
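Activations also come in module form (`nn.ReLU()`, `nn.Sigmoid()`, `nn.Tanh()`), which is convenient when composing layers with `nn.Sequential` instead of writing a `forward` method by hand. A minimal sketch (the layer sizes are assumed for illustration):

import torch
import torch.nn as nn

# Module-style activations can be chained with nn.Sequential;
# the functional form (F.relu, torch.sigmoid, torch.tanh) is called inside forward().
mlp = nn.Sequential(
    nn.Linear(784, 256),
    nn.ReLU(),
    nn.Linear(256, 10),
)
logits = mlp(torch.randn(4, 784))
print(logits.shape)  # torch.Size([4, 10])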
1.4 Dropout Layers
Dropout helps prevent overfitting by randomly zeroing out a fraction of the neurons.
Component:
- `nn.Dropout(p)`: randomly zeroes each neuron with probability `p`.
Example:
import torch.nn as nn
import torch.nn.functional as F

class AdvancedMLP(nn.Module):
    def __init__(self, input_size, output_size):
        super(AdvancedMLP, self).__init__()
        self.fc1 = nn.Linear(input_size, 512)
        self.dropout = nn.Dropout(0.2)  # 20% dropout rate
        self.fc2 = nn.Linear(512, output_size)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = self.dropout(x)  # apply dropout
        x = self.fc2(x)
        return x
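Note that dropout is only active in training mode: `model.train()` enables it and `model.eval()` disables it, which matters when you evaluate the model later. A minimal sketch, assuming the `AdvancedMLP` class above and 784-dimensional inputs:

import torch

model = AdvancedMLP(input_size=784, output_size=10)
x = torch.randn(16, 784)

model.train()   # dropout active: repeated calls can give different outputs for the same input
out_train = model(x)

model.eval()    # dropout disabled: outputs are deterministic
out1 = model(x)
out2 = model(x)
print(torch.allclose(out1, out2))  # True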
1.5 Batch Normalization
Batch normalization helps speed up training and stabilize the network.
Components:
- `nn.BatchNorm1d(num_features)`: 1D batch normalization, typically used with fully connected layers.
- `nn.BatchNorm2d(num_features)`: 2D batch normalization, typically used with convolutional layers.
Example:
import torch.nn as nn
import torch.nn.functional as F

class BNMLP(nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        super(BNMLP, self).__init__()
        self.fc1 = nn.Linear(input_size, hidden_size)
        self.bn1 = nn.BatchNorm1d(hidden_size)  # batch normalization for the first layer
        self.fc2 = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        x = F.relu(self.bn1(self.fc1(x)))  # linear transform, then batch norm, then activation
        x = self.fc2(x)
        return x
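`nn.BatchNorm1d` expects input of shape `(batch_size, num_features)` and, in training mode, normalizes over the batch, so it needs more than one sample per batch; in evaluation mode it uses its running statistics instead. A minimal sketch, assuming the `BNMLP` class above:

import torch

model = BNMLP(input_size=784, hidden_size=256, output_size=10)

model.train()
out = model(torch.randn(32, 784))  # fine: batch statistics are computed over 32 samples
# model(torch.randn(1, 784))       # would raise an error in training mode (batch of 1)

model.eval()
out = model(torch.randn(1, 784))   # fine: running statistics are used in eval mode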
Exercise 1: Building Neural Network Layers
- Build an MLP with four hidden layers of 256, 128, 64, and 32 neurons, respectively. Add ReLU activation and Dropout(0.3) after each hidden layer, and finish with an output layer of 10 nodes.
Hint: define all layers in `__init__` and call them in order in the `forward` method.
Reference answer:
import torch.nn as nn
import torch.nn.functional as F

class FourLayerMLP(nn.Module):
    def __init__(self, input_size, output_size):
        super(FourLayerMLP, self).__init__()
        self.fc1 = nn.Linear(input_size, 256)
        self.dropout1 = nn.Dropout(0.3)
        self.fc2 = nn.Linear(256, 128)
        self.dropout2 = nn.Dropout(0.3)
        self.fc3 = nn.Linear(128, 64)
        self.dropout3 = nn.Dropout(0.3)
        self.fc4 = nn.Linear(64, 32)
        self.dropout4 = nn.Dropout(0.3)
        self.output = nn.Linear(32, output_size)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = self.dropout1(x)
        x = F.relu(self.fc2(x))
        x = self.dropout2(x)
        x = F.relu(self.fc3(x))
        x = self.dropout3(x)
        x = F.relu(self.fc4(x))
        x = self.dropout4(x)
        x = self.output(x)
        return x
- Implement a three-layer MLP with batch normalization, where the hidden layers use different activation functions (e.g. ReLU, Tanh).
Hint: use `nn.BatchNorm1d` to add batch normalization to each layer, and choose different activation functions.
Reference answer:
import torch
import torch.nn as nn
import torch.nn.functional as F

class BNThreeLayerMLP(nn.Module):
    def __init__(self, input_size, hidden_sizes, output_size):
        super(BNThreeLayerMLP, self).__init__()
        self.fc1 = nn.Linear(input_size, hidden_sizes[0])
        self.bn1 = nn.BatchNorm1d(hidden_sizes[0])
        self.fc2 = nn.Linear(hidden_sizes[0], hidden_sizes[1])
        self.bn2 = nn.BatchNorm1d(hidden_sizes[1])
        self.fc3 = nn.Linear(hidden_sizes[1], hidden_sizes[2])
        self.bn3 = nn.BatchNorm1d(hidden_sizes[2])
        self.output = nn.Linear(hidden_sizes[2], output_size)

    def forward(self, x):
        x = F.relu(self.bn1(self.fc1(x)))      # first hidden layer: ReLU
        x = torch.tanh(self.bn2(self.fc2(x)))  # second hidden layer: Tanh
        x = F.relu(self.bn3(self.fc3(x)))      # third hidden layer: ReLU
        x = self.output(x)
        return x
- Extend the `SimpleCNN` class by adding a second convolutional layer (64 output channels, 3x3 kernel), with ReLU activation and max pooling after each convolutional layer.
Hint: add the new convolutional and pooling layers in `__init__` and call them in order in the `forward` method.
Reference answer:
import torch.nn as nn
import torch.nn.functional as F

class ExtendedCNN(nn.Module):
    def __init__(self):
        super(ExtendedCNN, self).__init__()
        self.conv1 = nn.Conv2d(1, 32, kernel_size=3, stride=1, padding=1)   # first convolutional layer
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=1)  # second convolutional layer
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)

    def forward(self, x):
        x = F.relu(self.conv1(x))
        x = self.pool(x)
        x = F.relu(self.conv2(x))
        x = self.pool(x)
        return x
2. Defining the Forward Pass
The forward pass is how data flows through the network to produce predictions. In a subclass of `nn.Module`, you define the `forward` method, which describes how data is passed from layer to layer.
2.1 Basic Forward Pass
Example:
def forward(self, x):
    x = F.relu(self.fc1(x))  # first linear transform + ReLU activation
    x = self.fc2(x)          # second linear transform
    return x
2.2 Forward Pass with Dropout
Example:
def forward(self, x):
    x = F.relu(self.fc1(x))
    x = self.dropout(x)  # apply dropout
    x = self.fc2(x)
    return x
2.3 Forward Pass with Batch Normalization
Example:
def forward(self, x):
    x = F.relu(self.bn1(self.fc1(x)))  # linear transform -> batch norm -> ReLU activation
    x = self.fc2(x)
    return x
2.4 Multi-Layer Forward Pass
Example:
def forward(self, x):
    x = F.relu(self.fc1(x))
    x = self.dropout(x)
    x = F.relu(self.fc2(x))
    x = self.dropout(x)
    x = self.fc3(x)
    return x
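In practice you do not call `forward` directly: calling the module itself (`model(x)`) goes through `nn.Module.__call__`, which runs any registered hooks and then dispatches to `forward`. A minimal sketch, reusing the `AdvancedMLP` class from section 1.4:

import torch

model = AdvancedMLP(input_size=784, output_size=10)
x = torch.randn(32, 784)
y = model(x)            # preferred: dispatches to forward() via nn.Module.__call__
# y = model.forward(x)  # works, but bypasses hooks; calling the module directly is the idiom
print(y.shape)          # torch.Size([32, 10])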
Exercise 2: MLP Implementation
- Define the forward pass for a four-layer MLP (three hidden layers), where every hidden layer is followed by ReLU activation and Dropout.
Hint: call the layers in the order they were defined, adding the activation and Dropout after each hidden layer.
Reference answer:
import torch.nn as nn
import torch.nn.functional as F

class FourLayerMLPWithDropout(nn.Module):
    def __init__(self, input_size, hidden_sizes, output_size, dropout_p=0.3):
        super(FourLayerMLPWithDropout, self).__init__()
        self.fc1 = nn.Linear(input_size, hidden_sizes[0])
        self.dropout1 = nn.Dropout(dropout_p)
        self.fc2 = nn.Linear(hidden_sizes[0], hidden_sizes[1])
        self.dropout2 = nn.Dropout(dropout_p)
        self.fc3 = nn.Linear(hidden_sizes[1], hidden_sizes[2])
        self.dropout3 = nn.Dropout(dropout_p)
        self.fc4 = nn.Linear(hidden_sizes[2], output_size)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = self.dropout1(x)
        x = F.relu(self.fc2(x))
        x = self.dropout2(x)
        x = F.relu(self.fc3(x))
        x = self.dropout3(x)
        x = self.fc4(x)
        return x
- Implement a forward pass that adds a residual connection (skip connection) around a pair of linear layers.
Hint: in the forward pass, add the input directly onto the output of the two transformed layers. The two tensors being added must have the same shape, so the skip path needs a projection whenever `input_size` differs from `hidden_size`.
Reference answer:
import torch.nn as nn

class ResidualMLP(nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        super(ResidualMLP, self).__init__()
        self.fc1 = nn.Linear(input_size, hidden_size)
        self.fc2 = nn.Linear(hidden_size, hidden_size)
        self.fc3 = nn.Linear(hidden_size, output_size)
        self.relu = nn.ReLU()
        # Projection for the skip path so shapes match even when input_size != hidden_size
        self.shortcut = nn.Linear(input_size, hidden_size)

    def forward(self, x):
        identity = self.shortcut(x)  # project the input so it can be added to the block output
        out = self.fc1(x)
        out = self.relu(out)
        out = self.fc2(out)
        out = out + identity         # residual connection
        out = self.relu(out)
        out = self.fc3(out)
        return out
- Define the forward pass for an MLP that combines batch normalization with different activation functions.
Hint: use the appropriate activation function and batch-normalization layer for each hidden layer, keeping the order (linear -> batch norm -> activation) consistent.
Reference answer:
import torch
import torch.nn as nn
import torch.nn.functional as F

class AdvancedBNMLP(nn.Module):
    def __init__(self, input_size, hidden_sizes, output_size):
        super(AdvancedBNMLP, self).__init__()
        self.fc1 = nn.Linear(input_size, hidden_sizes[0])
        self.bn1 = nn.BatchNorm1d(hidden_sizes[0])
        self.fc2 = nn.Linear(hidden_sizes[0], hidden_sizes[1])
        self.bn2 = nn.BatchNorm1d(hidden_sizes[1])
        self.fc3 = nn.Linear(hidden_sizes[1], hidden_sizes[2])
        self.bn3 = nn.BatchNorm1d(hidden_sizes[2])
        self.output = nn.Linear(hidden_sizes[2], output_size)

    def forward(self, x):
        x = self.fc1(x)
        x = self.bn1(x)
        x = F.relu(x)       # first hidden layer: ReLU
        x = self.fc2(x)
        x = self.bn2(x)
        x = torch.tanh(x)   # second hidden layer: Tanh
        x = self.fc3(x)
        x = self.bn3(x)
        x = F.relu(x)       # third hidden layer: ReLU
        x = self.output(x)
        return x
3. Model Training
Training a model involves defining a loss function and an optimizer, then iterating over the data to minimize the loss. Below are the common training components in PyTorch and how to implement them.
3.1 Device Configuration
Make sure the model and the data are on the same device (CPU or GPU).
Example:
import torch
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = AdvancedMLP(input_size=784, output_size=10).to(device)
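Input tensors must be moved to the same device as the model before the forward pass, otherwise PyTorch raises a device-mismatch error. A minimal sketch with a dummy batch (the 784-feature shape is an assumption matching the model above):

import torch

x = torch.randn(32, 784)  # dummy batch of 32 samples with 784 features (assumed input size)
x = x.to(device)          # move the data to the same device as the model
logits = model(x)
print(logits.device)      # cuda:0 if a GPU is available, otherwise cpu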
3.2 Defining the Loss Function and Optimizer
Loss functions:
- Classification problems commonly use `nn.CrossEntropyLoss`
- Regression problems commonly use `nn.MSELoss`
Optimizers:
- Common optimizers include `torch.optim.SGD` and `torch.optim.Adam`.
Example:
import torch.optim as optim
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=0.01)
# Or use the Adam optimizer
# optimizer = optim.Adam(model.parameters(), lr=0.001)
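For reference, `nn.CrossEntropyLoss` expects raw, unnormalized logits of shape `(batch_size, num_classes)` and integer class indices of shape `(batch_size,)`; it applies log-softmax internally, so the model's output layer should not apply softmax itself. A minimal sketch with made-up values:

import torch
import torch.nn as nn

criterion = nn.CrossEntropyLoss()
logits = torch.randn(4, 10)            # raw scores for 4 samples and 10 classes (dummy values)
targets = torch.tensor([1, 0, 3, 9])   # one integer class index per sample
loss = criterion(logits, targets)
print(loss.item())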
3.3 Training Loop
Training runs for multiple epochs, and each epoch iterates over a number of batches.
Example:
def train(model, train_loader, optimizer, criterion, epochs):
    model.train()  # set the model to training mode
    for epoch in range(epochs):
        for batch_idx, (data, target) in enumerate(train_loader):
            data, target = data.to(device), target.to(device)  # move the data to the device
            optimizer.zero_grad()             # zero the gradients
            output = model(data)              # forward pass
            loss = criterion(output, target)  # compute the loss
            loss.backward()                   # backward pass
            optimizer.step()                  # update the parameters
            if batch_idx % 100 == 0:
                print(f'Epoch: {epoch} [{batch_idx * len(data)}/{len(train_loader.dataset)}] Loss: {loss.item():.6f}')
3.4 Evaluation Function
During or after training, you need to evaluate the model's performance.
Example:
def test(model, test_loader, criterion):
    model.eval()  # set the model to evaluation mode
    test_loss = 0
    correct = 0
    with torch.no_grad():  # disable gradient computation
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            test_loss += criterion(output, target).item()  # accumulate the loss
            pred = output.argmax(dim=1, keepdim=True)      # get the predicted class
            correct += pred.eq(target.view_as(pred)).sum().item()
    test_loss /= len(test_loader)
    accuracy = 100. * correct / len(test_loader.dataset)
    print(f'\nTest set: Average loss: {test_loss:.4f}, Accuracy: {accuracy:.2f}%\n')
3.5 Complete Training and Evaluation Flow
Example:
if __name__ == '__main__':
    # train_loader and test_loader are assumed to come from your data pipeline
    # (data handling is not covered in this guide)
    train(model, train_loader, optimizer, criterion, epochs=5)
    test(model, test_loader, criterion)
Exercise 3: Model Training
- Add a learning-rate scheduler to the training loop so that the learning rate is multiplied by 0.7 after every epoch.
Hint: use `torch.optim.lr_scheduler.StepLR` and call `scheduler.step()` in the training loop.
Reference answer:
import torch.optim as optim
from torch.optim.lr_scheduler import StepLR

def train_with_scheduler(model, train_loader, optimizer, criterion, scheduler, epochs):
    model.train()
    for epoch in range(epochs):
        for batch_idx, (data, target) in enumerate(train_loader):
            data, target = data.to(device), target.to(device)
            optimizer.zero_grad()
            output = model(data)
            loss = criterion(output, target)
            loss.backward()
            optimizer.step()
            if batch_idx % 100 == 0:
                print(f'Epoch: {epoch} [{batch_idx * len(data)}/{len(train_loader.dataset)}] Loss: {loss.item():.6f}')
        scheduler.step()  # update the learning rate once per epoch
        print(f'Learning rate after epoch {epoch}: {scheduler.get_last_lr()}')

# Usage example
optimizer = optim.Adam(model.parameters(), lr=0.01)
scheduler = StepLR(optimizer, step_size=1, gamma=0.7)  # multiply the learning rate by 0.7 every epoch
train_with_scheduler(model, train_loader, optimizer, criterion, scheduler, epochs=10)
- Implement an early stopping mechanism that terminates training when the validation loss has not decreased for 3 consecutive epochs.
Hint: create an `EarlyStopping` class and check after each epoch whether the validation loss has improved.
Reference answer:
import torch

class EarlyStopping:
    def __init__(self, patience=3, verbose=False, delta=0):
        """
        Args:
            patience (int): number of epochs without improvement in validation loss before stopping
            verbose (bool): whether to print status messages
            delta (float): minimum decrease in validation loss to count as an improvement
        """
        self.patience = patience
        self.verbose = verbose
        self.delta = delta
        self.counter = 0
        self.best_loss = float('inf')
        self.early_stop = False

    def __call__(self, val_loss):
        if val_loss < self.best_loss - self.delta:
            self.best_loss = val_loss
            self.counter = 0
            if self.verbose:
                print('Validation loss improved, counter reset to 0')
        else:
            self.counter += 1
            if self.verbose:
                print(f'Validation loss did not improve, counter increased to {self.counter}')
            if self.counter >= self.patience:
                self.early_stop = True
                if self.verbose:
                    print('Early stopping triggered, stopping training')

def train_with_early_stopping(model, train_loader, val_loader, optimizer, criterion, epochs, patience=3):
    early_stopping = EarlyStopping(patience=patience, verbose=True)
    for epoch in range(epochs):
        model.train()
        for batch_idx, (data, target) in enumerate(train_loader):
            data, target = data.to(device), target.to(device)
            optimizer.zero_grad()
            output = model(data)
            loss = criterion(output, target)
            loss.backward()
            optimizer.step()
        # Evaluate on the validation set
        val_loss = 0
        model.eval()
        with torch.no_grad():
            for data, target in val_loader:
                data, target = data.to(device), target.to(device)
                output = model(data)
                loss = criterion(output, target)
                val_loss += loss.item()
        val_loss /= len(val_loader)
        print(f'Epoch: {epoch} Validation Loss: {val_loss:.6f}')
        early_stopping(val_loss)
        if early_stopping.early_stop:
            print('Stopping training early')
            break

# Usage example
train_with_early_stopping(model, train_loader, val_loader, optimizer, criterion, epochs=50, patience=3)
- Save the model weights during training and, after training ends, load the best model for testing.
Hint: use `torch.save` to save the model weights and `torch.load` to load them at the appropriate point.
Reference answer:
import torch
import os

def train_and_save_best_model(model, train_loader, val_loader, optimizer, criterion, epochs, save_path='best_model.pth'):
    best_val_loss = float('inf')
    for epoch in range(epochs):
        model.train()
        for batch_idx, (data, target) in enumerate(train_loader):
            data, target = data.to(device), target.to(device)
            optimizer.zero_grad()
            output = model(data)
            loss = criterion(output, target)
            loss.backward()
            optimizer.step()
        # Evaluate on the validation set
        val_loss = 0
        model.eval()
        with torch.no_grad():
            for data, target in val_loader:
                data, target = data.to(device), target.to(device)
                output = model(data)
                loss = criterion(output, target)
                val_loss += loss.item()
        val_loss /= len(val_loader)
        print(f'Epoch: {epoch} Validation Loss: {val_loss:.6f}')
        # Save the best model so far
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            torch.save(model.state_dict(), save_path)
            print(f'Saved best model, validation loss: {best_val_loss:.6f}')
    print('Training finished')

def load_best_model(model, load_path='best_model.pth'):
    if os.path.exists(load_path):
        model.load_state_dict(torch.load(load_path))
        model.to(device)
        print('Loaded best model weights successfully')
    else:
        print('Best model weight file does not exist')

# Usage example
train_and_save_best_model(model, train_loader, val_loader, optimizer, criterion, epochs=20, save_path='best_model.pth')
load_best_model(model, load_path='best_model.pth')
test(model, test_loader, criterion)