![[stockdemo.gif]]
# About the Data
- The exchange data is pulled from one of NASDAQ's back-end APIs.
- Each extracted CSV can be thought of as a snapshot of the market at a point in time.
- By building a data pipeline, we can transform these individual snapshots into a stream of market intelligence.
- We can track the movement of stock prices and trading volume over time and identify patterns in sector rotation, as sketched below.
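For instance, once a few snapshots have accumulated, pandas can stitch them into a per-symbol price history. A minimal sketch, assuming the `./tickers/*.json` snapshot files written by the scraper later in this post (the example filename is illustrative):

```python
import glob
import json
import pandas as pd

frames = []
for path in glob.glob('./tickers/NASDAQ_*.json'):
    with open(path) as f:
        snapshot = json.load(f)
    df = pd.DataFrame(snapshot['rows'])
    # recover the capture time from the filename, e.g. NASDAQ_01-31-2025_09-30-00.json
    stamp = path.split('_', 1)[1].removesuffix('.json')
    df['captured_at'] = pd.to_datetime(stamp, format='%m-%d-%Y_%H-%M-%S')
    frames.append(df)

history = pd.concat(frames, ignore_index=True)
# lastsale arrives as a string like "$145.86"
history['lastsale'] = (history['lastsale'].astype(str)
                       .str.replace('$', '', regex=False)
                       .str.replace(',', '', regex=False)
                       .astype(float))
# one row per snapshot, one column per symbol
prices = history.pivot_table(index='captured_at', columns='symbol', values='lastsale')
```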
![[NASDAQ_Sheet_Shot.png]]
- 7,000+ tickers across three exchanges:
    - AMEX
    - NASDAQ
    - NYSE
- Updates every 15 minutes
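Since the source refreshes roughly every 15 minutes, the scraper only needs to run on that cadence. A minimal scheduling sketch, assuming the scraping script below is saved as `scraper.py` (an illustrative module name):

```python
import time

from scraper import main  # hypothetical module name for the scraping script below

# Poll on the source's ~15-minute refresh cadence.
while True:
    main()
    time.sleep(15 * 60)
```

In practice, a cron entry or a scheduled Fabric pipeline does the same job more robustly.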
## Visualizing Using Microsoft Fabric and Power BI
![[Market Overview 2-1.png]]
## EDA Performed Using Python, Pandas, and Scikit-learn
Public Kaggle Notebook: https://www.kaggle.com/code/dataranch/nasdaq-analysis-python
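For a taste of what the notebook does, the sector map below boils down to a group-by over a single snapshot. A minimal sketch (the filename is illustrative):

```python
import json
import pandas as pd

# Load one snapshot written by the scraper and aggregate market cap by sector.
with open('./tickers/NASDAQ_01-31-2025_09-30-00.json') as f:
    df = pd.DataFrame(json.load(f)['rows'])

df['marketcap'] = pd.to_numeric(df['marketcap'], errors='coerce')
sector_caps = df.groupby('sector')['marketcap'].sum().sort_values(ascending=False)
print(sector_caps.head(10))
```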
![[sector_map.png]]
![[market_map.png]]
![[MSFT_analysis.png]]
![[Python-EDA2.png]]
![[Python-EDA3.png]]
# Data Flow
![[Stock-Scraper-Flow.png]]
# Scraping Code
When investigating a data source, it's a good idea to start with a simple Python script.
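A first pass with the standard `requests` library might look like the following sketch (the endpoint URL is redacted here, as in the full script below):

```python
import requests

# Naive GET against the screener endpoint. No timeout is set, so when the
# server silently drops the request, the call just sits there.
response = requests.get(
    '[REDACTED_URL]',
    params={'exchange': 'NASDAQ', 'download': 'true'},
)
print(response.status_code)
```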
![[res_screenshot2.png]]
This request hangs indefinitely: the endpoint appears to fingerprint clients below the HTTP layer and never responds to anything that doesn't look like a real browser. A workaround is [curl_cffi](https://github.com/lexiforest/curl_cffi), which gives us a lightweight way to impersonate browser fingerprints.
```python
from curl_cffi import requests as curl_requests
import json
from datetime import datetime
import os
from bs4 import BeautifulSoup
import pandas as pd

from db_utils import create_tables, insert_ticker_data

def check_market_status():
    """Scrape tradinghours.com to check whether US markets are currently open."""
    session = curl_requests.Session()
    session.headers.update({
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8',
        'Accept-Language': 'en-US,en;q=0.9',
    })
    try:
        response = session.get('https://www.tradinghours.com/open')
        if response.status_code == 200:
            soup = BeautifulSoup(response.text, 'html.parser')
            open_text = soup.find('p', class_='display-6 text-center my-5 font-weight-bold')
            if open_text:
                print(open_text.text)
                if 'is open right now.' in open_text.text:
                    print('market is open')
                    return True
        print('market is closed')
        return False
    except Exception as e:
        print(f"Error checking market status: {str(e)}")
        return False

def fetch_exchange_data(exchange):
    """Download the full screener table for one exchange as JSON."""
    session = curl_requests.Session()
    session.headers.update({
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
        'Accept': 'application/json, text/plain, */*',
        'Accept-Language': 'en-US,en;q=0.9',
        'Accept-Encoding': 'gzip, deflate, br',
        'Connection': 'keep-alive'
    })
    try:
        response = session.get(
            '[REDACTED_URL]',
            params={
                'tableonly': 'true',
                'limit': '25',
                'offset': '0',
                'exchange': exchange,
                'download': 'true'  # ask for the full table rather than one page
            },
            impersonate="chrome120"  # TLS fingerprint matching the User-Agent above
        )
        if response.status_code == 200:
            return response.json()
        else:
            print(f"Request failed for {exchange} with status code: {response.status_code}")
            return None
    except Exception as e:
        print(f"Error fetching {exchange} data: {str(e)}")
        return None

def main():
    exchanges = ['NYSE', 'AMEX', 'NASDAQ']
    if not os.path.exists('./tickers'):
        os.makedirs('./tickers')
    #is_market_open = check_market_status()
    #print('market is open' if is_market_open else 'market is closed')
    now = datetime.now()
    timestamp = now.strftime("%m-%d-%Y_%H-%M-%S")
    create_tables()
    for exchange in exchanges:
        data = fetch_exchange_data(exchange)
        if data and 'data' in data:
            # keep a raw JSON snapshot on disk
            filename = f'./tickers/{exchange}_{timestamp}.json'
            with open(filename, 'w') as f:
                json.dump(data['data'], f)
            print(f'file written to {filename}')
            # insert into postgres table
            df = pd.DataFrame(data['data']['rows'], columns=data['data']['headers'])
            df['date'] = now
            insert_ticker_data(df, exchange)


if __name__ == "__main__":
    main()
```
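Note that each pull is persisted twice: as a raw JSON snapshot on disk, which is cheap to replay while debugging the pipeline, and as normalized rows in Postgres, which is what the EDA notebook and the Fabric/Power BI reports query. The `UNIQUE (symbol, date, exchange)` constraint in the schema below guards against duplicate rows for the same snapshot.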
## PostgreSQL Helper Functions
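The scraper imports `create_tables` and `insert_ticker_data` from a small `db_utils` module that owns the connection handling and schema: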
```python
# db_utils.py
import psycopg2
from psycopg2.extras import execute_values
import pandas as pd
from datetime import datetime
from dotenv import load_dotenv
import os

# Connection settings come from a .env file with keys DBNAME, USER, PASSWORD, HOST.
load_dotenv()
DBNAME = os.getenv('DBNAME')
USER = os.getenv('USER')  # careful: on Linux/macOS the shell already exports $USER
PASSWORD = os.getenv('PASSWORD')
HOST = os.getenv('HOST')

DB_PARAMS = {
    "dbname": DBNAME,
    "user": USER,
    "password": PASSWORD,
    "host": HOST
}

def get_connection():
    return psycopg2.connect(**DB_PARAMS)

def create_tables():
    conn = get_connection()
    cursor = conn.cursor()
    try:
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS stock_tickers (
                id SERIAL PRIMARY KEY,
                symbol VARCHAR(10) NOT NULL,
                name TEXT NOT NULL,
                lastsale NUMERIC,
                netchange NUMERIC,
                pctchange VARCHAR(20),
                marketcap NUMERIC,
                country VARCHAR(100),
                ipoyear NUMERIC,
                volume INTEGER,
                sector VARCHAR(100),
                industry VARCHAR(100),
                url TEXT,
                date TIMESTAMP NOT NULL,
                exchange VARCHAR(10) NOT NULL,
                UNIQUE (symbol, date, exchange)
            );
            CREATE INDEX IF NOT EXISTS idx_stock_tickers_symbol ON stock_tickers(symbol);
            CREATE INDEX IF NOT EXISTS idx_stock_tickers_date ON stock_tickers(date);
            CREATE INDEX IF NOT EXISTS idx_stock_tickers_exchange ON stock_tickers(exchange);
        """)
        conn.commit()
        print("Tables created successfully")
    except Exception as e:
        print(f"Error creating tables: {e}")
        conn.rollback()
    finally:
        cursor.close()
        conn.close()

def insert_ticker_data(df, exchange):
    conn = get_connection()
    cursor = conn.cursor()
    columns = ['symbol', 'name', 'lastsale', 'netchange', 'pctchange',
               'marketcap', 'country', 'ipoyear', 'volume', 'sector',
               'industry', 'url', 'date', 'exchange']
    values = []
    for _, row in df.iterrows():
        try:
            # lastsale arrives as a string like "$145.86"; strip the '$' before casting
            lastsale = float(str(row['lastsale']).replace('$', ''))