From 5566d656a2ffd102fc257077058833bc9d424966 Mon Sep 17 00:00:00 2001 From: kouloumos Date: Thu, 7 Dec 2023 20:09:23 +0200 Subject: [PATCH] cli: finalize the `postprocess` command - support postprocessing for both whisper and deepgram How this works: - When a source is added for transcription it's preprocessed by default. Preprocessing outputs a JSON file with all the available metadata. - At the transcription stage, the output generated by the transcription is also stored as JSON and also referenced in the initial metadata JSON - So having both JSON files available, the user can manually run the postprocess stage. --- Readme.md | 10 ++-- app/services/deepgram.py | 99 ++++++++++++++++++++++++++-------------- app/services/whisper.py | 89 ++++++++++++++++++++++-------------- app/transcript.py | 5 +- app/transcription.py | 12 +++-- app/utils.py | 8 ++++ transcriber.py | 93 +++++++++++++++++++++++-------------- 7 files changed, 203 insertions(+), 113 deletions(-) diff --git a/Readme.md b/Readme.md index 46813c3..28c422c 100644 --- a/Readme.md +++ b/Readme.md @@ -17,11 +17,11 @@ This transcription tool operates through a structured four-stage process: 2. Process: Downloads and converts sources for transcription preparation 3. Transcription: Utilizes [`openai-whisper`](https://github.com/openai/whisper) or [Deepgram](https://deepgram.com/) to generate transcripts. 1. Converts audio to text. - - Preserves raw wisper transcript in SRT - - Preserves raw deepgram output in JSON - 2. Summarize: Generates a summary of the transcript. [only available with deepgram] - 3. Upload: Saves raw transcript files in an AWS S3 Bucket [optional] - 4. Constructs the resulting transcript. + - Save as JSON: Preserves the output of the transcription service for future use. + - Save as SRT: Generates SRT file [whisper only] + 2. Summarize: Generates a summary of the transcript. [deepgram only] + 3. Upload: Saves transcription service output in an AWS S3 Bucket [optional] + 4. 
Finalizes the resulting transcript. - Process diarization. [deepgram only] - Process chapters. 4. Postprocess: Offers multiple options for further actions: diff --git a/app/services/deepgram.py b/app/services/deepgram.py index 4060e1d..5a231f5 100644 --- a/app/services/deepgram.py +++ b/app/services/deepgram.py @@ -1,3 +1,4 @@ +import json import mimetypes import deepgram @@ -45,14 +46,32 @@ def audio_to_text(self, audio_file): except Exception as e: raise Exception(f"(deepgram) Error transcribing audio to text: {e}") - def process_with_diarization_and_chapters(self, raw_transcript, chapters): + def write_to_json_file(self, transcription_service_output, transcript: Transcript): + transcription_service_output_file = utils.write_to_json( + transcription_service_output, f"{self.output_dir}/{transcript.source.loc}", transcript.title, is_metadata=True) + logger.info( + f"(deepgram) Model stored at: {transcription_service_output_file}") + # Add deepgram output file path to transcript's metadata file + if transcript.metadata_file is not None: + # Read existing content of the metadata file + with open(transcript.metadata_file, 'r') as file: + data = json.load(file) + # Add deepgram output + data['deepgram_output'] = transcription_service_output_file + # Write the updated dictionary back to the JSON file + with open(transcript.metadata_file, 'w') as file: + json.dump(data, file, indent=4) + + return transcription_service_output_file + + def process_with_diarization_and_chapters(self, transcription_service_output, chapters): logger.info( "(deepgram) Processing diarization with detected chapters...") try: para = "" string = "" curr_speaker = None - words = raw_transcript["results"]["channels"][0]["alternatives"][0][ + words = transcription_service_output["results"]["channels"][0]["alternatives"][0][ "words" ] words_pointer = 0 @@ -102,12 +121,12 @@ def process_with_diarization_and_chapters(self, raw_transcript, chapters): except Exception as e: raise Exception(f"Error 
combining deepgram chapters: {e}") - def process_with_diarization(self, raw_transcript): + def process_with_diarization(self, transcription_service_output): logger.info(f"(deepgram) Processing diarization...") para = "" string = "" curr_speaker = None - for word in raw_transcript["results"]["channels"][0]["alternatives"][0][ + for word in transcription_service_output["results"]["channels"][0]["alternatives"][0][ "words" ]: if word["speaker"] != curr_speaker: @@ -127,13 +146,13 @@ def process_with_diarization(self, raw_transcript): string = string + para return string - def process_with_chapters(self, raw_transcript, chapters): + def process_with_chapters(self, transcription_service_output, chapters): logger.info("(deepgram) Combining transcript with detected chapters...") try: chapters_pointer = 0 words_pointer = 0 result = "" - words = raw_transcript["results"]["channels"][0]["alternatives"][0][ + words = transcription_service_output["results"]["channels"][0]["alternatives"][0][ "words" ] # chapters index, start time, name @@ -163,9 +182,12 @@ def process_with_chapters(self, raw_transcript, chapters): except Exception as e: raise Exception(f"Error combining deepgram with chapters: {e}") - def process_summary(self, raw_transcript): + def process_summary(self, transcript: Transcript): + with open(transcript.transcription_service_output_file, "r") as outfile: + transcription_service_output = json.load(outfile) + try: - summaries = raw_transcript["results"]["channels"][0]["alternatives"][0][ + summaries = transcription_service_output["results"]["channels"][0]["alternatives"][0][ "summaries" ] summary = "" @@ -175,39 +197,48 @@ def process_summary(self, raw_transcript): except Exception as e: logger.error(f"Error getting summary: {e}") - def construct_transcript(self, raw_transcript, chapters): - if len(chapters) > 0: - # With chapters - if self.diarize: - # With diarization - return self.process_with_diarization_and_chapters(raw_transcript, chapters) - else: - # 
Without diarization - return self.process_with_chapters(raw_transcript, chapters) - else: - # Without chapters - if self.diarize: - # With diarization - return self.process_with_diarization(raw_transcript) + def finalize_transcript(self, transcript: Transcript): + try: + with open(transcript.transcription_service_output_file, "r") as outfile: + transcription_service_output = json.load(outfile) + + has_diarization = any( + 'speaker' in word for word in transcription_service_output['results']['channels'][0]['alternatives'][0]['words']) + has_chapters = len(transcript.source.chapters) > 0 + + if has_chapters: + # With chapters + if has_diarization: + # With diarization + return self.process_with_diarization_and_chapters(transcription_service_output, transcript.source.chapters) + else: + # Without diarization + return self.process_with_chapters(transcription_service_output, transcript.source.chapters) else: - # Without diarization - return raw_transcript["results"]["channels"][0]["alternatives"][0]["transcript"] + # Without chapters + if has_diarization: + # With diarization + return self.process_with_diarization(transcription_service_output) + else: + # Without diarization + return transcription_service_output["results"]["channels"][0]["alternatives"][0]["transcript"] - return result + return result + except Exception as e: + raise Exception(f"(deepgram) Error finalizing transcript: {e}") def transcribe(self, transcript: Transcript): try: - raw_transcript = self.audio_to_text(transcript.audio_file) - raw_transcript_file = utils.write_to_json( - raw_transcript, f"{self.output_dir}/{transcript.source.loc}", transcript.title, is_metadata=True) - logger.info( - f"(deepgram) Model stored at: {raw_transcript_file}") + transcription_service_output = self.audio_to_text( + transcript.audio_file) + transcript.transcription_service_output_file = self.write_to_json_file( + transcription_service_output, transcript) if self.upload: - application.upload_file_to_s3(raw_transcript_file) + 
application.upload_file_to_s3( + transcript.transcription_service_output_file) if self.summarize: - transcript.summary = self.process_summary(raw_transcript) - transcript.result = self.construct_transcript( - raw_transcript, transcript.source.chapters) + transcript.summary = self.process_summary(transcript) + transcript.result = self.finalize_transcript(transcript) return transcript except Exception as e: diff --git a/app/services/whisper.py b/app/services/whisper.py index 358523b..d988c75 100644 --- a/app/services/whisper.py +++ b/app/services/whisper.py @@ -1,3 +1,5 @@ +import json + import whisper from app import ( @@ -22,15 +24,31 @@ def audio_to_text(self, audio_file): try: my_model = whisper.load_model(self.model) result = my_model.transcribe(audio_file) - data = [] - for x in result["segments"]: - data.append(tuple((x["start"], x["end"], x["text"]))) - return data + + return result except Exception as e: logger.error( f"(wisper,{service}) Error transcribing audio to text: {e}") return + def write_to_json_file(self, transcription_service_output, transcript: Transcript): + transcription_service_output_file = utils.write_to_json( + transcription_service_output, f"{self.output_dir}/{transcript.source.loc}", transcript.title, is_metadata=True) + logger.info( + f"(whisper) Model stored at: {transcription_service_output_file}") + # Add whisper output file path to transcript's metadata file + if transcript.metadata_file is not None: + # Read existing content of the metadata file + with open(transcript.metadata_file, 'r') as file: + data = json.load(file) + # Add whisper output + data['whisper_output'] = transcription_service_output_file + # Write the updated dictionary back to the JSON file + with open(transcript.metadata_file, 'w') as file: + json.dump(data, file, indent=4) + + return transcription_service_output_file + def generate_srt(self, data, filename, loc): def format_time(time): hours = int(time / 3600) @@ -42,28 +60,28 @@ def format_time(time): output_file 
= f"{utils.configure_output_file_path(f'{self.output_dir}/{loc}', filename, is_metadata=True)}.srt" logger.info(f"(whisper) Writing srt to {output_file}...") with open(output_file, "w") as f: - for index, segment in enumerate(data): - start_time, end_time, text = segment + for index, segment in enumerate(data["segments"]): f.write(f"{index+1}\n") f.write( - f"{format_time(start_time)} --> {format_time(end_time)}\n") - f.write(f"{text.strip()}\n\n") + f"{format_time(segment['start'])} --> {format_time(segment['end'])}\n") + f.write(f"{segment['text'].strip()}\n\n") return output_file - def process_with_chapters(self, raw_transcript, chapters): + def process_with_chapters(self, transcription_service_output, chapters): + logger.info("(whisper) Combining transcript with detected chapters...") try: chapters_pointer = 0 transcript_pointer = 0 result = "" + segments = transcription_service_output["segments"] # chapters index, start time, name - # transcript start time, end time, text while chapters_pointer < len(chapters) and transcript_pointer < len( - raw_transcript + segments ): if ( chapters[chapters_pointer][1] - <= raw_transcript[transcript_pointer][0] + <= segments[transcript_pointer]["start"] ): result = ( result + "\n\n## " + @@ -71,11 +89,11 @@ def process_with_chapters(self, raw_transcript, chapters): ) chapters_pointer += 1 else: - result = result + raw_transcript[transcript_pointer][2] + result = result + segments[transcript_pointer]["text"] transcript_pointer += 1 - while transcript_pointer < len(raw_transcript): - result = result + raw_transcript[transcript_pointer][2] + while transcript_pointer < len(segments): + result = result + segments[transcript_pointer]["text"] transcript_pointer += 1 return result @@ -83,30 +101,31 @@ def process_with_chapters(self, raw_transcript, chapters): logger.error("Error combining chapters") logger.error(e) - def process_default(self): - result = "" - for x in self.result: - result = result + x[2] + " " - - return result - - 
def construct_transcript(self, raw_transcript, chapters): - if len(chapters) > 0: - # Source has chapters, add them to transcript - return self.process_with_chapters(raw_transcript, chapters) - else: - return self.process_default(raw_transcript) + def finalize_transcript(self, transcript: Transcript): + try: + with open(transcript.transcription_service_output_file, "r") as outfile: + transcription_service_output = json.load(outfile) + + has_chapters = len(transcript.source.chapters) > 0 + if has_chapters: + # Source has chapters, add them to transcript + return self.process_with_chapters(transcription_service_output, transcript.source.chapters) + else: + return transcription_service_output["text"] + except Exception as e: + raise Exception(f"(whisper) Error finalizing transcript: {e}") def transcribe(self, transcript: Transcript): try: - raw_transcript = self.audio_to_text(transcript.audio_file) - raw_transcript_file = self.generate_srt( - raw_transcript, transcript.title, transcript.source.loc) + transcription_service_output = self.audio_to_text( + transcript.audio_file) + transcript.transcription_service_output_file = self.write_to_json_file( + transcription_service_output, transcript) + transcript_srt_file = self.generate_srt( + transcription_service_output, transcript.title, transcript.source.loc) if self.upload: - application.upload_file_to_s3(raw_transcript_file) - - transcript.result = construct_transcript( - raw_transcript, transcript.source.chapters) + application.upload_file_to_s3(transcript_srt_file) + transcript.result = self.finalize_transcript(transcript) return transcript except Exception as e: diff --git a/app/transcript.py b/app/transcript.py index afadf04..ef5a5b5 100644 --- a/app/transcript.py +++ b/app/transcript.py @@ -24,8 +24,11 @@ class Transcript: - def __init__(self, source, test_mode=False): + def __init__(self, source, test_mode=False, metadata_file=None): self.source = source + self.metadata_file = metadata_file + # The output generated 
by the transcription service + self.transcription_service_output_file = None self.summary = None self.test_mode = test_mode self.logger = get_logger() diff --git a/app/transcription.py b/app/transcription.py index 14c049f..c622b07 100644 --- a/app/transcription.py +++ b/app/transcription.py @@ -118,7 +118,7 @@ def check_if_youtube(source: Source): raise Exception(f"Invalid source: {e}") try: if source.source_file.endswith(".mp3") or source.source_file.endswith(".wav") or source.source_file.endswith(".m4a"): - return Audio(source=source) + return Audio(source=source, chapters=chapters) if source.source_file.endswith("rss") or source.source_file.endswith(".xml"): return RSS(source=source) @@ -139,19 +139,21 @@ def check_if_youtube(source: Source): def _new_transcript_from_source(self, source: Source): """Helper method to initialize a new Transcript from source""" - self.transcripts.append(Transcript(source, self.test_mode)) - + metadata_file = None if source.preprocess: if self.preprocessing_output is None: # Save preprocessing output for each individual source - write_to_json( + metadata_file = utils.write_to_json( source.to_json(), f"{self.model_output_dir}/{source.loc}", - f"{source.title}_preprocess", is_metadata=True + f"{source.title}_metadata", is_metadata=True ) else: # Keep preprocessing outputs for later use self.preprocessing_output.append(source.to_json()) + # Initialize new transcript from source + self.transcripts.append(Transcript( + source=source, test_mode=self.test_mode, metadata_file=metadata_file)) def add_transcription_source( self, diff --git a/app/utils.py b/app/utils.py index dcda1c5..465b5b6 100644 --- a/app/utils.py +++ b/app/utils.py @@ -77,6 +77,14 @@ def configure_metadata_given_from_JSON(source): "existing_entries_not_covered_by_btctranscripts/status.json", []) metadata["excluded_media"] = [entry["media"] for entry in excluded_media] + # transcription service output + services = ["whisper", "deepgram"] + for service in services: + key = 
f"{service}_output" + metadata[key] = source.get(key, None) + if metadata[key] is not None: + check_if_valid_file_path(metadata[key]) + return metadata except KeyError as e: raise Exception(f"Parsing JSON: {e} is required") diff --git a/transcriber.py b/transcriber.py index 0afaa60..022b6fb 100644 --- a/transcriber.py +++ b/transcriber.py @@ -5,11 +5,15 @@ import click -from app import __app_name__, __version__, application +from app import ( + __app_name__, + __version__, + application, + utils +) from app.transcript import Transcript from app.transcription import Transcription from app.logging import configure_logger, get_logger -from app.utils import check_if_valid_file_path, write_to_json, configure_metadata_given_from_JSON logger = get_logger() @@ -174,6 +178,9 @@ def print_help(ctx, param, value): # Available transcription models and services @whisper @deepgram +# Available features for transcription services +@diarize +@summarize # Options for adding metadata @add_title @add_date @@ -181,13 +188,12 @@ def print_help(ctx, param, value): @add_speakers @add_category @add_loc -# Options for configuring the transcription process -@diarize -@summarize +# Options for configuring the transcription postprocess @open_pr @upload_to_s3 @save_to_markdown @noqueue +# Configuration options @model_output_dir @nocleanup @verbose_logging @@ -319,37 +325,56 @@ def preprocess( ) if not no_batched_output: # Batch write all preprocessed sources to JSON - write_to_json([preprocessed_source for preprocessed_source in transcription.preprocessing_output], - transcription.model_output_dir, "preprocessed_sources") + utils.write_to_json([preprocessed_source for preprocessed_source in transcription.preprocessing_output], + transcription.model_output_dir, "preprocessed_sources") except Exception as e: logger.info(f"Exited with error: {e}") @cli.command() -@click.argument("deepgram_json_file", nargs=1) -@click.argument("preprocess_json_file", nargs=1) -@diarize -def 
postprocess_deepgram_transcript( - deepgram_json_file, - preprocess_json_file, - diarize +@click.argument( + "service", + nargs=1, + type=click.Choice( + [ + "whisper", + "deepgram" + ] + ) +) +@click.argument("metadata_json_file", nargs=1) +# Options for configuring the transcription postprocess +@open_pr +@upload_to_s3 +@save_to_markdown +@noqueue +def postprocess( + metadata_json_file, + service, + pr: bool, + upload: bool, + markdown: bool, + noqueue: bool, ): - """Supply required metadata to postprocess a transcript. + """Postprocess the output of a transcription service. + Requires the metadata JSON file that is the output of the previous stage + of the transcription process. """ try: configure_logger(log_level=logging.INFO) - check_if_valid_file_path(deepgram_json_file) - check_if_valid_file_path(preprocess_json_file) - logger.info(f"Processing deepgram output from {deepgram_json_file}") + utils.check_if_valid_file_path(metadata_json_file) + logger.info( + f"Postprocessing {service} transcript from {metadata_json_file}") transcription = Transcription( - deepgram=True, queue=False, diarize=diarize) - with open(deepgram_json_file, "r") as outfile: - deepgram_output = json.load(outfile) - outfile.close() - with open(preprocess_json_file, "r") as outfile: - preprocess_output = json.load(outfile) - outfile.close() - metadata = configure_metadata_given_from_JSON(preprocess_output) + deepgram=service == "deepgram", + pr=pr, + upload=upload, + markdown=markdown, + queue=not noqueue, + ) + with open(metadata_json_file, "r") as outfile: + metadata_json = json.load(outfile) + metadata = utils.configure_metadata_given_from_JSON(metadata_json) transcription.add_transcription_source( source_file=metadata["source_file"], loc=metadata["loc"], @@ -361,14 +386,16 @@ def postprocess_deepgram_transcript( youtube_metadata=metadata["youtube_metadata"], chapters=metadata["chapters"], link=metadata["media"], - preprocess=False + preprocess=False, ) - # Process raw deepgram 
transcript - transcript_from_deepgram = transcription.transcripts[0] - transcript_from_deepgram.title = metadata["title"] - transcript_from_deepgram.result = transcription.service.construct_transcript( - deepgram_output, metadata["chapters"]) - transcription.postprocess(transcript_from_deepgram) + # Finalize transcription service output + transcript_to_postprocess = transcription.transcripts[0] + transcript_to_postprocess.title = metadata["title"] + transcript_to_postprocess.transcription_service_output_file = metadata[ + f"{service}_output"] + transcript_to_postprocess.result = transcription.service.finalize_transcript( + transcript_to_postprocess) + transcription.postprocess(transcript_to_postprocess) except Exception as e: logger.error(e) traceback.print_exc()