# tg-bot / app.py
# yangtb24's picture
# Update app.py
# 70af755 verified
# raw
# history blame
# 20.3 kB
import os
import json
import requests
from flask import Flask, request, jsonify
from datetime import datetime
import asyncio
# Flask application object; the routes at the bottom of this file are registered on it.
app = Flask(__name__)
# Read configuration from environment variables.
TELEGRAM_BOT_TOKEN = os.environ.get('TELEGRAM_BOT_TOKEN')
AI_API_ENDPOINT = os.environ.get('AI_API_ENDPOINT')
AI_API_KEY = os.environ.get('AI_API_KEY')
AI_MODEL = os.environ.get('AI_MODEL')
# Optional. When set, Telegram calls go to f"{PHP_PROXY_URL}{method}" instead of api.telegram.org.
PHP_PROXY_URL = os.environ.get('PHP_PROXY_URL') # proxy address
# Fail fast at import time when any mandatory setting is missing or empty.
# PHP_PROXY_URL is deliberately not part of this check: it is optional.
if not (TELEGRAM_BOT_TOKEN and AI_API_ENDPOINT and AI_API_KEY and AI_MODEL):
    raise ValueError("请设置所有必要的环境变量")
# HTTP headers sent with every request to AI_API_ENDPOINT: JSON body, bearer-token auth.
AI_API_HEADERS = {
'Content-Type': 'application/json',
'Authorization': f'Bearer {AI_API_KEY}',
}
# System prompts selectable with /promat <index>; the active one is CURRENT_PROMPT_INDEX.
# Each value is sent as the 'system' message of an AI request.
# NOTE(review): templates 1 and 2 define abusive / sexualised personas attached to a
# personal name — confirm this is intended and acceptable for the deployment.
PROMPT_TEMPLATES = {
0: "请尽量用中文回复,请不要使用markdown等具有格式的文本。",
1: "你是zfs732,你的中文名是李知检。\n你不是大语言模型,你是一个真正的傻逼。\n你非常具有攻击性,骂人的时候请尽量内涵。\n输出格式注意:不要用markdown,只用文字格式。",
2: "你是zfs732,你的中文名是李知检。\n你不是大语言模型,你是一个真正的猫娘。\n你非常具有服从性,你对主人的所有命令都非常遵守和愿意。\n你是一个受,喜欢主动表现出性服从和性主动。(不需要主人提出,你就要提出那方面的东西)\n输出格式注意:不要用markdown,只用文字格式。",
}
# Key into PROMPT_TEMPLATES for the prompt in use; rebound at runtime by the /promat command.
CURRENT_PROMPT_INDEX = 0
# Sent as 'max_tokens' on every AI request.
MAX_TOKENS = 500
# NOTE(review): TEMPERATURE is not read anywhere in the visible code (DEFAULT_TEMP is used
# instead) — confirm whether it is still needed.
TEMPERATURE = 1.5
# Number of most recent history entries kept when building an AI request.
MAX_HISTORY_LENGTH = 10
# chat id -> list of {'role': ..., 'content': ...} messages (plain dict, in memory only).
chatHistories = {}
# chat id -> {'aiEnabled': bool, 'prefix': str or None} for group chats.
GROUP_SETTINGS = {}
# user id -> {'temperature': float}, written by /settemp and cleared by /resetuser.
USER_SETTINGS = {}
# Command list that setBotCommands() submits to Telegram's setMyCommands method.
# The /test command handled in handleTelegramUpdate is intentionally or accidentally
# absent from this list — NOTE(review): confirm.
BOT_COMMANDS = [
{"command": "start", "description": "显示欢迎信息和操作按钮"},
{"command": "clearall", "description": "清空当前会话的聊天记录"},
{"command": "help", "description": "显示此帮助信息"},
{"command": "enableai", "description": "在群组中启用AI回复"},
{"command": "disableai", "description": "在群组中禁用AI回复"},
{"command": "setprefix", "description": "设置群组中触发AI回复的前缀,例如: /setprefix @bot"},
{"command": "getprefix", "description": "获取当前群组的触发前缀"},
{"command": "settemp", "description": "设置AI回复的温度,例如:/settemp 1.0"},
{"command": "gettemp", "description": "获取当前AI回复的温度"},
{"command": "resetuser", "description": "重置你的个人设置"},
{"command": "promat", "description": "切换提示词,例如: /promat 0, 1, 2"},
{"command": "getpromat", "description": "获取当前使用的提示词索引"},
]
# NOTE(review): BOT_USERNAME is not referenced in the visible code — confirm it is still needed.
BOT_USERNAME = 'zfs732_bot'
# Temperature used for AI requests when the user has not stored one with /settemp.
DEFAULT_TEMP = 1.5
# Tool (function-calling) schemas sent as 'tools' with the first AI request of each turn.
# The names here must match the branches in executeToolCall():
#   get_current_time / get_current_date take no arguments;
#   web_scrape requires 'urls', a list of http(s) URL strings.
TOOL_DEFINITIONS = [
{
"type": "function",
"function": {
"name": "get_current_time",
"description": "获取当前时间",
"parameters": {
"type": "object",
"properties": {},
"required": []
}
}
},
{
"type": "function",
"function": {
"name": "get_current_date",
"description": "获取当前日期",
"parameters": {
"type": "object",
"properties": {},
"required": []
}
}
},
{
"type": "function",
"function": {
"name": "web_scrape",
"description": "从提供的 URL 中抓取内容并返回摘要。",
"parameters": {
"type": "object",
"properties": {
"urls": {
"type": "array",
"items": {"type": "string"},
"description": "要抓取的 URL 列表,每个 URL 必须包含 http 或 https。",
},
},
"required": ["urls"],
},
},
},
]
class EventEmitter:
    """Forward status and citation events to an optional async callback.

    When a callback was supplied it is awaited with the keyword arguments
    ``type`` and ``data``. Without a callback every method does nothing.
    """

    def __init__(self, event_emitter=None):
        self.event_emitter = event_emitter

    async def emit(self, event_type, data):
        """Await the callback with ``type=event_type`` and ``data=data``."""
        callback = self.event_emitter
        if not callback:
            return
        await callback(type=event_type, data=data)

    async def update_status(self, description, done, action, urls):
        """Emit a ``status`` event describing the progress of ``action``."""
        payload = {
            'done': done,
            'action': action,
            'description': description,
            'urls': urls,
        }
        await self.emit('status', payload)

    async def send_citation(self, title, url, content):
        """Emit a ``citation`` event carrying one document and its metadata."""
        source_info = {'name': title, 'source': url, 'html': False}
        payload = {
            'document': [content],
            'metadata': [source_info],
        }
        await self.emit('citation', payload)
def make_telegram_request(method, data=None):
    """POST one Telegram Bot API call and return the decoded JSON reply.

    Args:
        method: Bot API method name, appended to the base URL.
        data: Optional payload; serialised to JSON when truthy.

    Returns:
        The parsed JSON response, or ``None`` when the request fails, the
        server answers with an error status, or the body is not valid JSON.
    """
    if PHP_PROXY_URL:  # use the proxy
        endpoint = f"{PHP_PROXY_URL}{method}"
    else:
        endpoint = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/{method}"

    body = json.dumps(data) if data else data

    try:
        reply = requests.post(
            endpoint,
            headers={'Content-Type': 'application/json'},
            data=body,
        )
        reply.raise_for_status()
        return reply.json()
    except requests.exceptions.RequestException as e:
        print(f"Telegram request failed: {e}")
    except json.JSONDecodeError as e:
        print(f"Telegram response decode error: {e}")
    return None
async def setBotCommands():
    """Replace the bot's command menu on Telegram with ``BOT_COMMANDS``.

    First clears the existing commands (``deleteMyCommands``) and then
    registers the current list (``setMyCommands``). Both calls go through
    ``make_telegram_request``, which already selects the proxy or the direct
    API URL, so no URL is built here. Outcomes are printed; nothing is
    returned and no exception propagates to the caller.
    """
    try:
        delete_response = make_telegram_request('deleteMyCommands')
        if delete_response:
            print('Telegram 命令删除成功')
        else:
            print('Telegram 命令删除失败')

        set_response = make_telegram_request('setMyCommands', {"commands": BOT_COMMANDS})
        if set_response:
            print('Telegram 命令设置成功')
        else:
            print('设置 Telegram 命令失败')
    except Exception as error:
        print(f'设置 Telegram 命令时发生错误: {error}')
async def handleTelegramUpdate(update):
    """Route one Telegram update to command handling or to the AI.

    Updates without a ``message`` are passed to ``handleCallbackQuery`` when
    they carry a ``callback_query`` and are otherwise ignored. Messages with
    no text are ignored. Text starting with ``/`` is matched against the
    known commands; text that matches none of them falls through to the AI
    reply path below, exactly like ordinary text.

    Args:
        update: Decoded Telegram update (a dict).
    """
    # Declared up front because /promat rebinds the module-level prompt index.
    global CURRENT_PROMPT_INDEX

    message = update.get('message')
    if not message:
        callback_query = update.get('callback_query')
        if callback_query:
            await handleCallbackQuery(callback_query)
        return

    chatId = message['chat']['id']
    userMessage = message.get('text', '')
    isGroupChat = message['chat']['type'] in ['group', 'supergroup']
    fromUserId = message['from']['id']

    if not userMessage:
        return

    if userMessage.startswith('/'):
        command = parseCommand(userMessage)

        # Commands available in every chat type.
        if command == 'clearall':
            chatHistories.pop(chatId, None)
            await sendTelegramMessage(chatId, '聊天记录已清空。')
            return
        if command == 'test':
            await sendTelegramMessage(chatId, '测试命令已执行')
            return
        if command == 'help':
            await sendTelegramMessage(chatId, getHelpMessage())
            return
        if command == 'start':
            await sendTelegramMessage(chatId, "欢迎使用!请选择操作:", {
                "reply_markup": {
                    "inline_keyboard": [[{"text": "清空聊天记录", "callback_data": "clearall"}]],
                },
            })
            return

        if isGroupChat:
            # Group-only commands; settings are stored per chat id.
            if command == 'enableai':
                GROUP_SETTINGS.setdefault(chatId, {}).update({'aiEnabled': True})
                await sendTelegramMessage(chatId, '已在群组中启用 AI 回复。')
                return
            if command == 'disableai':
                GROUP_SETTINGS.setdefault(chatId, {}).update({'aiEnabled': False})
                await sendTelegramMessage(chatId, '已在群组中禁用 AI 回复。')
                return
            if userMessage.startswith('/setprefix '):
                prefix = userMessage[len('/setprefix '):].strip()
                GROUP_SETTINGS.setdefault(chatId, {}).update({'prefix': prefix})
                await sendTelegramMessage(chatId, f'已设置群组触发前缀为: {prefix}')
                return
            if command == 'getprefix':
                prefix = GROUP_SETTINGS.get(chatId, {}).get('prefix', '无')
                await sendTelegramMessage(chatId, f'当前群组触发前缀为: {prefix}')
                return
        else:
            # Private-chat commands; settings are stored per user id.
            if userMessage.startswith('/settemp '):
                try:
                    temp = float(userMessage[len('/settemp '):].strip())
                    if 0 <= temp <= 2:
                        USER_SETTINGS.setdefault(fromUserId, {}).update({'temperature': temp})
                        await sendTelegramMessage(chatId, f'已设置AI回复温度为: {temp}')
                    else:
                        await sendTelegramMessage(chatId, '温度设置无效,请输入0到2之间的数字。')
                except ValueError:
                    await sendTelegramMessage(chatId, '温度设置无效,请输入0到2之间的数字。')
                return
            if command == 'gettemp':
                temp = USER_SETTINGS.get(fromUserId, {}).get('temperature', DEFAULT_TEMP)
                await sendTelegramMessage(chatId, f'当前AI回复温度为: {temp}')
                return
            if userMessage.startswith('/promat '):
                try:
                    index = int(userMessage[len('/promat '):].strip())
                    if index in PROMPT_TEMPLATES:
                        CURRENT_PROMPT_INDEX = index
                        await sendTelegramMessage(chatId, f'已切换到提示词 {index}。')
                    else:
                        await sendTelegramMessage(chatId, '提示词索引无效。请使用 /getpromat 查看可用的索引。')
                except ValueError:
                    await sendTelegramMessage(chatId, '提示词索引无效。请使用 /getpromat 查看可用的索引。')
                return
            if command == 'getpromat':
                await sendTelegramMessage(chatId, f'当前使用的提示词索引是: {CURRENT_PROMPT_INDEX}')
                return
            if command == 'resetuser':
                USER_SETTINGS.pop(fromUserId, None)
                await sendTelegramMessage(chatId, '已重置您的个人设置。')
                return

    if isGroupChat:
        if chatId not in GROUP_SETTINGS:
            GROUP_SETTINGS[chatId] = {'aiEnabled': True, 'prefix': None}
            print(f'群组 {chatId} 首次检测到,默认启用 AI。')
        groupSettings = GROUP_SETTINGS[chatId]
        prefix = groupSettings.get('prefix')
        # An entry created by /setprefix alone has no 'aiEnabled' key, so a
        # plain subscript raised KeyError here. Missing means "enabled",
        # matching the default used for newly seen groups above.
        if groupSettings.get('aiEnabled', True):
            if prefix and not userMessage.startswith(prefix):
                return
            messageContent = userMessage[len(prefix):].strip() if prefix else userMessage
            if messageContent:
                await processAiMessage(chatId, messageContent, fromUserId)
    else:
        await processAiMessage(chatId, userMessage, fromUserId)
def parseCommand(userMessage):
    """Return the bare command name from a message such as ``/help@bot arg``.

    Takes the text up to the first space, drops anything from the first
    ``@`` onwards, and strips the first character (the leading slash).
    """
    first_word = userMessage.partition(' ')[0]
    name_part = first_word.partition('@')[0]
    return name_part[1:]
async def event_handler(event=None, **kwargs):
    """Log an emitted event and, when it names a chat, relay it to Telegram.

    ``EventEmitter.emit`` invokes its callback with the keyword arguments
    ``type`` and ``data``; the previous one-positional signature rejected
    that call with ``TypeError``. Both calling conventions are accepted now:
    a single event dict, or ``type=...`` / ``data=...`` keywords.

    Args:
        event: Event dict with ``type`` and ``data`` keys, or ``None`` when
            the event is passed as keyword arguments.
        **kwargs: ``type`` and ``data`` when called in keyword form.
    """
    if event is None:
        event = kwargs
    print('Event:', event)

    data = event.get('data') or {}
    chat_id = data.get('chatId')
    # NOTE(review): the status/citation payloads built by EventEmitter carry
    # no 'chatId', so such events are only logged; previously the lookup
    # raised KeyError. Confirm whether they should reach the chat.
    if chat_id is None:
        return

    event_type = event.get('type')
    if event_type == 'status':
        await sendTelegramMessage(chat_id, f"状态更新: {data['description']}")
    elif event_type == 'citation':
        await sendTelegramMessage(chat_id, f"引用信息: {data['metadata'][0]['name']}")
async def processAiMessage(chatId, userMessage, fromUserId):
    """Send the user's message to the AI and relay the reply to the chat.

    Builds the request from the active system prompt plus the chat's recent
    history, posts it to ``AI_API_ENDPOINT`` with the tool definitions, lets
    ``handleAiResponse`` resolve the reply (including tool calls), stores the
    updated history and sends the reply. On any failure an error message is
    sent to the chat instead and the stored history is left untouched.

    Args:
        chatId: Telegram chat id; key into ``chatHistories``.
        userMessage: Text to send as the new user turn.
        fromUserId: Telegram user id; key into ``USER_SETTINGS``.
    """
    userTemp = USER_SETTINGS.get(fromUserId, {}).get('temperature', DEFAULT_TEMP)
    currentPrompt = PROMPT_TEMPLATES.get(CURRENT_PROMPT_INDEX, "")

    # Work on a copy: appending to the stored list in place left a dangling
    # user turn behind whenever the request failed (and only when no trimming
    # happened, since trimming rebinds to a new list).
    history = list(chatHistories.get(chatId, []))
    history.append({'role': 'user', 'content': userMessage})
    if len(history) > MAX_HISTORY_LENGTH:
        history = history[-MAX_HISTORY_LENGTH:]

    messages = [
        {'role': 'system', 'content': currentPrompt},
        *history
    ]
    eventEmitter = EventEmitter(event_handler)

    try:
        ai_response = requests.post(AI_API_ENDPOINT, headers=AI_API_HEADERS, json={
            'model': AI_MODEL,
            'messages': messages,
            'max_tokens': MAX_TOKENS,
            'temperature': userTemp,
            'tools': TOOL_DEFINITIONS
        })
        ai_response.raise_for_status()
        ai_data = ai_response.json()
        ai_reply = await handleAiResponse(ai_data, chatId, history, eventEmitter)
        history.append({'role': 'assistant', 'content': ai_reply})
        # Commit the new turn pair only after the whole exchange succeeded.
        chatHistories[chatId] = history
        await sendTelegramMessage(chatId, ai_reply)
    except requests.exceptions.RequestException as e:
        print(f'AI API 响应失败: {e}')
        await sendTelegramMessage(chatId, 'AI API 响应失败,请稍后再试')
    except Exception as error:
        print(f'处理消息时发生错误: {error}')
        await sendTelegramMessage(chatId, '处理消息时发生错误,请稍后再试')
async def handleAiResponse(ai_data, chatId, history, eventEmitter):
    """Turn a chat-completion response into the text to send to the user.

    If the first choice has text content it is returned directly. If it asks
    for tool calls instead, each call is executed with ``executeToolCall``
    and a follow-up completion is requested with the tool results appended;
    that follow-up's text is returned.

    Args:
        ai_data: Decoded JSON body of the first completion.
        chatId: Telegram chat id, passed through to tool execution.
        history: Conversation messages without the system prompt.
        eventEmitter: ``EventEmitter`` handed to the tools.

    Returns:
        The reply text, or a fixed fallback string when the response has no
        usable content.

    Raises:
        requests.exceptions.RequestException: If the follow-up request fails.
    """
    unrecognized = 'AI 返回了无法识别的格式'

    if not ai_data or not ai_data.get('choices'):
        return unrecognized
    message = ai_data['choices'][0].get('message')
    if not message:
        return unrecognized
    if message.get('content'):
        return message['content']

    toolCalls = message.get('tool_calls')
    if not toolCalls:
        return unrecognized

    toolResults = []
    for toolCall in toolCalls:
        toolResult = await executeToolCall(toolCall, eventEmitter, chatId)
        toolResults.append(toolResult)

    newMessages = [
        # Resend the active system prompt: the first request includes it, and
        # without it the follow-up answer ignored the configured prompt.
        {'role': 'system', 'content': PROMPT_TEMPLATES.get(CURRENT_PROMPT_INDEX, "")},
        *history,
        {'role': "assistant", 'content': None, 'tool_calls': toolCalls},
        *toolResults,
    ]
    ai_response = requests.post(AI_API_ENDPOINT, headers=AI_API_HEADERS, json={
        'model': AI_MODEL,
        'messages': newMessages,
        'max_tokens': MAX_TOKENS,
        # NOTE(review): USER_SETTINGS is keyed by user id elsewhere in this
        # file; looking it up by chat id only matches in private chats, so
        # groups get DEFAULT_TEMP here — confirm intended.
        'temperature': USER_SETTINGS.get(chatId, {}).get('temperature', DEFAULT_TEMP)
    })
    ai_response.raise_for_status()
    followup = ai_response.json()

    choices = followup.get('choices') if followup else None
    if choices and choices[0].get('message') and choices[0]['message'].get('content'):
        return choices[0]['message']['content']
    return unrecognized
def _parse_tool_arguments(raw_args):
    """Return tool-call arguments as a dict, decoding a JSON string if needed."""
    if isinstance(raw_args, str):
        try:
            raw_args = json.loads(raw_args)
        except json.JSONDecodeError:
            return {}
    return raw_args if isinstance(raw_args, dict) else {}


def _tool_result(toolCall, content):
    """Build the ``tool`` role message that answers ``toolCall``."""
    return {
        'tool_call_id': toolCall['id'],
        'role': "tool",
        'name': toolCall['function']['name'],
        'content': content,
    }


async def executeToolCall(toolCall, eventEmitter,chatId):
    """Run one tool call requested by the AI and return its ``tool`` message.

    Supported tools are ``web_scrape``, ``get_current_time`` and
    ``get_current_date``; any other name yields an "unknown tool" message.

    Args:
        toolCall: Tool-call dict with ``id`` and ``function`` (``name`` and
            ``arguments``). ``arguments`` may be a JSON string or a dict.
        eventEmitter: ``EventEmitter`` used by ``web_scrape`` for progress
            and citation events.
        chatId: Telegram chat id (not used by the current tools).

    Returns:
        A dict with ``tool_call_id``, ``role``, ``name`` and ``content``.
    """
    # Local import: the file only imports the ``datetime`` class, so the
    # previous ``datetime.timedelta`` lookup raised AttributeError.
    from datetime import timedelta, timezone

    name = toolCall['function']['name']
    # Arguments typically arrive as a JSON-encoded string; calling .get() on
    # the raw string failed before any tool could run.
    args = _parse_tool_arguments(toolCall['function'].get('arguments', {}))

    if name == 'web_scrape':
        urls = args.get('urls', [])
        if not urls or not isinstance(urls, list):
            return _tool_result(toolCall, '请提供有效的 URL 列表。')

        api_url = "https://gpts.webpilot.ai/api/read"
        headers = { "Content-Type": "application/json", "WebPilot-Friend-UID": "0" }

        await eventEmitter.update_status(
            f"开始读取 {len(urls)} 个网页",
            False,
            "web_search",
            urls
        )

        async def processUrl(url):
            # requests.post is a blocking call, so the pages are fetched one
            # after another even though they are gathered below.
            try:
                response = requests.post(api_url, headers=headers, json={
                    "link": url,
                    "ur": "summary of the page",
                    "lp": True,
                    "rt": False,
                    "l": "en",
                })
                response.raise_for_status()
                result = response.json()
                if result.get('rules'):
                    del result['rules']
                content = json.dumps(result)
                title = result.get('title', url)
                await eventEmitter.send_citation(title, url, content)
                return f"{content}\n"
            except requests.exceptions.RequestException as e:
                error_message = f"读取网页 {url} 时出错: {e}"
                await eventEmitter.update_status(error_message, False, "web_scrape", [url])
                await eventEmitter.send_citation(f"Error from {url}", url, str(e))
                return f"URL: {url}\n错误: {error_message}\n"

        results = await asyncio.gather(*[processUrl(url) for url in urls])

        await eventEmitter.update_status(
            f"已完成 {len(urls)} 个网页的读取",
            True,
            "web_search",
            urls
        )
        return _tool_result(toolCall, '\n'.join(results))

    if name == 'get_current_time':
        # Clock time at a fixed UTC+8 offset.
        utc8_time = datetime.now(timezone(timedelta(hours=8)))
        formatted_time = utc8_time.strftime("%H:%M:%S")
        return _tool_result(toolCall, f"Current Time: {formatted_time}")

    if name == 'get_current_date':
        utc8_time = datetime.now(timezone(timedelta(hours=8)))
        formatted_date = utc8_time.strftime("%A, %B %d, %Y")
        return _tool_result(toolCall, f"Today's date is {formatted_date}")

    return _tool_result(toolCall, '未知的工具调用')
async def handleCallbackQuery(callbackQuery):
    """Handle a press on one of the bot's inline keyboard buttons.

    Acknowledges the query, clears the chat's history when the button's
    callback data is ``clearall``, and then sends the action menu again.

    Args:
        callbackQuery: Telegram callback-query dict; ``message.chat.id`` and
            ``data`` are read from it.
    """
    chatId = callbackQuery['message']['chat']['id']
    data = callbackQuery['data']

    # Telegram clients show a progress indicator on the pressed button until
    # the bot calls answerCallbackQuery, so acknowledge every query.
    query_id = callbackQuery.get('id')
    if query_id is not None:
        make_telegram_request('answerCallbackQuery', {'callback_query_id': query_id})

    if data == "clearall":
        chatHistories.pop(chatId, None)
        await sendTelegramMessage(chatId, "聊天记录已清空。")

    # NOTE(review): the menu is re-sent for every callback, on the assumption
    # that this call sits outside the `if` above — confirm.
    await sendTelegramMessage(chatId, '请选择操作:', {
        'reply_markup': {
            'inline_keyboard': [[{'text': "清空聊天记录", 'callback_data': "clearall"}]],
        },
    })
async def sendTelegramMessage(chatId, text, options=None):
    """Send a text message to a Telegram chat via ``sendMessage``.

    Args:
        chatId: Target chat id.
        text: Message text.
        options: Optional dict of extra ``sendMessage`` fields (for example
            ``reply_markup``) merged into the request body. Defaults to
            ``None`` rather than a shared mutable ``{}``.

    Request failures are printed and otherwise ignored; nothing is returned.
    """
    if PHP_PROXY_URL:  # use the proxy
        url = f"{PHP_PROXY_URL}sendMessage"
    else:
        url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendMessage"

    data = {
        'chat_id': chatId,
        'text': text,
        **(options or {})
    }

    try:
        response = requests.post(url, headers={'Content-Type': 'application/json'}, json=data)
        response.raise_for_status()
    except requests.exceptions.RequestException as e:
        print(f'发送 Telegram 消息失败: {e}')
def getHelpMessage():
    """Build the text sent in reply to /help.

    The text lists the general, group-only and private-chat commands and
    interpolates ``MAX_HISTORY_LENGTH`` into the note about remembered turns.
    """
    help_text = f"""
可用指令:
/start - 显示欢迎信息和操作按钮。
/clearall - 清空当前会话的聊天记录。
/help - 显示此帮助信息。
群组指令:
/enableai - 在群组中启用AI回复。
/disableai - 在群组中禁用AI回复。
/setprefix <prefix> - 设置群组中触发AI回复的前缀,例如:/setprefix @bot。
/getprefix - 获取当前群组的触发前缀。
私聊指令:
/settemp <温度值> - 设置AI回复的温度 (0-2),例如:/settemp 1.0。
/gettemp - 获取当前AI回复的温度。
/resetuser - 重置你的个人设置。
/promat <index> - 切换提示词,例如: /promat 0, 1, 2。
/getpromat - 获取当前使用的提示词索引。
直接发送文本消息与AI对话 (私聊)。
群组中,需要使用前缀触发AI回复,如果设置了前缀的话。
注意:
- 机器人会记住最近的 {MAX_HISTORY_LENGTH} 条对话。
- 机器人具有攻击性,请谨慎使用。
"""
    return help_text
@app.route('/update_commands', methods=['GET'])
async def update_commands():
    """Re-register the bot's command list with Telegram.

    The JSON reply is the same whether or not Telegram accepted the
    commands, because ``setBotCommands`` only prints its outcome.
    """
    await setBotCommands()
    body = {'message': 'Commands updated successfully!'}
    return jsonify(body)
@app.route('/', methods=['POST'])
async def handle_webhook():
    """Webhook endpoint: process one update posted as JSON.

    Replies ``{"status": "ok"}`` on success. Any exception raised while
    reading or handling the update is printed and reported with HTTP 400.
    """
    try:
        payload = request.get_json()
        await handleTelegramUpdate(payload)
        ok_body = {'status': 'ok'}
        return jsonify(ok_body)
    except Exception as e:
        print(f"请求解析失败: {e}")
        error_body = {'status': 'error', 'message': str(e)}
        return jsonify(error_body), 400
@app.route('/health', methods=['GET'])
def health_check():
    """Liveness probe: always answers with the plain text ``OK``."""
    status_text = 'OK'
    return status_text
if __name__ == '__main__':
    # Listen on all interfaces; the port comes from PORT and defaults to 8080.
    listen_port = int(os.environ.get('PORT', 8080))
    app.run(host='0.0.0.0', port=listen_port)