![[stockdemo.gif]]

# About the Data
- The exchange data is pulled from one of NASDAQ's back-end APIs.
- Each CSV extracted can be thought of as a snapshot of a point in time.
- By building a data pipeline, we can transform these individual snapshots into a stream of market intelligence.
- We can track the movement of stock prices and trading volume and identify patterns in sector rotation over time (see the sketch just below this list).

![[NASDAQ_Sheet_Shot.png]]

- 7000+ tickers
    - AMEX
    - NASDAQ
    - NYSE
- Updates every 15 minutes
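As a taste of what the combined data supports, here is a minimal pandas sketch (illustrative only, not part of the pipeline) that tracks per-sector trading volume across snapshots. It assumes the `master.csv` file that the pipeline produces at the end of this post.

```python
# Illustrative sketch: sector-level volume across 15-minute snapshots,
# assuming the master.csv produced later in this post
# (columns include symbol, sector, volume, lastsale, date).
import pandas as pd

df = pd.read_csv('master.csv', parse_dates=['date'])

# Total traded volume per sector for each 15-minute snapshot
sector_volume = (
    df.groupby([pd.Grouper(key='date', freq='15min'), 'sector'])['volume']
      .sum()
      .unstack('sector')
)

# Day-over-day change in end-of-day sector volume hints at sector rotation
print(sector_volume.resample('1D').last().pct_change().tail())
```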
## Visualizing Using Microsoft Fabric and PowerBI
![[Market Overview 2-1.png]]

## EDA Performed Using Python, Pandas, and Scikit-learn
Public Kaggle Notebook: https://www.kaggle.com/code/dataranch/nasdaq-analysis-python

![[sector_map.png]]
![[market_map.png]]
![[MSFT_analysis.png]]
![[Python-EDA2.png]]
![[Python-EDA3.png]]

# Data Flow
![[Stock-Scraper-Flow.png]]

# Scraping Code
When investigating a data source, it's a good idea to start with a simple Python script.

![[res_screenshot2.png]]

This request will hang indefinitely. A workaround is to use [curl_cffi](https://github.com/lexiforest/curl_cffi) to get a response. This library gives us a lightweight way to impersonate browser fingerprints.
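As a minimal illustration of the difference (the screener URL is redacted throughout this post, so `API_URL` below is just a placeholder):

```python
# Minimal illustration of the workaround. API_URL is a placeholder for the
# redacted screener endpoint used throughout this post.
from curl_cffi import requests as curl_requests

API_URL = "https://example.com/api/screener/stocks"  # placeholder

# A plain `requests.get(API_URL, ...)` call never gets an answer from the real
# endpoint -- the request hangs. The same request sent through curl_cffi with a
# Chrome impersonation profile (browser-like TLS/HTTP2 fingerprint) returns normally.
response = curl_requests.get(
    API_URL,
    params={"exchange": "NASDAQ", "download": "true"},
    impersonate="chrome120",
)
print(response.status_code)
```

The full scraper below uses the same session-based pattern.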
```python
from curl_cffi import requests as curl_requests
import json
from datetime import datetime
import os
import time
from bs4 import BeautifulSoup
from db_utils import create_tables, insert_ticker_data
import pandas as pd


def check_market_status():
    session = curl_requests.Session()
    session.headers.update({
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8',
        'Accept-Language': 'en-US,en;q=0.9',
    })
    try:
        response = session.get('https://www.tradinghours.com/open')
        if response.status_code == 200:
            soup = BeautifulSoup(response.text, 'html.parser')
            open_text = soup.find('p', class_='display-6 text-center my-5 font-weight-bold')
            if open_text:
                open_text = open_text.text
                print(open_text)
                if 'is open right now.' in open_text:
                    print('market is open')
                    return True
                else:
                    print('market is closed')
                    # import sys
                    # sys.exit(0)
                    return False
    except Exception as e:
        print(f"Error checking market status: {str(e)}")
        return False


def fetch_exchange_data(exchange):
    session = curl_requests.Session()
    session.headers.update({
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
        'Accept': 'application/json, text/plain, */*',
        'Accept-Language': 'en-US,en;q=0.9',
        'Accept-Encoding': 'gzip, deflate, br',
        'Connection': 'keep-alive'
    })
    try:
        response = session.get(
            '[REDACTED_URL]',
            params={
                'tableonly': 'true',
                'limit': '25',
                'offset': '0',
                'exchange': exchange,
                'download': 'true'
            },
            impersonate="chrome120"
        )
        if response.status_code == 200:
            return response.json()
        else:
            print(f"Request failed for {exchange} with status code: {response.status_code}")
            return None
    except Exception as e:
        print(f"Error fetching {exchange} data: {str(e)}")
        return None


def main():
    exchanges = ['NYSE', 'AMEX', 'NASDAQ']

    if not os.path.exists('./tickers'):
        os.makedirs('./tickers')

    # is_market_open = check_market_status()
    # print('market is open' if is_market_open else 'market is closed')

    now = datetime.now()
    timestamp = now.strftime("%m-%d-%Y_%H-%M-%S")

    create_tables()

    for exchange in exchanges:
        data = fetch_exchange_data(exchange)
        if data and 'data' in data:
            filename = f'./tickers/{exchange}_{timestamp}.json'
            with open(filename, 'w') as f:
                json.dump(data['data'], f)
            print(f'file written to {filename}')

            # insert into postgres table
            df = pd.DataFrame(data['data']['rows'], columns=data['data']['headers'])
            df['date'] = now
            insert_ticker_data(df, exchange)


if __name__ == "__main__":
    main()
```

## PostgreSQL Helper Functions
```python
# db_utils.py
import psycopg2
from psycopg2.extras import execute_values
import pandas as pd
from datetime import datetime

# dotenv
from dotenv import load_dotenv
import os

load_dotenv()

DBNAME = os.getenv('DBNAME')
USER = os.getenv('USER')
PASSWORD = os.getenv('PASSWORD')
HOST = os.getenv('HOST')

DB_PARAMS = {
    "dbname": DBNAME,
    "user": USER,
    "password": PASSWORD,
    "host": HOST
}


def get_connection():
    return psycopg2.connect(**DB_PARAMS)


def create_tables():
    conn = get_connection()
    cursor = conn.cursor()
    try:
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS stock_tickers (
                id SERIAL PRIMARY KEY,
                symbol VARCHAR(10) NOT NULL,
                name TEXT NOT NULL,
                lastsale NUMERIC,
                netchange NUMERIC,
                pctchange VARCHAR(20),
                marketcap NUMERIC,
                country VARCHAR(100),
                ipoyear NUMERIC,
                volume INTEGER,
                sector VARCHAR(100),
                industry VARCHAR(100),
                url TEXT,
                date TIMESTAMP NOT NULL,
                exchange VARCHAR(10) NOT NULL,
                UNIQUE (symbol, date, exchange)
            );

            CREATE INDEX IF NOT EXISTS idx_stock_tickers_symbol ON stock_tickers(symbol);
            CREATE INDEX IF NOT EXISTS idx_stock_tickers_date ON stock_tickers(date);
            CREATE INDEX IF NOT EXISTS idx_stock_tickers_exchange ON stock_tickers(exchange);
        """)
        conn.commit()
        print("Tables created successfully")
    except Exception as e:
        print(f"Error creating tables: {e}")
        conn.rollback()
    finally:
        cursor.close()
        conn.close()


def insert_ticker_data(df, exchange):
    conn = get_connection()
    cursor = conn.cursor()

    columns = ['symbol', 'name', 'lastsale', 'netchange', 'pctchange', 'marketcap',
               'country', 'ipoyear', 'volume', 'sector', 'industry', 'url', 'date', 'exchange']

    values = []
    for _, row in df.iterrows():
        # strip the "$" prefix and thousands separators from the price string
        try:
            lastsale = float(str(row['lastsale']).replace('$', '').replace(',', '')) if pd.notna(row['lastsale']) and row['lastsale'] != '' else None
        except (ValueError, TypeError):
            lastsale = None

        try:
            netchange = float(row['netchange']) if pd.notna(row['netchange']) and row['netchange'] != '' else None
        except (ValueError, TypeError):
            netchange = None

        pctchange = row['pctchange'] if pd.notna(row['pctchange']) else None

        try:
            marketcap = float(row['marketCap']) if pd.notna(row['marketCap']) and row['marketCap'] != '' else None
        except (ValueError, TypeError, KeyError):
            try:
                marketcap = float(row['marketcap']) if pd.notna(row['marketcap']) and row['marketcap'] != '' else None
            except (ValueError, TypeError, KeyError):
                marketcap = None

        try:
            ipoyear = float(row['ipoyear']) if pd.notna(row['ipoyear']) and row['ipoyear'] != '' else None
        except (ValueError, TypeError):
            ipoyear = None

        try:
            volume = int(row['volume']) if pd.notna(row['volume']) and row['volume'] != '' else None
        except (ValueError, TypeError):
            volume = None

        date_obj = None
        date_str = row['date']
        if isinstance(date_str, str):
            try:
                date_obj = datetime.strptime(date_str, "%m-%d-%Y_%H-%M-%S")
            except ValueError:
                try:
                    date_obj = datetime.strptime(date_str, "%Y-%m-%d %H:%M:%S")
                except ValueError:
                    print(f"Error parsing date: {date_str}")
        else:
            date_obj = date_str

        if date_obj is None:
            continue

        values.append((
            row['symbol'], row['name'], lastsale, netchange, pctchange, marketcap,
            row['country'], ipoyear, volume, row['sector'], row['industry'],
            row['url'], date_obj, exchange
        ))

    try:
        execute_values(
            cursor,
            """
            INSERT INTO stock_tickers
                (symbol, name, lastsale, netchange, pctchange, marketcap, country,
                 ipoyear, volume, sector, industry, url, date, exchange)
            VALUES %s
            ON CONFLICT (symbol, date, exchange) DO UPDATE SET
                lastsale = EXCLUDED.lastsale,
                netchange = EXCLUDED.netchange,
                pctchange = EXCLUDED.pctchange,
                marketcap = EXCLUDED.marketcap,
                volume = EXCLUDED.volume
            """,
            values
        )
        conn.commit()
        print(f"Inserted {len(values)} records successfully")
    except Exception as e:
        print(f"Error inserting data: {e}")
        conn.rollback()
    finally:
        cursor.close()
        conn.close()


def get_ticker_data(symbols=None, start_date=None, end_date=None, exchanges=None):
    # Returns: DataFrame with query results
    conn = get_connection()

    query = "SELECT * FROM stock_tickers WHERE 1=1"
    params = []

    if symbols:
        placeholders = ', '.join(['%s'] * len(symbols))
        query += f" AND symbol IN ({placeholders})"
        params.extend(symbols)

    if start_date:
        query += " AND date >= %s"
        params.append(start_date)

    if end_date:
        query += " AND date <= %s"
        params.append(end_date)

    if exchanges:
        placeholders = ', '.join(['%s'] * len(exchanges))
        query += f" AND exchange IN ({placeholders})"
        params.extend(exchanges)

    query += " ORDER BY symbol, date"

    df = pd.read_sql_query(query, conn, params=params)
    conn.close()
    return df
```
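With the helper module in place, pulling a slice of the collected data back into pandas is a one-liner. The symbol and start date below are only illustrative:

```python
# Example query against the stock_tickers table using the helper above.
from db_utils import get_ticker_data

# Price/volume history for a single symbol over the collection window
msft = get_ticker_data(
    symbols=['MSFT'],
    start_date='2025-02-17',
    exchanges=['NASDAQ'],
)
print(msft[['symbol', 'date', 'lastsale', 'volume']].tail())
```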
## JSON to CSV Code
The scraper produces JSON files, and you'll want to convert these to CSVs before performing an EDA.

```python
# parse.py
# %%
import pandas as pd
import json
import os
from tqdm import tqdm
import time

# %%
start_time = time.time()

TICKER_DIR = 'tickers'
exchanges = ['NASDAQ', 'NYSE', 'NASD', 'AMEX']

exchange_files = os.listdir(TICKER_DIR)
for file in exchange_files:
    print(f'FILE: {file}')
    if file.endswith('.json'):
        # the timestamp is everything after the first underscore in the filename
        try:
            date = "_".join(file.split('_')[1:]).split('.')[0]
        except Exception:
            date = None
        print(f'DATE: {date}')

        with open(f'{TICKER_DIR}/{file}', 'r') as f:
            data = json.load(f)

        exchange_df = pd.DataFrame(data['rows'], columns=data['headers'])
        exchange_df['date'] = date

        file = file.split('.')[0]
        print(f'CSV FILE: {file}')
        exchange_df.to_csv(f'{TICKER_DIR}/{file}.csv', index=False)

end_time = time.time()
print(f"Execution Time: {end_time - start_time} seconds")
```

## Alternative Approach - Node.js and Puppeteer
This is a more computationally expensive way to grab the same data.

```javascript
// puppeteer-extra is a drop-in replacement for puppeteer,
// it augments the installed puppeteer with plugin functionality
const puppeteer = require('puppeteer-extra')
const StealthPlugin = require('puppeteer-extra-plugin-stealth');
const fs = require('fs');
const { executablePath } = require('puppeteer')
const fsp = require('fs').promises;

(async () => {
    puppeteer.use(
        require('puppeteer-extra-plugin-stealth/evasions/navigator.webdriver')()
    );
    puppeteer.use(
        require('puppeteer-extra-plugin-stealth/evasions/sourceurl')()
    );

    const customArgs = [
        `--start-maximized`,
        `--no-sandbox`,
    ];

    const browser = await puppeteer.launch({
        defaultViewport: null,
        executablePath: executablePath(),
        headless: false,
        ignoreDefaultArgs: ["--disable-extensions", "--enable-automation"],
        args: customArgs,
    });

    const myArgs = process.argv.slice(2);
    console.log('myArgs: ', myArgs);

    const page = await browser.newPage();

    // capture
    //await page.goto(checkUrl);
    //await page.waitForTimeout(9000);

    //send init signal
    console.log('init signal sent')
    //await page.goto(`${myArgs[0].replace('\n','')}`, { timeout: 10000 });

    urls = [
        'REDACTED_URLS'
    ]

    // ===================== IS MARKET OPEN? =====================
    marketCheckUrl = 'https://www.tradinghours.com/open'
    await page.goto(marketCheckUrl);

    // new waitForTimeout
    await new Promise(r => setTimeout(r, 5000));

    pageText = await page.evaluate(() => document.body.innerText);

    //get the text from p[class="display-6 text-center my-5 font-weight-bold"]
    openText = await page.$eval('p[class="display-6 text-center my-5 font-weight-bold"]', el => el.innerText);
    console.log(openText);

    if (openText.includes('is open right now.')) {
        console.log('market is open');
    } else {
        console.log('market is closed');
        // if we want to halt execution until the market is open, we can do so with:
        //await browser.close();
        //process.exit(0);
    }
    // ===========================================================

    for (i = 0; i < urls.length; i++) {
        await page.goto(urls[i]);
        await new Promise(r => setTimeout(r, 5000));

        //get all text from page
        //pageText = await page.content()

        //get all inner text
        pageText = await page.evaluate(() => document.body.innerText);

        //convert to json
        //pageText = JSON.parse(pageText);
        //get the data
        //pageText = pageText['data']['rows'];
        console.log(pageText);

        //replace all '\' with ''
        pageText = pageText.replace(/\\/g, '');

        //take a screenshot
        //await page.screenshot({ path: `test.png`, fullPage: true });

        //write to file
        //get the current date
        var date = new Date();
        var dd = String(date.getDate()).padStart(2, '0');
        var mm = String(date.getMonth() + 1).padStart(2, '0'); //January is 0!
        var yyyy = date.getFullYear();
        var hours = date.getHours();
        var minutes = date.getMinutes();
        var seconds = date.getSeconds();
        today = mm + '-' + dd + '-' + yyyy + '_' + hours + '-' + minutes + '-' + seconds;

        await fsp.writeFile(`./tickers/${urls[i].split('exchange=')[1].split('&')[0]}_${today}.json`, JSON.stringify(JSON.parse(pageText).data));
        console.log(`file written to ./tickers/${urls[i].split('exchange=')[1].split('&')[0]}_${today}.json`)
    }

    await browser.close();
    process.exit(0);
})();
```

# Automating the Script On Linux
In order to perform an in-depth analysis, we need to collect data frequently. We can use crontab on Linux to schedule the scraper to run every 15 minutes during market hours on weekdays. The hour field assumes the server clock is set to UTC, so `14-20` roughly covers the 9:30 AM to 4:00 PM Eastern trading session.

```bash
*/15 14-20 * * 1-5 cd /path/to/scraper && /usr/bin/python3 ticker_scrape.py >> output.log 2>&1
```
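To confirm the scheduled runs are actually landing in Postgres, a quick sanity check along these lines can be run after a day or two of collection. This is a hypothetical helper (not part of the pipeline) that reuses `get_connection()` from `db_utils.py`:

```python
# check_collection.py -- hypothetical sanity check for the cron job,
# reusing get_connection() from db_utils.py.
from db_utils import get_connection

conn = get_connection()
cur = conn.cursor()
cur.execute("""
    SELECT exchange, COUNT(*) AS row_count, MAX(date) AS latest_snapshot
    FROM stock_tickers
    GROUP BY exchange;
""")
for exchange, row_count, latest in cur.fetchall():
    print(f"{exchange}: {row_count} rows, latest snapshot {latest}")
cur.close()
conn.close()
```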
## Streaming Data to Kaggle for Analysis
Finally, we need a script that:
1. Aggregates all the data we've collected from the scheduled runs.
2. Updates the Kaggle dataset so that we can analyze the fresh data and build models.

```python
# combine_csvs.py
import os
from datetime import timedelta, datetime as dt
import warnings
import pandas as pd
from kaggle.api.kaggle_api_extended import KaggleApi
import shutil
import json
from db_utils import get_ticker_data

warnings.simplefilter(action='ignore', category=FutureWarning)

api = KaggleApi()
api.authenticate()

CSV_DIR = 'tickers'
DATE_FORMAT = '%Y-%m-%d'
DATE_FORMAT2 = '%m-%d-%Y'
DATE_FORMAT3 = '%m-%d-%Y_%H-%M-%S'


# create required metadata file for kaggle
def create_metadata_file(dataset_name, folder='tmp_dataset'):
    metadata = {
        "title": dataset_name.split('/')[1],
        "id": f"{dataset_name}",
        "licenses": [{"name": "CC0-1.0"}]
    }
    metadata_path = os.path.join(folder, 'dataset-metadata.json')
    with open(metadata_path, 'w') as f:
        json.dump(metadata, f)


def run_notebook_analysis():
    notebook_slug = "dataranch/nasdaq-analysis-python"
    # Push a new version of the notebook to run it
    api.kernel_push(notebook_slug)
    # Optionally monitor the status
    status = api.kernel_status(notebook_slug)
    print(f"Notebook status: {status}")


def update_kaggle_dataset(dataset_name, file_path, version_notes):
    os.makedirs('tmp_dataset', exist_ok=True)
    shutil.copy2(file_path, os.path.join('tmp_dataset', os.path.basename(file_path)))
    create_metadata_file(dataset_name, 'tmp_dataset')

    api.dataset_create_version(
        folder='tmp_dataset',
        version_notes=version_notes,
        quiet=False
    )

    # clean up
    shutil.rmtree('tmp_dataset')


num_days = 100
start_date = '2025-2-17'
end_date = None

if end_date is None:
    end_date = dt.strptime(start_date, DATE_FORMAT) + timedelta(days=num_days)
else:
    end_date = dt.strptime(end_date, DATE_FORMAT)
start_date = dt.strptime(start_date, DATE_FORMAT)

print(f'start_date: {start_date}')
print(f'end_date: {end_date}')

exchanges = ['NYSE', 'AMEX', 'NASDAQ']

master_df = get_ticker_data(
    start_date=start_date,
    end_date=end_date,
    exchanges=exchanges
)
master_df = master_df.sort_values(['symbol', 'date'], ascending=[True, True])
master_df.to_csv('master.csv', index=False)

# upsert to kaggle
dataset_name = 'dataranch/nasdaq-minutes-1'
version_notes = f"Updated master.csv with data through {end_date.strftime(DATE_FORMAT)}"
update_kaggle_dataset(dataset_name, 'master.csv', version_notes)
```

# Realtime Analysis
Kaggle Notebook: https://www.kaggle.com/code/dataranch/nasdaq-analysis-python
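Inside the notebook, the freshly pushed dataset can be read straight from Kaggle's input mount. This is only a sketch; the exact path depends on how the `dataranch/nasdaq-minutes-1` dataset is attached to the notebook:

```python
# Sketch of loading the updated dataset inside the Kaggle notebook.
# Assumes the 'dataranch/nasdaq-minutes-1' dataset is attached as an input.
import pandas as pd

df = pd.read_csv('/kaggle/input/nasdaq-minutes-1/master.csv', parse_dates=['date'])
print(df['date'].min(), '->', df['date'].max(), f'({df.symbol.nunique()} symbols)')
```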