# chatbot_api.py
import os
import time
import requests
import base64
from datetime import datetime, timedelta
from bs4 import BeautifulSoup
from fastapi import FastAPI, Request, HTTPException, BackgroundTasks, UploadFile, File, Form
from fastapi.responses import JSONResponse, StreamingResponse
import openai
# For sentiment analysis using TextBlob
from textblob import TextBlob
# SQLAlchemy Imports (Async)
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker, declarative_base
from sqlalchemy import Column, Integer, String, DateTime, Text, Float
# --- Configuration & Environment Variables ---
SPOONACULAR_API_KEY = os.getenv("SPOONACULAR_API_KEY", "815bf76e0764456293f0e96e080e8f60")
PAYSTACK_SECRET_KEY = os.getenv("PAYSTACK_SECRET_KEY", "pk_test_3222fb257041f1f2fd5ef33eafd19e1db4bdb634")
DATABASE_URL = os.getenv("DATABASE_URL", "postgresql+asyncpg://postgres.lgbnxplydqdymepehirg:YourPassword@aws-0-eu-central-1.pooler.supabase.com:5432/postgres")
NVIDIA_API_KEY = os.getenv("NVIDIA_API_KEY", "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")
openai.api_key = os.getenv("OPENAI_API_KEY", "your_openai_api_key")
# --- Database Setup ---
Base = declarative_base()
class ChatHistory(Base):
__tablename__ = "chat_history"
id = Column(Integer, primary_key=True, index=True)
user_id = Column(String, index=True)
timestamp = Column(DateTime, default=datetime.utcnow)
direction = Column(String) # 'inbound' or 'outbound'
message = Column(Text)
class Order(Base):
__tablename__ = "orders"
id = Column(Integer, primary_key=True, index=True)
order_id = Column(String, unique=True, index=True)
user_id = Column(String, index=True)
dish = Column(String)
quantity = Column(String)
price = Column(String, default="0") # Price as string (or numeric type)
status = Column(String, default="Pending Payment") # e.g., Pending Payment, Paid, Completed
payment_reference = Column(String, nullable=True)
timestamp = Column(DateTime, default=datetime.utcnow)
class UserProfile(Base):
__tablename__ = "user_profiles"
id = Column(Integer, primary_key=True, index=True)
user_id = Column(String, unique=True, index=True)
phone_number = Column(String, unique=True, index=True, nullable=True) # New field for phone numbers
name = Column(String, default="Valued Customer")
email = Column(String, default="unknown@example.com")
preferences = Column(Text, default="") # e.g., favorite dishes, dietary restrictions
last_interaction = Column(DateTime, default=datetime.utcnow)
class SentimentLog(Base):
__tablename__ = "sentiment_logs"
id = Column(Integer, primary_key=True, index=True)
user_id = Column(String, index=True)
timestamp = Column(DateTime, default=datetime.utcnow)
sentiment_score = Column(Float)
message = Column(Text)
# Create the asynchronous engine.
engine = create_async_engine(DATABASE_URL, echo=True)
async_session = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
async def init_db():
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
# --- Global In-Memory Stores ---
user_state = {} # For active conversation flows: { user_id: { "flow": str, "step": int, "data": dict, "last_active": datetime } }
conversation_context = {} # Optionally store extended conversation history (in-memory for demo)
proactive_timer = {} # Track last interaction times to send proactive greetings
# Local menu with nutritional details
menu_items = [
{"name": "Jollof Rice", "description": "A spicy and flavorful rice dish", "price": 1500, "nutrition": "Calories: 300 kcal, Carbs: 50g, Protein: 10g, Fat: 5g"},
{"name": "Fried Rice", "description": "A savory rice dish with vegetables and meat", "price": 1200, "nutrition": "Calories: 350 kcal, Carbs: 55g, Protein: 12g, Fat: 8g"},
{"name": "Chicken Wings", "description": "Crispy fried chicken wings", "price": 2000, "nutrition": "Calories: 400 kcal, Carbs: 20g, Protein: 25g, Fat: 15g"},
{"name": "Egusi Soup", "description": "A rich and hearty soup made with melon seeds", "price": 1000, "nutrition": "Calories: 250 kcal, Carbs: 15g, Protein: 8g, Fat: 10g"}
]
# --- Utility Functions ---
async def log_chat_to_db(user_id: str, direction: str, message: str):
"""Store chat messages into the database asynchronously."""
async with async_session() as session:
entry = ChatHistory(user_id=user_id, direction=direction, message=message)
session.add(entry)
await session.commit()
async def log_sentiment(user_id: str, message: str, score: float):
"""Store sentiment analysis results in the database."""
async with async_session() as session:
entry = SentimentLog(user_id=user_id, sentiment_score=score, message=message)
session.add(entry)
await session.commit()
def analyze_sentiment(text: str) -> float:
"""Analyze text sentiment using TextBlob. Returns polarity between -1 and 1."""
blob = TextBlob(text)
return blob.sentiment.polarity
def google_image_scrape(query: str) -> str:
"""Scrape Google Images using BeautifulSoup to get an image URL."""
headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64)"}
search_url = f"https://www.google.com/search?tbm=isch&q={query}"
try:
response = requests.get(search_url, headers=headers, timeout=5)
except Exception:
return ""
if response.status_code == 200:
soup = BeautifulSoup(response.text, "html.parser")
img_tags = soup.find_all("img")
for img in img_tags:
src = img.get("src")
if src and src.startswith("http"):
return src
return ""
def create_paystack_payment_link(email: str, amount: int, reference: str) -> dict:
"""Initialize a transaction via Paystack."""
url = "https://api.paystack.co/transaction/initialize"
headers = {
"Authorization": f"Bearer {PAYSTACK_SECRET_KEY}",
"Content-Type": "application/json",
}
data = {
"email": email,
"amount": amount,
"reference": reference,
"callback_url": "https://yourdomain.com/payment_callback" # Replace with your callback URL.
}
try:
response = requests.post(url, json=data, headers=headers, timeout=10)
if response.status_code == 200:
return response.json()
else:
return {"status": False, "message": "Failed to initialize payment."}
except Exception as e:
return {"status": False, "message": str(e)}
# --- NVIDIA LLM Streaming Functions ---
def stream_text_completion(prompt: str):
"""
Stream text completion using NVIDIA's text-only model.
Uses the OpenAI client interface pointed to NVIDIA's endpoint.
"""
from openai import OpenAI # Using OpenAI client library
client = OpenAI(
base_url="https://integrate.api.nvidia.com/v1",
api_key=NVIDIA_API_KEY
)
completion = client.chat.completions.create(
model="meta/llama-3.1-405b-instruct",
messages=[{"role": "user", "content": prompt}],
temperature=0.2,
top_p=0.7,
max_tokens=1024,
stream=True
)
for chunk in completion:
if chunk.choices[0].delta.content is not None:
yield chunk.choices[0].delta.content
def stream_image_completion(image_b64: str):
"""
Stream image-based query using NVIDIA's vision model.
The image (in base64) is embedded in an HTML
tag.
"""
invoke_url = "https://ai.api.nvidia.com/v1/gr/meta/llama-3.2-90b-vision-instruct/chat/completions"
headers = {
"Authorization": f"Bearer {NVIDIA_API_KEY}",
"Accept": "text/event-stream"
}
payload = {
"model": "meta/llama-3.2-90b-vision-instruct",
"messages": [
{
"role": "user",
"content": f'What is in this image?
'
}
],
"max_tokens": 512,
"temperature": 1.00,
"top_p": 1.00,
"stream": True
}
response = requests.post(invoke_url, headers=headers, json=payload, stream=True)
for line in response.iter_lines():
if line:
yield line.decode("utf-8") + "\n"
# --- Advanced Internal Flow: Order Processing & Payment Integration ---
def process_order_flow(user_id: str, message: str) -> str:
"""
A multi-step order process:
- Step 1: Ask for dish.
- Step 2: Ask for quantity.
After details are collected, save order and generate payment link.
"""
if user_id in user_state:
state = user_state[user_id]
flow = state.get("flow")
step = state.get("step")
data = state.get("data", {})
if flow == "order":
if step == 1:
data["dish"] = message.title()
state["step"] = 2
return f"You selected {data['dish']}. How many servings would you like?"
elif step == 2:
data["quantity"] = message
order_id = f"ORD-{int(time.time())}"
data["order_id"] = order_id
price_per_serving = 1500 # ₦1500 per serving
total_price = int(data["quantity"]) * price_per_serving
data["price"] = str(total_price)
# Save order asynchronously
import asyncio
async def save_order():
async with async_session() as session:
order = Order(
order_id=order_id,
user_id=user_id,
dish=data["dish"],
quantity=data["quantity"],
price=str(total_price),
status="Pending Payment"
)
session.add(order)
await session.commit()
asyncio.create_task(save_order())
# Clear conversation state for order flow.
del user_state[user_id]
# Retrieve email from user profile if available (here using a placeholder)
email = "customer@example.com"
payment_data = create_paystack_payment_link(email, total_price * 100, order_id)
if payment_data.get("status"):
payment_link = payment_data["data"]["authorization_url"]
return (f"Thank you for your order of {data['quantity']} serving(s) of {data['dish']}! "
f"Your Order ID is {order_id}.\nPlease complete payment here: {payment_link}")
else:
return f"Your order has been placed with Order ID {order_id}, but we could not initialize payment. Please try again later."
else:
if "order" in message.lower():
user_state[user_id] = {"flow": "order", "step": 1, "data": {}, "last_active": datetime.utcnow()}
return "Sure! What dish would you like to order?"
return ""
# --- User Profile Functions ---
async def get_or_create_user_profile(user_id: str, phone_number: str = None) -> UserProfile:
"""Retrieve an existing profile or create a new one with user_id and phone_number."""
async with async_session() as session:
result = await session.execute(
UserProfile.__table__.select().where(UserProfile.user_id == user_id)
)
profile = result.scalar_one_or_none()
if profile is None:
profile = UserProfile(
user_id=user_id,
phone_number=phone_number,
last_interaction=datetime.utcnow()
)
session.add(profile)
await session.commit()
return profile
async def update_user_last_interaction(user_id: str):
"""Update the user's last interaction timestamp."""
async with async_session() as session:
result = await session.execute(
UserProfile.__table__.select().where(UserProfile.user_id == user_id)
)
profile = result.scalar_one_or_none()
if profile:
profile.last_interaction = datetime.utcnow()
await session.commit()
# --- Proactive Engagement: Warm Greetings ---
async def send_proactive_greeting(user_id: str):
"""Simulate sending a proactive greeting if the user has been inactive."""
greeting = "Hi again! We miss you. Would you like to see our new menu items or get personalized recommendations?"
await log_chat_to_db(user_id, "outbound", greeting)
return greeting
# --- FastAPI Setup & Endpoints ---
app = FastAPI()
@app.on_event("startup")
async def on_startup():
await init_db()
# --- Chatbot Endpoint ---
@app.post("/chatbot")
async def chatbot_response(request: Request, background_tasks: BackgroundTasks):
"""
Main chatbot endpoint.
Supports text queries, image queries, and advanced logic.
Expects JSON payload with:
- 'user_id'
- 'phone_number'
- 'message'
- Optionally, 'is_image': true and 'image_base64'
"""
data = await request.json()
user_id = data.get("user_id")
phone_number = data.get("phone_number") # New field for phone number
user_message = data.get("message", "").strip()
is_image = data.get("is_image", False)
image_b64 = data.get("image_base64", None)
if not user_id:
raise HTTPException(status_code=400, detail="Missing user_id in payload.")
# Log inbound message
background_tasks.add_task(log_chat_to_db, user_id, "inbound", user_message)
# Update user last interaction and create profile with phone number
await update_user_last_interaction(user_id)
await get_or_create_user_profile(user_id, phone_number)
# Handle image queries
if is_image and image_b64:
if len(image_b64) >= 180_000:
raise HTTPException(status_code=400, detail="Image too large.")
return StreamingResponse(stream_image_completion(image_b64), media_type="text/plain")
# --- Advanced Textual Processing ---
# Analyze sentiment and log it
sentiment_score = analyze_sentiment(user_message)
background_tasks.add_task(log_sentiment, user_id, user_message, sentiment_score)
# Adjust response tone based on sentiment
sentiment_modifier = ""
if sentiment_score < -0.3:
sentiment_modifier = "I'm sorry if you're having a tough time. "
elif sentiment_score > 0.3:
sentiment_modifier = "Great to hear from you! "
# Check for specialized commands:
if "menu" in user_message.lower():
# Return menu with images
menu_with_images = []
for item in menu_items:
image_url = google_image_scrape(item["name"])
menu_with_images.append({
"name": item["name"],
"description": item["description"],
"price": item["price"],
"image_url": image_url
})
response_payload = {
"response": sentiment_modifier + "Here’s our delicious menu:",
"menu": menu_with_images,
"follow_up": "Would you like nutritional facts for any dish? Just type, for example, 'Nutritional facts for Jollof Rice'."
}
background_tasks.add_task(log_chat_to_db, user_id, "outbound", str(response_payload))
return JSONResponse(content=response_payload)
if "nutritional facts for" in user_message.lower():
dish_name = user_message.lower().replace("nutritional facts for", "").strip().title()
dish = next((item for item in menu_items if item["name"].lower() == dish_name.lower()), None)
if dish:
response_text = f"Nutritional facts for {dish['name']}:\n{dish['nutrition']}"
else:
response_text = f"Sorry, I couldn't find nutritional facts for {dish_name}."
background_tasks.add_task(log_chat_to_db, user_id, "outbound", response_text)
return JSONResponse(content={"response": sentiment_modifier + response_text})
# Check if this is an order flow request
order_response = process_order_flow(user_id, user_message)
if order_response:
background_tasks.add_task(log_chat_to_db, user_id, "outbound", order_response)
return JSONResponse(content={"response": sentiment_modifier + order_response})
# For context-aware conversation: store conversation context
conversation_context.setdefault(user_id, []).append({"timestamp": datetime.utcnow().isoformat(), "message": user_message})
# Fallback: use NVIDIA text LLM streaming for a response
prompt = f"User query: {user_message}\nGenerate a helpful, personalized response for a restaurant chatbot."
def stream_response():
for chunk in stream_text_completion(prompt):
yield chunk
background_tasks.add_task(log_chat_to_db, user_id, "outbound", f"LLM fallback response for prompt: {prompt}")
return StreamingResponse(stream_response(), media_type="text/plain")
# --- Chat History Endpoint ---
@app.get("/chat_history/{user_id}")
async def get_chat_history(user_id: str):
"""
Retrieve chat history for a user.
"""
async with async_session() as session:
result = await session.execute(
ChatHistory.__table__.select().where(ChatHistory.user_id == user_id)
)
history = result.fetchall()
return [dict(row) for row in history]
# --- Order Details Endpoint ---
@app.get("/order/{order_id}")
async def get_order(order_id: str):
"""
Retrieve details for a specific order.
"""
async with async_session() as session:
result = await session.execute(
Order.__table__.select().where(Order.order_id == order_id)
)
order = result.fetchone()
if order:
return dict(order)
else:
raise HTTPException(status_code=404, detail="Order not found.")
# --- User Profile Endpoint ---
@app.get("/user_profile/{user_id}")
async def get_user_profile(user_id: str):
"""
Retrieve the user profile.
"""
profile = await get_or_create_user_profile(user_id)
return {
"user_id": profile.user_id,
"phone_number": profile.phone_number,
"name": profile.name,
"email": profile.email,
"preferences": profile.preferences,
"last_interaction": profile.last_interaction.isoformat()
}
# --- Analytics Endpoint ---
@app.get("/analytics")
async def get_analytics():
"""
Simple analytics dashboard endpoint.
Returns counts of messages, orders, and average sentiment.
"""
async with async_session() as session:
# Total messages count
msg_result = await session.execute(ChatHistory.__table__.count())
total_messages = msg_result.scalar() or 0
# Total orders count
order_result = await session.execute(Order.__table__.count())
total_orders = order_result.scalar() or 0
# Average sentiment score
sentiment_result = await session.execute("SELECT AVG(sentiment_score) FROM sentiment_logs")
avg_sentiment = sentiment_result.scalar() or 0
return {
"total_messages": total_messages,
"total_orders": total_orders,
"average_sentiment": avg_sentiment
}
# --- Voice Integration Endpoint ---
@app.post("/voice")
async def process_voice(file: UploadFile = File(...)):
"""
Accept a voice file upload, perform speech-to-text (simulated), and process the resulting text.
In production, integrate with a real STT service.
"""
contents = await file.read()
# Simulated Speech-to-Text: In real implementation, send `contents` to an STT service.
simulated_text = "Simulated speech-to-text conversion result."
return {"transcription": simulated_text}
# --- Payment Callback Endpoint (Stub) ---
@app.post("/payment_callback")
async def payment_callback(request: Request):
"""
Endpoint to handle payment callbacks from Paystack.
Update order status based on callback data.
"""
data = await request.json()
# Extract order reference and update order status accordingly.
# In production, verify callback signature and extract data.
order_id = data.get("reference")
new_status = data.get("status", "Paid")
async with async_session() as session:
result = await session.execute(
Order.__table__.select().where(Order.order_id == order_id)
)
order = result.scalar_one_or_none()
if order:
order.status = new_status
await session.commit()
return JSONResponse(content={"message": "Order updated successfully."})
else:
raise HTTPException(status_code=404, detail="Order not found.")
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)