"""Gradio app that charts a stock and predicts its next closing price.

Price history is downloaded from Yahoo Finance through ``yfinance``. A linear
regression fitted on a few technical indicators produces the prediction, and
``mplfinance`` draws the candlestick chart.
"""

from typing import Optional, Tuple

import gradio as gr
import matplotlib.pyplot as plt
import mplfinance as mpf
import numpy as np
import pandas as pd
import yfinance as yf
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_squared_error, r2_score
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler

# Indicator columns (added by ``calculate_indicators``) used as model features.
FEATURE_COLUMNS = [
    'SMA20',
    'EMA20',
    'RSI',
    'MACD',
    'MACD_Signal',
    'Stochastic_K',
    'Stochastic_D',
]

# History window requested for each bar interval. The window has to hold
# clearly more than 20 bars, otherwise the 20-bar moving average is NaN on
# every row and nothing is left to train on.
_PERIOD_BY_INTERVAL = {
    '1m': '1d',
    '5m': '1d',
    '15m': '5d',
    '30m': '5d',
    '1h': '5d',
}
_DEFAULT_PERIOD = '60d'

# Fewest (features, next close) pairs accepted before fitting the model.
_MIN_TRAINING_ROWS = 10


def get_stock_data(symbol: str, timeframe: str) -> pd.DataFrame:
    """Fetch price history for *symbol* from Yahoo Finance.

    Args:
        symbol: Ticker symbol, e.g. ``"AAPL"``. Surrounding whitespace is
            ignored.
        timeframe: Bar interval passed to ``yfinance`` (``"1m"``, ``"1h"``,
            ``"1d"``, ...). It also selects how much history is requested.

    Returns:
        The DataFrame returned by ``Ticker.history``.

    Raises:
        ValueError: If *symbol* is blank or no rows come back.
    """
    symbol = (symbol or "").strip()
    if not symbol:
        raise ValueError("A ticker symbol is required.")

    period = _PERIOD_BY_INTERVAL.get(timeframe, _DEFAULT_PERIOD)
    data = yf.Ticker(symbol).history(period=period, interval=timeframe)
    if data.empty:
        raise ValueError(f"No data found for symbol '{symbol}' with timeframe '{timeframe}'.")
    return data


def calculate_indicators(data: pd.DataFrame) -> pd.DataFrame:
    """Add the technical-indicator columns to *data*.

    The columns listed in ``FEATURE_COLUMNS`` are written into *data* itself
    (the frame is modified in place) and the same object is returned.

    Args:
        data: Frame with ``High``, ``Low`` and ``Close`` columns.

    Returns:
        *data*, now carrying the indicator columns.
    """
    close = data['Close']
    data['SMA20'] = close.rolling(window=20).mean()  # simple moving average, 20 bars
    data['EMA20'] = close.ewm(span=20, adjust=False).mean()  # exponential moving average
    data['RSI'] = calculate_rsi(close)
    data['MACD'], data['MACD_Signal'], _ = calculate_macd(close)
    data['Stochastic_K'], data['Stochastic_D'] = calculate_stochastic(
        data['High'], data['Low'], close
    )
    return data


def calculate_rsi(close_prices: pd.Series, period: int = 14) -> pd.Series:
    """Return the Relative Strength Index of *close_prices*.

    Average gain and average loss are plain rolling means over *period* bars.
    Rows inside the warm-up window are NaN; a window with no losses yields 100.
    """
    delta = close_prices.diff()
    gain = delta.where(delta > 0, 0).rolling(window=period).mean()
    loss = (-delta.where(delta < 0, 0)).rolling(window=period).mean()
    rs = gain / loss
    return 100 - (100 / (1 + rs))


def calculate_macd(
    close_prices: pd.Series,
    fast_period: int = 12,
    slow_period: int = 26,
    signal_period: int = 9,
) -> Tuple[pd.Series, pd.Series, pd.Series]:
    """Return the MACD line, its signal line and the histogram.

    The MACD line is the fast EMA minus the slow EMA, the signal line is an
    EMA of the MACD line, and the histogram is their difference.
    """
    fast_ema = close_prices.ewm(span=fast_period, adjust=False).mean()
    slow_ema = close_prices.ewm(span=slow_period, adjust=False).mean()
    macd = fast_ema - slow_ema
    macd_signal = macd.ewm(span=signal_period, adjust=False).mean()
    macd_histogram = macd - macd_signal
    return macd, macd_signal, macd_histogram


def calculate_stochastic(
    high_prices: pd.Series,
    low_prices: pd.Series,
    close_prices: pd.Series,
    period: int = 14,
) -> Tuple[pd.Series, pd.Series]:
    """Return the Stochastic Oscillator lines ``(%K, %D)``.

    %K places the close inside the high/low range of the last *period* bars
    on a 0-100 scale; %D is the 3-bar rolling mean of %K.
    """
    lowest_low = low_prices.rolling(window=period).min()
    highest_high = high_prices.rolling(window=period).max()
    k = ((close_prices - lowest_low) / (highest_high - lowest_low)) * 100
    d = k.rolling(window=3).mean()
    return k, d


def _predict_from_indicators(data: pd.DataFrame) -> float:
    """Fit the regression on *data* and predict the close after its last bar.

    *data* must already contain the ``FEATURE_COLUMNS``. Evaluation metrics
    for the held-out split are printed to stdout.
    """
    usable = data.dropna(subset=FEATURE_COLUMNS + ['Close'])
    features = usable[FEATURE_COLUMNS]

    # Pair each bar's indicators with the FOLLOWING bar's close so the model
    # learns a one-step-ahead mapping. The newest bar has no known next close,
    # so it is kept out of training and used only as the prediction input.
    next_close = usable['Close'].shift(-1)
    X = features.iloc[:-1]
    y = next_close.iloc[:-1]
    if len(X) < _MIN_TRAINING_ROWS:
        raise ValueError(
            f"Not enough data to train the model: {len(X)} usable rows, "
            f"need at least {_MIN_TRAINING_ROWS}."
        )

    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42
    )

    # Fit the scaler on the training split only so no information from the
    # evaluation rows leaks into the model.
    scaler = MinMaxScaler()
    X_train_scaled = scaler.fit_transform(X_train)
    X_test_scaled = scaler.transform(X_test)

    model = LinearRegression()
    model.fit(X_train_scaled, y_train)

    y_pred = model.predict(X_test_scaled)
    mse = mean_squared_error(y_test, y_pred)
    rmse = np.sqrt(mse)
    r2 = r2_score(y_test, y_pred)
    print(f"Mean Squared Error: {mse:.2f}")
    print(f"Root Mean Squared Error: {rmse:.2f}")
    print(f"R-squared: {r2:.2f}")

    # Double brackets keep a one-row DataFrame, matching the columns the
    # scaler was fitted on.
    latest_scaled = scaler.transform(features.iloc[[-1]])
    return float(model.predict(latest_scaled)[0])


def predict_next_day(symbol: str, timeframe: str) -> float:
    """Predict the closing price of the bar following the latest one.

    Args:
        symbol: Ticker symbol.
        timeframe: Bar interval, see ``get_stock_data``.

    Returns:
        The predicted closing price.

    Raises:
        ValueError: If no data is available or too few rows remain to train.
    """
    data = calculate_indicators(get_stock_data(symbol, timeframe))
    return _predict_from_indicators(data)


def _bar_duration(timeframe: str, index: pd.Index) -> pd.Timedelta:
    """Return the time span of one bar, used to place the prediction marker."""
    try:
        return pd.Timedelta(timeframe)
    except ValueError:
        # Intervals such as "1wk" or "1mo" are not valid Timedelta strings;
        # fall back to the spacing of the two most recent bars.
        if len(index) >= 2:
            return index[-1] - index[-2]
        return pd.Timedelta(days=1)


def plot_candlestick(
    data: pd.DataFrame,
    symbol: str,
    timeframe: str,
    predicted_price: Optional[float] = None,
) -> "plt.Figure":
    """Plot a candlestick chart with volume, moving averages and prediction.

    Args:
        data: OHLCV frame with a ``DatetimeIndex``. ``SMA20``/``EMA20``
            columns are overlaid when present.
        symbol: Ticker symbol, used in the title.
        timeframe: Bar interval, used in the title and to position the
            prediction marker one bar after the last row.
        predicted_price: Optional price drawn as a star after the last bar.

    Returns:
        The matplotlib figure.

    Raises:
        ValueError: If *data* is empty.
    """
    if data.empty:
        raise ValueError("No valid data to plot. Please check your inputs.")

    # Work on a copy with a timezone-naive index so the candles drawn by
    # mplfinance and the overlays drawn below share one x-scale.
    chart_data = data.copy()
    if getattr(chart_data.index, 'tz', None) is not None:
        chart_data.index = chart_data.index.tz_localize(None)

    # When mplfinance draws onto caller-supplied axes, ``volume`` must be an
    # Axes object rather than True, so the chart gets a dedicated volume panel.
    fig, (price_ax, volume_ax) = plt.subplots(
        2,
        1,
        figsize=(12, 6),
        sharex=True,
        gridspec_kw={'height_ratios': [3, 1]},
    )
    mpf.plot(
        chart_data,
        type='candle',
        style='charles',
        ax=price_ax,
        volume=volume_ax,
        show_nontrading=True,
    )

    # Add moving averages
    for column, color in (('SMA20', 'blue'), ('EMA20', 'red')):
        if column in chart_data.columns:
            price_ax.plot(
                chart_data.index, chart_data[column], label=column, color=color, alpha=0.7
            )

    # Add prediction
    if predicted_price is not None:
        marker_time = chart_data.index[-1] + _bar_duration(timeframe, chart_data.index)
        price_ax.scatter(
            marker_time,
            predicted_price,
            color='green',
            marker='*',
            s=100,
            label='Prediction',
        )

    price_ax.legend()
    price_ax.set_title(f"{symbol} - {timeframe}")
    # With a shared x-axis only the bottom panel shows tick labels.
    volume_ax.tick_params(axis='x', rotation=45)
    fig.tight_layout()
    return fig


def _predict_and_plot(symbol: str, timeframe: str):
    """Gradio callback: fetch once, then return (price text, chart figure)."""
    try:
        data = calculate_indicators(get_stock_data(symbol, timeframe))
        price = _predict_from_indicators(data)
        figure = plot_candlestick(data, symbol, timeframe, price)
    except ValueError as exc:
        # Show data problems in the UI instead of a bare server-side traceback.
        raise gr.Error(str(exc)) from exc
    return f"{price:.2f}", figure


def main():
    """Build and launch the Gradio interface."""
    with gr.Blocks() as interface:
        gr.Markdown("## Real-time Stock Market Analysis")
        with gr.Row():
            symbol_input = gr.Textbox("AAPL", label="Symbol", interactive=True)
            timeframe_input = gr.Dropdown(
                label="Timeframe",
                choices=["1m", "5m", "15m", "30m", "1h", "1d"],
                value="1d",
                interactive=True,
            )
        with gr.Row():
            predict_button = gr.Button(value="Predict")
            predicted_price = gr.Textbox(label="Predicted Price")
        with gr.Row():
            output_plot = gr.Plot(label="Candlestick Chart")

        # One callback fills both outputs, so the chart is drawn from the same
        # DataFrame the prediction was computed on.
        predict_button.click(
            fn=_predict_and_plot,
            inputs=[symbol_input, timeframe_input],
            outputs=[predicted_price, output_plot],
        )

    interface.launch()


if __name__ == "__main__":
    main()