RepeatTimer: A Repeating Timer

2023-12-16 05:00:55

Python's threading.Timer is a one-shot timer: after start is called it fires only once, and start itself can be called only once.
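The one-shot behaviour is easy to confirm. The snippet below is a minimal sketch using only the standard library; the names `fired` and `restart_error` are illustrative and not part of the original code.

```python
import threading

fired = []

# A plain Timer calls its function once, 0.05 s after start().
t = threading.Timer(0.05, lambda: fired.append("tick"))
t.start()
t.join()  # wait until the single firing is done

# A Timer is a Thread, and a Thread object can be started only once.
try:
    t.start()
except RuntimeError as exc:
    restart_error = str(exc)
else:
    restart_error = None

print(fired)          # ['tick']
print(restart_error)  # the RuntimeError message raised by the second start()
```

This is why a restartable timer has to create a fresh Timer object on every start.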

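Drawing on a few online articles, I wrote the simple wrapper below. Its start and stop can be called repeatedly, and its running state can be queried.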

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Wed Dec  6 13:55:20 2023

@author: farman
"""

from threading import Timer


class _RepeatTimer(Timer):
    """
    Repeat version of threading.Timer
    """
    def run(self):
        while not self.finished.is_set():
            self.function(*self.args, **self.kwargs)
            self.finished.wait(self.interval)
        return


class RepeatTimer:
    """
    Restartable repeating timer built on threading.Timer.

    Differences from threading.Timer:
        (1) After start(), the function is called repeatedly: once
            immediately, then again after each interval, until stop().
        (2) Within one run, calls never overlap. The next wait begins only
            after the function returns, so a function that takes longer
            than the interval simply delays the following call.
        (3) start() and stop() may be called any number of times.

    __init__(self, interval_sec, function, args=None, kwargs=None)
         *interval_sec* is the pause, in seconds, between the end of one
         call and the start of the next.

         *function* is the callable to invoke.

         *args* is a list or tuple of positional arguments for the call.
         Defaults to None, meaning no positional arguments.

         *kwargs* is a dictionary of keyword arguments for the call.
         Defaults to None, meaning no keyword arguments.

    start(self)
         Start the timer if it is not running.
         Print a notice and do nothing if it is already running.

    stop(self)
         Stop the timer if it is running.
         Print a notice and do nothing if it was never started or has
         already been stopped.

    is_running(self)
         Return True while the timer thread is alive, otherwise False.
    """
    def __init__(self, interval_sec, function, args=None, kwargs=None):
        self.interval = interval_sec
        self.function = function
        self.args     = args
        self.kwargs   = kwargs
        self.timer    = None
        return
    
    def start(self):
        if   self.timer is not None and self.timer.is_alive():
            print("RepeatTimer : already running, won't start.")
            return
        
        self.timer = _RepeatTimer(self.interval, self.function, self.args, self.kwargs)
        self.timer.start()
        return
    
    def stop(self):
        if   self.timer is None:
            print("RepeatTimer : not running, won't stop.")
        elif self.timer.is_alive():
            self.timer.cancel()
            self.timer = None
        else: # self.timer is not None and not self.timer.is_alive()
            pass # do nothing
    
        return
    
    def is_running(self):
        if self.timer is not None:
            return self.timer.is_alive()
        else:
            return False

#------------------------------------------------------------------------------

if __name__ == "__main__":
    import time
    
    def func():
        print(time.asctime())
        return
    
    timer = RepeatTimer(0.5, func)
    
    while True:
        ans = input("(s)tart/(b)reak/(q)uit ? ")
    
        if ans == 's':
            timer.start()
        elif ans == 'b':
            timer.stop()
        elif ans == 'q':
            timer.stop()
            break
        else:
            print("Invalid input.")
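The run loop in `_RepeatTimer` calls the function immediately after start and only then waits. If the first call should instead come one interval after start, as with threading.Timer itself, the loop can be driven by the return value of `Event.wait`. The sketch below is one possible variant, not part of the original code; the class name `WaitFirstRepeatTimer` and the list `calls` are made up for illustration.

```python
import threading
import time


class WaitFirstRepeatTimer(threading.Timer):
    """Hypothetical variant: wait one interval before every call."""

    def run(self):
        # Event.wait() returns False on timeout and True once cancel()
        # has set the event, so the loop ends as soon as the timer is cancelled.
        while not self.finished.wait(self.interval):
            self.function(*self.args, **self.kwargs)


calls = []
timer = WaitFirstRepeatTimer(0.05, lambda: calls.append(time.monotonic()))
timer.start()
time.sleep(0.3)   # let the timer fire a few times
timer.cancel()
timer.join()      # the thread exits shortly after cancel()

count_after_cancel = len(calls)
time.sleep(0.15)  # no further calls should arrive after cancel()
print(len(calls) == count_after_cancel)
```

Either loop keeps the calls sequential, because the next wait starts only after the function has returned.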

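Note: one online article implements repeated firing by having the worker function call itself recursively. Each firing then adds one more level to the call stack, so memory use grows with the number of firings. Over a short run this may cause no trouble, but with enough firings it will eventually fail (in CPython, typically with a RecursionError once the recursion limit is reached). Use that approach with caution!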
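The effect can be reproduced without any timer at all. The sketch below is a hypothetical reconstruction of the recursive pattern, not the code from the article in question; the delays are left out so that it finishes instantly, and the function names are invented for illustration.

```python
import sys


def repeat_by_recursion(work, remaining):
    """Anti-pattern: re-arm by calling itself, one stack frame per firing."""
    if remaining == 0:
        return
    work()
    repeat_by_recursion(work, remaining - 1)


def repeat_by_loop(work, remaining):
    """Loop-based repetition: the stack depth stays constant."""
    for _ in range(remaining):
        work()


# Ask for more firings than the interpreter's recursion limit allows.
firings = sys.getrecursionlimit() + 100

loop_calls = []
repeat_by_loop(lambda: loop_calls.append(1), firings)

recursive_calls = []
try:
    repeat_by_recursion(lambda: recursive_calls.append(1), firings)
    recursion_failed = False
except RecursionError:
    recursion_failed = True

print(len(loop_calls) == firings)  # the loop completes every firing
print(recursion_failed)            # the recursive version is cut short
```

The loop in `_RepeatTimer.run` belongs to the second category, so its stack depth does not depend on how many times the timer has fired.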

Source: https://blog.csdn.net/farmanlinuxer/article/details/131613670