Skip to content

Commit a2f1442

Browse files
committed
Debugged to some extend
1 parent dc89c34 commit a2f1442

13 files changed

Lines changed: 473 additions & 140 deletions

Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ edition = "2021"
55

66
[dependencies]
77

8-
my-web-socket-client = { tag = "0.2.0", git = "https://github.com/MyJetTools/my-web-socket-client.git" }
8+
my-web-socket-client = { path = "../my-web-socket-client" }
99
socket-io-utils = { tag = "0.2.0", git = "https://github.com/MyJetTools/socket-io-utils.git" }
1010

1111
rust-extensions = { tag = "0.1.5", git = "https://github.com/MyJetTools/rust-extensions.git", features = [
@@ -15,3 +15,4 @@ async-trait = "*"
1515
tokio = { version = "*", features = ["full"] }
1616

1717
serde = { version = "*", features = ["derive"] }
18+
serde_json = "*"

README.md

Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
### How to use it
2+
3+
```rust
4+
5+
pub struct AppSettings{
6+
pub my_socket_io_url: String,
7+
}
8+
9+
#[async_trait::async_trait]
10+
impl WsClientSettings for AppSettings {
11+
async fn get_url(&self, client_name: &str) -> String {
12+
13+
if client_name == "my-client-name"{
14+
return self.my_socket_io_url.to_string();
15+
}
16+
17+
panic!("Unknown socket-io client: '{}'", client_name);
18+
}
19+
}
20+
21+
```
22+
23+
24+
## How to setup subscriber
25+
26+
This example subscribes to the stream event and deserializes the payload based on the `type` field.
27+
28+
```rust
29+
use my_socket_io_client::*;
30+
use serde::*;
31+
32+
pub struct StreamsSocketIo;
33+
34+
#[derive(Debug)]
35+
pub enum SocketIoStreamModel {
36+
AccountType(AccountTypeSocketIoModel),
37+
Property(PropertySocketIoModel),
38+
}
39+
40+
impl SocketIoSubscribeEventModel for SocketIoStreamModel {
41+
const NAME_SPACE: &'static str = "/brand-socket";
42+
43+
const EVENT_NAME: &'static str = "stream";
44+
45+
fn deserialize(payload: &str) -> Self {
46+
let type_model: StreamTypeModel = serde_json::from_str(payload).unwrap();
47+
48+
match type_model.r#type.as_str() {
49+
"AccountStatus" => Self::AccountType(serde_json::from_str(payload).unwrap()),
50+
"Property" => Self::Property(serde_json::from_str(payload).unwrap()),
51+
_ => {
52+
panic!("Unknown stream type: {}", type_model.r#type);
53+
}
54+
}
55+
}
56+
}
57+
58+
#[derive(Debug, Serialize, Deserialize)]
59+
pub struct StreamTypeModel {
60+
#[serde(rename = "type")]
61+
pub r#type: String,
62+
}
63+
64+
#[derive(Debug, Serialize, Deserialize)]
65+
pub struct AccountTypeSocketIoModel {
66+
#[serde(rename = "accountId")]
67+
pub account_id: String,
68+
pub currency: String,
69+
pub balance: Option<String>,
70+
#[serde(rename = "marginAvailable")]
71+
pub margin_available: Option<String>,
72+
pub credit: Option<String>,
73+
}
74+
75+
#[derive(Debug, Serialize, Deserialize)]
76+
pub struct PropertySocketIoModel {
77+
pub name: String,
78+
}
79+
80+
#[async_trait::async_trait]
81+
impl SocketIoEventSubscriberCallback<SocketIoStreamModel, ()> for StreamsSocketIo {
82+
async fn on_event(&self, event_payload: SocketIoStreamModel) -> () {
83+
println!("Received event: {:?}", event_payload);
84+
()
85+
}
86+
}
87+
88+
89+
90+
```
91+
92+
93+
94+
### Setup socket-io client
95+
96+
```rust
97+
use std::sync::Arc;
98+
99+
use my_socket_io_client::*;
100+
use streams_socket_io::StreamsSocketIo;
101+
102+
mod streams_socket_io;
103+
104+
#[tokio::main]
105+
async fn main() {
106+
let settings = Arc::new(AppSettings);
107+
108+
my_web_socket_client::my_tls::install_default_crypto_providers();
109+
110+
let callbacks = Arc::new(AppSocketIoCallbacks);
111+
let socket_io_client = MySocketIoClient::new(
112+
"my-client-name",
113+
settings,
114+
callbacks,
115+
my_logger::LOGGER.clone(),
116+
)
117+
.set_debug_payloads(true);
118+
119+
socket_io_client
120+
.register_subscriber(Arc::new(StreamsSocketIo))
121+
.await;
122+
123+
socket_io_client.start();
124+
println!("Starting");
125+
loop {
126+
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
127+
}
128+
}
129+
130+
131+
pub struct AppSocketIoCallbacks;
132+
133+
#[async_trait::async_trait]
134+
impl SocketIoCallbacks for AppSocketIoCallbacks {
135+
async fn before_connect(&self) -> SocketIoBeforeConnectResult {
136+
SocketIoBeforeConnectResult {
137+
append_headers: vec![("brand-api-key".into(), "key".into())].into(),
138+
//append_headers: None,
139+
append_query_params: vec![("type".into(), "LIVE".into())].into(),
140+
}
141+
}
142+
async fn on_connect(&self, _socket: Arc<SocketIoConnection>) {
143+
println!("Connected to Socket-Io");
144+
}
145+
async fn on_disconnect(&self, _socket: Arc<SocketIoConnection>) {
146+
println!("Disconnected from Socket-Io");
147+
}
148+
}
149+
150+
151+
```

src/client.rs

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use std::sync::Arc;
33
use my_web_socket_client::*;
44
use rust_extensions::{Logger, StrOrString};
55

6-
use crate::{client_inner::ClientInner, SocketIoCallbacks};
6+
use crate::{client_inner::*, *};
77

88
pub struct MySocketIoClient {
99
ws_client: WebSocketClient,
@@ -17,11 +17,13 @@ impl MySocketIoClient {
1717
callbacks: Arc<dyn SocketIoCallbacks + Send + Sync + 'static>,
1818
logger: Arc<dyn Logger + Send + Sync + 'static>,
1919
) -> Self {
20-
let ws_client = WebSocketClient::new(name, settings, logger);
20+
let name = name.into();
21+
let name = Arc::new(name);
22+
let ws_client = WebSocketClient::new(name.clone(), settings, logger.clone());
2123

2224
MySocketIoClient {
2325
ws_client,
24-
inner: Arc::new(ClientInner::new(callbacks)),
26+
inner: Arc::new(ClientInner::new(name, callbacks, logger)),
2527
}
2628
}
2729

@@ -35,4 +37,21 @@ impl MySocketIoClient {
3537
pub fn start(&self) {
3638
self.ws_client.start(None, self.inner.clone());
3739
}
40+
41+
pub async fn register_subscriber<
42+
TModel: SocketIoSubscribeEventModel + Send + Sync + 'static,
43+
TOutModel: SocketIoSubscribeOutModel + Send + Sync + 'static + serde::Serialize,
44+
>(
45+
&self,
46+
callbacks: Arc<
47+
dyn SocketIoEventSubscriberCallback<TModel, TOutModel> + Send + Sync + 'static,
48+
>,
49+
) {
50+
let subscriber = SocketIoEventSubscriber { callbacks };
51+
let subscriber = Arc::new(subscriber);
52+
self.inner
53+
.event_subscribers
54+
.register(TModel::NAME_SPACE, TModel::EVENT_NAME, subscriber)
55+
.await;
56+
}
3857
}

0 commit comments

Comments
 (0)