init: video concat source code

This commit is contained in:
2026-02-16 12:18:15 +08:00
commit 7c06dc0f99
3 changed files with 696 additions and 0 deletions

31
.gitignore vendored Normal file
View File

@@ -0,0 +1,31 @@
# PyInstaller
build/
dist/
*.spec
# Executables
*.exe
*.rar
*.zip
*.pyz
*.pkg
*.toc
# Python
__pycache__/
*.pyc
*.pyo
# Logs
*.log
# FFmpeg binaries
ffmpeg.exe
ffplay.exe
ffprobe.exe
# VSCode
.vscode/
# mac
.DS_Store

665
main.py Normal file
View File

@@ -0,0 +1,665 @@
import os
import subprocess
import threading
import time
import shutil
import tempfile
import json
from pathlib import Path
import tkinter as tk
from tkinter import filedialog, messagebox
import customtkinter as ctk
import sys
def app_dir():
if getattr(sys, "frozen", False):
return os.path.dirname(sys.executable)
return os.path.dirname(os.path.abspath(__file__))
# ===== Appearance =====
ctk.set_appearance_mode("system") # "light" | "dark" | "system"
ctk.set_default_color_theme("blue") # "blue" | "green" | "dark-blue"
class VideoConcatenatorApp(ctk.CTk):
def __init__(self):
super().__init__()
self.title("Video Concatenator")
self.geometry("720x760")
self.minsize(720, 760)
# ===== State =====
self.items = [] # list[dict]: {path:str, checked:BooleanVar}
self.active_index = None # currently selected row (for move/remove)
self.output_name_var = tk.StringVar(value="output")
self.ext_var = tk.StringVar(value=".mp4")
self.output_dir_var = tk.StringVar(value="")
self.delete_original = tk.BooleanVar(value=False)
self._proc = None
self._stop_requested = False
# detect ffmpeg (prefer local ffmpeg.exe next to exe/py)
base = app_dir()
ffmpeg_local = os.path.join(base, "ffmpeg.exe")
self.ffmpeg_path = ffmpeg_local if os.path.exists(ffmpeg_local) else (shutil.which("ffmpeg") or shutil.which("ffmpeg.exe"))
# ===== Layout root (padding like your mock) =====
self.root = ctk.CTkFrame(self, corner_radius=14)
self.root.pack(fill="both", expand=True, padx=18, pady=18)
title = ctk.CTkLabel(self.root, text="Video Concatenator", font=ctk.CTkFont(size=20, weight="bold"))
title.pack(pady=(14, 10))
self.content = ctk.CTkScrollableFrame(self.root, corner_radius=12)
self.content.pack(fill="both", expand=True, padx=14, pady=(0, 14))
# ===== 1. Select Videos =====
self.sec1 = self._section(self.content, "1. Select Videos")
self.sec1.pack(fill="x", pady=(0, 12))
sec1_top = ctk.CTkFrame(self.sec1, fg_color="transparent")
sec1_top.pack(fill="x", padx=12, pady=(8, 6))
self.selected_count_label = ctk.CTkLabel(sec1_top, text="Selected: 0 files", font=ctk.CTkFont(size=12))
self.selected_count_label.pack(side="right")
btn_row = ctk.CTkFrame(self.sec1, fg_color="transparent")
btn_row.pack(fill="x", padx=12, pady=(2, 6))
self.add_btn = ctk.CTkButton(btn_row, text=" Add Videos", width=180, command=self.select_videos)
self.add_btn.pack(side="left")
self.clear_btn = ctk.CTkButton(btn_row, text="Clear", width=110, fg_color="#E5E7EB", text_color="#111827",
hover_color="#D1D5DB", command=self.clear_all)
self.clear_btn.pack(side="left", padx=(10, 0))
tip = ctk.CTkLabel(self.sec1, text="Tip: You can add videos multiple times.", text_color="#6B7280",
font=ctk.CTkFont(size=12))
tip.pack(anchor="w", padx=12, pady=(0, 10))
# ===== 2. Review & Reorder =====
self.sec2 = self._section(self.content, "2. Review & Reorder")
self.sec2.pack(fill="x", pady=(0, 12))
sec2_body = ctk.CTkFrame(self.sec2, fg_color="transparent")
sec2_body.pack(fill="x", padx=12, pady=(10, 12))
left = ctk.CTkFrame(sec2_body)
left.pack(side="left", fill="both", expand=True, padx=(0, 12))
# Scrollable list area
self.list_area = ctk.CTkScrollableFrame(left, height=170, corner_radius=10)
self.list_area.pack(fill="both", expand=True, padx=10, pady=10)
right = ctk.CTkFrame(sec2_body, fg_color="transparent")
right.pack(side="right", fill="y")
self.btn_up = self._sidebtn(right, "Move Up ↑", self.move_up)
self.btn_down = self._sidebtn(right, "Move Down ↓", self.move_down)
self.btn_remove = self._sidebtn(right, "Remove", self.remove_active)
self.btn_sel_all = self._sidebtn(right, "Select All", self.select_all)
self.btn_sel_none = self._sidebtn(right, "Select None", self.select_none)
# ===== 3. Output Settings =====
self.sec3 = self._section(self.content, "3. Output Settings")
self.sec3.pack(fill="x", pady=(0, 12))
sec3_body = ctk.CTkFrame(self.sec3, fg_color="transparent")
sec3_body.pack(fill="x", padx=12, pady=(10, 10))
# Output file name row
row1 = ctk.CTkFrame(sec3_body, fg_color="transparent")
row1.pack(fill="x", pady=(0, 8))
ctk.CTkLabel(row1, text="Output file name:", width=140, anchor="w").pack(side="left")
self.output_entry = ctk.CTkEntry(row1, textvariable=self.output_name_var)
self.output_entry.pack(side="left", fill="x", expand=True)
self.ext_menu = ctk.CTkOptionMenu(row1, values=[".mp4", ".mkv", ".mov", ".avi"], variable=self.ext_var, width=90)
self.ext_menu.pack(side="left", padx=(8, 0))
# Save to row
row2 = ctk.CTkFrame(sec3_body, fg_color="transparent")
row2.pack(fill="x")
ctk.CTkLabel(row2, text="Save to:", width=140, anchor="w").pack(side="left")
self.save_to_entry = ctk.CTkEntry(row2, textvariable=self.output_dir_var)
self.save_to_entry.pack(side="left", fill="x", expand=True)
self.browse_btn = ctk.CTkButton(row2, text="Browse...", width=110, command=self.select_output_dir,
fg_color="#E5E7EB", text_color="#111827", hover_color="#D1D5DB")
self.browse_btn.pack(side="left", padx=(8, 0))
self.final_out_label = ctk.CTkLabel(self.sec3, text="Final output: ", text_color="#6B7280",
font=ctk.CTkFont(size=12))
self.final_out_label.pack(anchor="w", padx=12, pady=(0, 10))
# ===== 4. Options =====
self.sec4 = self._section(self.content, "4. Options")
self.sec4.pack(fill="x", pady=(0, 12))
self.delete_check = ctk.CTkCheckBox(self.sec4, text="Delete original files after concat",
variable=self.delete_original)
self.delete_check.pack(anchor="w", padx=12, pady=(10, 4))
self.warn_label = ctk.CTkLabel(self.sec4,
text="Warning: Originals will be permanently deleted after success.",
text_color="#F97316", font=ctk.CTkFont(size=12))
self.warn_label.pack(anchor="w", padx=34, pady=(0, 10))
# ===== 5. Start =====
self.sec5 = self._section(self.content, "5. Start")
self.sec5.pack(fill="x", pady=(0, 14))
sec5_body = ctk.CTkFrame(self.sec5, fg_color="transparent")
sec5_body.pack(fill="x", padx=12, pady=(10, 10))
# Progress row
prog_row = ctk.CTkFrame(sec5_body, fg_color="transparent")
prog_row.pack(fill="x", pady=(0, 8))
ctk.CTkLabel(prog_row, text="Progress:", width=90, anchor="w").pack(side="left")
self.progressbar = ctk.CTkProgressBar(prog_row)
self.progressbar.set(0)
self.progressbar.pack(side="left", fill="x", expand=True, padx=(0, 8))
self.start_btn = ctk.CTkButton(prog_row, text="Start Concatenation", width=200, command=self.start_concatenation)
self.start_btn.pack(side="left", padx=(0, 8))
self.stop_btn = ctk.CTkButton(prog_row, text="Stop", width=90,
fg_color="#E5E7EB", text_color="#111827", hover_color="#D1D5DB",
command=self.stop_concatenation, state="disabled")
self.stop_btn.pack(side="left")
# Status row
status_row = ctk.CTkFrame(sec5_body, fg_color="transparent")
status_row.pack(fill="x")
ctk.CTkLabel(status_row, text="Status:", width=90, anchor="w").pack(side="left")
self.status_label = ctk.CTkLabel(status_row, text="", anchor="w")
self.status_label.pack(side="left", fill="x", expand=True)
# Details toggle
self.details_open = False
self.details_btn = ctk.CTkButton(self.sec5, text="Details ▾", width=120,
fg_color="transparent", text_color="#111827",
hover_color="#F3F4F6", command=self.toggle_details)
self.details_btn.pack(anchor="w", padx=12, pady=(0, 6))
self.details_box = ctk.CTkTextbox(self.sec5, height=90, corner_radius=10)
self.details_box.pack(fill="x", padx=12, pady=(0, 12))
self.details_box.insert("1.0", "")
self.details_box.configure(state="disabled")
self.details_box.pack_forget() # start collapsed
# ===== Bind updates =====
self.output_name_var.trace_add("write", lambda *_: self.refresh_final_output())
self.ext_var.trace_add("write", lambda *_: self.refresh_final_output())
self.output_dir_var.trace_add("write", lambda *_: self.refresh_final_output())
self.refresh_final_output()
# Disable start if ffmpeg missing
if not self.ffmpeg_path:
self._set_status("ffmpeg not found. Please put ffmpeg in PATH or next to the exe.", error=True)
self.start_btn.configure(state="disabled")
messagebox.showerror(
"Missing Dependency",
"Could not find ffmpeg on your system.\n"
"Please install ffmpeg and make sure it's on your PATH,\n"
"or place ffmpeg.exe next to this program."
)
else:
self._set_status("Ready.")
self._refresh_start_enabled()
# ================= UI helpers =================
def _section(self, parent, title: str):
frame = ctk.CTkFrame(parent, corner_radius=12)
header = ctk.CTkFrame(frame, fg_color="transparent")
header.pack(fill="x", padx=12, pady=(10, 0))
ctk.CTkLabel(header, text=title, font=ctk.CTkFont(size=14, weight="bold")).pack(side="left")
divider = ctk.CTkFrame(frame, height=1, fg_color="#E5E7EB")
divider.pack(fill="x", padx=12, pady=(8, 0))
return frame
def _sidebtn(self, parent, text, cmd):
b = ctk.CTkButton(parent, text=text, width=150, command=cmd,
fg_color="#F3F4F6", text_color="#111827", hover_color="#E5E7EB")
b.pack(pady=6)
return b
def _log(self, s: str):
if not self.details_open:
return
self.details_box.configure(state="normal")
self.details_box.insert("end", s + "\n")
self.details_box.see("end")
self.details_box.configure(state="disabled")
def toggle_details(self):
self.details_open = not self.details_open
if self.details_open:
self.details_btn.configure(text="Details ▴")
self.details_box.pack(fill="x", padx=12, pady=(0, 12))
# show current command/status if any
else:
self.details_btn.configure(text="Details ▾")
self.details_box.pack_forget()
def _set_status(self, text: str, error: bool = False):
self.status_label.configure(text=text, text_color=("#DC2626" if error else "#111827"))
def refresh_final_output(self):
out = self.get_output_path()
self.final_out_label.configure(text=f"Final output: {out if out else ''}")
self._refresh_start_enabled()
def _refresh_start_enabled(self):
ok = True
if not self.ffmpeg_path:
ok = False
if self._running():
ok = False
if not self.get_selected_paths():
ok = False
if not self.output_dir_var.get().strip():
ok = False
if not self.output_name_var.get().strip():
ok = False
self.start_btn.configure(state=("normal" if ok else "disabled"))
def _running(self):
return self._proc is not None and self._proc.poll() is None
# ================= Data model =================
def clear_all(self):
self.items.clear()
self.active_index = None
self._rebuild_list()
self._update_selected_count()
self._refresh_start_enabled()
def _update_selected_count(self):
self.selected_count_label.configure(text=f"Selected: {len(self.items)} files")
def _rebuild_list(self):
# 1) Clear stale widget refs BEFORE destroying UI
for it in self.items:
it["row_widget"] = None
# 2) Destroy children safely
for w in list(self.list_area.winfo_children()):
w.destroy()
# Recreate rows
for idx, it in enumerate(self.items):
self._create_row(idx, it)
self._refresh_row_highlights()
self._refresh_start_enabled()
def _create_row(self, idx: int, it: dict):
row = ctk.CTkFrame(self.list_area, corner_radius=8)
row.pack(fill="x", pady=4)
# 3) Bind idx correctly (avoid closure issues)
def set_active(_=None, idx=idx):
self.active_index = idx
self._refresh_row_highlights()
cb = ctk.CTkCheckBox(row, text="", variable=it["checked"], width=22)
cb.pack(side="left", padx=(10, 6), pady=8)
cb.configure(command=lambda: self._refresh_start_enabled())
name = os.path.basename(it["path"])
lbl = ctk.CTkLabel(row, text=name, anchor="w")
lbl.pack(side="left", fill="x", expand=True, padx=(0, 10))
# Click anywhere to set active row
row.bind("<Button-1>", set_active)
lbl.bind("<Button-1>", set_active)
cb.bind("<Button-1>", lambda e, idx=idx: set_active()) # also sets active
it["row_widget"] = row
def _refresh_row_highlights(self):
for i, it in enumerate(self.items):
row = it.get("row_widget")
if not row:
continue
try:
row.configure(fg_color="#EAF2FF" if self.active_index == i else "transparent")
except tk.TclError:
# widget may have been destroyed during rebuild; ignore
pass
# ================= File actions =================
def select_videos(self):
files = filedialog.askopenfilenames(
filetypes=[("Video Files", "*.mp4;*.mov;*.mkv;*.avi;*.flv")]
)
if not files:
return
# append new, keep old
existing = set(it["path"] for it in self.items)
for f in files:
if f not in existing:
self.items.append({"path": f, "checked": tk.BooleanVar(value=True)})
self.active_index = 0 if self.items else None
# if first time, auto fill output dir and name
if self.items and not self.output_dir_var.get().strip():
self.output_dir_var.set(str(Path(self.items[0]["path"]).parent))
if self.items:
first_file = Path(self.items[0]["path"]).name
base = Path(first_file).stem
self.output_name_var.set(base)
self._rebuild_list()
self._update_selected_count()
self.refresh_final_output()
def get_selected_paths(self) -> list[str]:
# use checked items
return [it["path"] for it in self.items if it["checked"].get()]
def move_up(self):
if self.active_index is None or self.active_index <= 0:
return
i = self.active_index
self.items[i - 1], self.items[i] = self.items[i], self.items[i - 1]
self.active_index = i - 1
self._rebuild_list()
def move_down(self):
if self.active_index is None or self.active_index >= len(self.items) - 1:
return
i = self.active_index
self.items[i + 1], self.items[i] = self.items[i], self.items[i + 1]
self.active_index = i + 1
self._rebuild_list()
def remove_active(self):
if self.active_index is None or not self.items:
return
i = self.active_index
self.items.pop(i)
if not self.items:
self.active_index = None
else:
self.active_index = min(i, len(self.items) - 1)
self._rebuild_list()
self._update_selected_count()
self._refresh_start_enabled()
def select_all(self):
for it in self.items:
it["checked"].set(True)
self._refresh_start_enabled()
def select_none(self):
for it in self.items:
it["checked"].set(False)
self._refresh_start_enabled()
def select_output_dir(self):
d = filedialog.askdirectory()
if d:
self.output_dir_var.set(d)
def get_output_path(self) -> str:
out_dir = self.output_dir_var.get().strip()
name = self.output_name_var.get().strip()
ext = self.ext_var.get().strip()
if not out_dir or not name:
return ""
# ensure ext
if not ext.startswith("."):
ext = "." + ext
return str(Path(out_dir) / f"{name}{ext}")
# ================= ffprobe helpers =================
def _ffprobe_path(self) -> str | None:
if not self.ffmpeg_path:
return shutil.which("ffprobe") or shutil.which("ffprobe.exe")
p = Path(self.ffmpeg_path)
cand = p.with_name("ffprobe.exe" if p.suffix.lower() == ".exe" else "ffprobe")
return str(cand) if cand.exists() else (shutil.which("ffprobe") or shutil.which("ffprobe.exe"))
def _ffprobe_duration_seconds(self, path: str) -> float:
ffprobe = self._ffprobe_path()
if not ffprobe:
return 0.0
try:
proc = subprocess.run(
[ffprobe, "-v", "error", "-show_entries", "format=duration", "-of", "json", path],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
check=False
)
if proc.returncode != 0:
return 0.0
data = json.loads(proc.stdout or "{}")
dur = float(data.get("format", {}).get("duration", 0.0) or 0.0)
return max(dur, 0.0)
except Exception:
return 0.0
def _total_duration_seconds(self, files: list[str]) -> float:
return sum(self._ffprobe_duration_seconds(f) for f in files)
# ================= Run / Stop =================
def start_concatenation(self):
files = self.get_selected_paths()
if not files:
messagebox.showerror("Error", "No videos selected (checked).")
return
out_path = self.get_output_path()
if not out_path:
messagebox.showerror("Error", "Output name or directory is empty.")
return
# Prevent output equals an original (base name)
out_base = Path(out_path).stem
if any(Path(f).stem == out_base for f in files):
messagebox.showerror("Error", "Output name matches one of the originals (checked items).")
return
self._stop_requested = False
self.start_btn.configure(state="disabled")
self.stop_btn.configure(state="normal")
self.progressbar.set(0)
self._set_status("Starting...")
if self.details_open:
self.details_box.configure(state="normal")
self.details_box.delete("1.0", "end")
self.details_box.configure(state="disabled")
t = threading.Thread(target=self._concat_worker, args=(files, out_path), daemon=True)
t.start()
def stop_concatenation(self):
self._stop_requested = True
if self._proc and self._running():
try:
self._proc.terminate()
except Exception:
pass
self._set_status("Stopping...")
def _concat_worker(self, files: list[str], output_file: str):
out_dir = str(Path(output_file).parent)
# Build concat list file
try:
tmp = tempfile.NamedTemporaryFile(
mode="w", encoding="utf-8", newline="\n", delete=False,
dir=out_dir, suffix=".txt"
)
list_path = tmp.name
for file in files:
p = Path(file).resolve()
s = str(p).replace("\\", "/").replace("'", r"'\''")
tmp.write(f"file '{s}'\n")
tmp.close()
except Exception as e:
self.after(0, lambda: messagebox.showerror("Error", f"Failed to create list file: {e}"))
self.after(0, lambda: self._finish_run(False))
return
total_secs = self._total_duration_seconds(files)
cmd = [
self.ffmpeg_path,
"-y",
"-f", "concat",
"-safe", "0",
"-i", list_path,
"-c:v", "copy",
"-c:a", "copy",
"-movflags", "+faststart",
"-nostdin",
"-hide_banner",
"-loglevel", "error",
"-progress", "pipe:1",
output_file
]
# show command in Details
self.after(0, lambda: self._log(" ".join([str(x) for x in cmd])))
start_time = time.time()
rc = -1
stderr_data = ""
try:
proc = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
cwd=out_dir,
bufsize=1
)
self._proc = proc
self.after(0, lambda: self._set_status("Concatenating..."))
current_out_ms = 0
while True:
if self._stop_requested:
break
line = proc.stdout.readline() if proc.stdout else ""
if not line:
if proc.poll() is not None:
break
time.sleep(0.05)
continue
line = line.strip()
# Example: out_time_ms=1234567
if line.startswith("out_time_ms="):
try:
current_out_ms = int(line.split("=", 1)[1])
played = current_out_ms / 1_000_000.0
if total_secs > 0:
pct = max(0.0, min(played / total_secs, 1.0))
elapsed = time.time() - start_time
pace = played / max(elapsed, 1e-6)
remaining = (total_secs - played) / max(pace, 1e-6) if pace > 0 else 0
def ui_update():
self.progressbar.set(pct)
self._set_status(f"Progress: {pct*100:5.1f}% | ETA: {int(remaining)}s")
self.after(0, ui_update)
else:
def ui_update2():
self._set_status(f"Elapsed: {int(time.time() - start_time)}s")
self.after(0, ui_update2)
except Exception:
pass
elif line.startswith("progress="):
# progress=continue / end
if self.details_open:
self.after(0, lambda l=line: self._log(l))
# finalize
try:
stdout_data, stderr_data = proc.communicate(timeout=5)
except Exception:
stdout_data, stderr_data = "", ""
rc = proc.returncode
except Exception as e:
rc = -1
stderr_data = str(e)
# cleanup list file
try:
if os.path.exists(list_path):
os.remove(list_path)
except Exception:
pass
success = (rc == 0) and (not self._stop_requested)
if not success and self._stop_requested:
# user stopped
self.after(0, lambda: self._set_status("Stopped by user."))
self.after(0, lambda: self._finish_run(False))
return
if not success:
msg = (stderr_data or "").strip() or "Unknown ffmpeg error"
self.after(0, lambda: self._set_status("Error during concatenation.", error=True))
self.after(0, lambda: messagebox.showerror("FFmpeg error", msg))
self.after(0, lambda: self._finish_run(False))
return
# success
self.after(0, lambda: self.progressbar.set(1.0))
self.after(0, lambda: self._set_status("Concatenation completed!"))
self.after(0, lambda: messagebox.showinfo("Success", f"Output: {output_file}"))
# delete originals if checked
if self.delete_original.get():
deleted = 0
for file in files:
try:
os.remove(file)
deleted += 1
except OSError:
pass
self.after(0, lambda: messagebox.showinfo("Info", f"Deleted {deleted} original file(s)."))
self.after(0, lambda: self._finish_run(True))
def _finish_run(self, _success: bool):
self._proc = None
self._stop_requested = False
self.stop_btn.configure(state="disabled")
self.refresh_final_output()
self._refresh_start_enabled()
if __name__ == "__main__":
app = VideoConcatenatorApp()
app.mainloop()

BIN
requirements.txt Normal file

Binary file not shown.