Spaces:
Sleeping
Sleeping
File size: 4,990 Bytes
660f424 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 |
import gradio as gr
import whisper
import yt_dlp
import os
import traceback
from pydub import AudioSegment
from threading import Thread
from queue import Queue
# Module-level cache for the currently loaded Whisper model.
# Set by load_whisper_model(); read by stream_transcription()'s worker thread.
selected_model = None
def load_whisper_model(model_name):
    """Load the named Whisper checkpoint into the module-level cache.

    Args:
        model_name: Whisper model size, e.g. "tiny", "base", "small",
            "medium", or "large".

    Returns:
        A short status string for display in the UI.
    """
    global selected_model
    model = whisper.load_model(model_name)
    selected_model = model
    return f"Loaded {model_name} model"
def chunk_audio(audio_file, chunk_size_ms=30000):
    """Split an audio file into consecutive fixed-length segments.

    Args:
        audio_file: Path to any audio file pydub/ffmpeg can decode.
        chunk_size_ms: Length of each piece in milliseconds (default 30 s;
            the final piece may be shorter).

    Returns:
        A list of pydub AudioSegment objects covering the whole input.
    """
    audio = AudioSegment.from_file(audio_file)
    total_ms = len(audio)
    pieces = []
    for start in range(0, total_ms, chunk_size_ms):
        pieces.append(audio[start:start + chunk_size_ms])
    return pieces
def stream_transcription(audio_file, chunk_size_ms=30000):
    """Transcribe an audio file chunk-by-chunk, yielding the growing transcript.

    A background worker exports each chunk to a temporary WAV file, runs the
    globally loaded Whisper model (`selected_model`) on it, and pushes
    timestamped segment lines onto a queue; this generator drains the queue
    and yields the accumulated transcript after every segment.

    Args:
        audio_file: Path to the audio file to transcribe.
        chunk_size_ms: Chunk length in milliseconds (default 30 s). The
            per-chunk timestamp offset is derived from this value, so
            callers may safely override it.

    Yields:
        The full transcript so far, or a single "Error: ..." message if
        transcription fails.
    """
    # Queue items are typed tuples: ("segment", text) or ("error", message);
    # a bare None marks end-of-stream. Using tagged tuples (rather than the
    # old startswith("Error") check) means a transcript line that happens to
    # begin with "Error" can never be mistaken for a failure.
    segment_queue = Queue()
    offset_s = chunk_size_ms / 1000.0  # seconds to add per chunk index

    def transcribe_worker():
        try:
            chunks = chunk_audio(audio_file, chunk_size_ms)
            for i, chunk in enumerate(chunks):
                chunk_file = f"temp_chunk_{i}.wav"
                chunk.export(chunk_file, format="wav")
                try:
                    result = selected_model.transcribe(chunk_file)
                finally:
                    # Remove the temp WAV even if transcription raises,
                    # so failed runs don't litter the working directory.
                    os.remove(chunk_file)
                for segment in result['segments']:
                    start = segment['start'] + i * offset_s
                    end = segment['end'] + i * offset_s
                    segment_queue.put(
                        ("segment", f"[{start:.2f}s -> {end:.2f}s] {segment['text']}\n")
                    )
        except Exception as e:
            segment_queue.put(("error", f"Error: {str(e)}"))
        finally:
            segment_queue.put(None)  # Signal end of transcription

    # Daemon thread: an abandoned generator must not block interpreter exit.
    Thread(target=transcribe_worker, daemon=True).start()
    full_transcript = ""
    while True:
        item = segment_queue.get()
        if item is None:
            break
        kind, text = item
        if kind == "error":
            yield text
            break
        full_transcript += text
        yield full_transcript
def download_youtube_audio(youtube_url):
    """Download a YouTube video's best audio stream and convert it to MP3.

    The file is written as "temp_audio.mp3" in the current working
    directory and that path is returned.
    NOTE(review): the fixed output name means concurrent downloads would
    overwrite each other — confirm single-user usage.

    Args:
        youtube_url: URL of the video to download.

    Returns:
        The path of the extracted MP3 file.
    """
    extract_audio = {
        'key': 'FFmpegExtractAudio',
        'preferredcodec': 'mp3',
        'preferredquality': '192',
    }
    options = {
        'format': 'bestaudio/best',
        'postprocessors': [extract_audio],
        'outtmpl': 'temp_audio.%(ext)s',
    }
    with yt_dlp.YoutubeDL(options) as downloader:
        downloader.download([youtube_url])
    return "temp_audio.mp3"
def process_input(model, input_type, youtube_url=None, audio_file=None):
    """Gradio handler: load the model, resolve the audio source, and stream
    incremental status/transcript updates to the output textbox.

    Args:
        model: Whisper model size name ("tiny" ... "large").
        input_type: "YouTube URL" or "Audio File".
        youtube_url: Video URL, used when input_type is "YouTube URL".
        audio_file: Local audio file path, used when input_type is "Audio File".

    Yields:
        Progress strings, then the growing transcript (or an error message).
    """
    try:
        yield "Loading Whisper model..."
        load_whisper_model(model)
        yield f"Loaded {model} model. "
        if input_type == "YouTube URL":
            if not youtube_url:
                yield "Please provide a valid YouTube URL."
                return
            yield "Downloading audio from YouTube..."
            audio_file = download_youtube_audio(youtube_url)
            yield "Download complete. Starting transcription...\n"
        elif input_type == "Audio File":
            if not audio_file:
                yield "Please upload an audio file."
                return
            yield "Starting transcription...\n"
        yield from stream_transcription(audio_file)
    except Exception as e:
        error_msg = f"An error occurred: {str(e)}\n"
        error_msg += traceback.format_exc()
        print(error_msg)
        yield f"Error: {str(e)}"
    finally:
        # Clean up the downloaded file. Guard with exists() so a failed or
        # partial download (e.g. ffmpeg never produced the .mp3) does not
        # raise FileNotFoundError from the finally block and mask the
        # real outcome.
        if input_type == "YouTube URL" and audio_file and os.path.exists(audio_file):
            os.remove(audio_file)
# Define the Gradio interface: two-column layout with inputs on the left
# and the streaming transcription textbox on the right.
with gr.Blocks() as iface:
    gr.Markdown("# Whisper Transcription App")
    gr.Markdown("Transcribe YouTube videos or audio files using OpenAI's Whisper model. Large files and long videos can take a very long time to process.")
    with gr.Row():
        with gr.Column():
            # Model size selector; "base" is the default trade-off.
            model = gr.Radio(
                choices=["tiny", "base", "small", "medium", "large"],
                label="Whisper Model",
                value="base"
            )
            gr.Markdown("""
        - tiny: very fast, less accurate
        - base: medium speed and accuracy
        - small: balanced speed and accuracy
        - medium: more accurate, slower
        - large: most accurate, very slow
        """)
            # Source selector plus both possible inputs; process_input()
            # reads only the one matching input_type.
            input_type = gr.Radio(["YouTube URL", "Audio File"], label="Input Type")
            youtube_url = gr.Textbox(label="YouTube URL")
            audio_file = gr.Audio(label="Audio File", type="filepath")
            with gr.Row():
                submit_button = gr.Button("Submit")
                clear_button = gr.Button("Clear")
        with gr.Column():
            output = gr.Textbox(label="Transcription", lines=25)
    # process_input is a generator, so the textbox updates as segments arrive.
    submit_button.click(
        fn=process_input,
        inputs=[model, input_type, youtube_url, audio_file],
        outputs=output,
        api_name="transcribe"
    )
    def clear_outputs():
        """Reset the URL, uploaded file, and transcript components."""
        # Dict keyed by component is an accepted Gradio return form.
        return {youtube_url: "", audio_file: None, output: ""}
    clear_button.click(
        fn=clear_outputs,
        inputs=[],
        outputs=[youtube_url, audio_file, output],
        api_name="clear"
    )
# Launch the interface; queue() is required for streaming generator outputs,
# share=True exposes a public tunnel URL.
iface.queue().launch(share=True)
|