import os import time import logging import requests from apscheduler.schedulers.background import BackgroundScheduler # 配置日志记录 logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') API_ENDPOINT = "https://api.siliconflow.cn/v1/usage/status" def get_credit_usage(key): """ 获取单个 key 的额度使用情况。 Args: key: 要查询的 API key。 Returns: 包含额度信息的字典,如果出错则返回 None。 """ headers = { "Authorization": f"Bearer {key}" } try: response = requests.get(API_ENDPOINT, headers=headers) response.raise_for_status() # 检查请求是否成功 data = response.json() return data except requests.exceptions.RequestException as e: logging.error(f"请求 API 时出错,key:{key}, 错误信息:{e}") except ValueError as e: logging.error(f"解析 JSON 响应时出错,key:{key}, 错误信息:{e}") return None def load_keys_and_get_usage(): """ 从环境变量中加载 keys,并获取每个 key 的额度使用情况,然后记录到日志中。 """ keys_str = os.environ.get("KEYS") if keys_str: keys = [key.strip() for key in keys_str.split(',')] logging.info(f"加载的 keys:{keys}") for key in keys: usage_data = get_credit_usage(key) if usage_data: logging.info(f"Key: {key}, 额度信息: {usage_data}") else: logging.warning(f"获取 key:{key} 的额度信息失败。") else: logging.warning("环境变量 KEYS 未设置。") # 创建一个后台调度器 scheduler = BackgroundScheduler() # 添加定时任务,每小时执行一次 load_keys_and_get_usage 函数 scheduler.add_job(load_keys_and_get_usage, 'interval', hours=1) # 启动调度器 scheduler.start() # 为了保持程序运行,添加一个循环 if __name__ == '__main__': try: while True: time.sleep(120) except (KeyboardInterrupt, SystemExit): scheduler.shutdown()