## Scraping XHR Requests The following architecture can be employed to automate data collection from a source where the data is loaded dynamically via XHR (XMLHttpRequest) requests. This is commonly seen with lazy-loaded data like social media posts, infinite scrolling pages, or dynamically updated content. ## Mermaid Diagram ![[Extension-mitm-architecture.png]] ## What is this useful for? 1. Social media platforms (e.g., Twitter, Instagram, Youtube) where posts and comments are loaded as the user scrolls down. 2. E-commerce websites that load more products as the user browses through categories. 3. News websites with "load more" buttons or automatic content loading. 4. Discussion forums that dynamically fetch new comments or threads. 5. Search result pages that load additional results without changing the URL. 6. Streaming platforms that continuously load new content recommendations. 7. Stock market or cryptocurrency tracking sites that update data in real-time. 8. Weather applications that update forecasts periodically without full page reloads. ## Youtube Comment Scraper Example Script The following script can be run to process intercepted requests by running : ```bash mitmdump -s process_data.py ``` ```python #process_data.py import json import os from urllib.parse import urlparse from datetime import datetime from process_output import process_file import pandas as pd import json from datetime import datetime import traceback from dotenv import load_dotenv import pandas as pd from datetime import datetime, timedelta from dateutil.relativedelta import relativedelta import pytz load_dotenv() def parse_published_time(published_time): current_time = datetime.now() time_parts = published_time.split() number = int(time_parts[0]) unit = time_parts[1] if 'minute' in unit: return current_time - timedelta(minutes=number) elif 'hour' in unit: return current_time - timedelta(hours=number) elif 'day' in unit: return current_time - timedelta(days=number) elif 'month' in unit: return current_time - relativedelta(months=number) elif 'year' in unit: return current_time - relativedelta(years=number) else: return current_time # Default to current time if something unexpected occurs # automatically infer the substring after the top level domain (ex: google.com/ajax -> ajax) def get_substring_after_tld(url): parsed_url = urlparse(url) tld_index = parsed_url.netloc.rfind('.') if tld_index != -1: return parsed_url.netloc[tld_index+1:] + parsed_url.path return '' def sanitize_substring(substring): return substring.replace(':', '_').replace('/', '_').replace('.','_').replace('www', '_').replace('https', '_').replace('http', '_').replace('?','_') with open('config.json', 'r') as f: config = json.load(f) target_urls = config['xhr_urls'] target_strs = [target_str for target_str in target_urls] target_str = target_strs[0] substring_after_tld = get_substring_after_tld(target_str) target_dir_str = sanitize_substring(substring_after_tld) out_dir = f'output/{target_dir_str}' if not os.path.exists(out_dir): os.makedirs(out_dir) # Global variable to store the last tracked URL last_url = None video_id = None def extract_comment_info(comment_entity_payload): try: # Extracting properties properties = comment_entity_payload['properties'] author = comment_entity_payload['author'] toolbar = comment_entity_payload['toolbar'] print(f'=====================================\nToolbar: {toolbar}\n=====================================\n') like_count = toolbar['likeCountNotliked'] reply_count = toolbar['replyCount'] # Extracting individual fields content = properties['content']['content'] comment_id = properties['commentId'] published_time = properties['publishedTime'] reply_level = properties['replyLevel'] # Extracting author details channel_id = author['channelId'] display_name = author['displayName'] avatar_url = author['avatarThumbnailUrl'] is_verified = author['isVerified'] #REASSIGNMENT! author = properties['authorButtonA11y'] print(f"Comment ID: {comment_id}") print(f"Content: {content}") print(f"Published Time: {published_time}") print(f"Reply Level: {reply_level}") print(f"Channel ID: {channel_id}") print(f"Display Name: {display_name}") print(f"Avatar URL: {avatar_url}") print(f"Verified: {'Yes' if is_verified else 'No'}") json_obj = { "comment_id": comment_id, "0": content, "like_count": like_count, "reply_count": reply_count, "author": author, "published_time": published_time, "reply_level": reply_level, "channel_id": channel_id, "display_name": display_name, "avatar_url": avatar_url, "verified": is_verified } return json_obj except Exception as e: print(f"Error extracting information: {e}") traceback.print_exc() def response(flow): global last_url # Declare last_url as global so we can modify it flow_url = flow.request.url if target_str in flow_url.lower(): # print(f'XHR URL: {flow_url}\n\n\n\n\n\n') # Parse the JSON response from the XHR request # print(flow_url) try: response_dict = json.loads(flow.response.text) except Exception: response_dict = { 'content': flow.response.text } # Append the last tracked URL to the response_dict if last_url is not None: response_dict['tracked_url'] = last_url # print(response_dict) # Save the metadata and XHR response to a file if not os.path.exists(out_dir): os.makedirs(out_dir) response_dict['timeOfScrape'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S') try: response_dict['videoId'] = str(response_dict).split("watch%253Fv%253D")[1].split('&')[0] response_dict['video_id'] = 'aaa' except: response_dict['videoId'] = 'N/A' pass with open(f'{out_dir}/data.json', 'a') as f: json.dump(response_dict, f) f.write(',\n') youtube_comments_object = process_file('com_youtubei_v1_next/data.json') comments_data = [] for i, comment_object in enumerate(youtube_comments_object): framework_updates = None try: framework_updates = comment_object['frameworkUpdates'] except Exception as e: print(f'Error extracting frameworkUpdates for comment {i}: {e}') continue #print(framework_updates) mutations = framework_updates['entityBatchUpdate']['mutations'] for mutation in mutations: #print(mutation.keys()) if ('payload' not in mutation.keys()): continue #try to get commentEntityPayload comment_entity_payload = None comment_obj = None try: comment_entity_payload = mutation['payload']['commentEntityPayload'] print('commentEntityPayload') comment_obj = extract_comment_info(comment_entity_payload) except Exception as e: print(f'Error extracting commentEntityPayload for comment {i}: {e}') traceback.print_exc() #print(mutation['payload'].keys()) continue comments_data.append(comment_obj) #by this point, we have a comment! continue # Convert to a DataFrame df_comments = pd.DataFrame(comments_data) video_id = youtube_comments_object[0]['videoId'] df_comments['video_id'] = video_id print(df_comments) df_comments['Date'] = df_comments['published_time'].apply(parse_published_time) # # Localize the datetime to UTC df_comments['Date'] = df_comments['Date'].dt.tz_localize('UTC') # # Convert UTC to Arizona Time (MST, which is UTC-7) arizona_tz = pytz.timezone('America/Phoenix') df_comments['Date'] = df_comments['Date'].dt.tz_convert(arizona_tz) # # Extract datetime components df_comments['Day_of_Week'] = df_comments['Date'].dt.day_name() df_comments['Hour_of_Day'] = df_comments['Date'].dt.hour #replace double quotes df_comments['0'] = df_comments['0'].str.replace('"', "'") df_comments.to_csv(f'scraped_comments/{video_id}_comments.csv', index=False) ``` In this way, we're processing the scraped data in real time as the requests come through our proxy! By doing this, we can scrape and process dynamically loaded data from ajax requests from our remote browser. In the above example, we captured the requests that are sent as the browser scrolls down the page on a YouTube video, and extracted the comment metadata. To implement this on a server, you can orchestrate this via a browser extension as described in [[Chrome Extension-Based Dynamic Content Scraping System]], or simply setup a Puppeteer script to automate the browser tasks (navigate, scroll, click, etc).