Skip to content

Commit 6f2db20

Browse files
committed
feat: ws
1 parent b1af956 commit 6f2db20

File tree

6 files changed

+667
-1
lines changed

6 files changed

+667
-1
lines changed

examples/websocket_example.py

Lines changed: 301 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,301 @@
1+
"""
2+
Example demonstrating WebSocket client usage for DeltaDeFi real-time data streams.
3+
This file shows how to use all 4 WebSocket endpoints with proper data parsing.
4+
"""
5+
6+
import asyncio
7+
import logging
8+
import os
9+
from datetime import datetime
10+
from dotenv import load_dotenv
11+
12+
from deltadefi import ApiClient
13+
14+
# Load environment variables from .env file
15+
load_dotenv()
16+
17+
# Set up logging
18+
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
19+
api_key = os.getenv("DELTADEFI_API_KEY")
20+
21+
# ============================================================================
22+
# DATA PARSING HANDLERS FOR EACH WEBSOCKET ENDPOINT
23+
# ============================================================================
24+
25+
async def handle_trade_message(data):
26+
"""
27+
Handle recent trades WebSocket messages.
28+
29+
Data format: Array of trade objects
30+
Example: [{"timestamp": "2025-08-21T03:43:00.204624Z", "symbol": "ADAUSDM",
31+
"side": "sell", "price": 0.7803, "amount": 4.6}, ...]
32+
"""
33+
print("\n🔄 TRADE STREAM DATA:")
34+
for i, trade in enumerate(data, 1):
35+
timestamp = trade.get("timestamp", "")
36+
symbol = trade.get("symbol", "Unknown")
37+
side = trade.get("side", "unknown")
38+
price = trade.get("price", 0)
39+
amount = trade.get("amount", 0)
40+
41+
# Convert timestamp to readable format
42+
try:
43+
dt = datetime.fromisoformat(timestamp.replace('Z', '+00:00'))
44+
readable_time = dt.strftime("%H:%M:%S")
45+
except:
46+
readable_time = timestamp
47+
48+
side_emoji = "🟢" if side.lower() == "buy" else "🔴"
49+
print(f" {i}. {side_emoji} {symbol}: {side.upper()} {amount} @ ${price:.4f} at {readable_time}")
50+
51+
52+
async def handle_depth_message(data):
53+
"""
54+
Handle market depth WebSocket messages.
55+
56+
Data format: {"timestamp": 1755747950587, "bids": [{"price": 0.195, "quantity": 30}],
57+
"asks": [{"price": 0.3495, "quantity": 50.65}]}
58+
"""
59+
print("\n📊 MARKET DEPTH DATA:")
60+
timestamp = data.get("timestamp", 0)
61+
bids = data.get("bids", [])
62+
asks = data.get("asks", [])
63+
64+
# Convert timestamp
65+
try:
66+
dt = datetime.fromtimestamp(timestamp / 1000) # Assuming milliseconds
67+
readable_time = dt.strftime("%H:%M:%S")
68+
except:
69+
readable_time = str(timestamp)
70+
71+
print(f" 📅 Time: {readable_time}")
72+
73+
# Show top 5 bids and asks
74+
print(" 🟢 BIDS (Buy Orders):")
75+
for i, bid in enumerate(bids[:5], 1):
76+
price = bid.get("price", 0)
77+
quantity = bid.get("quantity", 0)
78+
print(f" {i}. ${price:.4f} × {quantity}")
79+
80+
print(" 🔴 ASKS (Sell Orders):")
81+
for i, ask in enumerate(asks[:5], 1):
82+
price = ask.get("price", 0)
83+
quantity = ask.get("quantity", 0)
84+
print(f" {i}. ${price:.4f} × {quantity}")
85+
86+
if bids and asks:
87+
spread = asks[0]["price"] - bids[0]["price"]
88+
spread_pct = (spread / bids[0]["price"]) * 100 if bids[0]["price"] > 0 else 0
89+
print(f" 📈 Spread: ${spread:.4f} ({spread_pct:.3f}%)")
90+
91+
92+
async def handle_price_message(data):
93+
"""
94+
Handle market price WebSocket messages.
95+
96+
Data format: {"type": "Market", "sub_type": "market_price", "price": 0.75}
97+
"""
98+
print("\n💰 PRICE UPDATE:")
99+
msg_type = data.get("type", "Unknown")
100+
sub_type = data.get("sub_type", "unknown")
101+
price = data.get("price", 0)
102+
103+
print(f" 📊 Type: {msg_type}/{sub_type}")
104+
print(f" 💵 Current Price: ${price:.4f}")
105+
106+
107+
async def handle_account_message(data):
108+
"""
109+
Handle account streams WebSocket messages.
110+
111+
Data formats:
112+
- Balance: {"type": "Account", "sub_type": "balance", "balance": [...]}
113+
- Orders: {"type": "Account", "sub_type": "open_orders", "data": [...]}
114+
"""
115+
msg_type = data.get("type", "Unknown")
116+
sub_type = data.get("sub_type", "unknown")
117+
118+
print(f"\n👤 ACCOUNT UPDATE ({msg_type}/{sub_type}):")
119+
120+
if sub_type == "balance":
121+
balances = data.get("balance", [])
122+
print(" 💰 Account Balances:")
123+
for balance in balances:
124+
asset = balance.get("asset", "unknown")
125+
free = balance.get("free", 0)
126+
locked = balance.get("locked", 0)
127+
total = free + locked
128+
129+
print(f" {asset.upper()}: ")
130+
print(f" Free: {free:.6f}")
131+
print(f" Locked: {locked:.6f}")
132+
print(f" Total: {total:.6f}")
133+
134+
elif sub_type == "open_orders":
135+
orders_data = data.get("data", [])
136+
print(" 📋 Open Orders:")
137+
order_count = 0
138+
for order_group in orders_data:
139+
orders = order_group.get("orders", [])
140+
for order in orders:
141+
order_count += 1
142+
order_id = order.get("order_id", "unknown")
143+
status = order.get("status", "unknown")
144+
symbol = order.get("symbol", "unknown")
145+
side = order.get("side", "unknown")
146+
147+
side_emoji = "🟢" if side.lower() == "buy" else "🔴"
148+
print(f" {order_count}. {side_emoji} {order_id[:8]}... - {symbol} {side.upper()} ({status})")
149+
150+
if order_count == 0:
151+
print(" No open orders")
152+
153+
elif sub_type == "trading_history":
154+
print(" 📈 Trading History Update")
155+
print(f" Data: {data}")
156+
157+
elif sub_type == "orders_history":
158+
print(" 📜 Orders History Update")
159+
print(f" Data: {data}")
160+
161+
else:
162+
print(f" ❓ Unknown sub_type: {sub_type}")
163+
print(f" Raw data: {data}")
164+
165+
166+
# ============================================================================
167+
# INDIVIDUAL ENDPOINT EXAMPLES
168+
# ============================================================================
169+
170+
async def example_trades_stream(symbol="ADAUSDM", duration=30):
171+
"""Example: Subscribe to recent trades stream."""
172+
print(f"\n🚀 Starting TRADES stream for {symbol} (running for {duration}s)")
173+
print("=" * 60)
174+
175+
client = ApiClient(api_key=api_key)
176+
ws_client = client.websocket
177+
ws_client.register_handler("trade", handle_trade_message)
178+
179+
try:
180+
await ws_client.subscribe_trades(symbol)
181+
print(f"✅ Connected to trades stream for {symbol}")
182+
await asyncio.sleep(duration)
183+
except Exception as e:
184+
print(f"❌ Trades stream error: {e}")
185+
finally:
186+
await ws_client.disconnect()
187+
188+
189+
async def example_depth_stream(symbol="ADAUSDM", duration=30):
190+
"""Example: Subscribe to market depth stream."""
191+
print(f"\n🚀 Starting DEPTH stream for {symbol} (running for {duration}s)")
192+
print("=" * 60)
193+
194+
client = ApiClient(api_key=api_key)
195+
ws_client = client.websocket
196+
ws_client.register_handler("depth", handle_depth_message)
197+
198+
try:
199+
await ws_client.subscribe_depth(symbol)
200+
print(f"✅ Connected to depth stream for {symbol}")
201+
await asyncio.sleep(duration)
202+
except Exception as e:
203+
print(f"❌ Depth stream error: {e}")
204+
finally:
205+
await ws_client.disconnect()
206+
207+
208+
async def example_price_stream(symbol="ADAUSDM", duration=30):
209+
"""Example: Subscribe to market price stream."""
210+
print(f"\n🚀 Starting PRICE stream for {symbol} (running for {duration}s)")
211+
print("=" * 60)
212+
213+
client = ApiClient(api_key=api_key)
214+
ws_client = client.websocket
215+
ws_client.register_handler("price", handle_price_message)
216+
217+
try:
218+
await ws_client.subscribe_price(symbol)
219+
print(f"✅ Connected to price stream for {symbol}")
220+
await asyncio.sleep(duration)
221+
except Exception as e:
222+
print(f"❌ Price stream error: {e}")
223+
finally:
224+
await ws_client.disconnect()
225+
226+
227+
async def example_account_stream(duration=30):
228+
"""Example: Subscribe to account streams."""
229+
print(f"\n🚀 Starting ACCOUNT stream (running for {duration}s)")
230+
print("=" * 60)
231+
232+
client = ApiClient(api_key=api_key)
233+
ws_client = client.websocket
234+
ws_client.register_handler("account", handle_account_message)
235+
236+
try:
237+
await ws_client.subscribe_account()
238+
print("✅ Connected to account stream")
239+
await asyncio.sleep(duration)
240+
except Exception as e:
241+
print(f"❌ Account stream error: {e}")
242+
finally:
243+
await ws_client.disconnect()
244+
245+
246+
# ============================================================================
247+
# MAIN FUNCTION WITH ENDPOINT SELECTION
248+
# ============================================================================
249+
250+
async def main():
251+
"""Main function with endpoint selection menu."""
252+
if not api_key:
253+
print("❌ Error: DELTADEFI_API_KEY not found in environment variables")
254+
print("Please set your API key in the .env file")
255+
return
256+
257+
print("🔗 DeltaDeFi WebSocket Examples")
258+
print("=" * 50)
259+
print("Available endpoints:")
260+
print("1. Recent Trades Stream")
261+
print("2. Market Depth Stream")
262+
print("3. Market Price Stream")
263+
print("4. Account Streams")
264+
print("5. Run All Endpoints (Sequential)")
265+
print("0. Quick Trades Test (10 seconds)")
266+
267+
try:
268+
choice = input("\nSelect endpoint (0-5): ").strip()
269+
270+
if choice == "1":
271+
await example_trades_stream()
272+
elif choice == "2":
273+
await example_depth_stream()
274+
elif choice == "3":
275+
await example_price_stream()
276+
elif choice == "4":
277+
await example_account_stream()
278+
elif choice == "5":
279+
print("\n🔄 Running all endpoints sequentially...")
280+
await example_trades_stream(duration=15)
281+
await asyncio.sleep(2)
282+
await example_depth_stream(duration=15)
283+
await asyncio.sleep(2)
284+
await example_price_stream(duration=15)
285+
await asyncio.sleep(2)
286+
await example_account_stream(duration=15)
287+
elif choice == "0":
288+
await example_trades_stream(duration=10)
289+
else:
290+
print("❌ Invalid choice")
291+
292+
except KeyboardInterrupt:
293+
print("\n👋 Exiting...")
294+
except Exception as e:
295+
print(f"❌ Error: {e}")
296+
297+
print("\n✅ WebSocket examples completed!")
298+
299+
300+
if __name__ == "__main__":
301+
asyncio.run(main())

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ dependencies = [
3131
"urllib3>=2.2.3",
3232
"pycardano>=0.12.3",
3333
"python-dotenv>=0.9.9",
34+
"websockets>=12.0",
3435
]
3536

3637
[dependency-groups]

src/deltadefi/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
11
# flake8: noqa
2-
from .clients import ApiClient
2+
from .clients import ApiClient, WebSocketClient

src/deltadefi/clients/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,3 @@
11
# flake8: noqa
22
from .client import *
3+
from .websocket import WebSocketClient

src/deltadefi/clients/client.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
from deltadefi.clients.accounts import Accounts
44
from deltadefi.clients.markets import Market
55
from deltadefi.clients.orders import Order
6+
from deltadefi.clients.websocket import WebSocketClient
67
from deltadefi.models.models import OrderSide, OrderType
78
from deltadefi.responses import PostOrderResponse
89

@@ -43,6 +44,14 @@ def __init__(
4344
self.accounts = Accounts(base_url=self.base_url, api_key=api_key)
4445
self.orders = Order(base_url=self.base_url, api_key=api_key)
4546
self.markets = Market(base_url=self.base_url, api_key=api_key)
47+
48+
# Initialize WebSocket client with correct stream URL
49+
if network == "mainnet":
50+
ws_base_url = "wss://stream.deltadefi.io" # TODO: Update when mainnet is available
51+
else:
52+
ws_base_url = "wss://stream-staging.deltadefi.io"
53+
54+
self.websocket = WebSocketClient(base_url=ws_base_url, api_key=api_key)
4655

4756
def load_operation_key(self, password: str):
4857
"""

0 commit comments

Comments
 (0)