RNN文本分类任务实战
2024-01-02 21:27:11
- 递归神经网络 (RNN):
定义:RNN 是一类专为顺序数据处理而设计的人工神经网络。
顺序处理:RNN 保持一个隐藏状态,该状态捕获有关序列中先前输入的信息,使其适用于涉及顺序依赖关系的任务。 - 词嵌入:
定义:词嵌入是捕获语义关系的词的密集向量表示。
重要性:它们允许神经网络学习上下文信息和单词之间的关系。
实现:使用预先训练的词嵌入(Word2Vec、GloVe)或在模型中包含嵌入层。 - 文本标记化和填充:
代币化:将文本分解为单个单词或子单词。
填充:通过添加零或截断来确保所有序列具有相同的长度。 - Keras 中的顺序模型:
实现:利用 Keras 库中的 Sequential 模型创建线性层堆栈。 - 嵌入层:
实现:向模型添加嵌入层,将单词转换为密集向量。
配置:指定输入维度、输出维度(嵌入大小)和输入长度。 - 循环层(LSTM 或 GRU):
LSTM 和 GRU:长短期记忆 (LSTM) 和门控循环单元 (GRU) 层有助于捕获长期依赖关系。
实现:将一个或多个 LSTM 或 GRU 层添加到模型中。 - 致密层:
目的:密集层用于最终分类输出。
实现:添加一个或多个具有适当激活函数的密集层。 - 激活功能:
选择:ReLU(整流线性单元)或tanh是隐藏层中激活函数的常见选择。 - 损失函数和优化器:
损失函数:稀疏分类交叉熵通常用于文本分类任务。
优化:Adam 或 RMSprop 是常用的优化器。 - 批处理和排序:
批处理:在批量输入序列上训练模型。
处理不同长度的物料:使用填充来处理不同长度的序列。 - 培训流程:
汇编:使用所选的损失函数、优化器和指标编译模型。
训练:将模型拟合到训练数据,在单独的集合上进行验证。 - 防止过拟合:
技术:实现 dropout 或 recurrent dropout 层以防止过拟合。
正规化:如果需要,请考虑 L1 或 L2 正则化。 - 超参数调优:
参数:根据验证性能调整超参数,例如学习率、批量大小和循环单元数。 - 评估指标:
指标:选择适当的指标,如准确率、精确率、召回率或 F1 分数进行评估。
# 文本分类任务实战
# 数据集构建:影评数据集进行情感分析
# 词向量模型:加载训练好的词向量或者自己训练
# 序列网络模型:训练好RNN模型进行识别
import os
import warnings
warnings.filterwarnings('ignore')
import tensorflow as tf
import numpy as np
import pprint
import logging
import time
from collections import Counter
from pathlib import Path
from tqdm import tqdm
#加载影评数据集,可以自动下载放到对应位置
(x_train, y_train), (x_test, y_test) = tf.keras.datasets.imdb.load_data()
# a=x_train.shape
# print(a)
# 读进来的数据是已经转换成ID映射的,一般的数据读进来都是词语,都需要手动转换成ID映射的
_word2idx = tf.keras.datasets.imdb.get_word_index()
word2idx = {w: i+3 for w, i in _word2idx.items()}
word2idx['<pad>'] = 0
word2idx['<start>'] = 1
word2idx['<unk>'] = 2
idx2word = {i: w for w, i in word2idx.items()}
# 按文本长度大小进行排序
def sort_by_len(x, y):
x, y = np.asarray(x), np.asarray(y)
idx = sorted(range(len(x)), key=lambda i: len(x[i]))
return x[idx], y[idx]
# 将中间结果保存到本地,万一程序崩了还得重玩,保存的是文本数据,不是ID
x_train, y_train = sort_by_len(x_train, y_train)
x_test, y_test = sort_by_len(x_test, y_test)
def write_file(f_path, xs, ys):
with open(f_path, 'w',encoding='utf-8') as f:
for x, y in zip(xs, ys):
f.write(str(y)+'\t'+' '.join([idx2word[i] for i in x][1:])+'\n')
write_file('./data/train.txt', x_train, y_train)
write_file('./data/test.txt', x_test, y_test)
# 构建语料表,基于词频来进行统计
counter = Counter()
with open('./data/train.txt',encoding='utf-8') as f:
for line in f:
line = line.rstrip()
label, words = line.split('\t')
words = words.split(' ')
counter.update(words)
words = ['<pad>'] + [w for w, freq in counter.most_common() if freq >= 10]
print('Vocab Size:', len(words))
Path('./vocab').mkdir(exist_ok=True)
with open('./vocab/word.txt', 'w',encoding='utf-8') as f:
for w in words:
f.write(w+'\n')
# 得到新的word2id映射表
word2idx = {}
with open('./vocab/word.txt',encoding='utf-8') as f:
for i, line in enumerate(f):
line = line.rstrip()
word2idx[line] = i
# embedding层
# 可以基于网络来训练,也可以直接加载别人训练好的,一般都是加载预训练模型
# 这里有一些常用的:https://nlp.stanford.edu/projects/glove/
#做了一个大表,里面有20598个不同的词,【20599*50】
embedding = np.zeros((len(word2idx)+1, 50)) # + 1 表示如果不在语料表中,就都是unknow
with open('./data/glove.6B.50d.txt',encoding='utf-8') as f: #下载好的
count = 0
for i, line in enumerate(f):
if i % 100000 == 0:
print('- At line {}'.format(i)) #打印处理了多少数据
line = line.rstrip()
0
sp = line.split(' ')
word, vec = sp[0], sp[1:]
if word in word2idx:
count += 1
embedding[word2idx[word]] = np.asarray(vec, dtype='float32') #将词转换成对应的向量
# 现在已经得到每个词索引所对应的向量
print("[%d / %d] words have found pre-trained values"%(count, len(word2idx)))
np.save('./vocab/word.npy', embedding)
print('Saved ./vocab/word.npy')
# 构建训练数据
# 注意所有的输入样本必须都是相同shape(文本长度,词向量维度等)
# 数据生成器
# tf.data.Dataset.from_tensor_slices(tensor):将tensor沿其第一个维度切片,返回一个含有N个样本的数据集,这样做的问题就是需要将整个数据集整体传入,然后切片建立数据集类对象,比较占内存。
#
# tf.data.Dataset.from_generator(data_generator,output_data_type,output_data_shape):从一个生成器中不断读取样本
def data_generator(f_path, params):
with open(f_path,encoding='utf-8') as f:
print('Reading', f_path)
for line in f:
line = line.rstrip()
label, text = line.split('\t')
text = text.split(' ')
x = [params['word2idx'].get(w, len(word2idx)) for w in text]#得到当前词所对应的ID
if len(x) >= params['max_len']:#截断操作
x = x[:params['max_len']]
else:
x += [0] * (params['max_len'] - len(x))#补齐操作
y = int(label)
yield x, y
def dataset(is_training, params):
_shapes = ([params['max_len']], ())
_types = (tf.int32, tf.int32)
if is_training:
ds = tf.data.Dataset.from_generator(
lambda: data_generator(params['train_path'], params),
output_shapes=_shapes,
output_types=_types, )
ds = ds.shuffle(params['num_samples'])
ds = ds.batch(params['batch_size'])
ds = ds.prefetch(tf.data.experimental.AUTOTUNE) # 设置缓存序列,根据可用的CPU动态设置并行调用的数量,说白了就是加速
else:
ds = tf.data.Dataset.from_generator(
lambda: data_generator(params['test_path'], params),
output_shapes=_shapes,
output_types=_types, )
ds = ds.batch(params['batch_size'])
ds = ds.prefetch(tf.data.experimental.AUTOTUNE)
return ds
# 自定义网络模型
class Model(tf.keras.Model):
def __init__(self, params):
super().__init__()
self.embedding = tf.Variable(np.load('./vocab/word.npy'),
dtype=tf.float32,
name='pretrained_embedding',
trainable=False, )
self.drop1 = tf.keras.layers.Dropout(params['dropout_rate'])
self.drop2 = tf.keras.layers.Dropout(params['dropout_rate'])
self.drop3 = tf.keras.layers.Dropout(params['dropout_rate'])
self.rnn1 = tf.keras.layers.Bidirectional(tf.keras.layers.LSTM(params['rnn_units'], return_sequences=True))
self.rnn2 = tf.keras.layers.Bidirectional(tf.keras.layers.LSTM(params['rnn_units'], return_sequences=True))
self.rnn3 = tf.keras.layers.Bidirectional(tf.keras.layers.LSTM(params['rnn_units'], return_sequences=True))
self.drop_fc = tf.keras.layers.Dropout(params['dropout_rate'])
self.fc = tf.keras.layers.Dense(2 * params['rnn_units'], tf.nn.elu)
self.out_linear = tf.keras.layers.Dense(2)
def call(self, inputs, training=False):
if inputs.dtype != tf.int32:
inputs = tf.cast(inputs, tf.int32)
batch_sz = tf.shape(inputs)[0]
rnn_units = 2 * params['rnn_units']
x = tf.nn.embedding_lookup(self.embedding, inputs)
x = tf.reshape(x, (batch_sz * 10 * 10, 10, 50))
x = self.drop1(x, training=training)
x = self.rnn1(x)
x = tf.reduce_max(x, 1)
x = tf.reshape(x, (batch_sz * 10, 10, rnn_units))
x = self.drop2(x, training=training)
x = self.rnn2(x)
x = tf.reduce_max(x, 1)
x = tf.reshape(x, (batch_sz, 10, rnn_units))
x = self.drop3(x, training=training)
x = self.rnn3(x)
x = tf.reduce_max(x, 1)
x = self.drop_fc(x, training=training)
x = self.fc(x)
x = self.out_linear(x)
return x
# 设置参数
params = {
'vocab_path': './vocab/word.txt',
'train_path': './data/train.txt',
'test_path': './data/test.txt',
'num_samples': 25000,
'num_labels': 2,
'batch_size': 32,
'max_len': 1000,
'rnn_units': 200,
'dropout_rate': 0.2,
'clip_norm': 10.,
'num_patience': 3,
'lr': 3e-4,
}
def is_descending(history: list):
history = history[-(params['num_patience']+1):]
for i in range(1, len(history)):
if history[i-1] <= history[i]:
return False
return True
word2idx = {}
with open(params['vocab_path'],encoding='utf-8') as f:
for i, line in enumerate(f):
line = line.rstrip()
word2idx[line] = i
params['word2idx'] = word2idx
params['vocab_size'] = len(word2idx) + 1
model = Model(params)
model.build(input_shape=(None, None))#设置输入的大小,或者fit时候也能自动找到
#pprint.pprint([(v.name, v.shape) for v in model.trainable_variables])
#链接:https://tensorflow.google.cn/api_docs/python/tf/keras/optimizers/schedules/ExponentialDecay?version=stable
#return initial_learning_rate * decay_rate ^ (step / decay_steps)
decay_lr = tf.optimizers.schedules.ExponentialDecay(params['lr'], 1000, 0.95)#相当于加了一个指数衰减函数
optim = tf.optimizers.Adam(params['lr'])
global_step = 0
history_acc = []
best_acc = .0
t0 = time.time()
logger = logging.getLogger('tensorflow')
logger.setLevel(logging.INFO)
while True:
# 训练模型
for texts, labels in dataset(is_training=True, params=params):
with tf.GradientTape() as tape: # 梯度带,记录所有在上下文中的操作,并且通过调用.gradient()获得任何上下文中计算得出的张量的梯度
logits = model(texts, training=True)
loss = tf.nn.sparse_softmax_cross_entropy_with_logits(labels=labels, logits=logits)
loss = tf.reduce_mean(loss)
optim.lr.assign(decay_lr(global_step))
grads = tape.gradient(loss, model.trainable_variables)
grads, _ = tf.clip_by_global_norm(grads, params['clip_norm']) # 将梯度限制一下,有的时候回更新太猛,防止过拟合
optim.apply_gradients(zip(grads, model.trainable_variables)) # 更新梯度
if global_step % 50 == 0:
logger.info("Step {} | Loss: {:.4f} | Spent: {:.1f} secs | LR: {:.6f}".format(
global_step, loss.numpy().item(), time.time() - t0, optim.lr.numpy().item()))
t0 = time.time()
global_step += 1
# 验证集效果
m = tf.keras.metrics.Accuracy()
for texts, labels in dataset(is_training=False, params=params):
logits = model(texts, training=False)
y_pred = tf.argmax(logits, axis=-1)
m.update_state(y_true=labels, y_pred=y_pred)
acc = m.result().numpy()
logger.info("Evaluation: Testing Accuracy: {:.3f}".format(acc))
history_acc.append(acc)
if acc > best_acc:
best_acc = acc
logger.info("Best Accuracy: {:.3f}".format(best_acc))
if len(history_acc) > params['num_patience'] and is_descending(history_acc):
logger.info("Testing Accuracy not improved over {} epochs, Early Stop".format(params['num_patience']))
break
文章来源:https://blog.csdn.net/m0_71778249/article/details/135345807
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!