666 lines
25 KiB
Python
666 lines
25 KiB
Python
import os
|
||
import subprocess
|
||
import threading
|
||
import time
|
||
import shutil
|
||
import tempfile
|
||
import json
|
||
from pathlib import Path
|
||
import tkinter as tk
|
||
from tkinter import filedialog, messagebox
|
||
import customtkinter as ctk
|
||
import sys
|
||
|
||
|
||
def app_dir():
    """Return the directory this program lives in.

    When ``sys.frozen`` is truthy (set by bundlers that package the script
    into an executable) the directory of ``sys.executable`` is returned;
    otherwise the directory containing this source file.
    """
    is_frozen = getattr(sys, "frozen", False)
    anchor = sys.executable if is_frozen else os.path.abspath(__file__)
    return os.path.dirname(anchor)
|
||
|
||
|
||
# ===== Appearance =====
# Appearance mode; accepted values: "light" | "dark" | "system".
ctk.set_appearance_mode("system")
# Default colour theme; accepted values: "blue" | "green" | "dark-blue".
ctk.set_default_color_theme("blue")
|
||
|
||
|
||
class VideoConcatenatorApp(ctk.CTk):
    """Main window of a GUI front end that joins video files with ffmpeg.

    The user adds files, ticks and reorders them, chooses an output name and
    folder, then starts the job. ffmpeg is run with the ``concat`` demuxer and
    stream copy (``-c:v copy -c:a copy``) on a worker thread; the worker only
    touches widgets through ``self.after`` callbacks.
    """

    # Initial value of the output-name field; a field still holding this value
    # (or nothing) is treated as "not customised by the user".
    _DEFAULT_OUTPUT_NAME = "output"

    def __init__(self):
        """Create state variables, locate ffmpeg and build the whole UI."""
        super().__init__()

        self.title("Video Concatenator")
        self.geometry("720x760")
        self.minsize(720, 760)

        # ===== State =====
        # Each item is a dict: {"path": str, "checked": tk.BooleanVar,
        # "row_widget": row frame or None (maintained by _create_row/_rebuild_list)}.
        self.items = []
        self.active_index = None  # row targeted by Move Up / Move Down / Remove

        self.output_name_var = tk.StringVar(value=self._DEFAULT_OUTPUT_NAME)
        self.ext_var = tk.StringVar(value=".mp4")
        self.output_dir_var = tk.StringVar(value="")

        self.delete_original = tk.BooleanVar(value=False)

        self._proc = None  # subprocess.Popen of the running ffmpeg, if any
        self._busy = False  # True from the Start click until _finish_run
        self._stop_requested = False

        # Detect ffmpeg: prefer a local ffmpeg.exe next to the exe/py, then PATH.
        ffmpeg_local = os.path.join(app_dir(), "ffmpeg.exe")
        if os.path.exists(ffmpeg_local):
            self.ffmpeg_path = ffmpeg_local
        else:
            self.ffmpeg_path = shutil.which("ffmpeg") or shutil.which("ffmpeg.exe")

        # ===== Layout root =====
        self.root = ctk.CTkFrame(self, corner_radius=14)
        self.root.pack(fill="both", expand=True, padx=18, pady=18)

        title = ctk.CTkLabel(self.root, text="Video Concatenator", font=ctk.CTkFont(size=20, weight="bold"))
        title.pack(pady=(14, 10))

        self.content = ctk.CTkScrollableFrame(self.root, corner_radius=12)
        self.content.pack(fill="both", expand=True, padx=14, pady=(0, 14))

        self._build_select_section()
        self._build_review_section()
        self._build_output_section()
        self._build_options_section()
        self._build_start_section()

        # ===== Bind updates =====
        # Any change to name, extension or folder refreshes the preview label
        # and the Start button state.
        for var in (self.output_name_var, self.ext_var, self.output_dir_var):
            var.trace_add("write", lambda *_: self.refresh_final_output())
        self.refresh_final_output()

        # Disable start if ffmpeg is missing.
        if not self.ffmpeg_path:
            self._set_status("ffmpeg not found. Please put ffmpeg in PATH or next to the exe.", error=True)
            self.start_btn.configure(state="disabled")
            messagebox.showerror(
                "Missing Dependency",
                "Could not find ffmpeg on your system.\n"
                "Please install ffmpeg and make sure it's on your PATH,\n"
                "or place ffmpeg.exe next to this program."
            )
        else:
            self._set_status("Ready.")

        self._refresh_start_enabled()

    # ================= UI construction =================
    def _build_select_section(self):
        """Build section 1: the file counter, Add/Clear buttons and the tip."""
        self.sec1 = self._section(self.content, "1. Select Videos")
        self.sec1.pack(fill="x", pady=(0, 12))

        sec1_top = ctk.CTkFrame(self.sec1, fg_color="transparent")
        sec1_top.pack(fill="x", padx=12, pady=(8, 6))

        self.selected_count_label = ctk.CTkLabel(sec1_top, text="Selected: 0 files", font=ctk.CTkFont(size=12))
        self.selected_count_label.pack(side="right")

        btn_row = ctk.CTkFrame(self.sec1, fg_color="transparent")
        btn_row.pack(fill="x", padx=12, pady=(2, 6))

        self.add_btn = ctk.CTkButton(btn_row, text="+ Add Videos", width=180, command=self.select_videos)
        self.add_btn.pack(side="left")

        self.clear_btn = ctk.CTkButton(btn_row, text="Clear", width=110, fg_color="#E5E7EB", text_color="#111827",
                                       hover_color="#D1D5DB", command=self.clear_all)
        self.clear_btn.pack(side="left", padx=(10, 0))

        tip = ctk.CTkLabel(self.sec1, text="Tip: You can add videos multiple times.", text_color="#6B7280",
                           font=ctk.CTkFont(size=12))
        tip.pack(anchor="w", padx=12, pady=(0, 10))

    def _build_review_section(self):
        """Build section 2: the scrollable file list and its side buttons."""
        self.sec2 = self._section(self.content, "2. Review & Reorder")
        self.sec2.pack(fill="x", pady=(0, 12))

        sec2_body = ctk.CTkFrame(self.sec2, fg_color="transparent")
        sec2_body.pack(fill="x", padx=12, pady=(10, 12))

        left = ctk.CTkFrame(sec2_body)
        left.pack(side="left", fill="both", expand=True, padx=(0, 12))

        # Scrollable list area; rows are (re)created by _rebuild_list.
        self.list_area = ctk.CTkScrollableFrame(left, height=170, corner_radius=10)
        self.list_area.pack(fill="both", expand=True, padx=10, pady=10)

        right = ctk.CTkFrame(sec2_body, fg_color="transparent")
        right.pack(side="right", fill="y")

        self.btn_up = self._sidebtn(right, "Move Up ↑", self.move_up)
        self.btn_down = self._sidebtn(right, "Move Down ↓", self.move_down)
        self.btn_remove = self._sidebtn(right, "Remove", self.remove_active)
        self.btn_sel_all = self._sidebtn(right, "Select All", self.select_all)
        self.btn_sel_none = self._sidebtn(right, "Select None", self.select_none)

    def _build_output_section(self):
        """Build section 3: output name, extension, folder and path preview."""
        self.sec3 = self._section(self.content, "3. Output Settings")
        self.sec3.pack(fill="x", pady=(0, 12))

        sec3_body = ctk.CTkFrame(self.sec3, fg_color="transparent")
        sec3_body.pack(fill="x", padx=12, pady=(10, 10))

        # Output file name row
        row1 = ctk.CTkFrame(sec3_body, fg_color="transparent")
        row1.pack(fill="x", pady=(0, 8))
        ctk.CTkLabel(row1, text="Output file name:", width=140, anchor="w").pack(side="left")

        self.output_entry = ctk.CTkEntry(row1, textvariable=self.output_name_var)
        self.output_entry.pack(side="left", fill="x", expand=True)

        self.ext_menu = ctk.CTkOptionMenu(row1, values=[".mp4", ".mkv", ".mov", ".avi"], variable=self.ext_var, width=90)
        self.ext_menu.pack(side="left", padx=(8, 0))

        # Save to row
        row2 = ctk.CTkFrame(sec3_body, fg_color="transparent")
        row2.pack(fill="x")
        ctk.CTkLabel(row2, text="Save to:", width=140, anchor="w").pack(side="left")

        self.save_to_entry = ctk.CTkEntry(row2, textvariable=self.output_dir_var)
        self.save_to_entry.pack(side="left", fill="x", expand=True)

        self.browse_btn = ctk.CTkButton(row2, text="Browse...", width=110, command=self.select_output_dir,
                                        fg_color="#E5E7EB", text_color="#111827", hover_color="#D1D5DB")
        self.browse_btn.pack(side="left", padx=(8, 0))

        self.final_out_label = ctk.CTkLabel(self.sec3, text="Final output: ", text_color="#6B7280",
                                            font=ctk.CTkFont(size=12))
        self.final_out_label.pack(anchor="w", padx=12, pady=(0, 10))

    def _build_options_section(self):
        """Build section 4: the delete-originals checkbox and its warning."""
        self.sec4 = self._section(self.content, "4. Options")
        self.sec4.pack(fill="x", pady=(0, 12))

        self.delete_check = ctk.CTkCheckBox(self.sec4, text="Delete original files after concat",
                                            variable=self.delete_original)
        self.delete_check.pack(anchor="w", padx=12, pady=(10, 4))

        self.warn_label = ctk.CTkLabel(self.sec4,
                                       text="Warning: Originals will be permanently deleted after success.",
                                       text_color="#F97316", font=ctk.CTkFont(size=12))
        self.warn_label.pack(anchor="w", padx=34, pady=(0, 10))

    def _build_start_section(self):
        """Build section 5: progress bar, Start/Stop, status line and Details."""
        self.sec5 = self._section(self.content, "5. Start")
        self.sec5.pack(fill="x", pady=(0, 14))

        sec5_body = ctk.CTkFrame(self.sec5, fg_color="transparent")
        sec5_body.pack(fill="x", padx=12, pady=(10, 10))

        # Progress row
        prog_row = ctk.CTkFrame(sec5_body, fg_color="transparent")
        prog_row.pack(fill="x", pady=(0, 8))

        ctk.CTkLabel(prog_row, text="Progress:", width=90, anchor="w").pack(side="left")

        self.progressbar = ctk.CTkProgressBar(prog_row)
        self.progressbar.set(0)
        self.progressbar.pack(side="left", fill="x", expand=True, padx=(0, 8))

        self.start_btn = ctk.CTkButton(prog_row, text="Start Concatenation", width=200, command=self.start_concatenation)
        self.start_btn.pack(side="left", padx=(0, 8))

        self.stop_btn = ctk.CTkButton(prog_row, text="Stop", width=90,
                                      fg_color="#E5E7EB", text_color="#111827", hover_color="#D1D5DB",
                                      command=self.stop_concatenation, state="disabled")
        self.stop_btn.pack(side="left")

        # Status row
        status_row = ctk.CTkFrame(sec5_body, fg_color="transparent")
        status_row.pack(fill="x")
        ctk.CTkLabel(status_row, text="Status:", width=90, anchor="w").pack(side="left")

        self.status_label = ctk.CTkLabel(status_row, text="", anchor="w")
        self.status_label.pack(side="left", fill="x", expand=True)

        # Details toggle
        self.details_open = False
        self.details_btn = ctk.CTkButton(self.sec5, text="Details ▾", width=120,
                                         fg_color="transparent", text_color="#111827",
                                         hover_color="#F3F4F6", command=self.toggle_details)
        self.details_btn.pack(anchor="w", padx=12, pady=(0, 6))

        self.details_box = ctk.CTkTextbox(self.sec5, height=90, corner_radius=10)
        self.details_box.pack(fill="x", padx=12, pady=(0, 12))
        self.details_box.insert("1.0", "")
        self.details_box.configure(state="disabled")
        self.details_box.pack_forget()  # start collapsed

    # ================= UI helpers =================
    def _section(self, parent, title: str):
        """Return a new, un-packed section frame with a bold title and divider."""
        frame = ctk.CTkFrame(parent, corner_radius=12)
        header = ctk.CTkFrame(frame, fg_color="transparent")
        header.pack(fill="x", padx=12, pady=(10, 0))
        ctk.CTkLabel(header, text=title, font=ctk.CTkFont(size=14, weight="bold")).pack(side="left")
        divider = ctk.CTkFrame(frame, height=1, fg_color="#E5E7EB")
        divider.pack(fill="x", padx=12, pady=(8, 0))
        return frame

    def _sidebtn(self, parent, text, cmd):
        """Create, pack and return one of the buttons beside the file list."""
        b = ctk.CTkButton(parent, text=text, width=150, command=cmd,
                          fg_color="#F3F4F6", text_color="#111827", hover_color="#E5E7EB")
        b.pack(pady=6)
        return b

    def _log(self, s: str):
        """Append one line to the Details box.

        The line is recorded even while the box is collapsed, so the ffmpeg
        command and progress markers are still there when the user opens it.
        """
        self.details_box.configure(state="normal")
        self.details_box.insert("end", s + "\n")
        self.details_box.see("end")
        self.details_box.configure(state="disabled")

    def toggle_details(self):
        """Show or hide the Details box and flip the arrow on its button."""
        self.details_open = not self.details_open
        if self.details_open:
            self.details_btn.configure(text="Details ▴")
            self.details_box.pack(fill="x", padx=12, pady=(0, 12))
        else:
            self.details_btn.configure(text="Details ▾")
            self.details_box.pack_forget()

    def _set_status(self, text: str, error: bool = False):
        """Set the status line text; ``error=True`` renders it in red."""
        self.status_label.configure(text=text, text_color=("#DC2626" if error else "#111827"))

    def refresh_final_output(self):
        """Update the "Final output" preview and the Start button state."""
        out = self.get_output_path()  # "" while folder or name is missing
        self.final_out_label.configure(text=f"Final output: {out}")
        self._refresh_start_enabled()

    def _refresh_start_enabled(self):
        """Enable Start only when ffmpeg exists, no job runs and inputs are complete."""
        ready = (
            bool(self.ffmpeg_path)
            and not self._running()
            and bool(self.get_selected_paths())
            and bool(self.output_dir_var.get().strip())
            and bool(self.output_name_var.get().strip())
        )
        self.start_btn.configure(state=("normal" if ready else "disabled"))

    def _running(self):
        """Return True while a job is in flight.

        ``_busy`` covers the phases where no ffmpeg process exists yet (list
        file creation, ffprobe duration probing) or has already exited but
        ``_finish_run`` has not run; the Popen check covers the process itself.
        """
        return self._busy or (self._proc is not None and self._proc.poll() is None)

    # ================= Data model =================
    def clear_all(self):
        """Remove every file from the list and reset the active row."""
        self.items.clear()
        self.active_index = None
        self._rebuild_list()
        self._update_selected_count()

    def _update_selected_count(self):
        """Show the number of files in the list (checked or not) in the counter label."""
        self.selected_count_label.configure(text=f"Selected: {len(self.items)} files")

    def _rebuild_list(self):
        """Destroy and recreate all list rows from ``self.items``."""
        # Drop widget references first so nothing points at a destroyed row.
        for it in self.items:
            it["row_widget"] = None

        # Iterate over a copy: destroying a child mutates the children list.
        for w in list(self.list_area.winfo_children()):
            w.destroy()

        for idx, it in enumerate(self.items):
            self._create_row(idx, it)

        self._refresh_row_highlights()
        self._refresh_start_enabled()

    def _create_row(self, idx: int, it: dict):
        """Create the row (checkbox + file name) for item ``it`` at position ``idx``."""
        row = ctk.CTkFrame(self.list_area, corner_radius=8)
        row.pack(fill="x", pady=4)

        # idx is bound as a default so each row keeps its own index.
        def set_active(_event=None, idx=idx):
            self.active_index = idx
            self._refresh_row_highlights()

        cb = ctk.CTkCheckBox(row, text="", variable=it["checked"], width=22,
                             command=self._refresh_start_enabled)
        cb.pack(side="left", padx=(10, 6), pady=8)

        lbl = ctk.CTkLabel(row, text=os.path.basename(it["path"]), anchor="w")
        lbl.pack(side="left", fill="x", expand=True, padx=(0, 10))

        # Clicking anywhere on the row (including the checkbox) makes it active.
        for widget in (row, lbl, cb):
            widget.bind("<Button-1>", set_active)

        it["row_widget"] = row

    def _refresh_row_highlights(self):
        """Tint the active row and reset the background of all others."""
        for i, it in enumerate(self.items):
            row = it.get("row_widget")
            if not row:
                continue
            try:
                row.configure(fg_color="#EAF2FF" if self.active_index == i else "transparent")
            except tk.TclError:
                # The widget may have been destroyed during a rebuild; ignore.
                pass

    # ================= File actions =================
    def select_videos(self):
        """Ask for video files and append the ones not already listed.

        On the first add the output folder defaults to the first file's
        folder. The output name is auto-filled only while it is empty or
        still the initial placeholder, so a name the user typed is never
        overwritten; the suggestion gets a ``_concat`` suffix so it does not
        collide with the first input file.
        """
        files = filedialog.askopenfilenames(
            # Tk expects the patterns of one file type as a list, not a
            # semicolon-separated string.
            filetypes=[("Video Files", ("*.mp4", "*.mov", "*.mkv", "*.avi", "*.flv"))]
        )
        if not files:
            return

        # Append new files, keep the existing ones (no duplicates).
        existing = {it["path"] for it in self.items}
        for f in files:
            if f not in existing:
                existing.add(f)
                self.items.append({"path": f, "checked": tk.BooleanVar(value=True)})
        self.active_index = 0 if self.items else None

        if self.items:
            first = Path(self.items[0]["path"])
            if not self.output_dir_var.get().strip():
                self.output_dir_var.set(str(first.parent))
            if self.output_name_var.get().strip() in ("", self._DEFAULT_OUTPUT_NAME):
                self.output_name_var.set(f"{first.stem}_concat")

        self._rebuild_list()
        self._update_selected_count()
        self.refresh_final_output()

    def get_selected_paths(self) -> list[str]:
        """Return the paths of all checked items, in list order."""
        return [it["path"] for it in self.items if it["checked"].get()]

    def move_up(self):
        """Swap the active row with the one above it."""
        i = self.active_index
        if i is None or i <= 0:
            return
        self.items[i - 1], self.items[i] = self.items[i], self.items[i - 1]
        self.active_index = i - 1
        self._rebuild_list()

    def move_down(self):
        """Swap the active row with the one below it."""
        i = self.active_index
        if i is None or i >= len(self.items) - 1:
            return
        self.items[i + 1], self.items[i] = self.items[i], self.items[i + 1]
        self.active_index = i + 1
        self._rebuild_list()

    def remove_active(self):
        """Remove the active row; the row that takes its place becomes active."""
        if self.active_index is None or not self.items:
            return
        i = self.active_index
        self.items.pop(i)
        self.active_index = min(i, len(self.items) - 1) if self.items else None
        self._rebuild_list()
        self._update_selected_count()

    def select_all(self):
        """Check every item."""
        for it in self.items:
            it["checked"].set(True)
        self._refresh_start_enabled()

    def select_none(self):
        """Uncheck every item."""
        for it in self.items:
            it["checked"].set(False)
        self._refresh_start_enabled()

    def select_output_dir(self):
        """Ask for the output folder; a cancelled dialog changes nothing."""
        d = filedialog.askdirectory()
        if d:
            self.output_dir_var.set(d)

    def get_output_path(self) -> str:
        """Return ``<folder>/<name><ext>``, or "" if folder or name is empty."""
        out_dir = self.output_dir_var.get().strip()
        name = self.output_name_var.get().strip()
        ext = self.ext_var.get().strip()
        if not out_dir or not name:
            return ""
        if not ext.startswith("."):
            ext = "." + ext
        return str(Path(out_dir) / f"{name}{ext}")

    @staticmethod
    def _path_key(path: str) -> str:
        """Return a normalised form of ``path`` for "same file?" comparisons."""
        return os.path.normcase(os.path.realpath(path))

    @staticmethod
    def _remove_quietly(path: str) -> None:
        """Delete ``path`` if possible; failure is ignored (best-effort cleanup)."""
        try:
            os.remove(path)
        except OSError:
            pass

    # ================= ffprobe helpers =================
    def _ffprobe_path(self) -> str | None:
        """Locate ffprobe: next to the detected ffmpeg first, then on PATH."""
        on_path = shutil.which("ffprobe") or shutil.which("ffprobe.exe")
        if not self.ffmpeg_path:
            return on_path
        p = Path(self.ffmpeg_path)
        cand = p.with_name("ffprobe.exe" if p.suffix.lower() == ".exe" else "ffprobe")
        return str(cand) if cand.exists() else on_path

    def _ffprobe_duration_seconds(self, path: str) -> float:
        """Return the container duration of ``path`` in seconds.

        Returns 0.0 when ffprobe is unavailable, fails, or reports no usable
        duration; callers treat a zero total as "duration unknown".
        """
        ffprobe = self._ffprobe_path()
        if not ffprobe:
            return 0.0
        try:
            proc = subprocess.run(
                [ffprobe, "-v", "error", "-show_entries", "format=duration", "-of", "json", path],
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
                check=False
            )
            if proc.returncode != 0:
                return 0.0
            data = json.loads(proc.stdout or "{}")
            dur = float(data.get("format", {}).get("duration", 0.0) or 0.0)
            return max(dur, 0.0)
        except (OSError, ValueError, TypeError, AttributeError):
            # Launch failure, undecodable/invalid JSON or a non-numeric duration.
            return 0.0

    def _total_duration_seconds(self, files: list[str]) -> float:
        """Return the summed duration of ``files`` (unknown durations count as 0)."""
        return sum(self._ffprobe_duration_seconds(f) for f in files)

    # ================= Run / Stop =================
    def start_concatenation(self):
        """Validate the inputs and launch the worker thread."""
        if self._running():
            return

        files = self.get_selected_paths()
        if not files:
            messagebox.showerror("Error", "No videos selected (checked).")
            return

        out_path = self.get_output_path()
        if not out_path:
            messagebox.showerror("Error", "Output name or directory is empty.")
            return

        # ffmpeg runs with -y, so writing onto one of the inputs would destroy
        # it while it is being read. Compare normalised real paths so that
        # case-only differences on case-insensitive filesystems are caught.
        out_key = self._path_key(out_path)
        if any(self._path_key(f) == out_key for f in files):
            messagebox.showerror("Error", "Output name matches one of the originals (checked items).")
            return

        self._stop_requested = False
        self._busy = True
        self.start_btn.configure(state="disabled")
        self.stop_btn.configure(state="normal")
        self.progressbar.set(0)
        self._set_status("Starting...")

        # Start every run with an empty Details log.
        self.details_box.configure(state="normal")
        self.details_box.delete("1.0", "end")
        self.details_box.configure(state="disabled")

        # Tk variables must be read on the main thread, so snapshot the option
        # here and hand the plain bool to the worker.
        delete_originals = bool(self.delete_original.get())

        t = threading.Thread(
            target=self._concat_worker,
            args=(files, out_path, delete_originals),
            daemon=True,
        )
        t.start()

    def stop_concatenation(self):
        """Request cancellation and terminate ffmpeg if it is already running."""
        self._stop_requested = True
        proc = self._proc
        if proc is not None and proc.poll() is None:
            try:
                proc.terminate()
            except OSError:
                # The process exited between poll() and terminate().
                pass
        self._set_status("Stopping...")

    def _write_concat_list(self, files: list[str], out_dir: str) -> str:
        """Write the concat-demuxer list file into ``out_dir`` and return its path.

        Each line has the form ``file '<absolute path>'``; backslashes become
        forward slashes and single quotes are escaped as ``'\\''``. If writing
        fails the partial file is removed and the exception propagates.
        """
        tmp = tempfile.NamedTemporaryFile(
            mode="w", encoding="utf-8", newline="\n", delete=False,
            dir=out_dir, suffix=".txt"
        )
        try:
            with tmp:
                for file in files:
                    resolved = str(Path(file).resolve())
                    escaped = resolved.replace("\\", "/").replace("'", r"'\''")
                    tmp.write(f"file '{escaped}'\n")
        except Exception:
            self._remove_quietly(tmp.name)
            raise
        return tmp.name

    def _build_ffmpeg_cmd(self, list_path: str, output_file: str) -> list[str]:
        """Return the ffmpeg argument list for a stream-copy concat job."""
        return [
            self.ffmpeg_path,
            "-y",
            "-f", "concat",
            "-safe", "0",
            "-i", list_path,
            "-c:v", "copy",
            "-c:a", "copy",
            "-movflags", "+faststart",
            "-nostdin",
            "-hide_banner",
            "-loglevel", "error",
            "-progress", "pipe:1",  # key=value progress records on stdout
            output_file
        ]

    def _handle_progress_line(self, line: str, total_secs: float, start_time: float) -> None:
        """Turn one ``key=value`` progress line from ffmpeg into a UI update."""
        key, _, value = line.partition("=")
        if key == "out_time_ms":
            # The value is divided by 1e6 to get seconds, i.e. it is handled as
            # microseconds. Non-numeric values (such as "N/A") are skipped.
            try:
                played = int(value) / 1_000_000.0
            except ValueError:
                return

            if total_secs > 0:
                pct = max(0.0, min(played / total_secs, 1.0))
                elapsed = time.time() - start_time
                pace = played / max(elapsed, 1e-6)  # media seconds per wall second
                remaining = (total_secs - played) / max(pace, 1e-6) if pace > 0 else 0
                status = f"Progress: {pct*100:5.1f}% | ETA: {int(max(remaining, 0))}s"

                def apply_progress():
                    self.progressbar.set(pct)
                    self._set_status(status)

                self.after(0, apply_progress)
            else:
                # Total duration unknown: only the elapsed time can be shown.
                status = f"Elapsed: {int(time.time() - start_time)}s"
                self.after(0, lambda: self._set_status(status))
        elif key == "progress":
            # progress=continue / progress=end
            self.after(0, lambda: self._log(line))

    def _run_ffmpeg(self, cmd: list[str], cwd: str, total_secs: float) -> tuple[int, str]:
        """Run ffmpeg to completion (or until a stop request).

        Returns ``(return_code, stderr_text)``. Raises whatever
        ``subprocess.Popen`` raises if the process cannot be started.
        """
        proc = subprocess.Popen(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            errors="replace",  # never fail on undecodable bytes in ffmpeg output
            cwd=cwd,
            bufsize=1
        )
        self._proc = proc
        self.after(0, lambda: self._set_status("Concatenating..."))

        # Drain stderr concurrently: if it were left unread while we poll
        # stdout, ffmpeg could block once the stderr pipe buffer fills up.
        stderr_chunks: list[str] = []

        def drain_stderr():
            for chunk in proc.stderr:
                stderr_chunks.append(chunk)

        drainer = threading.Thread(target=drain_stderr, daemon=True)
        drainer.start()

        start_time = time.time()
        try:
            while not self._stop_requested:
                line = proc.stdout.readline()
                if not line:
                    if proc.poll() is not None:
                        break
                    time.sleep(0.05)
                    continue
                self._handle_progress_line(line.strip(), total_secs, start_time)

            # A stop may have been requested before self._proc was visible to
            # stop_concatenation(), so make sure the process really ends.
            if self._stop_requested and proc.poll() is None:
                try:
                    proc.terminate()
                except OSError:
                    pass
            try:
                proc.wait(timeout=5)
            except subprocess.TimeoutExpired:
                proc.kill()
                proc.wait()
        finally:
            # Only reached with a live process if the loop above raised.
            if proc.poll() is None:
                proc.kill()
                proc.wait()
            drainer.join(timeout=1)
            proc.stdout.close()
            if not drainer.is_alive():
                proc.stderr.close()

        return proc.returncode, "".join(stderr_chunks)

    def _concat_worker(self, files: list[str], output_file: str, delete_originals: bool | None = None):
        """Worker-thread body: run the concat job and report the outcome.

        Args:
            files: Input paths, in concatenation order.
            output_file: Full path of the file to create.
            delete_originals: Whether to delete the inputs after success.
                ``None`` (the default, kept for callers that pass only two
                arguments) falls back to reading the checkbox variable here.

        All widget access is marshalled to the main thread via ``self.after``;
        ``_finish_run`` is always scheduled exactly once.
        """
        if delete_originals is None:
            delete_originals = bool(self.delete_original.get())

        out_dir = str(Path(output_file).parent)

        # Build the concat list file.
        try:
            list_path = self._write_concat_list(files, out_dir)
        except Exception as e:
            # Format the message now: ``e`` is unbound once this clause ends,
            # so the deferred callback must not reference it.
            err_text = f"Failed to create list file: {e}"
            self.after(0, lambda: messagebox.showerror("Error", err_text))
            self.after(0, lambda: self._finish_run(False))
            return

        rc = -1
        stderr_data = ""
        try:
            total_secs = self._total_duration_seconds(files)
            cmd = self._build_ffmpeg_cmd(list_path, output_file)

            # Show the command in Details.
            cmd_text = " ".join(str(x) for x in cmd)
            self.after(0, lambda: self._log(cmd_text))

            # Skip the launch if Stop was pressed while probing durations.
            if not self._stop_requested:
                rc, stderr_data = self._run_ffmpeg(cmd, out_dir, total_secs)
        except Exception as e:
            # Thread boundary: report any failure (e.g. ffmpeg could not be
            # started) through the normal error path instead of dying silently.
            rc = -1
            stderr_data = str(e)
        finally:
            self._remove_quietly(list_path)

        if self._stop_requested:
            self.after(0, lambda: self._set_status("Stopped by user."))
            self.after(0, lambda: self._finish_run(False))
            return

        if rc != 0:
            msg = (stderr_data or "").strip() or "Unknown ffmpeg error"
            self.after(0, lambda: self._set_status("Error during concatenation.", error=True))
            self.after(0, lambda: messagebox.showerror("FFmpeg error", msg))
            self.after(0, lambda: self._finish_run(False))
            return

        # Success.
        self.after(0, lambda: self.progressbar.set(1.0))
        self.after(0, lambda: self._set_status("Concatenation completed!"))
        self.after(0, lambda: messagebox.showinfo("Success", f"Output: {output_file}"))

        # Delete originals if requested; failures are logged, not fatal.
        if delete_originals:
            failures = []
            for file in files:
                try:
                    os.remove(file)
                except OSError as err:
                    failures.append(f"Could not delete {file}: {err}")
            deleted = len(files) - len(failures)
            for note in failures:
                self.after(0, lambda note=note: self._log(note))
            self.after(0, lambda: messagebox.showinfo("Info", f"Deleted {deleted} original file(s)."))

        self.after(0, lambda: self._finish_run(True))

    def _finish_run(self, _success: bool):
        """Reset run state and buttons after a job ends (runs on the main thread)."""
        self._proc = None
        self._busy = False
        self._stop_requested = False
        self.stop_btn.configure(state="disabled")
        # Also re-evaluates the Start button via _refresh_start_enabled().
        self.refresh_final_output()
|
||
|
||
|
||
def _main():
    """Create the main window and run the Tk event loop until it is closed."""
    window = VideoConcatenatorApp()
    window.mainloop()


if __name__ == "__main__":
    _main()
|