![[youtubedemo.gif]]
# What is this?
Given a list of YouTube channels, this system finds the most recent uploads from those channels and grabs each video's transcript and comments. The data is then wrangled into organized CSVs. You can use this pipeline to train NLP models/LLMs, identify trending topics and narratives, or build knowledge bases.
I use this system to aggregate LeetCode tutorials that help me prepare for technical interviews, essentially transforming scattered video content into a structured learning resource. Other interesting use cases or niches could be real estate listings, news and weather forecasts, and entertainment.
The neat part about this pipeline is that it's lightweight: there's no need to spin up a remote browser. Instead, it uses curl_cffi to make plain HTTP requests while mimicking a real browser's fingerprint.
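As a quick illustration (a minimal sketch, separate from the pipeline below; the channel handle is just a placeholder), `curl_cffi` takes an `impersonate` argument that makes an ordinary request present a Chrome-like TLS/HTTP fingerprint:
```python
# Minimal sketch of an impersonated request with curl_cffi; the handle is a placeholder.
from curl_cffi import requests as curl_requests

session = curl_requests.Session()
response = session.get(
    "https://www.youtube.com/@somechannel/videos",  # placeholder channel handle
    impersonate="chrome120",  # present Chrome 120's TLS/HTTP fingerprint
)
print(response.status_code, len(response.text))
```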
# Code
`youtube_channel_scraper.py` handles the initial data collection:
1. Reads a list of YouTube channel IDs from `channels.txt` (example shown after this list)
2. For each channel, it visits the channel's video page
3. Extracts video information (title, views, URL, etc.) from the initial page
4. Follows YouTube's "load more" pagination system by using continuation tokens
5. Saves all collected video data as JSON files in a `temp` directory (one file per channel)
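For reference, `channels.txt` is just one channel identifier per line. The scraper accepts @handles, raw `UC...` channel IDs, and legacy custom names served under `/c/` (the entries below are placeholders):
```
@somechannel
UCxxxxxxxxxxxxxxxxxxxxxx
somelegacyname
```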
```python
#youtube_channel_scraper.py
import traceback
from curl_cffi import requests as curl_requests
import json
import time
import random
import os
from datetime import datetime
import re
class YouTubeChannelScraper:
def __init__(self):
self.session = curl_requests.Session()
self.session.headers.update({
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
'Accept': '*/*',
'Content-Type': 'application/json',
'Origin': 'https://www.youtube.com',
'Referer': 'https://www.youtube.com/'
})
def get_initial_data(self, channel_id):
"""
        Fetches the initial channel page and looks for the continuation token in the grid renderer.
        YouTube structures its initial channel page data differently from subsequent API requests.
"""
if channel_id.startswith('@'):
url = f'https://www.youtube.com/{channel_id}/videos'
elif channel_id.startswith('UC'):
url = f'https://www.youtube.com/channel/{channel_id}/videos'
else:
url = f'https://www.youtube.com/c/{channel_id}/videos'
response = self.session.get(url, impersonate="chrome120")
if response.status_code != 200 and not channel_id.startswith('@'):
url = f'https://www.youtube.com/@{channel_id}/videos'
response = self.session.get(url, impersonate="chrome120")
api_key = re.search(r'"INNERTUBE_API_KEY":"([^"]+)"', response.text)
client_version = re.search(r'"clientVersion":"([^"]+)"', response.text)
visitor_data = re.search(r'"visitorData":"([^"]+)"', response.text)
initial_data_match = re.search(r'ytInitialData\s*=\s*({.+?});', response.text)
continuation_token = None
if initial_data_match:
try:
initial_data = json.loads(initial_data_match.group(1))
tab_contents = (
initial_data.get('contents', {})
.get('twoColumnBrowseResultsRenderer', {})
.get('tabs', [])
)
for tab in tab_contents:
if tab.get('tabRenderer', {}).get('title') == 'Videos':
grid_contents = (
tab.get('tabRenderer', {})
.get('content', {})
.get('richGridRenderer', {})
.get('contents', [])
)
if grid_contents:
last_item = grid_contents[-1]
if 'continuationItemRenderer' in last_item:
continuation_token = (
last_item['continuationItemRenderer']
.get('continuationEndpoint', {})
.get('continuationCommand', {})
.get('token')
)
if continuation_token:
print(f"Found initial continuation token: {continuation_token[:30]}...")
break
except Exception as e:
print(f"Error parsing initial data: {e}")
browse_id = re.search(r'"browseId":"([^"]+)"', response.text)
return {
'api_key': api_key.group(1) if api_key else None,
'continuation': continuation_token,
'client_version': client_version.group(1) if client_version else None,
'visitor_data': visitor_data.group(1) if visitor_data else None,
'browse_id': browse_id.group(1) if browse_id else None # Extract the actual string
}
def get_continuation_token(self, data):
"""
the API can return the continuation token in several different structures:
1. in the 'contents' section for initial page loads
2. in the grid renderer for video listings
3. in the tabs section for paginated responses
"""
try:
# check the contents structure for init page load
if 'contents' in data:
contents = (
data.get('contents', {})
.get('twoColumnBrowseResultsRenderer', {})
.get('tabs', [])
)
for tab in contents:
tab_renderer = tab.get('tabRenderer', {})
if tab_renderer.get('title') == 'Videos':
grid_contents = (
tab_renderer
.get('content', {})
.get('richGridRenderer', {})
.get('contents', [])
)
if grid_contents:
# check each item for a continuation token
for item in grid_contents:
if 'continuationItemRenderer' in item:
continuation_data = (
item['continuationItemRenderer']
.get('continuationEndpoint', {})
.get('continuationCommand', {})
)
token = continuation_data.get('token')
if token:
print(f"Found continuation token in grid: {token[:30]}...")
return token
# check the direct response structure for subsequent API responses
if 'onResponseReceivedEndpoints' in data:
endpoints = data['onResponseReceivedEndpoints']
for endpoint in endpoints:
if 'appendContinuationItemsAction' in endpoint:
items = endpoint['appendContinuationItemsAction'].get('continuationItems', [])
if items:
last_item = items[-1]
if 'continuationItemRenderer' in last_item:
token = (
last_item['continuationItemRenderer']
.get('continuationEndpoint', {})
.get('continuationCommand', {})
.get('token')
)
if token:
print(f"Found continuation token in response: {token[:30]}...")
return token
print("\nResponse structure analysis:")
print("Top-level keys:", list(data.keys()))
if 'contents' in data:
tabs = data['contents'].get('twoColumnBrowseResultsRenderer', {}).get('tabs', [])
print(f"Number of tabs found: {len(tabs)}")
for i, tab in enumerate(tabs):
if 'tabRenderer' in tab:
print(f"Tab {i} title: {tab['tabRenderer'].get('title', 'Unknown')}")
return None
except Exception as e:
print(f"Error extracting continuation token: {e}")
traceback.print_exc()
return None
def create_browse_payload(self, browse_id, visitor_data, client_version, continuation=None):
"""
Creates the payload for the browse endpoint following YouTube's schema exactly.
We need to match their structure precisely to get a valid response.
"""
payload = {
"context": {
"client": {
"hl": "en",
"gl": "US",
"visitorData": visitor_data,
"userAgent": self.session.headers['User-Agent'],
"clientName": "WEB",
"clientVersion": client_version,
"platform": "DESKTOP",
"acceptHeader": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8"
},
"user": {
"lockedSafetyMode": False
},
"request": {
"useSsl": True,
"internalExperimentFlags": [],
"consistencyTokenJars": []
}
}
}
if continuation:
payload["continuation"] = continuation
else:
payload["browseId"] = browse_id
return payload
def extract_videos_from_response(self, data):
"""
Extracts videos from the API response, following the exact structure shown in the schema.
"""
videos = []
try:
# The videos are nested in onResponseReceivedActions
for action in data.get('onResponseReceivedActions', []):
if 'appendContinuationItemsAction' in action:
continuation_items = action['appendContinuationItemsAction'].get('continuationItems', [])
for item in continuation_items:
if 'richItemRenderer' in item:
try:
video_renderer = item['richItemRenderer']['content']['videoRenderer']
# Extract view count properly from the schema structure
view_count_text = video_renderer.get('viewCountText', {}).get('simpleText', '0 views')
views = int(''.join(filter(str.isdigit, view_count_text)))
# Extract title from the runs array as shown in schema
title = video_renderer['title']['runs'][0]['text']
video_details = {
'title': title,
'views': views,
'videoId': video_renderer['videoId'],
'url': f"https://www.youtube.com/watch?v={video_renderer['videoId']}",
'timeScraped': datetime.now().isoformat(),
'publishedTime': video_renderer.get('publishedTimeText', {}).get('simpleText', '')
}
videos.append(video_details)
print(f"Found video: {title}")
except Exception as e:
print(f"Error processing video: {e}")
continue
except Exception as e:
print(f"Error extracting videos: {e}")
traceback.print_exc()
return videos
def extract_video_details_from_data(self, data):
"""
Extracts videos from the API response data.
The videos can appear in two different structures:
        1. In the initial page load under richGridRenderer
        2. In subsequent responses under onResponseReceivedActions
"""
videos = []
try:
# First, check for videos in the initial page structure
if 'contents' in data:
tabs = data.get('contents', {}).get('twoColumnBrowseResultsRenderer', {}).get('tabs', [])
for tab in tabs:
if tab.get('tabRenderer', {}).get('title') == 'Videos':
grid_contents = (
tab.get('tabRenderer', {})
.get('content', {})
.get('richGridRenderer', {})
.get('contents', [])
)
# Process each video in the grid
for item in grid_contents:
if 'richItemRenderer' in item:
try:
video_renderer = item['richItemRenderer'].get('content', {}).get('videoRenderer', {})
if video_renderer:
# Extract view count from viewCountText
view_count_text = video_renderer.get('viewCountText', {}).get('simpleText', '0 views')
views = int(''.join(filter(str.isdigit, view_count_text)))
video_details = {
'title': video_renderer['title']['runs'][0]['text'],
'views': views,
'url': f"https://www.youtube.com/watch?v={video_renderer['videoId']}",
'videoId': video_renderer['videoId'],
'timeScraped': datetime.now().isoformat()
}
videos.append(video_details)
except Exception as e:
print(f"Error processing video item: {e}")
continue
# Check for videos in the continuation response structure
if 'onResponseReceivedActions' in data:
for action in data['onResponseReceivedActions']:
if 'appendContinuationItemsAction' in action:
items = action['appendContinuationItemsAction'].get('continuationItems', [])
for item in items:
if 'richItemRenderer' in item:
try:
video_renderer = item['richItemRenderer'].get('content', {}).get('videoRenderer', {})
if video_renderer:
view_count_text = video_renderer.get('viewCountText', {}).get('simpleText', '0 views')
views = int(''.join(filter(str.isdigit, view_count_text)))
video_details = {
'title': video_renderer['title']['runs'][0]['text'],
'views': views,
'url': f"https://www.youtube.com/watch?v={video_renderer['videoId']}",
'videoId': video_renderer['videoId'],
'timeScraped': datetime.now().isoformat()
}
videos.append(video_details)
except Exception as e:
print(f"Error processing continuation video: {e}")
continue
print(f"Found {len(videos)} videos in response")
return videos
except Exception as e:
print(f"Error extracting videos: {e}")
traceback.print_exc()
return []
def extract_video_details(self, video_data):
"""
Extracts relevant video information from the API response.
"""
try:
video_renderer = video_data['richItemRenderer']['content']['videoRenderer']
# Extract view count from viewCountText
view_count_text = video_renderer.get('viewCountText', {}).get('simpleText', '0 views')
views = int(''.join(filter(str.isdigit, view_count_text)))
return {
'title': video_renderer['title']['runs'][0]['text'],
'views': views,
'url': f"https://www.youtube.com/watch?v={video_renderer['videoId']}",
'videoId': video_renderer['videoId'],
'timeScraped': datetime.now().isoformat()
}
except Exception as e:
print(f"Error extracting video details: {e}")
return None
def scrape_channel(self, channel_id):
print(f"Processing channel: {channel_id}")
initial_data = self.get_initial_data(channel_id)
if not all([initial_data['api_key'], initial_data['browse_id']]):
print(f"Could not get required tokens for channel: {channel_id}")
return []
videos = []
continuation_token = initial_data['continuation']
page_count = 0
max_pages = 2
while continuation_token and page_count < max_pages:
try:
print(f"\nFetching page {page_count + 1}")
print(f"Using continuation token: {continuation_token[:30]}...")
payload = self.create_browse_payload(
initial_data['browse_id'],
initial_data['visitor_data'],
initial_data['client_version'],
continuation_token
)
# Debug the request payload
#print("Request payload:")
#print(json.dumps(payload, indent=2))
response = self.session.post(
f"https://www.youtube.com/youtubei/v1/browse?key={initial_data['api_key']}",
json=payload,
impersonate="chrome120"
)
if response.status_code != 200:
print(f"Error: Received status code {response.status_code}")
print("Response headers:", dict(response.headers))
try:
error_data = response.json()
print("Error response:", json.dumps(error_data, indent=2))
except:
print("Raw error response:", response.text)
break
if response.status_code == 200:
data = response.json()
# Extract videos from this page
new_videos = self.extract_videos_from_response(data)
videos.extend(new_videos)
print(f"Found {len(new_videos)} videos on this page")
# Get the next continuation token
next_token = None
if 'onResponseReceivedActions' in data:
for action in data['onResponseReceivedActions']:
if 'appendContinuationItemsAction' in action:
continuation_items = action['appendContinuationItemsAction'].get('continuationItems', [])
if continuation_items:
last_item = continuation_items[-1]
if 'continuationItemRenderer' in last_item:
next_token = (
last_item['continuationItemRenderer']
.get('continuationEndpoint', {})
.get('continuationCommand', {})
.get('token')
)
if next_token:
print(f"Found next continuation token: {next_token[:30]}...")
continuation_token = next_token
else:
print("No more continuation tokens found")
break
page_count += 1
time.sleep(2 + random.random() * 3)
else:
print(f"Error: Received status code {response.status_code}")
break
except Exception as e:
print(f"Error fetching page: {e}")
traceback.print_exc()
break
print(f"\nFinished scraping. Total videos found: {len(videos)}")
# Save results
os.makedirs('temp', exist_ok=True)
with open(f'temp/{channel_id}_videos.json', 'w') as f:
json.dump({
'channelId': channel_id,
'videos': videos
}, f, indent=2)
return videos
def main():
with open('channels.txt', 'r') as f:
channels = [line.strip() for line in f if line.strip()]
random.shuffle(channels)
scraper = YouTubeChannelScraper()
for channel_id in channels:
scraper.scrape_channel(channel_id)
time.sleep(2 + random.random() * 3)
if __name__ == "__main__":
main()
```
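Each run leaves one JSON file per channel in `temp/`, shaped roughly like this (the values below are made up):
```json
{
  "channelId": "@somechannel",
  "videos": [
    {
      "title": "Two Sum - Example Walkthrough",
      "views": 12345,
      "videoId": "AAAAAAAAAAA",
      "url": "https://www.youtube.com/watch?v=AAAAAAAAAAA",
      "timeScraped": "2024-01-01T12:00:00.000000",
      "publishedTime": "2 days ago"
    }
  ]
}
```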
`process_stream.py` continuously watches for newly scraped videos:
1. It watches the `temp` directory for new JSON files created by the first script
2. When a new file appears, it reads the video data
3. It adds these videos to a master database (`master_videos.csv`)
4. It identifies which videos haven't had their comments scraped yet
5. It appends these videos to `video_urls.txt` for processing (see the example after this list)
6. It records them in `comments_scraped.json` so they aren't queued again
7. It cleans up by removing the original JSON file
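For reference, the queue and tracking files look roughly like this (the IDs below are made up):
```
# video_urls.txt: one <video_url>,<channel_id> line per queued video
https://www.youtube.com/watch?v=AAAAAAAAAAA,@somechannel

# comments_scraped.json: video IDs that have already been queued
{"AAAAAAAAAAA": true}
```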
```python
# process_stream.py
'''
watchdog script that watches the temp directory for new video JSON files, merges them into the master CSV, and queues videos for comment/transcript scraping (the MongoDB upsert path is currently commented out)
runs constantly, picking up new files as the channel scraper drops them
'''
import csv
import json
import os
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
from datetime import datetime
import pandas as pd
import time
import subprocess
import traceback
from comment_scrape import YouTubeCommentScraper
from dotenv import load_dotenv
load_dotenv()
scraper = YouTubeCommentScraper()
class VideoDataHandler(FileSystemEventHandler):
def __init__(self, master_csv='data/master_videos.csv'):
self.master_csv = master_csv
self.ensure_master_csv_exists()
#self.client = MongoClient(MONGO_URI)
#self.db = self.client['yt_comments']
self.comments_scraped_file = 'data/comments_scraped.json'
self.comments_scraped = self.load_comments_scraped()
def ensure_master_csv_exists(self):
"""Create master CSV if it doesn't exist"""
if not os.path.exists('data'):
os.makedirs('data')
if not os.path.exists(self.master_csv):
with open(self.master_csv, 'w', newline='') as f:
writer = csv.writer(f)
writer.writerow(['channel_id', 'video_id', 'video_title',
'video_url', 'views', 'timeScraped'])
def load_comments_scraped(self):
if os.path.exists(self.comments_scraped_file):
with open(self.comments_scraped_file, 'r') as f:
return json.load(f)
else:
with open(self.comments_scraped_file, 'w') as f:
json.dump({}, f)
return {}
def save_comments_scraped(self):
with open(self.comments_scraped_file, 'w') as f:
json.dump(self.comments_scraped, f)
def on_created(self, event):
if event.is_directory or not event.src_path.endswith('_videos.json'):
return
try:
time.sleep(2)
with open(event.src_path, 'r') as f:
data = json.load(f)
try:
df_master = pd.read_csv(self.master_csv)
except pd.errors.EmptyDataError:
df_master = pd.DataFrame(columns=['channel_id', 'video_id',
'video_title', 'video_url',
'views', 'timeScraped'])
new_rows = []
for video in data['videos']:
video_data = {
'channel_id': data['channelId'],
'video_id': video['videoId'],
'video_title': video['title'],
'video_url': video['url'],
'views': video['views'],
'timeScraped': video['timeScraped']
}
new_rows.append(video_data)
#print rows added
#print(f"Added {len(new_rows)} rows to master CSV")
# upsert
'''
self.db.video_data.update_one(
{'video_id': video_data['video_id']},
{'$set': video_data},
upsert=True
)
'''
df_new = pd.DataFrame(new_rows)
print(f'df_new original cardinality: {len(df_new)}')
df_combined = pd.concat([df_master, df_new])
df_combined = df_combined.sort_values('timeScraped', ascending=False)
df_combined = df_combined.drop_duplicates(subset='video_id', keep='first')
df_combined.to_csv(self.master_csv, index=False)
df_new = df_new.sort_values('timeScraped', ascending=False)
df_new = df_new.drop_duplicates(subset='video_id', keep='first')
print(f'df_new deduped cardinality: {len(df_new)}')
df_new = df_new[
# (df_new['video_title'].str.contains(r'(?i)' + '|'.join(filter_words), regex=True))
(~df_new['video_id'].isin(self.comments_scraped))
]
print(f"Scraping {len(df_new)} videos for comments")
for _, row in df_new.iterrows():
# transcript_scraper.py will handle the scraping of the comments
# add video to video_urls.txt
with open('video_urls.txt', 'a') as f:
f.write(f"{row['video_url']},{row['channel_id']}\n")
print("saved video to video_urls.txt: ", row['video_title'])
self.comments_scraped[row['video_id']] = True
self.save_comments_scraped()
# output_dir = f"data/{row['channel_id']}/{row['video_id']}"
# os.makedirs(output_dir, exist_ok=True)
# source_file = f"transcript_scrape/scraped_comments/{row['video_id']}_comments.csv"
# destination_file = f"{output_dir}/comments.csv"
# if os.path.exists(source_file):
# os.rename(source_file, destination_file)
# print(f"Moved comments file to {destination_file}")
# else:
# print(f"Warning: Comments file not found at {source_file}")
os.remove(event.src_path)
print(f"Processed and added videos from {data['channelId']}")
except Exception as e:
print(f"Error processing {event.src_path}: {str(e)}")
traceback.print_exc()
    def __del__(self):
        # the MongoDB client is commented out above, so there is nothing to close;
        # just persist the scrape-progress file
        self.save_comments_scraped()
def main():
observer = Observer()
event_handler = VideoDataHandler()
observer.schedule(event_handler, 'temp', recursive=False)
observer.start()
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
observer.stop()
observer.join()
if __name__ == "__main__":
main()
```
`transcript_scraper.py` performs the in-depth content collection:
1. It reads the video URLs from `video_urls.txt` (created by the watchdog)
2. It cross-references these with the master CSV to get full video information
3. For each video, it:
   - Downloads the video transcript using `YouTubeTranscriptApi` (see the sketch after this list)
- Collects comments using the comment scraper (default of 140 comments)
- Formats this data neatly
4. It saves all collected data in two formats:
- Individual records in `comments_with_video.csv` (for detailed analysis)
- A combined text file of all transcripts (for easier reading)
5. All data is tagged with a label you provide when running the script
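The transcript download itself boils down to a single library call. A minimal sketch, assuming the classic `youtube-transcript-api` interface (newer releases of the library changed it) and a placeholder video ID:
```python
# Minimal sketch of the transcript step; the video ID is a placeholder.
from youtube_transcript_api import YouTubeTranscriptApi

video_id = "AAAAAAAAAAA"  # placeholder 11-character YouTube video ID
segments = YouTubeTranscriptApi.get_transcript(video_id)  # list of {'text', 'start', 'duration'}
transcript_text = " ".join(segment["text"] for segment in segments)
print(transcript_text[:200])
```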
```python
#transcript_scraper.py
# script that scrapes transcripts and comments from a list of videos
# %%
import os
import sys
import traceback
from youtube_transcript_api import YouTubeTranscriptApi
from comment_scrape import YouTubeCommentScraper
import re
import pandas as pd
scraper = YouTubeCommentScraper()
def extract_video_id(url):
patterns = [
r'(?:v=|/)([\w-]{11})(?:\?|&|/|$)',
r'^([\w-]{11})