Python定时器
- 字数: 1000
- JourneyPeng
- 27 Oct, 2021

问题
如果想间隔一定时间发送一些数据,或者处理某一个任务,python里面比较简单的实现方式是sleep,但是简单的sleep会随着任务的持续执行,和最开始的时间偏离越来越大。主要原因有两个
- python中sleep执行的时间并不精确,难以做到毫秒级的精度
- 任务本身执行的时候会和sleep执行的时间累加
为了解决上面两个问题,设计了下面的代码
实现
from time import sleep, time
class RepeatedTimer(object):
def __init__(self, interval, function, *args, **kwargs):
self.interval = interval
self.function = function
self.is_running = False
self.args = args
self.kwargs = kwargs
self.start()
def _run(self):
self.is_running = False
self.start()
self.function(*self.args, **self.kwargs)
def start(self):
start = int(time() * 1000)
i = 0
if not self.is_running:
self.is_running = True
while True:
if not self.is_running:
break
i += 1
now = int(time() * 1000)
sleep_ms = i * self.interval - (now - start)
if sleep_ms < 0:
self.function(*self.args, **self.kwargs)
else:
sleep(sleep_ms / 1000.0)
self.function(*self.args, **self.kwargs)
def stop(self):
self.is_running = False
使用
def hello(name):
print(f"[{time()}]\t Hello {name}!")
sleep(0.01)
print("starting...")
rt = RepeatedTimer(100, hello, "world")
try:
sleep(50)
finally:
rt.stop()
参考
https://stackoverflow.com/questions/3393612/run-certain-code-every-n-seconds