## Scraping XHR Requests
The following architecture can be employed to automate data collection from a source where the data is loaded dynamically via XHR (XMLHttpRequest) requests. This is commonly seen with lazy-loaded data like social media posts, infinite scrolling pages, or dynamically updated content.
## Architecture Diagram
![[Extension-mitm-architecture.png]]
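At its core, the flow in the diagram reduces to a mitmproxy addon with a `response` hook that filters for the XHR endpoints you care about and persists their payloads. Below is a minimal sketch of that pattern; the file names and the `TARGET_SUBSTRING` value are illustrative placeholders, not part of the full example script further down.
```python
# xhr_capture.py -- minimal mitmproxy addon sketch; run with: mitmdump -s xhr_capture.py
import json

TARGET_SUBSTRING = "/youtubei/v1/next"  # illustrative: substring of the XHR endpoint to capture

def response(flow):
    """Called by mitmproxy for every completed response passing through the proxy."""
    if TARGET_SUBSTRING in flow.request.url:
        try:
            payload = json.loads(flow.response.text)
        except (json.JSONDecodeError, TypeError):
            payload = {"content": flow.response.text}
        # Append one JSON object per line (JSON Lines) for easy downstream processing
        with open("captured_xhr.jsonl", "a") as f:
            f.write(json.dumps(payload) + "\n")
```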
## What is this useful for?
1. Social media platforms (e.g., Twitter, Instagram, YouTube) where posts and comments are loaded as the user scrolls down.
2. E-commerce websites that load more products as the user browses through categories.
3. News websites with "load more" buttons or automatic content loading.
4. Discussion forums that dynamically fetch new comments or threads.
5. Search result pages that load additional results without changing the URL.
6. Streaming platforms that continuously load new content recommendations.
7. Stock market or cryptocurrency tracking sites that update data in real-time.
8. Weather applications that update forecasts periodically without full page reloads.
## YouTube Comment Scraper Example Script
The following script processes intercepted requests in real time. Run it through mitmdump:
```bash
mitmdump -s process_data.py
```
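The script reads its target endpoints from a `config.json` file containing an `xhr_urls` list. A hypothetical example matching the YouTube comments endpoint used in this walkthrough (the exact URL is an assumption; the script only substring-matches it against request URLs):
```json
{
    "xhr_urls": [
        "https://www.youtube.com/youtubei/v1/next"
    ]
}
```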
```python
# process_data.py
import json
import os
import traceback
from datetime import datetime, timedelta
from urllib.parse import urlparse

import pandas as pd
import pytz
from dateutil.relativedelta import relativedelta
from dotenv import load_dotenv

from process_output import process_file

load_dotenv()
def parse_published_time(published_time):
    """Convert a relative timestamp like '3 days ago' into an absolute datetime."""
    current_time = datetime.now()
    time_parts = published_time.split()
    number = int(time_parts[0])
    unit = time_parts[1]
    if 'second' in unit:
        return current_time - timedelta(seconds=number)
    elif 'minute' in unit:
        return current_time - timedelta(minutes=number)
    elif 'hour' in unit:
        return current_time - timedelta(hours=number)
    elif 'day' in unit:
        return current_time - timedelta(days=number)
    elif 'week' in unit:
        return current_time - timedelta(weeks=number)
    elif 'month' in unit:
        return current_time - relativedelta(months=number)
    elif 'year' in unit:
        return current_time - relativedelta(years=number)
    else:
        return current_time  # Default to the current time if the unit is unrecognized
# Infer the substring after the top-level domain
# (e.g. https://google.com/ajax -> com/ajax), used to build a per-endpoint output directory.
def get_substring_after_tld(url):
    parsed_url = urlparse(url)
    tld_index = parsed_url.netloc.rfind('.')
    if tld_index != -1:
        return parsed_url.netloc[tld_index + 1:] + parsed_url.path
    return ''

# Turn that substring into a filesystem-safe directory name.
def sanitize_substring(substring):
    return (substring.replace(':', '_').replace('/', '_').replace('.', '_')
            .replace('www', '_').replace('https', '_').replace('http', '_').replace('?', '_'))
with open('config.json', 'r') as f:
    config = json.load(f)

target_urls = config['xhr_urls']
target_str = target_urls[0]  # this script tracks the first configured XHR endpoint

substring_after_tld = get_substring_after_tld(target_str)
target_dir_str = sanitize_substring(substring_after_tld)
out_dir = f'output/{target_dir_str}'
os.makedirs(out_dir, exist_ok=True)

# Global variables: the last tracked page URL and the current video id
last_url = None
video_id = None
def extract_comment_info(comment_entity_payload):
    """Pull the fields we care about out of a single commentEntityPayload."""
    try:
        properties = comment_entity_payload['properties']
        author = comment_entity_payload['author']
        toolbar = comment_entity_payload['toolbar']
        print(f'=====================================\nToolbar: {toolbar}\n=====================================\n')
        like_count = toolbar['likeCountNotliked']
        reply_count = toolbar['replyCount']

        # Comment fields
        content = properties['content']['content']
        comment_id = properties['commentId']
        published_time = properties['publishedTime']
        reply_level = properties['replyLevel']

        # Author details
        channel_id = author['channelId']
        display_name = author['displayName']
        avatar_url = author['avatarThumbnailUrl']
        is_verified = author['isVerified']

        # Note: 'author' is reassigned here to the accessibility label string
        author = properties['authorButtonA11y']

        print(f"Comment ID: {comment_id}")
        print(f"Content: {content}")
        print(f"Published Time: {published_time}")
        print(f"Reply Level: {reply_level}")
        print(f"Channel ID: {channel_id}")
        print(f"Display Name: {display_name}")
        print(f"Avatar URL: {avatar_url}")
        print(f"Verified: {'Yes' if is_verified else 'No'}")

        json_obj = {
            "comment_id": comment_id,
            "0": content,  # comment text is stored under the key "0"
            "like_count": like_count,
            "reply_count": reply_count,
            "author": author,
            "published_time": published_time,
            "reply_level": reply_level,
            "channel_id": channel_id,
            "display_name": display_name,
            "avatar_url": avatar_url,
            "verified": is_verified
        }
        return json_obj
    except Exception as e:
        print(f"Error extracting information: {e}")
        traceback.print_exc()
def response(flow):
    """mitmproxy response hook: capture matching XHR responses and append them to disk."""
    global last_url  # Declare last_url as global so we can modify it
    flow_url = flow.request.url
    if target_str in flow_url.lower():
        # Parse the JSON response from the XHR request
        try:
            response_dict = json.loads(flow.response.text)
        except Exception:
            response_dict = {'content': flow.response.text}

        # Append the last tracked URL to the response_dict
        if last_url is not None:
            response_dict['tracked_url'] = last_url

        # Save the metadata and XHR response to a file
        os.makedirs(out_dir, exist_ok=True)
        response_dict['timeOfScrape'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        try:
            # The video id appears URL-encoded inside the response (watch%253Fv%253D<id>)
            response_dict['videoId'] = str(response_dict).split("watch%253Fv%253D")[1].split('&')[0]
        except Exception:
            response_dict['videoId'] = 'N/A'

        with open(f'{out_dir}/data.json', 'a') as f:
            json.dump(response_dict, f)
            f.write(',\n')
        # Re-parse everything captured so far for this endpoint and rebuild the CSV
        youtube_comments_object = process_file('com_youtubei_v1_next/data.json')
        comments_data = []
        for i, comment_object in enumerate(youtube_comments_object):
            try:
                framework_updates = comment_object['frameworkUpdates']
            except Exception as e:
                print(f'Error extracting frameworkUpdates for comment {i}: {e}')
                continue

            mutations = framework_updates['entityBatchUpdate']['mutations']
            for mutation in mutations:
                if 'payload' not in mutation:
                    continue
                # Try to get a commentEntityPayload out of this mutation
                try:
                    comment_entity_payload = mutation['payload']['commentEntityPayload']
                    comment_obj = extract_comment_info(comment_entity_payload)
                except Exception as e:
                    print(f'Error extracting commentEntityPayload for comment {i}: {e}')
                    traceback.print_exc()
                    continue
                if comment_obj is not None:
                    comments_data.append(comment_obj)

        # Convert to a DataFrame
        df_comments = pd.DataFrame(comments_data)
        video_id = youtube_comments_object[0]['videoId']
        df_comments['video_id'] = video_id
        print(df_comments)

        # Convert relative timestamps to absolute datetimes, localize to UTC,
        # then convert to Arizona time (MST, UTC-7, no DST)
        df_comments['Date'] = df_comments['published_time'].apply(parse_published_time)
        df_comments['Date'] = df_comments['Date'].dt.tz_localize('UTC')
        arizona_tz = pytz.timezone('America/Phoenix')
        df_comments['Date'] = df_comments['Date'].dt.tz_convert(arizona_tz)

        # Extract datetime components
        df_comments['Day_of_Week'] = df_comments['Date'].dt.day_name()
        df_comments['Hour_of_Day'] = df_comments['Date'].dt.hour

        # Replace double quotes in the comment text (stored under column "0")
        df_comments['0'] = df_comments['0'].str.replace('"', "'")

        os.makedirs('scraped_comments', exist_ok=True)
        df_comments.to_csv(f'scraped_comments/{video_id}_comments.csv', index=False)
```
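`process_file` is imported from a separate `process_output` module that isn't shown here, so its exact implementation is unknown. A plausible sketch, assuming it simply parses the comma-separated JSON objects that the `response` hook appends to `data.json` and that paths are resolved relative to the `output/` directory:
```python
# process_output.py -- hypothetical sketch of the helper imported above
import json
import os

def process_file(path):
    """Parse a data.json file of comma/newline-separated JSON objects into a list of dicts."""
    if not path.startswith('output/'):
        path = os.path.join('output', path)  # assumption: paths are relative to output/
    with open(path, 'r') as f:
        raw = f.read().rstrip().rstrip(',')  # drop the trailing ',\n' left by the last append
    return json.loads(f'[{raw}]')            # wrap in brackets to form a valid JSON array
```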
In this way, the scraped data is processed in real time as requests pass through the proxy, letting us capture and process dynamically loaded AJAX data coming from the remote browser.
In the above example, we captured the requests sent as the browser scrolls down a YouTube video page and extracted the comment metadata. To run this on a server, you can orchestrate the browsing via a browser extension as described in [[Chrome Extension-Based Dynamic Content Scraping System]], or simply set up a Puppeteer script to automate the browser tasks (navigate, scroll, click, etc.).
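Puppeteer is a Node.js tool; for a Python-only stack, a similar driver can be written with Playwright. A minimal sketch, assuming mitmproxy is listening on its default localhost:8080 address and that scrolling alone is enough to trigger the comment XHRs (the video URL is a placeholder):
```python
# drive_browser.py -- hypothetical Playwright driver routed through the mitmproxy instance
import time
from playwright.sync_api import sync_playwright

PROXY = {"server": "http://localhost:8080"}              # assumption: mitmdump's default listen address
VIDEO_URL = "https://www.youtube.com/watch?v=VIDEO_ID"   # placeholder video URL

with sync_playwright() as p:
    browser = p.chromium.launch(headless=True, proxy=PROXY)
    # ignore_https_errors lets the browser accept mitmproxy's certificate without installing it
    context = browser.new_context(ignore_https_errors=True)
    page = context.new_page()
    page.goto(VIDEO_URL, wait_until="domcontentloaded")

    # Scroll repeatedly so the page keeps firing the comment-loading XHR requests
    for _ in range(20):
        page.mouse.wheel(0, 2000)
        time.sleep(1.5)

    browser.close()
```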