import os
import base64
import requests
import gradio as gr
from huggingface_hub import InferenceClient
from dataclasses import dataclass
import speech_recognition as sr
import easyocr
from PIL import Image
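# Minimal container for a single chat turn; to_dict() produces the
# {"role": ..., "content": ...} shape used by the chat-completion API.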
@dataclass
class ChatMessage:
role: str
content: str
def to_dict(self):
return {"role": self.role, "content": self.content}
class XylariaChat:
def __init__(self):
self.hf_token = os.getenv("HF_TOKEN")
if not self.hf_token:
raise ValueError("HuggingFace token not found in environment variables")
self.client = InferenceClient(
model="Qwen/QwQ-32B-Preview",
api_key=self.hf_token
)
self.image_api_url = "https://api-inference.huggingface.co/models/Salesforce/blip-image-captioning-large"
self.image_api_headers = {"Authorization": f"Bearer {self.hf_token}"}
self.conversation_history = []
self.persistent_memory = {}
self.system_prompt = """You are a helpful and harmless assistant. You are Xylaria developed by Sk Md Saad Amin . You should think step-by-step."""
self.reader = easyocr.Reader(['ch_sim','en'])
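    # Simple key/value memory that gets injected into the system context of each request.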
def store_information(self, key, value):
self.persistent_memory[key] = value
return f"Stored: {key} = {value}"
def retrieve_information(self, key):
return self.persistent_memory.get(key, "No information found for this key.")
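    # Drops the conversation history and memory and recreates the inference client.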
def reset_conversation(self):
self.conversation_history = []
self.persistent_memory.clear()
try:
self.client = InferenceClient(
model="Qwen/QwQ-32B-Preview",
api_key=self.hf_token
)
except Exception as e:
print(f"Error resetting API client: {e}")
return None
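    # Sends raw image bytes to the hosted BLIP captioning endpoint; accepts a file path,
    # a base64/data-URI string, or a file-like object.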
def caption_image(self, image):
try:
if isinstance(image, str) and os.path.isfile(image):
with open(image, "rb") as f:
data = f.read()
elif isinstance(image, str):
if image.startswith('data:image'):
image = image.split(',')[1]
data = base64.b64decode(image)
else:
data = image.read()
response = requests.post(
self.image_api_url,
headers=self.image_api_headers,
data=data
)
if response.status_code == 200:
caption = response.json()[0].get('generated_text', 'No caption generated')
return caption
else:
return f"Error captioning image: {response.status_code} - {response.text}"
except Exception as e:
return f"Error processing image: {str(e)}"
    def perform_math_ocr(self, image_path):
        try:
            # Open with PIL first as a quick sanity check that the file is a readable image.
            Image.open(image_path)
            # EasyOCR reads the file from disk directly; join the recognized text fragments.
            result = self.reader.readtext(image_path)
            text = ' '.join([item[1] for item in result])
            return text.strip()
        except Exception as e:
            return f"Error during Math OCR: {e}"
def get_response(self, user_input, image=None):
try:
messages = []
messages.append(ChatMessage(
role="system",
content=self.system_prompt
).to_dict())
if self.persistent_memory:
memory_context = "Remembered Information:\n" + "\n".join(
[f"{k}: {v}" for k, v in self.persistent_memory.items()]
)
messages.append(ChatMessage(
role="system",
content=memory_context
).to_dict())
for msg in self.conversation_history:
messages.append(msg)
if image:
image_caption = self.caption_image(image)
user_input = f"description of an image: {image_caption}\n\nUser's message about it: {user_input}"
messages.append(ChatMessage(
role="user",
content=user_input
).to_dict())
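            # Rough token budget: approximate the input size by whitespace word count and
            # cap the reply so the request stays within the model's context window.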
input_tokens = sum(len(msg['content'].split()) for msg in messages)
max_new_tokens = 16384 - input_tokens - 50
max_new_tokens = min(max_new_tokens, 10020)
stream = self.client.chat_completion(
messages=messages,
model="Qwen/QwQ-32B-Preview",
temperature=0.7,
max_tokens=max_new_tokens,
top_p=0.9,
stream=True
)
return stream
except Exception as e:
print(f"Detailed error in get_response: {e}")
return f"Error generating response: {str(e)}"
def messages_to_prompt(self, messages):
prompt = ""
for msg in messages:
if msg["role"] == "system":
prompt += f"<|system|>\n{msg['content']}<|end|>\n"
elif msg["role"] == "user":
prompt += f"<|user|>\n{msg['content']}<|end|>\n"
elif msg["role"] == "assistant":
prompt += f"<|assistant|>\n{msg['content']}<|end|>\n"
prompt += "<|assistant|>\n"
return prompt
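    # Transcribes an uploaded audio file with the Google Web Speech API via SpeechRecognition.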
def recognize_speech(self, audio_file):
recognizer = sr.Recognizer()
try:
with sr.AudioFile(audio_file) as source:
audio_data = recognizer.record(source)
text = recognizer.recognize_google(audio_data)
return text
except sr.UnknownValueError:
return "Could not understand audio"
except sr.RequestError:
return "Could not request results from Google Speech Recognition service"
def create_interface(self):
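        # Generator callback: yields (textbox, chat_history, image, ocr_image, audio) updates
        # so the chat window refreshes incrementally while the model streams tokens.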
def streaming_response(message, chat_history, image_filepath, math_ocr_image_path, audio_file):
if audio_file:
voice_message = self.recognize_speech(audio_file)
if not voice_message.startswith("Error"):
message = voice_message
ocr_text = ""
if math_ocr_image_path:
ocr_text = self.perform_math_ocr(math_ocr_image_path)
if ocr_text.startswith("Error"):
updated_history = chat_history + [[{"role": "user", "content": message}, {"role": "assistant", "content": ocr_text}]]
yield "", updated_history, None, None, None
return
elif len(ocr_text) > 500:
ocr_text = "OCR output is too large to be processed."
updated_history = chat_history + [[{"role": "user", "content": message}, {"role": "assistant", "content": ocr_text}]]
yield "", updated_history, None, None, None
return
else:
message = f"Math OCR Result: {ocr_text}\n\nUser's message: {message}"
if image_filepath:
response_stream = self.get_response(message, image_filepath)
else:
response_stream = self.get_response(message)
if isinstance(response_stream, str):
updated_history = chat_history + [[{"role": "user", "content": message}, {"role": "assistant", "content": response_stream}]]
yield "", updated_history, None, None, None
return
full_response = ""
updated_history = chat_history + [[{"role": "user", "content": message}, {"role": "assistant", "content": ""}]]
try:
for chunk in response_stream:
if chunk.choices and chunk.choices[0].delta and chunk.choices[0].delta.content:
chunk_content = chunk.choices[0].delta.content
full_response += chunk_content
updated_history[-1][1]["content"] = full_response
yield "", updated_history, None, None, None
except Exception as e:
print(f"Streaming error: {e}")
updated_history[-1][1]["content"] = f"Error during response: {e}"
yield "", updated_history, None, None, None
return
self.conversation_history.append(ChatMessage(role="user", content=message).to_dict())
self.conversation_history.append(ChatMessage(role="assistant", content=full_response).to_dict())
if len(self.conversation_history) > 10:
self.conversation_history = self.conversation_history[-10:]
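        # Custom CSS: Inter font, image-upload styling, message fade-in, and the accordion transition.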
custom_css = """
@import url('https://fonts.googleapis.com/css2?family=Inter:wght@300;400;500;600;700&display=swap');
body, .gradio-container {
font-family: 'Inter', sans-serif !important;
}
.chatbot-container .message {
font-family: 'Inter', sans-serif !important;
}
.gradio-container input,
.gradio-container textarea,
.gradio-container button {
font-family: 'Inter', sans-serif !important;
}
.image-container {
display: flex;
gap: 10px;
margin-bottom: 10px;
}
.image-upload {
border: 1px solid #ccc;
border-radius: 8px;
padding: 10px;
background-color: #f8f8f8;
}
.image-preview {
max-width: 200px;
max-height: 200px;
border-radius: 8px;
}
.clear-button {
display: none;
}
.chatbot-container .message {
opacity: 0;
animation: fadeIn 0.5s ease-in-out forwards;
}
@keyframes fadeIn {
from {
opacity: 0;
transform: translateY(20px);
}
to {
opacity: 1;
transform: translateY(0);
}
}
.gradio-accordion {
overflow: hidden;
transition: max-height 0.3s ease-in-out;
max-height: 0;
}
.gradio-accordion.open {
max-height: 500px;
}
"""
with gr.Blocks(theme='soft', css=custom_css) as demo:
with gr.Column():
chatbot = gr.Chatbot(
label="Xylaria 1.5 Senoa (EXPERIMENTAL)",
height=500,
show_copy_button=True,
type='messages'
)
with gr.Accordion("Image Input", open=False) as accordion:
with gr.Row(elem_classes="image-container"):
with gr.Column(elem_classes="image-upload"):
img = gr.Image(
sources=["upload", "webcam"],
type="filepath",
label="Upload Image",
elem_classes="image-preview"
)
with gr.Column(elem_classes="image-upload"):
math_ocr_img = gr.Image(
sources=["upload", "webcam"],
type="filepath",
label="Upload Image for Math OCR",
elem_classes="image-preview"
)
with gr.Row():
with gr.Column(scale=4):
txt = gr.Textbox(
show_label=False,
placeholder="Type your message...",
container=False
)
with gr.Column(scale=1):
audio_input = gr.Audio(
sources=["microphone"],
type="filepath",
label="Voice Input"
)
btn = gr.Button("Send", scale=1)
with gr.Row():
clear = gr.Button("Clear Conversation")
clear_memory = gr.Button("Clear Memory")
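                # The send button and Enter both run the streaming callback; the clear buttons
                # reset the chat display and the stored conversation/memory respectively.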
btn.click(
fn=streaming_response,
inputs=[txt, chatbot, img, math_ocr_img, audio_input],
outputs=[txt, chatbot, img, math_ocr_img, audio_input]
)
txt.submit(
fn=streaming_response,
inputs=[txt, chatbot, img, math_ocr_img, audio_input],
outputs=[txt, chatbot, img, math_ocr_img, audio_input]
)
clear.click(
fn=lambda: None,
inputs=None,
outputs=[chatbot],
queue=False
)
clear_memory.click(
fn=self.reset_conversation,
inputs=None,
outputs=[chatbot],
queue=False
)
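            # JS-only hook (no Python fn) that toggles the accordion's "open" class so the CSS transition applies.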
            demo.load(None, None, None, js="""
() => {
const accordion = document.querySelector(".gradio-accordion");
if (accordion) {
const accordionHeader = accordion.querySelector(".label-wrap");
accordionHeader.addEventListener("click", () => {
accordion.classList.toggle("open");
});
}
}
""")
demo.load(self.reset_conversation, None, None)
return demo
def main():
chat = XylariaChat()
interface = chat.create_interface()
interface.launch(
share=True,
debug=True
)
if __name__ == "__main__":
main()