![[Pasted image 20241230164858.png]]

## Summary

- I came across an [interesting repo on GitHub](https://github.com/Dorian25/ytb-most-replayed). It extracts the engagement heatmap from the slider displayed on a YouTube video, then clips the segments around the peaks.
- I decided to build this out into a system that reads a text file of video links (newline-separated), clips the engaging parts, and uploads them to Google Drive.
- The heatmap (SVG) extraction was originally built using Selenium in Python with a pre-configured Firefox profile.
- I swapped Selenium out for puppeteer-extra in Node.js. No browser profile is needed, since the puppeteer-extra-plugin-adblocker module handles ad blocking.
- In my experience, launching browsers with profiles can be costly.

## What is this useful for?

- Increasing engagement on an Instagram profile:
    - Clipping videos this way takes the guesswork out of finding engaging content to repost.
    - Instagram may favor mobile posts over PC posts, as mobile posts are less likely to be bot-generated / automated.
    - Google Drive acts as a harbor where these clips can be accessed from a smartphone and downloaded before being re-posted.

![[Pasted image 20241231235722.png]]

## Technical Implementation

The system operates in three stages (the handoff format between them is sketched below):

1. A link processor reads YouTube URLs from a text file and manages the processing queue with built-in retry logic
2. A Node.js-based heatmap extractor captures engagement data using puppeteer-extra, replacing the original Selenium implementation to eliminate the need for browser profiles
3. A Python processor analyzes the heatmap data, identifies engagement peaks, and extracts video segments before uploading them to Google Drive
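
Concretely, the stages communicate through the filesystem. A minimal sketch of the on-disk layout, assuming the default paths used in the scripts below (the timestamps and sample values are illustrative — the file id is just the extraction timestamp with `:` and `.` replaced by `-`):

```text
links_to_crawl.txt    # input queue: one YouTube URL per line
processed_links.txt   # URLs already handled, written by the link processor
pending/              # handoff directory watched by the Python processor
    2024-12-30T16-48-58-000Z.svg             # captured heatmap markup
    2024-12-30T16-48-58-000Z_metadata.json   # {"video_url": ..., "video_title": ...,
                                             #  "status": "pending", "file_id": ...}
completed/            # successfully processed packages are archived here
error/                # failed packages land here
peaks/                # extracted clips, staged before the Google Drive upload
download/             # full source video downloads
```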

## Code

### heatmap_extractor.js

```js
// heatmap_extractor.js
const puppeteer = require('puppeteer-extra')
const StealthPlugin = require('puppeteer-extra-plugin-stealth')
const AdblockerPlugin = require('puppeteer-extra-plugin-adblocker')
const fs = require('fs').promises

puppeteer.use(StealthPlugin())
puppeteer.use(AdblockerPlugin({ blockTrackers: true }))

class HeatmapExtractor {
  constructor(videoUrl, outputDir = './pending') {
    this.videoUrl = videoUrl;
    this.outputDir = outputDir;
  }

  async getVideoTitle(page) {
    return await page.evaluate(() => {
      const titleElement = document.querySelector('h1.style-scope.ytd-video-primary-info-renderer');
      return titleElement ? titleElement.textContent.trim() : null;
    });
  }

  async extractHeatmap() {
    const browser = await puppeteer.launch({
      headless: false,
      args: ['--no-sandbox', '--disable-setuid-sandbox', '--disable-notifications']
    });
    const page = await browser.newPage();
    await page.setDefaultTimeout(60000);
    await page.setViewport({ width: 1280, height: 720 });

    try {
      await page.goto(this.videoUrl, { waitUntil: 'networkidle0' });

      const videoTitle = await this.getVideoTitle(page);
      if (!videoTitle) {
        throw new Error('Could not extract video title');
      }

      // Handle consent dialog if it appears
      try {
        const consentButton = await page.$('.ytp-consent-dialog button')
        if (consentButton) {
          await consentButton.click()
        }
      } catch (e) { }

      await page.waitForSelector('.html5-video-player')

      try {
        const progressBar = await page.waitForSelector('.ytp-progress-bar-container', { timeout: 10000 })
        const box = await progressBar.boundingBox()
        // Hover over the progress bar so YouTube renders the heatmap overlay
        await page.mouse.move(box.x + box.width / 2, box.y + box.height / 2)

        let attempts = 0
        let heatmapData = null
        while (attempts < 20 && !heatmapData) {
          await new Promise(resolve => setTimeout(resolve, 4000))
          heatmapData = await page.evaluate(() => {
            // Get all chapter elements instead of just one
            const heatmapElements = document.querySelectorAll('.ytp-heat-map-chapter');
            if (!heatmapElements.length) return null;

            const container = document.createElement('div');
            container.className = 'ytp-heat-map-container';

            let totalWidth = 0;
            heatmapElements.forEach(element => {
              totalWidth += parseFloat(element.style.width);
            });

            heatmapElements.forEach((element, index) => {
              const clone = element.cloneNode(true);
              clone.setAttribute('data-original-width', element.style.width);
              clone.setAttribute('data-original-left', element.style.left);
              clone.setAttribute('data-chapter-index', index);
              container.appendChild(clone);
            });

            return container.outerHTML;
          });
          attempts++;
        }

        if (!heatmapData) {
          throw new Error('Heatmap data not found - video might be too new or have too few views')
        }

        const timestamp = new Date().toISOString();
        const fileId = `${timestamp.replace(/[:.]/g, '-')}`;
        const svgPath = `${this.outputDir}/${fileId}.svg`;
        const metadataPath = `${this.outputDir}/${fileId}_metadata.json`;

        await fs.writeFile(svgPath, heatmapData);

        const metadata = {
          video_url: this.videoUrl,
          video_title: videoTitle,
          extraction_timestamp: timestamp,
          status: 'pending',
          file_id: fileId,
          has_chapters: true
        };
        await fs.writeFile(metadataPath, JSON.stringify(metadata, null, 2));

        await browser.close();
        return true;
      } catch (error) {
        console.error('Failed to extract heatmap:', error.message)
        throw error
      }
    } catch (error) {
      await browser.close()
      throw error
    }
  }
}

// Allow command line usage
if (require.main === module) {
  const videoUrl = process.argv[2]
  if (!videoUrl) {
    console.error('Please provide a YouTube URL')
    process.exit(1)
  }
  const extractor = new HeatmapExtractor(videoUrl)
  extractor.extractHeatmap()
    .then(() => console.log('Heatmap extracted successfully'))
    .catch(error => {
      console.error('Error extracting heatmap:', error)
      process.exit(1)
    })
}

module.exports = HeatmapExtractor
```
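
Note: `heatmap_extractor.js` depends on `puppeteer`, `puppeteer-extra`, `puppeteer-extra-plugin-stealth`, and `puppeteer-extra-plugin-adblocker`, all installable via npm. The `run_heatmap.sh` script invoked by the link processor below isn't reproduced here; a thin wrapper that runs `node heatmap_extractor.js "$1"` and forwards the exit code is sufficient, since the retry logic keys off the return code.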

### process_links.py

```python
# process_links.py
# Reads links from a file and runs the heatmap extraction for each one.
import time
import logging
import subprocess

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('link_processor.log'),
        logging.StreamHandler()
    ]
)

class LinkProcessor:
    def __init__(self, links_file='links_to_crawl.txt', processed_file='processed_links.txt'):
        self.links_file = links_file
        self.processed_file = processed_file
        self.processed_links = self.load_processed_links()
        self.failed_attempts = {}

    def load_processed_links(self):
        """Load previously processed links"""
        try:
            with open(self.processed_file, 'r') as f:
                return set(line.strip() for line in f)
        except FileNotFoundError:
            return set()

    def mark_as_processed(self, link):
        """Mark a link as processed"""
        with open(self.processed_file, 'a') as f:
            f.write(f"{link}\n")
        self.processed_links.add(link)

    def process_next_link(self):
        """Process the next unprocessed link from the file"""
        retry_attempts = 5
        try:
            with open(self.links_file, 'r') as f:
                links = [line.strip() for line in f if line.strip()]

            for link in links:
                if link not in self.processed_links:
                    logging.info(f"Processing link: {link}")
                    if link not in self.failed_attempts:
                        self.failed_attempts[link] = 0

                    result = subprocess.run(['./run_heatmap.sh', link],
                                            capture_output=True, text=True)

                    if result.returncode == 0:
                        logging.info(f"Successfully processed: {link}")
                        self.mark_as_processed(link)
                        self.failed_attempts.pop(link, None)  # Remove from failed attempts
                        return True
                    else:
                        self.failed_attempts[link] += 1
                        logging.error(f"Failed to process {link} "
                                      f"(Attempt {self.failed_attempts[link]}/{retry_attempts}): {result.stderr}")
                        if self.failed_attempts[link] >= retry_attempts:
                            logging.warning(f"Link {link} failed {retry_attempts} times, "
                                            "marking as processed and skipping")
                            self.mark_as_processed(link)
                            self.failed_attempts.pop(link, None)
                        return False
            return False
        except Exception as e:
            logging.error(f"Error processing links: {e}")
            return False

def main():
    processor = LinkProcessor()
    while True:
        try:
            if not processor.process_next_link():
                logging.info("No new links to process. Waiting...")
                time.sleep(30)
        except KeyboardInterrupt:
            logging.info("Link processor stopped by user")
            break
        except Exception as e:
            logging.error(f"Unexpected error: {e}")
            time.sleep(60)

if __name__ == "__main__":
    main()
```
Waiting...") time.sleep(30) except KeyboardInterrupt: logging.info("Link processor stopped by user") break except Exception as e: logging.error(f"Unexpected error: {e}") time.sleep(60) if __name__ == "__main__": main() ``` ### svg_processor.py ```python # svg_processor.py # script to extract video segments from a youtube video based on a heatmap #%% import subprocess import time import numpy as np import matplotlib.pyplot as plt from bs4 import BeautifulSoup from moviepy.video.io.ffmpeg_tools import ffmpeg_extract_subclip import yt_dlp import os import traceback from argparse import ArgumentParser from google.oauth2.credentials import Credentials from google_auth_oauthlib.flow import InstalledAppFlow from google.auth.transport.requests import Request from googleapiclient.discovery import build from googleapiclient.http import MediaFileUpload from watchdog.observers import Observer from watchdog.events import FileSystemEventHandler import json import logging from datetime import datetime logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler('processor.log'), logging.StreamHandler() ] ) class ProcessingState: def __init__(self, state_file='processing_state.json'): self.state_file = state_file self.load_state() def load_state(self): try: with open(self.state_file, 'r') as f: self.state = json.load(f) except FileNotFoundError: self.state = {} def save_state(self): with open(self.state_file, 'w') as f: json.dump(self.state, f) def mark_started(self, file_id): self.state[file_id] = { 'status': 'processing', 'start_time': datetime.now().isoformat() } self.save_state() def mark_completed(self, file_id): if file_id in self.state: self.state[file_id]['status'] = 'completed' self.state[file_id]['end_time'] = datetime.now().isoformat() self.save_state() def mark_failed(self, file_id, error): if file_id in self.state: self.state[file_id]['status'] = 'failed' self.state[file_id]['error'] = str(error) self.state[file_id]['end_time'] = datetime.now().isoformat() self.save_state() class SVGHandler(FileSystemEventHandler): def __init__(self, processor): self.processor = processor self.processing_state = ProcessingState() def on_created(self, event): if not event.src_path.endswith('.svg'): return try: file_id = os.path.basename(event.src_path).replace('.svg', '') metadata_path = f"{os.path.dirname(event.src_path)}/{file_id}_metadata.json" time.sleep(1) print(metadata_path) if not os.path.exists(metadata_path): logging.warning(f"No metadata found for {event.src_path}") return with open(metadata_path, 'r') as f: metadata = json.load(f) if metadata.get('status') != 'pending': logging.info(f"Skipping {file_id} - status is {metadata.get('status')}") return self.processing_state.mark_started(file_id) self.processor.process_video_package(event.src_path, metadata) self.processing_state.mark_completed(file_id) self._archive_processed_files(event.src_path, metadata_path) except Exception as e: logging.error(f"Error processing {event.src_path}: {str(e)}") self.processing_state.mark_failed(file_id, e) self._move_to_error_directory(event.src_path, metadata_path) def _archive_processed_files(self, svg_path, metadata_path): """Move processed files to completed directory""" try: new_svg_path = svg_path.replace('/pending/', '/completed/') os.rename(svg_path, new_svg_path) new_metadata_path = metadata_path.replace('/pending/', '/completed/') os.rename(metadata_path, new_metadata_path) logging.info(f"Archived files to completed directory: 

class SVGProcessor:
    def __init__(self, watch_directory='./pending'):
        self.watch_directory = watch_directory
        self.drive_service = self.google_drive_login()
        for dir_name in ['pending', 'completed', 'error', 'peaks']:
            os.makedirs(dir_name, exist_ok=True)

    def start(self):
        """Start the processor service"""
        logging.info("Starting SVG Processor service...")
        event_handler = SVGHandler(self)
        observer = Observer()
        observer.schedule(event_handler, self.watch_directory, recursive=False)
        observer.start()
        try:
            while True:
                time.sleep(1)
        except KeyboardInterrupt:
            logging.info("Shutting down SVG Processor service...")
            observer.stop()
            observer.join()

    def google_drive_login(self):
        """Handles Google Drive authentication and creates the service."""
        SCOPES = ["https://www.googleapis.com/auth/drive"]
        creds = None
        if os.path.exists("token.json"):
            creds = Credentials.from_authorized_user_file("token.json", SCOPES)
        if not creds or not creds.valid:
            if creds and creds.expired and creds.refresh_token:
                creds.refresh(Request())
            else:
                flow = InstalledAppFlow.from_client_secrets_file("credentials1.json", SCOPES)
                creds = flow.run_local_server(port=0)
            with open("token.json", "w") as token:
                token.write(creds.to_json())
        return build("drive", "v3", credentials=creds)

    def create_drive_folder(self, folder_name):
        """Creates a folder in Google Drive and returns its ID."""
        file_metadata = {
            "name": folder_name,
            "mimeType": "application/vnd.google-apps.folder"
        }
        file = self.drive_service.files().create(
            body=file_metadata,
            fields="id"
        ).execute()
        return file.get("id")

    def upload_to_drive(self, filename, folder_id, file_type="video/mp4"):
        """Uploads a file to the specified Google Drive folder."""
        file_metadata = {
            "name": os.path.basename(filename),
            "parents": [folder_id]
        }
        media = MediaFileUpload(filename, mimetype=file_type)
        self.drive_service.files().create(
            body=file_metadata,
            media_body=media,
            fields="id"
        ).execute()

    def read_svg(self):
        """Read the SVG file saved by the Node.js extractor"""
        if not os.path.exists(self.svg_path):
            raise FileNotFoundError(
                f"SVG file not found at {self.svg_path}. "
                "Did you run heatmap_extractor.js first?"
            )
        with open(self.svg_path, 'r') as f:
            return f.read()

    # Original fixed-resolution downloader, kept for reference:
    '''
    def download_video(self, resolution=360):
        print("launch video downloading")
        ydl_opts = {
            'format': f"b[height={resolution}][ext=mp4]",
            'quiet': True,
            "nopart": True,
            'outtmpl': "./download/youtube_video.%(ext)s"
        }
        try:
            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                self.video_info = ydl.extract_info(self.video_url)
        except yt_dlp.utils.DownloadError:
            print('interrupt the download')
            traceback.print_exc()
            return False
        return True
    '''

    def download_video(self, max_height=1080):
        """Downloads a YouTube video with more robust error handling and retries."""
        print(f"Launching video download (max height: {max_height}p)")
        ydl_opts = {
            'format': f'bestvideo[height<={max_height}]+bestaudio/best[height<={max_height}]',
            'quiet': False,
            'nopart': True,
            'outtmpl': "./download/youtube_video.%(ext)s",
            'retries': 10,
            'fragment_retries': 10,
            'skip_unavailable_fragments': False,
            'continuedl': True,
            'progress_hooks': [lambda d: print(f"Download progress: {d.get('status', 'unknown')}")],
            # Remove post-processors to avoid immediate MP4 conversion
            'postprocessors': []
        }
        try:
            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                self.video_info = ydl.extract_info(self.video_url, download=True)
                self.downloaded_video = ydl.prepare_filename(self.video_info)
                return True
        except Exception as e:
            print(f"Unexpected error during download: {str(e)}")
            traceback.print_exc()
            return False
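
    # Note on the format string above: 'bestvideo[...]+bestaudio' asks yt-dlp to
    # download separate video and audio streams and merge them, which requires
    # ffmpeg on PATH. The merged container is whatever yt-dlp picks (often .webm
    # or .mkv rather than .mp4), which is why extract_part() below reads the
    # actual extension from self.downloaded_video and re-encodes clips to MP4.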

    def extract_points_from_svg(self):
        """
        Extract engagement points from SVG, handling multiple chapters if present.
        Returns normalized points that account for chapter positions in the 0-1000 range.
        """
        print("Extracting points from SVG")
        svg = self.read_svg()
        soup = BeautifulSoup(svg, "lxml")

        chapters = soup.find_all("div", class_="ytp-heat-map-chapter")
        if not chapters:
            logging.warning("No chapters found in SVG data")
            return []

        all_points = []

        # First pass: total pixel width across all chapters
        total_width = 0
        for i, chapter in enumerate(chapters):
            style = chapter.get('style', '')
            width_str = style.split('width:')[1].split('px')[0].strip() if 'width:' in style else '0'
            width = float(width_str)
            total_width += width
            print(f'Chapter {i}: Width = {width}px')

        # Second pass: extract each chapter's path and map it into the global 0-1000 range
        accumulated_width = 0
        for chapter in chapters:
            style = chapter.get('style', '')
            width_str = style.split('width:')[1].split('px')[0].strip() if 'width:' in style else '0'
            width = float(width_str)

            path = chapter.find("path")
            if not path:
                continue

            d = path.get("d")
            coordinates = d.replace("M ", "").replace("C ", "").split(" ")

            chapter_points = []
            for c in coordinates[1:]:
                try:
                    x, y = c.split(",")
                    x_coordinate = float(x)
                    y_coordinate = float(y)
                    chapter_position = (accumulated_width / total_width) * 1000
                    chapter_width_normalized = (width / total_width) * 1000
                    normalized_x = chapter_position + (x_coordinate * chapter_width_normalized / 1000)
                    chapter_points.append((normalized_x, y_coordinate))
                except (ValueError, IndexError) as e:
                    logging.warning(f"Skipping invalid coordinate: {c} - {str(e)}")

            all_points.extend(chapter_points)
            accumulated_width += width

        all_points.sort(key=lambda p: p[0])

        if all_points:
            x_coords, y_coords = zip(*all_points)
            # SVG y grows downward; flip around the 90px-high viewbox and clamp at 0
            y_coords = [90.0 - y for y in y_coords]
            y_coords = [y if y >= 0.0 else 0.0 for y in y_coords]
            return list(zip(x_coords, y_coords))
        return []
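
    # Worked example of the normalization above: suppose a video has two chapters
    # rendered 600px and 400px wide. Each chapter's <path> spans x = 0..1000 in
    # its own coordinate space, so a point at x = 500 in the second chapter maps
    # to (600/1000)*1000 + 500*(400/1000)*1000/1000 = 600 + 200 = 800 on the
    # global 0-1000 axis. The path data itself looks something like
    # "M 0,90 C 1,88 2,87 ..." - stripping the M/C commands leaves "x,y" pairs.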
""" peaks = [] last_peak_index = -min_distance for i in range(1, len(points)-1): prev_y = points[i-1][1] curr_y = points[i][1] next_y = points[i+1][1] if (curr_y > prev_y and curr_y > next_y and curr_y >= threshold and i - last_peak_index >= min_distance): peaks.append(points[i]) last_peak_index = i return peaks def plot_data(self, points, peaks, output_path="heatmap_analysis.png"): """Plot and save the heatmap visualization""" x_coords, y_coords = zip(*points) plt.figure(figsize=(12, 6)) plt.plot(x_coords, y_coords, marker="+", label='Engagement') threshold = 50.0 plt.axhline(y=threshold, color='r', linestyle='--', label='Threshold') if peaks: peak_x, peak_y = zip(*peaks) plt.scatter(peak_x, peak_y, marker="o", color='red', label='Peaks') else: logging.warning("No peaks detected in the heatmap data") plt.xlabel('Video Position') plt.ylabel('Replay Intensity') plt.title(f'YouTube Video Heatmap Analysis\n{os.path.basename(self.video_url)}') plt.legend() plt.grid(True, alpha=0.3) plt.savefig(output_path, dpi=300, bbox_inches='tight') plt.close() def peaks_to_time(self, x): """Convert x coordinate (0-1000) to video timestamp in seconds""" if not self.video_info.get('duration'): raise ValueError("Video info not available. Did you run download_video first?") return (x * self.video_info["duration"]) / 1000 def extract_part(self, x_coord, y_coord, name): """Extract video segment around the peak timestamp""" if not self.video_info.get('duration'): raise ValueError("Video not downloaded. Run download_video first.") timestamp = self.peaks_to_time(x_coord) window_size = 15 start_time = max(0, timestamp - (window_size)) end_time = min(self.video_info["duration"], start_time + (window_size * 1.8)) os.makedirs("peaks", exist_ok=True) input_ext = os.path.splitext(self.downloaded_video)[1] temp_output = f"peaks/{name}_temp{input_ext}" final_output = f"peaks/{name}.mp4" try: print(f"Extracting segment from {start_time:.2f}s to {end_time:.2f}s") ffmpeg_extract_cmd = [ 'ffmpeg', '-y', '-i', self.downloaded_video, '-ss', f"{start_time:.2f}", '-t', f"{end_time - start_time:.2f}", '-c', 'copy', temp_output ] subprocess.run(ffmpeg_extract_cmd, check=True, capture_output=True) ffmpeg_convert_cmd = [ 'ffmpeg', '-y', '-i', temp_output, '-c:v', 'libx264', '-c:a', 'aac', '-preset', 'fast', final_output ] subprocess.run(ffmpeg_convert_cmd, check=True, capture_output=True) if os.path.exists(temp_output): os.remove(temp_output) if not os.path.exists(final_output) or os.path.getsize(final_output) == 0: raise ValueError(f"Failed to create valid video file at {final_output}") except Exception as e: print(f"Error extracting segment: {str(e)}") if os.path.exists(temp_output): os.remove(temp_output) raise def smooth_points(self, points, window_size=5): """ Smooths the data using a simple moving average. The window_size parameter determines how much smoothing occurs - larger values create more smoothing but might miss important details. 

    def smooth_points(self, points, window_size=5):
        """
        Smooths the data using a simple moving average. The window_size parameter
        determines how much smoothing occurs - larger values create more smoothing
        but might miss important details.

        Args:
            points: List of (x,y) coordinate tuples from the heatmap data
            window_size: Number of points to consider in the moving average window

        Returns:
            List of (x,y) coordinates with smoothed y values
        """
        x_coords, y_coords = zip(*points)
        y_coords = list(y_coords)

        smoothed_y = []
        for i in range(len(y_coords)):
            start = max(0, i - window_size // 2)
            end = min(len(y_coords), i + window_size // 2 + 1)
            window_average = sum(y_coords[start:end]) / (end - start)
            smoothed_y.append(window_average)

        return list(zip(x_coords, smoothed_y))

    def process_video_package(self, svg_path, metadata):
        """Process a single video package (SVG + metadata)"""
        try:
            self.svg_path = svg_path
            self.video_url = metadata['video_url']
            self.video_info = {}

            if not self.download_video(max_height=360):
                raise Exception("Failed to download video")

            folder_name = metadata.get('video_title', 'youtube_clips')
            folder_id = self.create_drive_folder(folder_name)
            logging.info(f"Created Google Drive folder: {folder_name}")

            points = self.extract_points_from_svg()
            if not points:
                raise ValueError("No valid heatmap data found in SVG")

            points = self.smooth_points(points)
            peaks = self.find_peaks(points)
            if not peaks:
                logging.warning("No significant peaks found in the heatmap data")

            heatmap_path = f"peaks/heatmap_{os.path.splitext(os.path.basename(svg_path))[0]}.png"
            self.plot_data(points, peaks, output_path=heatmap_path)
            logging.info("Uploading heatmap visualization to Google Drive...")
            self.upload_to_drive(heatmap_path, folder_id, file_type="image/png")
            os.remove(heatmap_path)

            successful_extracts = 0
            for i, peak in enumerate(peaks):
                try:
                    clip_path = f"peaks/peak_{i}.mp4"
                    self.extract_part(peak[0], peak[1], f"peak_{i}")
                    logging.info(f"Uploading clip {i} to Google Drive...")
                    self.upload_to_drive(clip_path, folder_id, file_type="video/mp4")
                    successful_extracts += 1
                    logging.info(f"Successfully extracted and uploaded peak {i} "
                                 f"at position {peak[0]} ({self.peaks_to_time(peak[0]):.2f}s)")
                    os.remove(clip_path)
                except Exception as e:
                    logging.error(f"Failed to extract peak {i}: {e}")

            logging.info(f"Extraction complete: {successful_extracts} of {len(peaks)} "
                         "clips extracted and uploaded successfully")

            try:
                # Remove whatever container yt-dlp actually produced
                os.remove(self.downloaded_video)
                logging.info("Cleaned up downloaded video file")
            except Exception as e:
                logging.warning(f"Could not remove downloaded video: {e}")

            return True
        except Exception as e:
            logging.error(f"Error processing video package: {e}")
            traceback.print_exc()
            raise

def main():
    """Start the SVG processor service"""
    try:
        processor = SVGProcessor()
        logging.info("Starting SVG processor service...")
        processor.start()
    except KeyboardInterrupt:
        logging.info("Service stopped by user")
    except Exception as e:
        logging.error(f"Service error: {str(e)}")
        raise

if __name__ == "__main__":
    main()
```
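
To make the peak logic concrete, here is a minimal, self-contained sketch of the math the processor applies. The synthetic engagement curve and the 300-second duration are invented for illustration; the smoothing, peak detection, and clip-window math mirror `smooth_points`, `find_peaks`, `peaks_to_time`, and `extract_part` above:

```python
# toy_peak_demo.py - illustrative only; mirrors the SVGProcessor methods above
def smooth(ys, window_size=5):
    # Simple moving average, same scheme as SVGProcessor.smooth_points
    out = []
    for i in range(len(ys)):
        start = max(0, i - window_size // 2)
        end = min(len(ys), i + window_size // 2 + 1)
        out.append(sum(ys[start:end]) / (end - start))
    return out

# Synthetic engagement curve over the 0-1000 x-axis (values are made up)
points = [(i * 10.0, y) for i, y in enumerate(
    [10, 12, 15, 20, 30, 55, 80, 60, 35, 20] * 10)]

ys = smooth([y for _, y in points])
peaks, last = [], -20
for i in range(1, len(ys) - 1):
    # Local maximum above the 50.0 threshold, at least 20 indices from the last peak
    if ys[i] > ys[i-1] and ys[i] > ys[i+1] and ys[i] >= 50.0 and i - last >= 20:
        peaks.append((points[i][0], ys[i]))
        last = i

duration = 300  # pretend the video is 5 minutes long
for x, _ in peaks:
    t = x * duration / 1000          # peaks_to_time: 0-1000 -> seconds
    start = max(0, t - 15)           # extract_part: clip opens 15 s before the peak
    end = min(duration, start + 27)  # ...and runs 15 * 1.8 = 27 s
    print(f"peak at x={x:.0f} -> clip {start:.1f}s..{end:.1f}s")
```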

## Bonus Script

- In the event you want to manually clip a section of a YouTube video:

```python
# manual_clip.py
# Clip YouTube videos and upload to Google Drive.
# Input: url, start, end, quality, title (optional)
import yt_dlp
import os
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request
from googleapiclient.discovery import build
from googleapiclient.http import MediaFileUpload
import subprocess
import argparse
import logging

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('clipper.log'),
        logging.StreamHandler()
    ]
)

class YouTubeClipper:
    def __init__(self):
        self.drive_service = self.authenticate_google_drive()
        os.makedirs("temp_downloads", exist_ok=True)

    def authenticate_google_drive(self):
        """Handle Google Drive authentication process."""
        SCOPES = ["https://www.googleapis.com/auth/drive"]
        creds = None
        if os.path.exists("token.json"):
            creds = Credentials.from_authorized_user_file("token.json", SCOPES)
        # Handle credential refresh or new authentication
        if not creds or not creds.valid:
            if creds and creds.expired and creds.refresh_token:
                creds.refresh(Request())
            else:
                flow = InstalledAppFlow.from_client_secrets_file(
                    "credentials.json", SCOPES)
                creds = flow.run_local_server(port=0)
            with open("token.json", "w") as token:
                token.write(creds.to_json())
        return build("drive", "v3", credentials=creds)

    def download_video(self, url, max_height=720):
        """Download the YouTube video at specified quality."""
        logging.info(f"Downloading video: {url}")
        ydl_opts = {
            'format': f'bestvideo[height<={max_height}]+bestaudio/best[height<={max_height}]',
            'quiet': False,
            'nopart': True,
            'outtmpl': "temp_downloads/%(title)s.%(ext)s",
            'retries': 3
        }
        try:
            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                info = ydl.extract_info(url, download=True)
                return ydl.prepare_filename(info), info.get('title')
        except Exception as e:
            logging.error(f"Download failed: {str(e)}")
            raise

    def clip_video(self, input_path, start_time, end_time, output_path):
        """Extract the clip using FFmpeg."""
        logging.info(f"Extracting clip from {start_time}s to {end_time}s")
        try:
            temp_output = f"{output_path}_temp.mp4"
            extract_cmd = [
                'ffmpeg', '-y',
                '-i', input_path,
                '-ss', str(start_time),
                '-t', str(end_time - start_time),
                '-c', 'copy',
                temp_output
            ]
            subprocess.run(extract_cmd, check=True, capture_output=True)

            convert_cmd = [
                'ffmpeg', '-y',
                '-i', temp_output,
                '-c:v', 'libx264',
                '-c:a', 'aac',
                '-preset', 'fast',
                output_path
            ]
            subprocess.run(convert_cmd, check=True, capture_output=True)

            if os.path.exists(temp_output):
                os.remove(temp_output)
            return True
        except subprocess.CalledProcessError as e:
            logging.error(f"FFmpeg error: {e.stderr.decode()}")
            raise
        except Exception as e:
            logging.error(f"Clipping error: {str(e)}")
            raise

    def upload_to_drive(self, file_path, title=None):
        """Upload the clip to Google Drive."""
        logging.info("Uploading clip to Google Drive")
        try:
            folder_name = "YouTube Clips"
            folder_id = self.get_or_create_folder(folder_name)
            file_metadata = {
                'name': title or os.path.basename(file_path),
                'parents': [folder_id]
            }
            media = MediaFileUpload(file_path, mimetype='video/mp4')
            file = self.drive_service.files().create(
                body=file_metadata,
                media_body=media,
                fields='id'
            ).execute()
            logging.info(f"Upload complete. File ID: {file.get('id')}")
            return file.get('id')
        except Exception as e:
            logging.error(f"Upload failed: {str(e)}")
            raise

    def get_or_create_folder(self, folder_name):
        """Get or create a folder in Google Drive."""
        results = self.drive_service.files().list(
            q=f"name='{folder_name}' and mimeType='application/vnd.google-apps.folder' and trashed=false",
            spaces='drive',
            fields='files(id)'
        ).execute()
        if results.get('files'):
            return results['files'][0]['id']

        folder_metadata = {
            'name': folder_name,
            'mimeType': 'application/vnd.google-apps.folder'
        }
        folder = self.drive_service.files().create(
            body=folder_metadata,
            fields='id'
        ).execute()
        return folder.get('id')

    def cleanup(self, file_path):
        """Remove temporary files."""
        try:
            if os.path.exists(file_path):
                os.remove(file_path)
        except Exception as e:
            logging.warning(f"Cleanup error: {str(e)}")

def main():
    parser = argparse.ArgumentParser(description='Clip YouTube videos and upload to Google Drive')
    parser.add_argument('url', help='YouTube video URL')
    parser.add_argument('start', type=float, help='Start time in seconds')
    parser.add_argument('end', type=float, help='End time in seconds')
    parser.add_argument('--quality', type=int, default=720,
                        help='Maximum video height (default: 720)')
    parser.add_argument('--title', help='Custom title for the clip (optional)')
    args = parser.parse_args()

    clipper = YouTubeClipper()
    try:
        input_path, video_title = clipper.download_video(args.url, args.quality)
        clip_title = args.title or f"{video_title}_clip_{args.start}-{args.end}"
        output_path = f"temp_downloads/{clip_title}.mp4"
        clipper.clip_video(input_path, args.start, args.end, output_path)
        clipper.upload_to_drive(output_path, clip_title)
        clipper.cleanup(input_path)
        clipper.cleanup(output_path)
        logging.info("Process completed successfully")
    except Exception as e:
        logging.error(f"Process failed: {str(e)}")
        raise

if __name__ == "__main__":
    main()

# Example:
# python manual_clip.py "https://youtube.com/watch?v=VIDEO_ID" 30 45 --quality 720 --title "My Clip"
```