aboutsummaryrefslogtreecommitdiffstats
path: root/mediaconv.py
diff options
context:
space:
mode:
Diffstat (limited to 'mediaconv.py')
-rw-r--r--mediaconv.py598
1 files changed, 598 insertions, 0 deletions
diff --git a/mediaconv.py b/mediaconv.py
new file mode 100644
index 0000000..dd6954f
--- /dev/null
+++ b/mediaconv.py
@@ -0,0 +1,598 @@
+#!/usr/bin/env python3
+
+import argparse
+import os
+import subprocess
+import sys
+import logging
+import magic
+import concurrent.futures
+from pathlib import Path
+from typing import Optional, Dict, List, Tuple, Set
+from tqdm import tqdm
+from concurrent.futures import ThreadPoolExecutor
+
+def setup_logging(verbose: bool = False) -> logging.Logger:
+ logger = logging.getLogger('mediaconv')
+ logger.setLevel(logging.DEBUG if verbose else logging.INFO)
+
+ handler = logging.StreamHandler()
+ handler.setLevel(logging.DEBUG if verbose else logging.INFO)
+ formatter = logging.Formatter('%(levelname)s: %(message)s')
+ handler.setFormatter(formatter)
+ logger.addHandler(handler)
+
+ return logger
+
+class MediaConverter:
+ SUPPORTED_FORMATS = {
+ 'audio': ['mp3', 'wav', 'flac', 'ogg', 'm4a'],
+ 'video': ['mp4', 'avi', 'mkv', 'mov', 'gif', 'webm']
+ }
+
+ DEFAULT_SETTINGS = {
+ 'audio': {
+ 'bitrate': '192k',
+ 'sample_rate': '44100',
+ 'channels': '2'
+ },
+ 'video': {
+ 'bitrate': '192k',
+ 'sample_rate': '44100',
+ 'channels': '2',
+ 'crf': '23',
+ 'preset': 'medium'
+ }
+ }
+
+ def __init__(self):
+ self.logger = setup_logging()
+
+ if not self._check_ffmpeg():
+ raise EnvironmentError("FFmpeg is not installed. Please install FFmpeg first.")
+ if not self._check_magic():
+ raise EnvironmentError("python-magic is not installed. Please install python-magic first.")
+
+ def _check_ffmpeg(self) -> bool:
+ try:
+ subprocess.run(['ffmpeg', '-version'], capture_output=True, check=True)
+ return True
+ except (subprocess.SubprocessError, FileNotFoundError):
+ return False
+
+ def _check_magic(self) -> bool:
+ try:
+ magic.Magic(mime=True)
+ return True
+ except (ImportError, AttributeError):
+ return False
+
+ def _detect_media_type(self, file_path: str) -> Tuple[str, str]:
+ try:
+ mime = magic.Magic(mime=True)
+ file_type = mime.from_file(file_path)
+
+ if file_type.startswith('audio/'):
+ return 'audio', file_path.split('.')[-1].lower()
+ elif file_type.startswith('video/'):
+ return 'video', file_path.split('.')[-1].lower()
+ else:
+ raise ValueError(f"Unsupported file type: {file_type}")
+ except Exception as e:
+ raise ValueError(f"Error detecting file type: {str(e)}")
+
+ def _validate_input_file(self, input_file: str) -> None:
+ if not os.path.exists(input_file):
+ raise FileNotFoundError(f"Input file not found: {input_file}")
+ if not os.access(input_file, os.R_OK):
+ raise PermissionError(f"Cannot read input file: {input_file}")
+
+ def _validate_output_format(self, media_type: str, output_format: str) -> None:
+ output_format = output_format.lower()
+
+ if media_type == 'video' and output_format in self.SUPPORTED_FORMATS['audio']:
+ return
+
+ if output_format not in self.SUPPORTED_FORMATS[media_type]:
+ raise ValueError(f"Unsupported output format for {media_type}: {output_format}")
+
+ def _generate_output_filename(
+ self,
+ input_path: Path,
+ output_format: str,
+ output_name: Optional[str] = None,
+ add_suffix: bool = False,
+ output_dir: Optional[str] = None
+ ) -> Path:
+ if output_name:
+ output_path = Path(output_name).with_suffix(f'.{output_format}')
+ else:
+ stem = input_path.stem
+ if add_suffix:
+ stem = f"{stem}_converted"
+ output_path = input_path.with_name(f"{stem}.{output_format}")
+
+ if output_dir:
+ output_dir_path = Path(output_dir)
+ output_dir_path.mkdir(parents=True, exist_ok=True)
+ return output_dir_path / output_path.name
+
+ return output_path
+
+ def _build_ffmpeg_command(
+ self,
+ input_file: str,
+ output_file: str,
+ settings: Optional[Dict[str, str]] = None,
+ image_file: Optional[str] = None,
+ overwrite: bool = False
+ ) -> List[str]:
+ cmd = ['ffmpeg']
+ if overwrite:
+ cmd.append('-y')
+ output_file_str = str(output_file)
+
+ if image_file and output_file_str.endswith(('.mp4', '.mkv', '.avi', '.mov', '.webm')):
+ is_gif = image_file.lower().endswith('.gif')
+
+ if is_gif:
+ cmd.extend(['-stream_loop', '-1', '-i', image_file])
+ else:
+ cmd.extend(['-loop', '1', '-i', image_file])
+
+ cmd.extend(['-i', input_file])
+ cmd.extend(['-map', '0:v:0', '-map', '1:a:0'])
+ cmd.extend(['-vf', 'scale=trunc(iw/2)*2:trunc(ih/2)*2'])
+
+ if is_gif:
+ cmd.extend(['-r', '25'])
+
+ if output_file_str.endswith('.webm'):
+ cmd.extend(['-c:v', 'libvpx-vp9'])
+ cmd.extend(['-crf', '30'])
+ cmd.extend(['-b:v', '0'])
+ cmd.extend(['-c:a', 'libopus'])
+ else:
+ cmd.extend(['-c:v', 'libx264', '-tune', 'stillimage'])
+ cmd.extend(['-c:a', 'aac'])
+
+ cmd.extend(['-pix_fmt', 'yuv420p', '-shortest'])
+ else:
+ cmd.extend(['-i', input_file])
+ if output_file_str.endswith(('.mp3', '.wav', '.flac', '.ogg', '.m4a')):
+ cmd.append('-vn')
+
+ if settings:
+ if output_file_str.endswith(('.mp3', '.wav', '.flac', '.ogg', '.m4a', '.webm')):
+ if output_file_str.endswith('.webm'):
+ cmd.extend(['-c:a', 'libopus'])
+ elif output_file_str.endswith('.mp3'):
+ cmd.extend(['-c:a', 'libmp3lame'])
+ elif output_file_str.endswith('.flac'):
+ cmd.extend(['-c:a', 'flac'])
+ elif output_file_str.endswith('.ogg'):
+ cmd.extend(['-c:a', 'libvorbis'])
+ elif output_file_str.endswith('.m4a'):
+ cmd.extend(['-c:a', 'aac'])
+
+ if 'audio_bitrate' in settings:
+ cmd.extend(['-b:a', settings['audio_bitrate']])
+ if 'sample_rate' in settings:
+ cmd.extend(['-ar', settings['sample_rate']])
+ if 'channels' in settings:
+ cmd.extend(['-ac', settings['channels']])
+
+ if output_file_str.endswith('.mp3') and 'quality' in settings:
+ cmd.extend(['-q:a', settings['quality']])
+ elif output_file_str.endswith('.ogg') and 'quality' in settings:
+ cmd.extend(['-q:a', settings['quality']])
+ elif output_file_str.endswith('.webm') and 'quality' in settings:
+ cmd.extend(['-vbr', settings['quality']])
+
+ elif output_file_str.endswith(('.mp4', '.mkv', '.avi', '.mov', '.webm')):
+ if output_file_str.endswith('.webm'):
+ cmd.extend(['-c:v', 'libvpx-vp9'])
+ else:
+ cmd.extend(['-c:v', 'libx264'])
+
+ if 'video_bitrate' in settings:
+ cmd.extend(['-b:v', settings['video_bitrate']])
+ if 'audio_bitrate' in settings:
+ cmd.extend(['-b:a', settings['audio_bitrate']])
+ if 'crf' in settings:
+ if output_file_str.endswith('.webm'):
+ crf = min(63, max(0, int(settings['crf'])))
+ cmd.extend(['-crf', str(crf)])
+ else:
+ crf = min(51, max(0, int(settings['crf'])))
+ cmd.extend(['-crf', str(crf)])
+ if 'preset' in settings:
+ if output_file_str.endswith('.webm'):
+ cmd.extend(['-cpu-used', settings['preset']])
+ else:
+ cmd.extend(['-preset', settings['preset']])
+
+ if output_file_str.endswith('.webm'):
+ if 'tile-columns' in settings:
+ cmd.extend(['-tile-columns', settings['tile-columns']])
+ if 'frame-parallel' in settings:
+ cmd.extend(['-frame-parallel', settings['frame-parallel']])
+ else:
+ if 'profile' in settings:
+ cmd.extend(['-profile:v', settings['profile']])
+ if 'level' in settings:
+ cmd.extend(['-level', settings['level']])
+
+ elif output_file_str.endswith('.gif'):
+ if 'duration' in settings:
+ cmd.extend(['-t', settings['duration']])
+ cmd.extend([
+ '-vf', 'fps=15,scale=320:-1:flags=lanczos,split[s0][s1];[s0]palettegen=max_colors=256[p];[s1][p]paletteuse=dither=bayer:bayer_scale=5:diff_mode=rectangle',
+ '-loop', '0'
+ ])
+
+ cmd.append(output_file_str)
+ return cmd
+
+ def convert(
+ self,
+ input_file: str,
+ output_format: str,
+ settings: Optional[Dict[str, str]] = None,
+ image_file: Optional[str] = None,
+ add_suffix: bool = False,
+ dry_run: bool = False,
+ overwrite: bool = False,
+ output_name: Optional[str] = None,
+ output_dir: Optional[str] = None
+ ) -> str:
+ try:
+ if not os.path.exists(input_file):
+ raise FileNotFoundError(f"Input file not found: {input_file}")
+
+ if image_file and not os.path.exists(image_file):
+ raise FileNotFoundError(f"Image file not found: {image_file}")
+
+ if output_format == 'gif':
+ try:
+ probe_cmd = ['ffprobe', '-v', 'error', '-select_streams', 'v', '-show_entries', 'stream=codec_type', '-of', 'default=noprint_wrappers=1:nokey=1', input_file]
+ result = subprocess.run(probe_cmd, capture_output=True, text=True, check=True)
+ if not result.stdout.strip():
+ raise ValueError(f"Input file '{input_file}' does not contain any video stream")
+ except subprocess.CalledProcessError as e:
+ raise RuntimeError(f"Error checking video stream: {e.stderr}")
+
+ if output_format in ['mp3', 'wav', 'flac', 'ogg', 'm4a']:
+ try:
+ probe_cmd = ['ffprobe', '-v', 'error', '-select_streams', 'a', '-show_entries', 'stream=codec_type', '-of', 'default=noprint_wrappers=1:nokey=1', input_file]
+ result = subprocess.run(probe_cmd, capture_output=True, text=True, check=True)
+ if not result.stdout.strip():
+ raise ValueError(f"Input file '{input_file}' does not contain any audio stream")
+ except subprocess.CalledProcessError as e:
+ raise RuntimeError(f"Error checking audio stream: {e.stderr}")
+
+ output_file = self._generate_output_filename(
+ Path(input_file),
+ output_format,
+ output_name=output_name,
+ add_suffix=add_suffix,
+ output_dir=output_dir
+ )
+
+ if os.path.exists(output_file) and not overwrite and not dry_run:
+ while True:
+ response = input(f"File {output_file} already exists. Overwrite? [y/N]: ").lower()
+ if response in ['y', 'yes']:
+ overwrite = True
+ break
+ elif response in ['n', 'no', '']:
+ raise FileExistsError(f"Output file already exists: {output_file}")
+ else:
+ print("Please answer 'y' or 'n'")
+
+ if overwrite and os.path.exists(output_file):
+ try:
+ os.remove(output_file)
+ except PermissionError:
+ import time
+ time.sleep(1)
+ try:
+ os.remove(output_file)
+ except PermissionError as e:
+ raise RuntimeError(f"Cannot overwrite file {output_file}: {str(e)}")
+
+ cmd = self._build_ffmpeg_command(input_file, output_file, settings, image_file, overwrite)
+
+ if dry_run:
+ print(f"Would execute: {' '.join(cmd)}")
+ return str(output_file)
+
+ try:
+ duration_cmd = ['ffprobe', '-v', 'error', '-show_entries', 'format=duration', '-of', 'default=noprint_wrappers=1:nokey=1', input_file]
+ duration = float(subprocess.run(duration_cmd, capture_output=True, text=True, check=True).stdout.strip())
+ except (subprocess.CalledProcessError, ValueError):
+ duration = None
+
+ try:
+ if duration is not None:
+ with tqdm(total=100, desc="Converting", unit="%") as pbar:
+ process = subprocess.Popen(
+ cmd,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ universal_newlines=True,
+ bufsize=1
+ )
+
+ stderr_output = []
+ def read_stderr():
+ for line in process.stderr:
+ stderr_output.append(line)
+ if 'time=' in line:
+ try:
+ time_str = line.split('time=')[1].split()[0]
+ h, m, s = time_str.split(':')
+ time_sec = float(h) * 3600 + float(m) * 60 + float(s)
+ progress = min(100, round((time_sec / duration) * 100))
+ pbar.update(progress - pbar.n)
+ except (ValueError, ZeroDivisionError):
+ continue
+
+ import threading
+ stderr_thread = threading.Thread(target=read_stderr)
+ stderr_thread.daemon = True
+ stderr_thread.start()
+
+ process.wait()
+ stderr_thread.join(timeout=1)
+
+ if process.returncode != 0:
+ error_msg = ''.join(stderr_output)
+ raise RuntimeError(f"Conversion failed: {error_msg}")
+ else:
+ result = subprocess.run(cmd, capture_output=True, text=True, check=True)
+ if result.stderr:
+ print(f"FFmpeg output: {result.stderr}")
+
+ return str(output_file)
+ except subprocess.CalledProcessError as e:
+ error_msg = e.stderr if e.stderr else str(e)
+ raise RuntimeError(f"Conversion failed: {error_msg}")
+ except Exception as e:
+ self.logger.error("Error converting %s: %s", input_file, str(e))
+ raise
+
+ def convert_batch(
+ self,
+ input_files: List[str],
+ output_format: str,
+ output_dir: Optional[str] = None,
+ settings: Optional[Dict[str, str]] = None,
+ overwrite: bool = False,
+ dry_run: bool = False,
+ max_workers: int = 4,
+ add_suffix: bool = False
+ ) -> Dict[str, List[str]]:
+ self.logger.info("Starting batch conversion of %d files", len(input_files))
+
+ results = {'success': [], 'failed': []}
+
+ if dry_run:
+ for input_file in input_files:
+ try:
+ output_file = self.convert(
+ input_file=input_file,
+ output_format=output_format,
+ settings=settings,
+ add_suffix=add_suffix,
+ dry_run=True,
+ overwrite=overwrite,
+ output_dir=output_dir
+ )
+ results['success'].append(output_file)
+ except Exception as e:
+ self.logger.error("Failed to convert %s: %s", input_file, str(e))
+ results['failed'].append(input_file)
+ return results
+
+ with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
+ futures = []
+ for input_file in input_files:
+ future = executor.submit(
+ self.convert,
+ input_file=input_file,
+ output_format=output_format,
+ settings=settings,
+ add_suffix=add_suffix,
+ overwrite=overwrite,
+ output_dir=output_dir
+ )
+ futures.append((input_file, future))
+
+ with tqdm(total=len(futures), desc="Converting files", unit="file") as pbar:
+ for input_file, future in futures:
+ try:
+ output_file = future.result()
+ if output_file:
+ results['success'].append(output_file)
+ else:
+ results['failed'].append(input_file)
+ except Exception as e:
+ self.logger.error("Failed to convert %s: %s", input_file, str(e))
+ results['failed'].append(input_file)
+ finally:
+ pbar.update(1)
+
+ success_count = len(results['success'])
+ failed_count = len(results['failed'])
+ self.logger.info("Batch conversion complete: %d succeeded, %d failed",
+ success_count, failed_count)
+
+ if failed_count > 0:
+ self.logger.error("\nFailed conversions:")
+ for failed_file in results['failed']:
+ self.logger.error(" - %s", failed_file)
+
+ return results
+
+def main() -> int:
+ parser = argparse.ArgumentParser(description='Convert media files using FFmpeg')
+ parser.add_argument('input_files', nargs='+', help='Input file(s) to convert')
+ parser.add_argument('--to', required=True, help='Output format (e.g., mp3, wav, mp4)')
+ parser.add_argument('--output-dir', help='Output directory')
+ parser.add_argument('--output-name', help='Custom output filename (for single input)')
+
+ parser.add_argument('--audio-bitrate', help='Audio bitrate (e.g., 192k)')
+ parser.add_argument('--video-bitrate', help='Video bitrate (e.g., 2M)')
+ parser.add_argument('--sample-rate', help='Audio sample rate (e.g., 44100)')
+ parser.add_argument('--channels', choices=['1', '2'], help='Number of audio channels')
+ parser.add_argument('--quality', help='Audio quality (0-9 for MP3, 0-10 for OGG)')
+
+ parser.add_argument('--crf', help='Video quality (0-51 for H.264, 0-63 for VP9)')
+ parser.add_argument('--preset', choices=['ultrafast', 'superfast', 'veryfast', 'faster', 'fast', 'medium', 'slow', 'slower', 'veryslow'],
+ help='FFmpeg preset')
+ parser.add_argument('--profile', choices=['baseline', 'main', 'high', 'high10', 'high422', 'high444'],
+ help='H.264 profile')
+ parser.add_argument('--level', help='H.264 level (e.g., 4.0, 4.1)')
+
+ parser.add_argument('--tile-columns', help='WebM tile columns (0-4)')
+ parser.add_argument('--frame-parallel', choices=['0', '1'], help='WebM frame parallel')
+
+ parser.add_argument('--duration', help='Duration in seconds for GIF conversion (max 15)')
+
+ parser.add_argument('--overwrite', action='store_true',
+ help='Overwrite existing output files (default: False)')
+ parser.add_argument('--dry-run', action='store_true',
+ help='Show commands without executing')
+ parser.add_argument('--max-workers', type=int, default=4,
+ help='Maximum number of parallel conversions')
+ parser.add_argument('--add-suffix', action='store_true',
+ help='Add "_converted" suffix to output files')
+ parser.add_argument('--test', action='store_true',
+ help='Run environment checks and exit')
+ parser.add_argument('--image', help='Image file to use when converting audio to video')
+
+ args = parser.parse_args()
+
+ if args.to == 'gif' and args.duration:
+ try:
+ duration = float(args.duration)
+ if duration <= 0 or duration > 15:
+ print("Error: GIF duration must be between 0 and 15 seconds")
+ return 1
+ except ValueError:
+ print("Error: Duration must be a valid number")
+ return 1
+
+ for input_file in args.input_files:
+ if not os.path.exists(input_file):
+ print(f"Error: Input file not found: {input_file}")
+ return 1
+ if not os.access(input_file, os.R_OK):
+ print(f"Error: Cannot read input file: {input_file}")
+ return 1
+
+ if args.output_dir:
+ try:
+ os.makedirs(args.output_dir, exist_ok=True)
+ if not os.access(args.output_dir, os.W_OK):
+ print(f"Error: Cannot write to output directory: {args.output_dir}")
+ return 1
+ except Exception as e:
+ print(f"Error creating output directory: {e}")
+ return 1
+
+ if args.output_name and len(args.input_files) > 1:
+ print("Warning: --output-name is ignored for batch conversion")
+
+ try:
+ converter = MediaConverter()
+
+ if args.test:
+ print("Environment check passed!")
+ return 0
+
+ settings = {}
+ if args.audio_bitrate:
+ settings['audio_bitrate'] = args.audio_bitrate
+ if args.video_bitrate:
+ settings['video_bitrate'] = args.video_bitrate
+ if args.sample_rate:
+ settings['sample_rate'] = args.sample_rate
+ if args.channels:
+ settings['channels'] = args.channels
+ if args.quality:
+ settings['quality'] = args.quality
+
+ if args.crf:
+ settings['crf'] = args.crf
+ if args.preset:
+ settings['preset'] = args.preset
+ if args.profile:
+ settings['profile'] = args.profile
+ if args.level:
+ settings['level'] = args.level
+
+ if args.tile_columns:
+ settings['tile-columns'] = args.tile_columns
+ if args.frame_parallel:
+ settings['frame-parallel'] = args.frame_parallel
+
+ if args.duration and args.to == 'gif':
+ settings['duration'] = args.duration
+
+ if len(args.input_files) == 1:
+ try:
+ output_file = converter.convert(
+ input_file=args.input_files[0],
+ output_format=args.to,
+ settings=settings,
+ image_file=args.image,
+ add_suffix=args.add_suffix,
+ dry_run=args.dry_run,
+ overwrite=args.overwrite,
+ output_name=args.output_name,
+ output_dir=args.output_dir
+ )
+ if output_file:
+ print(f"Successfully converted to: {output_file}")
+ return 0 if output_file else 1
+ except FileExistsError as e:
+ print(f"Error: {e}")
+ print("Use --overwrite to force overwrite existing files")
+ return 1
+ except Exception as e:
+ print(f"Error: {str(e)}")
+ return 1
+ else:
+ results = converter.convert_batch(
+ input_files=args.input_files,
+ output_format=args.to,
+ output_dir=args.output_dir,
+ settings=settings,
+ overwrite=args.overwrite,
+ dry_run=args.dry_run,
+ max_workers=args.max_workers,
+ add_suffix=args.add_suffix
+ )
+
+ success_count = len(results['success'])
+ failed_count = len(results['failed'])
+ print(f"Converted {success_count} out of {len(args.input_files)} files. {failed_count} failed.")
+
+ if failed_count > 0:
+ print("\nFailed conversions:")
+ for failed_file in results['failed']:
+ print(f" - {failed_file}")
+
+ return 0 if success_count == len(args.input_files) else 1
+
+ except EnvironmentError as e:
+ print(f"Environment error: {e}")
+ return 1
+ except Exception as e:
+ print(f"Error: {e}")
+ return 1
+
+if __name__ == '__main__':
+ sys.exit(main()) \ No newline at end of file