## What is this?

This is an automated pipeline that collects and analyzes YouTube comments across multiple states, currently focused on traffic and road condition discussions from local news channels. The system automatically:

- Discovers relevant videos based on configurable keywords
- Extracts comments using browser automation
- Maps data to geographic regions (states)
- Generates insights on engagement, sentiment, and trending topics

Architecture as seen in the [[Youtube Transcript & Comment Scraping Pipeline]].

[Live Demo](https://comment-cow.vercel.app/)
[Github Repo (Frontend)](https://github.com/jarrettdev/CommentCow)
[Github Repo (Backend)](https://github.com/jarrettdev/Youtube-Comment-Analysis-Pipeline)

![[commentcow_map.png]]
*These insights can then be fetched from a frontend to display a map that allows users to view relevant conversations/comments within each state.*

The pipeline runs continuously, storing data in MongoDB and updating analyses automatically. While it's currently tracking traffic-related discussions, the system is topic-agnostic and can be adapted to ==monitor any niche== by adjusting the target channels and keywords.

**Key Features:**
- Geographic data attribution
- Real-time monitoring and collection
- Scalable data processing
- Automated insight generation
- Topic-agnostic design

This infrastructure enables detailed analysis of public sentiment and engagement across different regions, providing valuable insights into how different communities discuss and react to specific topics.
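
The map frontend linked above consumes these per-state summaries; conceptually the read path is just a MongoDB lookup. Below is a minimal Python sketch of that lookup — the real frontend is the separate CommentCow app, the database/collection names are the ones `analysis.py` (further down) writes to, and the exact contents of `engagement` depend on what `CommentAnalyzer` produces.

```python
# sketch: fetch the per-state insight document a map frontend would render.
# Assumes MONGO_URI in the environment and the yt_comments / state_analysis
# names used by analysis.py below; field names mirror its state_summary.
import os
from dotenv import load_dotenv
from pymongo import MongoClient

load_dotenv()
client = MongoClient(os.getenv('MONGO_URI'))
db = client['yt_comments']

doc = db.state_analysis.find_one({'state': 'Arizona'})
if doc:
    print(doc['engagement'])                 # whatever metrics CommentAnalyzer computed
    for comment in doc['top_comments'][:3]:  # top comments carry '0' (text) and 'like_count'
        print(comment['0'], comment['like_count'])
```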

---

## process_stream.py
- Watches the `temp` directory for newly scraped videos and triggers comment scraping before upserting the data into MongoDB

```python
# process_stream.py
'''
watchdog script that watches the temp directory for new videos,
scrapes the comments, and upserts the data into MongoDB

runs constantly, and will continue to scrape comments for videos
that match the filter words
'''
import csv
import json
import os
import subprocess
import time
from datetime import datetime

import pandas as pd
from dotenv import load_dotenv
from pymongo import MongoClient
from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer

load_dotenv()

MONGO_URI = os.getenv('MONGO_URI')
client = MongoClient(MONGO_URI)
db = client['yt_comments']


class VideoDataHandler(FileSystemEventHandler):
    def __init__(self, master_csv='data/master_videos.csv'):
        self.master_csv = master_csv
        self.ensure_master_csv_exists()
        self.client = MongoClient(MONGO_URI)
        self.db = self.client['yt_comments']
        self.comments_scraped_file = 'data/comments_scraped.json'
        self.comments_scraped = self.load_comments_scraped()

    def ensure_master_csv_exists(self):
        """Create master CSV if it doesn't exist"""
        if not os.path.exists('data'):
            os.makedirs('data')
        if not os.path.exists(self.master_csv):
            with open(self.master_csv, 'w', newline='') as f:
                writer = csv.writer(f)
                writer.writerow(['channel_id', 'video_id', 'video_title', 'video_url', 'views', 'timeScraped'])

    def load_comments_scraped(self):
        if os.path.exists(self.comments_scraped_file):
            with open(self.comments_scraped_file, 'r') as f:
                return json.load(f)
        else:
            with open(self.comments_scraped_file, 'w') as f:
                json.dump({}, f)
            return {}

    def save_comments_scraped(self):
        with open(self.comments_scraped_file, 'w') as f:
            json.dump(self.comments_scraped, f)

    def on_created(self, event):
        if event.is_directory or not event.src_path.endswith('_videos.json'):
            return
        try:
            # Read new video data (small delay so the writer can finish the file)
            time.sleep(2)
            with open(event.src_path, 'r') as f:
                data = json.load(f)

            # Read existing master CSV
            try:
                df_master = pd.read_csv(self.master_csv)
            except pd.errors.EmptyDataError:
                df_master = pd.DataFrame(columns=['channel_id', 'video_id', 'video_title', 'video_url', 'views', 'timeScraped'])

            # Process new videos
            new_rows = []
            for video in data['videos']:
                video_data = {
                    'channel_id': data['channelId'],
                    'video_id': video['videoId'],
                    'video_title': video['title'],
                    'video_url': video['url'],
                    'views': video['views'],
                    'timeScraped': video['timeScraped']
                }
                new_rows.append(video_data)

                # Upsert to MongoDB
                self.db.video_data.update_one(
                    {'video_id': video_data['video_id']},
                    {'$set': video_data},
                    upsert=True
                )

            print(f"Added {len(new_rows)} rows to master CSV")

            df_new = pd.DataFrame(new_rows)

            # Remove duplicates based on video_id, keeping the newer entry
            df_combined = pd.concat([df_master, df_new])
            df_combined = df_combined.sort_values('timeScraped', ascending=False)
            df_combined = df_combined.drop_duplicates(subset='video_id', keep='first')

            # these are the words that will be used to determine which videos to scrape comments for
            filter_words = ['traffic', 'accident', 'crash', 'road', 'highway', 'construction', 'commute', 'delays']
            # we want the videos that contain any of these words in the video title
            df_combined = df_combined[df_combined['video_title'].str.contains(r'(?i)' + '|'.join(filter_words), regex=True)]

            # Save updated data
            df_combined.to_csv(self.master_csv, index=False)

            df_new = df_new.sort_values('timeScraped', ascending=False)
            df_new = df_new.drop_duplicates(subset='video_id', keep='first')
            df_new = df_new[
                (df_new['video_title'].str.contains(r'(?i)' + '|'.join(filter_words), regex=True)) &
                (~df_new['video_id'].isin(self.comments_scraped))
            ]

            for _, row in df_new.iterrows():
                print("Scraping comments for: ", row['video_title'])
                # bottleneck here
                scrape_comments(row['video_id'], row['channel_id'])
                self.comments_scraped[row['video_id']] = True
                self.save_comments_scraped()

            # Clean up processed file
            os.remove(event.src_path)
            print(f"Processed and added videos from {data['channelId']}")

        except Exception as e:
            print(f"Error processing {event.src_path}: {str(e)}")

    def __del__(self):
        self.client.close()
        self.save_comments_scraped()


def scrape_comments(video_id, channel_id):
    # bottleneck: we're launching a new node process (and browser) for each video
    comment_scrape_cmd = f"xvfb-run -a node comment_scrape.js https://www.youtube.com/watch?v={video_id}"
    try:
        # Set a timeout of 5 minutes (300 seconds)
        result = subprocess.run(comment_scrape_cmd, shell=True, check=True, capture_output=True, text=True, timeout=300)
        print(f"Successfully scraped comments for video {video_id}")
        # remove data.json
        os.remove('output/com_youtubei_v1_next/data.json')
        print(f"Output: {result.stdout}")
    except subprocess.TimeoutExpired:
        print(f"Timeout occurred while scraping comments for video {video_id}")
        return
    except subprocess.CalledProcessError as e:
        print(f"Error scraping comments for video {video_id}: {e}")
        # remove data.json
        os.remove('output/com_youtubei_v1_next/data.json')
        print(f"Error output: {e.output}")
        return
    except Exception as e:
        print(f"Unexpected error occurred while scraping comments for video {video_id}: {str(e)}")
        return

    # Move the scraped comments to the appropriate directory
    output_dir = f"data/{channel_id}/{video_id}"
    os.makedirs(output_dir, exist_ok=True)

    source_file = f"transcript_scrape/scraped_comments/{video_id}_comments.csv"
    destination_file = f"{output_dir}/comments.csv"

    if os.path.exists(source_file):
        os.rename(source_file, destination_file)
        print(f"Moved comments file to {destination_file}")
    else:
        print(f"Warning: Comments file not found at {source_file}")


def main():
    # Set up the watchdog observer
    observer = Observer()
    event_handler = VideoDataHandler()

    # Start watching the temp directory
    observer.schedule(event_handler, 'temp', recursive=False)
    observer.start()

    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()
    observer.join()


if __name__ == "__main__":
    main()
```
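
For reference, `on_created` only reacts to files ending in `_videos.json` dropped into `temp/`, and it reads a `channelId` plus a `videos` array with the fields used above. A minimal, hypothetical payload that would exercise the handler — the field names come straight from the code, but the values (including the view-count format) are made up:

```python
# sketch: write a hypothetical *_videos.json into temp/ to exercise the handler.
# Field names mirror what on_created reads; values are illustrative only.
import json

payload = {
    "channelId": "abc15",
    "videos": [
        {
            "videoId": "abc123def45",  # placeholder ID
            "title": "Major crash snarls I-10 traffic",  # contains a filter word, so comments get scraped
            "url": "https://www.youtube.com/watch?v=abc123def45",
            "views": "12,345 views",
            "timeScraped": "2024-01-01 12:00:00"
        }
    ]
}

# assumes the temp/ directory being watched already exists
with open("temp/abc15_videos.json", "w") as f:
    json.dump(payload, f)
```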

## comment_scrape.js
- Script that's called to scrape the comments on a single video

```js
// comment_scrape.js
/*
script that scrapes the comments from a youtube video
*/
const puppeteer = require('puppeteer-extra');
const StealthPlugin = require('puppeteer-extra-plugin-stealth');
const { spawn, exec } = require('child_process');
const fs = require('fs');
const fsp = require('fs').promises;
const { executablePath } = require('puppeteer');

// Apply the Stealth plugin
puppeteer.use(StealthPlugin());

// Function to handle sleep
function sleep(ms) {
    return new Promise(resolve => setTimeout(resolve, ms));
}

// Main function to run the puppeteer script
async function run() {
    if (!fs.existsSync('transcript_scrape/comments.csv')) {
        fs.writeFile('transcript_scrape/comments.csv', '', function () { console.log('') });
    }
    if (!fs.existsSync('output/com_youtubei_v1_next/data.json')) {
        fs.writeFile('output/com_youtubei_v1_next/data.json', '', function () { console.log('') });
    }
    try {
        // launching browser (proxied through mitmdump on localhost:8080)
        const browser = await puppeteer.launch({
            headless: false,
            args: [
                '--proxy-server=http://localhost:8080',
                '--no-sandbox',
                '--ignore-certificate-errors',
                '--ignore-certificate-errors-spki-list'
            ],
            ignoreHTTPSErrors: true
        });

        const page = await browser.newPage();
        // okay browser launched...

        // url is read in here via command line
        const targetUrl = process.argv[2];
        if (!targetUrl) {
            console.error('Please provide a YouTube video URL as an argument');
            process.exit(1);
        }

        console.log(`Visiting ${targetUrl}`);
        await page.goto(targetUrl, { waitUntil: 'networkidle2' });

        const scrollDown = async (count) => {
            for (let i = 0; i < count; i++) {
                await page.evaluate(() => {
                    window.scrollBy(0, window.innerHeight);
                });
                await sleep(3000);
            }
        };

        // expand all comments "show more"
        const clickAllButtons = async () => {
            const buttons = await page.$$('button.yt-spec-button-shape-next.yt-spec-button-shape-next--text.yt-spec-button-shape-next--call-to-action.yt-spec-button-shape-next--size-m.yt-spec-button-shape-next--icon-leading.yt-spec-button-shape-next--align-by-text');
            for (const button of buttons) {
                console.log("Clicking button");
                await button.click();
                //await sleep(100);
            }
        };

        // Call the function to scroll down
        await scrollDown(15);
        // Call the function to click all buttons
        await clickAllButtons();
        //await browser.close(); // Close the browser
        await scrollDown(2); // Scroll down the page 2 times

        await browser.close(); // Close the browser
        console.log("Comment scraping completed successfully.");
        process.exit(0); // Explicitly exit the process with success code
    } catch (error) {
        console.error(error);
        process.exit(1); // Exit the process with error code
    }
}

// Run the script
run().catch((error) => {
    console.error(error);
    // we're shutting down the browser here
    // keep in mind, we've only read in one url by this point... bottleneck!!
    process.exit(1);
});
```
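
Both scripts flag the same bottleneck: every video gets its own Node process and browser. One way to soften that without touching the JS is a bounded worker pool around the same command line, sketched below. `MAX_WORKERS`, `scrape_one`, and `scrape_many` are hypothetical names, and since parallel browsers would all funnel through the single mitmdump instance, interleaved writes to `data.json` would need to be handled before using this for real.

```python
# sketch: bound the per-video subprocess cost with a small worker pool.
# The command line matches the one process_stream.py builds; everything else
# here is an assumption, not part of the current pipeline.
import subprocess
from concurrent.futures import ThreadPoolExecutor

MAX_WORKERS = 3  # assumption: how many concurrent browsers the box can handle

def scrape_one(video_id: str) -> None:
    cmd = f"xvfb-run -a node comment_scrape.js https://www.youtube.com/watch?v={video_id}"
    subprocess.run(cmd, shell=True, check=True, timeout=300)

def scrape_many(video_ids: list[str]) -> None:
    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as pool:
        futures = [pool.submit(scrape_one, vid) for vid in video_ids]
        for future in futures:
            future.result()  # surface any per-video failure
```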

## xhr_scrape_ds.py
- Script that intercepts the XHR requests made by the comment_scrape.js script (run as a mitmproxy addon via `mitmdump`). Requests are wrangled before being saved into files.

```python
# xhr_scrape_ds.py
'''
mitmproxy addon script that intercepts the xhr requests from the youtube api,
performs some data wrangling, and saves the data to a file
'''
import json
import os
import subprocess
import traceback
from datetime import datetime, timedelta
from urllib.parse import urlparse

import pandas as pd
import pytz
from dateutil.relativedelta import relativedelta
from dotenv import load_dotenv

from process_output import process_file

# TODO: add a way to get the videoId from the xhr request
load_dotenv()


def parse_published_time(published_time):
    current_time = datetime.now()
    time_parts = published_time.split()
    number = int(time_parts[0])
    unit = time_parts[1]

    if 'minute' in unit:
        return current_time - timedelta(minutes=number)
    elif 'hour' in unit:
        return current_time - timedelta(hours=number)
    elif 'day' in unit:
        return current_time - timedelta(days=number)
    elif 'month' in unit:
        return current_time - relativedelta(months=number)
    elif 'year' in unit:
        return current_time - relativedelta(years=number)
    else:
        return current_time  # Default to current time if something unexpected occurs


def get_substring_after_tld(url):
    parsed_url = urlparse(url)
    tld_index = parsed_url.netloc.rfind('.')  # Find the last dot in the netloc part
    if tld_index != -1:
        return parsed_url.netloc[tld_index+1:] + parsed_url.path
    return ''  # Return empty string if TLD is not found


def sanitize_substring(substring):
    return substring.replace(':', '_').replace('/', '_').replace('.', '_').replace('www', '_').replace('https', '_').replace('http', '_').replace('?', '_')


with open('config.json', 'r') as f:
    config = json.load(f)

# the xhr url is read in from config.json
# this is to handle the case where the api makes a change to the url
target_urls = config['xhr_urls']
target_strs = [target_str for target_str in target_urls]
target_str = target_strs[0]

substring_after_tld = get_substring_after_tld(target_str)
target_dir_str = sanitize_substring(substring_after_tld)
out_dir = f'output/{target_dir_str}'
if not os.path.exists(out_dir):
    os.makedirs(out_dir)

# Global variables to store the last tracked URL and video id
last_url = None
video_id = None


def extract_comment_info(comment_entity_payload):
    try:
        # Extracting properties
        properties = comment_entity_payload['properties']
        author = comment_entity_payload['author']
        toolbar = comment_entity_payload['toolbar']
        print(f'=====================================\nToolbar: {toolbar}\n=====================================\n')
        like_count = toolbar['likeCountNotliked']
        reply_count = toolbar['replyCount']

        # Extracting individual fields
        content = properties['content']['content']
        comment_id = properties['commentId']
        published_time = properties['publishedTime']
        reply_level = properties['replyLevel']

        # Extracting author details
        channel_id = author['channelId']
        display_name = author['displayName']
        avatar_url = author['avatarThumbnailUrl']
        is_verified = author['isVerified']

        # REASSIGNMENT!
        author = properties['authorButtonA11y']

        # Print extracted information
        print(f"Comment ID: {comment_id}")
        print(f"Content: {content}")
        print(f"Published Time: {published_time}")
        print(f"Reply Level: {reply_level}")
        print(f"Channel ID: {channel_id}")
        print(f"Display Name: {display_name}")
        print(f"Avatar URL: {avatar_url}")
        print(f"Verified: {'Yes' if is_verified else 'No'}")

        json_obj = {
            "comment_id": comment_id,
            "0": content,
            "like_count": like_count,
            "reply_count": reply_count,
            "author": author,
            "published_time": published_time,
            "reply_level": reply_level,
            "channel_id": channel_id,
            "display_name": display_name,
            "avatar_url": avatar_url,
            "verified": is_verified
        }
        return json_obj
    except Exception as e:
        print(f"Error extracting information: {e}")
        traceback.print_exc()


def response(flow):
    global last_url  # Declare last_url as global so we can modify it
    # global video_id
    flow_url = flow.request.url
    if target_str in flow_url.lower():
        # Parse the JSON response from the XHR request
        try:
            response_dict = json.loads(flow.response.text)
        except Exception:
            response_dict = {'content': flow.response.text}

        # Append the last tracked URL to the response_dict
        if last_url is not None:
            response_dict['tracked_url'] = last_url

        # Save the metadata and XHR response to a file
        if not os.path.exists(out_dir):
            os.makedirs(out_dir)

        response_dict['timeOfScrape'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        try:
            response_dict['videoId'] = str(response_dict).split("watch%253Fv%253D")[1].split('&')[0]
            response_dict['video_id'] = 'aaa'
        except Exception:
            response_dict['videoId'] = 'N/A'

        with open(f'{out_dir}/data.json', 'a') as f:
            json.dump(response_dict, f)
            f.write(',\n')

        youtube_comments_object = process_file('com_youtubei_v1_next/data.json')
        comments_data = []

        # Iterate through the youtube_comments_object
        for i, comment_object in enumerate(youtube_comments_object):
            framework_updates = None
            try:
                framework_updates = comment_object['frameworkUpdates']
            except Exception as e:
                # try legacy comments
                print(f'Error extracting frameworkUpdates for comment {i}: {e}')
                continue

            mutations = framework_updates['entityBatchUpdate']['mutations']
            for mutation in mutations:
                if 'payload' not in mutation.keys():
                    continue
                # try to get commentEntityPayload
                comment_entity_payload = None
                comment_obj = None
                try:
                    comment_entity_payload = mutation['payload']['commentEntityPayload']
                    print('commentEntityPayload')
                    comment_obj = extract_comment_info(comment_entity_payload)
                except Exception as e:
                    print(f'Error extracting commentEntityPayload for comment {i}: {e}')
                    traceback.print_exc()
                    continue

                # by this point, we have a comment!
                comments_data.append(comment_obj)

        # Convert to a DataFrame
        df_comments = pd.DataFrame(comments_data)

        # make video_id the first item in data.json ['videoId']
        video_id = youtube_comments_object[0]['videoId']
        df_comments['video_id'] = video_id
        print(df_comments)

        df_comments['Date'] = df_comments['published_time'].apply(parse_published_time)
        # Localize the datetime to UTC
        df_comments['Date'] = df_comments['Date'].dt.tz_localize('UTC')
        # Convert UTC to Arizona Time (MST, which is UTC-7)
        arizona_tz = pytz.timezone('America/Phoenix')
        df_comments['Date'] = df_comments['Date'].dt.tz_convert(arizona_tz)
        # Extract datetime components
        df_comments['Day_of_Week'] = df_comments['Date'].dt.day_name()
        df_comments['Hour_of_Day'] = df_comments['Date'].dt.hour

        # replace double quotes
        df_comments['0'] = df_comments['0'].str.replace('"', "'")

        if not os.path.exists('transcript_scrape/scraped_comments'):
            os.makedirs('transcript_scrape/scraped_comments')

        df_comments.to_csv(f'/root/snap/misc/general_scrape/scrape/transcript_scrape/scraped_comments/{video_id}_comments.csv', index=False)
        print(f'Saved comments for {video_id} to transcript_scrape/scraped_comments/{video_id}_comments.csv')
```
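
YouTube only exposes relative timestamps ("3 hours ago", "2 months ago") in the comment payload, so `parse_published_time` back-computes an absolute datetime from the current time. A quick illustration of what it returns — it assumes `parse_published_time` from the script above is in scope, and the exact datetimes depend on when it runs; only the offsets are fixed:

```python
# sketch: how parse_published_time resolves YouTube's relative timestamps.
# Assumes parse_published_time (defined above) is importable or in scope.
from datetime import datetime

now = datetime.now()
for text in ["5 minutes ago", "3 hours ago", "2 days ago", "1 month ago", "4 years ago"]:
    resolved = parse_published_time(text)
    print(f"{text!r:>16} -> {resolved:%Y-%m-%d %H:%M}  (~{now - resolved} before now)")
```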

## analysis.py
- Computes simple statistics (keywords, sentiment, engagement, top comments) before chucking the results into MongoDB

```python
# analysis.py
'''
script that performs analysis (keyword, sentiment, engagement, top comments)
on videos and throws the results into MongoDB
'''
#%%
import json
import os
from datetime import datetime
from glob import glob

import matplotlib.pyplot as plt
import pandas as pd
from dotenv import load_dotenv
from pymongo import MongoClient

from comment_analysis import CommentAnalyzer

# Load environment variables
load_dotenv()

# MongoDB Atlas connection
MONGO_URI = os.getenv('MONGO_URI')
client = MongoClient(MONGO_URI)

# TODO: generalize
# database of current domain or niche
db = client['yt_comments']

# 2. Load the channel-to-state mapping
channel_to_state = pd.read_csv('master_channel_to_states.csv')

# 3. Load the master videos CSV
videos = pd.read_csv('data/master_videos.csv')

# 4. Load and process comment data
def load_comments(video_id, channel_id):
    file_path = f'data/{channel_id}/{video_id}/comments.csv'
    if os.path.exists(file_path):
        df = pd.read_csv(file_path)
        df['video_id'] = video_id
        return df
    return None

all_comments = []
for _, row in videos.iterrows():
    comments = load_comments(row['video_id'], row['channel_id'])
    if comments is not None:
        all_comments.append(comments)

print(len(all_comments))
print('Comments loaded')
comments = pd.concat(all_comments, ignore_index=True)

#%%
# 5. Join the data
videos_with_state = pd.merge(videos, channel_to_state, on='channel_id', how='left')

# Include additional columns like 'video_title' in the merge
comments_with_video = pd.merge(
    comments,
    videos_with_state[['video_id', 'state', 'video_title', 'video_url', 'views']],
    on='video_id',
    how='left'
)

#%%
# we need to convert like_count to int, handling commas and
# abbreviated counts like "1.2K" or "3M"
def convert_to_int(value):
    if pd.isna(value):
        return None
    value = str(value).strip().replace(',', '')
    if value == '':
        return None
    multiplier = 1
    if value[-1].lower() == 'k':
        multiplier, value = 1_000, value[:-1]
    elif value[-1].lower() == 'm':
        multiplier, value = 1_000_000, value[:-1]
    return int(float(value) * multiplier)

comments_with_video['like_count'] = comments_with_video['like_count'].apply(convert_to_int)

#%%
# 6. Perform state-level analysis
# Example: Top liked comment per state
top_comments_by_state = comments_with_video.sort_values('like_count', ascending=False).groupby('state').first()

# Print results
print(top_comments_by_state[['0', 'like_count']])

# Example: Average likes per comment by state
avg_likes_by_state = comments_with_video.groupby('state')['like_count'].mean().sort_values(ascending=False)
print("\nAverage likes per comment by state:")
print(avg_likes_by_state)

# Example: Comment count by state
comment_count_by_state = comments_with_video['state'].value_counts()
print("\nComment count by state:")
print(comment_count_by_state)

# %%
# most liked comment in each state
most_liked_comment_by_state = comments_with_video.sort_values('like_count', ascending=False).groupby('state').first()
print(most_liked_comment_by_state[['0', 'like_count']])

# %%
# Save the comments_with_video DataFrame to a CSV
comments_with_video.to_csv('data/comments_with_video.csv', index=False)

# Upsert to MongoDB
for _, row in comments_with_video.iterrows():
    # Convert the row to a dictionary
    comment_data = row.to_dict()

    # Upsert to MongoDB
    db.comments_with_video.update_one(
        {'comment_id': comment_data['comment_id']},
        {'$set': comment_data},
        upsert=True
    )

# Perform comment analysis
# TODO: generalize
analyzer = CommentAnalyzer(comments_with_video)

# Get all analyses
keyword_analysis = analyzer.analyze_keywords_by_state()
sentiment_analysis = pd.DataFrame(list(analyzer.get_sentiment_by_state()))
engagement_metrics = analyzer.get_engagement_metrics()
top_comments_by_state = analyzer.get_top_comments_by_state()
top_comments_by_video = analyzer.get_top_comments_by_video()

# Combine all metrics into a single state-level summary
state_summary = {}
for state in comments_with_video['state'].unique():
    if pd.isna(state):
        continue
    state_summary[state] = {
        'keywords': keyword_analysis.get(state, {}),
        'sentiment': sentiment_analysis[sentiment_analysis['state'] == state].to_dict('records')[0],
        'engagement': engagement_metrics.loc[state].to_dict(),
        'top_comments': [comment for comment in top_comments_by_state if comment['state'] == state]
    }

# Create a video-level summary
video_summary = {}
for video_id in comments_with_video['video_id'].unique():
    video_summary[video_id] = {
        'top_comments': [comment for comment in top_comments_by_video if comment['video_id'] == video_id]
    }

# Save results to MongoDB
db.state_analysis.delete_many({})  # Clear existing data
db.state_analysis.insert_many([{'state': k, **v} for k, v in state_summary.items()])

db.video_analysis.delete_many({})  # Clear existing data
db.video_analysis.insert_many([{'video_id': k, **v} for k, v in video_summary.items()])

# Generate visualizations
# analyzer.generate_visualizations(keyword_analysis)

# %%
# visualize top comments by state
for state in comments_with_video['state'].unique():
    top_comments = [comment for comment in top_comments_by_state if comment['state'] == state]
    print(f"Top comments for {state}:")
    for comment in top_comments:
        print(f" - {comment['0']} (Likes: {comment['like_count']})")

# print current time in human readable format
print(f"Current time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
# %%
```
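
For reference, each document that lands in `state_analysis` has the shape sketched below. The top-level keys come from `state_summary`; the nested keys and every value are invented for illustration, since the real contents depend on what `CommentAnalyzer` returns.

```python
# sketch: illustrative shape of one state_analysis document.
# Top-level keys mirror state_summary above; nested keys/values are made up.
example_state_doc = {
    "state": "Arizona",
    "keywords": {"traffic": 42, "crash": 17, "i-10": 9},
    "sentiment": {"state": "Arizona", "positive": 0.21, "negative": 0.35, "neutral": 0.44},
    "engagement": {"avg_likes": 6.3, "comment_count": 812},
    "top_comments": [
        {"state": "Arizona", "0": "That intersection has needed a light for years.", "like_count": 311}
    ],
}
```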

## master_channel_to_states.csv
- Where you manually link target channels to locations (US states in this case)

```csv
channel_id,state
abc15,Arizona
wfaa,Texas
@Wfaa8,Texas
@fox6milwaukee,Wisconsin
@ABC13Houston,Texas
@fox5atlanta,Georgia
@11Alive,Georgia
@WFRVLocal5,Wisconsin
@wxyztvdetroit,Michigan
@6abcActionNews,Pennsylvania
@13ONYOURSIDE,Michigan
@kxan_news,Texas
@CBS17,North Carolina
@ABC11WTVD,North Carolina
@News6WKMG,Florida
@WPLGLocal10,Florida
@News4JAX,Florida
@WRAL5,North Carolina
```
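
The `channel_id` values here have to match the `channelId` the video scraper emits (note the mix of bare channel slugs and `@` handles), because `analysis.py` merges on this column. A tiny sketch of the lookup this file provides, assuming it runs from the directory that contains the CSV:

```python
# sketch: load the mapping and resolve a channel to its state, using the same
# join key analysis.py uses (pd.merge on 'channel_id').
import pandas as pd

channel_to_state = pd.read_csv('master_channel_to_states.csv')
lookup = dict(zip(channel_to_state['channel_id'], channel_to_state['state']))

print(lookup.get('@kxan_news'))  # -> 'Texas'
print(lookup.get('abc15'))       # -> 'Arizona'
```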

## start_comment_scraper.sh
- Bash script to start the whole system

```sh
#!/bin/bash
# start_comment_scraper.sh

# Start the first screen session and run mitmdump.
# Intercepts browser traffic and runs the addon script xhr_scrape_ds.py
screen -dmS comment_interceptor bash -c 'cd /misc/general_scrape/scrape; mitmdump -s xhr_scrape_ds.py'

# start the stream that processes new videos
screen -dmS video_stream bash -c 'cd /misc/general_scrape/scrape; python3 process_stream.py'

# run index.js (this whole system) from a scheduled crontab (every 375 minutes)
# crontab entry (add via `crontab -e`; not executed by this script):
# */375 * * * * cd /misc/general_scrape/scrape && xvfb-run -a /root/.nvm/versions/node/v21.6.2/bin/node index.js && /usr/bin/python3 analysis.py >> output.log 2>&1
```