3de5c1e5 by 籽li

first

import argparse
from pathlib import Path
import yaml
import sys
class ArgumentParser(argparse.ArgumentParser):
"""Simple implementation of ArgumentParser supporting config file
This class is originated from https://github.com/bw2/ConfigArgParse,
but this class is lack of some features that it has.
- Not supporting multiple config files
- Automatically adding "--config" as an option.
- Not supporting any formats other than yaml
- Not checking argument type
"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.add_argument("--config", help="Give config file in yaml format")
def parse_known_args(self, args=None, namespace=None):
# First pass: parse once just to pick up "--config"
_args, _ = super().parse_known_args(args, namespace)
if _args.config is not None:
if not Path(_args.config).exists():
self.error(f"No such file: {_args.config}")
with open(_args.config, "r", encoding="utf-8") as f:
d = yaml.safe_load(f)
if not isinstance(d, dict):
self.error("Config file has non dict value: {_args.config}")
for key in d:
for action in self._actions:
if key == action.dest:
break
else:
self.error(f"unrecognized arguments: {key} (from {_args.config})")
# NOTE(kamo): Ignore "--config" from a config file
# NOTE(kamo): Unlike "configargparse", this module doesn't check type.
# i.e. We can set any type value regardless of argument type.
self.set_defaults(**d)
return super().parse_known_args(args, namespace)
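# Illustrative sketch of how the config file interacts with the parser above
# (not part of the original code; the option name and file name are made up).
# Keys in the YAML become defaults via set_defaults(), so values given on the
# command line still take precedence, and no type checking is applied to the
# YAML values:
#
#     # conf.yaml contains:  batch_size: 32
#     parser = ArgumentParser()
#     parser.add_argument("--batch_size", type=int, default=8)
#     args = parser.parse_args(["--config", "conf.yaml"])
#     # args.batch_size == 32 (taken from the YAML default)
#     args = parser.parse_args(["--config", "conf.yaml", "--batch_size", "64"])
#     # args.batch_size == 64 (command line wins)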
def get_commandline_args():
extra_chars = [
" ",
";",
"&",
"(",
")",
"|",
"^",
"<",
">",
"?",
"*",
"[",
"]",
"$",
"`",
'"',
"\\",
"!",
"{",
"}",
]
# Escape the extra characters for shell
argv = [
arg.replace("'", "'\\''")
if all(char not in arg for char in extra_chars)
else "'" + arg.replace("'", "'\\''") + "'"
for arg in sys.argv
]
return sys.executable + " " + " ".join(argv)
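# Rough behaviour of get_commandline_args() (illustrative): arguments containing
# shell metacharacters are wrapped in single quotes with embedded quotes escaped,
# so the returned string can be replayed in a shell. For example, with
#     sys.argv == ["train.py", "--tag", "exp (v2)"]
# the result is roughly
#     /usr/bin/python3 train.py --tag 'exp (v2)'
# (the interpreter path depends on sys.executable).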
from moviepy.editor import *
from moviepy.video.tools.subtitles import SubtitlesClip
generator = lambda txt: TextClip(txt, font='./font/STHeitiMedium.ttc', fontsize=48, color='white')
subs = [((0, 2), 'sub1中文字幕'),
((2, 4), 'subs2'),
((4, 6), 'subs3'),
((6, 8), 'subs4')]
subtitles = SubtitlesClip(subs, generator)
video = VideoFileClip("examples/2022云栖大会_片段.mp4.mp4")
video = video.subclip(0, 8)
video = CompositeVideoClip([video, subtitles.set_pos(('center','bottom'))])
video.write_videofile("test_output.mp4")
import gradio as gr
from funasr import AutoModel
from videoclipper import VideoClipper
if __name__ == "__main__":
funasr_model = AutoModel(model="iic/speech_seaco_paraformer_large_asr_nat-zh-cn-16k-common-vocab8404-pytorch",
model_revision="v2.0.4",
vad_model="damo/speech_fsmn_vad_zh-cn-16k-common-pytorch",
vad_model_revision="v2.0.4",
punc_model="damo/punc_ct-transformer_zh-cn-common-vocab272727-pytorch",
punc_model_revision="v2.0.4",
spk_model="damo/speech_campplus_sv_zh-cn_16k-common",
spk_model_revision="v2.0.2",
)
audio_clipper = VideoClipper(funasr_model)
def audio_recog(audio_input, sd_switch, hotwords):
# import pdb; pdb.set_trace()
print(audio_input)
return audio_clipper.recog(audio_input, sd_switch, hotwords=hotwords)
def audio_clip(dest_text, audio_spk_input, start_ost, end_ost, state):
return audio_clipper.clip(dest_text, start_ost, end_ost, state, dest_spk=audio_spk_input)
def video_recog(video_input, sd_switch, hotwords):
return audio_clipper.video_recog(video_input, sd_switch, hotwords)
def video_clip(dest_text, video_spk_input, start_ost, end_ost, state):
return audio_clipper.video_clip(dest_text, start_ost, end_ost, state, dest_spk=video_spk_input)
def video_clip_addsub(dest_text, video_spk_input, start_ost, end_ost, state, font_size, font_color):
return audio_clipper.video_clip(dest_text, start_ost, end_ost, state, font_size, font_color, add_sub=True,
dest_spk=video_spk_input)
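# Note on data flow (as wired below): the *_recog wrappers return (text, srt, state),
# and the state dict held in a gr.State component carries the raw recognition
# result, timestamps and sentence info over to the clip buttons, so clipping
# reuses the recognition pass instead of re-running the ASR model.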
top_md_1 = ("""
**<font color="#1785c4"></font>**
<div align="center">
<div style="display:flex; gap: 0.25rem;" align="center">
</div>
</div>
""")
top_md_2 = ("""
<div align="center">
<div style="display:flex; gap: 0.25rem;" align="center">
</div>
</div>
""")
top_md_3 = ("""
* Step1: 上传视频或音频文件(或使用下方的用例体验),点击 **<font color="#f7802b">识别</font>** 按钮
* Step2: 复制识别结果中所需的文字至右上方,或者在右侧设置说话人标识,设置偏移与字幕配置(可选)
* Step3: 点击 **<font color="#f7802b">裁剪</font>** 按钮或 **<font color="#f7802b">裁剪并添加字幕</font>** 按钮获得结果
""")
# gradio interface
with gr.Blocks() as demo:
# gr.Image("./examples/guide.png", show_label=False)
gr.Markdown(top_md_1)
gr.Markdown(top_md_2)
gr.Markdown(top_md_3)
video_state = gr.State()
audio_state = gr.State()
with gr.Tab("🎥✂️视频裁剪 Video Clipping"):
with gr.Row():
with gr.Column():
video_input = gr.Video(label="🎥视频输入 Video Input")
with gr.Row():
video_sd_switch = gr.Radio(["no", "yes"], label="👥是否区分说话人 Recognize Speakers",
value='no')
hotwords_input = gr.Textbox(label="🚒热词 Hotwords")
recog_button2 = gr.Button("👂识别 Recognize")
video_text_output = gr.Textbox(label="✏️识别结果 Recognition Result")
video_srt_output = gr.Textbox(label="📖SRT字幕内容 SRT Subtitles")
with gr.Column():
video_text_input = gr.Textbox(label="✏️待裁剪文本 Text to Clip (多段文本使用'#'连接)")
video_spk_input = gr.Textbox(label="✏️待裁剪说话人 Speaker to Clip (多个说话人使用'#'连接)")
with gr.Row():
video_start_ost = gr.Slider(minimum=-500, maximum=1000, value=0, step=50,
label="⏪开始位置偏移 Start Offset (ms)")
video_end_ost = gr.Slider(minimum=-500, maximum=1000, value=100, step=50,
label="⏩结束位置偏移 End Offset (ms)")
with gr.Row():
font_size = gr.Slider(minimum=10, maximum=100, value=32, step=2,
label="🔠字幕字体大小 Subtitle Font Size")
font_color = gr.Radio(["black", "white", "green", "red"], label="🌈字幕颜色 Subtitle Color",
value='white')
# font = gr.Radio(["黑体", "Alibaba Sans"], label="字体 Font")
with gr.Row():
clip_button2 = gr.Button("✂️裁剪\nClip")
clip_button3 = gr.Button("✂️裁剪并添加字幕\nClip and Generate Subtitles")
video_output = gr.Video(label="🎥裁剪结果 Video Clipped")
video_mess_output = gr.Textbox(label="ℹ️裁剪信息 Clipping Log")
video_srt_clip_output = gr.Textbox(label="📖裁剪部分SRT字幕内容 Clipped SRT Subtitles")
with gr.Tab("🔊✂️音频裁剪 Audio Clipping"):
with gr.Row():
with gr.Column():
audio_input = gr.Audio(label="🔊音频输入 Audio Input")
with gr.Row():
audio_sd_switch = gr.Radio(["no", "yes"], label="👥是否区分说话人 Recognize Speakers",
value='no')
hotwords_input2 = gr.Textbox(label="🚒热词 Hotwords")
recog_button1 = gr.Button("👂识别 Recognize")
audio_text_output = gr.Textbox(label="✏️识别结果 Recognition Result")
audio_srt_output = gr.Textbox(label="📖SRT字幕内容 SRT Subtitles")
with gr.Column():
audio_text_input = gr.Textbox(label="✏️待裁剪文本 Text to Clip (多段文本使用'#'连接)")
audio_spk_input = gr.Textbox(label="✏️待裁剪说话人 Speaker to Clip (多个说话人使用'#'连接)")
with gr.Row():
audio_start_ost = gr.Slider(minimum=-500, maximum=1000, value=0, step=50,
label="⏪开始位置偏移 Start Offset (ms)")
audio_end_ost = gr.Slider(minimum=-500, maximum=1000, value=0, step=50,
label="⏩结束位置偏移 End Offset (ms)")
with gr.Row():
clip_button1 = gr.Button("✂️裁剪 Clip")
audio_output = gr.Audio(label="🔊裁剪结果 Audio Clipped")
audio_mess_output = gr.Textbox(label="ℹ️裁剪信息 Clipping Log")
audio_srt_clip_output = gr.Textbox(label="📖裁剪部分SRT字幕内容 Clipped SRT Subtitles")
recog_button1.click(audio_recog,
inputs=[audio_input, audio_sd_switch, hotwords_input2],
outputs=[audio_text_output, audio_srt_output, audio_state])
clip_button1.click(audio_clip,
inputs=[audio_text_input, audio_spk_input, audio_start_ost, audio_end_ost, audio_state],
outputs=[audio_output, audio_mess_output, audio_srt_clip_output])
recog_button2.click(video_recog,
inputs=[video_input, video_sd_switch, hotwords_input],
outputs=[video_text_output, video_srt_output, video_state])
clip_button2.click(video_clip,
inputs=[video_text_input, video_spk_input, video_start_ost, video_end_ost, video_state],
outputs=[video_output, video_mess_output, video_srt_clip_output])
clip_button3.click(video_clip_addsub,
inputs=[video_text_input, video_spk_input, video_start_ost, video_end_ost, video_state,
font_size, font_color],
outputs=[video_output, video_mess_output, video_srt_clip_output])
# start gradio service in local
demo.launch()
def time_convert(ms):
ms = int(ms)
tail = ms % 1000
s = ms // 1000
mi = s // 60
s = s % 60
h = mi // 60
mi = mi % 60
h = "00" if h == 0 else str(h)
mi = "00" if mi == 0 else str(mi)
s = "00" if s == 0 else str(s)
tail = str(tail).zfill(3)  # pad milliseconds to three digits (SRT format)
if len(h) == 1: h = '0' + h
if len(mi) == 1: mi = '0' + mi
if len(s) == 1: s = '0' + s
return "{}:{}:{},{}".format(h, mi, s, tail)
class Text2SRT():
def __init__(self, text, timestamp, offset=0):
self.token_list = [i for i in text.split() if len(i)]
self.timestamp = timestamp
start, end = timestamp[0][0] - offset, timestamp[-1][1] - offset
self.start_sec, self.end_sec = start, end
self.start_time = time_convert(start)
self.end_time = time_convert(end)
def text(self):
res = ""
for word in self.token_list:
if '\u4e00' <= word <= '\u9fff':
res += word
else:
res += " " + word
return res
def len(self):
return len(self.token_list)
def srt(self, acc_ost=0.0):
return "{} --> {}\n{}\n".format(
time_convert(self.start_sec+acc_ost*1000),
time_convert(self.end_sec+acc_ost*1000),
self.text())
def time(self, acc_ost=0.0):
return (self.start_sec/1000+acc_ost, self.end_sec/1000+acc_ost)
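# Illustrative usage of Text2SRT (values are made up): given
#     text="我们 办 这个 奖", timestamp=[[0, 250], [250, 500], [500, 900], [900, 1300]]
# (millisecond token timestamps), .srt() yields
#     "00:00:00,000 --> 00:00:01,300\n我们办这个奖\n"
# text() joins CJK tokens without spaces and keeps a leading space before other tokens.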
def generate_srt(sentence_list):
srt_total = ''
for i, d in enumerate(sentence_list):
t2s = Text2SRT(d['text'], d['timestamp'])
if 'spk' in d:
srt_total += "{} spk{}\n{}".format(i, d['spk'], t2s.srt())
else:
srt_total += "{}\n{}".format(i, t2s.srt())
return srt_total
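# Assumed input shape for generate_srt (inferred from the code, not from the
# FunASR docs): each element of sentence_list is a dict with at least 'text' and
# 'timestamp' (millisecond [start, end] pairs per token) and, for diarized
# results, a 'spk' field. Note that numbering here starts at 0, while
# generate_srt_clip below numbers cues from 1 + begin_index.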
def generate_srt_clip(sentence_list, start, end, begin_index=0, time_acc_ost=0.0):
start, end = int(start * 1000), int(end * 1000)
srt_total = ''
cc = 1 + begin_index
subs = []
for i, d in enumerate(sentence_list):
if d['timestamp'][-1][1] <= start:
continue
if d['timestamp'][0][0] >= end:
break
# parts in between
if (d['timestamp'][-1][1] <= end and d['timestamp'][0][0] > start) or (d['timestamp'][-1][1] == end and d['timestamp'][0][0] == start):
t2s = Text2SRT(d['text'], d['timestamp'], offset=start)
srt_total += "{}\n{}".format(cc, t2s.srt(time_acc_ost))
subs.append((t2s.time(time_acc_ost), t2s.text()))
cc += 1
continue
if d['timestamp'][0][0] <= start:
if d['timestamp'][-1][1] <= end:
for j, ts in enumerate(d['timestamp']):
if ts[1] > start:
break
_text = " ".join(d['text'].split()[j:])
_ts = d['timestamp'][j:]
else:
for j, ts in enumerate(d['timestamp']):
if ts[1] > start:
_start = j
break
for j, ts in enumerate(d['timestamp']):
if ts[1] > end:
_end = j
break
_text = " ".join(d['text'].split()[_start:_end])
_ts = d['timestamp'][_start:_end]
if len(_ts):
t2s = Text2SRT(_text, _ts, offset=start)
srt_total += "{}\n{}".format(cc, t2s.srt(time_acc_ost))
subs.append((t2s.time(time_acc_ost), t2s.text()))
cc += 1
continue
if d['timestamp'][-1][1] > end:
for j, ts in enumerate(d['timestamp']):
if ts[1] > end:
break
_text = " ".join(d['text'].split()[:j])
_ts = d['timestamp'][:j]
if len(_ts):
t2s = Text2SRT(_text, _ts, offset=start)
srt_total += "{}\n{}".format(cc, t2s.srt(time_acc_ost))
subs.append(
(t2s.time(time_acc_ost), t2s.text())
)
cc += 1
continue
return srt_total, subs, cc
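# Illustrative call (start/end are in seconds and converted to ms internally):
#     srt_text, subs, next_index = generate_srt_clip(sentences, 3.2, 10.5)
# `subs` is a list of ((start_sec, end_sec), text) tuples relative to the clip
# start, in the shape SubtitlesClip expects, and `next_index` lets the caller
# keep cue numbering continuous across several clipped segments.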
# step1: Recognize
python videoclipper.py --stage 1 \
--file ../examples/2022云栖大会_片段.mp4 \
--sd_switch yes \
--output_dir ./output
# now you can find recognition results and entire SRT file in ./output/
# step2: Clip
python videoclipper.py --stage 2 \
--file ../examples/2022云栖大会_片段.mp4 \
--output_dir ./output \
--dest_text '所以这个是我们办这个奖的初心啊,我们也会一届一届的办下去' \
--start_ost 0 \
--end_ost 100 \
--output_file './output/res.mp4'
# optionally use --dest_spk spk0 instead of --dest_text to clip by speaker
# (requires --sd_switch yes in step 1)
import os
import numpy as np
PUNC_LIST = [',', '。', '!', '?', '、']
def pre_proc(text):
res = ''
for i in range(len(text)):
if text[i] in PUNC_LIST:
continue
if '\u4e00' <= text[i] <= '\u9fff':
if len(res) and res[-1] != " ":
res += ' ' + text[i]+' '
else:
res += text[i]+' '
else:
res += text[i]
if len(res) and res[-1] == ' ':
res = res[:-1]
return res
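# Illustrative example: pre_proc("我们办这个奖,真的很开心!") drops the punctuation
# in PUNC_LIST and space-separates each CJK character, giving
# "我 们 办 这 个 奖 真 的 很 开 心"; ASCII characters pass through unchanged,
# so "好的ok" becomes "好 的 ok".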
def proc(raw_text, timestamp, dest_text):
# simple matching
ld = len(dest_text.split())
mi, ts = [], []
offset = 0
while True:
fi = raw_text.find(dest_text, offset, len(raw_text))
# import pdb; pdb.set_trace()
ti = raw_text[:fi].count(' ')
if fi == -1:
break
offset = fi + ld
mi.append(fi)
ts.append([timestamp[ti][0]*16, timestamp[ti+ld-1][1]*16])
# import pdb; pdb.set_trace()
return ts
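# Notes on units (inferred from how proc() is used in videoclipper.py): the
# token-level `timestamp` pairs are in milliseconds, and the *16 factor converts
# milliseconds to sample indices at 16 kHz (16 samples per ms). `ld` is the number
# of whitespace-separated tokens in dest_text (as produced by pre_proc above),
# and each match contributes one [start_sample, end_sample] span to `ts`.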
def proc_spk(dest_spk, sd_sentences):
ts = []
for d in sd_sentences:
d_start = d['timestamp'][0][0]
d_end = d['timestamp'][-1][1]
spkid=dest_spk[3:]
# import pdb; pdb.set_trace()
if str(d['spk']) == spkid and d_end-d_start>999:
ts.append([d['start']*16, d['end']*16])
return ts
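# Assumed convention (from the code above): dest_spk looks like "spk0"; the "spk"
# prefix is stripped and compared against the sentence's 'spk' field, and segments
# shorter than roughly one second (d_end - d_start <= 999 ms) are skipped.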
def generate_vad_data(data, sd_sentences, sr=16000):
assert len(data.shape) == 1
vad_data = []
for d in sd_sentences:
d_start = round(d['ts_list'][0][0]/1000, 2)
d_end = round(d['ts_list'][-1][1]/1000, 2)
vad_data.append([d_start, d_end, data[int(d_start * sr):int(d_end * sr)]])
return vad_data
def write_state(output_dir, state):
for key in ['/recog_res_raw', '/timestamp', '/sentences']:#, '/sd_sentences']:
with open(output_dir+key, 'w') as fout:
fout.write(str(state[key[1:]]))
if 'sd_sentences' in state:
with open(output_dir+'/sd_sentences', 'w') as fout:
fout.write(str(state['sd_sentences']))
def load_state(output_dir):
state = {}
with open(output_dir+'/recog_res_raw') as fin:
line = fin.read()
state['recog_res_raw'] = line
with open(output_dir+'/timestamp') as fin:
line = fin.read()
state['timestamp'] = eval(line)
with open(output_dir+'/sentences') as fin:
line = fin.read()
state['sentences'] = eval(line)
if os.path.exists(output_dir+'/sd_sentences'):
with open(output_dir+'/sd_sentences') as fin:
line = fin.read()
state['sd_sentences'] = eval(line)
return state
def convert_pcm_to_float(data):
if data.dtype == np.float64:
return data
elif data.dtype == np.float32:
return data.astype(np.float64)
elif data.dtype == np.int16:
bit_depth = 16
elif data.dtype == np.int32:
bit_depth = 32
elif data.dtype == np.int8:
bit_depth = 8
else:
raise ValueError("Unsupported audio data type")
# Now handle the integer types
max_int_value = float(2 ** (bit_depth - 1))
if bit_depth == 8:
data = data - 128
return (data.astype(np.float64) / max_int_value)
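# Quick sanity check (illustrative, requires only numpy): int16 full scale maps
# into [-1.0, 1.0), e.g.
#     convert_pcm_to_float(np.array([0, 16384, -32768], dtype=np.int16))
#     # -> array([ 0. ,  0.5, -1. ])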
import re
import os
import sys
import copy
import librosa
import logging
import argparse
import numpy as np
import soundfile as sf
import moviepy.editor as mpy
from moviepy.editor import *
from moviepy.video.tools.subtitles import SubtitlesClip
from subtitle_utils import generate_srt, generate_srt_clip
from argparse_tools import ArgumentParser, get_commandline_args
from trans_utils import pre_proc, proc, write_state, load_state, proc_spk, convert_pcm_to_float
class VideoClipper():
def __init__(self, funasr_model):
logging.warning("Initializing VideoClipper.")
self.funasr_model = funasr_model
self.GLOBAL_COUNT = 0
def recog(self, audio_input, sd_switch='no', state=None, hotwords=""):
if state is None:
state = {}
sr, data = audio_input
# Convert to float64 consistently (includes data type checking)
data = convert_pcm_to_float(data)
# assert sr == 16000, "16kHz sample rate required, {} given.".format(sr)
if sr != 16000: # resample with librosa
data = librosa.resample(data, orig_sr=sr, target_sr=16000)
if len(data.shape) == 2: # multi-channel wav input
logging.warning("Input wav shape: {}, only first channel reserved.").format(data.shape)
data = data[:,0]
state['audio_input'] = (sr, data)
if sd_switch == 'yes':
rec_result = self.funasr_model.generate(data, return_raw_text=True, is_final=True, hotword=hotwords)
res_srt = generate_srt(rec_result[0]['sentence_info'])
state['sd_sentences'] = rec_result[0]['sentence_info']
else:
rec_result = self.funasr_model.generate(data,
return_spk_res=False,
sentence_timestamp=True,
return_raw_text=True,
is_final=True,
hotword=hotwords)
res_srt = generate_srt(rec_result[0]['sentence_info'])
state['recog_res_raw'] = rec_result[0]['raw_text']
state['timestamp'] = rec_result[0]['timestamp']
state['sentences'] = rec_result[0]['sentence_info']
res_text = rec_result[0]['text']
return res_text, res_srt, state
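# The state returned above (inferred from the keys set in recog) carries
# 'audio_input', 'recog_res_raw', 'timestamp', 'sentences' and, when diarization
# is on, 'sd_sentences'; clip() and video_clip() below read from this state
# instead of re-running recognition.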
def clip(self, dest_text, start_ost, end_ost, state, dest_spk=None):
# get from state
audio_input = state['audio_input']
recog_res_raw = state['recog_res_raw']
timestamp = state['timestamp']
sentences = state['sentences']
sr, data = audio_input
data = data.astype(np.float64)
all_ts = []
if dest_spk is None or dest_spk == '' or 'sd_sentences' not in state:
for _dest_text in dest_text.split('#'):
if '[' in _dest_text:
match = re.search(r'\[(\d+),\s*(\d+)\]', _dest_text)
if match:
offset_b, offset_e = map(int, match.groups())
log_append = ""
else:
offset_b, offset_e = 0, 0
log_append = "(Bracket detected in dest_text but offset time matching failed)"
_dest_text = _dest_text[:_dest_text.find('[')]
else:
log_append = ""
offset_b, offset_e = 0, 0
match = None
_dest_text = pre_proc(_dest_text)
ts = proc(recog_res_raw, timestamp, _dest_text)
for _ts in ts: all_ts.append([_ts[0]+offset_b*16, _ts[1]+offset_e*16])
if len(ts) > 1 and match:
log_append += '(offsets detected but this sub-sentence matched {} periods in the audio; offsets are applied to all of them)'.format(len(ts))
else:
for _dest_spk in dest_spk.split('#'):
ts = proc_spk(_dest_spk, state['sd_sentences'])
for _ts in ts: all_ts.append(_ts)
log_append = ""
ts = all_ts
# ts.sort()
srt_index = 0
clip_srt = ""
if len(ts):
start, end = ts[0]
start = min(max(0, start+start_ost*16), len(data))
end = min(max(0, end+end_ost*16), len(data))
res_audio = data[start:end]
start_end_info = "from {} to {}".format(start/16000, end/16000)
srt_clip, _, srt_index = generate_srt_clip(sentences, start/16000.0, end/16000.0, begin_index=srt_index)
clip_srt += srt_clip
for _ts in ts[1:]: # multiple sentence input or multiple output matched
start, end = _ts
start = min(max(0, start+start_ost*16), len(data))
end = min(max(0, end+end_ost*16), len(data))
start_end_info += ", from {} to {}".format(start, end)
res_audio = np.concatenate([res_audio, data[start+start_ost*16:end+end_ost*16]], -1)
srt_clip, _, srt_index = generate_srt_clip(sentences, start/16000.0, end/16000.0, begin_index=srt_index-1)
clip_srt += srt_clip
if len(ts):
message = "{} periods found in the speech: ".format(len(ts)) + start_end_info + log_append
else:
message = "No period found in the speech, return raw speech. You may check the recognition result and try other destination text."
res_audio = data
return (sr, res_audio), message, clip_srt
def video_recog(self, vedio_filename, sd_switch='no', hotwords=""):
video = mpy.VideoFileClip(vedio_filename)
# Extract the base name, add '_clip.mp4', and 'wav'
base_name, _ = os.path.splitext(vedio_filename)
clip_video_file = base_name + '_clip.mp4'
audio_file = base_name + '.wav'
video.audio.write_audiofile(audio_file)
wav = librosa.load(audio_file, sr=16000)[0]
# delete the audio file after processing
if os.path.exists(audio_file):
os.remove(audio_file)
state = {
'vedio_filename': vedio_filename,
'clip_video_file': clip_video_file,
'video': video,
}
# res_text, res_srt = self.recog((16000, wav), state)
return self.recog((16000, wav), sd_switch, state, hotwords)
def video_clip(self, dest_text, start_ost, end_ost, state, font_size=32, font_color='white', add_sub=False, dest_spk=None):
# get from state
recog_res_raw = state['recog_res_raw']
timestamp = state['timestamp']
sentences = state['sentences']
video = state['video']
clip_video_file = state['clip_video_file']
vedio_filename = state['vedio_filename']
all_ts = []
srt_index = 0
if dest_spk is None or dest_spk == '' or 'sd_sentences' not in state:
for _dest_text in dest_text.split('#'):
if '[' in _dest_text:
match = re.search(r'\[(\d+),\s*(\d+)\]', _dest_text)
if match:
offset_b, offset_e = map(int, match.groups())
log_append = ""
else:
offset_b, offset_e = 0, 0
log_append = "(Bracket detected in dest_text but offset time matching failed)"
_dest_text = _dest_text[:_dest_text.find('[')]
else:
offset_b, offset_e = 0, 0
log_append = ""
match = None
_dest_text = pre_proc(_dest_text)
ts = proc(recog_res_raw, timestamp, _dest_text)
for _ts in ts: all_ts.append([_ts[0]+offset_b*16, _ts[1]+offset_e*16])
if len(ts) > 1 and match:
log_append += '(offsets detected but this sub-sentence matched {} periods in the audio; offsets are applied to all of them)'.format(len(ts))
else:
for _dest_spk in dest_spk.split('#'):
ts = proc_spk(_dest_spk, state['sd_sentences'])
for _ts in ts: all_ts.append(_ts)
time_acc_ost = 0.0
ts = all_ts
# ts.sort()
clip_srt = ""
if len(ts):
start, end = ts[0][0] / 16000, ts[0][1] / 16000
srt_clip, subs, srt_index = generate_srt_clip(sentences, start, end, begin_index=srt_index, time_acc_ost=time_acc_ost)
start, end = start+start_ost/1000.0, end+end_ost/1000.0
video_clip = video.subclip(start, end)
start_end_info = "from {} to {}".format(start, end)
clip_srt += srt_clip
if add_sub:
generator = lambda txt: TextClip(txt, font='./font/STHeitiMedium.ttc', fontsize=font_size, color=font_color)
subtitles = SubtitlesClip(subs, generator)
video_clip = CompositeVideoClip([video_clip, subtitles.set_pos(('center','bottom'))])
concate_clip = [video_clip]
time_acc_ost += end+end_ost/1000.0 - (start+start_ost/1000.0)
for _ts in ts[1:]:
start, end = _ts[0] / 16000, _ts[1] / 16000
srt_clip, subs, srt_index = generate_srt_clip(sentences, start, end, begin_index=srt_index-1, time_acc_ost=time_acc_ost)
chi_subs = []
sub_starts = subs[0][0][0]
for sub in subs:
chi_subs.append(((sub[0][0]-sub_starts, sub[0][1]-sub_starts), sub[1]))
start, end = start+start_ost/1000.0, end+end_ost/1000.0
_video_clip = video.subclip(start, end)
start_end_info += ", from {} to {}".format(start, end)
clip_srt += srt_clip
if add_sub:
generator = lambda txt: TextClip(txt, font='./font/STHeitiMedium.ttc', fontsize=font_size, color=font_color)
subtitles = SubtitlesClip(chi_subs, generator)
_video_clip = CompositeVideoClip([_video_clip, subtitles.set_pos(('center','bottom'))])
# _video_clip.write_videofile("debug.mp4", audio_codec="aac")
concate_clip.append(copy.copy(_video_clip))
time_acc_ost += end+end_ost/1000.0 - (start+start_ost/1000.0)
message = "{} periods found in the audio: ".format(len(ts)) + start_end_info
logging.warning("Concating...")
if len(concate_clip) > 1:
video_clip = concatenate_videoclips(concate_clip)
clip_video_file = clip_video_file[:-4] + '_no{}.mp4'.format(self.GLOBAL_COUNT)
video_clip.write_videofile(clip_video_file, audio_codec="aac", temp_audiofile="video_no{}.mp4".format(self.GLOBAL_COUNT))
self.GLOBAL_COUNT += 1
else:
clip_video_file = vedio_filename
message = "No period found in the audio, return raw speech. You may check the recognition result and try other destination text."
srt_clip = ''
return clip_video_file, message, clip_srt
def get_parser():
parser = ArgumentParser(
description="ClipVideo Argument",
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
parser.add_argument(
"--stage",
type=int,
choices=(1, 2),
help="Stage, 0 for recognizing and 1 for clipping",
required=True
)
parser.add_argument(
"--file",
type=str,
default=None,
help="Input file path",
required=True
)
parser.add_argument(
"--sd_switch",
type=str,
choices=("no", "yes"),
default="no",
help="Turn on the speaker diarization or not",
)
parser.add_argument(
"--output_dir",
type=str,
default='./output',
help="Output files path",
)
parser.add_argument(
"--dest_text",
type=str,
default=None,
help="Destination text string for clipping",
)
parser.add_argument(
"--dest_spk",
type=str,
default=None,
help="Destination spk id for clipping",
)
parser.add_argument(
"--start_ost",
type=int,
default=0,
help="Offset time in ms at beginning for clipping"
)
parser.add_argument(
"--end_ost",
type=int,
default=0,
help="Offset time in ms at ending for clipping"
)
parser.add_argument(
"--output_file",
type=str,
default=None,
help="Output file path"
)
return parser
def runner(stage, file, sd_switch, output_dir, dest_text, dest_spk, start_ost, end_ost, output_file, config=None):
audio_suffixs = ['.wav','.mp3','.aac','.m4a','.flac']
video_suffixs = ['.mp4','.avi','.mkv','.flv','.mov','.webm','.ts','.mpeg']
_,ext = os.path.splitext(file)
if ext.lower() in audio_suffixs:
mode = 'audio'
elif ext.lower() in video_suffixs:
mode = 'video'
else:
logging.error("Unsupported file format: {}\n\nplease choise one of the following: {}".format(file),audio_suffixs+video_suffixs)
sys.exit(1) # exit if the file is not supported
while output_dir.endswith('/'):
output_dir = output_dir[:-1]
if not os.path.exists(output_dir):
os.mkdir(output_dir)
if stage == 1:
from funasr import AutoModel
# initialize funasr automodel
logging.warning("Initializing modelscope asr pipeline.")
funasr_model = AutoModel(model="iic/speech_seaco_paraformer_large_asr_nat-zh-cn-16k-common-vocab8404-pytorch",
model_revision="v2.0.4",
vad_model="damo/speech_fsmn_vad_zh-cn-16k-common-pytorch",
vad_model_revision="v2.0.4",
punc_model="damo/punc_ct-transformer_zh-cn-common-vocab272727-pytorch",
punc_model_revision="v2.0.4",
spk_model="damo/speech_campplus_sv_zh-cn_16k-common",
spk_model_revision="v2.0.2",
)
audio_clipper = VideoClipper(funasr_model)
if mode == 'audio':
logging.warning("Recognizing audio file: {}".format(file))
wav, sr = librosa.load(file, sr=16000)
res_text, res_srt, state = audio_clipper.recog((sr, wav), sd_switch)
if mode == 'video':
logging.warning("Recognizing video file: {}".format(file))
res_text, res_srt, state = audio_clipper.video_recog(file, sd_switch)
total_srt_file = output_dir + '/total.srt'
with open(total_srt_file, 'w') as fout:
fout.write(res_srt)
logging.warning("Write total subtitle to {}".format(total_srt_file))
write_state(output_dir, state)
logging.warning("Recognition successed. You can copy the text segment from below and use stage 2.")
print(res_text)
if stage == 2:
audio_clipper = VideoClipper(None)
if mode == 'audio':
state = load_state(output_dir)
wav, sr = librosa.load(file, sr=16000)
state['audio_input'] = (sr, wav)
(sr, audio), message, srt_clip = audio_clipper.clip(dest_text, start_ost, end_ost, state, dest_spk=dest_spk)
if output_file is None:
output_file = output_dir + '/result.wav'
clip_srt_file = output_file[:-3] + 'srt'
logging.warning(message)
assert output_file.endswith('.wav'), "output_file must end with '.wav'"
sf.write(output_file, audio, 16000)
logging.warning("Save clipped wav file to {}".format(output_file))
with open(clip_srt_file, 'w') as fout:
fout.write(srt_clip)
logging.warning("Write clipped subtitle to {}".format(clip_srt_file))
if mode == 'video':
state = load_state(output_dir)
state['vedio_filename'] = file
if output_file is None:
state['clip_video_file'] = file[:-4] + '_clip.mp4'
else:
state['clip_video_file'] = output_file
clip_srt_file = state['clip_video_file'][:-3] + 'srt'
state['video'] = mpy.VideoFileClip(file)
clip_video_file, message, srt_clip = audio_clipper.video_clip(dest_text, start_ost, end_ost, state, dest_spk=dest_spk)
logging.warning("Clipping Log: {}".format(message))
logging.warning("Save clipped mp4 file to {}".format(clip_video_file))
with open(clip_srt_file, 'w') as fout:
fout.write(srt_clip)
logging.warning("Write clipped subtitle to {}".format(clip_srt_file))
def main(cmd=None):
print(get_commandline_args(), file=sys.stderr)
parser = get_parser()
args = parser.parse_args(cmd)
kwargs = vars(args)
runner(**kwargs)
if __name__ == '__main__':
main()