Spaces:
Sleeping
Sleeping
import threading | |
import time | |
class TokenBucket: | |
def __init__(self, tpm, timeout=None): | |
self.capacity = int(tpm) # 令牌桶容量 | |
self.tokens = 0 # 初始令牌数为0 | |
self.rate = int(tpm) / 60 # 令牌每秒生成速率 | |
self.timeout = timeout # 等待令牌超时时间 | |
self.cond = threading.Condition() # 条件变量 | |
self.is_running = True | |
# 开启令牌生成线程 | |
threading.Thread(target=self._generate_tokens).start() | |
def _generate_tokens(self): | |
"""生成令牌""" | |
while self.is_running: | |
with self.cond: | |
if self.tokens < self.capacity: | |
self.tokens += 1 | |
self.cond.notify() # 通知获取令牌的线程 | |
time.sleep(1 / self.rate) | |
def get_token(self): | |
"""获取令牌""" | |
with self.cond: | |
while self.tokens <= 0: | |
flag = self.cond.wait(self.timeout) | |
if not flag: # 超时 | |
return False | |
self.tokens -= 1 | |
return True | |
def close(self): | |
self.is_running = False | |
if __name__ == "__main__": | |
token_bucket = TokenBucket(20, None) # 创建一个每分钟生产20个tokens的令牌桶 | |
# token_bucket = TokenBucket(20, 0.1) | |
for i in range(3): | |
if token_bucket.get_token(): | |
print(f"第{i+1}次请求成功") | |
token_bucket.close() | |