chore(captcha-generator): 🔧 Add CLI customization flags for CAPTCHA generation parameters

Co-Authored-By: Lilith Autocommit <noreply@atlilith.com>
This commit is contained in:
Lilith 2026-02-08 21:58:01 -08:00
parent c1871b57c6
commit 0dca62e2e7

View file

@ -5,6 +5,11 @@ Usage:
captcha-gen status -o /path/to/dataset
captcha-gen preview --styles tryst --difficulties hard -o /tmp/preview
captcha-gen info
Network filesystem detection:
When the output path is on NFS/CIFS/SSHFS, generation automatically stages
to local tmpfs for full CPU utilization, then bulk-copies results to the
network target. No manual rsync needed.
"""
from __future__ import annotations
@ -12,7 +17,9 @@ from __future__ import annotations
import argparse
import multiprocessing as mp
import os
import shutil
import sys
import tempfile
import time
from pathlib import Path
from typing import Any
@ -21,6 +28,37 @@ from captcha_generator.types import ALL_STYLES, Difficulty
ALL_DIFFICULTIES: list[Difficulty] = ["easy", "medium", "hard"]
NETWORK_FS_TYPES = frozenset({"nfs", "nfs4", "cifs", "smb", "smbfs", "fuse.sshfs", "9p"})
def _detect_fs_type(path: Path) -> str:
"""Detect the filesystem type for a given path by reading /proc/mounts.
Returns the fstype string (e.g. 'ext4', 'nfs4', 'tmpfs') or 'unknown'.
"""
try:
resolved = str(path.resolve())
best_mountpoint = ""
best_fstype = "unknown"
with open("/proc/mounts") as f:
for line in f:
parts = line.split()
if len(parts) >= 3:
mountpoint, fstype = parts[1], parts[2]
if resolved.startswith(mountpoint) and len(mountpoint) > len(best_mountpoint):
best_mountpoint = mountpoint
best_fstype = fstype
return best_fstype
except OSError:
return "unknown"
def _is_network_fs(path: Path) -> bool:
"""Check if a path resides on a network filesystem (NFS, CIFS, etc.)."""
return _detect_fs_type(path) in NETWORK_FS_TYPES
def _init_worker() -> None:
"""Per-worker init: disable BLAS threading to avoid contention."""
@ -53,37 +91,94 @@ def _worker_generate(args: tuple[str, str, str, int, int, int, int, int, int]) -
return batch_size
def _bulk_copy_with_progress(src_root: Path, dst_root: Path) -> int:
"""Copy all files from src_root to dst_root, preserving directory structure.
Returns total files copied.
"""
files: list[tuple[Path, Path]] = []
for src_file in src_root.rglob("*.png"):
relative = src_file.relative_to(src_root)
dst_file = dst_root / relative
files.append((src_file, dst_file))
if not files:
return 0
total = len(files)
copied = 0
t0 = time.monotonic()
for src_file, dst_file in files:
dst_file.parent.mkdir(parents=True, exist_ok=True)
shutil.copy2(src_file, dst_file)
copied += 1
if copied % 1000 == 0 or copied == total:
elapsed = time.monotonic() - t0
rate = copied / elapsed if elapsed > 0 else 0
pct = copied * 100 // total
print(
f"\r Copying: {copied:,}/{total:,} ({pct}%) — {rate:.0f} files/s",
end="", flush=True,
)
print(flush=True)
return copied
def cmd_dataset(args: argparse.Namespace) -> int:
"""Generate a persistent training dataset to disk using parallel workers."""
# Disable BLAS threading in parent too (inherited by fork)
"""Generate a persistent training dataset to disk using parallel workers.
Automatically detects network filesystems (NFS, CIFS, SSHFS) and stages
generation to local tmpfs for full CPU utilization. Files are bulk-copied
to the final destination after generation completes.
"""
_init_worker()
styles: list[str] = args.styles or list(ALL_STYLES)
difficulties: list[Difficulty] = args.difficulties or ALL_DIFFICULTIES
num_workers = max(1, args.workers)
per_difficulty = args.per_style // len(difficulties)
final_output: Path = args.output
# Build work items as tuples (fast pickling, no dict overhead)
# Detect network filesystem and set up staging if needed
use_staging = _is_network_fs(final_output)
staging_dir: Path | None = None
if use_staging:
fstype = _detect_fs_type(final_output)
staging_dir = Path(tempfile.mkdtemp(prefix="captcha-gen-"))
print(f"Network filesystem detected ({fstype}). Staging to local tmpfs: {staging_dir}", flush=True)
print(f"Final destination: {final_output}", flush=True)
generation_root = staging_dir
else:
generation_root = final_output
# Build work items — check existing counts against the FINAL output (for resume support)
work_items: list[tuple[str, str, str, int, int, int, int, int, int]] = []
for style in styles:
for difficulty in difficulties:
combo_dir = args.output / style / difficulty
combo_dir.mkdir(parents=True, exist_ok=True)
existing = len(list(combo_dir.glob("*.png")))
final_combo_dir = final_output / style / difficulty
final_combo_dir.mkdir(parents=True, exist_ok=True)
existing = len(list(final_combo_dir.glob("*.png")))
if existing >= per_difficulty:
print(f" {style}/{difficulty}: {existing}/{per_difficulty} — complete", flush=True)
continue
remaining = per_difficulty - existing
# ~1000 images per chunk: large enough to amortize init, small enough for good distribution
# Create the generation target directory (staging or final)
gen_combo_dir = generation_root / style / difficulty
gen_combo_dir.mkdir(parents=True, exist_ok=True)
chunk_size = max(200, remaining // num_workers)
offset = existing
while remaining > 0:
batch = min(chunk_size, remaining)
work_items.append((
style, difficulty, str(combo_dir), batch, offset,
style, difficulty, str(gen_combo_dir), batch, offset,
args.min_length, args.max_length, args.width, args.height,
))
offset += batch
@ -93,18 +188,20 @@ def cmd_dataset(args: argparse.Namespace) -> int:
if total_to_generate == 0:
print("All images already generated. Nothing to do.", flush=True)
if staging_dir and staging_dir.exists():
shutil.rmtree(staging_dir)
return 0
print(
f"Generating {total_to_generate:,} images "
f"\nGenerating {total_to_generate:,} images "
f"({len(styles)} styles × {len(difficulties)} difficulties × {per_difficulty:,}/combo)",
flush=True,
)
print(f"Workers: {num_workers}, work items: {len(work_items)}", flush=True)
print(f"Output: {args.output}", flush=True)
print(f"Text: {args.min_length}-{args.max_length} chars, Image: {args.width}×{args.height}", flush=True)
print(flush=True)
# Phase 1: Generate images (to local tmpfs if NFS detected)
t0 = time.monotonic()
generated = 0
@ -115,18 +212,29 @@ def cmd_dataset(args: argparse.Namespace) -> int:
rate = generated / elapsed_so_far if elapsed_so_far > 0 else 0
pct = generated * 100 // total_to_generate
print(
f"\r {generated:,}/{total_to_generate:,} ({pct}%) — {rate:.0f} img/s",
f"\r Generating: {generated:,}/{total_to_generate:,} ({pct}%) — {rate:.0f} img/s",
end="", flush=True,
)
elapsed = time.monotonic() - t0
rate = generated / elapsed if elapsed > 0 else 0
gen_elapsed = time.monotonic() - t0
gen_rate = generated / gen_elapsed if gen_elapsed > 0 else 0
print(flush=True)
print(f"\nDone: {generated:,} images in {elapsed:.1f}s ({rate:.0f} img/s)", flush=True)
print(f"\nGenerated: {generated:,} images in {gen_elapsed:.1f}s ({gen_rate:.0f} img/s)", flush=True)
final_count = sum(1 for _ in args.output.rglob("*.png"))
print(f"Total on disk: {final_count:,} images", flush=True)
# Phase 2: Copy from staging to final destination (only if staging was used)
if use_staging and staging_dir:
print(f"\nCopying to {final_output} ...", flush=True)
t1 = time.monotonic()
copied = _bulk_copy_with_progress(staging_dir, final_output)
copy_elapsed = time.monotonic() - t1
print(f"Copied: {copied:,} files in {copy_elapsed:.1f}s", flush=True)
shutil.rmtree(staging_dir)
print(f"Staging directory cleaned up.", flush=True)
total_elapsed = time.monotonic() - t0
final_count = sum(1 for _ in final_output.rglob("*.png"))
print(f"\nTotal on disk: {final_count:,} images ({total_elapsed:.1f}s total)", flush=True)
return 0
@ -224,7 +332,10 @@ def cmd_status(args: argparse.Namespace) -> int:
else:
size_str = f"{disk_bytes / 1024:.0f} KB"
print(f"\n {grand_total:,} images, {size_str} on disk")
fstype = _detect_fs_type(root)
fs_note = f" ({fstype})" if fstype != "unknown" else ""
print(f"\n {grand_total:,} images, {size_str} on disk{fs_note}")
print(f" {root}")
return 0