文章目录
一、循环神经网络概念
二、RNN基本结构
1.输入层(Input Layer)
2.循环隐藏层(Recurrent Hidden Layer)
3. 输出层(Output Layer)
三、RNN数学原理
四、LSTM(长短时记忆网络)
五、使用 RNN 模型来处理序列数据并进行分类
六、LSTM实现时间序列预测
参考文章
一、循环神经网络概念
循环神经网络(Recurrent Neural Networks, RNN)是一类神经网络架构,专门用于处理序列数据,能够捕捉时间序列或有序数据的动态信息,能够处理序列数据,如文本、时间序列或音频。RNN 在自然语言处理(NLP)、语音识别、时间序列预测等任务中有着广泛的应用。RNN 的关键特性是其能够保持隐状态(hidden state),使得网络能够记住先前时间步的信息,这对于处理序列数据至关重要。
循环神经网络其核心特点是神经元之间存在循环连接,使得网络能够利用内部状态(隐藏状态)来记忆先前的输入信息,从而捕捉数据中的时间依赖关系;它通过在每个时间步共享参数并递归地更新隐藏状态,理论上可以处理任意长度的序列,但传统RNN存在梯度消失或梯度爆炸问题,因此实际应用中常采用LSTM、GRU等改进变体来增强长程记忆能力。
二、RNN基本结构
循环神经网络的结构核心在于循环连接与参数共享:网络在每个时间步接收当前输入和前一时刻的隐藏状态,通过相同的权重矩阵计算当前隐藏状态并输出,这种链式结构使信息能在时间维度上传递,形成"记忆"效果;其典型结构包括输入层、循环隐藏层(含自连接神经元)和输出层,隐藏层状态作为内部记忆存储历史信息,而展开的等效结构则呈现为多个相同单元按时间步串联的深层网络形态。
1.输入层(Input Layer)
功能与作用
1.接收序列数据中的单个时间步(time step)的输入向量,通常表示为 xt。
2.将外部输入转换为适合网络处理的维度,但不执行非线性变换,仅做线性映射或维度调整。
3.对于不同模态的数据,输入层处理方式各异:文本需经过嵌入层(Embedding)将离散的词/字符转为稠密向量;时间序列数据(如股价、传感器数据)可直接输入数值向量;语音信号则通常先经过特征提取(如MFCC)再输入。
2.循环隐藏层(Recurrent Hidden Layer)
核心机制
这是RNN区别于前馈网络(如CNN、MLP)的根本所在。隐藏层不仅接收当前输入,还接收前一时刻自身的输出(即隐藏状态),形成循环反馈。
隐藏层之间存在循环连接,使得网络能够维护一个“记忆”状态,这一状态包含了过去的信息。这使得RNN能够理解序列中的上下文信息。
3. 输出层(Output Layer)
功能定位
将当前时刻的隐藏状态转换为任务所需的输出格式,输出内容取决于具体应用场景。
三、RNN数学原理
给出一个典型的RNN:
在图中:有一条单向流动的信息流是从输入单元到达隐藏单元的,与此同时另一条单向流动的信息流从隐藏单元到达输出单元。在某些情况下,RNNs会打破后者的限制,引导信息从输出单元返回隐藏单元,这些被称为“Back Projections”,并且隐藏层的输入还包括上一隐藏层的状态,即隐藏层内的节点可以自连也可以互连。(这实际上就是LSTM)
右侧为计算时便于理解记忆而产开的结构。简单说,x为输入层,o为输出层,s为隐含层,而t指第几次的计算;V,W,U为权重,其中计算第t次的隐含层状态时为:
下图为直观表达:
按照上图所示,可知道RNN网络前向传播过程中满足下面的公式:
其代价函数可以是重构的误差:
也可以是交叉熵:
四、LSTM(长短时记忆网络)
由于RNN模型如果需要实现长期记忆的话需要将当前的隐含态的计算与前n次的计算挂钩,即:
那样的话计算量会呈指数式增长,导致模型训练的时间大幅增加,因此RNN模型一般不直接用来进行长期记忆计算。另外,传统RNN处理不了长期依赖问题,这是个致命伤,但LSTM解决了这个问题。
做 LSTM 是一种 RNN 特殊的类型,可以学习长期依赖信息。LSTM 由 Hochreiter & Schmidhuber (1997) 提出,并在近期被 Alex Graves 进行了改良和推广。对于很多问题,LSTM 都取得相当巨大的成功,并得到了广泛的使用。
LSTM 通过刻意的设计来避免长期依赖问题。记住长期的信息在实践中是 LSTM 的默认行为,而非需要付出很大代价才能获得的能力!
所有 RNN 都具有一种重复神经网络模块的链式的形式。在标准的RNN中,这个重复的 模块只有一个非常简单的结构,例如一个 tanh层。
标准 RNN 中的重复模块包含单一的层。
LSTM 同样是这样的结构,但是重复的模块拥有一个不同的结构。不同于 单一神经网络层,这里是有四个,以一种非常特殊的方式进行交互。
LSTM 中的重复模块包含四个交互的层。
在上面的图例中,每一条黑线传输着一整个向量,从一个节点的输出到其他节点的输入。粉色的圈代表 pointwise 的操作,诸如向量的和,而黄色的矩阵就是学习到的神经网络层。合在一起的线表示向量的连接,分开的线表示内容被复制,然后分发到不同的位置。
LSTM 的关键就是细胞状态(cell),水平线在图上方贯穿运行。细胞状态类似于传送带。直接在整个链上运行,只有一些少量的线性交互。信息在上面流传保持不变会很容易。
LSTM 有通过精心设计的称作为“门”的结构来去除或者增加信息到细胞状态的能力。门是一种让信息选择式通过的方法。他们包含一个 sigmoid 神经网络层和一个 pointwise 乘法操作。
Sigmoid 层输出 0 到 1 之间的数值,描述每个部分有多少量可以通过。0 代表“不许任何量通过”,1 就指“允许任意量通过”!
LSTM中有3个控制门:输入门,输出门,记忆门。
(1)forget gate:选择忘记过去某些信息:
(2)input gate:记忆现在的某些信息:
(3) 将过去与现在的记忆进行合并:
(4)output gate:输出
公式总结:
五、使用 RNN 模型来处理序列数据并进行分类
以下是一个简单的 PyTorch 实现例子,使用 RNN 模型来处理序列数据并进行分类。
定义 RNN 模型
import torch import torch.nn as nn import torch.optim as optim from torch.utils.data import DataLoader, TensorDataset import numpy as class SimpleRNN(nn.Module): def __init__(self, input_size, hidden_size, output_size): super(SimpleRNN, self).__init__() # 定义 RNN 层 self.rnn = nn.RNN(input_size, hidden_size, batch_first=True) # 定义全连接层 self.fc = nn.Linear(hidden_size, output_size) def forward(self, x): # x: (batch_size, seq_len, input_size) out, _ = self.rnn(x) # out: (batch_size, seq_len, hidden_size) # 取序列最后一个时间步的输出作为模型的输出 out = out[:, -1, :] # (batch_size, hidden_size) out = self.fc(out) # 全连接层 return out np创建训练数据
为了训练 RNN,我们生成一些随机的序列数据。这里的目标是将每个序列的最后一个值作为分类的目标。
# 生成一些随机序列数据 num_samples = 1000 seq_len = 10 input_size = 5 output_size = 2 # 假设二分类问题 # 随机生成输入数据 (batch_size, seq_len, input_size) X = torch.randn(num_samples, seq_len, input_size) # 随机生成目标标签 (batch_size, output_size) Y = torch.randint(0, output_size, (num_samples,)) # 创建数据加载器 dataset = TensorDataset(X, Y) train_loader = DataLoader(dataset, batch_size=32, shuffle=True)定义损失函数与优化器
# 模型实例化 model = SimpleRNN(input_size=input_size, hidden_size=64, output_size=output_size) # 定义损失函数和优化器 criterion = nn.CrossEntropyLoss() # 多分类交叉熵损失 optimizer = optim.Adam(model.parameters(), lr=0.001)训练模型
num_epochs = 10 for epoch in range(num_epochs): model.train() # 设置模型为训练模式 total_loss = 0 correct = 0 total = 0 for inputs, labels in train_loader: # 前向传播 outputs = model(inputs) loss = criterion(outputs, labels) # 反向传播和优化 optimizer.zero_grad() loss.backward() optimizer.step() total_loss += loss.item() # 计算准确率 _, predicted = torch.max(outputs, 1) total += labels.size(0) correct += (predicted == labels).sum().item() accuracy = 100 * correct / total print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {total_loss / len(train_loader):.4f}, Accuracy: {accuracy:.2f}%")测试模型
# 测试模型 model.eval() # 设置模型为评估模式 with torch.no_grad(): total = 0 correct = 0 for inputs, labels in train_loader: outputs = model(inputs) _, predicted = torch.max(outputs, 1) total += labels.size(0) correct += (predicted == labels).sum().item() accuracy = 100 * correct / total print(f"Test Accuracy: {accuracy:.2f}%")可视化
# 可视化损失 plt.plot(losses, label="Training Loss") plt.xlabel("Epoch") plt.ylabel("Loss") plt.title("RNN Training Loss Over Epochs") plt.legend() plt.show()结果输出:
模型在200轮训练中损失从约1.7快速收敛至接近0,表明模型成功拟合了数据;测试时输入"hello"预测输出为"elloh",与目标序列完全一致,说明该简单字符映射任务已被模型记忆掌握。
六、LSTM实现时间序列预测
基于 PyTorch 构建了一个 LSTM 时间序列预测模型,使用滑动窗口将正弦波数据切分为输入序列(60步)和目标序列(10步),通过两层 LSTM 网络提取时序特征并直接输出未来10步的预测值。
模型定义
import torch import torch.nn as nn import torch.optim as optim import numpy as np import matplotlib.pyplot as plt from torch.utils.data import Dataset, DataLoader # 设置中文字体 plt.rcParams['font.sans-serif'] = ['SimHei', 'DejaVu Sans'] plt.rcParams['axes.unicode_minus'] = False device = torch.device("cuda" if torch.cuda.is_available() else "cpu") # 滑动窗口 Dataset class TimeSeriesDataset(Dataset): def __init__(self, series, input_len, output_len): self.data = torch.tensor(series, dtype=torch.float32) self.input_len = input_len self.output_len = output_len def __len__(self): return len(self.data) - self.input_len - self.output_len + 1 def __getitem__(self, idx): x = self.data[idx : idx + self.input_len] y = self.data[idx + self.input_len : idx + self.input_len + self.output_len] return x.unsqueeze(-1), y # 模型定义 class LSTMForecaster(nn.Module): def __init__(self, input_size, hidden_size, num_layers, output_len, dropout=0.2): super().__init__() self.lstm = nn.LSTM( input_size=input_size, hidden_size=hidden_size, num_layers=num_layers, batch_first=True, dropout=dropout if num_layers > 1 else 0.0, ) self.dropout = nn.Dropout(dropout) self.fc = nn.Linear(hidden_size, output_len) def forward(self, x): output, (h_n, c_n) = self.lstm(x) last = output[:, -1, :] last = self.dropout(last) return self.fc(last)数据准备
# 数据准备 t = np.linspace(0, 200, 10000) series = np.sin(t) + 0.1 * np.random.randn(len(t)) INPUT_LEN = 60 OUTPUT_LEN = 10 split = int(len(series) * 0.8) train_data = series[:split] val_data = series[split:] train_dataset = TimeSeriesDataset(train_data, INPUT_LEN, OUTPUT_LEN) val_dataset = TimeSeriesDataset(val_data, INPUT_LEN, OUTPUT_LEN) train_loader = DataLoader(train_dataset, batch_size=64, shuffle=False) val_loader = DataLoader(val_dataset, batch_size=64, shuffle=False)模型训练
# 模型训练 model = LSTMForecaster(input_size=1, hidden_size=128, num_layers=2, output_len=OUTPUT_LEN).to(device) criterion = nn.MSELoss() optimizer = optim.Adam(model.parameters(), lr=1e-3) def train_epoch_ts(model, loader, optimizer, criterion): model.train() total_loss = 0.0 for x, y in loader: x, y = x.to(device), y.to(device) optimizer.zero_grad() pred = model(x) loss = criterion(pred, y) loss.backward() nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0) optimizer.step() total_loss += loss.item() * x.size(0) return total_loss / len(loader.dataset) def eval_epoch_ts(model, loader, criterion): model.eval() total_loss = 0.0 with torch.no_grad(): for x, y in loader: x, y = x.to(device), y.to(device) pred = model(x) total_loss += criterion(pred, y).item() * x.size(0) return total_loss / len(loader.dataset) train_losses = [] val_losses = [] for epoch in range(1, 31): train_loss = train_epoch_ts(model, train_loader, optimizer, criterion) val_loss = eval_epoch_ts(model, val_loader, criterion) train_losses.append(train_loss) val_losses.append(val_loss) print(f"Epoch {epoch:2d}/30 | Train MSE: {train_loss:.6f} | Val MSE: {val_loss:.6f}")结果可视化
# 可视化 fig, axes = plt.subplots(2, 2, figsize=(14, 10)) # 1. 训练曲线 ax1 = axes[0, 0] ax1.plot(train_losses, label='Train MSE', color='blue') ax1.plot(val_losses, label='Val MSE', color='red') ax1.set_xlabel('Epoch') ax1.set_ylabel('MSE Loss') ax1.set_title('Training & Validation Loss') ax1.legend() ax1.grid(True, alpha=0.3) # 2. 验证集:单样本预测 vs 真实值 ax2 = axes[0, 1] model.eval() with torch.no_grad(): # 取验证集第一个样本 x_sample, y_true = val_dataset[0] x_input = x_sample.unsqueeze(0).to(device) y_pred = model(x_input).cpu().numpy().flatten() y_true = y_true.numpy().flatten() # 输入序列 input_seq = x_sample.numpy().flatten() input_t = np.arange(len(input_seq)) pred_t = np.arange(len(input_seq), len(input_seq) + OUTPUT_LEN) ax2.plot(input_t, input_seq, label='Input (历史)', color='gray', alpha=0.7) ax2.plot(pred_t, y_true, label='True (真实)', color='green', marker='o') ax2.plot(pred_t, y_pred, label='Predicted (预测)', color='red', marker='x', linestyle='--') ax2.axvline(x=len(input_seq)-0.5, color='black', linestyle=':', alpha=0.5) ax2.set_xlabel('Time Step') ax2.set_ylabel('Value') ax2.set_title('Single Sample: Input → True vs Predicted') ax2.legend() ax2.grid(True, alpha=0.3) # 3. 验证集:连续多步预测(滚动预测) ax3 = axes[1, 0] model.eval() with torch.no_grad(): # 取验证集前 500 个点做连续预测 n_show = 500 true_vals = val_data[:n_show] # 用模型做滚动预测 predictions = [] # 先用真实数据的前 INPUT_LEN 步作为初始输入 current_input = torch.tensor(val_data[:INPUT_LEN], dtype=torch.float32).unsqueeze(0).unsqueeze(-1).to(device) # 每隔 OUTPUT_LEN 步预测一次 pred_points = [] pred_values = [] for i in range(0, n_show - INPUT_LEN - OUTPUT_LEN, OUTPUT_LEN): x = torch.tensor(val_data[i:i+INPUT_LEN], dtype=torch.float32).unsqueeze(0).unsqueeze(-1).to(device) pred = model(x).cpu().numpy().flatten() for j, p in enumerate(pred): pred_points.append(i + INPUT_LEN + j) pred_values.append(p) ax3.plot(np.arange(n_show), true_vals, label='True', color='green', alpha=0.7) ax3.scatter(pred_points, pred_values, label='Predicted', color='red', s=10, alpha=0.8) ax3.set_xlabel('Time Step') ax3.set_ylabel('Value') ax3.set_title('Rolling Prediction on Validation Set') ax3.legend() ax3.grid(True, alpha=0.3) # 4. 验证集:预测误差分布 ax4 = axes[1, 1] model.eval() all_errors = [] with torch.no_grad(): for x, y in val_loader: x, y = x.to(device), y.to(device) pred = model(x) errors = (pred - y).cpu().numpy().flatten() all_errors.extend(errors) ax4.hist(all_errors, bins=50, color='steelblue', edgecolor='black', alpha=0.7) ax4.axvline(x=0, color='red', linestyle='--', label='Zero Error') ax4.set_xlabel('Prediction Error') ax4.set_ylabel('Frequency') ax4.set_title('Prediction Error Distribution (Validation)') ax4.legend() ax4.grid(True, alpha=0.3) plt.tight_layout() plt.savefig('/mnt/agents/output/lstm_forecast_visualization.png', dpi=150, bbox_inches='tight') plt.show() print("可视化已保存!")结果输出
模型在30轮训练后成功收敛,训练 MSE 从 0.138 降至 0.013,验证 MSE 从 0.025 降至 0.011,验证损失始终低于训练损失且差距很小,说明无过拟合;但第26轮出现明显反弹(训练损失突增至0.022),提示学习率偏大导致优化不稳定,改进反向为引入学习率衰减机制以平滑训练。
参考文章
https://blog.csdn.net/2401_85327249/article/details/142249393
https://zhuanlan.zhihu.com/p/652712909
https://zhuanlan.zhihu.com/p/123211148
https://blog.csdn.net/mary19831/article/details/129570030
https://jishuzhan.net/article/1977119352873877506
https://www.runoob.com/pytorch/pytorch-recurrent-neural-network.html