import json
import os

import requests
import streamlit as st
from google.oauth2 import service_account
from json_repair import repair_json
from youtube_transcript_api import YouTubeTranscriptApi

from langchain.output_parsers import PydanticOutputParser
from langchain.prompts import PromptTemplate
from langchain_cohere import ChatCohere
from langchain_community.document_loaders import PyPDFLoader, WebBaseLoader
from langchain_community.llms import HuggingFaceEndpoint
from langchain_community.tools import DuckDuckGoSearchRun
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_google_vertexai import ChatVertexAI
from langchain_groq import ChatGroq
from langchain_openai import ChatOpenAI

import dto.release_notes as rs
import dto.requirement_gathering as rq
import dto.user_story as us
import prompts as pt

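# ---------------------------------------------------------------------------
# OBMA - AI Assist: a Streamlit app that grounds an LLM on text extracted
# from a URL, PDF, YouTube transcript, WAV audio, or web search, and then
# generates user stories, release notes, requirements, or a summary.
# ---------------------------------------------------------------------------
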
if "lv_response" not in st.session_state: |
|
lv_response = None |
|
st.session_state.lv_response = lv_response |
|
else: |
|
lv_response = st.session_state.lv_response |
|
|
|
|
|
if "lv_extracted_text" not in st.session_state: |
|
lv_extracted_text = "" |
|
st.session_state.lv_extracted_text = lv_extracted_text |
|
else: |
|
lv_extracted_text = st.session_state.lv_extracted_text |
|
|
|
|
|
if "lv_model_session" not in st.session_state: |
|
st.session_state.lv_model_session = None |
|
|
|
|
|
def fn_display_user_messages(lv_extracted_text, lv_type, mv_processing_message):
    """Display info, error, warning, or success messages to the user."""

    with mv_processing_message.container():
        if lv_type == "Success":
            st.success(lv_extracted_text)
        elif lv_type == "Error":
            st.error(lv_extracted_text)
        elif lv_type == "Warning":
            st.warning(lv_extracted_text)
        else:
            st.info(lv_extracted_text)

def fn_set_proxy(ui_proxy_url, ui_no_proxy_url):
    """Configure HTTP and HTTPS proxies programmatically."""

    os.environ['HTTP_PROXY'] = ui_proxy_url
    os.environ['HTTPS_PROXY'] = ui_proxy_url
    os.environ['NO_PROXY'] = ui_no_proxy_url

    print("=== Proxy SET ===")
    print("HTTP_PROXY:", os.environ.get('HTTP_PROXY'))
    print("HTTPS_PROXY:", os.environ.get('HTTPS_PROXY'))
    print("NO_PROXY:", os.environ.get('NO_PROXY'))
    print("=================")

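# Usage sketch (hypothetical values):
#   fn_set_proxy("http://proxy.example.com:8080", "localhost,127.0.0.1")
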
def fn_scrape_website(ui_grounding_url):
    """Convert website URL content into text."""

    lv_html_loader = WebBaseLoader(ui_grounding_url)
    lv_html = lv_html_loader.load()

    return lv_html

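# Note: fn_scrape_website and fn_scraper_pdf return lists of LangChain
# Document objects; main() joins each document's page_content to get text.
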
def fn_scraper_pdf(ui_grounding_pdf):
    """Convert uploaded PDF content into text."""

    # Persist the uploaded file locally so PyPDFLoader can read it from disk.
    os.makedirs("pdf-data", exist_ok=True)
    lv_temp_file_path = os.path.join("pdf-data", ui_grounding_pdf.name)
    if not os.path.exists(lv_temp_file_path):
        with open(lv_temp_file_path, "wb") as lv_file:
            lv_file.write(ui_grounding_pdf.getbuffer())

    lv_pdf_loader = PyPDFLoader(lv_temp_file_path)
    lv_pdf_content = lv_pdf_loader.load()

    return lv_pdf_content

def fn_search_web(ui_search_web_input):
    """Search the internet for information."""

    lv_search_run = DuckDuckGoSearchRun()
    lv_result = lv_search_run.run(ui_search_web_input)

    return lv_result

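# Assumption: DuckDuckGoSearchRun needs the duckduckgo-search package to be
# installed alongside langchain-community.
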
def fn_you_tube_video_transcript(ui_youtube_url, ui_processing_message):
    """Extract a YouTube video transcript (expects a video ID, not a full URL)."""

    fn_display_user_messages("Generating YouTube transcript", "Info", ui_processing_message)

    try:
        lv_youtube_transcript = YouTubeTranscriptApi.get_transcript(ui_youtube_url)
        lv_response = ' '.join([item['text'] for item in lv_youtube_transcript])

        fn_display_user_messages("Successfully generated YouTube transcript", "Success", ui_processing_message)

        return lv_response
    except Exception as error:
        print('Error Generating YouTube Transcript', error)
        fn_display_user_messages("Error Generating YouTube Transcript", "Error", ui_processing_message)
        raise

def fn_unset_proxy():
    """Unset HTTP and HTTPS proxies."""

    os.environ.pop('HTTP_PROXY', None)
    os.environ.pop('HTTPS_PROXY', None)
    os.environ.pop('NO_PROXY', None)

    print("=== Proxy UNSET ===")
    print("HTTP_PROXY:", os.environ.get('HTTP_PROXY'))
    print("HTTPS_PROXY:", os.environ.get('HTTPS_PROXY'))
    print("NO_PROXY:", os.environ.get('NO_PROXY'))
    print("===================")

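# @st.cache_resource caches one model instance per unique combination of
# (provider, API key, model details), so reruns with the same inputs reuse
# the existing client instead of rebuilding it.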
@st.cache_resource
def fn_create_chatllm(ui_llm_provider, ui_api_key, ui_model_details):
    """Create a Chat LLM instance for the selected provider."""

    lv_model = None

    try:
        if ui_llm_provider == 'Huggingface':
            lv_model = HuggingFaceEndpoint(
                repo_id=ui_model_details,
                temperature=1.0,
                huggingfacehub_api_token=ui_api_key
            )
        elif ui_llm_provider == 'Groq':
            # Reads GROQ_API_KEY from the environment (set in the sidebar).
            lv_model = ChatGroq(
                temperature=1.0,
                model_name=ui_model_details
            )
        elif ui_llm_provider == 'Cohere':
            # Reads COHERE_API_KEY from the environment (set in the sidebar).
            lv_model = ChatCohere(
                temperature=1.0,
                model=ui_model_details
            )
        elif ui_llm_provider == 'Google':
            # Reads GOOGLE_API_KEY from the environment (set in the sidebar).
            lv_model = ChatGoogleGenerativeAI(
                temperature=1.0,
                model=ui_model_details,
                max_output_tokens=8192  # Gemini 1.5 Pro caps output tokens at 8192
            )
        elif ui_llm_provider == 'OpenAI':
            # Reads OPENAI_API_KEY from the environment (set in the sidebar).
            lv_model = ChatOpenAI(
                temperature=1.0,
                model=ui_model_details
            )
        elif ui_llm_provider == 'Google VertexAI':
            # Here the API key field holds a full service-account JSON blob.
            lv_api_key = json.loads(ui_api_key)

            # Also write the key to disk for libraries that resolve
            # Application Default Credentials from the environment.
            with open('key.json', 'w') as f:
                json.dump(lv_api_key, f)
            os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = 'key.json'

            g_creds = service_account.Credentials.from_service_account_info(lv_api_key)
            lv_model = ChatVertexAI(
                project=lv_api_key.get("project_id"),
                temperature=1.0,
                model=ui_model_details,
                credentials=g_creds
            )

        print("Returning new model")

    except Exception as e:
        # Swallow the error and return None; callers guard with `if lv_model:`.
        print("Error Configuring Model: " + str(e))

    return lv_model

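# Usage sketch (hypothetical key value):
#   lv_model = fn_create_chatllm('OpenAI', 'sk-...', 'gpt-4o')
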
@st.cache_data
def fn_generate_speech_to_text(ui_audio_bytes, ui_api_key):
    """Generate speech-to-text via the Hugging Face Inference API (Whisper)."""

    # st.cache_data (rather than cache_resource) fits here because the
    # function returns plain serializable text, keyed by the audio bytes.
    lv_extracted_text = None

    try:
        lv_url = "https://api-inference.huggingface.co/models/openai/whisper-large-v3"
        lv_headers = {
            'Authorization': "Bearer " + ui_api_key,
            'Content-Type': "audio/wav"
        }
        lv_http_response = requests.post(lv_url, data=ui_audio_bytes, headers=lv_headers)
        # Surface HTTP failures (e.g. a 503 while the hosted model warms up).
        lv_http_response.raise_for_status()
        lv_extracted_text = lv_http_response.json().get('text')

        print(lv_extracted_text)

        return lv_extracted_text
    except Exception as error:
        print('Error Generating Speech to Text', error)
        raise

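# Note: the WAV grounding path always calls the Hugging Face endpoint above,
# so the API key entered in the sidebar must be a Hugging Face token for
# audio extraction to work, even when another LLM provider is selected.
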
def fn_chatllm_response(ui_llm_provider, lv_summarize_prompt_formatted, lv_model, ui_processing_message):
    """Generate an LLM response for the formatted prompt."""

    fn_display_user_messages("Generating LLM Response", "Info", ui_processing_message)
    lv_response = None

    try:
        # Chat models return a message object; plain LLMs return a string.
        if ui_llm_provider in ('Google VertexAI', 'Google', 'OpenAI', 'Groq', 'Cohere'):
            lv_response = lv_model.invoke(lv_summarize_prompt_formatted).content
        else:
            lv_response = lv_model.invoke(lv_summarize_prompt_formatted)

        # Strip Markdown code fences the model may wrap around its JSON;
        # json_repair handles any remaining malformed JSON downstream.
        lv_response = str(lv_response).replace("```json", "")
        lv_response = lv_response.replace("```", "")

        fn_display_user_messages("Generated LLM Response", "Success", ui_processing_message)
        return lv_response
    except Exception as error:
        print('Error Generating LLM Response', error)
        fn_display_user_messages("Error Generating LLM Response", "Error", ui_processing_message)
        raise

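# The converters below walk JSON whose shape is defined by the Pydantic DTOs
# (dto.user_story, dto.release_notes, dto.requirement_gathering). For example,
# the user-story JSON is assumed to look like:
#   {"title": ..., "role": ..., "feature": ..., "benefit": ...,
#    "user_story_scenarios": [{"scenario_title": ..., "pre_conditions": ...,
#                              "action_details": ..., "expected_outcome": ...}]}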
def fn_convert_user_story_json_to_markdown(lv_json):
    """Convert user story JSON to Markdown."""

    lv_markdown = ""
    try:
        lv_markdown = f"# {lv_json['title']}\n\n"
        lv_markdown += f"**Role:** {lv_json['role']}\n\n"
        lv_markdown += f"**Feature:** {lv_json['feature']}\n\n"
        lv_markdown += f"**Benefit:** {lv_json['benefit']}\n\n"
        lv_markdown += "## User Story Scenarios\n"

        for lv_scenario in lv_json['user_story_scenarios']:
            lv_markdown += f"### {lv_scenario['scenario_title']}\n\n"
            lv_markdown += f"**Pre-conditions:** {lv_scenario['pre_conditions']}\n\n"
            lv_markdown += f"**Action Details:** {lv_scenario['action_details']}\n\n"
            lv_markdown += f"**Expected Outcome:** {lv_scenario['expected_outcome']}\n\n"
    except Exception as e:
        print("UserStory - Error converting JSON to Markdown", str(e))

    return lv_markdown

def fn_convert_release_notes_json_to_markdown(lv_json):
    """Convert release notes JSON to Markdown."""

    lv_markdown = ""
    try:
        lv_markdown = "# Release Notes\n\n"
        lv_markdown += f"**Release Date:** {lv_json['release_date']}\n\n"
        lv_markdown += f"**Product Name:** {lv_json['product_name']}\n\n"
        lv_markdown += f"**Summary:** {lv_json['summary']}\n\n"
        lv_markdown += "## Enhancements\n"

        for lv_enhancement in lv_json['enhancements']:
            lv_markdown += f"### {lv_enhancement['title']}\n\n"
            lv_markdown += f"**Description:** {lv_enhancement['description']}\n\n"
            lv_markdown += f"**Benefits:** {lv_enhancement['benefits']}\n\n"
            lv_markdown += f"**Reason:** {lv_enhancement['reason']}\n\n"
    except Exception as e:
        print("ReleaseNotes - Error converting JSON to Markdown", str(e))

    return lv_markdown

def fn_convert_requirement_generation_json_to_markdown(lv_json):
    """Convert requirement generation JSON to Markdown."""

    lv_markdown = ""
    try:
        lv_markdown = f"# {lv_json['header']}\n\n"
        lv_markdown += "## Requirements\n"

        for requirement in lv_json['requirements']:
            lv_markdown += f"### {requirement['overview']}\n\n"
            lv_markdown += f"**Description:** {requirement['description']}\n\n"
            lv_markdown += f"**Benefits:** {requirement['benefits']}\n\n"
            lv_markdown += f"**Reason:** {requirement['reason']}\n\n"
            lv_markdown += f"**Priority:** {requirement['priority']}\n\n"
            # tags is optional; skip the line when it is missing or empty.
            if requirement.get('tags'):
                tags = ', '.join(requirement['tags'])
                lv_markdown += f"**Tags:** {tags}\n\n"
    except Exception as e:
        print("Requirement Gathering - Error converting JSON to Markdown", str(e))

    return lv_markdown

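# All three converters fail soft: on a missing key they log the error and
# return whatever Markdown was built so far rather than raising.
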
def main():

    st.set_page_config(
        page_title="OBMA AI Assist",
        page_icon="🧊",
        layout="wide",
        initial_sidebar_state="expanded"
    )

    col1, col2, col3 = st.columns(3)
    ui_processing_message = col2.empty()
    ui_search_web_input = st.empty()
    lv_model = st.session_state.get("lv_model_session")

    global lv_response
    global lv_extracted_text

    col2.text("")
    col2.header("OBMA - AI Assist")
    col2.text("")
    col2.text("")
    col2.text("")

    cn_llm_providers_lov_values = ['Huggingface', 'Groq', 'Cohere', 'Google', 'Google VertexAI', 'OpenAI']
    cn_huggingface_models_lov_values = [
        'deepseek-ai/DeepSeek-R1-Distill-Qwen-32B',
        'Qwen/Qwen2.5-72B-Instruct',
        'meta-llama/Llama-3.3-70B-Instruct',
        'CohereForAI/c4ai-command-r-plus-08-2024',
        'nvidia/Llama-3.1-Nemotron-70B-Instruct-HF'
    ]
    lv_user_actions = ["User Story", "Release Notes", "Requirement Generation", "Summarization"]

    with st.sidebar:
        st.header("Configurations")
        st.text("")

        st.subheader("Output")
        ui_user_actions = st.multiselect(
            label='User Actions',
            options=lv_user_actions,
            default="User Story"
        )
        ui_show_json = st.toggle("Show JSON", value=False)
        st.text("")

        st.text("")
        try:
            st.subheader("LLM")
            ui_llm_provider = st.selectbox(label='LLM Provider', options=cn_llm_providers_lov_values)
            ui_api_key = st.empty()
            ui_model_details = st.empty()

            if ui_llm_provider:
                # Collect the provider-specific API key and model name, and
                # expose the key through the environment variable each
                # LangChain integration expects.
                if ui_llm_provider == 'Huggingface':
                    ui_api_key = st.text_input("HUGGINGFACEHUB_API_TOKEN", type="password")
                    ui_model_details = st.selectbox("Model Details", options=cn_huggingface_models_lov_values)
                    os.environ["HUGGINGFACEHUB_API_TOKEN"] = ui_api_key
                elif ui_llm_provider == 'Groq':
                    ui_api_key = st.text_input("GROQ_API_KEY", type="password")
                    ui_model_details = st.text_input("Model Details", "mixtral-8x7b-32768")
                    os.environ["GROQ_API_KEY"] = ui_api_key
                elif ui_llm_provider == 'Cohere':
                    ui_api_key = st.text_input("COHERE_API_KEY", type="password")
                    ui_model_details = st.text_input("Model Details", "command-r-plus")
                    os.environ["COHERE_API_KEY"] = ui_api_key
                elif ui_llm_provider == 'Google':
                    ui_api_key = st.text_input("GOOGLE_API_KEY", type="password")
                    ui_model_details = st.text_input("Model Details", "gemini-1.5-pro-latest")
                    os.environ["GOOGLE_API_KEY"] = ui_api_key
                elif ui_llm_provider == 'Google VertexAI':
                    # Expects the full service-account JSON, not a plain key.
                    ui_api_key = st.text_area("GOOGLE_APPLICATION_CREDENTIALS")
                    ui_model_details = st.text_input("Model Details", "gemini-1.5-pro-preview-0409")
                elif ui_llm_provider == 'OpenAI':
                    ui_api_key = st.text_input("OPENAI_API_KEY", type="password")
                    ui_model_details = st.text_input("Model Details", "gpt-4o")
                    os.environ["OPENAI_API_KEY"] = ui_api_key
            else:
                st.error('Please configure LLM Details')
                fn_display_user_messages("Please configure LLM Details", "Error", ui_processing_message)

            if st.button("Configure LLM"):
                if ui_llm_provider and ui_api_key and ui_model_details:
                    print("Configuring LLM")
                    lv_model = fn_create_chatllm(ui_llm_provider, ui_api_key, ui_model_details)
                    st.session_state.lv_model_session = lv_model
                else:
                    st.error('Please configure LLM Details')
                    fn_display_user_messages("Please configure LLM Details", "Error", ui_processing_message)
        except Exception as e:
            st.error('Error Configuring LLM Details: ' + str(e))
            fn_display_user_messages("Error Configuring LLM Details", "Error", ui_processing_message)

st.text("") |
|
try: |
|
st.subheader("Knowledge Base") |
|
ui_grounding_url = st.text_input("Grounding URL") |
|
ui_youtube_url = st.text_input("Youtube Video ID") |
|
ui_grounding_pdf = st.file_uploader("Grounding PDF",type="pdf",accept_multiple_files=False) |
|
ui_grounding_wav = st.file_uploader("Grounding WAV",type="wav",accept_multiple_files=False) |
|
ui_search_web = st.checkbox("Search Web") |
|
if ui_search_web: |
|
ui_search_web_input = st.text_input("Search Details") |
|
|
|
col1, col2, col3 = st.columns([0.85,0.80,1.40]) |
|
|
|
if col1.button("Extract"): |
|
|
|
lv_extracted_text = "" |
|
st.session_state.lv_extracted_text = lv_extracted_text |
|
lv_response = "" |
|
st.session_state.lv_response = lv_response |
|
|
|
if ui_youtube_url: |
|
lv_extracted_text +=fn_you_tube_video_transcript(ui_youtube_url,ui_processing_message) |
|
|
|
if ui_grounding_url: |
|
lv_extracted_text += ' '.join(doc.page_content for doc in fn_scrape_website(ui_grounding_url)) |
|
|
|
if ui_grounding_pdf: |
|
lv_extracted_text += ' '.join(doc.page_content for doc in fn_scraper_pdf(ui_grounding_pdf)) |
|
|
|
if ui_search_web: |
|
if ui_search_web_input: |
|
lv_extracted_text += fn_search_web(ui_search_web_input) |
|
|
|
if ui_grounding_wav: |
|
lv_extracted_text += fn_generate_speech_to_text(ui_grounding_wav.getvalue(),ui_api_key) |
|
|
|
st.session_state.lv_extracted_text = lv_extracted_text |
|
|
|
if col2.button("Clear"): |
|
lv_extracted_text = "" |
|
st.session_state.lv_extracted_text = lv_extracted_text |
|
lv_response = "" |
|
st.session_state.lv_response = lv_response |
|
except Exception as e: |
|
st.error('Error extracting data - '+str(e)) |
|
fn_display_user_messages("Error extracting data","Error", ui_processing_message) |
|
|
|
|
|
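    # Generation tabs. lv_response is cleared on Extract, and the first
    # enabled tab that runs populates it; the `not lv_response` guard then
    # stops the remaining tabs from regenerating in the same run.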
    user_story, release_notes, requirement_generation, summarization = st.tabs(lv_user_actions)

    with user_story:
        if ui_llm_provider and lv_extracted_text and not lv_response and "User Story" in ui_user_actions:
            lv_parser = PydanticOutputParser(pydantic_object=us.UserStory)

            lv_template = pt.CN_USER_STORY
            lv_summarize_prompt = PromptTemplate(
                template=lv_template,
                input_variables=["context"],
                partial_variables={"format_instructions": lv_parser.get_format_instructions()},
            )
            lv_summarize_prompt_formatted = lv_summarize_prompt.format(
                context=lv_extracted_text
            )

            if lv_model:
                lv_response = fn_chatllm_response(ui_llm_provider, lv_summarize_prompt_formatted, lv_model, ui_processing_message)
                st.session_state.lv_response = lv_response

        if lv_response and "User Story" in ui_user_actions:
            lv_repaired = repair_json(lv_response, skip_json_loads=True)

            if ui_show_json:
                st.header("User Story")
                st.json(lv_repaired)
            else:
                lv_markdown = fn_convert_user_story_json_to_markdown(json.loads(lv_repaired))
                st.markdown(lv_markdown)

    with release_notes:
        if ui_llm_provider and lv_extracted_text and not lv_response and "Release Notes" in ui_user_actions:
            lv_parser = PydanticOutputParser(pydantic_object=rs.ReleaseNotes)

            lv_template = pt.CN_RELEASE_NOTES
            lv_summarize_prompt = PromptTemplate(
                template=lv_template,
                input_variables=["context"],
                partial_variables={"format_instructions": lv_parser.get_format_instructions()},
            )
            lv_summarize_prompt_formatted = lv_summarize_prompt.format(
                context=lv_extracted_text
            )

            if lv_model:
                lv_response = fn_chatllm_response(ui_llm_provider, lv_summarize_prompt_formatted, lv_model, ui_processing_message)
                st.session_state.lv_response = lv_response

        if lv_response and "Release Notes" in ui_user_actions:
            lv_repaired = repair_json(lv_response, skip_json_loads=True)

            if ui_show_json:
                st.header("Release Notes")
                st.json(lv_repaired)
            else:
                lv_markdown = fn_convert_release_notes_json_to_markdown(json.loads(lv_repaired))
                st.markdown(lv_markdown)

    with requirement_generation:
        if ui_llm_provider and lv_extracted_text and not lv_response and "Requirement Generation" in ui_user_actions:
            lv_parser = PydanticOutputParser(pydantic_object=rq.RequirementGatheringDetails)

            lv_template = pt.CN_REQUIREMENT_GATHERING
            lv_summarize_prompt = PromptTemplate(
                template=lv_template,
                input_variables=["context"],
                partial_variables={"format_instructions": lv_parser.get_format_instructions()},
            )
            lv_summarize_prompt_formatted = lv_summarize_prompt.format(
                context=lv_extracted_text
            )

            if lv_model:
                lv_response = fn_chatllm_response(ui_llm_provider, lv_summarize_prompt_formatted, lv_model, ui_processing_message)
                st.session_state.lv_response = lv_response

        if lv_response and "Requirement Generation" in ui_user_actions:
            lv_repaired = repair_json(lv_response, skip_json_loads=True)

            if ui_show_json:
                st.header("Requirement Generation")
                st.json(lv_repaired)
            else:
                lv_markdown = fn_convert_requirement_generation_json_to_markdown(json.loads(lv_repaired))
                st.markdown(lv_markdown)

    with summarization:
        if ui_llm_provider and "Summarization" in ui_user_actions:
            st.header("Summarization")
            st.text("")
            st.text("")

            ui_summary_input = st.text_area("Input Text", value=lv_extracted_text)
            if st.button("Summarize", key="summary"):
                lv_template = pt.CN_SUMMARY
                lv_summarize_prompt = PromptTemplate(
                    template=lv_template,
                    input_variables=["context"]
                )
                lv_summarize_prompt_formatted = lv_summarize_prompt.format(
                    context=ui_summary_input
                )

                if lv_model:
                    lv_response = fn_chatllm_response(ui_llm_provider, lv_summarize_prompt_formatted, lv_model, ui_processing_message)
                    st.session_state.lv_response = lv_response

            if lv_response:
                st.subheader("Summary")
                st.markdown(lv_response)

if __name__ == "__main__": |
|
main() |