![[Pasted image 20241230164858.png]]
## Summary
- I came across an [interesting repo on GitHub](https://github.com/Dorian25/ytb-most-replayed). It extracts the "most replayed" engagement heatmap displayed above a YouTube video's progress bar, then clips the segments around the peaks.
- I decided to build this out into a system that reads a text file of video links (newline separated), and clips the engaging parts before uploading them to Google Drive.
- The heatmap (SVG) extraction was originally built with Selenium in Python, using a pre-configured Firefox profile.
- I swapped Selenium out for puppeteer-extra in Node.js. No browser profile is needed, as the puppeteer-extra-plugin-adblocker module handles ad blocking.
- In my experience, launching browsers with pre-configured profiles is costly: startup is slower, and the profile has to be created and maintained on every machine the scraper runs on.
## What is this useful for?
- Increasing engagement on an Instagram profile:
    - Clipping videos in this way takes the guesswork out of finding engaging content to repost.
    - Instagram may favor posts made from a phone over posts made from a PC, since mobile posts are less likely to be bot-generated or automated.
    - Google Drive acts as a staging area where the clips can be accessed from a smartphone and downloaded before being re-posted.
![[Pasted image 20241231235722.png]]
## Technical Implementation
The system operates in three stages:
1. A link processor reads YouTube URLs from a text file and manages the processing queue with built-in retry logic
2. A Node.js-based heatmap extractor captures engagement data using puppeteer-extra, replacing the original Selenium implementation to eliminate the need for browser profiles
3. A Python processor analyzes the heatmap data, identifies engagement peaks, and extracts video segments before uploading them to Google Drive
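The stages communicate through the filesystem: for each video, the extractor drops a `{file_id}.svg` / `{file_id}_metadata.json` pair into `./pending`, and the processor watches that directory. A minimal sketch of reading one of these pending packages (the `load_pending_package` helper below is illustrative, not part of the codebase; the metadata keys match what the extractor writes):

```python
# Illustrative sketch of the ./pending handoff between the extractor and the processor.
import json
import os

def load_pending_package(svg_path):
    """Return (svg_markup, metadata) for a pending package, or None if it isn't ready."""
    file_id = os.path.basename(svg_path).replace('.svg', '')
    metadata_path = os.path.join(os.path.dirname(svg_path), f"{file_id}_metadata.json")
    if not os.path.exists(metadata_path):
        return None  # metadata not written yet
    with open(metadata_path) as f:
        # Keys written by heatmap_extractor.js:
        # video_url, video_title, extraction_timestamp, status, file_id, has_chapters
        metadata = json.load(f)
    if metadata.get('status') != 'pending':
        return None  # already handled
    with open(svg_path) as f:
        return f.read(), metadata
```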
## Code
### heatmap_extractor.js
```js
// heatmap_extractor.js
const puppeteer = require('puppeteer-extra')
const StealthPlugin = require('puppeteer-extra-plugin-stealth')
const AdblockerPlugin = require('puppeteer-extra-plugin-adblocker')
const fs = require('fs').promises

puppeteer.use(StealthPlugin())
puppeteer.use(AdblockerPlugin({ blockTrackers: true }))

class HeatmapExtractor {
  constructor(videoUrl, outputDir = './pending') {
    this.videoUrl = videoUrl;
    this.outputDir = outputDir;
  }

  async getVideoTitle(page) {
    return await page.evaluate(() => {
      const titleElement = document.querySelector('h1.style-scope.ytd-video-primary-info-renderer');
      return titleElement ? titleElement.textContent.trim() : null;
    });
  }

  async extractHeatmap() {
    const browser = await puppeteer.launch({
      headless: false, // runs with a visible browser window
      args: ['--no-sandbox', '--disable-setuid-sandbox', '--disable-notifications']
    });
    const page = await browser.newPage();
    page.setDefaultTimeout(60000);
    await page.setViewport({ width: 1280, height: 720 });
    try {
      await page.goto(this.videoUrl, { waitUntil: 'networkidle0' });
      const videoTitle = await this.getVideoTitle(page);
      if (!videoTitle) {
        throw new Error('Could not extract video title');
      }
      // Handle consent dialog if it appears
      try {
        const consentButton = await page.$('.ytp-consent-dialog button')
        if (consentButton) {
          await consentButton.click()
        }
      } catch (e) {
        // No consent dialog - nothing to do
      }
      await page.waitForSelector('.html5-video-player')
      try {
        const progressBar = await page.waitForSelector('.ytp-progress-bar-container', { timeout: 10000 })
        const box = await progressBar.boundingBox()
        // Hovering the progress bar prompts YouTube to render the heatmap
        await page.mouse.move(box.x + box.width / 2, box.y + box.height / 2)
        let attempts = 0
        let heatmapData = null
        while (attempts < 20 && !heatmapData) {
          await new Promise(resolve => setTimeout(resolve, 4000))
          heatmapData = await page.evaluate(() => {
            // Get all chapter elements instead of just one
            const heatmapElements = document.querySelectorAll('.ytp-heat-map-chapter');
            if (!heatmapElements.length) return null;
            const container = document.createElement('div');
            container.className = 'ytp-heat-map-container';
            let totalWidth = 0;
            heatmapElements.forEach(element => {
              totalWidth += parseFloat(element.style.width);
            });
            heatmapElements.forEach((element, index) => {
              const clone = element.cloneNode(true);
              clone.setAttribute('data-original-width', element.style.width);
              clone.setAttribute('data-original-left', element.style.left);
              clone.setAttribute('data-chapter-index', index);
              container.appendChild(clone);
            });
            return container.outerHTML;
          });
          attempts++;
        }
        if (!heatmapData) {
          throw new Error('Heatmap data not found - video might be too new or have too few views')
        }
        const timestamp = new Date().toISOString();
        const fileId = `${timestamp.replace(/[:.]/g, '-')}`;
        const svgPath = `${this.outputDir}/${fileId}.svg`;
        const metadataPath = `${this.outputDir}/${fileId}_metadata.json`;
        // Ensure the output directory exists before writing
        await fs.mkdir(this.outputDir, { recursive: true });
        await fs.writeFile(svgPath, heatmapData);
        const metadata = {
          video_url: this.videoUrl,
          video_title: videoTitle,
          extraction_timestamp: timestamp,
          status: 'pending',
          file_id: fileId,
          has_chapters: true
        };
        await fs.writeFile(metadataPath, JSON.stringify(metadata, null, 2));
        await browser.close();
        return true;
      } catch (error) {
        console.error('Failed to extract heatmap:', error.message)
        throw error
      }
    } catch (error) {
      await browser.close()
      throw error
    }
  }
}

// Allow command line usage
if (require.main === module) {
  const videoUrl = process.argv[2]
  if (!videoUrl) {
    console.error('Please provide a YouTube URL')
    process.exit(1)
  }
  const extractor = new HeatmapExtractor(videoUrl)
  extractor.extractHeatmap()
    .then(() => console.log('Heatmap extracted successfully'))
    .catch(error => {
      console.error('Error extracting heatmap:', error)
      process.exit(1)
    })
}

module.exports = HeatmapExtractor
```
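To extract a single video's heatmap, run `node heatmap_extractor.js <video_url>`. In the full pipeline the extractor is invoked through `run_heatmap.sh` (not shown in this post); a minimal Python stand-in for that wrapper, assuming `node` is on the PATH and the script sits next to `heatmap_extractor.js`, could look like:

```python
# run_extractor.py - hypothetical stand-in for run_heatmap.sh
import subprocess
import sys

def run_extractor(video_url):
    """Invoke the Node.js heatmap extractor and report success/failure."""
    result = subprocess.run(['node', 'heatmap_extractor.js', video_url],
                            capture_output=True, text=True)
    if result.returncode != 0:
        print(result.stderr, file=sys.stderr)
    return result.returncode == 0

if __name__ == "__main__":
    if len(sys.argv) < 2:
        sys.exit("usage: python run_extractor.py <youtube_url>")
    sys.exit(0 if run_extractor(sys.argv[1]) else 1)
```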
### process_links.py
```python
# process_links.py
# This script processes links from a file and runs the heatmap extraction for each link
import time
import logging
import subprocess

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('link_processor.log'),
        logging.StreamHandler()
    ]
)

class LinkProcessor:
    def __init__(self, links_file='links_to_crawl.txt', processed_file='processed_links.txt'):
        self.links_file = links_file
        self.processed_file = processed_file
        self.processed_links = self.load_processed_links()
        self.failed_attempts = {}

    def load_processed_links(self):
        """Load previously processed links"""
        try:
            with open(self.processed_file, 'r') as f:
                return set(line.strip() for line in f)
        except FileNotFoundError:
            return set()

    def mark_as_processed(self, link):
        """Mark a link as processed"""
        with open(self.processed_file, 'a') as f:
            f.write(f"{link}\n")
        self.processed_links.add(link)

    def process_next_link(self):
        """Process the next unprocessed link from the file"""
        retry_attempts = 5
        try:
            with open(self.links_file, 'r') as f:
                links = [line.strip() for line in f if line.strip()]
            for link in links:
                if link not in self.processed_links:
                    logging.info(f"Processing link: {link}")
                    if link not in self.failed_attempts:
                        self.failed_attempts[link] = 0
                    result = subprocess.run(['./run_heatmap.sh', link],
                                            capture_output=True,
                                            text=True)
                    if result.returncode == 0:
                        logging.info(f"Successfully processed: {link}")
                        self.mark_as_processed(link)
                        self.failed_attempts.pop(link, None)  # Remove from failed attempts
                        return True
                    else:
                        self.failed_attempts[link] += 1
                        logging.error(f"Failed to process {link} (Attempt {self.failed_attempts[link]}/{retry_attempts}): {result.stderr}")
                        if self.failed_attempts[link] >= retry_attempts:
                            logging.warning(f"Link {link} failed {retry_attempts} times, marking as processed and skipping")
                            self.mark_as_processed(link)
                            self.failed_attempts.pop(link, None)
                        return False
            return False
        except Exception as e:
            logging.error(f"Error processing links: {e}")
            return False

def main():
    processor = LinkProcessor()
    while True:
        try:
            if not processor.process_next_link():
                logging.info("No new links to process. Waiting...")
                time.sleep(30)
        except KeyboardInterrupt:
            logging.info("Link processor stopped by user")
            break
        except Exception as e:
            logging.error(f"Unexpected error: {e}")
            time.sleep(60)

if __name__ == "__main__":
    main()
```
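`links_to_crawl.txt` is just newline-separated video URLs; successfully processed (or permanently failed) links are appended to `processed_links.txt`, so the processor can be restarted without redoing work. For example (placeholder IDs):

```
https://youtube.com/watch?v=VIDEO_ID_1
https://youtube.com/watch?v=VIDEO_ID_2
```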
### svg_processor.py
```python
# svg_processor.py
# Script to extract video segments from a YouTube video based on a heatmap
import subprocess
import time
import matplotlib.pyplot as plt
from bs4 import BeautifulSoup
import yt_dlp
import os
import traceback
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request
from googleapiclient.discovery import build
from googleapiclient.http import MediaFileUpload
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
import json
import logging
from datetime import datetime

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('processor.log'),
        logging.StreamHandler()
    ]
)

class ProcessingState:
    def __init__(self, state_file='processing_state.json'):
        self.state_file = state_file
        self.load_state()

    def load_state(self):
        try:
            with open(self.state_file, 'r') as f:
                self.state = json.load(f)
        except FileNotFoundError:
            self.state = {}

    def save_state(self):
        with open(self.state_file, 'w') as f:
            json.dump(self.state, f)

    def mark_started(self, file_id):
        self.state[file_id] = {
            'status': 'processing',
            'start_time': datetime.now().isoformat()
        }
        self.save_state()

    def mark_completed(self, file_id):
        if file_id in self.state:
            self.state[file_id]['status'] = 'completed'
            self.state[file_id]['end_time'] = datetime.now().isoformat()
            self.save_state()

    def mark_failed(self, file_id, error):
        if file_id in self.state:
            self.state[file_id]['status'] = 'failed'
            self.state[file_id]['error'] = str(error)
            self.state[file_id]['end_time'] = datetime.now().isoformat()
            self.save_state()

class SVGHandler(FileSystemEventHandler):
    def __init__(self, processor):
        self.processor = processor
        self.processing_state = ProcessingState()

    def on_created(self, event):
        if not event.src_path.endswith('.svg'):
            return
        # Derive paths up front so they're also available in the except block
        file_id = os.path.basename(event.src_path).replace('.svg', '')
        metadata_path = f"{os.path.dirname(event.src_path)}/{file_id}_metadata.json"
        try:
            time.sleep(1)  # give the extractor a moment to finish writing the metadata
            if not os.path.exists(metadata_path):
                logging.warning(f"No metadata found for {event.src_path}")
                return
            with open(metadata_path, 'r') as f:
                metadata = json.load(f)
            if metadata.get('status') != 'pending':
                logging.info(f"Skipping {file_id} - status is {metadata.get('status')}")
                return
            self.processing_state.mark_started(file_id)
            self.processor.process_video_package(event.src_path, metadata)
            self.processing_state.mark_completed(file_id)
            self._archive_processed_files(event.src_path, metadata_path)
        except Exception as e:
            logging.error(f"Error processing {event.src_path}: {str(e)}")
            self.processing_state.mark_failed(file_id, e)
            self._move_to_error_directory(event.src_path, metadata_path)

    def _archive_processed_files(self, svg_path, metadata_path):
        """Move processed files to completed directory"""
        try:
            new_svg_path = svg_path.replace('/pending/', '/completed/')
            os.rename(svg_path, new_svg_path)
            new_metadata_path = metadata_path.replace('/pending/', '/completed/')
            os.rename(metadata_path, new_metadata_path)
            logging.info(f"Archived files to completed directory: {os.path.basename(svg_path)}")
        except Exception as e:
            logging.error(f"Error archiving files: {e}")

    def _move_to_error_directory(self, svg_path, metadata_path):
        """Move failed files to error directory"""
        try:
            new_svg_path = svg_path.replace('/pending/', '/error/')
            os.rename(svg_path, new_svg_path)
            new_metadata_path = metadata_path.replace('/pending/', '/error/')
            os.rename(metadata_path, new_metadata_path)
            logging.info(f"Moved failed files to error directory: {os.path.basename(svg_path)}")
        except Exception as e:
            logging.error(f"Error moving files to error directory: {e}")

class SVGProcessor:
    def __init__(self, watch_directory='./pending'):
        self.watch_directory = watch_directory
        self.drive_service = self.google_drive_login()
        for dir_name in ['pending', 'completed', 'error', 'peaks']:
            os.makedirs(dir_name, exist_ok=True)

    def start(self):
        """Start the processor service"""
        logging.info("Starting SVG Processor service...")
        event_handler = SVGHandler(self)
        observer = Observer()
        observer.schedule(event_handler, self.watch_directory, recursive=False)
        observer.start()
        try:
            while True:
                time.sleep(1)
        except KeyboardInterrupt:
            logging.info("Shutting down SVG Processor service...")
            observer.stop()
        observer.join()

    def google_drive_login(self):
        """Handles Google Drive authentication and creates the service."""
        SCOPES = ["https://www.googleapis.com/auth/drive"]
        creds = None
        if os.path.exists("token.json"):
            creds = Credentials.from_authorized_user_file("token.json", SCOPES)
        if not creds or not creds.valid:
            if creds and creds.expired and creds.refresh_token:
                creds.refresh(Request())
            else:
                flow = InstalledAppFlow.from_client_secrets_file("credentials1.json", SCOPES)
                creds = flow.run_local_server(port=0)
            with open("token.json", "w") as token:
                token.write(creds.to_json())
        return build("drive", "v3", credentials=creds)

    def create_drive_folder(self, folder_name):
        """Creates a folder in Google Drive and returns its ID."""
        file_metadata = {
            "name": folder_name,
            "mimeType": "application/vnd.google-apps.folder"
        }
        file = self.drive_service.files().create(
            body=file_metadata, fields="id"
        ).execute()
        return file.get("id")

    def upload_to_drive(self, filename, folder_id, file_type="video/mp4"):
        """Uploads a file to the specified Google Drive folder."""
        file_metadata = {
            "name": os.path.basename(filename),
            "parents": [folder_id]
        }
        media = MediaFileUpload(filename, mimetype=file_type)
        self.drive_service.files().create(
            body=file_metadata,
            media_body=media,
            fields="id"
        ).execute()

    def read_svg(self):
        """Read the SVG file saved by the Node.js extractor"""
        if not os.path.exists(self.svg_path):
            raise FileNotFoundError(
                f"SVG file not found at {self.svg_path}. "
                "Did you run heatmap_extractor.js first?"
            )
        with open(self.svg_path, 'r') as f:
            return f.read()
    def download_video(self, max_height=1080):
        """Download a YouTube video with robust error handling and retries."""
        print(f"Launching video download (max height: {max_height}p)")
        ydl_opts = {
            'format': f'bestvideo[height<={max_height}]+bestaudio/best[height<={max_height}]',
            'quiet': False,
            'nopart': True,
            'outtmpl': "./download/youtube_video.%(ext)s",
            'retries': 10,
            'fragment_retries': 10,
            'skip_unavailable_fragments': False,
            'continuedl': True,
            'progress_hooks': [lambda d: print(f"Download progress: {d.get('status', 'unknown')}")],
            # No post-processors, to avoid an immediate MP4 conversion
            'postprocessors': []
        }
        try:
            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                self.video_info = ydl.extract_info(self.video_url, download=True)
                self.downloaded_video = ydl.prepare_filename(self.video_info)
            return True
        except Exception as e:
            print(f"Unexpected error during download: {str(e)}")
            traceback.print_exc()
            return False

    def extract_points_from_svg(self):
        """
        Extract engagement points from SVG, handling multiple chapters if present.
        Returns normalized points that account for chapter positions in the 0-1000 range.
        """
        print("Extracting points from SVG")
        svg = self.read_svg()
        soup = BeautifulSoup(svg, "lxml")
        chapters = soup.find_all("div", class_="ytp-heat-map-chapter")
        if not chapters:
            logging.warning("No chapters found in SVG data")
            return []
        all_points = []
        # First pass: total width across all chapters
        total_width = 0
        for i, chapter in enumerate(chapters):
            style = chapter.get('style', '')
            width_str = style.split('width:')[1].split('px')[0].strip() if 'width:' in style else '0'
            width = float(width_str)
            total_width += width
            print(f'Chapter {i}: Width = {width}px')
        # Second pass: map each chapter's local points into the global 0-1000 range
        accumulated_width = 0
        for chapter in chapters:
            style = chapter.get('style', '')
            width_str = style.split('width:')[1].split('px')[0].strip() if 'width:' in style else '0'
            width = float(width_str)
            path = chapter.find("path")
            if not path:
                continue
            d = path.get("d")
            coordinates = d.replace("M ", "").replace("C ", "").split(" ")
            chapter_points = []
            for c in coordinates[1:]:
                try:
                    x, y = c.split(",")
                    x_coordinate = float(x)
                    y_coordinate = float(y)
                    chapter_position = (accumulated_width / total_width) * 1000
                    chapter_width_normalized = (width / total_width) * 1000
                    normalized_x = chapter_position + (x_coordinate * chapter_width_normalized / 1000)
                    chapter_points.append((normalized_x, y_coordinate))
                except (ValueError, IndexError) as e:
                    logging.warning(f"Skipping invalid coordinate: {c} - {str(e)}")
            all_points.extend(chapter_points)
            accumulated_width += width
        all_points.sort(key=lambda p: p[0])
        if all_points:
            x_coords, y_coords = zip(*all_points)
            y_coords = [90.0 - y for y in y_coords]  # SVG y grows downwards; flip it
            y_coords = [y if y >= 0.0 else 0.0 for y in y_coords]
            return list(zip(x_coords, y_coords))
        return []

    def find_peaks(self, points, threshold=50.0, min_distance=20):
        """
        Find peaks while ensuring they're separated by at least min_distance points.
        This helps avoid detecting multiple peaks that are too close together.
        """
        peaks = []
        last_peak_index = -min_distance
        for i in range(1, len(points) - 1):
            prev_y = points[i - 1][1]
            curr_y = points[i][1]
            next_y = points[i + 1][1]
            if (curr_y > prev_y and curr_y > next_y and
                    curr_y >= threshold and
                    i - last_peak_index >= min_distance):
                peaks.append(points[i])
                last_peak_index = i
        return peaks

    def plot_data(self, points, peaks, output_path="heatmap_analysis.png"):
        """Plot and save the heatmap visualization"""
        x_coords, y_coords = zip(*points)
        plt.figure(figsize=(12, 6))
        plt.plot(x_coords, y_coords, marker="+", label='Engagement')
        threshold = 50.0
        plt.axhline(y=threshold, color='r', linestyle='--', label='Threshold')
        if peaks:
            peak_x, peak_y = zip(*peaks)
            plt.scatter(peak_x, peak_y, marker="o", color='red', label='Peaks')
        else:
            logging.warning("No peaks detected in the heatmap data")
        plt.xlabel('Video Position')
        plt.ylabel('Replay Intensity')
        plt.title(f'YouTube Video Heatmap Analysis\n{os.path.basename(self.video_url)}')
        plt.legend()
        plt.grid(True, alpha=0.3)
        plt.savefig(output_path, dpi=300, bbox_inches='tight')
        plt.close()

    def peaks_to_time(self, x):
        """Convert x coordinate (0-1000) to video timestamp in seconds"""
        if not self.video_info.get('duration'):
            raise ValueError("Video info not available. Did you run download_video first?")
        return (x * self.video_info["duration"]) / 1000

    def extract_part(self, x_coord, y_coord, name):
        """Extract video segment around the peak timestamp"""
        if not self.video_info.get('duration'):
            raise ValueError("Video not downloaded. Run download_video first.")
        timestamp = self.peaks_to_time(x_coord)
        window_size = 15
        start_time = max(0, timestamp - window_size)
        end_time = min(self.video_info["duration"], start_time + (window_size * 1.8))
        os.makedirs("peaks", exist_ok=True)
        input_ext = os.path.splitext(self.downloaded_video)[1]
        temp_output = f"peaks/{name}_temp{input_ext}"
        final_output = f"peaks/{name}.mp4"
        try:
            print(f"Extracting segment from {start_time:.2f}s to {end_time:.2f}s")
            # Fast stream copy of the segment, then a re-encode to MP4
            ffmpeg_extract_cmd = [
                'ffmpeg', '-y',
                '-i', self.downloaded_video,
                '-ss', f"{start_time:.2f}",
                '-t', f"{end_time - start_time:.2f}",
                '-c', 'copy',
                temp_output
            ]
            subprocess.run(ffmpeg_extract_cmd, check=True, capture_output=True)
            ffmpeg_convert_cmd = [
                'ffmpeg', '-y',
                '-i', temp_output,
                '-c:v', 'libx264',
                '-c:a', 'aac',
                '-preset', 'fast',
                final_output
            ]
            subprocess.run(ffmpeg_convert_cmd, check=True, capture_output=True)
            if os.path.exists(temp_output):
                os.remove(temp_output)
            if not os.path.exists(final_output) or os.path.getsize(final_output) == 0:
                raise ValueError(f"Failed to create valid video file at {final_output}")
        except Exception as e:
            print(f"Error extracting segment: {str(e)}")
            if os.path.exists(temp_output):
                os.remove(temp_output)
            raise

    def smooth_points(self, points, window_size=5):
        """
        Smooths the data using a simple moving average.
        The window_size parameter determines how much smoothing occurs -
        larger values create more smoothing but might miss important details.
        Args:
            points: List of (x,y) coordinate tuples from the heatmap data
            window_size: Number of points to consider in the moving average window
        Returns:
            List of (x,y) coordinates with smoothed y values
        """
        x_coords, y_coords = zip(*points)
        y_coords = list(y_coords)
        smoothed_y = []
        for i in range(len(y_coords)):
            start = max(0, i - window_size // 2)
            end = min(len(y_coords), i + window_size // 2 + 1)
            window_average = sum(y_coords[start:end]) / (end - start)
            smoothed_y.append(window_average)
        return list(zip(x_coords, smoothed_y))

    def process_video_package(self, svg_path, metadata):
        """Process a single video package (SVG + metadata)"""
        try:
            self.svg_path = svg_path
            self.video_url = metadata['video_url']
            self.video_info = {}
            if not self.download_video(max_height=360):
                raise Exception("Failed to download video")
            folder_name = metadata.get('video_title', 'youtube_clips')
            folder_id = self.create_drive_folder(folder_name)
            logging.info(f"Created Google Drive folder: {folder_name}")
            points = self.extract_points_from_svg()
            if not points:
                raise ValueError("No valid heatmap data found in SVG")
            points = self.smooth_points(points)
            peaks = self.find_peaks(points)
            if not peaks:
                logging.warning("No significant peaks found in the heatmap data")
            heatmap_path = f"peaks/heatmap_{os.path.basename(svg_path).replace('.svg', '')}.png"
            self.plot_data(points, peaks, output_path=heatmap_path)
            logging.info("Uploading heatmap visualization to Google Drive...")
            self.upload_to_drive(heatmap_path, folder_id, file_type="image/png")
            os.remove(heatmap_path)
            successful_extracts = 0
            for i, peak in enumerate(peaks):
                try:
                    clip_path = f"peaks/peak_{i}.mp4"
                    self.extract_part(peak[0], peak[1], f"peak_{i}")
                    logging.info(f"Uploading clip {i} to Google Drive...")
                    self.upload_to_drive(clip_path, folder_id, file_type="video/mp4")
                    successful_extracts += 1
                    logging.info(f"Successfully extracted and uploaded peak {i} at position {peak[0]} ({self.peaks_to_time(peak[0]):.2f}s)")
                    os.remove(clip_path)
                except Exception as e:
                    logging.error(f"Failed to extract peak {i}: {e}")
            logging.info(f"Extraction complete: {successful_extracts} of {len(peaks)} clips extracted and uploaded successfully")
            try:
                # The downloaded file's extension varies, so use the recorded path
                os.remove(self.downloaded_video)
                logging.info("Cleaned up downloaded video file")
            except Exception as e:
                logging.warning(f"Could not remove downloaded video: {e}")
            return True
        except Exception as e:
            logging.error(f"Error processing video package: {e}")
            traceback.print_exc()
            raise

def main():
    """Start the SVG processor service"""
    try:
        processor = SVGProcessor()
        logging.info("Starting SVG processor service...")
        processor.start()
    except KeyboardInterrupt:
        logging.info("Service stopped by user")
    except Exception as e:
        logging.error(f"Service error: {str(e)}")
        raise

if __name__ == "__main__":
    main()
```
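To make the chapter normalization in `extract_points_from_svg` concrete: each chapter's `<path>` uses a local 0-1000 x range, so a point is shifted by the chapter's start offset and scaled by the chapter's share of the total width. A worked example with made-up chapter widths (the same arithmetic as the code above):

```python
# Worked example of the chapter normalization and the peaks_to_time conversion.
chapter_widths = [300.0, 700.0]     # hypothetical pixel widths of two chapters
total_width = sum(chapter_widths)   # 1000.0

def normalize(chapter_index, local_x):
    """Map a local 0-1000 x inside one chapter into the global 0-1000 range."""
    accumulated = sum(chapter_widths[:chapter_index])
    chapter_position = (accumulated / total_width) * 1000
    chapter_width_normalized = (chapter_widths[chapter_index] / total_width) * 1000
    return chapter_position + (local_x * chapter_width_normalized / 1000)

print(normalize(1, 500.0))          # 650.0 - halfway through the second chapter

# peaks_to_time then maps the 0-1000 range onto the video duration:
duration = 600                      # hypothetical 10-minute video
print((650.0 * duration) / 1000)    # 390.0 seconds
```

The hand-rolled `find_peaks` (local maxima above a threshold, separated by a minimum distance) could also be swapped for `scipy.signal.find_peaks(y, height=threshold, distance=min_distance)` if you already have SciPy around.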
## Bonus Script
- In the event you want to manually clip a section of a YouTube video:
### manual_clip.py
```python
# manual_clip.py
# Clip YouTube videos and upload to Google Drive
# Input: url, start, end, quality, title (optional)
import yt_dlp
import os
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request
from googleapiclient.discovery import build
from googleapiclient.http import MediaFileUpload
import subprocess
import argparse
import logging

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('clipper.log'),
        logging.StreamHandler()
    ]
)

class YouTubeClipper:
    def __init__(self):
        self.drive_service = self.authenticate_google_drive()
        os.makedirs("temp_downloads", exist_ok=True)

    def authenticate_google_drive(self):
        """Handle Google Drive authentication process."""
        SCOPES = ["https://www.googleapis.com/auth/drive"]
        creds = None
        if os.path.exists("token.json"):
            creds = Credentials.from_authorized_user_file("token.json", SCOPES)
        # Handle credential refresh or new authentication
        if not creds or not creds.valid:
            if creds and creds.expired and creds.refresh_token:
                creds.refresh(Request())
            else:
                flow = InstalledAppFlow.from_client_secrets_file(
                    "credentials.json", SCOPES)
                creds = flow.run_local_server(port=0)
            with open("token.json", "w") as token:
                token.write(creds.to_json())
        return build("drive", "v3", credentials=creds)

    def download_video(self, url, max_height=720):
        """Download the YouTube video at specified quality."""
        logging.info(f"Downloading video: {url}")
        ydl_opts = {
            'format': f'bestvideo[height<={max_height}]+bestaudio/best[height<={max_height}]',
            'quiet': False,
            'nopart': True,
            'outtmpl': "temp_downloads/%(title)s.%(ext)s",
            'retries': 3
        }
        try:
            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                info = ydl.extract_info(url, download=True)
                return ydl.prepare_filename(info), info.get('title')
        except Exception as e:
            logging.error(f"Download failed: {str(e)}")
            raise

    def clip_video(self, input_path, start_time, end_time, output_path):
        """Extract the clip using FFmpeg."""
        logging.info(f"Extracting clip from {start_time}s to {end_time}s")
        try:
            temp_output = f"{output_path}_temp.mp4"
            extract_cmd = [
                'ffmpeg', '-y',
                '-i', input_path,
                '-ss', str(start_time),
                '-t', str(end_time - start_time),
                '-c', 'copy',
                temp_output
            ]
            subprocess.run(extract_cmd, check=True, capture_output=True)
            convert_cmd = [
                'ffmpeg', '-y',
                '-i', temp_output,
                '-c:v', 'libx264',
                '-c:a', 'aac',
                '-preset', 'fast',
                output_path
            ]
            subprocess.run(convert_cmd, check=True, capture_output=True)
            if os.path.exists(temp_output):
                os.remove(temp_output)
            return True
        except subprocess.CalledProcessError as e:
            logging.error(f"FFmpeg error: {e.stderr.decode()}")
            raise
        except Exception as e:
            logging.error(f"Clipping error: {str(e)}")
            raise

    def upload_to_drive(self, file_path, title=None):
        """Upload the clip to Google Drive."""
        logging.info("Uploading clip to Google Drive")
        try:
            folder_name = "YouTube Clips"
            folder_id = self.get_or_create_folder(folder_name)
            file_metadata = {
                'name': title or os.path.basename(file_path),
                'parents': [folder_id]
            }
            media = MediaFileUpload(file_path, mimetype='video/mp4')
            file = self.drive_service.files().create(
                body=file_metadata,
                media_body=media,
                fields='id'
            ).execute()
            logging.info(f"Upload complete. File ID: {file.get('id')}")
            return file.get('id')
        except Exception as e:
            logging.error(f"Upload failed: {str(e)}")
            raise

    def get_or_create_folder(self, folder_name):
        """Get or create a folder in Google Drive."""
        results = self.drive_service.files().list(
            q=f"name='{folder_name}' and mimeType='application/vnd.google-apps.folder' and trashed=false",
            spaces='drive',
            fields='files(id)'
        ).execute()
        if results.get('files'):
            return results['files'][0]['id']
        folder_metadata = {
            'name': folder_name,
            'mimeType': 'application/vnd.google-apps.folder'
        }
        folder = self.drive_service.files().create(
            body=folder_metadata,
            fields='id'
        ).execute()
        return folder.get('id')

    def cleanup(self, file_path):
        """Remove temporary files."""
        try:
            if os.path.exists(file_path):
                os.remove(file_path)
        except Exception as e:
            logging.warning(f"Cleanup error: {str(e)}")

def main():
    parser = argparse.ArgumentParser(description='Clip YouTube videos and upload to Google Drive')
    parser.add_argument('url', help='YouTube video URL')
    parser.add_argument('start', type=float, help='Start time in seconds')
    parser.add_argument('end', type=float, help='End time in seconds')
    parser.add_argument('--quality', type=int, default=720, help='Maximum video height (default: 720)')
    parser.add_argument('--title', help='Custom title for the clip (optional)')
    args = parser.parse_args()
    clipper = YouTubeClipper()
    try:
        input_path, video_title = clipper.download_video(args.url, args.quality)
        clip_title = args.title or f"{video_title}_clip_{args.start}-{args.end}"
        output_path = f"temp_downloads/{clip_title}.mp4"
        clipper.clip_video(input_path, args.start, args.end, output_path)
        clipper.upload_to_drive(output_path, clip_title)
        clipper.cleanup(input_path)
        clipper.cleanup(output_path)
        logging.info("Process completed successfully")
    except Exception as e:
        logging.error(f"Process failed: {str(e)}")
        raise

if __name__ == "__main__":
    main()

# Example usage:
# python manual_clip.py "https://youtube.com/watch?v=VIDEO_ID" 30 45 --quality 720 --title "My Clip"
```