Tensorflow——多线程输入数据处理框架(完整代码)
我们学习使用TensorFlow对图像数据进行预处理的方法。虽然使用这些图像数据预处理的方法可以减少无关因素对图像识别模型效果的影响,但这些复杂的预处理过程也会减慢整个训练过程。为了避免图像预处理成为神经网络模型训练效率的瓶颈,TensorFlow提供了一套多线程处理输入数据的框架。
下面总结了一个经典的输入数据处理的流程:
? 下面我们首先学习TensorFlow中队列的概念。在TensorFlow中,队列不仅是一种数据结构,它更提供了多线程机制。队列也是TensorFlow多线程输入数据处理框架的基础。然后再学习上面的流程。最后这个流程将处理好的单个训练数据整理成训练数据 batch,这些batch就可以作为神经网络的输入。
准备知识:多线程的简单介绍
在传统操作系统中,每个进程有一个地址空间,而且默认就有一个控制线程。线程顾名思义,就是一条流水线工作的过程(流水线的工作需要电源,电源就相当于CPU),而一条流水线必须属于一个车间,一个车间就是一个进程,车间负责把资源整合到一起,是一个资源单位,而一个车间内至少有一条流水线。所以,进程只是用来把资源集中到一起(进程只是一个资源单位,或者说资源集合),而线程才是CPU上的执行单位。
多线程(即多个控制线程)的概念就是:在一个进程中存在多个线程,多个线程共享该进程的地址空间,相当于一个车间内有多条流水线,都共用一个车间的资源。比如成都地铁和西安地铁是不同的进程,而成都地铁3号线是一个线程,成都地铁所有的线程共享成都所有的资源,比如成都所有的乘客可以被所有线拉。
开启多线程的方式:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
|
t.start() 将开启进程的信号发给操作系统后,操作系统要申请内存空间,让好拷贝父进程地址空间到子进程,开销远大于线程。
1,队列与多线程
在TensorFlow中,队列和变量类似,都是计算图上有状态的节点。其他的计算节点可以修改他们的状态。对于变量,可以通过赋值操作修改变量的取值。对于队列,修改队列状态的操作主要有Enqueue,EnqueueMany和Dequeue。下面程序展示了如何使用这些函数来操作一个队列。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 |
|
TensorFlow中提供了FIFOQueue 和 RandomShuffleQueue 两种队列。在上面的程序中,已经展示了如何使用FIFOQueue,它的实现的一个先进先出队列。?RandomShuffleQueue 会将队列中的元素打乱,每次出队操作得到的是从当前队列所有元素中随机选择的一个。在训练审计网络时希望每次使用的训练数据尽量随机。?RandomShuffleQueue 就提供了这样的功能。
在TensorFlow中,队列不仅仅是一种数据结构,还是异步计算张量取值的一个重要机制。比如多个线程可以同时向一个队列中写元素,或者同时读取一个队列中的元素。在后面我们会学习TensorFlow是如何利用队列来实现多线程输入数据处理的。
TensorFlow提供了 tf.Coordinator 和 tf.QueueRunner 两个类来完成多线程协同的功能。tf.Coordinator 主要用于协同多个线程一起停止,并提供了 should_stop, request_stop 和 join 三个函数。在启动线程之前,需要先声明一个 tf.Coordinator 类,并将这个类传入每一个创建的线程中。启动的线程需要一直查询 tf.Coordinator 类中提供的 should_stop 函数,当这个函数的返回值为 True时,则当前线程也需要退出。每一个启动的线程都可以通过调用 request_stop 函数来通知其他线程退出。当某一个线程调用 ?request_stop 函数之后, should_stop 函数的返回值将被设置为 TRUE,这样其他的线程就可以同时终止了。下面程序展示了如何使用 tf.Coordinator。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 |
|
当所有线程启动之后,每个线程会打印各自的ID,于是前面4行打印出了他们的ID。然后在暂停1秒之后,所有的线程又开始第二遍打印ID。在这个时候有一个线程推出的条件达到,于是调用了coord.request_stop 函数来停止所有其他的线程。然而在打印Stoping_from_id:4之后,可以看到有线程仍然在输出。这是因为这些线程已经执行完 coord.should_stop 的判断,于是仍然会继续输出自己的ID。但在下一轮判断是否需要停止时将推出线程。于是在打印一次ID之后就不会再有输出了。
tf.QueueRunner 主要用于启动多个线程来操作同一个队列,启动的这些线程可以通过上面介绍的 tf.Coordinator 类来统一管理,下面代码展示了如何使用 tf.QueueRunner 和 tf.Coordinator 来管理多线程队列操作。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 |
|
输入文件队列
下面将学习如何使用TensorFlow中的队列管理输入文件列表。这里假设所有的输入数据都已经整理成了TFRecord 格式。虽然一个 TFRecord 文件中可以存储多个训练样例,但是当训练数据量较大时,可以将数据分成多个 TFRecord 文件来提高处理效率。 TensorFlow 提供了 tf.train.match_filenames_once 函数来获取符合一个正则表达式的所有文件,得到的文件列表可以通过 tf.train.string_input_producer 函数进行有效的管理。
tf.train.string_input_producer 函数会使用初始化时提供的文件列表创建一个输入队列,输入对垒中原始的元素为文件列表中的所有文件。如上面的代码所示,创建好的输入队列可以作为文件读取函数的参数。每次调用文件读取函数时,该函数会先判断当前是否已有打开的文件可读,如果没有或者打开的文件以及读完,这个函数会从输入队列中出队一个文件并从这个文件中读取数据。
通过设置 shuffle 参数,tf.train.string_input_producer 函数支持随机打乱文件列表中文件出队的顺序。当 shuffle 参数为 TRUE时,文件在加入队列之前会被打乱顺序,所以出队的顺序也是随机的。随机打乱文件顺序以及加入输入队列的过程会泡在一个单独的线程上,这样不会影响获取文件的速度。tf.train.string_input_producer 函数生成的输入队列可以同时被多个文件读取线程操作,而且输入队列会将队列中的文件均匀的分给不同的线程,不出现有些文件被处理过多次而有些文件还没有被处理过的情况。
当一个输入队列中的所有文件都被处理完后,它会将初始化时提供的文件列表中的文件全部重新加入队列。tf.train.string_input_producer 函数可以设置 num_epochs 参数来限制加载初始文件列表的最大轮数。当所有文件都已经被使用了设定的轮数后,如果继续尝试读取新的文件,输入队列会报 OutOfRange 的错误。在测试神经网络模型时,因为所有测试数据只需要使用一次,所以可以将 num_epochs 参数设置为1,这样在计算完一轮之后程序将自动停止。在展示 ?tf.train.match_filenames_once 和 tf.train.string_input_producer 函数的使用方法之前,我们可以先给出一个简单的程序来生成数据。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 |
|
程序运行之后,在指定的目录下生产两个文件,每一个文件中存储了两个样例,在生成了样例数据之后,下面代码展示了 tf.train.match_filenames_once 函数 和 tf.train.string_input_producer 函数的使用方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 |
|
打印结果如下:
1 2 3 4 5 6 7 8 |
|
在不打乱文件列表的情况下,会依次独处样例数据中的每一个样例。而且当所有样例都被读完之后,程序会自动从头开始。如果限制 num_epochs=1,那么程序会报错。
组合训练数据(batching)
在上面,我们已经学习了如何从文件列表中读取单个样例,将这些单个样例通过预处理方法进行处理,就可以得到提高给神经网络输入层的训练数据了。在之前学习过,将多个输入样例组织成一个batch可以提高模型训练的效率。所以在得到单个样例的预处理结果之后,还需要将他们组织成batch,然后再提供给审计网络的输入层。TensorFlow提供了 tf.train.batch 和 tf.train.shuffle_batch 函数来将单个的样例组织成 batch 的形式输出。这两个函数都会生成一个队列,队列的入队操作时生成单个样例的方法,而每次出队得到的时一个batch的样例。他们唯一的区别自安于是否会将数据顺序打乱。下面代码展示了这两个函数的使用方法。
? 下面代码展示了 tf.train.batch函数的用法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 |
|
下面代码展示了 tf.train.shuffle_batch 函数的使用方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 |
|
tf.train.batch 函数 和 tf.train.shuffle_batch 函数除了将单个训练数据整理成输入 batch,也提供了并行化处理输入数据的方法。tf.train.batch 函数 和 tf.train.shuffle_batch 函数并行化的方式一样,所以我们执行应用更多的 tf.train.shuffle_batch 函数为例。通过设置tf.train.shuffle_batch 函数中的 num_threads参数,可以指定多个线程同时执行入队操作。tf.train.shuffle_batch 函数的入队操作就是数据读取以及预处理的过程。当 num_threads 参数大于1时,多个线程会同时读取一个文件中的不同样例并进行预处理。如果需要多个线程处理不同文件中的样例时,可以使用tf.train.shuffle_batch_join 函数。此函数会从输入文件队列中获取不同的文件分配给不同的线程。一般来说,输入文件队列时通过 tf.train.string_input_producer 函数生成的。这个函数会分均分配文件以保证不同文件中的数据会被尽量平均地使用。
tf.train.shuffle_batch 函数 和 tf.train.shuffle_batch_join 函数都可以完成多线程并行的方式来进行数据预处理,但是他们各有优劣。对于tf.train.shuffle_batch 函数,不同线程会读取同一个文件。如果一个文件中的样例比较相似(比如都属于同一个类别),那么神经网络的训练效果有可能会受到影响。所以在使用 tf.train.shuffle_batch 函数时,需要尽量将同一个TFRecord 文件中的样例随机打乱。而是用 tf.train.shuffle_batch_join 函数时,不同线程会读取不同文件。如果读取数据的线程数比总文件数还大,那么多个线程可能会读取同一个文件中相近部分的数据。而却多个线程读取多个文件可能导致过多的硬盘寻址,从而使得读取的效率降低。不同的并行化方式各有所长。具体采用哪一种方法需要根据具体情况来确定。
输入数据处理框架
前面已经学习了开始给出的流程图中的所有步骤,下面将这些步骤串成一个完成的TensorFlow来处理输入数据,下面代码给出了这个步骤:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 |
|
下面代码是生成TFRecord文件的(数据是MNIST数据)代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 |
|
上面代码给出了从输入数据处理的整个流程。(但是程序可能会报错,我们这里主要学习思路)。从下图中可以看出,输入数据处理的第一步是为获取存储训练数据的文件列表。下图的文件列表为{A, B, C}.通过 tf.train.string_input_producer 函数可以选择性地将文件列表中文件的顺序打乱,并加入输入队列。因为是否打乱文件的顺序是可选的,所以在图中是虚线的。tf.train.string_input_producer 函数会生成并维护一个输入文件队列,不同线程中的文件读取函数可以共享这个输入文件队列。在读取样例数据之后,需要将图像进行预处理。图像预处理的过程也会通过tf.train.shuffle_batch 提供的机制并行地跑在多个线程中。输入数据处理流程的最后通过 tf.train.shuffle_batch 函数将处理好的单个样例整理成 batch 提供给神经网络的输入层。通过这种方式,可以有效地提高数据预处理的效率,避免数据预处理成为神经网络模型性能过程中的性能瓶颈。
?TensorFlow 数据读取机制主要是两种方法:
- (1)使用文件队列方法,如使用 slice_input_producer 和 string_input_producer;这种方法既可以将数据转存为 TFRecord数据格式,也可以直接读取文件图片数据,当然转存为 TFRecord 数据格式进行读取会更高效点。而这两者之间的区别就是前者是输入 tensor_list ,因此可以将多个list组合成一个 tensorlist 作为输入;而后者只能是一个 string_tensor了。
- (2)使用TensorFlow 1.4版本后出现的 tf.data.DataSet 的数据读取机制(pipeline机制),这是TensorFlow强烈推荐的方式,是一种更高效的读取方式。使用 tf.data.Dataset 模块的pipeline机制,可以实现 CPU 多线程处理输入的数据,如读取图片和图片的一些预处理,这样 GPU就可以专注于训练过程,而CPU去准备数据。
举例如下:
1 2 3 4 5 6 7 8 9 10 |
|
tf.train.slice_input_produce() 函数的用法
这个函数的作用就是从输入的 tensor_list 按要求抽取一个 tensor 放入文件名队列,下面学习各个参数:
1 2 |
|
说明:
- tensor_list 这个就是输入,格式为tensor的列表;一般为[data, label],即由特征和标签组成的数据集
- num_epochs 这个是你抽取batch的次数,如果没有给定值,那么将会抽取无数次batch(这会导致你训练过程停不下来),如果给定值,那么在到达次数之后就会报OutOfRange的错误
- shuffle 是否随机打乱,如果为False,batch是按顺序抽取;如果为True,batch是随机抽取
- seed 随机种子
- capcity 队列容量的大小,为整数
- name 名称
举个例子:我们的数据data的 shape是(4000,10),label的shape是(4000, 2),运行下面这行代码:
1 2 |
|
结果肯定是返回值包含两组数据的 list,每个list的shape和输入的data和label的shape对应。
batch_size 的设置与影响
1,batch_size 的含义
batch_size 可以理解为批处理参数,它的极限值为训练集样本总数,当数据量比较小时,可以将batch_size 值设置为全数据集(Full batch cearning)。实际上,在深度学习中所涉及到的数据都是比较多的,一般都采用小批量数据处理原则。
2,关于小批量训练网络的优缺点
小批量训练网络的优点:
- 相对海量的的数据集和内存容量,小批量处理需要更少的内存就可以训练网络。
- 通常小批量训练网络速度更快,例如我们将一个大样本分成11小样本(每个样本100个数据),采用小批量训练网络时,每次传播后更新权重,就传播了11批,在每批次后我们均更新了网络的(权重)参数;如果在传播过程中使用了一个大样本,我们只会对训练网络的权重参数进行1次更新。
- 全数据集确定的方向能够更好地代表样本总体,从而能够更准确地朝着极值所在的方向;但是不同权值的梯度值差别较大,因此选取一个全局的学习率很困难。
小批量训练网络的缺点:
- 批次越小,梯度的估值就越不准确,在下图中,我们可以看到,与完整批次渐变(蓝色)方向相比,小批量渐变(绿色)的方向波动更大。
- 极端特例batch_size = 1,也成为在线学习(online learning);线性神经元在均方误差代价函数的错误面是一个抛物面,横截面是椭圆,对于多层神经元、非线性网络,在局部依然近似是抛物面,使用online learning,每次修正方向以各自样本的梯度方向修正,这就造成了波动较大,难以达到收敛效果。
3,为什么需要 batch_size 的参数
Batch 的选择,首先决定的时下降的方向。如果数据集比较小,完全可以采用全数据集(Full ?Batch Learning)的形式,这样做有如下好处:
- 全数据集确定的方向能够更好的代表样本总体,从而更准确地朝着极值所在的方向
- 由于不同权值的梯度差别较大,因此选取一个全局的学习率很困难
Full ?Batch Learning 可以使用 Rprop 只基于梯度符号并且针对性单独更新各权值。但是对于非常大的数据集,上述两个好处变成了两个坏处:
- 随着数据集的海量增加和内存限制,一次载入所有数据不现实
- 以Rprop的方式迭代,会由于各个 batch之间的采样差异性,各次梯度修正值相互抵消,无法修正。这才有了后来的RMSprop的妥协方案。
4,选择适中的 batch_size
可不可以选择一个适中的Batch_size 值呢?当然可以,就是批梯度下降法(Mini-batches Learning)。因为如果数据集足够充分,那么用一半(甚至少得多)的数据训练算出来的梯度与用全部数据训练出来的梯度是几乎一样的。
在合理的范围内,增大Batch_size 有什么好处?
- 内存利用率提高了,大矩阵乘法的并行化效率提高
- 跑完一次epoch(全数据集)所需要的迭代次数减少,对于相同数据量的处理速度进一步加快。
- 在一定范围内,一般来说Batch_Size 越大,其确定的下降方向越准,引起训练震荡越小。
盲目增大Batch_size 有什么坏处?
内存利用率提高了,但是内存容量可能撑不住了
跑完一次epoch(全数据集)所需要的迭代次数减少,要想达到相同的精度,其所花费的时间大大的增加了,从而对参数的修正也就显得更加缓慢。
Batch_size 增大到一定程度,其确定的下降方向已经基本不再变化。
5,调节Batch_Size 对训练效果影响到底如何?
这里有一个LeNet 在MNIST 数据集上的效果。MNIST 是一个手写体标准库。
运行结果如上图所示,其中绝对时间做了标准化处理。运行结果与上文分析相印证:
- Batch_Size 太小,算法在200 epochs 内不收敛。
- 随着Batch_Size 增大,处理相同数据量的速度越快。
- 随着Batch_Size 增大,达到相同精度所需要的epoch 数量越来越多
- 由于上述两种因素的矛盾,Batch_Size 增大到某个时候,达到时间上的最优
- 由于最终收敛精度会陷入不同的局部极值,因此Batch_Size 增大到某些时候,达到最终收敛精度上的最优
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!