Source code for odd_kernel.datasets.cryptocurrencies

import requests
import json
import time
import websockets
from enum import Enum
from typing import List, Dict, Optional
from datetime import datetime, timedelta

# Default number of price levels requested when fetching an order book snapshot.
DEFAULT_SNAPSHOT_SIZE = 1000
# Upper bound on order book depth quoted in the get_orderbook_snapshot docstring
# ("max 5000"); nothing in this module enforces it.
MAX_BOOK_DEPTH = 5000
# Factor used to convert between seconds and the millisecond timestamps the API
# exchanges. NOTE(review): the name is misspelled ("MILI"), but it is a public
# module-level name, so it is kept as-is.
MILISECONDS_IN_SECOND = 1000
# Default per-request record limit for the historical klines / aggTrades calls.
DEFAULT_HISTORICAL_LIMIT = 1000

# Root of the REST endpoints (".../api/v3/...").
BASE_URL = "https://api.binance.com"
# Root of the WebSocket streams (".../ws/<stream>").
WS_BASE_URL = "wss://stream.binance.com:9443"


class CryptoCurrenciesTimeUnit(Enum):
    """Units of time from which a data period can be built.

    Each member's value is the suffix used when the period is rendered as an
    interval string (for example the ``"m"`` in ``"15m"``).
    """

    SECOND = "s"
    MINUTE = "m"
    HOUR = "h"
    DAY = "d"
    WEEK = "w"
    # Upper-case on purpose: lower-case "m" is already taken by MINUTE.
    MONTH = "M"
class CryptoCurrenciesPeriod:
    """
    Granularity of a data series, expressed as ``count`` x ``unit``.

    Attributes:
        count (int): How many units make up one period; always positive.
        unit (CryptoCurrenciesTimeUnit): The unit the count is expressed in.
    """

    def __init__(self, count: int, unit: CryptoCurrenciesTimeUnit):
        # Zero or negative periods are rejected up front.
        if count <= 0:
            raise ValueError("Period value must be positive.")
        self.count = count
        self.unit = unit

    def to_binance_interval(self) -> str:
        """
        Render the period as a Binance interval string.

        Returns:
            str: The interval, e.g. "1m", "1h" or "1d".

        Raises:
            ValueError: If the count/unit combination is not one of the
                supported intervals.
        """
        units = CryptoCurrenciesTimeUnit
        # Supported counts per unit; insertion order fixes the order in which
        # the valid intervals are listed in the error message.
        counts_by_unit = {
            units.SECOND: (1,),
            units.MINUTE: (1, 3, 5, 15, 30),
            units.HOUR: (1, 2, 4, 6, 8, 12),
            units.DAY: (1, 3),
            units.WEEK: (1,),
            units.MONTH: (1,),
        }
        interval_by_period = {
            (n, unit): f"{n}{unit.value}"
            for unit, counts in counts_by_unit.items()
            for n in counts
        }

        interval = interval_by_period.get((self.count, self.unit))
        if interval is None:
            raise ValueError(
                f"Invalid interval: {self.count}{self.unit.value}. "
                f"Valid intervals: {list(interval_by_period.values())}"
            )
        return interval
class DataType(Enum):
    """Kinds of market data that can be requested from Binance."""

    # Candlestick (OHLCV) series
    KLINES = "klines"
    # Individual trades
    TRADES = "trades"
    # Aggregated trades
    AGG_TRADES = "aggTrades"
    # Order book snapshot
    DEPTH = "depth"
    # 24-hour statistics
    TICKER_24HR = "ticker24hr"
class BinanceClient:
    """Client for interacting with the Binance API.

    Every request goes through one shared ``requests.Session``. When an API
    key is given it is attached to that session as the ``X-MBX-APIKEY``
    header. The API secret is stored but not used by the methods below.
    """

    def __init__(
        self,
        api_key: Optional[str] = None,
        api_secret: Optional[str] = None,
        verbose: bool = False,
    ):
        """
        Initializes the Binance client.

        Args:
            api_key (Optional[str], optional): Binance API key (only for
                private endpoints). Defaults to None.
            api_secret (Optional[str], optional): Binance API secret (only for
                private endpoints). Defaults to None.
            verbose (bool, optional): Whether to print verbose output.
                Defaults to False.
        """
        self.api_key = api_key
        self.api_secret = api_secret
        self.session = requests.Session()
        if api_key:
            self.session.headers.update({"X-MBX-APIKEY": api_key})
        self.verbose = verbose

    def get_exchange_info(self, symbol: Optional[str] = None) -> Dict:
        """
        Gets exchange information including available symbols.

        Args:
            symbol (Optional[str], optional): Restrict the answer to this
                symbol (upper-cased before sending). Defaults to None.

        Returns:
            Dict: The decoded JSON response.

        Raises:
            requests.HTTPError: If the server answers with an error status.
        """
        endpoint = f"{BASE_URL}/api/v3/exchangeInfo"
        params = {}
        if symbol:
            params["symbol"] = symbol.upper()

        response = self.session.get(endpoint, params=params)
        response.raise_for_status()
        return response.json()

    def get_all_tickers(self) -> List[str]:
        """
        Gets the list of all available tickers (symbols).

        Returns:
            List[str]: Symbols whose status is "TRADING".
        """
        exchange_info = self.get_exchange_info()
        return [
            symbol_info["symbol"]
            for symbol_info in exchange_info["symbols"]
            if symbol_info["status"] == "TRADING"
        ]

    def get_trading_pairs_info(self) -> List[Dict]:
        """
        Gets detailed information for all trading pairs.

        Returns:
            List[Dict]: One dict per pair with the keys "symbol", "baseAsset",
            "quoteAsset" and "status" (pairs of any status are included).
        """
        exchange_info = self.get_exchange_info()
        return [
            {
                "symbol": s["symbol"],
                "baseAsset": s["baseAsset"],
                "quoteAsset": s["quoteAsset"],
                "status": s["status"],
            }
            for s in exchange_info["symbols"]
        ]

    @staticmethod
    def _parse_kline(kline: List) -> Dict:
        """Turns one raw kline array into a dict keyed by field name."""
        return {
            "open_time": datetime.fromtimestamp(kline[0] / MILISECONDS_IN_SECOND),
            "open": float(kline[1]),
            "high": float(kline[2]),
            "low": float(kline[3]),
            "close": float(kline[4]),
            "volume": float(kline[5]),
            "close_time": datetime.fromtimestamp(kline[6] / MILISECONDS_IN_SECOND),
            "quote_volume": float(kline[7]),
            "trades": int(kline[8]),
            "taker_buy_base_volume": float(kline[9]),
            "taker_buy_quote_volume": float(kline[10]),
        }

    @staticmethod
    def _parse_agg_trade(trade: Dict) -> Dict:
        """Turns one raw aggregated-trade record into a dict with readable keys."""
        return {
            "id": trade["a"],
            "price": float(trade["p"]),
            "quantity": float(trade["q"]),
            "first_trade_id": trade["f"],
            "last_trade_id": trade["l"],
            "timestamp": datetime.fromtimestamp(trade["T"] / MILISECONDS_IN_SECOND),
            "is_buyer_maker": trade["m"],
        }

    def get_historical_klines(
        self,
        symbol: str,
        period: CryptoCurrenciesPeriod,
        start_time: datetime,
        end_time: Optional[datetime] = None,
        limit: int = DEFAULT_HISTORICAL_LIMIT,
    ) -> List[Dict]:
        """
        Gets historical klines (OHLCV) data.

        Requests are repeated, each starting one millisecond after the close
        time of the last candle received, until the server returns no more
        candles or the cursor reaches ``end_time``.

        Args:
            symbol (str): Trading pair (e.g., "BTCUSDT").
            period (CryptoCurrenciesPeriod): Data period/granularity.
            start_time (datetime): Start date.
            end_time (Optional[datetime], optional): End date. Defaults to now.
            limit (int, optional): Maximum number of records per request
                (max 1000). Defaults to DEFAULT_HISTORICAL_LIMIT.

        Returns:
            List[Dict]: List of dictionaries with OHLCV data. ``open_time`` and
            ``close_time`` are naive datetimes built with
            ``datetime.fromtimestamp``.

        Raises:
            ValueError: If ``period`` is not a valid Binance interval.
            requests.HTTPError: If the server answers with an error status.
        """
        endpoint = f"{BASE_URL}/api/v3/klines"
        interval = period.to_binance_interval()

        if end_time is None:
            # Follow start_time's timezone-awareness so the loop condition never
            # compares a naive datetime with an aware one (tz=None is plain now()).
            end_time = datetime.now(tz=start_time.tzinfo)

        pair = symbol.upper()
        end_ms = int(end_time.timestamp() * MILISECONDS_IN_SECOND)

        all_klines: List[Dict] = []
        current_start = start_time

        while current_start < end_time:
            params = {
                "symbol": pair,
                "interval": interval,
                "startTime": int(current_start.timestamp() * MILISECONDS_IN_SECOND),
                "endTime": end_ms,
                "limit": limit,
            }

            response = self.session.get(endpoint, params=params)
            response.raise_for_status()
            klines = response.json()

            if not klines:
                break

            # Convert to a more readable format
            all_klines.extend(self._parse_kline(kline) for kline in klines)

            # Next page starts just after the last candle's close time; the
            # cursor keeps end_time's tzinfo so the comparison stays valid.
            current_start = datetime.fromtimestamp(
                klines[-1][6] / MILISECONDS_IN_SECOND, tz=end_time.tzinfo
            ) + timedelta(milliseconds=1)

            # Avoid rate limiting
            time.sleep(0.1)

        return all_klines

    def get_historical_trades(
        self,
        symbol: str,
        start_time: datetime,
        end_time: Optional[datetime] = None,
        limit: int = DEFAULT_HISTORICAL_LIMIT,
    ) -> List[Dict]:
        """
        Gets historical aggregated trades.

        The first request is bounded by ``startTime``/``endTime``. Later pages
        are requested by ``fromId`` only, so the end bound is enforced locally:
        paging stops at the first trade whose timestamp is past ``end_time``.

        Args:
            symbol (str): Trading pair.
            start_time (datetime): Start date.
            end_time (Optional[datetime], optional): End date (inclusive).
                Defaults to now.
            limit (int, optional): Maximum number of records per request.
                Defaults to DEFAULT_HISTORICAL_LIMIT.

        Returns:
            List[Dict]: List of dictionaries with trade data, none later than
            ``end_time``.

        Raises:
            requests.HTTPError: If the server answers with an error status.
        """
        endpoint = f"{BASE_URL}/api/v3/aggTrades"

        if end_time is None:
            end_time = datetime.now()

        end_ms = int(end_time.timestamp() * MILISECONDS_IN_SECOND)
        params = {
            "symbol": symbol.upper(),
            "startTime": int(start_time.timestamp() * MILISECONDS_IN_SECOND),
            "endTime": end_ms,
            "limit": limit,
        }

        all_trades: List[Dict] = []

        while True:
            response = self.session.get(endpoint, params=params)
            response.raise_for_status()
            trades = response.json()

            if not trades:
                break

            past_end = False
            for trade in trades:
                # Pages fetched by fromId are not time-bounded by the server,
                # so cut off here instead of running on to the present.
                if trade["T"] > end_ms:
                    past_end = True
                    break
                all_trades.append(self._parse_agg_trade(trade))

            # Fewer records than the limit means there is no more data.
            if past_end or len(trades) < limit:
                break

            # Continue right after the last aggregated trade id received.
            params["fromId"] = trades[-1]["a"] + 1
            params.pop("startTime", None)
            params.pop("endTime", None)

            time.sleep(0.1)

        return all_trades

    def get_orderbook_snapshot(self, symbol: str, limit: int = DEFAULT_SNAPSHOT_SIZE) -> Dict:
        """
        Gets an order book snapshot.

        Args:
            symbol (str): Trading pair.
            limit (int, optional): Book depth (max 5000). Defaults to
                DEFAULT_SNAPSHOT_SIZE.

        Returns:
            Dict: The decoded JSON response, unmodified (bids and asks).

        Raises:
            requests.HTTPError: If the server answers with an error status.
        """
        endpoint = f"{BASE_URL}/api/v3/depth"
        params = {"symbol": symbol.upper(), "limit": limit}

        response = self.session.get(endpoint, params=params)
        response.raise_for_status()
        return response.json()
class OrderBook:
    """Maintains a real-time order book for a symbol.

    The book is seeded from a REST snapshot (``initialize_from_snapshot``) and
    then kept current by applying depth-stream events
    (``process_depth_update``). Events that arrive before the book has been
    initialized are stored in ``buffer`` and can be replayed with
    ``process_buffered_events``.
    """

    def __init__(self, client: BinanceClient, symbol: str):
        """
        Initializes the OrderBook.

        Args:
            client (BinanceClient): Binance client instance.
            symbol (str): Trading pair (stored upper-cased).
        """
        self.client = client
        self.symbol = symbol.upper()
        # price -> quantity for each side of the book
        self.bids: Dict[float, float] = {}
        self.asks: Dict[float, float] = {}
        # Update id of the snapshot or last applied event; None until initialized
        self.last_update_id: Optional[int] = None
        self.initialized = False
        # Depth events received while the book was not initialized
        self.buffer: List[Dict] = []

    def initialize_from_snapshot(self, limit: int = DEFAULT_SNAPSHOT_SIZE):
        """
        Initializes the order book from a REST API snapshot.

        Replaces both sides of the book and the last update id with the
        snapshot's contents and marks the book as initialized.

        Args:
            limit (int, optional): Snapshot depth. Defaults to
                DEFAULT_SNAPSHOT_SIZE.

        Raises:
            KeyError: If the snapshot carries no update id.
        """
        snapshot = self.client.get_orderbook_snapshot(self.symbol, limit)

        # The client returns the raw REST payload, whose sequence number is
        # keyed "lastUpdateId". "last_update_id" is still accepted so a client
        # that returns the snake_case form keeps working.
        if "lastUpdateId" in snapshot:
            self.last_update_id = snapshot["lastUpdateId"]
        else:
            self.last_update_id = snapshot["last_update_id"]

        # Initialize bids and asks
        self.bids = {float(price): float(qty) for price, qty in snapshot["bids"]}
        self.asks = {float(price): float(qty) for price, qty in snapshot["asks"]}

        self.initialized = True
        if self.client.verbose:
            print(
                f"OrderBook initialized for {self.symbol} (lastUpdateId: {self.last_update_id})"
            )

    @staticmethod
    def _apply_levels(side: Dict[float, float], levels: List) -> None:
        """Applies price-level updates to one side; a zero quantity removes the level."""
        for level in levels:
            price = float(level[0])
            qty = float(level[1])
            if qty == 0:
                side.pop(price, None)
            else:
                side[price] = qty

    def process_depth_update(self, data: Dict):
        """
        Processes a depth stream update.

        Events for another symbol are ignored. While the book is not
        initialized the event is only buffered. Events that end at or before
        the current update id are discarded. If the event starts after the next
        expected id (a gap), the book is rebuilt from a fresh snapshot and the
        event itself is dropped.

        Args:
            data (Dict): Depth update data, optionally wrapped in a
                combined-stream envelope under the "data" key.
        """
        # If it's a combined stream, extract data
        if "data" in data:
            data = data["data"]

        # Check that it is the correct symbol
        if data.get("s", "").upper() != self.symbol:
            return

        # If not initialized, buffer events
        if not self.initialized:
            self.buffer.append(data)
            return

        # Check update sequence
        first_update_id = data["U"]
        final_update_id = data["u"]

        # Discard old events
        if final_update_id <= self.last_update_id:
            return

        # Check for gap
        if first_update_id > self.last_update_id + 1:
            if self.client.verbose:
                print("Gap detected in updates! Reinitializing...")
            self.initialized = False
            self.initialize_from_snapshot()
            return

        self._apply_levels(self.bids, data["b"])
        self._apply_levels(self.asks, data["a"])

        self.last_update_id = final_update_id

    def process_buffered_events(self):
        """Processes events that were buffered before initialization.

        Does nothing while the book is still uninitialized. Otherwise every
        buffered event newer than the current update id is applied and the
        buffer is emptied.
        """
        if not self.initialized:
            return

        for event in self.buffer:
            if event["u"] > self.last_update_id:
                self.process_depth_update(event)

        self.buffer.clear()

    def get_best_bid(self) -> Optional[tuple[float, float]]:
        """Gets the best bid (highest price) as (price, quantity), or None if there are no bids."""
        if not self.bids:
            return None
        best_price = max(self.bids)
        return (best_price, self.bids[best_price])

    def get_best_ask(self) -> Optional[tuple[float, float]]:
        """Gets the best ask (lowest price) as (price, quantity), or None if there are no asks."""
        if not self.asks:
            return None
        best_price = min(self.asks)
        return (best_price, self.asks[best_price])

    def get_spread(self) -> Optional[float]:
        """Calculates the spread (best ask price minus best bid price), or None if a side is empty."""
        best_bid = self.get_best_bid()
        best_ask = self.get_best_ask()

        if best_bid and best_ask:
            return best_ask[0] - best_bid[0]
        return None

    def get_orderbook_snapshot(self, depth: int = 10) -> Dict:
        """
        Gets a snapshot of the current order book.

        Args:
            depth (int, optional): Number of levels to include per side.
                Defaults to 10.

        Returns:
            Dict: Symbol, last update id, bids sorted from highest price, asks
            sorted from lowest price (each as [price, qty] pairs), plus best
            bid, best ask and spread.
        """
        sorted_bids = sorted(self.bids.items(), key=lambda x: x[0], reverse=True)[:depth]
        sorted_asks = sorted(self.asks.items(), key=lambda x: x[0])[:depth]

        return {
            "symbol": self.symbol,
            "last_update_id": self.last_update_id,
            "bids": [[price, qty] for price, qty in sorted_bids],
            "asks": [[price, qty] for price, qty in sorted_asks],
            "best_bid": self.get_best_bid(),
            "best_ask": self.get_best_ask(),
            "spread": self.get_spread(),
        }

    async def start_stream(self):
        """Starts the depth update stream.

        Connects to the ``<symbol>@depth`` stream and passes every decoded
        message to ``process_depth_update`` until the connection ends. This
        method does not take a snapshot itself: until
        ``initialize_from_snapshot`` has been called, incoming events are only
        buffered.
        """
        url = f"{WS_BASE_URL}/ws/{self.symbol.lower()}@depth"

        async with websockets.connect(url) as websocket:
            if self.client.verbose:
                print(f"Depth stream started for {self.symbol}")

            async for message in websocket:
                data = json.loads(message)
                self.process_depth_update(data)