Skip to content

Commit e5dbadd

Browse files
committed
feat(rust): merge Processor into Worker
1 parent ccd1590 commit e5dbadd

File tree

99 files changed

+444
-1231
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

99 files changed

+444
-1231
lines changed

examples/rust/file_transfer/examples/receiver.rs

-1
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ struct FileReception {
2121

2222
#[ockam::worker]
2323
impl Worker for FileReception {
24-
type Context = Context;
2524
type Message = FileData;
2625

2726
async fn handle_message(&mut self, ctx: &mut Context, msg: Routed<Self::Message>) -> Result<()> {

examples/rust/get_started/examples/bob.rs

-1
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ struct Echoer;
1010
// echoes it back on its return route.
1111
#[ockam::worker]
1212
impl Worker for Echoer {
13-
type Context = Context;
1413
type Message = String;
1514

1615
async fn handle_message(&mut self, ctx: &mut Context, msg: Routed<String>) -> Result<()> {

examples/rust/get_started/src/echoer.rs

-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ pub struct Echoer;
44

55
#[ockam::worker]
66
impl Worker for Echoer {
7-
type Context = Context;
87
type Message = String;
98

109
async fn handle_message(&mut self, ctx: &mut Context, msg: Routed<String>) -> Result<()> {

examples/rust/get_started/src/hop.rs

-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ pub struct Hop;
44

55
#[ockam::worker]
66
impl Worker for Hop {
7-
type Context = Context;
87
type Message = Any;
98

109
/// This handle function takes any incoming message and forwards

examples/rust/get_started/src/logger.rs

-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ pub struct Logger;
44

55
#[ockam::worker]
66
impl Worker for Logger {
7-
type Context = Context;
87
type Message = Any;
98

109
/// This handle function takes any incoming message and print its content as a UTF-8 string

examples/rust/get_started/src/relay.rs

-1
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ impl Relay {
1818

1919
#[ockam::worker]
2020
impl Worker for Relay {
21-
type Context = Context;
2221
type Message = Any;
2322

2423
/// This handle function takes any incoming message and forwards

examples/rust/get_started/tests/tests.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -101,12 +101,12 @@ fn run_05_secure_channel_over_two_transport_hops() -> Result<(), Error> {
101101
// Launch responder, wait for it to start up
102102
let resp =
103103
CmdBuilder::new("cargo run --locked --example 05-secure-channel-over-two-transport-hops-responder").spawn()?;
104-
resp.match_stdout("Initializing ockam processor")?;
104+
resp.match_stdout("Initializing ockam worker")?;
105105

106106
// Launch middle, wait for it to start up
107107
let mid =
108108
CmdBuilder::new("cargo run --locked --example 05-secure-channel-over-two-transport-hops-middle").spawn()?;
109-
mid.match_stdout("Initializing ockam processor")?;
109+
mid.match_stdout("Initializing ockam worker")?;
110110

111111
// Run initiator to completion
112112
let (exitcode, stdout) =

examples/rust/mitm_node/src/tcp_interceptor/workers/listener.rs

+7-7
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use crate::tcp_interceptor::{Role, TcpMitmProcessor, TcpMitmRegistry, TcpMitmTransport};
22
use ockam_core::{async_trait, compat::net::SocketAddr};
3-
use ockam_core::{Address, Processor, Result};
4-
use ockam_node::Context;
3+
use ockam_core::{Address, Result};
4+
use ockam_node::{Context, Worker};
55
use ockam_transport_core::TransportError;
66
use tokio::net::{TcpListener, TcpStream};
77
use tracing::debug;
@@ -34,29 +34,29 @@ impl TcpMitmListenProcessor {
3434
target_addr,
3535
};
3636

37-
ctx.start_processor(address.clone(), processor)?;
37+
ctx.start_worker(address.clone(), processor)?;
3838

3939
Ok((saddr, address))
4040
}
4141
}
4242

4343
#[async_trait]
44-
impl Processor for TcpMitmListenProcessor {
45-
type Context = Context;
44+
impl Worker for TcpMitmListenProcessor {
45+
type Message = ();
4646

4747
async fn initialize(&mut self, ctx: &mut Context) -> Result<()> {
4848
self.registry.add_listener(ctx.primary_address());
4949

5050
Ok(())
5151
}
5252

53-
async fn shutdown(&mut self, ctx: &mut Self::Context) -> Result<()> {
53+
async fn shutdown(&mut self, ctx: &mut Context) -> Result<()> {
5454
self.registry.remove_listener(ctx.primary_address());
5555

5656
Ok(())
5757
}
5858

59-
async fn process(&mut self, ctx: &mut Self::Context) -> Result<bool> {
59+
async fn process(&mut self, ctx: &mut Context) -> Result<bool> {
6060
debug!("Waiting for incoming TCP connection...");
6161

6262
let (stream, _peer) = self.inner.accept().await.map_err(TransportError::from)?;

examples/rust/mitm_node/src/tcp_interceptor/workers/processor.rs

+6-6
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
use crate::tcp_interceptor::{Role, TcpMitmRegistry};
22
use ockam_core::compat::sync::Arc;
3+
use ockam_core::Result;
34
use ockam_core::{async_trait, Address, AllowAll};
4-
use ockam_core::{Processor, Result};
55
use ockam_node::compat::asynchronous::Mutex;
6-
use ockam_node::Context;
6+
use ockam_node::{Context, Worker};
77
use tokio::io::AsyncWriteExt;
88
use tokio::net::tcp::OwnedWriteHalf;
99
use tokio::{io::AsyncReadExt, net::tcp::OwnedReadHalf};
@@ -47,15 +47,15 @@ impl TcpMitmProcessor {
4747

4848
let receiver = Self::new(address_of_other_processor, role, read_half, write_half, registry);
4949

50-
ctx.start_processor_with_access_control(address, receiver, AllowAll, AllowAll)?;
50+
ctx.start_worker_with_access_control(address, receiver, AllowAll, AllowAll)?;
5151

5252
Ok(())
5353
}
5454
}
5555

5656
#[async_trait]
57-
impl Processor for TcpMitmProcessor {
58-
type Context = Context;
57+
impl Worker for TcpMitmProcessor {
58+
type Message = ();
5959

6060
async fn initialize(&mut self, ctx: &mut Context) -> Result<()> {
6161
self.registry
@@ -66,7 +66,7 @@ impl Processor for TcpMitmProcessor {
6666
Ok(())
6767
}
6868

69-
async fn shutdown(&mut self, ctx: &mut Self::Context) -> Result<()> {
69+
async fn shutdown(&mut self, ctx: &mut Context) -> Result<()> {
7070
self.registry.remove_processor(ctx.primary_address());
7171

7272
debug!("Shutdown {}", ctx.primary_address());

examples/rust/no_std/src/echoer.rs

-1
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ pub struct Echoer;
66

77
#[ockam::worker]
88
impl Worker for Echoer {
9-
type Context = Context;
109
type Message = String;
1110

1211
async fn handle_message(&mut self, ctx: &mut Context, msg: Routed<String>) -> Result<()> {

examples/rust/no_std/src/hop.rs

-1
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ pub struct Hop;
66

77
#[ockam::worker]
88
impl Worker for Hop {
9-
type Context = Context;
109
type Message = Any;
1110

1211
/// This handle function takes any incoming message and forwards

examples/rust/tcp_inlet_and_outlet/tests/tests.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ fn run_01_inlet_outlet_one_process() -> Result<(), Error> {
88
"cargo run --locked --example 01-inlet-outlet 127.0.0.1:{port} ockam.io:80"
99
))
1010
.spawn()?;
11-
runner.match_stdout(r"(?i)Starting new processor")?;
11+
runner.match_stdout(r"(?i)Starting new worker")?;
1212

1313
// Run curl and check for a successful run
1414
let (exitcode, stdout) =

implementations/rust/ockam/ockam/src/lib.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ pub use ockam_core::processor;
6767
pub use ockam_core::worker;
6868
pub use ockam_core::{
6969
allow, deny, errcode, route, Address, Any, Encoded, Error, LocalMessage, Mailbox, Mailboxes,
70-
Message, Processor, ProtocolId, Result, Route, Routed, TransportMessage, TryClone, Worker,
70+
Message, ProtocolId, Result, Route, Routed, TransportMessage, TryClone,
7171
};
7272
pub use ockam_identity as identity;
7373
// ---
@@ -78,7 +78,7 @@ pub use ockam_macros::{node, test};
7878
pub use ockam_node::database::*;
7979
pub use ockam_node::{
8080
debugger, Context, DelayedEvent, Executor, MessageReceiveOptions, MessageSendReceiveOptions,
81-
NodeBuilder, WorkerBuilder,
81+
NodeBuilder, Worker, WorkerBuilder,
8282
};
8383
#[cfg(feature = "ockam_transport_tcp")]
8484
/// TCP transport

implementations/rust/ockam/ockam/src/node.rs

+5-35
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,13 @@ use ockam_core::compat::string::String;
22
use ockam_core::compat::sync::Arc;
33
use ockam_core::flow_control::FlowControls;
44
use ockam_core::{
5-
Address, IncomingAccessControl, Message, OutgoingAccessControl, Processor, Result, Route,
6-
Routed, TryClone, Worker,
5+
Address, IncomingAccessControl, Message, OutgoingAccessControl, Result, Route, Routed, TryClone,
76
};
87
use ockam_identity::{
98
CredentialRepository, IdentitiesAttributes, IdentitiesVerification,
109
IdentityAttributesRepository, PurposeKeys, Vault,
1110
};
12-
use ockam_node::{Context, HasContext, MessageReceiveOptions, MessageSendReceiveOptions};
11+
use ockam_node::{Context, HasContext, MessageReceiveOptions, MessageSendReceiveOptions, Worker};
1312
use ockam_vault::storage::SecretsRepository;
1413
use ockam_vault::SigningSecretKeyHandle;
1514

@@ -161,51 +160,22 @@ impl Node {
161160
}
162161

163162
/// Start a new worker instance at the given address. Default Access Control is AllowAll
164-
pub fn start_worker<W>(&self, address: impl Into<Address>, worker: W) -> Result<()>
165-
where
166-
W: Worker<Context = Context>,
167-
{
163+
pub fn start_worker<W: Worker>(&self, address: impl Into<Address>, worker: W) -> Result<()> {
168164
self.context.start_worker(address, worker)
169165
}
170166

171167
/// Start a new worker instance at the given address with given Access Controls
172-
pub fn start_worker_with_access_control<W>(
168+
pub fn start_worker_with_access_control<W: Worker>(
173169
&self,
174170
address: impl Into<Address>,
175171
worker: W,
176172
incoming: impl IncomingAccessControl,
177173
outgoing: impl OutgoingAccessControl,
178-
) -> Result<()>
179-
where
180-
W: Worker<Context = Context>,
181-
{
174+
) -> Result<()> {
182175
self.context
183176
.start_worker_with_access_control(address, worker, incoming, outgoing)
184177
}
185178

186-
/// Start a new processor instance at the given address. Default Access Control is DenyAll
187-
pub fn start_processor<P>(&self, address: impl Into<Address>, processor: P) -> Result<()>
188-
where
189-
P: Processor<Context = Context>,
190-
{
191-
self.context.start_processor(address, processor)
192-
}
193-
194-
/// Start a new processor instance at the given address with given Access Controls
195-
pub fn start_processor_with_access_control<P>(
196-
&self,
197-
address: impl Into<Address>,
198-
processor: P,
199-
incoming: impl IncomingAccessControl,
200-
outgoing: impl OutgoingAccessControl,
201-
) -> Result<()>
202-
where
203-
P: Processor<Context = Context>,
204-
{
205-
self.context
206-
.start_processor_with_access_control(address, processor, incoming, outgoing)
207-
}
208-
209179
/// Signal to the local runtime to shut down
210180
pub async fn shutdown(&mut self) -> Result<()> {
211181
self.context.shutdown_node().await

implementations/rust/ockam/ockam/src/relay_service/relay.rs

+4-5
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,9 @@ use ockam_core::compat::sync::Arc;
33
use ockam_core::compat::{boxed::Box, vec::Vec};
44
use ockam_core::{
55
route, Address, AllowAll, AllowOnwardAddress, Any, IncomingAccessControl, LocalMessage,
6-
OutgoingAccessControl, Result, Route, Routed, Worker,
6+
OutgoingAccessControl, Result, Route, Routed,
77
};
8-
use ockam_node::WorkerBuilder;
8+
use ockam_node::{Worker, WorkerBuilder};
99
use tracing::info;
1010

1111
pub(super) struct Relay {
@@ -52,10 +52,9 @@ impl Relay {
5252

5353
#[crate::worker]
5454
impl Worker for Relay {
55-
type Context = Context;
5655
type Message = Any;
5756

58-
async fn initialize(&mut self, ctx: &mut Self::Context) -> Result<()> {
57+
async fn initialize(&mut self, ctx: &mut Context) -> Result<()> {
5958
let payload = self
6059
.payload
6160
.take()
@@ -77,7 +76,7 @@ impl Worker for Relay {
7776

7877
async fn handle_message(
7978
&mut self,
80-
ctx: &mut Self::Context,
79+
ctx: &mut Context,
8180
msg: Routed<Self::Message>,
8281
) -> Result<()> {
8382
let mut local_message = msg.into_local_message();

implementations/rust/ockam/ockam/src/relay_service/relay_service.rs

+3-4
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,9 @@ use alloc::string::String;
55
use ockam_core::compat::boxed::Box;
66
use ockam_core::compat::sync::Arc;
77
use ockam_core::{
8-
Address, DenyAll, Encodable, Mailbox, Mailboxes, Result, Routed, SecureChannelLocalInfo, Worker,
8+
Address, DenyAll, Encodable, Mailbox, Mailboxes, Result, Routed, SecureChannelLocalInfo,
99
};
10-
use ockam_node::WorkerBuilder;
10+
use ockam_node::{Worker, WorkerBuilder};
1111

1212
/// Alias worker to register remote workers under local names.
1313
///
@@ -62,12 +62,11 @@ impl RelayService {
6262

6363
#[crate::worker]
6464
impl Worker for RelayService {
65-
type Context = Context;
6665
type Message = String;
6766

6867
async fn handle_message(
6968
&mut self,
70-
ctx: &mut Self::Context,
69+
ctx: &mut Context,
7170
message: Routed<Self::Message>,
7271
) -> Result<()> {
7372
let secure_channel_local_info =

implementations/rust/ockam/ockam/src/remote/worker.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,15 @@ use ockam_core::compat::{
44
boxed::Box,
55
string::{String, ToString},
66
};
7-
use ockam_core::{Any, Decodable, Result, Routed, Worker};
7+
use ockam_core::{Any, Decodable, Result, Routed};
8+
use ockam_node::Worker;
89
use tracing::{debug, info};
910

1011
#[crate::worker]
1112
impl Worker for RemoteRelay {
12-
type Context = Context;
1313
type Message = Any;
1414

15-
async fn initialize(&mut self, ctx: &mut Self::Context) -> Result<()> {
15+
async fn initialize(&mut self, ctx: &mut Context) -> Result<()> {
1616
debug!(registration_route = %self.registration_route, "RemoteRelay initializing...");
1717

1818
ctx.send_from_address(

implementations/rust/ockam/ockam_api/src/authenticator/credential_issuer/credential_issuer_worker.rs

+2-3
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,8 @@ use ockam_core::api::{Method, RequestHeader, Response};
1010
use ockam_core::compat::boxed::Box;
1111
use ockam_core::compat::sync::Arc;
1212
use ockam_core::compat::vec::Vec;
13-
use ockam_core::{Result, Routed, SecureChannelLocalInfo, Worker};
14-
use ockam_node::Context;
13+
use ockam_core::{Result, Routed, SecureChannelLocalInfo};
14+
use ockam_node::{Context, Worker};
1515

1616
/// This struct runs as a Worker to issue credentials based on a request/response protocol
1717
pub struct CredentialIssuerWorker {
@@ -48,7 +48,6 @@ impl CredentialIssuerWorker {
4848

4949
#[ockam_core::worker]
5050
impl Worker for CredentialIssuerWorker {
51-
type Context = Context;
5251
type Message = Vec<u8>;
5352

5453
async fn handle_message(&mut self, c: &mut Context, m: Routed<Self::Message>) -> Result<()> {

implementations/rust/ockam/ockam_api/src/authenticator/direct/direct_authenticator_worker.rs

+2-3
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,8 @@ use tracing::trace;
55
use ockam::identity::{Identifier, IdentitiesAttributes};
66
use ockam_core::api::{Method, RequestHeader, Response};
77
use ockam_core::compat::sync::Arc;
8-
use ockam_core::{Result, Routed, SecureChannelLocalInfo, Worker};
9-
use ockam_node::Context;
8+
use ockam_core::{Result, Routed, SecureChannelLocalInfo};
9+
use ockam_node::{Context, Worker};
1010

1111
use crate::authenticator::direct::types::AddMember;
1212
use crate::authenticator::direct::DirectAuthenticator;
@@ -37,7 +37,6 @@ impl DirectAuthenticatorWorker {
3737
#[ockam_core::worker]
3838
impl Worker for DirectAuthenticatorWorker {
3939
type Message = Vec<u8>;
40-
type Context = Context;
4140

4241
async fn handle_message(&mut self, c: &mut Context, m: Routed<Self::Message>) -> Result<()> {
4342
let secure_channel_info = match SecureChannelLocalInfo::find_info(m.local_message()) {

implementations/rust/ockam/ockam_api/src/authenticator/enrollment_tokens/acceptor_worker.rs

+2-3
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,8 @@ use minicbor::Decoder;
66
use ockam::identity::Identifier;
77
use ockam_core::api::{Method, RequestHeader, Response};
88
use ockam_core::compat::sync::Arc;
9-
use ockam_core::{Result, Routed, SecureChannelLocalInfo, Worker};
10-
use ockam_node::Context;
9+
use ockam_core::{Result, Routed, SecureChannelLocalInfo};
10+
use ockam_node::{Context, Worker};
1111
use tracing::trace;
1212

1313
pub struct EnrollmentTokenAcceptorWorker {
@@ -27,7 +27,6 @@ impl EnrollmentTokenAcceptorWorker {
2727

2828
#[ockam_core::worker]
2929
impl Worker for EnrollmentTokenAcceptorWorker {
30-
type Context = Context;
3130
type Message = Vec<u8>;
3231

3332
async fn handle_message(&mut self, c: &mut Context, m: Routed<Self::Message>) -> Result<()> {

0 commit comments

Comments
 (0)