OpenCV图像处理——Python开发中OpenCV视频流的多线程处理方式
前言
在做视觉类项目中,常常需要在Python环境下使用OpenCV读取本地的还是网络摄像头的视频流,之后再调入各种模型,如目标分类、目标检测,人脸识别等等。如果使用单线程处理,很多时候会出现比较严重的时延,如果算力吃紧,模型推理所占用的更长的话,这种延迟感会更加明显,会出现卡帧的现象。在这种情况下,往往要把代码从单线程改为了多线程,即单独用一个线程实时捕获视频帧,主线程在需要时从子线程拷贝最近的帧使用即可。
单线程处理视频流时,如果目标检测模型较大或者任务复杂,会影响处理速度。而使用多线程,让视频捕获和目标检测分别在各自的线程中运行,能够更充分地利用 CPU 的多核心处理能力,提高整体的处理效率和实时性。
在实时视频处理中,特别是涉及到深度学习模型推理这种计算密集型的任务时,多线程确实能够带来显著的性能提升。通过将视频捕获和处理分开,可以避免由于处理时间过长而导致的帧丢失或延迟。
一、多线程
在 Python 中,可以使用 threading 模块来实现多线程。下面是一个简单的例子,演示了如何在 Python 中创建和使用多线程:
1、导入 threading 模块
首先导入 Python 的 threading
模块,它提供了多线程编程所需的功能。
import threading
2、创建线程执行函数
定义一个函数,作为线程的执行体。这个函数将会在每个线程中运行。在函数内编写希望线程执行的代码逻辑。
def my_function():
# Your code here
pass
3、创建线程对象
使用 threading.Thread()
创建一个线程对象,将目标函数指定为刚才定义的函数,并传入所需参数。
my_thread = threading.Thread(target=my_function, args=(arg1, arg2)) # 传入参数 args
4、动线程
使用 start()
方法启动线程。
my_thread.start()
5、等待线程执行完成
使用 join()
方法等待线程执行完毕。这会让主线程等待子线程的结束。
my_thread.join()
6、示例
以下是一个简单的示例,演示了如何使用多线程:
import threading
import time
# 线程执行体函数
def print_numbers():
for i in range(5):
print(f"Child Thread: {i}")
time.sleep(1)
# 创建线程对象
thread = threading.Thread(target=print_numbers)
# 启动线程
thread.start()
# 主线程中的其他操作
for i in range(5):
print(f"Main Thread: {i}")
time.sleep(0.5)
# 等待子线程执行结束
thread.join()
print("Main Thread exiting...")
print_numbers()
函数是子线程的执行体,在子线程中打印数字。主线程在启动子线程后,会同时进行自己的任务。最后通过 join()
方法等待子线程结束。
二、视频处理
一般视频处理代码分为两部分:从相机读取下一个可用帧以及对帧进行图像处理,例如把图像送到Yolov5目标检测模型进行检测。
在没有多线程的程序中,按顺序读取下一帧并进行处理。程序等待下一帧可用,然后对其进行必要的处理。读取帧所需的时间主要取决于请求、等待下一个视频帧,并将其从相机传输到内存的时间。无论是在 CPU 还是 GPU 上,对视频帧进行计算所需的时间都占据了视频处理所花费的大部分时间。
然而,在具有多线程的程序中,读取下一帧并对其进行处理不需要按顺序进行。当一个线程执行读取下一帧的任务时,主线程可以使用 CPU 或 GPU 来处理最后一个读取的帧。通过这种方式,这两个任务可以重叠执行,从而减少读取和处理帧的总时间。
1.视频单线程处理
# importing required libraries
import cv2
import time# opening video capture stream
vcap = cv2.VideoCapture(0)
if vcap.isOpened() is False :
print("[Exiting]: Error accessing webcam stream.")
exit(0)
fps_input_stream = int(vcap.get(5))
print("FPS of webcam hardware/input stream: {}".format(fps_input_stream))
grabbed, frame = vcap.read() # reading single frame for initialization/ hardware warm-up# processing frames in input stream
num_frames_processed = 0
start = time.time()
while True :
grabbed, frame = vcap.read()
if grabbed is False :
print('[Exiting] No more frames to read')
break# adding a delay for simulating time taken for processing a frame
delay = 0.03 # delay value in seconds. so, delay=1 is equivalent to 1 second
time.sleep(delay)
num_frames_processed += 1cv2.imshow('frame' , frame)
key = cv2.waitKey(1)
if key == ord('q'):
break
end = time.time()# printing time elapsed and fps
elapsed = end-start
fps = num_frames_processed/elapsed
print("FPS: {} , Elapsed Time: {} , Frames Processed: {}".format(fps, elapsed, num_frames_processed))# releasing input stream , closing all windows
vcap.release()
cv2.destroyAllWindows()
2.视频多线程处理
# importing required libraries
import cv2
import time
from threading import Thread # library for implementing multi-threaded processing# defining a helper class for implementing multi-threaded processing
class WebcamStream :
def __init__(self, stream_id=0):
self.stream_id = stream_id # default is 0 for primary camera
# opening video capture stream
self.vcap = cv2.VideoCapture(self.stream_id)
if self.vcap.isOpened() is False :
print("[Exiting]: Error accessing webcam stream.")
exit(0)
fps_input_stream = int(self.vcap.get(5))
print("FPS of webcam hardware/input stream: {}".format(fps_input_stream))
# reading a single frame from vcap stream for initializing
self.grabbed , self.frame = self.vcap.read()
if self.grabbed is False :
print('[Exiting] No more frames to read')
exit(0)# self.stopped is set to False when frames are being read from self.vcap stream
self.stopped = True# reference to the thread for reading next available frame from input stream
self.t = Thread(target=self.update, args=())
self.t.daemon = True # daemon threads keep running in the background while the program is executing
# method for starting the thread for grabbing next available frame in input stream
def start(self):
self.stopped = False
self.t.start()# method for reading next frame
def update(self):
while True :
if self.stopped is True :
break
self.grabbed , self.frame = self.vcap.read()
if self.grabbed is False :
print('[Exiting] No more frames to read')
self.stopped = True
break
self.vcap.release()# method for returning latest read frame
def read(self):
return self.frame# method called to stop reading frames
def stop(self):
self.stopped = True# initializing and starting multi-threaded webcam capture input stream
webcam_stream = WebcamStream(stream_id=0) # stream_id = 0 is for primary camera
webcam_stream.start()# processing frames in input stream
num_frames_processed = 0
start = time.time()
while True :
if webcam_stream.stopped is True :
break
else :
frame = webcam_stream.read()# adding a delay for simulating time taken for processing a frame
delay = 0.03 # delay value in seconds. so, delay=1 is equivalent to 1 second
time.sleep(delay)
num_frames_processed += 1cv2.imshow('frame' , frame)
key = cv2.waitKey(1)
if key == ord('q'):
break
end = time.time()
webcam_stream.stop() # stop the webcam stream# printing time elapsed and fps
elapsed = end-start
fps = num_frames_processed/elapsed
print("FPS: {} , Elapsed Time: {} , Frames Processed: {}".format(fps, elapsed, num_frames_processed))# closing all windows
cv2.destroyAllWindows()
上面的代码创建了一个 WebcamStream
类,其中包含了多线程读取相机帧的逻辑。在主循环中,它仍然以顺序方式处理每个帧,但是读取帧的线程是在后台运行的。
但上面的代码提升速度的同时,还有以下有改进的问题:
-
处理多个帧的逻辑: 代码在主循环中每次处理帧时都有一个固定的延迟
time.sleep(delay)
,这并不真实地模拟出帧处理的时间。应该考虑记录每个帧的时间戳,并在处理完帧后计算帧处理的实际时间。 -
多线程下的帧处理: 虽然视频流读取部分在单独的线程中,但是主循环仍然是顺序执行的,它等待每个帧进行处理。在多线程环境中,也许值得考虑在单独的线程中对帧进行处理。
-
内存和资源管理: 确保在程序退出时释放所有资源,特别是在多线程环境中,需要小心确保线程的安全退出。
-
代码结构和注释: 为了更好地可读性和维护性,添加一些注释来解释每个函数和方法的作用,以及代码块的意图。
3.多线程代码优化
-
去除固定延迟的处理方式: 代码在处理每一帧时都有固定的延迟。考虑使用实际帧处理时间的方法,而不是使用固定的延迟。这可以通过记录每个帧的时间戳来实现。
-
并行处理视频帧: 在主线程中按顺序处理每一帧。在多线程环境下,可以考虑使用多个线程并行处理视频帧,以加快处理速度。
-
资源释放: 在程序结束时,确保释放所有资源。这包括在适当的时候关闭视频流、终止线程等。
import cv2
import time
from threading import Thread
class WebcamStream:
def __init__(self, stream_id=0):
self.stream_id = stream_id
self.vcap = cv2.VideoCapture(self.stream_id)
if not self.vcap.isOpened():
print("[Exiting]: Error accessing webcam stream.")
exit(0)
self.fps_input_stream = int(self.vcap.get(cv2.CAP_PROP_FPS))
print("FPS of webcam hardware/input stream: {}".format(self.fps_input_stream))
self.grabbed, self.frame = self.vcap.read()
if not self.grabbed:
print('[Exiting] No more frames to read')
exit(0)
self.stopped = False
self.t = Thread(target=self.update, args=())
self.t.daemon = True
self.t.start()
def update(self):
while not self.stopped:
grabbed, frame = self.vcap.read()
if not grabbed:
print('[Exiting] No more frames to read')
self.stopped = True
break
self.frame = frame
def read(self):
return self.frame
def stop(self):
self.stopped = True
self.t.join()
self.vcap.release()
webcam_stream = WebcamStream(stream_id=0)
num_frames_processed = 0
start = time.time()
while True:
frame = webcam_stream.read()
if webcam_stream.stopped:
break
delay = 0.03
time.sleep(delay)
num_frames_processed += 1
cv2.imshow('frame', frame)
key = cv2.waitKey(1)
if key == ord('q'):
break
end = time.time()
webcam_stream.stop()
elapsed = end - start
fps = num_frames_processed / elapsed
print("FPS: {} , Elapsed Time: {} , Frames Processed: {}".format(fps, elapsed, num_frames_processed))
cv2.destroyAllWindows()
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!