#!/usr/bin/env python3
import re, json
import os
from datetime import timedelta
def from_vtt(vtt_string):
    """Parse WebVTT text into a list of cue dictionaries.

    The text is split into blocks on blank lines.  A leading block that starts
    with ``WEBVTT`` is dropped, an optional all-digit cue identifier line is
    skipped, and every block whose first remaining line is a timecode line
    becomes one cue.  Blocks without a timecode line are ignored.

    Args:
        vtt_string: Full text of a WebVTT document.

    Returns:
        A list of ``{"start": str, "end": str, "content": str}`` dictionaries
        in document order.  Timecodes are returned exactly as written in the
        input (``mm:ss.mmm`` or ``hh:mm:ss.mmm``).
    """
    VTT_TIMECODE_PATTERN = (
        r"((?:\d{2}:)?\d{2}:\d{2}\.\d{3}) --> ((?:\d{2}:)?\d{2}:\d{2}\.\d{3})"
    )
    VTT_LINE_NUMBER_PATTERN = r"^\d+$"

    parts = re.split(r"\n\n+", vtt_string.strip())
    if parts[0].startswith("WEBVTT"):
        parts.pop(0)

    subtitles = []
    for part in parts:
        lines = part.split("\n")
        # A timecode line can never be all digits, so testing for the cue
        # identifier first is equivalent to "try timecode, then identifier".
        if re.match(VTT_LINE_NUMBER_PATTERN, lines[0]):
            lines = lines[1:]
        if not lines:
            # Block held nothing but an identifier; previously an IndexError.
            continue
        match = re.match(VTT_TIMECODE_PATTERN, lines[0])
        if not match:
            continue
        start, end = match.groups()
        content = "\n".join(lines[1:]) + "\n"

        # NOTE(review): several literals in this chain look as if markup
        # (possibly highlight tags) and/or repeated spaces were lost from
        # them: two steps replace a single space with itself, one replaces an
        # empty string with an empty string, and one removes every space.
        # They are reproduced unchanged here -- confirm against the upstream
        # file before relying on this clean-up.
        cleaned = (
            content.replace("-\n", "\n")
            .replace("-\n", "\n")
            .replace("-", " ")
            .replace("%", " ")
            .replace(" ", " ")
            .replace(" ", " ")
            .replace(" ", "")
            .replace("", "")
            .replace(" \n", "\n")
        )[:-1]  # drop the newline that was appended above

        subtitles.append({"start": start, "end": end, "content": cleaned})

    # TODO: a word-level sanitising pass (``sanitizevttwordlevel``) was
    # sketched here as commented-out code but never completed.
    return subtitles
def to_vtt(subtitles):
    """Render cue dictionaries as WebVTT text.

    Each entry contributes one block: a ``start --> end`` line followed by the
    content, or a ``NOTE <content>`` block when the entry has a truthy
    ``"split"`` key.  Blocks are separated by two blank lines and the final
    text is stripped of surrounding whitespace.

    Args:
        subtitles: Iterable of dicts with ``"content"`` and, for ordinary
            cues, ``"start"`` and ``"end"``.

    Returns:
        The WebVTT document as a single string.
    """
    separator = "\n\n\n"
    blocks = ["WEBVTT"]
    for cue in subtitles:
        text = cue["content"]
        if cue.get("split", False):
            blocks.append(f"NOTE {text}")
        else:
            blocks.append(f"{cue['start']} --> {cue['end']}\n{text}")
    return (separator.join(blocks) + separator).strip()
def to_stacked_vtt(subtitles):
    """Render cues as "stacked" WebVTT where text accumulates within a section.

    Every ordinary cue is emitted with its own timecodes, but its text is the
    running concatenation of all cue texts since the last section marker (an
    entry with a truthy ``"split"`` key), which resets the accumulator and
    emits nothing itself.  When appending to a non-empty accumulator the
    joiner is a newline if the cue being appended ends with ``"."`` and a
    space otherwise.

    Args:
        subtitles: Iterable of dicts with ``"content"``, ``"start"`` and
            ``"end"``; section markers only need ``"split"``.

    Returns:
        The WebVTT document as a string (not stripped; it ends with the
        block separator).
    """
    blocks = ["WEBVTT\n\n\n"]
    buffer = ""
    for subtitle in subtitles:
        if subtitle.get("split", False):
            buffer = ""
            continue
        text = subtitle["content"].strip()
        if buffer:
            # endswith() instead of [-1] so an empty cue cannot raise.
            buffer += "\n" if text.endswith(".") else " "
        buffer += text
        blocks.append(f"{subtitle['start']} --> {subtitle['end']}\n{buffer}\n\n\n")
    return "".join(blocks)
def script_from_word_vtt(wordvtt):
    """Derive the list of distinct script lines from a word-level VTT.

    Cues are parsed with ``from_vtt``; the highlight tags are removed from
    each cue and the resulting sentence is appended whenever it differs from
    the previously collected one.  A sentence equal to the previous one is
    still appended when the cue before it ended on the closing highlight tag
    and the cue two positions after that one (or the end of the list) carries
    the same sentence -- presumably to keep genuinely repeated lines; verify
    against real input.

    Args:
        wordvtt: Text of the word-level WebVTT file.

    Returns:
        List of sentence strings in order of appearance.
    """
    # NOTE(review): the tag literals were empty strings in the reviewed
    # source, which made the 4-character end-of-cue comparison unsatisfiable
    # for any non-empty cue.  They are restored as the underline tags that
    # whisper's word highlighting emits -- confirm against the upstream file.
    open_tag, close_tag = "<u>", "</u>"

    def plain(text):
        # Cue text with the highlight markup removed.
        return text.replace(open_tag, "").replace(close_tag, "")

    subtitles = from_vtt(wordvtt)
    print(f"VTT {len(subtitles)} lines. Generating script file from VTT.")

    sentences = []
    add_next_sentence = False
    for n, subtitle in enumerate(subtitles):
        sentence = plain(subtitle["content"])
        if add_next_sentence or not sentences or sentences[-1] != sentence:
            sentences.append(sentence)
            add_next_sentence = False
        if subtitle["content"].endswith(close_tag):
            # Highlight sits on the last word of the cue: force the next
            # sentence in unless the cue two steps ahead shows another text.
            lookahead = n + 2
            add_next_sentence = (
                lookahead >= len(subtitles)
                or plain(subtitles[lookahead]["content"]) == sentence
            )
    return sentences
def create_word_scenes(wordvtt, scriptraw):
    """Align word-highlight cues of a VTT with the lines of a script.

    Every non-empty, multi-word script line becomes a "scene".  The cues from
    ``from_vtt`` are walked in order; a cue whose text (highlight tags
    removed) equals the current or the next scene contributes the timing of
    its highlighted word to that scene.  Single-word cues and single-word
    script lines are skipped.

    Args:
        wordvtt: Text of the word-level WebVTT file.
        scriptraw: Script text, one sentence per line.

    Returns:
        ``(full_script, full_scenes)`` on success, where ``full_script`` is
        the flat word list with ``"##"`` appended to the last word of each
        scene and ``full_scenes`` is the parallel list of
        ``{"start", "end", "index", "word"}`` dicts.  ``None`` after printing
        a message when the VTT and the script cannot be aligned.
    """
    # NOTE(review): the tag literals were empty strings in the reviewed
    # source, where ``str.split("")`` raises ValueError.  They are restored
    # as the underline tags that whisper's word highlighting emits -- confirm
    # against the upstream file.
    open_tag, close_tag = "<u>", "</u>"

    def next_word_index(scene):
        # Index of the first word of *scene* that has no timestamp yet.
        stamps = scene["timestamp"]
        return stamps[-1]["index"] + 1 if stamps else 0

    subtitles = from_vtt(wordvtt)
    scripts = [line for line in scriptraw.split("\n") if line]
    print(f"VTT {len(subtitles)} lines, Script {len(scripts)} lines")

    scenes = [
        {"scene": script, "timestamp": []}
        for script in scripts
        if len(script.split(" ")) != 1
    ]

    scenes_cur = 0
    for subtitle in subtitles:
        content = subtitle["content"]
        sentence = content.replace(open_tag, "").replace(close_tag, "")
        words = sentence.split(" ")
        if len(words) == 1:
            continue

        if scenes_cur >= len(scenes):
            print(f'Error, Mismatch in scenes\n=>no script line left for "{sentence}"')
            return None
        current_text = scenes[scenes_cur].get("scene")
        if sentence != current_text:
            # Only a one-step advance is allowed; guard against running off
            # the end of the scene list instead of raising IndexError.
            has_next = scenes_cur + 1 < len(scenes)
            next_text = scenes[scenes_cur + 1].get("scene") if has_next else None
            if has_next and sentence == next_text:
                scenes_cur += 1
            else:
                print(
                    "Error, Mismatch in scenes\n"
                    f'=>"[{scenes_cur}] {current_text}" or '
                    f'"[{scenes_cur + 1}] {next_text}" != "{sentence}"'
                )
                return None

        current_scene = scenes[scenes_cur]
        word_idx = next_word_index(current_scene)
        if open_tag not in content:
            # Cue without a highlighted word carries no timing information.
            continue

        if word_idx >= len(words):
            # The current scene is already complete, so this cue is taken to
            # belong to the following scene.
            print(
                f"Error, index wrong. {scenes_cur}, word: {word_idx}, "
                f"total words: {len(words)}\n{subtitle}"
            )
            scenes_cur += 1
            if scenes_cur >= len(scenes):
                print(f'Error, Mismatch in scenes\n=>no script line left for "{sentence}"')
                return None
            current_scene = scenes[scenes_cur]
            word_idx = next_word_index(current_scene)
            print(f"Changed to {word_idx}, {scenes_cur}")

        word = content.split(open_tag)[1].split(close_tag)[0]
        if word not in words:
            print(f'Error, Mismatch\n=> "{word}" not in "{sentence}"')
            return None
        if word_idx >= len(words) or words[word_idx] != word:
            print(f'Error, Mismatch\n=> "{word}" != [{word_idx}] of "{sentence}"')
            return None
        current_scene["timestamp"].append(
            {
                "start": subtitle["start"],
                "end": subtitle["end"],
                "index": word_idx,
                "word": word,
            }
        )

    for scene in scenes:
        scene_words = scene["scene"].split(" ")
        if len(scene_words) != len(scene["timestamp"]):
            print("Error, Mismatch length")
            return None
        if "" in scene_words:
            # Empty "words" come from consecutive spaces; show the raw line.
            print(repr(scene["scene"]))

    full_script, full_scenes = [], []
    for scene in scenes:
        *leading, last = scene["scene"].split(" ")
        full_script += leading
        full_script.append(last + "##")  # "##" marks the end of a scene
        full_scenes += scene["timestamp"]

    for marked_word, stamp in zip(full_script, full_scenes):
        if marked_word.replace("##", "") != stamp["word"]:
            print("Error, Mismatch")
            return None
    # Equal lengths are guaranteed by the per-scene length check above.
    return full_script, full_scenes
def autobreak(lines, times):
    """Insert line and section breaks into a flat word list based on pauses.

    The silence before each word is measured as its start time minus the end
    time of the previous word (the first word is measured from zero).  The
    mean of all positive silences is printed and used as the threshold: a
    newline is inserted before a word whose silence exceeds the threshold
    (or equals it, when the previous word ends with a period).  After every
    word a newline is written if the word ends with ``"."`` or carries the
    ``"##"`` end-of-scene marker, otherwise a space.  ``"##"`` markers are
    removed from the output.

    Args:
        lines: Words, the last word of each scene suffixed with ``"##"``.
        times: Parallel sequence of dicts with ``"start"`` and ``"end"``
            timecodes in ``mm:ss.mmm`` or ``hh:mm:ss.mmm`` form.

    Returns:
        The script text with breaks inserted.

    Raises:
        ValueError: If a timecode has an unexpected number of ``:`` fields.
    """
    from datetime import timedelta

    def parsetime(time_str):
        # Accept the optional hours field that the VTT timecode pattern also
        # allows; the fractional part is read as an integer millisecond count.
        fields = time_str.split(":")
        if len(fields) not in (2, 3):
            raise ValueError(f"Unsupported timecode: {time_str!r}")
        seconds, _, milliseconds = fields[-1].partition(".")
        return timedelta(
            hours=int(fields[0]) if len(fields) == 3 else 0,
            minutes=int(fields[-2]),
            seconds=int(seconds),
            milliseconds=int(milliseconds or 0),
        )

    zero = timedelta(0)
    pairs = list(zip(lines, times))

    # Silence preceding every word.
    gaps = []
    previous_end = zero
    for _, stamp in pairs:
        gaps.append(parsetime(stamp["start"]) - previous_end)
        previous_end = parsetime(stamp["end"])

    positive_gaps = [gap for gap in gaps if gap > zero]
    mean_break = sum(positive_gaps, zero) / len(positive_gaps) if positive_gaps else zero
    print(mean_break)

    pieces = []
    previous_word = " "
    for (word, _), gap in zip(pairs, gaps):
        if previous_word.endswith("."):
            long_pause = gap >= mean_break
        else:
            long_pause = gap > mean_break
        if long_pause:
            pieces.append("\n")
        pieces.append(word.replace("##", ""))
        pieces.append("\n" if word.endswith(".") or "##" in word else " ")
        previous_word = word
    return "".join(pieces)
def scene_from_new_script(raw_script, full_script, full_scenes):
    """Map the words of an edited script back onto the original word timings.

    The edited script is tokenised on spaces; every newline is attached to
    the word before it (so a word may end in ``"\\n"`` or ``"\\n\\n"``).
    Tokens are then matched in order against ``full_script``, comparing
    case-insensitively and ignoring ``. , ?``, newlines and ``"##"``.
    Original words with no counterpart in the edited script are skipped; more
    than ten consecutive skips abort the match.

    Args:
        raw_script: Text of the edited script.
        full_script: Original word list (``"##"`` marks scene ends).
        full_scenes: Timing dicts parallel to ``full_script``; ``"start"``
            and ``"end"`` are read.

    Returns:
        ``(new_script, new_timestamp)`` -- the edited tokens (newlines kept,
        ``"##"`` removed) and a parallel list of ``{"start", "end"}`` dicts --
        or ``None`` after printing a message when matching fails.
    """
    tokens = []
    for piece in raw_script.replace("\n", " \n ").split(" "):
        if not piece:
            continue
        if piece != "\n":
            tokens.append(piece)
        elif tokens:
            tokens[-1] += "\n"
        # A newline before the first word has nothing to attach to and is
        # dropped (it used to be glued onto the last token instead).

    print(f"Original: {len(full_script)}, Modded: {len(tokens)}")

    ignorable = (".", "\n", "\n\n", ",", "?", "##")

    def normalized(text):
        for mark in ignorable:
            text = text.replace(mark, "")
        return text.upper()

    available = min(len(full_script), len(full_scenes))
    new_script, new_timestamp = [], []
    position, orig_index, fail = 0, 0, 0
    while position < len(tokens):
        word = tokens[position]
        if orig_index >= available:
            # The edited script holds words the original cannot supply.
            print(f"Error: ran out of original words while matching {word!r}")
            return None
        original_word = full_script[orig_index]
        print(f"{repr(word):>20} ? {repr(original_word)}")
        if normalized(word) == normalized(original_word):
            timing = full_scenes[orig_index]
            new_script.append(word.replace("##", ""))
            new_timestamp.append({"start": timing["start"], "end": timing["end"]})
            fail = 0
            position += 1
        else:
            if fail > 10:
                print("Error: Failed to match words,")
                return None
            fail += 1
        orig_index += 1
    return new_script, new_timestamp
def build_new_subtitle(new_script, new_timestamp):
    """Group timed words into subtitle lines and sections.

    Words are collected until one containing a newline is met; the collected
    words then form one cue running from the start of the first word to the
    end of the last.  A word containing a double newline additionally closes
    the current section.  Sections are flattened into a single list with a
    ``{"content": "Break", "split": True}`` marker between consecutive
    sections.

    Args:
        new_script: Words; a trailing ``"\\n"`` ends a line, ``"\\n\\n"``
            ends a section.
        new_timestamp: Parallel dicts with ``"start"`` and ``"end"``.

    Returns:
        List of cue dicts (``"content"``, ``"start"``, ``"end"``) with
        section markers interleaved.
    """
    buffer, new_scenes, start = [], [], None
    current_scene = []
    last_end = None

    def flush():
        # Turn the buffered words into one cue of the current section.
        current_scene.append(
            {
                "content": " ".join(buffer).replace("##", ""),
                "start": start,
                "end": last_end,
            }
        )

    for word, stamp in zip(new_script, new_timestamp):
        if start is None:
            # Also covers a line consisting of a single word, which used to
            # be emitted with a start of None.
            start = stamp["start"]
        last_end = stamp["end"]
        buffer.append(word.replace("\n", ""))
        if "\n" not in word:
            continue

        flush()
        buffer, start = [], None
        if "\n\n" in word:
            last_line = current_scene[-1]["content"]
            print(f'Section break at line #{len(current_scene):<3}| "{last_line}"')
            new_scenes.append(current_scene)
            current_scene = []

    if buffer:
        # Words after the last newline; the final word is already buffered,
        # so it must not be appended a second time.
        flush()
    if current_scene:
        # An empty trailing section would only produce a dangling marker.
        new_scenes.append(current_scene)

    newsub = []
    for n, scene in enumerate(new_scenes):
        if n:
            newsub.append(
                {"content": "Break", "start": None, "end": None, "split": True}
            )
        newsub += scene
    return newsub
def saveFile(filename, data, override=False):
    """Write *data* to *filename* as text.

    An existing file is left untouched unless *override* is true; in that
    case a message is printed and ``-1`` is returned.  A successful write
    returns ``None``.
    """
    refuse = os.path.exists(filename) and not override
    if refuse:
        print(f"File {filename} already exists.")
        return -1
    with open(filename, "w") as handle:
        handle.write(data)
def openFile(filename):
    """Return the text of *filename*, or ``-1`` when the file is empty."""
    with open(filename, "r") as handle:
        contents = handle.read()
    return contents if contents else -1
def main(vttfile, scriptfile):
    """Run one pass of the two-step retiming workflow.

    The VTT and the script are aligned with ``create_word_scenes``.  If the
    ``.script`` file next to *scriptfile* does not exist yet, it is generated
    with ``autobreak`` for the user to edit.  Otherwise the edited ``.script``
    file is matched back onto the word timings and the resulting cues are
    written as JSON next to *vttfile*.

    Args:
        vttfile: Path of the word-level WebVTT file.
        scriptfile: Path of the plain script file.

    Exits the process with status ``-1`` when an input is empty or when the
    alignment fails.
    """
    # Imported here because the module only imports sys under its
    # ``__main__`` guard; without this, calling main() from an importing
    # module raised NameError at the first sys.exit().
    import sys

    def read_or_exit(path):
        # openFile signals an empty file with -1, which is not usable text.
        text = openFile(path)
        if text == -1:
            print(f"Input file {path} is empty.")
            sys.exit(-1)
        return text

    # splitext keeps dots in directory names intact, unlike splitting the
    # whole path on ".".
    modfile = os.path.splitext(scriptfile)[0] + ".script"

    aligned = create_word_scenes(read_or_exit(vttfile), read_or_exit(scriptfile))
    if not aligned:
        sys.exit(-1)
    full_script, full_scenes = aligned

    if not os.path.exists(modfile):
        genscript = autobreak(full_script, full_scenes)
        saveFile(modfile, genscript)
        print(f"Saved modification file as {modfile}. Modify it and return back.")
        return

    matched = scene_from_new_script(read_or_exit(modfile), full_script, full_scenes)
    if not matched:
        sys.exit(-1)
    new_script, new_timestamp = matched
    final_vtt = build_new_subtitle(new_script, new_timestamp)
    jsonfile = os.path.splitext(vttfile)[0] + ".json"
    saveFile(jsonfile, json.dumps(final_vtt, indent=2), True)
    print(f"Saved JSON file as {jsonfile}. Fix it, and convert it to VTT.")
if __name__ == "__main__":
    import sys

    # Command line: "<vtt> <txt>" aligns an existing script, "<vtt>" first
    # derives the script from the VTT, "<json>" converts edited cues to VTT.
    if len(sys.argv) not in (2, 3):
        # The program name is computed outside the f-string so the message
        # does not need same-quote nesting (only valid from Python 3.12).
        prog = os.path.basename(sys.argv[0])
        print(
            f"Usage: {prog} [vtt file] (txt file)\n"
            f" {prog} [JSON file]\n"
            "** Only output from openai-whisper with '--word-timestamp true' is accepted.)\n"
            "** You have to run this for first time, and then fix .script file, and then re-run this script.\n"
            "** Adding newline/period/commas are onlt permitted. Fix else in JSON file."
        )
        sys.exit()

    vtt = sys.argv[1]
    print(f"\n[{vtt}]")

    if len(sys.argv) == 3:
        script = sys.argv[2]
        if (not os.path.exists(vtt)) or (not os.path.exists(script)):
            print("Input file doesnt exists.")
            sys.exit(-1)
        main(vtt, script)
    else:
        # Checked before any read so a missing JSON file is reported the same
        # way as a missing VTT instead of raising FileNotFoundError.
        if not os.path.exists(vtt):
            print("Input file doesnt exists.")
            sys.exit(-1)
        source_text = openFile(vtt)
        if source_text == -1:
            print(f"Input file {vtt} is empty.")
            sys.exit(-1)
        base = os.path.splitext(vtt)[0]

        # Match on the extension only; a substring test would also catch
        # names such as "talk.json.vtt".
        if vtt.lower().endswith(".json"):
            final_vtt = json.loads(source_text)
            saveFile(base + ".final.vtt", to_vtt(final_vtt), True)
            saveFile(base + ".stacked.vtt", to_stacked_vtt(final_vtt), True)
            print(f"Saved VTT file as {base}.final.vtt.")
            sys.exit(0)

        script = base + ".txt"
        # No override: an existing script file is kept and reused.
        saveFile(script, "\n".join(script_from_word_vtt(source_text)))
        main(vtt, script)