import requests
import json
import time
import websockets
from enum import Enum
from typing import List, Dict, Optional
from datetime import datetime, timedelta
# Default depth requested when fetching an order book snapshot.
DEFAULT_SNAPSHOT_SIZE = 1000
# Largest order book depth mentioned by the snapshot docstring ("max 5000").
# NOTE(review): not referenced by the code visible in this file.
MAX_BOOK_DEPTH = 5000
# Factor for converting between seconds and the millisecond timestamps sent
# to / received from the API. (Name spelling kept: it is referenced elsewhere.)
MILISECONDS_IN_SECOND = 1000
# Default page size for the historical klines / aggregated-trades requests.
DEFAULT_HISTORICAL_LIMIT = 1000
# Root of the REST endpoints.
BASE_URL = "https://api.binance.com"
# Root of the WebSocket stream endpoints.
WS_BASE_URL = "wss://stream.binance.com:9443"
[docs]
class CryptoCurrenciesTimeUnit(Enum):
    """Units of time in which a period can be expressed.

    Each member's value is the one-letter suffix used when the unit is
    rendered as text (lower-case ``"m"`` is minutes, upper-case ``"M"`` is
    months).
    """

    SECOND = "s"
    MINUTE = "m"
    HOUR = "h"
    DAY = "d"
    WEEK = "w"
    MONTH = "M"
[docs]
class CryptoCurrenciesPeriod:
    """
    Granularity of a data series, expressed as ``count`` x ``unit``.

    Attributes:
        count (int): Number of units in the crypto currency period.
        unit (CryptoCurrenciesTimeUnit): Time unit for the crypto currency period.
    """

    def __init__(self, count: int, unit: CryptoCurrenciesTimeUnit):
        """
        Stores the period after checking that ``count`` is positive.

        Args:
            count (int): Number of units; must be greater than zero.
            unit (CryptoCurrenciesTimeUnit): Unit the count is expressed in.

        Raises:
            ValueError: If ``count`` is zero or negative.
        """
        if count <= 0:
            raise ValueError("Period value must be positive.")
        self.count = count
        self.unit = unit

    def to_binance_interval(self) -> str:
        """
        Renders the period as a Binance interval string.

        Returns:
            str: The interval in Binance format (e.g., "1m", "1h", "1d").

        Raises:
            ValueError: If the (count, unit) combination is not one of the
                supported intervals; the message lists the supported ones.
        """
        units = CryptoCurrenciesTimeUnit
        # Supported (count, unit) combinations, in the order they are listed
        # in the error message.
        supported_pairs = [
            (1, units.SECOND),
            (1, units.MINUTE),
            (3, units.MINUTE),
            (5, units.MINUTE),
            (15, units.MINUTE),
            (30, units.MINUTE),
            (1, units.HOUR),
            (2, units.HOUR),
            (4, units.HOUR),
            (6, units.HOUR),
            (8, units.HOUR),
            (12, units.HOUR),
            (1, units.DAY),
            (3, units.DAY),
            (1, units.WEEK),
            (1, units.MONTH),
        ]
        interval_by_pair = {
            (amount, unit): f"{amount}{unit.value}"
            for amount, unit in supported_pairs
        }

        interval = interval_by_pair.get((self.count, self.unit))
        if interval is not None:
            return interval

        supported_names = list(interval_by_pair.values())
        raise ValueError(
            f"Invalid interval: {self.count}{self.unit.value}. "
            f"Valid intervals: {supported_names}"
        )
[docs]
class DataType(Enum):
    """Kinds of market data available on Binance."""

    # OHLCV data (candlesticks)
    KLINES = "klines"
    # Individual trades
    TRADES = "trades"
    # Aggregated trades
    AGG_TRADES = "aggTrades"
    # Order book (snapshot)
    DEPTH = "depth"
    # 24-hour statistics
    TICKER_24HR = "ticker24hr"
[docs]
class BinanceClient:
    """Client for interacting with the Binance REST API.

    All requests go through one ``requests.Session`` so the optional API-key
    header and the connection pool are shared between calls.
    """

    # Largest page the klines / aggTrades endpoints return per request.
    # Asking for more would make the "short page means no more data" check
    # in the pagination loops fire after the very first page.
    _MAX_PAGE_SIZE = 1000

    def __init__(
        self,
        api_key: Optional[str] = None,
        api_secret: Optional[str] = None,
        verbose: bool = False,
        timeout: Optional[float] = 30.0,
    ):
        """
        Initializes the Binance client.

        Args:
            api_key (Optional[str], optional): Binance API key (only for private endpoints). Defaults to None.
            api_secret (Optional[str], optional): Binance API secret (only for private endpoints). Defaults to None.
            verbose (bool, optional): If to print verbose output. Defaults to False.
            timeout (Optional[float], optional): Seconds to wait for each HTTP
                request before giving up; ``None`` waits indefinitely.
                Defaults to 30.0.
        """
        self.api_key = api_key
        self.api_secret = api_secret
        self.session = requests.Session()
        if api_key:
            self.session.headers.update({"X-MBX-APIKEY": api_key})
        self.verbose = verbose
        self.timeout = timeout

    def _get_json(self, endpoint: str, params: Optional[Dict] = None):
        """Performs a GET request and returns the decoded JSON body.

        Raises:
            requests.HTTPError: If the server answers with an error status.
            requests.Timeout: If no response arrives within ``self.timeout``.
        """
        response = self.session.get(endpoint, params=params, timeout=self.timeout)
        response.raise_for_status()
        return response.json()

    @staticmethod
    def _parse_kline(kline: List) -> Dict:
        """Converts one raw kline row (positional list) into a readable dict."""
        return {
            "open_time": datetime.fromtimestamp(kline[0] / MILISECONDS_IN_SECOND),
            "open": float(kline[1]),
            "high": float(kline[2]),
            "low": float(kline[3]),
            "close": float(kline[4]),
            "volume": float(kline[5]),
            "close_time": datetime.fromtimestamp(kline[6] / MILISECONDS_IN_SECOND),
            "quote_volume": float(kline[7]),
            "trades": int(kline[8]),
            "taker_buy_base_volume": float(kline[9]),
            "taker_buy_quote_volume": float(kline[10]),
        }

    @staticmethod
    def _parse_agg_trade(trade: Dict) -> Dict:
        """Converts one raw aggregated-trade record into a readable dict."""
        return {
            "id": trade["a"],
            "price": float(trade["p"]),
            "quantity": float(trade["q"]),
            "first_trade_id": trade["f"],
            "last_trade_id": trade["l"],
            "timestamp": datetime.fromtimestamp(trade["T"] / MILISECONDS_IN_SECOND),
            "is_buyer_maker": trade["m"],
        }

    def get_exchange_info(self, symbol: Optional[str] = None) -> Dict:
        """
        Gets exchange information including available symbols.

        Args:
            symbol (Optional[str], optional): Specific symbol. Defaults to None.

        Returns:
            Dict: Dictionary with exchange information.
        """
        params = {}
        if symbol:
            params["symbol"] = symbol.upper()
        return self._get_json(f"{BASE_URL}/api/v3/exchangeInfo", params)

    def get_all_tickers(self) -> List[str]:
        """
        Gets the list of all available tickers (symbols).

        Only symbols whose status is ``"TRADING"`` are included.

        Returns:
            List[str]: List of available symbols.
        """
        exchange_info = self.get_exchange_info()
        return [
            symbol_info["symbol"]
            for symbol_info in exchange_info["symbols"]
            if symbol_info["status"] == "TRADING"
        ]

    def get_trading_pairs_info(self) -> List[Dict]:
        """
        Gets detailed information for all trading pairs.

        Returns:
            List[Dict]: One dictionary per pair with the keys ``symbol``,
            ``baseAsset``, ``quoteAsset`` and ``status``.
        """
        exchange_info = self.get_exchange_info()
        wanted = ("symbol", "baseAsset", "quoteAsset", "status")
        return [{key: s[key] for key in wanted} for s in exchange_info["symbols"]]

    def get_historical_klines(
        self,
        symbol: str,
        period: CryptoCurrenciesPeriod,
        start_time: datetime,
        end_time: Optional[datetime] = None,
        limit: int = DEFAULT_HISTORICAL_LIMIT,
    ) -> List[Dict]:
        """
        Gets historical klines (OHLCV) data, paging until ``end_time``.

        Naive datetimes are interpreted in the local timezone (that is how
        ``datetime.timestamp`` treats them), and the returned ``open_time`` /
        ``close_time`` values are naive local datetimes.

        Args:
            symbol (str): Trading pair (e.g., "BTCUSDT").
            period (CryptoCurrenciesPeriod): Data period/granularity.
            start_time (datetime): Start date.
            end_time (Optional[datetime], optional): End date. Defaults to now.
            limit (int, optional): Maximum number of records per request (max 1000). Defaults to DEFAULT_HISTORICAL_LIMIT.

        Returns:
            List[Dict]: List of dictionaries with OHLCV data.

        Raises:
            ValueError: If ``period`` is not a supported Binance interval.
        """
        endpoint = f"{BASE_URL}/api/v3/klines"
        interval = period.to_binance_interval()
        if end_time is None:
            end_time = datetime.now()

        page_size = min(limit, self._MAX_PAGE_SIZE)
        # The cursor is kept as integer milliseconds so that advancing it
        # never goes through a lossy float/datetime round trip.
        start_ms = int(start_time.timestamp() * MILISECONDS_IN_SECOND)
        end_ms = int(end_time.timestamp() * MILISECONDS_IN_SECOND)

        all_klines = []
        while start_ms < end_ms:
            klines = self._get_json(
                endpoint,
                {
                    "symbol": symbol.upper(),
                    "interval": interval,
                    "startTime": start_ms,
                    "endTime": end_ms,
                    "limit": page_size,
                },
            )
            if not klines:
                break
            all_klines.extend(self._parse_kline(kline) for kline in klines)
            # Resume one millisecond after the close of the last candle received.
            start_ms = klines[-1][6] + 1
            # Avoid rate limiting
            time.sleep(0.1)
        return all_klines

    def get_historical_trades(
        self,
        symbol: str,
        start_time: datetime,
        end_time: Optional[datetime] = None,
        limit: int = DEFAULT_HISTORICAL_LIMIT,
    ) -> List[Dict]:
        """
        Gets historical aggregated trades between ``start_time`` and ``end_time``.

        The first page is selected by time window; later pages continue by
        trade id. Because id-based pages are not bounded by time on the
        request side, trades later than ``end_time`` are discarded here and
        paging stops as soon as one is seen.

        Args:
            symbol (str): Trading pair.
            start_time (datetime): Start date.
            end_time (Optional[datetime], optional): End date. Defaults to now.
            limit (int, optional): Maximum number of records per request
                (values above 1000 are reduced to 1000). Defaults to
                DEFAULT_HISTORICAL_LIMIT.

        Returns:
            List[Dict]: List of dictionaries with trade data.
        """
        endpoint = f"{BASE_URL}/api/v3/aggTrades"
        if end_time is None:
            end_time = datetime.now()

        page_size = min(limit, self._MAX_PAGE_SIZE)
        end_ms = int(end_time.timestamp() * MILISECONDS_IN_SECOND)
        params = {
            "symbol": symbol.upper(),
            "startTime": int(start_time.timestamp() * MILISECONDS_IN_SECOND),
            "endTime": end_ms,
            "limit": page_size,
        }

        all_trades = []
        while True:
            trades = self._get_json(endpoint, params)
            if not trades:
                break

            reached_end = False
            for trade in trades:
                # NOTE(review): relies on records arriving in ascending time
                # order, as the id-based pagination below already assumes.
                if trade["T"] > end_ms:
                    reached_end = True
                    break
                all_trades.append(self._parse_agg_trade(trade))

            # A short page means there is no more data.
            if reached_end or len(trades) < page_size:
                break

            # Continue right after the last aggregated trade id received.
            params = {
                "symbol": symbol.upper(),
                "fromId": trades[-1]["a"] + 1,
                "limit": page_size,
            }
            time.sleep(0.1)
        return all_trades

    def get_orderbook_snapshot(self, symbol: str, limit: int = DEFAULT_SNAPSHOT_SIZE) -> Dict:
        """
        Gets an order book snapshot.

        Args:
            symbol (str): Trading pair.
            limit (int, optional): Book depth (max 5000). Defaults to DEFAULT_SNAPSHOT_SIZE.

        Returns:
            Dict: The decoded response as returned by the API, including the
            bids and asks.
        """
        params = {"symbol": symbol.upper(), "limit": limit}
        return self._get_json(f"{BASE_URL}/api/v3/depth", params)
[docs]
class OrderBook:
    """Maintains a real-time order book for a symbol.

    The book is seeded from a REST snapshot and then kept current by applying
    depth-stream updates. Updates that arrive before the book is initialized
    are held in ``buffer``; nothing in this class initializes the book or
    drains the buffer automatically, so callers are expected to call
    ``initialize_from_snapshot`` and then ``process_buffered_events``.
    """

    def __init__(self, client: BinanceClient, symbol: str):
        """
        Initializes the OrderBook.

        Args:
            client (BinanceClient): Binance client instance.
            symbol (str): Trading pair.
        """
        self.client = client
        self.symbol = symbol.upper()
        # price -> quantity for each side of the book
        self.bids: Dict[float, float] = {}
        self.asks: Dict[float, float] = {}
        # Update id of the last snapshot/update applied to the book
        self.last_update_id: Optional[int] = None
        self.initialized = False
        # Depth updates received before the book was initialized
        self.buffer: List[Dict] = []

    def initialize_from_snapshot(self, limit: int = DEFAULT_SNAPSHOT_SIZE):
        """
        Initializes the order book from a REST API snapshot.

        Args:
            limit (int, optional): Snapshot depth. Defaults to DEFAULT_SNAPSHOT_SIZE.

        Raises:
            KeyError: If the snapshot carries no update id, bids or asks.
        """
        snapshot = self.client.get_orderbook_snapshot(self.symbol, limit)
        # The raw REST depth payload names this field "lastUpdateId". The
        # snake_case spelling is still accepted for clients that normalise keys.
        if "lastUpdateId" in snapshot:
            self.last_update_id = snapshot["lastUpdateId"]
        else:
            self.last_update_id = snapshot["last_update_id"]
        # Initialize bids and asks
        self.bids = {float(price): float(qty) for price, qty in snapshot["bids"]}
        self.asks = {float(price): float(qty) for price, qty in snapshot["asks"]}
        self.initialized = True
        if self.client.verbose:
            print(
                f"OrderBook initialized for {self.symbol} (lastUpdateId: {self.last_update_id})"
            )

    @staticmethod
    def _apply_levels(side: Dict[float, float], levels: List) -> None:
        """Applies price-level changes to one side; quantity 0 removes a level."""
        for level in levels:
            price = float(level[0])
            qty = float(level[1])
            if qty == 0:
                side.pop(price, None)
            else:
                side[price] = qty

    def process_depth_update(self, data: Dict):
        """
        Processes a depth stream update.

        Updates for other symbols are ignored, updates received before
        initialization are buffered, updates that are already covered by the
        book are dropped, and a gap in the update ids triggers a fresh
        snapshot (the update that revealed the gap is not applied).

        Args:
            data (Dict): Depth update data, either bare or wrapped in a
                combined-stream envelope under the ``"data"`` key.
        """
        # If it's a combined stream, extract data
        if "data" in data:
            data = data["data"]
        # Check that it is the correct symbol
        if data.get("s", "").upper() != self.symbol:
            return
        # If not initialized, buffer events
        if not self.initialized:
            self.buffer.append(data)
            return

        first_update_id = data["U"]
        final_update_id = data["u"]
        # Discard old events
        if final_update_id <= self.last_update_id:
            return
        # Check for gap
        if first_update_id > self.last_update_id + 1:
            if self.client.verbose:
                print("Gap detected in updates! Reinitializing...")
            self.initialized = False
            self.initialize_from_snapshot()
            return

        self._apply_levels(self.bids, data["b"])
        self._apply_levels(self.asks, data["a"])
        self.last_update_id = final_update_id

    def process_buffered_events(self):
        """Processes events that were buffered before initialization.

        Does nothing while the book is still uninitialized; otherwise applies
        every buffered event newer than the book and then empties the buffer.
        """
        if not self.initialized:
            return
        for event in self.buffer:
            if event["u"] > self.last_update_id:
                self.process_depth_update(event)
        self.buffer.clear()

    def get_best_bid(self) -> Optional[tuple[float, float]]:
        """Gets the best bid (highest price) as ``(price, quantity)``, or None if there are no bids."""
        if not self.bids:
            return None
        best_price = max(self.bids)
        return (best_price, self.bids[best_price])

    def get_best_ask(self) -> Optional[tuple[float, float]]:
        """Gets the best ask (lowest price) as ``(price, quantity)``, or None if there are no asks."""
        if not self.asks:
            return None
        best_price = min(self.asks)
        return (best_price, self.asks[best_price])

    def get_spread(self) -> Optional[float]:
        """Calculates the spread between bid and ask; None if either side is empty."""
        best_bid = self.get_best_bid()
        best_ask = self.get_best_ask()
        if best_bid and best_ask:
            return best_ask[0] - best_bid[0]
        return None

    def get_orderbook_snapshot(self, depth: int = 10) -> Dict:
        """
        Gets a snapshot of the current order book.

        Args:
            depth (int, optional): Number of levels to include. Defaults to 10.

        Returns:
            Dict: Dictionary with the symbol, last update id, bids sorted from
            highest to lowest price, asks sorted from lowest to highest price,
            the best bid/ask and the spread.
        """
        sorted_bids = sorted(self.bids.items(), key=lambda x: x[0], reverse=True)[:depth]
        sorted_asks = sorted(self.asks.items(), key=lambda x: x[0])[:depth]
        return {
            "symbol": self.symbol,
            "last_update_id": self.last_update_id,
            "bids": [[price, qty] for price, qty in sorted_bids],
            "asks": [[price, qty] for price, qty in sorted_asks],
            "best_bid": self.get_best_bid(),
            "best_ask": self.get_best_ask(),
            "spread": self.get_spread(),
        }

    async def start_stream(self):
        """Starts the depth update stream.

        Connects to the symbol's depth stream and feeds every message to
        ``process_depth_update`` until the connection ends. This method does
        not initialize the book itself: while ``initialized`` is False the
        incoming updates only accumulate in ``buffer``.
        """
        url = f"{WS_BASE_URL}/ws/{self.symbol.lower()}@depth"
        async with websockets.connect(url) as websocket:
            if self.client.verbose:
                print(f"Depth stream started for {self.symbol}")
            async for message in websocket:
                data = json.loads(message)
                self.process_depth_update(data)