Botpy-808 / app.py
Fred808's picture
Update app.py
516168a verified
raw
history blame
14.8 kB
# chatbot_api.py
import os
import time
import requests
import base64
from datetime import datetime
from bs4 import BeautifulSoup
from fastapi import FastAPI, Request, HTTPException, BackgroundTasks, UploadFile, File
from fastapi.responses import JSONResponse, StreamingResponse
import openai
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker, declarative_base
from sqlalchemy import Column, Integer, String, DateTime, Text
# --- Configuration & Environment Variables ---
# Each setting is read from the environment; when the variable is unset the
# literal second argument is used instead.
# NOTE(review): several fallback values below look like real credentials
# committed to source control -- confirm, rotate them, and require the
# environment variables instead of shipping defaults.
SPOONACULAR_API_KEY = os.getenv("SPOONACULAR_API_KEY", "815bf76e0764456293f0e96e080e8f60")
# NOTE(review): the fallback starts with "pk_test_", which looks like a public
# test key rather than a secret key -- confirm which key Paystack expects here.
PAYSTACK_SECRET_KEY = os.getenv("PAYSTACK_SECRET_KEY", "pk_test_3222fb257041f1f2fd5ef33eafd19e1db4bdb634")
# NOTE(review): the fallback URL contains the text "[email protected]", which
# looks like e-mail-obfuscation residue where "password@host" is expected --
# verify before relying on this default.
DATABASE_URL = os.getenv("DATABASE_URL", "postgresql+asyncpg://postgres.lgbnxplydqdymepehirg:[email protected]:5432/postgres")
NVIDIA_API_KEY = os.getenv("NVIDIA_API_KEY", "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") # For NVIDIA LLM endpoints
# The fallback here is a placeholder string, not a usable key.
openai.api_key = os.getenv("OPENAI_API_KEY", "your_openai_api_key")
# --- Database Setup ---
# Declarative base class; the ORM models in this file (ChatHistory, Order)
# subclass it, and init_db() creates the tables registered on Base.metadata.
Base = declarative_base()
class ChatHistory(Base):
    """ORM model for one logged chat message (table ``chat_history``)."""

    __tablename__ = "chat_history"

    # Integer primary key.
    id = Column(Integer, primary_key=True, index=True)
    # User the message belongs to; the history endpoint filters on this column.
    user_id = Column(String, index=True)
    # Filled with datetime.utcnow() when no value is supplied on insert.
    timestamp = Column(DateTime, default=datetime.utcnow)
    # 'inbound' or 'outbound'
    direction = Column(String)
    # Text of the message.
    message = Column(Text)
class Order(Base):
    """ORM model for a customer order (table ``orders``)."""

    __tablename__ = "orders"

    # Integer primary key.
    id = Column(Integer, primary_key=True, index=True)
    # Application-level order reference; must be unique.
    order_id = Column(String, unique=True, index=True)
    # User who placed the order.
    user_id = Column(String, index=True)
    # Name of the ordered dish.
    dish = Column(String)
    # Number of servings, stored as a string.
    quantity = Column(String)
    # Price as string (or use a numeric type)
    price = Column(String, default="0")
    # e.g., Pending Payment, Paid, Completed
    status = Column(String, default="Pending Payment")
    # Optional payment reference; NULL until one is recorded.
    payment_reference = Column(String, nullable=True)
    # Filled with datetime.utcnow() when no value is supplied on insert.
    timestamp = Column(DateTime, default=datetime.utcnow)
# Create the asynchronous engine. Make sure DATABASE_URL is configured correctly.
# NOTE(review): echo=True asks SQLAlchemy to log emitted SQL -- confirm this is
# wanted outside of development.
engine = create_async_engine(DATABASE_URL, echo=True)
# Factory used throughout this file as ``async with async_session() as session``;
# it produces AsyncSession objects bound to the engine above.
async_session = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
async def init_db():
    """Create the tables registered on ``Base.metadata`` using the async engine."""
    # create_all is a synchronous API, so it is driven through run_sync on a
    # connection opened inside a transaction.
    async with engine.begin() as connection:
        await connection.run_sync(Base.metadata.create_all)
# --- Global In-Memory Stores for Ephemeral Data ---
# Per-user conversation state, keyed by user_id.
# Example: { user_id: { "flow": "order", "step": int, "data": dict } }
user_state = {}

# Local menu with nutritional details.
# Every entry carries the keys "name", "description", "price" and "nutrition".
menu_items = [
    {
        "name": "Jollof Rice",
        "description": "A spicy and flavorful rice dish",
        "price": 1500,
        "nutrition": "Calories: 300 kcal, Carbs: 50g, Protein: 10g, Fat: 5g",
    },
    {
        "name": "Fried Rice",
        "description": "A savory rice dish with vegetables and meat",
        "price": 1200,
        "nutrition": "Calories: 350 kcal, Carbs: 55g, Protein: 12g, Fat: 8g",
    },
    {
        "name": "Chicken Wings",
        "description": "Crispy fried chicken wings",
        "price": 2000,
        "nutrition": "Calories: 400 kcal, Carbs: 20g, Protein: 25g, Fat: 15g",
    },
    {
        "name": "Egusi Soup",
        "description": "A rich and hearty soup made with melon seeds",
        "price": 1000,
        "nutrition": "Calories: 250 kcal, Carbs: 15g, Protein: 8g, Fat: 10g",
    },
]
# --- Utility Functions ---
async def log_chat_to_db(user_id: str, direction: str, message: str):
    """Persist a single chat message as a ``ChatHistory`` row.

    Args:
        user_id: Identifier of the user the message belongs to.
        direction: 'inbound' or 'outbound'.
        message: Text to store.
    """
    record = ChatHistory(user_id=user_id, direction=direction, message=message)
    async with async_session() as db:
        db.add(record)
        await db.commit()
def google_image_scrape(query: str) -> str:
    """
    Scrape Google Images using BeautifulSoup to get an image URL for the query.

    The query is sent through ``params`` so that requests URL-encodes it;
    interpolating it into the URL by hand breaks on spaces, ``&``, ``#`` and
    similar characters.

    Args:
        query: Free-text search term (for example a dish name).

    Returns:
        The ``src`` of the first ``<img>`` tag whose source starts with
        "http", or "" when the request fails, the response status is not 200,
        or no such tag is found.

    Note: This basic scraper may break if Google changes its markup.
    This is a blocking network call.
    """
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64)"
    }
    try:
        response = requests.get(
            "https://www.google.com/search",
            params={"tbm": "isch", "q": query},
            headers=headers,
            timeout=5,
        )
    except requests.RequestException:
        # Best effort: a missing image must not fail the caller.
        return ""
    if response.status_code != 200:
        return ""
    soup = BeautifulSoup(response.text, "html.parser")
    for img in soup.find_all("img"):
        src = img.get("src")
        if src and src.startswith("http"):
            return src
    return ""
def create_paystack_payment_link(
    email: str,
    amount: int,
    reference: str,
    callback_url: str = "https://yourdomain.com/payment_callback",
) -> dict:
    """
    Call Paystack to initialize a transaction.

    Args:
        email: customer's email
        amount: in kobo (multiply NGN amount by 100)
        reference: unique order reference
        callback_url: URL sent to Paystack as ``callback_url``. The default is
            the placeholder the code has always sent; replace it with your
            real callback URL.

    Returns:
        The decoded JSON body of Paystack's reply on HTTP 200. Otherwise a
        dict of the form ``{"status": False, "message": <reason>}``; this
        function does not raise for network or decoding failures.

    This is a blocking network call (10 second timeout).
    """
    url = "https://api.paystack.co/transaction/initialize"
    headers = {
        "Authorization": f"Bearer {PAYSTACK_SECRET_KEY}",
        "Content-Type": "application/json",
    }
    payload = {
        "email": email,
        "amount": amount,
        "reference": reference,
        "callback_url": callback_url,
    }
    try:
        response = requests.post(url, json=payload, headers=headers, timeout=10)
        if response.status_code != 200:
            return {"status": False, "message": "Failed to initialize payment."}
        # response.json() raises a ValueError subclass on a non-JSON body.
        return response.json()
    except (requests.RequestException, ValueError) as exc:
        return {"status": False, "message": str(exc)}
# --- NVIDIA LLM Streaming Functions ---
def stream_text_completion(prompt: str):
    """
    Stream text completion using NVIDIA's text-only model.
    Uses the OpenAI client interface pointed to NVIDIA's endpoint.

    Args:
        prompt: Text sent as the single user message.

    Yields:
        Each non-None content fragment of the streamed completion, in order.
    """
    from openai import OpenAI  # Using OpenAI client library

    client = OpenAI(
        base_url="https://integrate.api.nvidia.com/v1",
        api_key=NVIDIA_API_KEY,
    )
    completion = client.chat.completions.create(
        model="meta/llama-3.1-405b-instruct",
        messages=[{"role": "user", "content": prompt}],
        temperature=0.2,
        top_p=0.7,
        max_tokens=1024,
        stream=True,
    )
    for chunk in completion:
        # A streamed chunk can arrive with an empty ``choices`` list;
        # indexing it unconditionally would raise IndexError mid-stream.
        if not chunk.choices:
            continue
        piece = chunk.choices[0].delta.content
        if piece is not None:
            yield piece
def stream_image_completion(image_b64: str, timeout=(10, 60)):
    """
    Stream image-based query using NVIDIA's vision model.
    The image (in base64) is embedded in an HTML <img> tag.

    Args:
        image_b64: Base64-encoded image data; it is embedded as a
            ``data:image/png;base64,...`` URI.
        timeout: ``(connect, read)`` timeout in seconds passed to requests so
            that a stalled upstream cannot hang the caller indefinitely.

    Yields:
        Every non-empty line of the upstream response, decoded as UTF-8 and
        terminated with a newline.

    Raises:
        requests.RequestException: If the request cannot be made or times out.
    """
    invoke_url = "https://ai.api.nvidia.com/v1/gr/meta/llama-3.2-90b-vision-instruct/chat/completions"
    headers = {
        "Authorization": f"Bearer {NVIDIA_API_KEY}",
        "Accept": "text/event-stream"
    }
    payload = {
        "model": "meta/llama-3.2-90b-vision-instruct",
        "messages": [
            {
                "role": "user",
                "content": f'What is in this image? <img src="data:image/png;base64,{image_b64}" />'
            }
        ],
        "max_tokens": 512,
        "temperature": 1.00,
        "top_p": 1.00,
        "stream": True
    }
    # The context manager closes the streamed response (releasing the
    # connection) even when the consumer stops iterating early.
    with requests.post(
        invoke_url, headers=headers, json=payload, stream=True, timeout=timeout
    ) as response:
        for line in response.iter_lines():
            if line:
                yield line.decode("utf-8") + "\n"
# --- Internal Flow: Order Processing & Payment Integration ---
# Strong references to in-flight order-persistence tasks. The event loop only
# keeps weak references to tasks, so a task nobody references can be
# garbage-collected before it finishes.
_pending_order_tasks = set()

# Per-serving price (in NGN) used when the requested dish is not on the menu.
_DEFAULT_PRICE_PER_SERVING = 1500


async def _save_order(order_id, user_id, dish, quantity, price):
    """Insert one ``Order`` row with status "Pending Payment"."""
    async with async_session() as session:
        session.add(
            Order(
                order_id=order_id,
                user_id=user_id,
                dish=dish,
                quantity=quantity,
                price=price,
                status="Pending Payment",
            )
        )
        await session.commit()


def _price_per_serving(dish):
    """Return the menu price for ``dish`` (case-insensitive) or the default price."""
    wanted = dish.lower()
    for item in menu_items:
        if item["name"].lower() == wanted:
            return item["price"]
    return _DEFAULT_PRICE_PER_SERVING


def process_internal_flow(user_id: str, message: str) -> str:
    """
    A simple two-step order flow:
    - Step 1: Ask for dish.
    - Step 2: Ask for quantity.
    After collecting these details, the order is saved and a payment link is generated.

    Args:
        user_id: Key into the in-memory ``user_state`` store.
        message: The user's latest text message.

    Returns:
        The reply to send to the user, or "" when the message is not handled
        by this flow (the caller then falls through to other handling).

    Notes:
        - A quantity that is not a positive whole number re-prompts the user
          and keeps the flow at step 2 instead of raising ValueError.
        - The order row is written by a task scheduled on the running event
          loop, so step 2 must be invoked from a thread with a running loop.
        - The Paystack call made at step 2 is blocking.
    """
    import asyncio
    import uuid

    state = user_state.get(user_id)
    if state is None:
        if "order" in message.lower():
            user_state[user_id] = {"flow": "order", "step": 1, "data": {}}
            return "Sure! What dish would you like to order?"
        return ""

    if state.get("flow") != "order":
        return ""

    step = state.get("step")
    # setdefault keeps collected data attached to the state even when the
    # "data" key was missing.
    data = state.setdefault("data", {})

    if step == 1:
        data["dish"] = message.title()
        state["step"] = 2
        return f"You selected {data['dish']}. How many servings would you like?"

    if step == 2:
        try:
            quantity = int(message.strip())
        except ValueError:
            quantity = 0
        if quantity <= 0:
            # Stay on step 2 so the user can answer again.
            return "Please enter the number of servings as a whole number (for example, 2)."

        data["quantity"] = str(quantity)
        # A second-resolution timestamp alone collides when two orders land in
        # the same second, violating the unique order_id column; the random
        # suffix keeps the reference unique.
        order_id = f"ORD-{int(time.time())}-{uuid.uuid4().hex[:8]}"
        data["order_id"] = order_id
        total_price = quantity * _price_per_serving(data["dish"])
        data["price"] = str(total_price)

        # Save order details to the DB asynchronously, keeping a reference
        # until the task completes.
        task = asyncio.create_task(
            _save_order(order_id, user_id, data["dish"], data["quantity"], str(total_price))
        )
        _pending_order_tasks.add(task)
        task.add_done_callback(_pending_order_tasks.discard)

        # Clear the in-memory state.
        user_state.pop(user_id, None)

        # Assume we have the customer's email; using a placeholder.
        # NOTE(review): this literal looks like obfuscation residue rather
        # than a valid address -- confirm and replace with the real customer email.
        email = "[email protected]"
        # Paystack expects the amount in kobo, hence the factor of 100.
        payment_data = create_paystack_payment_link(email, total_price * 100, order_id)
        if payment_data.get("status"):
            payment_link = payment_data["data"]["authorization_url"]
            return (f"Thank you for your order of {data['quantity']} serving(s) of {data['dish']}! "
                    f"Your Order ID is {order_id}.\nPlease complete payment here: {payment_link}")
        return f"Your order has been placed with Order ID {order_id}, but we could not initialize payment. Please try again later."

    return ""
# --- FastAPI Setup & Endpoints ---
app = FastAPI()


@app.on_event("startup")
async def on_startup():
    """Startup hook: run ``init_db()`` before the app begins serving."""
    await init_db()
@app.post("/chatbot")
async def chatbot_response(request: Request, background_tasks: BackgroundTasks):
    """
    Main chatbot endpoint.
    Expects a JSON payload with:
    - 'user_id'
    - 'message' (text query)
    - Optionally, 'is_image': true and 'image_base64': <base64 string> for image queries.
    Streaming responses will be returned.

    Handling order:
    1. Image queries are streamed from the vision model.
    2. Messages containing "menu" return the menu with scraped image URLs.
    3. Messages containing "nutritional facts for" return the dish's facts.
    4. The internal order flow is given a chance to answer.
    5. Anything else is streamed from the text LLM.

    Raises:
        HTTPException(400): The body is not a JSON object, 'user_id' is
            missing, 'message' is not a string, 'is_image' is set without
            image data, or the image is too large.
    """
    import asyncio

    try:
        data = await request.json()
    except ValueError as exc:
        # Malformed JSON is a client error, not a server error.
        raise HTTPException(status_code=400, detail="Request body must be valid JSON.") from exc
    if not isinstance(data, dict):
        raise HTTPException(status_code=400, detail="Request body must be a JSON object.")

    user_id = data.get("user_id")
    if not user_id:
        raise HTTPException(status_code=400, detail="Missing user_id in payload.")

    raw_message = data.get("message") or ""
    if not isinstance(raw_message, str):
        raise HTTPException(status_code=400, detail="'message' must be a string.")
    user_message = raw_message.strip()
    lowered = user_message.lower()

    is_image = data.get("is_image", False)
    image_b64 = data.get("image_base64", None)

    # Log inbound message if it's a text query (for image queries, you might log separately).
    if user_message:
        background_tasks.add_task(log_chat_to_db, user_id, "inbound", user_message)

    # If an image is requested, use the image model.
    if is_image:
        # An empty string is rejected too, rather than silently being treated
        # as a text query.
        if not image_b64:
            raise HTTPException(status_code=400, detail="is_image is true but no image_base64 provided.")
        # Verify the image is small enough.
        if len(image_b64) >= 180_000:
            raise HTTPException(status_code=400, detail="Image too large. Use a smaller image or the assets API.")
        # Return a streaming response from the image-based LLM.
        return StreamingResponse(stream_image_completion(image_b64), media_type="text/plain")

    # --- Process textual queries (menu, nutritional facts, internal flows) ---
    if "menu" in lowered:
        # google_image_scrape blocks on network I/O; run the lookups in the
        # default executor, concurrently, so the event loop stays responsive.
        loop = asyncio.get_running_loop()
        image_urls = await asyncio.gather(
            *(loop.run_in_executor(None, google_image_scrape, item["name"]) for item in menu_items)
        )
        menu_with_images = [
            {
                "name": item["name"],
                "description": item["description"],
                "price": item["price"],
                "image_url": image_url,
            }
            for item, image_url in zip(menu_items, image_urls)
        ]
        response_payload = {
            "response": "Here’s our delicious menu:",
            "menu": menu_with_images,
            "follow_up": ("Would you like to see nutritional facts for any dish? "
                          "Just type, for example, 'Nutritional facts for Jollof Rice'.")
        }
        background_tasks.add_task(log_chat_to_db, user_id, "outbound", str(response_payload))
        return JSONResponse(content=response_payload)

    if "nutritional facts for" in lowered:
        dish_name = lowered.replace("nutritional facts for", "").strip().title()
        dish = next((item for item in menu_items if item["name"].lower() == dish_name.lower()), None)
        if dish:
            response_text = f"Nutritional facts for {dish['name']}:\n{dish['nutrition']}"
        else:
            response_text = f"Sorry, I couldn't find nutritional facts for {dish_name}."
        background_tasks.add_task(log_chat_to_db, user_id, "outbound", response_text)
        return JSONResponse(content={"response": response_text})

    # Called on the event-loop thread on purpose: the order flow schedules
    # its DB write with asyncio.create_task.
    internal_response = process_internal_flow(user_id, user_message)
    if internal_response:
        background_tasks.add_task(log_chat_to_db, user_id, "outbound", internal_response)
        return JSONResponse(content={"response": internal_response})

    # --- Fallback: Use NVIDIA text LLM with streaming ---
    prompt = f"User query: {user_message}\nGenerate a helpful response for a restaurant chatbot."
    background_tasks.add_task(log_chat_to_db, user_id, "outbound", f"LLM fallback response for prompt: {prompt}")
    # The generator is handed to StreamingResponse directly; no wrapper needed.
    return StreamingResponse(stream_text_completion(prompt), media_type="text/plain")
@app.get("/chat_history/{user_id}")
async def get_chat_history(user_id: str):
    """
    Retrieve the chat history for a given user from the database.

    Returns:
        A list of dicts, one per ``chat_history`` row for ``user_id``, keyed
        by column name and ordered by timestamp (then id). The list is empty
        when the user has no history.
    """
    async with async_session() as session:
        result = await session.execute(
            ChatHistory.__table__.select()
            .where(ChatHistory.user_id == user_id)
            .order_by(ChatHistory.timestamp, ChatHistory.id)
        )
        # Row objects are tuple-like in SQLAlchemy 2.x, so dict(row) fails;
        # mappings() yields dict-like rows on both 1.4 and 2.x.
        return [dict(row) for row in result.mappings().all()]
@app.get("/order/{order_id}")
async def get_order(order_id: str):
    """
    Retrieve details for a specific order from the database.

    Returns:
        A dict of the matching ``orders`` row keyed by column name.

    Raises:
        HTTPException(404): No order has the given ``order_id``.
    """
    async with async_session() as session:
        result = await session.execute(
            Order.__table__.select().where(Order.order_id == order_id)
        )
        # Row objects are tuple-like in SQLAlchemy 2.x, so dict(row) fails;
        # mappings() yields a dict-like row on both 1.4 and 2.x.
        row = result.mappings().first()
    if row is None:
        raise HTTPException(status_code=404, detail="Order not found.")
    return dict(row)
if __name__ == "__main__":
    # Script entry point: serve the app with uvicorn on all interfaces, port 8000.
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)