连续触发定时器 RepeatTimer
2023-12-16 05:00:55
Python threading.Timer 是个单次触发定时器,即,调用 start 后只能触发一次,且 start 只能调用一次。
参考了部分网文,进行了如下简单封装,可以重复调用 start 和 stop,且可以获取运行状态。
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Wed Dec 6 13:55:20 2023
@author: farman
"""
from threading import Timer
class _RepeatTimer(Timer):
"""
Repeat version of threading.Timer
"""
def run(self):
while not self.finished.is_set():
self.function(*self.args, **self.kwargs)
self.finished.wait(self.interval)
return
class RepeatTimer:
"""
All operations are same to threading.Timer EXCEPT the below:
(1) while the timer is started, the custom function will be executed
repeatly.
(2) if the time consumption of the custom function is longer than
the repeat interval, the custom function only executed once after
once, no overlap will occured.
__init__(self, interval, function, args=None, kwargs=None)
This constructor should always be called with keyword arguments. Arguments are:
*group* should be None; reserved for future extension when a ThreadGroup
class is implemented.
*target* is the callable object to be invoked by the run()
method. Defaults to None, meaning nothing is called.
*name* is the thread name. By default, a unique name is constructed of
the form "Thread-N" where N is a small decimal number.
*args* is a list or tuple of arguments for the target invocation. Defaults to ().
*kwargs* is a dictionary of keyword arguments for the target
invocation. Defaults to {}.
If a subclass overrides the constructor, it must make sure to invoke
the base class constructor (Thread.__init__()) before doing anything
else to the thread.
start(self)
Start the timer if is not running.
Do nothing if is running.
stop(self)
Stop the timer if is running.
Do nothing if is not running.
"""
def __init__(self, interval_sec, function, args=None, kwargs=None):
self.interval = interval_sec
self.function = function
self.args = args
self.kwargs = kwargs
self.timer = None
return
def start(self):
if self.timer is not None and self.timer.is_alive():
print("RepeatTimer : already running, won't start.")
return
self.timer = _RepeatTimer(self.interval, self.function, self.args, self.kwargs)
self.timer.start()
return
def stop(self):
if self.timer is None:
print("RepeatTimer : not running, won't stop.")
elif self.timer.is_alive():
self.timer.cancel()
self.timer = None
else: # self.timer is not None and not self.timer.is_alive()
pass # do nothing
return
def is_running(self):
if self.timer is not None:
return self.timer.is_alive()
else:
return False
#------------------------------------------------------------------------------
if __name__ == "__main__":
import time
def func():
print(time.asctime())
return
timer = RepeatTimer(0.5, func)
while True:
ans = input("(s)tart/(b)reak/(q)uit ? ")
if ans == 's':
timer.start()
elif ans == 'b':
timer.stop()
elif ans == 'q':
timer.stop()
break
else:
print("Invalid input.")
注意:有篇网文中,使用递归调用工作函数的方式实现重复触发,每触发一次,调用栈就会加深一层,占用内存随触发次数增加。这种方式,短时间可能不会发生问题,但只要触发次数足够多,总会因为内存占用过多导致出错。慎重!
文章来源:https://blog.csdn.net/farmanlinuxer/article/details/131613670
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!