"""OpenAI-style HTTP proxy that forwards requests to the SiliconFlow API.

This part of the module holds configuration, shared state, upstream helper
functions (balance lookup, model discovery, key rotation) and the read-only
routes.
"""
import base64
import concurrent.futures
import io
import json
import logging
import os
import threading
import time
import uuid
from datetime import datetime
from io import BytesIO
from itertools import chain

import requests
from apscheduler.schedulers.background import BackgroundScheduler
from flask import Flask, request, jsonify, Response, stream_with_context
from PIL import Image
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry
from werkzeug.middleware.proxy_fix import ProxyFix

# Pin the process timezone so log timestamps are rendered in Asia/Shanghai.
os.environ['TZ'] = 'Asia/Shanghai'
time.tzset()

logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s - %(levelname)s - %(message)s')

API_ENDPOINT = "https://api-st.siliconflow.cn/v1/user/info"
TEST_MODEL_ENDPOINT = "https://api-st.siliconflow.cn/v1/chat/completions"
MODELS_ENDPOINT = "https://api-st.siliconflow.cn/v1/models"
EMBEDDINGS_ENDPOINT = "https://api-st.siliconflow.cn/v1/embeddings"
IMAGE_ENDPOINT = "https://api-st.siliconflow.cn/v1/images/generations"


def requests_session_with_retries(
    retries=3, backoff_factor=0.3, status_forcelist=(500, 502, 504)
):
    """Build a ``requests.Session`` that retries failed requests.

    Args:
        retries: Retry budget applied to total, read and connect failures.
        backoff_factor: Backoff factor handed to urllib3's ``Retry``.
        status_forcelist: HTTP status codes that trigger a retry.

    Returns:
        A session with the retrying adapter mounted for http and https.
    """
    session = requests.Session()
    retry = Retry(
        total=retries,
        read=retries,
        connect=retries,
        backoff_factor=backoff_factor,
        status_forcelist=status_forcelist,
    )
    adapter = HTTPAdapter(
        max_retries=retry,
        pool_connections=1000,
        pool_maxsize=10000,
        pool_block=False
    )
    session.mount("http://", adapter)
    session.mount("https://", adapter)
    return session


session = requests_session_with_retries()

app = Flask(__name__)
app.wsgi_app = ProxyFix(app.wsgi_app, x_for=1)

# Model ids grouped by kind; the "free_*" lists are the subset that
# test_model_availability() accepted for FREE_MODEL_TEST_KEY.
models = {
    "text": [],
    "free_text": [],
    "embedding": [],
    "free_embedding": [],
    "image": [],
    "free_image": []
}

# Classification of the configured API keys, rebuilt by load_keys().
key_status = {
    "invalid": [],
    "free": [],
    "unverified": [],
    "valid": []
}

# Key lists read by select_key() and the billing route.  They start empty so
# that requests arriving before (or without) a successful load_keys() get a
# clean "no key available" answer instead of a NameError.
invalid_keys_global = []
free_keys_global = []
unverified_keys_global = []
valid_keys_global = []

executor = concurrent.futures.ThreadPoolExecutor(max_workers=10000)

# Per-model round-robin cursor used by select_key().
model_key_indices = {}

# Request statistics (parallel lists), guarded by data_lock.
request_timestamps = []
token_counts = []
request_timestamps_day = []
token_counts_day = []
data_lock = threading.Lock()


def get_credit_summary(api_key):
    """Fetch the account balance for ``api_key``.

    Timeouts are retried up to three times.

    Returns:
        ``{"total_balance": float}`` on success, otherwise ``None``.
    """
    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json"
    }
    max_retries = 3
    for attempt in range(max_retries):
        try:
            response = session.get(API_ENDPOINT, headers=headers, timeout=2)
            response.raise_for_status()
            data = response.json().get("data", {})
            total_balance = data.get("totalBalance", 0)
            logging.info(f"获取额度,API Key:{api_key},当前额度: {total_balance}")
            return {"total_balance": float(total_balance)}
        except requests.exceptions.Timeout as e:
            logging.error(f"获取额度信息失败,API Key:{api_key},尝试次数:{attempt+1}/{max_retries},错误信息:{e} (Timeout)")
            if attempt >= max_retries - 1:
                logging.error(f"获取额度信息失败,API Key:{api_key},所有重试次数均已失败 (Timeout)")
        except requests.exceptions.RequestException as e:
            logging.error(f"获取额度信息失败,API Key:{api_key},错误信息:{e}")
            # NOTE(review): the source had lost its indentation; this return
            # is read as "non-timeout errors are not retried" because only
            # the timeout message reports an attempt counter - confirm.
            return None
    return None


# NOTE(review): an API key is committed in source here; consider loading it
# from the environment instead.
FREE_MODEL_TEST_KEY = (
    "sk-bmjbjzleaqfgtqfzmcnsbagxrlohriadnxqrzfocbizaxukw"
)

FREE_IMAGE_LIST = [
    "stabilityai/stable-diffusion-3-5-large",
    "black-forest-labs/FLUX.1-schnell",
    "stabilityai/stable-diffusion-3-medium",
    "stabilityai/stable-diffusion-xl-base-1.0",
    "stabilityai/stable-diffusion-2-1"
]


def test_model_availability(api_key, model_name, model_type="chat"):
    """Report whether ``api_key`` can use ``model_name``.

    Image models are answered from ``FREE_IMAGE_LIST`` without a request.
    For other types a minimal request is sent; HTTP 200 and 429 both count
    as available.

    Args:
        api_key: Key used for the probe request.
        model_name: Model id to probe.
        model_type: ``"image"``, ``"embedding"``, or anything else for chat.

    Returns:
        ``True`` if the model is considered available, else ``False``.
    """
    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json"
    }
    if model_type == "image":
        return model_name in FREE_IMAGE_LIST

    is_embedding = model_type == "embedding"
    if is_embedding:
        endpoint = EMBEDDINGS_ENDPOINT
        payload = {"model": model_name, "input": ["hi"]}
        timeout = 10
    else:
        endpoint = TEST_MODEL_ENDPOINT
        payload = {
            "model": model_name,
            "messages": [{"role": "user", "content": "hi"}],
            "max_tokens": 5,
            "stream": False
        }
        timeout = 5

    try:
        response = session.post(
            endpoint,
            headers=headers,
            json=payload,
            timeout=timeout
        )
        return response.status_code in [200, 429]
    except requests.exceptions.RequestException as e:
        logging.error(
            f"测试{model_type}模型 {model_name} 可用性失败,"
            f"API Key:{api_key},错误信息:{e}"
        )
        return False


def process_image_url(image_url, response_format=None):
    """Convert an upstream image URL into an OpenAI-style image entry.

    Args:
        image_url: URL of the generated image (may be empty).
        response_format: ``"b64_json"`` to download and inline the image as
            base64 PNG; any other value keeps the URL.

    Returns:
        ``{"b64_json": ...}`` when inlining succeeded, otherwise
        ``{"url": ...}`` (the URL is also the fallback on download failure).
    """
    if not image_url:
        return {"url": ""}

    if response_format == "b64_json":
        try:
            response = session.get(image_url, stream=True)
            response.raise_for_status()
            image = Image.open(response.raw)
            buffered = io.BytesIO()
            image.save(buffered, format="PNG")
            img_str = base64.b64encode(buffered.getvalue()).decode()
            return {"b64_json": img_str}
        except Exception as e:
            # Best effort: any failure degrades to returning the plain URL.
            logging.error(f"图片转base64失败: {e}")
            return {"url": image_url}
    return {"url": image_url}


def create_base64_markdown_image(image_url):
    """Download an image, shrink it to 1/4 size and embed it in markdown.

    Returns:
        A ``![](data:image/png;base64,...)`` markdown string, or ``None`` if
        downloading or converting the image failed.
    """
    try:
        response = session.get(image_url, stream=True)
        response.raise_for_status()
        image = Image.open(BytesIO(response.content))

        new_size = tuple(dim // 4 for dim in image.size)
        resized_image = image.resize(new_size, Image.LANCZOS)

        buffered = BytesIO()
        resized_image.save(buffered, format="PNG")
        base64_encoded = base64.b64encode(buffered.getvalue()).decode('utf-8')

        markdown_image_link = f"![](data:image/png;base64,{base64_encoded})"
        logging.info("Created base64 markdown image link.")
        return markdown_image_link
    except Exception as e:
        logging.error(f"Error creating markdown image: {e}")
        return None


def extract_user_content(messages):
    """Concatenate the text of all ``user`` messages, separated by spaces.

    String contents are used as-is; list contents contribute the ``text`` of
    every ``{"type": "text"}`` item.  Entries that are not dicts or that lack
    ``role``/``content`` are skipped instead of raising.
    """
    parts = []
    for message in messages:
        if not isinstance(message, dict) or message.get("role") != "user":
            continue
        content = message.get("content")
        if isinstance(content, str):
            parts.append(content)
        elif isinstance(content, list):
            parts.extend(
                item.get("text", "")
                for item in content
                if isinstance(item, dict) and item.get("type") == "text"
            )
    return " ".join(parts).strip()


def get_siliconflow_data(model_name, data):
    """Translate an OpenAI-style image request into a SiliconFlow payload.

    Numeric options are clamped to the ranges hard-coded below, and an
    unsupported ``image_size`` falls back to ``"1024x1024"``.

    Args:
        model_name: Target image model id.
        data: Parsed client request body.

    Returns:
        The request body to send to ``IMAGE_ENDPOINT``.
    """
    siliconflow_data = {
        "model": model_name,
        "prompt": data.get("prompt") or "",
    }

    # A seed is only forwarded when it is an int inside the accepted range.
    seed = data.get("seed")
    seed_is_valid = isinstance(seed, int) and 0 < seed < 9999999999

    if model_name == "black-forest-labs/FLUX.1-pro":
        siliconflow_data.update({
            "width": max(256, min(1440, (data.get("width", 1024) // 32) * 32)),
            "height": max(256, min(1440, (data.get("height", 768) // 32) * 32)),
            "prompt_upsampling": data.get("prompt_upsampling", False),
            "image_prompt": data.get("image_prompt"),
            "steps": max(1, min(50, data.get("steps", 20))),
            "guidance": max(1.5, min(5, data.get("guidance", 3))),
            "safety_tolerance": max(0, min(6, data.get("safety_tolerance", 2))),
            "interval": max(1, min(4, data.get("interval", 2))),
            "output_format": data.get("output_format", "png")
        })
        if seed_is_valid:
            siliconflow_data["seed"] = seed
    else:
        siliconflow_data.update({
            "image_size": data.get("image_size", "1024x1024"),
            "prompt_enhancement": data.get("prompt_enhancement", False)
        })
        if seed_is_valid:
            siliconflow_data["seed"] = seed

        # NOTE(review): indentation was lost in the source; these options are
        # placed in the non-FLUX.1-pro branch because FLUX.1-pro already sets
        # its own "steps"/"guidance" fields - confirm.
        if model_name not in ["black-forest-labs/FLUX.1-schnell", "Pro/black-forest-labs/FLUX.1-schnell"]:
            siliconflow_data.update({
                "batch_size": max(1, min(4, data.get("n", 1))),
                "num_inference_steps": max(1, min(50, data.get("steps", 20))),
                "guidance_scale": max(0, min(100, data.get("guidance_scale", 7.5))),
                "negative_prompt": data.get("negative_prompt")
            })

    valid_sizes = ["1024x1024", "512x1024", "768x512", "768x1024", "1024x576", "576x1024", "960x1280", "720x1440", "720x1280"]
    if "image_size" in siliconflow_data and siliconflow_data["image_size"] not in valid_sizes:
        siliconflow_data["image_size"] = "1024x1024"

    return siliconflow_data


def refresh_models():
    """Reload the model lists and re-probe which models are free.

    Models named in the ``BAN_MODELS`` environment variable (a JSON array)
    are removed.  Each ``free_*`` list is built locally and swapped in once
    complete, so concurrent requests never observe a half-filled list.
    """
    models["text"] = get_all_models(FREE_MODEL_TEST_KEY, "chat")
    models["embedding"] = get_all_models(FREE_MODEL_TEST_KEY, "embedding")
    models["image"] = get_all_models(FREE_MODEL_TEST_KEY, "text-to-image")

    ban_models = []
    ban_models_str = os.environ.get("BAN_MODELS")
    if ban_models_str:
        try:
            ban_models = json.loads(ban_models_str)
            if not isinstance(ban_models, list):
                logging.warning("环境变量 BAN_MODELS 格式不正确,应为 JSON 数组。")
                ban_models = []
        except json.JSONDecodeError:
            logging.warning("环境变量 BAN_MODELS JSON 解析失败,请检查格式。")

    for model_type in ("text", "embedding", "image"):
        models[model_type] = [
            model for model in models[model_type] if model not in ban_models
        ]

    model_types = [
        ("text", "chat"),
        ("embedding", "embedding"),
        ("image", "image")
    ]

    for model_type, test_type in model_types:
        free_models = []
        with concurrent.futures.ThreadPoolExecutor(max_workers=10000) as pool:
            future_to_model = {
                pool.submit(
                    test_model_availability,
                    FREE_MODEL_TEST_KEY,
                    model,
                    test_type
                ): model
                for model in models[model_type]
            }
            for future in concurrent.futures.as_completed(future_to_model):
                model = future_to_model[future]
                try:
                    if future.result():
                        free_models.append(model)
                except Exception as exc:
                    logging.error(f"{model_type}模型 {model} 测试生成异常: {exc}")
        models[f"free_{model_type}"] = free_models

    for model_type in ["text", "embedding", "image"]:
        logging.info(f"所有{model_type}模型列表:{models[model_type]}")
        logging.info(f"免费{model_type}模型列表:{models[f'free_{model_type}']}")


def load_keys():
    """Classify every key from the ``KEYS`` environment variable.

    ``KEYS`` is a comma-separated list.  Keys are de-duplicated, empty
    entries are dropped, and each key is classified via ``process_key`` in
    parallel.  The results are published through ``key_status`` and the
    ``*_keys_global`` lists.
    """
    global invalid_keys_global, free_keys_global
    global unverified_keys_global, valid_keys_global

    for status in key_status:
        key_status[status] = []

    keys_str = os.environ.get("KEYS")
    if not keys_str:
        logging.warning("环境变量 KEYS 未设置。")
        return

    test_model = os.environ.get("TEST_MODEL", "Pro/google/gemma-2-9b-it")

    # Skip blanks so "a,,b" or a trailing comma does not create an empty key.
    stripped_keys = (key.strip() for key in keys_str.split(','))
    unique_keys = list({key for key in stripped_keys if key})
    os.environ["KEYS"] = ','.join(unique_keys)

    logging.info(f"加载的 keys:{unique_keys}")

    def process_key_with_logging(key):
        """Classify one key and record it; failures are logged as invalid."""
        try:
            key_type = process_key(key, test_model)
            if key_type in key_status:
                key_status[key_type].append(key)
            return key_type
        except Exception as exc:
            logging.error(f"处理 KEY {key} 生成异常: {exc}")
            return "invalid"

    with concurrent.futures.ThreadPoolExecutor(max_workers=10000) as pool:
        futures = [pool.submit(process_key_with_logging, key) for key in unique_keys]
        concurrent.futures.wait(futures)

    for status, keys in key_status.items():
        logging.info(f"{status.capitalize()} KEYS: {keys}")

    invalid_keys_global = key_status["invalid"]
    free_keys_global = key_status["free"]
    unverified_keys_global = key_status["unverified"]
    valid_keys_global = key_status["valid"]


def process_key(key, test_model):
    """Classify ``key`` as ``invalid``, ``free``, ``valid`` or ``unverified``.

    A key whose balance cannot be read is ``invalid``; a balance of at most
    0.03 is ``free``; otherwise the key is ``valid`` if it can use
    ``test_model`` and ``unverified`` if not.
    """
    credit_summary = get_credit_summary(key)
    if credit_summary is None:
        return "invalid"

    total_balance = credit_summary.get("total_balance", 0)
    if total_balance <= 0.03:
        return "free"
    if test_model_availability(key, test_model):
        return "valid"
    return "unverified"


def get_all_models(api_key, sub_type):
    """Return the model ids of the given ``sub_type`` from the upstream API.

    Returns:
        A list of model ids; empty on request failure or malformed response.
    """
    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json"
    }
    try:
        response = session.get(
            MODELS_ENDPOINT,
            headers=headers,
            params={"sub_type": sub_type}
        )
        response.raise_for_status()
        data = response.json()
        if (
            isinstance(data, dict) and
            'data' in data and
            isinstance(data['data'], list)
        ):
            return [
                model.get("id") for model in data["data"]
                if isinstance(model, dict) and "id" in model
            ]
        logging.error("获取模型列表失败:响应数据格式不正确")
        return []
    except requests.exceptions.RequestException as e:
        logging.error(
            f"获取模型列表失败,"
            f"API Key:{api_key},错误信息:{e}"
        )
        return []
    except (KeyError, TypeError) as e:
        logging.error(
            f"解析模型列表失败,"
            f"API Key:{api_key},错误信息:{e}"
        )
        return []


def determine_request_type(model_name, model_list, free_model_list):
    """Return ``"free"``, ``"paid"`` or ``"unknown"`` for ``model_name``."""
    if model_name in free_model_list:
        return "free"
    if model_name in model_list:
        return "paid"
    return "unknown"


def select_key(request_type, model_name):
    """Pick the next usable key for ``model_name`` in round-robin order.

    Paid requests only consider unverified and valid keys; every other
    request type may also use free keys.  Each candidate is checked with
    ``key_is_valid`` before it is returned.

    Returns:
        A usable API key, or ``None`` if no candidate passed the check.
    """
    if request_type == "paid":
        available_keys = unverified_keys_global + valid_keys_global
    else:
        available_keys = (
            free_keys_global +
            unverified_keys_global +
            valid_keys_global
        )

    if not available_keys:
        return None

    current_index = model_key_indices.get(model_name, 0)

    for _ in range(len(available_keys)):
        key = available_keys[current_index % len(available_keys)]
        current_index += 1

        if key_is_valid(key, request_type):
            # Remember where to resume for the next request on this model.
            model_key_indices[model_name] = current_index
            return key
        logging.warning(
            f"KEY {key} 无效或达到限制,尝试下一个 KEY"
        )

    model_key_indices[model_name] = 0
    return None


def key_is_valid(key, request_type):
    """Check whether ``key`` may serve a request of ``request_type``.

    The balance is fetched on every call.  Free requests only need the
    lookup to succeed; paid/unverified requests need a positive balance.
    """
    if request_type == "invalid":
        return False

    credit_summary = get_credit_summary(key)
    if credit_summary is None:
        return False

    total_balance = credit_summary.get("total_balance", 0)

    if request_type == "free":
        return True
    if request_type in ("paid", "unverified"):
        return total_balance > 0
    return False


def check_authorization(request):
    """Validate the ``Authorization`` header against ``AUTHORIZATION_KEY``.

    Returns:
        ``True`` when the header equals ``Bearer <AUTHORIZATION_KEY>`` or
        when no ``AUTHORIZATION_KEY`` is configured; ``False`` otherwise.
    """
    authorization_key = os.environ.get("AUTHORIZATION_KEY")
    if not authorization_key:
        logging.warning("环境变量 AUTHORIZATION_KEY 未设置,此时无需鉴权即可使用,建议进行设置后再使用。")
        return True

    auth_header = request.headers.get('Authorization')
    if not auth_header:
        logging.warning("请求头中缺少 Authorization 字段。")
        return False

    if auth_header != f"Bearer {authorization_key}":
        logging.warning(f"无效的 Authorization 密钥:{auth_header}")
        return False

    return True


scheduler = BackgroundScheduler()
scheduler.add_job(load_keys, 'interval', hours=1)
# NOTE(review): remove_all_jobs() discards the load_keys job registered on the
# previous line, so only refresh_models runs hourly.  Kept as in the original;
# confirm whether the periodic key reload was disabled on purpose.
scheduler.remove_all_jobs()
scheduler.add_job(refresh_models, 'interval', hours=1)


@app.route('/')
def index():
    """Return requests/tokens per minute and per day as JSON."""
    current_time = time.time()
    one_minute_ago = current_time - 60
    one_day_ago = current_time - 86400

    with data_lock:
        # Drop entries that have fallen out of the one-minute window.
        while request_timestamps and request_timestamps[0] < one_minute_ago:
            request_timestamps.pop(0)
            token_counts.pop(0)
        rpm = len(request_timestamps)
        tpm = sum(token_counts)

    with data_lock:
        # Drop entries that have fallen out of the one-day window.
        while request_timestamps_day and request_timestamps_day[0] < one_day_ago:
            request_timestamps_day.pop(0)
            token_counts_day.pop(0)
        rpd = len(request_timestamps_day)
        tpd = sum(token_counts_day)

    return jsonify({"rpm": rpm, "tpm": tpm, "rpd": rpd, "tpd": tpd})


@app.route('/handsome/v1/models', methods=['GET'])
def list_models():
    """List every known text, embedding and image model."""
    if not check_authorization(request):
        return jsonify({"error": "Unauthorized"}), 401

    all_models = chain(
        models["text"],
        models["embedding"],
        models["image"]
    )
    detailed_models = [
        {
            "id": model,
            "object": "model",
            "created": 1678888888,
            "owned_by": "openai",
            "permission": [],
            "root": model,
            "parent": None
        }
        for model in all_models
    ]

    return jsonify({
        "success": True,
        "data": detailed_models
    })
_NO_KEY_MESSAGE = (
    "No available API key for this request type or all keys have reached "
    "their limits"
)

_IMAGE_GENERATION_MODELS = [
    "black-forest-labs/FLUX.1-schnell",
    "Pro/black-forest-labs/FLUX.1-schnell",
    "black-forest-labs/FLUX.1-dev",
    "black-forest-labs/FLUX.1-pro",
]


def _record_usage(token_count):
    """Add one request with ``token_count`` tokens to the usage statistics."""
    now = time.time()
    with data_lock:
        request_timestamps.append(now)
        token_counts.append(token_count)
        request_timestamps_day.append(now)
        token_counts_day.append(token_count)


def _escape_newlines(text):
    """Render line breaks as a literal ``\\n`` so a log entry stays on one line."""
    return text.replace('\n', '\\n').replace('\r', '\\n')


def _sse_chunk(model_name, delta, finish_reason=None):
    """Encode one ``chat.completion.chunk`` as a server-sent-event frame."""
    chunk_data = {
        "id": f"chatcmpl-{uuid.uuid4()}",
        "object": "chat.completion.chunk",
        "created": int(time.time()),
        "model": model_name,
        "choices": [
            {
                "index": 0,
                "delta": delta,
                "finish_reason": finish_reason
            }
        ]
    }
    return f"data: {json.dumps(chunk_data)}\n\n".encode('utf-8')


def _chat_completion_body(model_name, content):
    """Build a non-streaming ``chat.completion`` body with one message."""
    return {
        "id": f"chatcmpl-{uuid.uuid4()}",
        "object": "chat.completion",
        "created": int(time.time()),
        "model": model_name,
        "choices": [
            {
                "index": 0,
                "message": {
                    "role": "assistant",
                    "content": content,
                },
                "finish_reason": "stop",
            }
        ],
    }


def _first_image_url(response_json):
    """Return the URL of the first generated image, or ``""`` if none."""
    images = response_json.get("images", [])
    image_url = ""
    if images and isinstance(images[0], dict) and "url" in images[0]:
        image_url = images[0]["url"]
        logging.info(f"Extracted image URL: {image_url}")
    elif images and isinstance(images[0], str):
        image_url = images[0]
        logging.info(f"Extracted image URL: {image_url}")
    return image_url


@app.route('/handsome/v1/dashboard/billing/usage', methods=['GET'])
def billing_usage():
    """Return an empty usage report in the OpenAI billing format."""
    if not check_authorization(request):
        return jsonify({"error": "Unauthorized"}), 401

    daily_usage = []

    return jsonify({
        "object": "list",
        "data": daily_usage,
        "total_usage": 0
    })


@app.route('/handsome/v1/dashboard/billing/subscription', methods=['GET'])
def billing_subscription():
    """Report the summed balance of all valid and unverified keys."""
    if not check_authorization(request):
        return jsonify({"error": "Unauthorized"}), 401

    keys = valid_keys_global + unverified_keys_global
    total_balance = 0

    with concurrent.futures.ThreadPoolExecutor(
        max_workers=10000
    ) as pool:
        futures = [
            pool.submit(get_credit_summary, key) for key in keys
        ]

        for future in concurrent.futures.as_completed(futures):
            try:
                credit_summary = future.result()
                if credit_summary:
                    total_balance += credit_summary.get("total_balance", 0)
            except Exception as exc:
                logging.error(f"获取额度信息生成异常: {exc}")

    return jsonify({
        "object": "billing_subscription",
        "access_until": int(datetime(9999, 12, 31).timestamp()),
        "soft_limit": 0,
        "hard_limit": total_balance,
        "system_hard_limit": total_balance,
        "soft_limit_usd": 0,
        "hard_limit_usd": total_balance,
        "system_hard_limit_usd": total_balance
    })


@app.route('/handsome/v1/embeddings', methods=['POST'])
def handsome_embeddings():
    """Forward an embeddings request upstream and record prompt tokens."""
    if not check_authorization(request):
        return jsonify({"error": "Unauthorized"}), 401

    data = request.get_json()
    if not data or 'model' not in data:
        return jsonify({"error": "Invalid request data"}), 400
    if data['model'] not in models["embedding"]:
        return jsonify({"error": "Invalid model"}), 400

    model_name = data['model']
    request_type = determine_request_type(
        model_name,
        models["embedding"],
        models["free_embedding"]
    )
    api_key = select_key(request_type, model_name)

    if not api_key:
        return jsonify({"error": _NO_KEY_MESSAGE}), 429

    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json"
    }

    try:
        start_time = time.time()
        response = requests.post(
            EMBEDDINGS_ENDPOINT,
            headers=headers,
            json=data,
            timeout=120
        )

        if response.status_code == 429:
            return jsonify(response.json()), 429

        response.raise_for_status()
        end_time = time.time()
        response_json = response.json()
        total_time = end_time - start_time

        try:
            prompt_tokens = response_json["usage"]["prompt_tokens"]
            embedding_data = response_json["data"]
        except (KeyError, ValueError, IndexError) as e:
            logging.error(
                f"解析响应 JSON 失败: {e}, "
                f"完整内容: {response_json}"
            )
            prompt_tokens = 0
            embedding_data = []

        logging.info(
            f"使用的key: {api_key}, "
            f"提示token: {prompt_tokens}, "
            f"总共用时: {total_time:.4f}秒, "
            f"使用的模型: {model_name}"
        )

        _record_usage(prompt_tokens)

        return jsonify({
            "object": "list",
            "data": embedding_data,
            "model": model_name,
            "usage": {
                "prompt_tokens": prompt_tokens,
                "total_tokens": prompt_tokens
            }
        })

    except requests.exceptions.RequestException as e:
        return jsonify({"error": str(e)}), 500


@app.route('/handsome/v1/images/generations', methods=['POST'])
def handsome_images_generations():
    """Forward an image generation request and return OpenAI-style results.

    With ``response_format == "b64_json"`` each image is downloaded and
    inlined as base64; if that fails the plain URL is returned instead.
    """
    if not check_authorization(request):
        return jsonify({"error": "Unauthorized"}), 401

    data = request.get_json()
    if not data or 'model' not in data:
        return jsonify({"error": "Invalid request data"}), 400
    if data['model'] not in models["image"]:
        return jsonify({"error": "Invalid model"}), 400

    model_name = data.get('model')

    request_type = determine_request_type(
        model_name,
        models["image"],
        models["free_image"]
    )

    api_key = select_key(request_type, model_name)

    if not api_key:
        return jsonify({"error": _NO_KEY_MESSAGE}), 429

    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json"
    }

    is_supported = (
        "stable-diffusion" in model_name
        or model_name in _IMAGE_GENERATION_MODELS
    )
    if not is_supported:
        return jsonify({"error": "Unsupported model"}), 400

    siliconflow_data = get_siliconflow_data(model_name, data)

    try:
        start_time = time.time()
        response = requests.post(
            IMAGE_ENDPOINT,
            headers=headers,
            json=siliconflow_data,
            timeout=120
        )

        if response.status_code == 429:
            return jsonify(response.json()), 429

        response.raise_for_status()
        end_time = time.time()
        response_json = response.json()
        total_time = end_time - start_time

        try:
            images = response_json.get("images", [])
            openai_images = []
            for item in images:
                if isinstance(item, dict) and "url" in item:
                    image_url = item["url"]
                    logging.info(f"image_url: {image_url}")
                    # Shared helper: inlines as base64 on request and falls
                    # back to the plain URL when the download fails.
                    openai_images.append(
                        process_image_url(image_url, data.get("response_format"))
                    )
                else:
                    logging.error(f"无效的图片数据: {item}")
                    openai_images.append({"url": item})

            response_data = {
                "created": int(time.time()),
                "data": openai_images
            }
        except (KeyError, ValueError, IndexError) as e:
            logging.error(
                f"解析响应 JSON 失败: {e}, "
                f"完整内容: {response_json}"
            )
            response_data = {
                "created": int(time.time()),
                "data": []
            }

        logging.info(
            f"使用的key: {api_key}, "
            f"总共用时: {total_time:.4f}秒, "
            f"使用的模型: {model_name}"
        )

        _record_usage(0)

        return jsonify(response_data)

    except requests.exceptions.RequestException as e:
        logging.error(f"请求转发异常: {e}")
        return jsonify({"error": str(e)}), 500


def _stream_image_chat(response, model_name, api_key):
    """Yield SSE frames that deliver a generated image as chat content."""
    try:
        response.raise_for_status()
        image_url = _first_image_url(response.json())

        if image_url:
            # Prefer the inlined thumbnail; if it cannot be built, fall back
            # to a plain markdown link like the non-streaming path uses.
            markdown_image_link = (
                create_base64_markdown_image(image_url)
                or f"![image]({image_url})"
            )
            chunk_size = 8192
            for i in range(0, len(markdown_image_link), chunk_size):
                yield _sse_chunk(model_name, {
                    "role": "assistant",
                    "content": markdown_image_link[i:i + chunk_size]
                })
        else:
            yield _sse_chunk(model_name, {
                "role": "assistant",
                "content": "Failed to generate image"
            })

        yield _sse_chunk(model_name, {}, "stop")
        _record_usage(0)
    except requests.exceptions.RequestException as e:
        logging.error(f"请求转发异常: {e}")
        yield _sse_chunk(model_name, {
            "role": "assistant",
            "content": f"Error: {str(e)}"
        })
        yield _sse_chunk(model_name, {}, "stop")

    logging.info(
        f"使用的key: {api_key}, "
        f"使用的模型: {model_name}"
    )
    yield "data: [DONE]\n\n".encode('utf-8')


def _image_chat_completion(data, model_name, api_key, headers):
    """Serve a chat request addressed to an image model."""
    if isinstance(data.get("messages"), list):
        # The user messages become the image prompt; copy so the caller's
        # request dict is not modified.
        data = data.copy()
        data["prompt"] = extract_user_content(data["messages"])
    siliconflow_data = get_siliconflow_data(model_name, data)
    stream = data.get("stream", False)

    try:
        start_time = time.time()
        response = requests.post(
            IMAGE_ENDPOINT,
            headers=headers,
            json=siliconflow_data,
            stream=stream
        )

        if response.status_code == 429:
            return jsonify(response.json()), 429

        if stream:
            return Response(
                stream_with_context(
                    _stream_image_chat(response, model_name, api_key)
                ),
                content_type='text/event-stream'
            )

        response.raise_for_status()
        end_time = time.time()
        response_json = response.json()
        total_time = end_time - start_time

        try:
            image_url = _first_image_url(response_json)
            content = (
                f"![image]({image_url})" if image_url
                else "Failed to generate image"
            )
        except (KeyError, ValueError, IndexError) as e:
            logging.error(
                f"解析响应 JSON 失败: {e}, "
                f"完整内容: {response_json}"
            )
            content = "Failed to process image data"
        response_data = _chat_completion_body(model_name, content)

        logging.info(
            f"使用的key: {api_key}, "
            f"总共用时: {total_time:.4f}秒, "
            f"使用的模型: {model_name}"
        )
        _record_usage(0)
        return jsonify(response_data)

    except requests.exceptions.RequestException as e:
        logging.error(f"请求转发异常: {e}")
        return jsonify({"error": str(e)}), 500


def _parse_stream_body(body):
    """Extract token counts and text from a buffered SSE response body.

    Returns:
        ``(prompt_tokens, completion_tokens, response_content)``.
    """
    prompt_tokens = 0
    completion_tokens = 0
    content_parts = []

    for line in body.splitlines():
        if not line.startswith("data:"):
            continue
        line = line[5:].strip()
        if line == "[DONE]":
            continue
        try:
            response_json = json.loads(line)

            # "usage" may be absent or null on most chunks.
            usage = response_json.get("usage")
            if isinstance(usage, dict):
                if "completion_tokens" in usage:
                    completion_tokens = usage["completion_tokens"]
                if "prompt_tokens" in usage:
                    prompt_tokens = usage["prompt_tokens"]

            choices = response_json.get("choices")
            if choices and isinstance(choices[0].get("delta"), dict):
                # "content" can be null (e.g. on role-only deltas).
                content_parts.append(choices[0]["delta"].get("content") or "")
        except (
            KeyError,
            ValueError,
            IndexError,
            TypeError,
            AttributeError
        ) as e:
            logging.error(
                f"解析流式响应单行 JSON 失败: {e}, "
                f"行内容: {line}"
            )

    return prompt_tokens, completion_tokens, "".join(content_parts)


def _stream_text_chat(response, data, model_name, api_key, start_time):
    """Relay the upstream stream unchanged, then log and record its usage."""
    first_chunk_time = None
    raw_chunks = []

    for chunk in response.iter_content(chunk_size=2048):
        if chunk:
            if first_chunk_time is None:
                first_chunk_time = time.time()
            # Keep raw bytes: a 2048-byte boundary can fall inside a
            # multi-byte UTF-8 character, so decoding happens once below.
            raw_chunks.append(chunk)
            yield chunk

    end_time = time.time()
    first_token_time = (
        first_chunk_time - start_time
        if first_chunk_time else 0
    )
    total_time = end_time - start_time

    full_response_content = b"".join(raw_chunks).decode(
        "utf-8", errors="replace"
    )
    prompt_tokens, completion_tokens, response_content = _parse_stream_body(
        full_response_content
    )

    user_content = extract_user_content(data.get("messages", []))
    user_content_replaced = _escape_newlines(user_content)
    response_content_replaced = _escape_newlines(response_content)

    logging.info(
        f"使用的key: {api_key}, "
        f"提示token: {prompt_tokens}, "
        f"输出token: {completion_tokens}, "
        f"首字用时: {first_token_time:.4f}秒, "
        f"总共用时: {total_time:.4f}秒, "
        f"使用的模型: {model_name}, "
        f"用户的内容: {user_content_replaced}, "
        f"输出的内容: {response_content_replaced}"
    )

    _record_usage(prompt_tokens + completion_tokens)


def _text_chat_completion(data, model_name, api_key, headers):
    """Serve a chat request addressed to a text model."""
    stream = data.get("stream", False)

    try:
        start_time = time.time()
        response = requests.post(
            TEST_MODEL_ENDPOINT,
            headers=headers,
            json=data,
            stream=stream
        )

        if response.status_code == 429:
            return jsonify(response.json()), 429

        if stream:
            return Response(
                stream_with_context(
                    _stream_text_chat(
                        response, data, model_name, api_key, start_time
                    )
                ),
                content_type=response.headers.get(
                    'Content-Type', 'text/event-stream'
                )
            )

        response.raise_for_status()
        end_time = time.time()
        response_json = response.json()
        total_time = end_time - start_time

        try:
            prompt_tokens = response_json["usage"]["prompt_tokens"]
            completion_tokens = response_json["usage"]["completion_tokens"]
            # "content" can be null (e.g. tool-call responses).
            response_content = (
                response_json["choices"][0]["message"]["content"] or ""
            )
        except (KeyError, ValueError, IndexError, TypeError) as e:
            logging.error(
                f"解析非流式响应 JSON 失败: {e}, "
                f"完整内容: {response_json}"
            )
            prompt_tokens = 0
            completion_tokens = 0
            response_content = ""

        user_content = extract_user_content(data.get("messages", []))
        user_content_replaced = _escape_newlines(user_content)
        response_content_replaced = _escape_newlines(response_content)

        logging.info(
            f"使用的key: {api_key}, "
            f"提示token: {prompt_tokens}, "
            f"输出token: {completion_tokens}, "
            f"首字用时: 0, "
            f"总共用时: {total_time:.4f}秒, "
            f"使用的模型: {model_name}, "
            f"用户的内容: {user_content_replaced}, "
            f"输出的内容: {response_content_replaced}"
        )

        # Count tokens whenever upstream reported both figures; a missing or
        # malformed "usage" object counts as zero instead of raising.
        usage = (
            response_json.get("usage")
            if isinstance(response_json, dict) else None
        )
        if (
            isinstance(usage, dict)
            and "prompt_tokens" in usage
            and "completion_tokens" in usage
        ):
            total_tokens = usage["prompt_tokens"] + usage["completion_tokens"]
        else:
            total_tokens = 0
        _record_usage(total_tokens)

        return jsonify(response_json)

    except requests.exceptions.RequestException as e:
        logging.error(f"请求转发异常: {e}")
        return jsonify({"error": str(e)}), 500


@app.route('/handsome/v1/chat/completions', methods=['POST'])
def handsome_chat_completions():
    """Handle chat completions for both text and image models.

    Image models are served by generating an image from the user messages
    and returning it as markdown; text models are proxied to the upstream
    chat endpoint, streaming or not according to ``stream``.
    """
    if not check_authorization(request):
        return jsonify({"error": "Unauthorized"}), 401

    data = request.get_json()
    if not data or 'model' not in data:
        return jsonify({"error": "Invalid request data"}), 400
    if data['model'] not in models["text"] and data['model'] not in models["image"]:
        return jsonify({"error": "Invalid model"}), 400

    model_name = data['model']

    request_type = determine_request_type(
        model_name,
        models["text"] + models["image"],
        models["free_text"] + models["free_image"]
    )

    api_key = select_key(request_type, model_name)

    if not api_key:
        return jsonify({"error": _NO_KEY_MESSAGE}), 429

    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json"
    }

    if model_name in models["image"]:
        return _image_chat_completion(data, model_name, api_key, headers)
    return _text_chat_completion(data, model_name, api_key, headers)


if __name__ == '__main__':
    # NOTE(review): this logs the whole environment, including KEYS and
    # AUTHORIZATION_KEY; consider redacting secrets.
    logging.info(f"环境变量:{os.environ}")

    load_keys()
    logging.info("程序启动时首次加载 keys 已执行")

    scheduler.start()

    logging.info("首次加载 keys 已手动触发执行")

    refresh_models()
    logging.info("首次刷新模型列表已手动触发执行")

    app.run(debug=False,host='0.0.0.0',port=int(os.environ.get('PORT', 7860)))