|
import gradio as gr |
|
import requests |
|
from bs4 import BeautifulSoup |
|
import pandas as pd |
|
import plotly.express as px |
|
import datetime |
|
import time |
|
import nltk |
|
from nltk.sentiment.vader import SentimentIntensityAnalyzer |
|
from textblob import TextBlob |
|
import ssl |
|
import certifi |
|
import sqlite3 |
|
|
|
|
|
# Route every stdlib HTTPS connection through a certifi-backed context so TLS
# verification works in environments with a stale/missing system CA bundle.
# Accept (and ignore) any arguments: the stdlib treats this hook as a factory
# and may call it with parameters, which a zero-arg lambda would reject.
ssl._create_default_https_context = lambda *args, **kwargs: ssl.create_default_context(cafile=certifi.where())
|
|
|
|
|
# Make sure the VADER lexicon is present before constructing the analyzer.
try:
    try:
        # Skip the network round-trip when the lexicon is already installed.
        nltk.data.find('sentiment/vader_lexicon.zip')
    except LookupError:
        nltk.download('vader_lexicon')
    print("β VADER lexicon downloaded successfully!")
except Exception as e:
    print(f"β Error downloading VADER lexicon: {str(e)}")
    # Sentiment scoring is useless without the lexicon, so fail fast.
    raise


# Shared analyzer instance; requires the lexicon ensured above.
sia = SentimentIntensityAnalyzer()
|
|
|
# Base URL of the FinViz per-ticker quote page; the symbol is appended to it.
FINVIZ_URL = 'https://finviz.com/quote.ashx?t='

# One shared session gives us connection pooling across retries, and a
# browser-like User-Agent, since FinViz rejects the default requests UA.
session = requests.Session()
session.headers['User-Agent'] = (
    'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 '
    '(KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
)
|
|
|
|
|
def get_news(ticker):
    """Fetch the FinViz news table for *ticker*.

    Retries up to three times on non-200 responses, waiting 5 seconds
    between attempts (but not after the final one).

    Returns:
        (news_table, None) on success, where news_table is the parsed
        BeautifulSoup news <table>; (None, error_message) on failure.
    """
    try:
        url = FINVIZ_URL + ticker
        for attempt in range(3):
            # timeout keeps a hung connection from blocking the UI forever
            response = session.get(url, timeout=10)
            if response.status_code == 200:
                break
            if attempt < 2:
                # back off before retrying; skip the sleep after the last try
                time.sleep(5)

        if response.status_code != 200:
            return None, f"β Error: Received status code {response.status_code}"

        html = BeautifulSoup(response.text, "html.parser")
        news_table = html.find('table', class_='fullview-news-outer')

        if not news_table:
            return None, "β News table not found!"

        return news_table, None
    except Exception as e:
        # Network errors, timeouts, parse failures — report, don't crash the UI.
        return None, f"β Error fetching stock news: {str(e)}"
|
|
|
|
|
def parse_news(news_table):
    """Extract (date, time, headline) rows from the FinViz news table.

    Rows that are malformed (ads, separators, unparsable timestamps) are
    silently skipped. Returns a DataFrame with 'date', 'time', 'headline'
    and a parsed 'datetime' column; rows whose timestamp fails to parse
    are dropped.
    """
    today = datetime.datetime.today().strftime('%Y-%m-%d')
    records = []

    for row in news_table.find_all('tr'):
        try:
            container = row.find('div', class_='news-link-container')
            anchor = container.find('a', class_='tab-link-news') if container else row.find('a')
            if anchor is None:
                continue
            headline = anchor.get_text().strip()
            if not headline:
                continue

            stamp_cell = row.find('td', align='right')
            stamp_parts = []
            if stamp_cell and stamp_cell.text.strip():
                stamp_parts = stamp_cell.text.strip().split()

            if len(stamp_parts) == 1:
                # Time-only stamp -> the headline was published today.
                day, clock = today, stamp_parts[0]
            else:
                day = datetime.datetime.strptime(stamp_parts[0], '%b-%d-%y').strftime('%Y-%m-%d')
                clock = stamp_parts[1]

            # Normalize "08:30AM" -> "08:30" (24h clock).
            clock = datetime.datetime.strptime(clock, '%I:%M%p').strftime('%H:%M')
            records.append([day, clock, headline])
        except Exception:
            continue

    frame = pd.DataFrame(records, columns=['date', 'time', 'headline'])
    frame['datetime'] = pd.to_datetime(frame['date'] + ' ' + frame['time'],
                                       format="%Y-%m-%d %H:%M", errors='coerce')
    return frame.dropna(subset=['datetime'])
|
|
|
|
|
def analyze_sentiment(text):
    """Average the VADER compound score and the TextBlob polarity of *text*.

    Both scores lie in [-1, 1]; the mean damps the quirks of either model.
    """
    scores = (
        sia.polarity_scores(text)['compound'],
        TextBlob(text).sentiment.polarity,
    )
    return sum(scores) / len(scores)
|
|
|
|
|
def score_news(df):
    """Score each headline with the combined VADER+TextBlob sentiment.

    Operates on a copy so the caller's DataFrame is not mutated (this also
    avoids pandas' SettingWithCopyWarning, since callers pass in a
    boolean-filtered slice).

    Returns:
        DataFrame with 'datetime', 'headline' and 'sentiment_score' columns.
    """
    df = df.copy()
    df['sentiment_score'] = df['headline'].apply(analyze_sentiment)
    return df[['datetime', 'headline', 'sentiment_score']]
|
|
|
|
|
def save_to_db(df, ticker, db_path="sentiment_data.db"):
    """Append scored headlines to a per-ticker SQLite table.

    Args:
        df: DataFrame of scored news (datetime/headline/sentiment_score).
        ticker: stock symbol; rows are stored in the "<ticker>_news" table.
        db_path: SQLite file to write to (default preserves legacy behavior).
    """
    conn = sqlite3.connect(db_path)
    try:
        df.to_sql(f"{ticker}_news", conn, if_exists="append", index=False)
    finally:
        # Close even when the insert fails so the db file isn't left locked.
        conn.close()
|
|
|
|
|
def plot_sentiment(df, ticker, interval):
    """Build a Plotly line chart of sentiment over time.

    Args:
        df: scored news with 'datetime' and 'sentiment_score' columns.
        ticker: symbol used in the chart title.
        interval: 'h' for hourly bins, 'D' for daily bins; anything else
            plots the raw rows.

    Returns:
        A Plotly figure, or None when *df* is empty.
    """
    if df.empty:
        return None

    # Work on a copy so the caller's DataFrame is not mutated.
    df = df.copy()
    df['datetime'] = pd.to_datetime(df['datetime'])
    # Callers pass data sorted newest-first; on a monotonically DECREASING
    # DatetimeIndex the .loc[min:max] slice below comes back empty, which
    # silently produced empty charts. Sort ascending before slicing.
    df = df.set_index('datetime').sort_index()

    min_date = df.index.min()
    max_date = df.index.max()
    print(f"π Graph Showing Data from {min_date} to {max_date}")

    df_filtered = df.loc[min_date:max_date]

    if interval == 'h':
        df_grouped = df_filtered.resample('h').mean(numeric_only=True).dropna()
    elif interval == 'D':
        df_grouped = df_filtered.resample('D').mean(numeric_only=True).dropna()
    else:
        df_grouped = df_filtered

    # 5-point rolling mean smooths headline-to-headline noise.
    df_grouped['rolling_avg'] = df_grouped['sentiment_score'].rolling(5, min_periods=1).mean()

    fig = px.line(df_grouped, x=df_grouped.index, y='sentiment_score',
                  labels={"sentiment_score": "Sentiment Score"},
                  title=f"{ticker} Sentiment Trends ({interval.capitalize()})")

    fig.add_scatter(x=df_grouped.index, y=df_grouped['rolling_avg'], mode='lines', name='Rolling Avg')

    return fig
|
|
|
|
|
def analyze_stock_sentiment(ticker, days):
    """Gradio handler: fetch news for *ticker*, score sentiment over the
    last *days* days, persist the results, and build trend charts.

    Returns:
        (status_message, scored_dataframe, hourly_fig, daily_fig); the last
        three are None whenever the pipeline fails or yields no news.
    """
    if not ticker:
        return "β Please enter a stock ticker!", None, None, None

    # Normalize once so fetching, storage and chart titles all agree.
    ticker = ticker.upper()
    days = int(days)

    print(f"π Selected Days: {days}")

    news_table, error = get_news(ticker)
    if error:
        return error, None, None, None

    df_news = parse_news(news_table)

    today = datetime.datetime.today()
    start_date = today - datetime.timedelta(days=days)

    print(f"π Filtering News from {start_date} to {today}")

    df_news['datetime'] = pd.to_datetime(df_news['datetime'])
    df_news = df_news[df_news['datetime'] >= start_date]

    print(f"π Filtered News Count: {len(df_news)}")

    if df_news.empty:
        return f"β οΈ No news found for {ticker} in the last {days} days.", None, None, None

    # Newest headlines first for the UI table.
    df_scored = score_news(df_news).sort_values(by="datetime", ascending=False)
    save_to_db(df_scored, ticker)

    fig_hourly = plot_sentiment(df_scored, ticker, interval='h')
    fig_daily = plot_sentiment(df_scored, ticker, interval='D')

    return f"β Analysis for {ticker} (Last {days} Days) Complete!", df_scored, fig_hourly, fig_daily
|
|
|
|
|
|
|
# Gradio UI: a ticker textbox and a 1-15 day history slider in; a status
# string, the scored-news table and two sentiment charts (hourly, daily) out.
iface = gr.Interface(

    fn=analyze_stock_sentiment,

    inputs=[gr.Textbox(label="Stock Ticker"), gr.Slider(1, 15, 7, label="Days of News History")],

    outputs=[gr.Textbox(label="Status"), gr.Dataframe(), gr.Plot(), gr.Plot()]

)




if __name__ == "__main__":

    print("π App is running on Hugging Face Spaces...")

    # 0.0.0.0:7860 is the bind address/port Hugging Face Spaces expects.
    iface.launch(server_name="0.0.0.0", server_port=7860)
|
|