import datetime
import re
import sqlite3
import ssl
import time

import certifi
import gradio as gr
import nltk
import pandas as pd
import plotly.express as px
import requests
from bs4 import BeautifulSoup
from nltk.sentiment.vader import SentimentIntensityAnalyzer
from textblob import TextBlob

# Configure SSL to use certifi certificates.
# NOTE: the replacement must tolerate arguments — some stdlib call sites
# invoke _create_default_https_context with parameters, and the previous
# zero-argument lambda would raise TypeError there.
ssl._create_default_https_context = (
    lambda *args, **kwargs: ssl.create_default_context(cafile=certifi.where())
)

# Download the VADER lexicon once at startup; fail fast if it is unavailable
# because every sentiment score depends on it.
try:
    nltk.download('vader_lexicon', quiet=True)
    print("✅ VADER lexicon downloaded successfully!")
except Exception as e:
    print(f"❌ Error downloading VADER lexicon: {str(e)}")
    raise

# Single shared analyzer — building it per call would reload the lexicon.
sia = SentimentIntensityAnalyzer()

FINVIZ_URL = 'https://finviz.com/quote.ashx?t='

# One reusable HTTP session; FinViz rejects the default requests User-Agent,
# so present a browser-like one.
session = requests.Session()
session.headers.update({
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 '
                  '(KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
})


def get_news(ticker):
    """
    Fetch the FinViz news table for *ticker*.

    Retries up to 3 times on non-200 responses, sleeping 5 s between
    attempts (but not after the final one).

    Returns:
        tuple: (news_table, None) on success, or (None, error_message)
        on any failure, so callers never have to catch exceptions.
    """
    try:
        url = FINVIZ_URL + ticker
        response = None
        for attempt in range(3):
            response = session.get(url)
            if response.status_code == 200:
                break
            if attempt < 2:  # no point sleeping after the last attempt
                time.sleep(5)
        if response.status_code != 200:
            return None, f"❌ Error: Received status code {response.status_code}"
        html = BeautifulSoup(response.text, "html.parser")
        news_table = html.find('table', class_='fullview-news-outer')
        if not news_table:
            return None, "❌ News table not found!"
        return news_table, None
    except Exception as e:
        return None, f"❌ Error fetching stock news: {str(e)}"


def parse_news(news_table):
    """
    Extract (date, time, headline) rows from the FinViz news table.

    FinViz omits the date on same-day rows (time only), so a one-token
    timestamp is interpreted as "today". Rows that fail to parse are
    skipped (best-effort scraping).

    Returns:
        pd.DataFrame with columns ['date', 'time', 'headline', 'datetime'],
        restricted to rows whose timestamp parsed successfully.
    """
    parsed_news = []
    today_string = datetime.datetime.today().strftime('%Y-%m-%d')
    for row in news_table.find_all('tr'):
        try:
            news_container = row.find('div', class_='news-link-container')
            link = (news_container.find('a', class_='tab-link-news')
                    if news_container else row.find('a'))
            if not link or not link.get_text().strip():
                continue
            text = link.get_text().strip()
            date_td = row.find('td', align='right')
            date_scrape = (date_td.text.strip().split()
                           if date_td and date_td.text.strip() else [])
            if len(date_scrape) == 1:
                # Time only ("08:30AM") means the article is from today.
                date, time_ = today_string, date_scrape[0]
            else:
                # Full stamp like "Feb-06-25 08:30AM"; an empty list raises
                # IndexError here and the row is skipped by the except.
                date = datetime.datetime.strptime(
                    date_scrape[0], '%b-%d-%y').strftime('%Y-%m-%d')
                time_ = date_scrape[1]
            time_ = datetime.datetime.strptime(time_, '%I:%M%p').strftime('%H:%M')
            parsed_news.append([date, time_, text])
        except Exception:
            continue  # malformed row — ignore and keep scraping
    df = pd.DataFrame(parsed_news, columns=['date', 'time', 'headline'])
    df['datetime'] = pd.to_datetime(df['date'] + ' ' + df['time'],
                                    format="%Y-%m-%d %H:%M", errors='coerce')
    return df.dropna(subset=['datetime'])


def analyze_sentiment(text):
    """
    Score *text* by averaging VADER's compound score and TextBlob's
    polarity (both in [-1, 1]) to smooth out single-model noise.
    """
    vader_score = sia.polarity_scores(text)['compound']
    textblob_score = TextBlob(text).sentiment.polarity  # -1 to 1 range
    return (vader_score + textblob_score) / 2  # Averaging both


def score_news(df):
    """
    Add a 'sentiment_score' column and return the columns the UI needs.
    """
    df['sentiment_score'] = df['headline'].apply(analyze_sentiment)
    return df[['datetime', 'headline', 'sentiment_score']]


def save_to_db(df, ticker):
    """
    Append sentiment rows to SQLite for historical tracking.

    The ticker becomes part of the table name, so it is whitelisted to
    [A-Za-z0-9_] first — identifiers cannot be parameterized, and the
    value originates from user input.
    """
    safe_ticker = re.sub(r'[^A-Za-z0-9_]', '', ticker)
    conn = sqlite3.connect("sentiment_data.db")
    try:
        df.to_sql(f"{safe_ticker}_news", conn, if_exists="append", index=False)
    finally:
        conn.close()  # always release the handle, even if to_sql raises


def plot_sentiment(df, ticker, interval):
    """
    Build a sentiment trend line chart resampled at *interval*.

    Args:
        df: frame with 'datetime' and 'sentiment_score' columns.
        ticker: symbol used in the chart title.
        interval: 'h' for hourly buckets, 'D' for daily; anything else
            plots the raw rows.

    Returns:
        plotly Figure, or None when *df* is empty.
    """
    if df.empty:
        return None
    df = df.copy()  # never mutate the caller's frame
    df['datetime'] = pd.to_datetime(df['datetime'])
    # Sort ascending: the caller hands rows newest-first, and label slicing /
    # plotting on a decreasing DatetimeIndex previously produced blank charts
    # (loc[min:max] on a decreasing index selects nothing).
    df = df.set_index('datetime').sort_index()
    min_date = df.index.min()
    max_date = df.index.max()
    print(f"📅 Graph Showing Data from {min_date} to {max_date}")  # Debugging
    if interval == 'h':
        df_grouped = df.resample('h').mean(numeric_only=True).dropna()
    elif interval == 'D':
        df_grouped = df.resample('D').mean(numeric_only=True).dropna()
    else:
        df_grouped = df
    # 5-bucket rolling mean to smooth the headline-level noise.
    df_grouped['rolling_avg'] = (
        df_grouped['sentiment_score'].rolling(5, min_periods=1).mean()
    )
    fig = px.line(df_grouped, x=df_grouped.index, y='sentiment_score',
                  labels={"sentiment_score": "Sentiment Score"},
                  title=f"{ticker} Sentiment Trends ({interval.capitalize()})")
    fig.add_scatter(x=df_grouped.index, y=df_grouped['rolling_avg'],
                    mode='lines', name='Rolling Avg')
    return fig


def analyze_stock_sentiment(ticker, days):
    """
    Gradio entry point: fetch news, score sentiment, persist, and plot.

    Args:
        ticker: stock symbol (case-insensitive).
        days: how many days of history to keep (slider value).

    Returns:
        (status_message, dataframe_or_None, hourly_fig_or_None,
         daily_fig_or_None)
    """
    if not ticker:
        return "❌ Please enter a stock ticker!", None, None, None
    print(f"📅 Selected Days: {days}")  # Debugging
    symbol = ticker.upper()  # normalize once; used everywhere below
    news_table, error = get_news(symbol)
    if error:
        return error, None, None, None
    df_news = parse_news(news_table)
    days = int(days)
    today = datetime.datetime.today()
    start_date = today - datetime.timedelta(days=days)
    print(f"🔍 Filtering News from {start_date} to {today}")  # Debugging
    df_news['datetime'] = pd.to_datetime(df_news['datetime'])
    df_news = df_news[df_news['datetime'] >= start_date]
    print(f"📊 Filtered News Count: {len(df_news)}")  # Debugging
    if df_news.empty:
        return (f"⚠️ No news found for {symbol} in the last {days} days.",
                None, None, None)
    df_scored = score_news(df_news).sort_values(by="datetime", ascending=False)
    save_to_db(df_scored, symbol)
    # Use the normalized symbol in chart titles too (was inconsistent with
    # the status messages when users typed lowercase tickers).
    fig_hourly = plot_sentiment(df_scored, symbol, interval='h')
    fig_daily = plot_sentiment(df_scored, symbol, interval='D')
    return (f"✅ Analysis for {symbol} (Last {days} Days) Complete!",
            df_scored, fig_hourly, fig_daily)


# ✅ Gradio UI
iface = gr.Interface(
    fn=analyze_stock_sentiment,
    inputs=[gr.Textbox(label="Stock Ticker"),
            gr.Slider(1, 15, 7, label="Days of News History")],
    outputs=[gr.Textbox(label="Status"), gr.Dataframe(), gr.Plot(), gr.Plot()]
)

# ✅ Keeps the App Running on Hugging Face
if __name__ == "__main__":
    print("🚀 App is running on Hugging Face Spaces...")
    iface.launch(server_name="0.0.0.0", server_port=7860)