Skip to content

Commit c3c8373

Browse files
committed
fix
1 parent 35c9c43 commit c3c8373

File tree

2 files changed

+141
-81
lines changed

2 files changed

+141
-81
lines changed

src/indexer/pangea_event_handler.rs

Lines changed: 140 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use chrono::Utc;
55
use prometheus::{register_int_counter, IntCounter};
66
use serde::{Deserialize, Serialize};
77
use std::sync::Arc;
8-
use tracing::{debug, error, warn};
8+
use tracing::{info, error, warn};
99

1010
lazy_static::lazy_static! {
1111
static ref PROCESSED_ORDERS_TOTAL: IntCounter = register_int_counter!(
@@ -42,124 +42,187 @@ pub async fn handle_order_event(
4242
matching_orders: Arc<MatchingOrders>,
4343
event: PangeaOrderEvent,
4444
) {
45+
if let Some(ref et) = event.event_type {
46+
info!(
47+
target: "pangea_events",
48+
">> handle_order_event: event_type = {:?}, order_id = {}, block = {}, tx = {}, log_index = {}",
49+
et,
50+
event.order_id,
51+
event.block_number,
52+
event.transaction_hash,
53+
event.log_index
54+
);
55+
} else {
56+
info!(
57+
target: "pangea_events",
58+
">> handle_order_event: NO event_type, order_id = {}, block = {}, tx = {}, log_index = {}",
59+
event.order_id,
60+
event.block_number,
61+
event.transaction_hash,
62+
event.log_index
63+
);
64+
}
65+
4566
if let Some(event_type) = event.event_type.as_deref() {
4667
match event_type {
4768
"Open" => {
4869
if let Some(order) = create_new_order_from_event(&event) {
70+
info!(
71+
">> Adding new order to order_book: id = {}, amount = {}, price = {}, limit_type = {:?}",
72+
order.id, order.amount, order.price, order.limit_type
73+
);
4974
order_book.add_order(order);
50-
debug!("🟢 New order added (id: {})", event.order_id);
75+
info!("🟢 New order added (id: {})", event.order_id);
76+
} else {
77+
warn!(
78+
"Open event received but could not create SpotOrder (fields missing?) for order_id = {}",
79+
event.order_id
80+
);
5181
}
5282
}
5383
"Trade" => {
5484
matching_orders.remove(&event.order_id);
55-
debug!(
85+
info!(
5686
"🔄 Order {} removed from matching_orders (Trade event)",
5787
&event.order_id
5888
);
5989
if let Some(match_size) = event.amount {
6090
let o_type = event.order_type_to_enum();
6191
let l_type = event.limit_type_to_enum();
92+
93+
info!(
94+
">> process_trade called. event_tx_id = {:?}, order_id = {}, match_size = {}, order_type = {:?}, limit_type = {:?}",
95+
event.transaction_hash, event.order_id, match_size, o_type, l_type
96+
);
97+
6298
process_trade(&order_book, &event.order_id, match_size, o_type, l_type);
99+
} else {
100+
warn!(
101+
"Trade event received without amount (match_size) for order_id = {}",
102+
event.order_id
103+
);
63104
}
64105
}
65106
"Cancel" => {
66107
matching_orders.remove(&event.order_id);
67-
debug!(
108+
info!(
68109
"🔄 Order {} removed from matching_orders (Cancel event)",
69110
&event.order_id
70111
);
71112
order_book.remove_order(&event.order_id, event.order_type_to_enum());
72-
debug!(
113+
info!(
73114
"Removed order with id: {} due to Cancel event",
74115
event.order_id
75116
);
76117
}
77118
_ => {
78-
error!("❌ Received order event without type: {:?}", event);
119+
error!("❌ Received order event with unsupported type: {:?}", event.event_type);
79120
}
80121
}
122+
} else {
123+
error!("❌ Received order event without type: {:?}", event);
81124
}
82125
}
83126

84127
fn create_new_order_from_event(event: &PangeaOrderEvent) -> Option<SpotOrder> {
85-
if let (Some(price), Some(amount), Some(order_type), Some(limit_type), Some(user)) = (
86-
event.price,
87-
event.amount,
88-
event.order_type.as_deref(),
89-
event.limit_type.as_deref(),
90-
&event.user,
91-
) {
92-
let order_type_enum = match order_type {
93-
"Buy" => OrderType::Buy,
94-
"Sell" => OrderType::Sell,
95-
_ => return None,
96-
};
97-
let limit_type_enum = match limit_type {
98-
"GTC" => Some(LimitType::GTC),
99-
"IOC" => Some(LimitType::IOC),
100-
"FOK" => Some(LimitType::FOK),
101-
"MKT" => Some(LimitType::MKT),
102-
_ => None,
103-
};
104-
105-
Some(SpotOrder {
106-
id: event.order_id.clone(),
107-
user: user.clone(),
108-
asset: event.asset.clone().unwrap_or_default(),
109-
amount,
110-
price,
111-
timestamp: Utc::now().timestamp_millis() as u64,
112-
order_type: order_type_enum,
113-
limit_type: limit_type_enum,
114-
status: Some(OrderStatus::New),
115-
})
116-
} else {
117-
warn!("Missing required fields in event: {:?}", event);
118-
None
119-
}
128+
let price = event.price?;
129+
let amount = event.amount?;
130+
let order_type_str = event.order_type.as_deref()?;
131+
let user = event.user.as_ref()?;
132+
133+
let order_type_enum = match order_type_str {
134+
"Buy" => OrderType::Buy,
135+
"Sell" => OrderType::Sell,
136+
_ => {
137+
warn!("Unknown order_type: {} for order_id {}", order_type_str, event.order_id);
138+
return None;
139+
}
140+
};
141+
142+
let limit_type_enum = event.limit_type_to_enum();
143+
144+
Some(SpotOrder {
145+
id: event.order_id.clone(),
146+
user: user.clone(),
147+
asset: event.asset.clone().unwrap_or_default(),
148+
amount,
149+
price,
150+
timestamp: Utc::now().timestamp_millis() as u64,
151+
order_type: order_type_enum,
152+
limit_type: Some(limit_type_enum),
153+
status: Some(OrderStatus::New),
154+
})
120155
}
121156

122157
pub fn process_trade(
123158
order_book: &OrderBook,
124159
order_id: &str,
125160
trade_amount: u128,
126161
order_type: Option<OrderType>,
127-
limit_type: Option<LimitType>,
162+
limit_type: LimitType,
128163
) {
129-
match (order_type, limit_type) {
130-
(Some(order_type), Some(limit_type)) => match limit_type {
131-
LimitType::GTC => {
132-
if let Some(mut order) = order_book.get_order(order_id, order_type) {
133-
if order.amount > trade_amount {
134-
order.amount -= trade_amount;
135-
order.status = Some(OrderStatus::PartiallyMatched);
136-
order_book.update_order(order.clone());
137-
138-
debug!(
139-
"Updated order with id: {} - partially matched, remaining amount: {}",
140-
order_id, order.amount
141-
);
142-
} else {
143-
order.status = Some(OrderStatus::Matched);
144-
order_book.remove_order(order_id, Some(order_type));
145-
debug!("Removed order with id: {} - fully matched", order_id);
146-
}
164+
let order_type = match order_type {
165+
Some(t) => t,
166+
None => {
167+
error!(
168+
"!!! process_trade: Unknown order_type for {}. Ignoring trade event.",
169+
order_id
170+
);
171+
return;
172+
}
173+
};
174+
175+
info!(
176+
"process_trade START: order_id = {}, trade_amount = {}, order_type = {:?}, limit_type = {:?}",
177+
order_id, trade_amount, order_type, limit_type
178+
);
179+
180+
match limit_type {
181+
LimitType::GTC | LimitType::MKT => {
182+
if let Some(mut order) = order_book.get_order(order_id, order_type) {
183+
info!(
184+
">> Found order in order_book: id = {}, current_amount = {}, status = {:?}",
185+
order.id, order.amount, order.status
186+
);
187+
188+
if order.amount > trade_amount {
189+
order.amount -= trade_amount;
190+
order.status = Some(OrderStatus::PartiallyMatched);
191+
order_book.update_order(order.clone());
192+
193+
info!(
194+
"🟡 Order {} partially matched. Trade amount: {}, remaining amount: {}",
195+
order_id, trade_amount, order.amount
196+
);
147197
} else {
148-
error!("Order with id: {} not found for trade event", order_id);
198+
order.status = Some(OrderStatus::Matched);
199+
order_book.remove_order(order_id, Some(order_type));
200+
201+
info!(
202+
"✅ Order {} fully matched and removed. Trade amount: {}, previous amount: {}",
203+
order_id,
204+
trade_amount,
205+
order.amount
206+
);
149207
}
208+
} else {
209+
error!("❌ process_trade: order {} not found in order_book", order_id);
150210
}
151-
_ => {
152-
order_book.remove_order(order_id, Some(order_type));
153-
debug!("Removed order with id: {} - FOK or IOC matched", order_id);
154-
}
155-
},
156-
_ => {
157-
error!(
158-
"Order type or limit type is None for order_id: {}. Cannot process trade event.",
159-
order_id
211+
}
212+
LimitType::FOK | LimitType::IOC => {
213+
order_book.remove_order(order_id, Some(order_type));
214+
info!(
215+
"🗑️ Order {} removed (FOK/IOC). trade_amount = {}",
216+
order_id, trade_amount
160217
);
161218
}
162219
}
220+
221+
// Дополнительный лог, когда выходим
222+
info!(
223+
"process_trade END: order_id = {}, limit_type = {:?}",
224+
order_id, limit_type
225+
);
163226
}
164227

165228
impl PangeaOrderEvent {
@@ -173,15 +236,12 @@ impl PangeaOrderEvent {
173236
})
174237
}
175238

176-
pub fn limit_type_to_enum(&self) -> Option<LimitType> {
177-
self.limit_type
178-
.as_deref()
179-
.and_then(|limit_type| match limit_type {
180-
"FOK" => Some(LimitType::FOK),
181-
"IOC" => Some(LimitType::IOC),
182-
"GTC" => Some(LimitType::GTC),
183-
"MKT" => Some(LimitType::MKT),
184-
_ => None,
185-
})
239+
pub fn limit_type_to_enum(&self) -> LimitType {
240+
match self.limit_type.as_deref() {
241+
Some("FOK") => LimitType::FOK,
242+
Some("IOC") => LimitType::IOC,
243+
Some("MKT") => LimitType::MKT,
244+
_ => LimitType::GTC,
245+
}
186246
}
187247
}

src/main.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ async fn main() -> Result<(), Error> {
4343
dotenv::dotenv().ok();
4444

4545
tracing_subscriber::fmt()
46-
.with_target(false)
46+
.with_target(true)
4747
.with_level(true)
4848
.init();
4949
info!("Logger initialized");

0 commit comments

Comments
 (0)