import html
import json
import os
import shutil
import tempfile
import uuid

import fire
from dotenv import load_dotenv
from google.cloud import speech_v1p1beta1 as speech
from google.cloud import storage
from google.cloud import texttospeech
from google.cloud import translate_v2 as translate
from moviepy.editor import AudioFileClip, CompositeVideoClip, VideoFileClip
from moviepy.video.tools.subtitles import SubtitlesClip, TextClip
from pydub import AudioSegment

# Load config from .env file
load_dotenv()
# print(os.environ.get("GOOGLE_APPLICATION_CREDENTIALS"))

## Converts a video file into an audio file
def decode_audio(inFile, outFile):
    """Converts a video file to a wav file.

    Args:
        inFile (String): i.e. my/great/movie.mp4
        outFile (String): i.e. my/great/movie.wav
    """
    if outFile[-4:] != ".wav":
        outFile += ".wav"
    AudioSegment.from_file(inFile).set_channels(
        1).export(outFile, format="wav")

### Convert audio into text (CAT) using the Google Speech-to-Text API
def get_transcripts_json(gcsPath, langCode, phraseHints=[], speakerCount=1, enhancedModel=None):
    """Transcribes audio files.

    Args:
        gcsPath (String): path to file in cloud storage (i.e. "gs://audio/clip.mp4")
        langCode (String): language code (i.e. "en-US", see https://cloud.google.com/speech-to-text/docs/languages)
        phraseHints (String[]): list of words that are unusual but likely to appear in the audio file.
        speakerCount (int, optional): Number of speakers in the audio. Only works on English. Defaults to 1.
        enhancedModel (String, optional): Option to use an enhanced speech model, i.e. "video"

    Returns:
        list | Operation.error
    """
    # Helper function for simplifying the Google speech client response
    def _jsonify(result):
        json = []
        for section in result.results:
            data = {
                "transcript": section.alternatives[0].transcript,
                "words": []
            }
            for word in section.alternatives[0].words:
                data["words"].append({
                    "word": word.word,
                    "start_time": word.start_time.total_seconds(),
                    "end_time": word.end_time.total_seconds(),
                    "speaker_tag": word.speaker_tag
                })
            json.append(data)
        return json

    client = speech.SpeechClient()
    audio = speech.RecognitionAudio(uri=gcsPath)

    diarize = speakerCount > 1
    print(f"Diarizing: {diarize}")
    diarizationConfig = speech.SpeakerDiarizationConfig(
        enable_speaker_diarization=diarize,
    )

    # In English only, we can use the optimized video model
    if langCode == "en":
        enhancedModel = "video"

    config = speech.RecognitionConfig(
        language_code="en-US" if langCode == "en" else langCode,
        enable_automatic_punctuation=True,
        enable_word_time_offsets=True,
        speech_contexts=[{
            "phrases": phraseHints,
            "boost": 15
        }],
        diarization_config=diarizationConfig,
        profanity_filter=True,
        use_enhanced=bool(enhancedModel),
        model=enhancedModel if enhancedModel else None
    )
    res = client.long_running_recognize(config=config, audio=audio).result()
    return _jsonify(res)

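# For reference, the list returned above (built by _jsonify) has this shape;
# the values below are illustrative, not real API output:
# [
#     {
#         "transcript": "hello there",
#         "words": [
#             {"word": "hello", "start_time": 0.0, "end_time": 0.4, "speaker_tag": 1},
#             {"word": "there", "start_time": 0.4, "end_time": 0.8, "speaker_tag": 1}
#         ]
#     }
# ]
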
### Breaking the text into sentences
def parse_sentence_with_speaker(json, lang):
    """Takes json from get_transcripts_json and breaks it into sentences
    spoken by a single person. Sentences are delineated by speaker changes
    and by pauses between words.

    Args:
        json (string[]): [{"transcript": "lalala", "words": [{"word": "la", "start_time": 20, "end_time": 21, "speaker_tag": 2}]}]
        lang (string): language code, i.e. "en"

    Returns:
        string[]: [{"sentence": "lalala", "speaker": 1, "start_time": 20, "end_time": 21}]
    """

    # Special case for parsing Japanese words
    def get_word(word, lang):
        if lang == "ja":
            return word.split('|')[0]
        return word

    sentences = []
    sentence = {}
    for result in json:
        for i, word in enumerate(result['words']):
            wordText = get_word(word['word'], lang)
            if not sentence:
                sentence = {
                    lang: [wordText],
                    'speaker': word['speaker_tag'],
                    'start_time': word['start_time'],
                    'end_time': word['end_time']
                }
            # If we have a new speaker, save the sentence and create a new one:
            elif word['speaker_tag'] != sentence['speaker']:
                sentence[lang] = ' '.join(sentence[lang])
                sentences.append(sentence)
                sentence = {
                    lang: [wordText],
                    'speaker': word['speaker_tag'],
                    'start_time': word['start_time'],
                    'end_time': word['end_time']
                }
            else:
                sentence[lang].append(wordText)
                sentence['end_time'] = word['end_time']

            # If there's a pause before the next word, assume this is a new sentence
            if i+1 < len(result['words']) and word['end_time'] < result['words'][i+1]['start_time']:
                sentence[lang] = ' '.join(sentence[lang])
                sentences.append(sentence)
                sentence = {}

        if sentence:
            sentence[lang] = ' '.join(sentence[lang])
            sentences.append(sentence)
            sentence = {}

    return sentences

### Translating the text into another language using the Google Translate API
def translate_text(text, targetLang, sourceLang=None):
    """Translates from sourceLang to targetLang. If sourceLang is None,
    it will be auto-detected.

    Args:
        text (String): Text to translate
        targetLang (String): i.e. "en"
        sourceLang (String, optional): i.e. "es". Defaults to None.

    Returns:
        String: translated text
    """
    translate_client = translate.Client()
    result = translate_client.translate(
        text, target_language=targetLang, source_language=sourceLang)
    return html.unescape(result['translatedText'])

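# Illustrative usage (actual output depends on the live Translate API):
#   translate_text("Hola, mundo", "en", sourceLang="es")  # -> roughly "Hello, world"
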
### Convert the translated text into audio (TTA) using the Google Text-to-Speech API
def speak(text, languageCode, voiceName=None, speakingRate=0.83):
    """Converts text to audio.

    Args:
        text (String): Text to be spoken
        languageCode (String): Language (i.e. "en")
        voiceName (String, optional): See https://cloud.google.com/text-to-speech/docs/voices
        speakingRate (float, optional): speed up or slow down speaking

    Returns:
        bytes : Audio in MP3 format
    """

    # Instantiates a client
    client = texttospeech.TextToSpeechClient()

    # Set the text input to be synthesized
    synthesis_input = texttospeech.SynthesisInput(text=text)

    # Build the voice request: select the language code (i.e. "en-US") and,
    # if no voice name is given, a neutral SSML voice gender
    if not voiceName:
        voice = texttospeech.VoiceSelectionParams(
            language_code=languageCode, ssml_gender=texttospeech.SsmlVoiceGender.NEUTRAL
        )
    else:
        voice = texttospeech.VoiceSelectionParams(
            language_code=languageCode, name=voiceName
        )

    # Select the type of audio file you want returned
    audio_config = texttospeech.AudioConfig(
        audio_encoding=texttospeech.AudioEncoding.MP3,
        speaking_rate=speakingRate
    )

    # Perform the text-to-speech request on the text input with the selected
    # voice parameters and audio file type
    response = client.synthesize_speech(
        input=synthesis_input,
        voice=voice,
        audio_config=audio_config
    )
    return response.audio_content

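# Illustrative usage (requires Text-to-Speech credentials; the voice name is
# one example from https://cloud.google.com/text-to-speech/docs/voices):
#   mp3Bytes = speak("Hello there", "en-US", voiceName="en-US-Wavenet-A")
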
def speakUnderDuration(text, languageCode, durationSecs, voiceName=None):
    """Speak text within a certain time limit.
    If the audio already fits within durationSecs, no changes will be made.

    Args:
        text (String): Text to be spoken
        languageCode (String): language code, i.e. "en"
        durationSecs (float): Time limit in seconds
        voiceName (String, optional): See https://cloud.google.com/text-to-speech/docs/voices

    Returns:
        bytes : Audio in MP3 format
    """
    temp_dir = "temp"
    os.makedirs(temp_dir, exist_ok=True)
    with tempfile.NamedTemporaryFile(mode="w+b", dir=temp_dir, delete=False) as temp_file:
        baseAudio = speak(text, languageCode, voiceName=voiceName)
        assert len(baseAudio)
        temp_file.write(baseAudio)
        temp_file.flush()
        baseDuration = AudioSegment.from_mp3(temp_file.name).duration_seconds
    os.remove(temp_file.name)

    ratio = baseDuration / durationSecs

    # If the audio fits, return it
    if ratio <= 1:
        return baseAudio

    # If the base audio is too long to fit in the segment, increase the
    # speaking rate. Round to one decimal point and go a little faster
    # to be safe. The API caps speaking_rate at 4.0.
    ratio = round(ratio, 1)
    if ratio > 4:
        ratio = 4
    return speak(text, languageCode, voiceName=voiceName, speakingRate=ratio)

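# Worked example of the ratio logic above: if the base clip renders to 12
# seconds but the slot is only 6 seconds, ratio = 2.0 and the text is
# re-synthesized at speaking_rate 2.0. The cap of 4 matches the maximum
# speaking_rate the Text-to-Speech API accepts.
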
## Adding subtitles to videos
def toSrt(transcripts, charsPerLine=60):
    """Converts transcripts to an SRT file. Only intended to work
    with English.

    Args:
        transcripts ({}): Transcripts returned from the Speech API
        charsPerLine (int): max number of chars to write per line

    Returns:
        String: SRT data
    """

    """
    SRT files have this format:

    [Section of subtitles number]
    [Time the subtitle is displayed begins] --> [Time the subtitle is displayed ends]
    [Subtitle]

    Timestamps are in the format:
    [hours]:[minutes]:[seconds],[milliseconds]

    Note: about 60 characters comfortably fit on one line
    for resolution 1920x1080 with font size 40 pt.
    """

    def _srtTime(seconds):
        millisecs = seconds * 1000
        seconds, millisecs = divmod(millisecs, 1000)
        minutes, seconds = divmod(seconds, 60)
        hours, minutes = divmod(minutes, 60)
        return "%02d:%02d:%02d,%03d" % (hours, minutes, seconds, millisecs)

    def _toSrt(words, startTime, endTime, index):
        return f"{index}\n" + _srtTime(startTime) + " --> " + _srtTime(endTime) + f"\n{words}"

    startTime = None
    sentence = ""
    srt = []
    index = 1
    for word in [word for x in transcripts for word in x['words']]:
        if not startTime:
            startTime = word['start_time']
        sentence += " " + word['word']
        if len(sentence) > charsPerLine:
            srt.append(_toSrt(sentence, startTime, word['end_time'], index))
            index += 1
            sentence = ""
            startTime = None
    if len(sentence):
        srt.append(_toSrt(sentence, startTime, word['end_time'], index))
    return '\n\n'.join(srt)

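# The emitted SRT data looks like this (timestamps illustrative):
#
# 1
# 00:00:00,000 --> 00:00:02,400
#  the first sixty or so characters of the transcript
#
# 2
# 00:00:02,400 --> 00:00:04,900
#  the next chunk of words
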
def stitch_audio(sentences, audioDir, movieFile, outFile, srtPath=None, overlayGain=-30):
    """Combines sentences, audio clips, and the video file into the final dubbed video.

    Args:
        sentences (list): Output of parse_sentence_with_speaker
        audioDir (String): Directory containing generated audio files to stitch together
        movieFile (String): Path to movie file to dub.
        outFile (String): Where to write dubbed movie.
        srtPath (String, optional): Path to transcript/srt file, if desired.
        overlayGain (int, optional): How quiet to make source audio when overlaying dubs.
            Defaults to -30.

    Returns:
        void : Writes movie file to outFile path
    """

    # Files in the audioDir should be labeled 0.mp3, 1.mp3, etc.
    audioFiles = os.listdir(audioDir)
    audioFiles.sort(key=lambda x: int(x.split('.')[0]))

    # Grab the computer-generated audio files
    segments = [AudioSegment.from_mp3(
        os.path.join(audioDir, x)) for x in audioFiles]
    # Also, grab the original audio
    dubbed = AudioSegment.from_file(movieFile)

    # Place each computer-generated audio clip at the correct timestamp
    # (pydub overlay positions are in milliseconds)
    for sentence, segment in zip(sentences, segments):
        dubbed = dubbed.overlay(
            segment, position=sentence['start_time'] * 1000, gain_during_overlay=overlayGain)

    # Write the final audio to a temporary output file, routing temp files
    # into a local "temp" directory
    temp_dir = "temp"
    os.makedirs(temp_dir, exist_ok=True)
    os.environ['TEMP'] = temp_dir
    os.environ['TMP'] = temp_dir
    audioFile = tempfile.NamedTemporaryFile(delete=False, dir=temp_dir, suffix=".wav")
    dubbed.export(audioFile, format="wav")
    audioFile.flush()

    # Add the new audio to the video and save it
    clip = VideoFileClip(movieFile)
    audio = AudioFileClip(audioFile.name)
    clip = clip.set_audio(audio)

    # Add transcripts, if supplied
    if srtPath:
        width, height = clip.size[0] * 0.75, clip.size[1] * 0.20

        def generator(txt): return TextClip(txt, font='Georgia-Regular',
                                            size=[width, height], color='black', method="caption")
        subtitles = SubtitlesClip(
            srtPath, generator).set_pos(("center", "bottom"))
        clip = CompositeVideoClip([clip, subtitles])

    clip.write_videofile(outFile, codec='libx264', audio_codec='aac')
    audioFile.close()

def dub(
        videoFile, srcLang, outputDir, targetLangs=[],
        storageBucket=None, phraseHints=[], dubSrc=False,
        speakerCount=2, voices={}, srt=False,
        newDir=False, genAudio=False, noTranslate=False):
    """Translate and dub a movie.

    Args:
        videoFile (String): File to dub
        srcLang (String): Language code to translate from (i.e. "fi")
        outputDir (String): Directory to write output files
        targetLangs (list, optional): Languages to translate to, i.e. ["en", "fr"]
        storageBucket (String, optional): GCS bucket for temporary file storage. Defaults to None.
        phraseHints (list, optional): "Hints" for words likely to appear in audio. Defaults to [].
        dubSrc (bool, optional): Whether to generate dubs in the source language. Defaults to False.
        speakerCount (int, optional): How many speakers are in the video. Defaults to 2.
        voices (dict, optional): Which voices to use for dubbing, i.e. {"en": "en-AU-Standard-A"}. Defaults to {}.
        srt (bool, optional): Whether to generate an SRT subtitle file. Defaults to False.
        newDir (bool, optional): Whether to start dubbing from scratch or use files in outputDir. Defaults to False.
        genAudio (bool, optional): Generate new audio, even if it's already been generated. Defaults to False.
        noTranslate (bool, optional): Don't translate. Defaults to False.

    Returns:
        void : Writes dubbed video and intermediate files to outputDir
    """
    baseName = os.path.split(videoFile)[-1].split('.')[0]
    if newDir and os.path.exists(outputDir):
        shutil.rmtree(outputDir)

    if not os.path.exists(outputDir):
        os.mkdir(outputDir)

    outputFiles = os.listdir(outputDir)

    if f"{baseName}.wav" not in outputFiles:
        print("Extracting audio from video")
        fn = os.path.join(outputDir, baseName + ".wav")
        decode_audio(videoFile, fn)
        print(f"Wrote {fn}")
        print("Audio extracted successfully")

    if "transcript.json" not in outputFiles:
        storageBucket = storageBucket if storageBucket else os.environ.get('STORAGE_BUCKET')
        if not storageBucket:
            raise Exception("Specify variable STORAGE_BUCKET in .env or as an arg")

        print("Transcribing audio")
        print("Uploading to the cloud...")
        storage_client = storage.Client()
        bucket = storage_client.bucket(storageBucket)

        tmpFile = str(uuid.uuid4()) + ".wav"
        blob = bucket.blob(tmpFile)
        blob.upload_from_filename(os.path.join(outputDir, baseName + ".wav"), content_type="audio/wav")

        print("Transcribing...")
        gcsUrl = "gs://" + storageBucket + "/" + tmpFile
        transcripts = get_transcripts_json(gcsUrl, srcLang, phraseHints=phraseHints, speakerCount=speakerCount)
        with open(os.path.join(outputDir, "transcript.json"), "w", encoding="utf-8") as f:
            json.dump(transcripts, f, ensure_ascii=False)
        print("Transcription completed")

        sentences = parse_sentence_with_speaker(transcripts, srcLang)
        fn = os.path.join(outputDir, baseName + ".json")
        with open(fn, "w", encoding="utf-8") as f:
            json.dump(sentences, f, ensure_ascii=False)
        print(f"Wrote {fn}")

        print("Deleting cloud file...")
        blob.delete()

    srtPath = os.path.join(outputDir, "subtitles.srt") if srt else None
    if srt:
        transcripts = json.load(open(os.path.join(outputDir, "transcript.json"), encoding="utf-8"))
        subtitles = toSrt(transcripts)
        with open(srtPath, "w", encoding="utf-8") as f:
            f.write(subtitles)
        print(f"Wrote srt subtitles to {srtPath}")

    sentences = json.load(open(os.path.join(outputDir, baseName + ".json"), encoding="utf-8"))

    if not noTranslate:
        for lang in targetLangs:
            print(f"Translating to {lang}")
            for sentence in sentences:
                sentence[lang] = translate_text(sentence[srcLang], lang, srcLang)
            print(f"Translation to {lang} completed")

        fn = os.path.join(outputDir, baseName + ".json")
        with open(fn, "w", encoding="utf-8") as f:
            json.dump(sentences, f, ensure_ascii=False)

    audioDir = os.path.join(outputDir, "audioClips")
    if "audioClips" not in outputFiles:
        os.mkdir(audioDir)

    if dubSrc:
        targetLangs += [srcLang]

    for lang in targetLangs:
        languageDir = os.path.join(audioDir, lang)
        if os.path.exists(languageDir):
            if not genAudio:
                continue
            shutil.rmtree(languageDir)
        os.mkdir(languageDir)
        print(f"Synthesizing audio for {lang}")
        for i, sentence in enumerate(sentences):
            voiceName = voices.get(lang)
            audio = speakUnderDuration(
                sentence[lang], lang, sentence['end_time'] - sentence['start_time'], voiceName=voiceName)
            with open(os.path.join(languageDir, f"{i}.mp3"), 'wb') as f:
                f.write(audio)
        print(f"Synthesis for {lang} completed")

    dubbedDir = os.path.join(outputDir, "dubbedVideos")
    if "dubbedVideos" not in outputFiles:
        os.mkdir(dubbedDir)

    for lang in targetLangs:
        print(f"Dubbing audio for {lang}")
        outFile = os.path.join(dubbedDir, f"{lang}.mp4")
        stitch_audio(sentences, os.path.join(audioDir, lang), videoFile, outFile, srtPath=srtPath)
        print(f"Dubbed video for {lang} created at {outFile}")

    print("Done")

if __name__ == "__main__":
    fire.Fire(dub)
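# Example invocation via python-fire (paths and bucket name are placeholders;
# requires GOOGLE_APPLICATION_CREDENTIALS and a writable GCS bucket):
#
#   python dubber.py my_movie.mp4 en output/ \
#       --targetLangs '["es", "fr"]' --storageBucket my-bucket --srt True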