# chatbot_api.py
"""Restaurant chatbot API.

Exposes a FastAPI application that:

- answers menu and nutritional-fact questions from a local menu,
- runs a two-step in-memory order flow and creates a Paystack payment link,
- falls back to NVIDIA-hosted LLMs (text and vision) with streamed responses,
- persists chat history and orders through an async SQLAlchemy engine.
"""
import asyncio
import base64
import logging
import os
import time
import uuid
from datetime import datetime

import openai
import requests
from bs4 import BeautifulSoup
from fastapi import FastAPI, Request, HTTPException, BackgroundTasks, UploadFile, File
from fastapi.concurrency import run_in_threadpool
from fastapi.responses import JSONResponse, StreamingResponse
from sqlalchemy import Column, Integer, String, DateTime, Text
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker, declarative_base

logger = logging.getLogger(__name__)

# --- Configuration & Environment Variables ---
# Secrets are read from the environment ONLY. Real credentials must never be
# committed as fallback defaults; any key or password that was previously
# hard-coded in this file should be considered leaked and rotated.
SPOONACULAR_API_KEY = os.getenv("SPOONACULAR_API_KEY", "")
PAYSTACK_SECRET_KEY = os.getenv("PAYSTACK_SECRET_KEY", "")
DATABASE_URL = os.getenv("DATABASE_URL", "")
NVIDIA_API_KEY = os.getenv("NVIDIA_API_KEY", "")  # For NVIDIA LLM endpoints
openai.api_key = os.getenv("OPENAI_API_KEY", "your_openai_api_key")

# Where Paystack redirects the customer after payment. Replace the default
# with your real callback URL (or set PAYSTACK_CALLBACK_URL).
PAYSTACK_CALLBACK_URL = os.getenv(
    "PAYSTACK_CALLBACK_URL", "https://yourdomain.com/payment_callback"
)

# Upper bound (exclusive) on the base64 payload accepted for inline images.
MAX_IMAGE_B64_LENGTH = 180_000

# Price (in NGN) charged per serving when the dish is not on the local menu.
DEFAULT_PRICE_PER_SERVING = 1500

if not DATABASE_URL:
    # Fail fast with a clear message instead of an opaque SQLAlchemy URL error.
    raise RuntimeError(
        "DATABASE_URL environment variable is required "
        "(e.g. postgresql+asyncpg://user:password@host:5432/dbname)."
    )

# --- Database Setup ---
Base = declarative_base()


class ChatHistory(Base):
    """One chat message exchanged with a user."""

    __tablename__ = "chat_history"

    id = Column(Integer, primary_key=True, index=True)
    user_id = Column(String, index=True)
    timestamp = Column(DateTime, default=datetime.utcnow)
    direction = Column(String)  # 'inbound' or 'outbound'
    message = Column(Text)


class Order(Base):
    """An order placed through the chatbot order flow."""

    __tablename__ = "orders"

    id = Column(Integer, primary_key=True, index=True)
    order_id = Column(String, unique=True, index=True)
    user_id = Column(String, index=True)
    dish = Column(String)
    quantity = Column(String)
    price = Column(String, default="0")  # Price as string (or use a numeric type)
    status = Column(String, default="Pending Payment")  # e.g., Pending Payment, Paid, Completed
    payment_reference = Column(String, nullable=True)
    timestamp = Column(DateTime, default=datetime.utcnow)


# Create the asynchronous engine. Make sure DATABASE_URL is configured correctly.
engine = create_async_engine(DATABASE_URL, echo=True)
async_session = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)


async def init_db():
    """Create all tables declared on ``Base`` if they do not exist yet."""
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)


# --- Global In-Memory Stores for Ephemeral Data ---
user_state = {}  # Example: { user_id: { "flow": "order", "step": int, "data": dict } }

# Strong references to fire-and-forget tasks. The event loop only keeps weak
# references, so an unreferenced task may be garbage-collected before it runs.
_background_tasks = set()

# Local menu with nutritional details
menu_items = [
    {
        "name": "Jollof Rice",
        "description": "A spicy and flavorful rice dish",
        "price": 1500,
        "nutrition": "Calories: 300 kcal, Carbs: 50g, Protein: 10g, Fat: 5g",
    },
    {
        "name": "Fried Rice",
        "description": "A savory rice dish with vegetables and meat",
        "price": 1200,
        "nutrition": "Calories: 350 kcal, Carbs: 55g, Protein: 12g, Fat: 8g",
    },
    {
        "name": "Chicken Wings",
        "description": "Crispy fried chicken wings",
        "price": 2000,
        "nutrition": "Calories: 400 kcal, Carbs: 20g, Protein: 25g, Fat: 15g",
    },
    {
        "name": "Egusi Soup",
        "description": "A rich and hearty soup made with melon seeds",
        "price": 1000,
        "nutrition": "Calories: 250 kcal, Carbs: 15g, Protein: 8g, Fat: 10g",
    },
]


# --- Utility Functions ---
async def log_chat_to_db(user_id: str, direction: str, message: str):
    """Store chat messages into the database asynchronously."""
    async with async_session() as session:
        entry = ChatHistory(user_id=user_id, direction=direction, message=message)
        session.add(entry)
        await session.commit()


def google_image_scrape(query: str) -> str:
    """
    Scrape Google Images using BeautifulSoup to get an image URL for the query.

    Returns the first absolute ``<img src>`` found, or an empty string when the
    request fails or nothing usable is found.
    Note: This basic scraper may break if Google changes its markup.
    This call blocks; run it in a worker thread from async code.
    """
    headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64)"}
    try:
        # ``params`` URL-encodes the query (spaces, '&', '#', non-ASCII ...),
        # which raw string interpolation into the URL did not.
        response = requests.get(
            "https://www.google.com/search",
            params={"tbm": "isch", "q": query},
            headers=headers,
            timeout=5,
        )
    except requests.RequestException:
        return ""
    if response.status_code != 200:
        return ""

    soup = BeautifulSoup(response.text, "html.parser")
    for img in soup.find_all("img"):
        src = img.get("src")
        if src and src.startswith("http"):
            return src
    return ""


def create_paystack_payment_link(email: str, amount: int, reference: str) -> dict:
    """
    Call Paystack to initialize a transaction.

    - email: customer's email
    - amount: in kobo (multiply NGN amount by 100)
    - reference: unique order reference

    Returns the decoded Paystack response on HTTP 200, otherwise a dict of the
    form ``{"status": False, "message": ...}``. Never raises for network or
    decoding problems. This call blocks on network I/O.
    """
    if not PAYSTACK_SECRET_KEY:
        return {"status": False, "message": "PAYSTACK_SECRET_KEY is not configured."}

    url = "https://api.paystack.co/transaction/initialize"
    headers = {
        "Authorization": f"Bearer {PAYSTACK_SECRET_KEY}",
        "Content-Type": "application/json",
    }
    data = {
        "email": email,
        "amount": amount,
        "reference": reference,
        "callback_url": PAYSTACK_CALLBACK_URL,
    }
    try:
        response = requests.post(url, json=data, headers=headers, timeout=10)
    except requests.RequestException as e:
        return {"status": False, "message": str(e)}

    if response.status_code != 200:
        return {"status": False, "message": "Failed to initialize payment."}
    try:
        body = response.json()
    except ValueError as e:  # 200 with a non-JSON body
        return {"status": False, "message": str(e)}
    if not isinstance(body, dict):
        return {"status": False, "message": "Failed to initialize payment."}
    return body


# --- NVIDIA LLM Streaming Functions ---
def stream_text_completion(prompt: str):
    """
    Stream text completion using NVIDIA's text-only model.

    Uses the OpenAI client interface pointed to NVIDIA's endpoint and yields
    the text fragments as they arrive.
    """
    from openai import OpenAI  # Using OpenAI client library

    client = OpenAI(
        base_url="https://integrate.api.nvidia.com/v1",
        api_key=NVIDIA_API_KEY,
    )
    completion = client.chat.completions.create(
        model="meta/llama-3.1-405b-instruct",
        messages=[{"role": "user", "content": prompt}],
        temperature=0.2,
        top_p=0.7,
        max_tokens=1024,
        stream=True,
    )
    for chunk in completion:
        # Some stream chunks carry no choices at all; skip them instead of
        # failing with IndexError.
        if chunk.choices and chunk.choices[0].delta.content is not None:
            yield chunk.choices[0].delta.content


def stream_image_completion(image_b64: str):
    """
    Stream image-based query using NVIDIA's vision model.

    The image (in base64) is embedded in an HTML ``<img>`` tag inside the user
    message. Yields each non-empty line of the event stream, newline-terminated.
    """
    invoke_url = "https://ai.api.nvidia.com/v1/gr/meta/llama-3.2-90b-vision-instruct/chat/completions"
    headers = {
        "Authorization": f"Bearer {NVIDIA_API_KEY}",
        "Accept": "text/event-stream",
    }
    payload = {
        "model": "meta/llama-3.2-90b-vision-instruct",
        "messages": [
            {
                "role": "user",
                # The image must actually be part of the prompt; without the
                # tag the model is asked about an image it never receives.
                "content": f'What is in this image? <img src="data:image/png;base64,{image_b64}" />',
            }
        ],
        "max_tokens": 512,
        "temperature": 1.00,
        "top_p": 1.00,
        "stream": True,
    }
    try:
        # (connect timeout, read timeout between streamed chunks)
        with requests.post(
            invoke_url, headers=headers, json=payload, stream=True, timeout=(5, 60)
        ) as response:
            if response.status_code != 200:
                logger.warning(
                    "NVIDIA vision endpoint returned HTTP %s", response.status_code
                )
            for line in response.iter_lines():
                if line:
                    yield line.decode("utf-8") + "\n"
    except requests.RequestException:
        # Headers are already sent once streaming starts, so end the stream
        # quietly and leave the details in the server log.
        logger.exception("Streaming from the NVIDIA vision endpoint failed")


# --- Internal Flow: Order Processing & Payment Integration ---
def _on_background_task_done(task):
    """Drop the finished task and log any exception it raised."""
    _background_tasks.discard(task)
    if not task.cancelled() and task.exception() is not None:
        logger.error("Background task failed", exc_info=task.exception())


def _spawn_background(coro):
    """Schedule ``coro`` on the running event loop and keep it referenced."""
    task = asyncio.create_task(coro)
    _background_tasks.add(task)
    task.add_done_callback(_on_background_task_done)


def _parse_quantity(text: str):
    """Return ``text`` as a positive int, or ``None`` if it is not one."""
    try:
        quantity = int(text.strip())
    except ValueError:
        return None
    return quantity if quantity > 0 else None


def _price_for_dish(dish: str) -> int:
    """Return the menu price for ``dish`` or the default per-serving price."""
    wanted = dish.strip().lower()
    for item in menu_items:
        if item["name"].lower() == wanted:
            return item["price"]
    return DEFAULT_PRICE_PER_SERVING


async def _save_order(order_id: str, user_id: str, dish: str, quantity: str, price: str):
    """Persist a newly placed order with status 'Pending Payment'."""
    async with async_session() as session:
        order = Order(
            order_id=order_id,
            user_id=user_id,
            dish=dish,
            quantity=quantity,
            price=price,
            status="Pending Payment",
        )
        session.add(order)
        await session.commit()


def _finalize_order(user_id: str, data: dict, quantity: int) -> str:
    """Save the order, clear the user's flow state and build the reply."""
    # Second-resolution timestamps alone collide when two orders arrive in the
    # same second, which violates the unique constraint on orders.order_id.
    order_id = f"ORD-{int(time.time())}-{uuid.uuid4().hex[:8]}"
    total_price = quantity * _price_for_dish(data["dish"])
    data["quantity"] = str(quantity)
    data["order_id"] = order_id
    data["price"] = str(total_price)

    # Save order details to the DB asynchronously.
    _spawn_background(
        _save_order(order_id, user_id, data["dish"], data["quantity"], data["price"])
    )

    # Clear the in-memory state.
    user_state.pop(user_id, None)

    # Assume we have the customer's email; using a placeholder.
    email = "customer@example.com"
    # NOTE(review): this is a blocking HTTP call on the event loop thread;
    # moving it off-loop requires making this flow async.
    payment_data = create_paystack_payment_link(email, total_price * 100, order_id)
    payment_link = None
    if payment_data.get("status"):
        payment_link = (payment_data.get("data") or {}).get("authorization_url")
    if payment_link:
        return (
            f"Thank you for your order of {data['quantity']} serving(s) of {data['dish']}! "
            f"Your Order ID is {order_id}.\nPlease complete payment here: {payment_link}"
        )
    return (
        f"Your order has been placed with Order ID {order_id}, "
        "but we could not initialize payment. Please try again later."
    )


def process_internal_flow(user_id: str, message: str) -> str:
    """
    A simple two-step order flow:

    - Step 1: Ask for dish.
    - Step 2: Ask for quantity.

    After collecting these details, the order is saved and a payment link is
    generated. Returns the reply text, or an empty string when the message is
    not part of (and does not start) an order flow.

    Must be called from a thread with a running asyncio event loop, because
    the order is persisted through ``asyncio.create_task``.
    """
    state = user_state.get(user_id)
    if state is None:
        if "order" in message.lower():
            user_state[user_id] = {"flow": "order", "step": 1, "data": {}}
            return "Sure! What dish would you like to order?"
        return ""

    if state.get("flow") != "order":
        return ""

    step = state.get("step")
    data = state.setdefault("data", {})

    if step == 1:
        if not message.strip():
            return "Please tell me which dish you would like to order."
        data["dish"] = message.title()
        state["step"] = 2
        return f"You selected {data['dish']}. How many servings would you like?"

    if step == 2:
        quantity = _parse_quantity(message)
        if quantity is None:
            # Stay on step 2 and re-prompt instead of crashing on int().
            return (
                "Please enter the number of servings as a whole number "
                "greater than zero (for example, 2)."
            )
        return _finalize_order(user_id, data, quantity)

    return ""


# --- FastAPI Setup & Endpoints ---
app = FastAPI()


@app.on_event("startup")
async def on_startup():
    """Create database tables and warn about missing optional credentials."""
    if not PAYSTACK_SECRET_KEY:
        logger.warning("PAYSTACK_SECRET_KEY is not set; payment links will fail.")
    if not NVIDIA_API_KEY:
        logger.warning("NVIDIA_API_KEY is not set; LLM responses will fail.")
    await init_db()


@app.post("/chatbot")
async def chatbot_response(request: Request, background_tasks: BackgroundTasks):
    """
    Main chatbot endpoint.

    Expects a JSON payload with:
    - 'user_id'
    - 'message' (text query)
    - Optionally, 'is_image': true and 'image_base64': for image queries.

    Menu, nutrition and order replies are returned as JSON; image queries and
    the LLM fallback are returned as streaming plain-text responses.
    Responds with HTTP 400 for malformed payloads.
    """
    try:
        data = await request.json()
    except ValueError as exc:  # includes json.JSONDecodeError
        raise HTTPException(status_code=400, detail="Request body must be valid JSON.") from exc
    if not isinstance(data, dict):
        raise HTTPException(status_code=400, detail="Request body must be a JSON object.")

    user_id = data.get("user_id")
    if not user_id:
        raise HTTPException(status_code=400, detail="Missing user_id in payload.")
    user_id = str(user_id)  # the DB column and state keys are strings

    raw_message = data.get("message") or ""  # tolerate an explicit null
    if not isinstance(raw_message, str):
        raise HTTPException(status_code=400, detail="'message' must be a string.")
    user_message = raw_message.strip()

    is_image = data.get("is_image", False)
    image_b64 = data.get("image_base64", None)

    # Log inbound message if it's a text query (for image queries, you might log separately).
    if user_message:
        background_tasks.add_task(log_chat_to_db, user_id, "inbound", user_message)

    # If an image is flagged, use the image model.
    if is_image:
        if not image_b64 or not isinstance(image_b64, str):
            raise HTTPException(status_code=400, detail="is_image is true but no image_base64 provided.")
        # Verify the image is small enough.
        if len(image_b64) >= MAX_IMAGE_B64_LENGTH:
            raise HTTPException(status_code=400, detail="Image too large. Use a smaller image or the assets API.")
        # Return a streaming response from the image-based LLM.
        return StreamingResponse(stream_image_completion(image_b64), media_type="text/plain")

    lowered = user_message.lower()

    # --- Process textual queries (menu, nutritional facts, internal flows) ---
    if "menu" in lowered:
        # The scraper blocks on HTTP; run the lookups concurrently in worker
        # threads so the event loop stays responsive.
        image_urls = await asyncio.gather(
            *(run_in_threadpool(google_image_scrape, item["name"]) for item in menu_items)
        )
        menu_with_images = [
            {
                "name": item["name"],
                "description": item["description"],
                "price": item["price"],
                "image_url": image_url,
            }
            for item, image_url in zip(menu_items, image_urls)
        ]
        response_payload = {
            "response": "Here’s our delicious menu:",
            "menu": menu_with_images,
            "follow_up": (
                "Would you like to see nutritional facts for any dish? "
                "Just type, for example, 'Nutritional facts for Jollof Rice'."
            ),
        }
        background_tasks.add_task(log_chat_to_db, user_id, "outbound", str(response_payload))
        return JSONResponse(content=response_payload)

    if "nutritional facts for" in lowered:
        dish_name = lowered.replace("nutritional facts for", "").strip().title()
        dish = next(
            (item for item in menu_items if item["name"].lower() == dish_name.lower()),
            None,
        )
        if dish:
            response_text = f"Nutritional facts for {dish['name']}:\n{dish['nutrition']}"
        else:
            response_text = f"Sorry, I couldn't find nutritional facts for {dish_name}."
        background_tasks.add_task(log_chat_to_db, user_id, "outbound", response_text)
        return JSONResponse(content={"response": response_text})

    # Called directly (not in a worker thread) because the flow schedules its
    # DB write with asyncio.create_task and therefore needs the running loop.
    internal_response = process_internal_flow(user_id, user_message)
    if internal_response:
        background_tasks.add_task(log_chat_to_db, user_id, "outbound", internal_response)
        return JSONResponse(content={"response": internal_response})

    # --- Fallback: Use NVIDIA text LLM with streaming ---
    prompt = f"User query: {user_message}\nGenerate a helpful response for a restaurant chatbot."
    background_tasks.add_task(
        log_chat_to_db, user_id, "outbound", f"LLM fallback response for prompt: {prompt}"
    )
    # Create a streaming response using the NVIDIA text model.
    return StreamingResponse(stream_text_completion(prompt), media_type="text/plain")


@app.get("/chat_history/{user_id}")
async def get_chat_history(user_id: str):
    """
    Retrieve the chat history for a given user from the database,
    oldest message first.
    """
    table = ChatHistory.__table__
    async with async_session() as session:
        result = await session.execute(
            table.select()
            .where(table.c.user_id == user_id)
            .order_by(table.c.timestamp, table.c.id)
        )
        # Row objects are tuple-like; ``mappings()`` yields dict-like rows that
        # convert cleanly with dict() on both SQLAlchemy 1.4 and 2.x.
        return [dict(row) for row in result.mappings().all()]


@app.get("/order/{order_id}")
async def get_order(order_id: str):
    """
    Retrieve details for a specific order from the database.

    Raises HTTP 404 when no order with ``order_id`` exists.
    """
    async with async_session() as session:
        result = await session.execute(
            Order.__table__.select().where(Order.order_id == order_id)
        )
        order = result.mappings().first()
    if order is None:
        raise HTTPException(status_code=404, detail="Order not found.")
    return dict(order)


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)