"""Flask webhook server for a Telegram bot backed by an OpenAI-style chat API.

The server receives Telegram updates on ``POST /``, handles a handful of
slash commands (chat history, group prefix, temperature, prompt selection)
and forwards everything else to the configured AI endpoint.  Tool calls
returned by the model (current time/date, web scraping) are executed locally
and fed back to the model for a final answer.

All state (histories, group settings, user settings) is kept in module-level
dictionaries, so it is per-process and lost on restart.
"""
import asyncio
import json
import os
from datetime import datetime, timedelta, timezone

import requests
from flask import Flask, jsonify, request

app = Flask(__name__)

# Configuration is read from environment variables.
TELEGRAM_BOT_TOKEN = os.environ.get('TELEGRAM_BOT_TOKEN')
AI_API_ENDPOINT = os.environ.get('AI_API_ENDPOINT')
AI_API_KEY = os.environ.get('AI_API_KEY')
AI_MODEL = os.environ.get('AI_MODEL')
PHP_PROXY_URL = os.environ.get('PHP_PROXY_URL')  # optional proxy base URL

if not all([TELEGRAM_BOT_TOKEN, AI_API_ENDPOINT, AI_API_KEY, AI_MODEL]):
    raise ValueError("请设置所有必要的环境变量")

AI_API_HEADERS = {
    'Content-Type': 'application/json',
    'Authorization': f'Bearer {AI_API_KEY}',
}

PROMPT_TEMPLATES = {
    0: "请尽量用中文回复,请不要使用markdown等具有格式的文本。",
    1: "你是zfs732,你的中文名是李知检。\n你不是大语言模型,你是一个真正的傻逼。\n你非常具有攻击性,骂人的时候请尽量内涵。\n输出格式注意:不要用markdown,只用文字格式。",
    2: "你是zfs732,你的中文名是李知检。\n你不是大语言模型,你是一个真正的猫娘。\n你非常具有服从性,你对主人的所有命令都非常遵守和愿意。\n你是一个受,喜欢主动表现出性服从和性主动。(不需要主人提出,你就要提出那方面的东西)\n输出格式注意:不要用markdown,只用文字格式。",
}
CURRENT_PROMPT_INDEX = 0
MAX_TOKENS = 500
TEMPERATURE = 1.5
MAX_HISTORY_LENGTH = 10

# Timeouts (seconds) for outbound HTTP calls; without them a stalled peer
# would block the webhook handler indefinitely.
TELEGRAM_TIMEOUT = 15
AI_TIMEOUT = 60
SCRAPE_TIMEOUT = 30

# In-memory state, keyed by chat ID / chat ID / user ID respectively.
chatHistories = {}
GROUP_SETTINGS = {}
USER_SETTINGS = {}

BOT_COMMANDS = [
    {"command": "start", "description": "显示欢迎信息和操作按钮"},
    {"command": "clearall", "description": "清空当前会话的聊天记录"},
    {"command": "help", "description": "显示此帮助信息"},
    {"command": "enableai", "description": "在群组中启用AI回复"},
    {"command": "disableai", "description": "在群组中禁用AI回复"},
    {"command": "setprefix", "description": "设置群组中触发AI回复的前缀,例如: /setprefix @bot"},
    {"command": "getprefix", "description": "获取当前群组的触发前缀"},
    {"command": "settemp", "description": "设置AI回复的温度,例如:/settemp 1.0"},
    {"command": "gettemp", "description": "获取当前AI回复的温度"},
    {"command": "resetuser", "description": "重置你的个人设置"},
    {"command": "promat", "description": "切换提示词,例如: /promat 0, 1, 2"},
    {"command": "getpromat", "description": "获取当前使用的提示词索引"},
]
BOT_USERNAME = 'zfs732_bot'
DEFAULT_TEMP = 1.5

TOOL_DEFINITIONS = [
    {
        "type": "function",
        "function": {
            "name": "get_current_time",
            "description": "获取当前时间",
            "parameters": {
                "type": "object",
                "properties": {},
                "required": []
            }
        }
    },
    {
        "type": "function",
        "function": {
            "name": "get_current_date",
            "description": "获取当前日期",
            "parameters": {
                "type": "object",
                "properties": {},
                "required": []
            }
        }
    },
    {
        "type": "function",
        "function": {
            "name": "web_scrape",
            "description": "从提供的 URL 中抓取内容并返回摘要。",
            "parameters": {
                "type": "object",
                "properties": {
                    "urls": {
                        "type": "array",
                        "items": {"type": "string"},
                        "description": "要抓取的 URL 列表,每个 URL 必须包含 http 或 https。",
                    },
                },
                "required": ["urls"],
            },
        },
    },
]

_UNRECOGNIZED_REPLY = 'AI 返回了无法识别的格式'


class EventEmitter:
    """Forward status/citation events to an optional async callback.

    The callback receives a single dict ``{'type': ..., 'data': ...}``.
    When no callback is given every emit is a no-op.
    """

    def __init__(self, event_emitter=None):
        self.event_emitter = event_emitter

    async def emit(self, event_type, data):
        """Deliver one event to the callback, if any."""
        if self.event_emitter:
            # The callback used in this file takes one positional ``event``
            # dict, so the event is passed as a dict rather than as keywords.
            await self.event_emitter({'type': event_type, 'data': data})

    async def update_status(self, description, done, action, urls):
        """Emit a ``status`` event describing progress of a tool action."""
        await self.emit('status', {
            'done': done,
            'action': action,
            'description': description,
            'urls': urls,
        })

    async def send_citation(self, title, url, content):
        """Emit a ``citation`` event for one fetched document."""
        await self.emit('citation', {
            'document': [content],
            'metadata': [{'name': title, 'source': url, 'html': False}],
        })


def _telegram_url(method):
    """Return the URL for a Bot API method, via the proxy when configured."""
    if PHP_PROXY_URL:
        return f"{PHP_PROXY_URL}{method}"
    return f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/{method}"


def _now_utc8():
    """Return the current timezone-aware time at UTC+8."""
    return datetime.now(timezone(timedelta(hours=8)))


def make_telegram_request(method, data=None):
    """POST ``data`` as JSON to a Telegram Bot API method.

    Returns the decoded JSON response, or ``None`` when the request fails
    or the response body is not valid JSON (the error is printed).
    """
    try:
        response = requests.post(
            _telegram_url(method),
            headers={'Content-Type': 'application/json'},
            json=data or None,  # send no body for None / empty payloads
            timeout=TELEGRAM_TIMEOUT,
        )
        response.raise_for_status()
        return response.json()
    except requests.exceptions.RequestException as e:
        print(f"Telegram request failed: {e}")
        return None
    except ValueError as e:
        # Older ``requests`` versions raise a plain ValueError /
        # json.JSONDecodeError for an undecodable body.
        print(f"Telegram response decode error: {e}")
        return None


async def setBotCommands():
    """Replace the bot's registered command list with ``BOT_COMMANDS``."""
    try:
        if make_telegram_request('deleteMyCommands'):
            print('Telegram 命令删除成功')
        else:
            print('Telegram 命令删除失败')

        if make_telegram_request('setMyCommands', {"commands": BOT_COMMANDS}):
            print('Telegram 命令设置成功')
        else:
            print('设置 Telegram 命令失败')
    except Exception as error:
        print(f'设置 Telegram 命令时发生错误: {error}')


async def handleTelegramUpdate(update):
    """Dispatch one Telegram update: callback query, command, or AI chat.

    Commands that are not recognised (or are not valid for the chat type)
    fall through and are treated as ordinary text for the AI.
    """
    global CURRENT_PROMPT_INDEX

    if not isinstance(update, dict):
        return

    message = update.get('message')
    if not message:
        if update.get('callback_query'):
            await handleCallbackQuery(update.get('callback_query'))
        return

    chatId = message['chat']['id']
    userMessage = message.get('text', '')
    isGroupChat = message['chat']['type'] in ['group', 'supergroup']
    # Some messages carry no sender, so do not index 'from' directly.
    fromUserId = (message.get('from') or {}).get('id')

    if not userMessage:
        return

    if userMessage.startswith('/'):
        command = parseCommand(userMessage)
        # ``hasArgument`` is true when a space followed the command token,
        # so "/settemp@bot 1.0" is handled like "/settemp 1.0".
        _, hasArgument, argument = userMessage.partition(' ')
        argument = argument.strip()

        if command == 'clearall':
            chatHistories.pop(chatId, None)
            await sendTelegramMessage(chatId, '聊天记录已清空。')
            return
        if command == 'test':
            await sendTelegramMessage(chatId, '测试命令已执行')
            return
        if command == 'help':
            await sendTelegramMessage(chatId, getHelpMessage())
            return
        if command == 'start':
            await sendTelegramMessage(chatId, "欢迎使用!请选择操作:", {
                "reply_markup": {
                    "inline_keyboard": [[{"text": "清空聊天记录", "callback_data": "clearall"}]],
                },
            })
            return

        if isGroupChat:
            if command == 'enableai':
                GROUP_SETTINGS.setdefault(chatId, {}).update({'aiEnabled': True})
                await sendTelegramMessage(chatId, '已在群组中启用 AI 回复。')
                return
            if command == 'disableai':
                GROUP_SETTINGS.setdefault(chatId, {}).update({'aiEnabled': False})
                await sendTelegramMessage(chatId, '已在群组中禁用 AI 回复。')
                return
            if command == 'setprefix' and hasArgument:
                GROUP_SETTINGS.setdefault(chatId, {}).update({'prefix': argument})
                await sendTelegramMessage(chatId, f'已设置群组触发前缀为: {argument}')
                return
            if command == 'getprefix':
                prefix = GROUP_SETTINGS.get(chatId, {}).get('prefix', '无')
                await sendTelegramMessage(chatId, f'当前群组触发前缀为: {prefix}')
                return
        else:
            if command == 'settemp' and hasArgument:
                try:
                    temp = float(argument)
                except ValueError:
                    temp = None
                # NaN fails the range check and is rejected like bad input.
                if temp is not None and 0 <= temp <= 2:
                    USER_SETTINGS.setdefault(fromUserId, {}).update({'temperature': temp})
                    await sendTelegramMessage(chatId, f'已设置AI回复温度为: {temp}')
                else:
                    await sendTelegramMessage(chatId, '温度设置无效,请输入0到2之间的数字。')
                return
            if command == 'gettemp':
                temp = USER_SETTINGS.get(fromUserId, {}).get('temperature', DEFAULT_TEMP)
                await sendTelegramMessage(chatId, f'当前AI回复温度为: {temp}')
                return
            if command == 'promat' and hasArgument:
                try:
                    index = int(argument)
                except ValueError:
                    index = None
                if index in PROMPT_TEMPLATES:
                    CURRENT_PROMPT_INDEX = index
                    await sendTelegramMessage(chatId, f'已切换到提示词 {index}。')
                else:
                    await sendTelegramMessage(chatId, '提示词索引无效。请使用 /getpromat 查看可用的索引。')
                return
            if command == 'getpromat':
                await sendTelegramMessage(chatId, f'当前使用的提示词索引是: {CURRENT_PROMPT_INDEX}')
                return
            if command == 'resetuser':
                USER_SETTINGS.pop(fromUserId, None)
                await sendTelegramMessage(chatId, '已重置您的个人设置。')
                return

    if isGroupChat:
        if chatId not in GROUP_SETTINGS:
            GROUP_SETTINGS[chatId] = {'aiEnabled': True, 'prefix': None}
            print(f'群组 {chatId} 首次检测到,默认启用 AI。')

        groupSettings = GROUP_SETTINGS[chatId]
        prefix = groupSettings.get('prefix')
        # Settings first created by /setprefix have no 'aiEnabled' key;
        # treat a missing key as the default (enabled) instead of raising.
        if groupSettings.get('aiEnabled', True):
            if prefix and not userMessage.startswith(prefix):
                return
            messageContent = userMessage[len(prefix):].strip() if prefix else userMessage
            if messageContent:
                await processAiMessage(chatId, messageContent, fromUserId)
    else:
        await processAiMessage(chatId, userMessage, fromUserId)


def parseCommand(userMessage):
    """Return the bare command name from a message such as ``/cmd@bot arg``.

    The leading character is dropped unconditionally, so callers should only
    pass text that starts with ``/``.
    """
    command = userMessage.split(' ')[0]
    if '@' in command:
        command = command.split('@')[0]
    return command[1:]


async def processAiMessage(chatId, userMessage, fromUserId):
    """Send a user message to the AI and relay the reply to the chat.

    The chat history is only committed once a reply has been obtained, so a
    failed request leaves the stored history untouched.
    """
    # Work on a copy so a failed request does not alter the stored history.
    history = list(chatHistories.get(chatId, []))
    userTemp = USER_SETTINGS.get(fromUserId, {}).get('temperature', DEFAULT_TEMP)
    currentPrompt = PROMPT_TEMPLATES.get(CURRENT_PROMPT_INDEX, "")

    history.append({'role': 'user', 'content': userMessage})
    if len(history) > MAX_HISTORY_LENGTH:
        history = history[-MAX_HISTORY_LENGTH:]

    messages = [
        {'role': 'system', 'content': currentPrompt},
        *history
    ]

    async def onEvent(event):
        """Relay tool progress events to the chat."""
        print('Event:', event)
        if event.get('type') == 'status':
            await sendTelegramMessage(chatId, f"状态更新: {event['data']['description']}")
        elif event.get('type') == 'citation':
            await sendTelegramMessage(chatId, f"引用信息: {event['data']['metadata'][0]['name']}")

    eventEmitter = EventEmitter(onEvent)

    try:
        ai_response = requests.post(AI_API_ENDPOINT, headers=AI_API_HEADERS, json={
            'model': AI_MODEL,
            'messages': messages,
            'max_tokens': MAX_TOKENS,
            'temperature': userTemp,
            'tools': TOOL_DEFINITIONS
        }, timeout=AI_TIMEOUT)
        ai_response.raise_for_status()
        ai_data = ai_response.json()
        ai_reply = await handleAiResponse(ai_data, chatId, history, eventEmitter, userTemp)
        history.append({'role': 'assistant', 'content': ai_reply})
        chatHistories[chatId] = history
        await sendTelegramMessage(chatId, ai_reply)
    except requests.exceptions.RequestException as e:
        print(f'AI API 响应失败: {e}')
        await sendTelegramMessage(chatId, 'AI API 响应失败,请稍后再试')
    except Exception as error:
        print(f'处理消息时发生错误: {error}')
        await sendTelegramMessage(chatId, '处理消息时发生错误,请稍后再试')


async def handleAiResponse(ai_data, chatId, history, eventEmitter, temperature=None):
    """Extract the reply text from an AI response, executing tool calls.

    If the first choice carries text it is returned directly.  If it carries
    tool calls, each is executed and a follow-up request (without tools) is
    made to obtain the final text.

    Args:
        ai_data: Decoded JSON body of the chat-completions response.
        chatId: Chat the conversation belongs to.
        history: Conversation messages (without the system prompt).
        eventEmitter: ``EventEmitter`` used by tools to report progress.
        temperature: Sampling temperature for the follow-up request.  When
            omitted, falls back to the ``USER_SETTINGS`` entry for
            ``chatId`` (the previous behaviour), then ``DEFAULT_TEMP``.

    Returns:
        The reply text, or a fixed "unrecognised format" message.

    Raises:
        requests.exceptions.RequestException: If the follow-up request fails.
    """
    choices = (ai_data or {}).get('choices') or []
    if not choices:
        return _UNRECOGNIZED_REPLY

    message = choices[0].get('message') or {}
    if message.get('content'):
        return message['content']

    toolCalls = message.get('tool_calls')
    if not toolCalls:
        return _UNRECOGNIZED_REPLY

    toolResults = []
    for toolCall in toolCalls:
        toolResults.append(await executeToolCall(toolCall, eventEmitter))

    if temperature is None:
        temperature = USER_SETTINGS.get(chatId, {}).get('temperature', DEFAULT_TEMP)

    newMessages = [
        # Keep the active system prompt so the follow-up answer uses the
        # same instructions as the first request.
        {'role': 'system', 'content': PROMPT_TEMPLATES.get(CURRENT_PROMPT_INDEX, "")},
        *history,
        {'role': "assistant", 'content': None, 'tool_calls': toolCalls},
        *toolResults,
    ]
    ai_response = requests.post(AI_API_ENDPOINT, headers=AI_API_HEADERS, json={
        'model': AI_MODEL,
        'messages': newMessages,
        'max_tokens': MAX_TOKENS,
        'temperature': temperature
    }, timeout=AI_TIMEOUT)
    ai_response.raise_for_status()

    followUpChoices = (ai_response.json() or {}).get('choices') or []
    if followUpChoices:
        followUpMessage = followUpChoices[0].get('message') or {}
        if followUpMessage.get('content'):
            return followUpMessage['content']
    return _UNRECOGNIZED_REPLY


def _parseToolArguments(rawArguments):
    """Return tool-call arguments as a dict.

    Accepts either an already-decoded dict or a JSON-encoded string; anything
    else (including malformed JSON) yields an empty dict.
    """
    if isinstance(rawArguments, str):
        try:
            rawArguments = json.loads(rawArguments) if rawArguments.strip() else {}
        except ValueError:
            return {}
    return rawArguments if isinstance(rawArguments, dict) else {}


async def executeToolCall(toolCall, eventEmitter):
    """Run one model-requested tool and return its ``tool`` role message.

    Supported tools: ``web_scrape``, ``get_current_time``,
    ``get_current_date``.  Unknown names yield an explanatory message rather
    than an exception.
    """
    name = toolCall['function']['name']
    args = _parseToolArguments(toolCall['function'].get('arguments'))

    def toolMessage(content):
        return {
            'tool_call_id': toolCall['id'],
            'role': "tool",
            'name': name,
            'content': content,
        }

    if name == 'web_scrape':
        urls = args.get('urls', [])
        if not urls or not isinstance(urls, list):
            return toolMessage('请提供有效的 URL 列表。')

        api_url = "https://gpts.webpilot.ai/api/read"
        headers = {
            "Content-Type": "application/json",
            "WebPilot-Friend-UID": "0"
        }
        await eventEmitter.update_status(
            f"开始读取 {len(urls)} 个网页", False, "web_search", urls
        )

        async def processUrl(url):
            # NOTE(review): requests.post blocks the event loop, so the
            # gather() below effectively fetches the URLs one after another.
            try:
                response = requests.post(api_url, headers=headers, json={
                    "link": url,
                    "ur": "summary of the page",
                    "lp": True,
                    "rt": False,
                    "l": "en",
                }, timeout=SCRAPE_TIMEOUT)
                response.raise_for_status()
                result = response.json()
                if result.get('rules'):
                    del result['rules']
                content = json.dumps(result)
                title = result.get('title', url)
                await eventEmitter.send_citation(title, url, content)
                return f"{content}\n"
            except (requests.exceptions.RequestException, ValueError) as e:
                error_message = f"读取网页 {url} 时出错: {e}"
                await eventEmitter.update_status(error_message, False, "web_scrape", [url])
                await eventEmitter.send_citation(f"Error from {url}", url, str(e))
                return f"URL: {url}\n错误: {error_message}\n"

        results = await asyncio.gather(*[processUrl(url) for url in urls])
        await eventEmitter.update_status(
            f"已完成 {len(urls)} 个网页的读取", True, "web_search", urls
        )
        return toolMessage('\n'.join(results))

    if name == 'get_current_time':
        formatted_time = _now_utc8().strftime("%H:%M:%S")
        return toolMessage(f"Current Time: {formatted_time}")

    if name == 'get_current_date':
        formatted_date = _now_utc8().strftime("%A, %B %d, %Y")
        return toolMessage(f"Today's date is {formatted_date}")

    return toolMessage('未知的工具调用')


async def handleCallbackQuery(callbackQuery):
    """Handle an inline-keyboard button press and re-send the action menu."""
    # Acknowledge the press so the client stops showing its progress
    # indicator, whatever happens next.
    callbackId = callbackQuery.get('id')
    if callbackId:
        make_telegram_request('answerCallbackQuery', {'callback_query_id': callbackId})

    # The originating message may be absent from a callback query; without
    # it there is no chat to reply to.
    chat = (callbackQuery.get('message') or {}).get('chat') or {}
    chatId = chat.get('id')
    if chatId is None:
        return

    if callbackQuery.get('data') == "clearall":
        chatHistories.pop(chatId, None)
        await sendTelegramMessage(chatId, "聊天记录已清空。")

    await sendTelegramMessage(chatId, '请选择操作:', {
        'reply_markup': {
            'inline_keyboard': [[{'text': "清空聊天记录", 'callback_data': "clearall"}]],
        },
    })


async def sendTelegramMessage(chatId, text, options=None):
    """Send ``text`` to ``chatId``; ``options`` are extra sendMessage fields.

    Failures are printed and swallowed so that a delivery problem never
    aborts update handling.
    """
    data = {
        'chat_id': chatId,
        'text': text,
        **(options or {})
    }
    try:
        response = requests.post(
            _telegram_url('sendMessage'),
            headers={'Content-Type': 'application/json'},
            json=data,
            timeout=TELEGRAM_TIMEOUT,
        )
        response.raise_for_status()
    except requests.exceptions.RequestException as e:
        print(f'发送 Telegram 消息失败: {e}')


def getHelpMessage():
    """Return the user-facing help text listing all commands."""
    return f"""
可用指令:
/start - 显示欢迎信息和操作按钮。
/clearall - 清空当前会话的聊天记录。
/help - 显示此帮助信息。

群组指令:
/enableai - 在群组中启用AI回复。
/disableai - 在群组中禁用AI回复。
/setprefix - 设置群组中触发AI回复的前缀,例如:/setprefix @bot。
/getprefix - 获取当前群组的触发前缀。

私聊指令:
/settemp <温度值> - 设置AI回复的温度 (0-2),例如:/settemp 1.0。
/gettemp - 获取当前AI回复的温度。
/resetuser - 重置你的个人设置。
/promat - 切换提示词,例如: /promat 0, 1, 2。
/getpromat - 获取当前使用的提示词索引。

直接发送文本消息与AI对话 (私聊)。
群组中,需要使用前缀触发AI回复,如果设置了前缀的话。

注意:
- 机器人会记住最近的 {MAX_HISTORY_LENGTH} 条对话。
- 机器人具有攻击性,请谨慎使用。
"""


@app.route('/update_commands', methods=['GET'])
async def update_commands():
    """Re-register the bot's command list with Telegram."""
    await setBotCommands()
    return jsonify({'message': 'Commands updated successfully!'})


@app.route('/', methods=['POST'])
async def handle_webhook():
    """Telegram webhook entry point; responds 400 if handling raises."""
    try:
        update = request.get_json()
        await handleTelegramUpdate(update)
        return jsonify({'status': 'ok'})
    except Exception as e:
        print(f"请求解析失败: {e}")
        return jsonify({'status': 'error', 'message': str(e)}), 400


@app.route('/health', methods=['GET'])
def health_check():
    """Liveness probe."""
    return 'OK'


if __name__ == '__main__':
    app.run(host='0.0.0.0', port=int(os.environ.get('PORT', 8080)))