From 9d9756021935332f31598da08db2ac7db10681c2 Mon Sep 17 00:00:00 2001 From: museun Date: Sun, 6 Dec 2020 01:24:49 -0500 Subject: [PATCH 1/7] remove runtime stuff --- Cargo.toml | 66 +++----- src/connector.rs | 251 ----------------------------- src/connector/async_io/mod.rs | 13 -- src/connector/async_io/non_tls.rs | 38 ----- src/connector/async_io/tls.rs | 46 ------ src/connector/async_std/mod.rs | 10 -- src/connector/async_std/non_tls.rs | 38 ----- src/connector/async_std/tls.rs | 49 ------ src/connector/smol/mod.rs | 13 -- src/connector/smol/non_tls.rs | 38 ----- src/connector/smol/tls.rs | 48 ------ src/connector/tokio/mod.rs | 22 --- src/connector/tokio/native_tls.rs | 61 ------- src/connector/tokio/non_tls.rs | 42 ----- src/connector/tokio/openssl.rs | 62 ------- src/connector/tokio/rustls.rs | 61 ------- src/lib.rs | 5 +- src/runner/async_runner.rs | 1 - 18 files changed, 22 insertions(+), 842 deletions(-) delete mode 100644 src/connector.rs delete mode 100644 src/connector/async_io/mod.rs delete mode 100644 src/connector/async_io/non_tls.rs delete mode 100644 src/connector/async_io/tls.rs delete mode 100644 src/connector/async_std/mod.rs delete mode 100644 src/connector/async_std/non_tls.rs delete mode 100644 src/connector/async_std/tls.rs delete mode 100644 src/connector/smol/mod.rs delete mode 100644 src/connector/smol/non_tls.rs delete mode 100644 src/connector/smol/tls.rs delete mode 100644 src/connector/tokio/mod.rs delete mode 100644 src/connector/tokio/native_tls.rs delete mode 100644 src/connector/tokio/non_tls.rs delete mode 100644 src/connector/tokio/openssl.rs delete mode 100644 src/connector/tokio/rustls.rs diff --git a/Cargo.toml b/Cargo.toml index 14a5101..a7b7daf 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,10 +16,10 @@ rustdoc-args = ["--cfg", "docsrs"] all-features = true [features] -default = [] +default = ["async"] testing = [ "async", - "async-mutex", + "async-mutex", ] async = [ @@ -57,61 +57,35 @@ fastrand = { version = "1.3", optional = true } # for optional serialization and deserialization serde = { version = "1.0", features = ["derive"], optional = true } -# optional runtimes (for TcpStream) -# these use the futures AsyncWrite+AsyncRead -async-io = { version = "1.1", optional = true } -smol = { version = "1.2", optional = true } -async-tls = { version = "0.10", default-features = false, features = ["client"], optional = true } -# TODO look into what their features do. the ones they have enabled by default seem important -async-std = { version = "1.6", optional = true } - -# tokio has its own AsyncWrite+AsyncRead -tokio = { version = "0.2", features = ["net"], optional = true } -tokio-util = { version = "0.3", features = ["compat"], optional = true } - -# rustls -tokio-rustls = { version = "0.14", optional = true } -webpki-roots = { version = "0.20", optional = true } - -# native-tls -tokio-native-tls = { version = "0.1", optional = true } -native-tls = { version = "0.2", optional = true } - -# openssl -tokio-openssl = { version = "0.4", optional = true } -openssl = { version = "0.10", optional = true, features = ["v110"] } - # for some test utilities async-mutex = { version = "1.4", optional = true } - [dev-dependencies] anyhow = "1.0" async-executor = { version = "1.3", default-features = false } serde_json = "1.0" rmp-serde = "0.14" -[[example]] -name = "message_parse" -required-features = ["async"] - -[[example]] -name = "smol_demo" -required-features = ["smol", "async"] +# [[example]] +# name = "message_parse" +# required-features = ["async"] -[[example]] -name = "async_io_demo" -required-features = ["async-io", "async"] +# [[example]] +# name = "smol_demo" +# required-features = ["smol", "async"] -[[example]] -name = "async_std_demo" -required-features = ["async-std", "async-std/attributes", "async"] +# [[example]] +# name = "async_io_demo" +# required-features = ["async-io", "async"] -[[example]] -name = "tokio_demo" -required-features = ["tokio/full", "tokio-util", "async"] +# [[example]] +# name = "async_std_demo" +# required-features = ["async-std", "async-std/attributes", "async"] -[[example]] -name = "simple_bot" -required-features = ["smol", "async"] +# [[example]] +# name = "tokio_demo" +# required-features = ["tokio/full", "tokio-util", "async"] +# [[example]] +# name = "simple_bot" +# required-features = ["smol", "async"] diff --git a/src/connector.rs b/src/connector.rs deleted file mode 100644 index 95cd8c5..0000000 --- a/src/connector.rs +++ /dev/null @@ -1,251 +0,0 @@ -//! This module lets you choose which runtime you want to use. -//! -//! By default, TLS is disabled to make building the crate on various platforms easier. -//! -//! To use.. -//! -//! | Read/Write provider | Features | -//! | --- | --- | -//! | [`async_io`](https://docs.rs/async-io/latest/async_io/) |`async-io` | -//! | [`smol`](https://docs.rs/smol/latest/smol/) |`smol` | -//! | [`async_std`](https://docs.rs/async-std/latest/async_std/) |`async-std` | -//! | [`tokio`](https://docs.rs/tokio/0.2/tokio/) |`tokio` and `tokio-util` | -//! -//! ## TLS -//! -//! If you want TLS supports, enable the above runtime and also enable the cooresponding features: -//! -//! | Read/Write provider | Runtime | Features | TLS backend | -//! | ---------------------------------------------------------- | ----------- | ---------------------------------------------------- | -------------------------- | -//! | [`async_io`](https://docs.rs/async-io/latest/async_io/) | `async_io` | `"async-tls"` | [`rustls`][rustls] | -//! | [`smol`](https://docs.rs/smol/latest/smol/) | `smol` | `"async-tls"` | [`rustls`][rustls] | -//! | [`async_std`](https://docs.rs/async-std/latest/async_std/) | `async_std` | `"async-tls"` | [`rustls`][rustls] | -//! | [`tokio`](https://docs.rs/tokio/0.2/tokio/) | `tokio` | `"tokio-util"`, `"tokio-rustls"`, `"webpki-roots"` | [`rustls`][rustls] | -//! | [`tokio`](https://docs.rs/tokio/0.2/tokio/) | `tokio` | `"tokio-util"`, `"tokio-native-tls"`, `"native-tls"` | [`native-tls`][native-tls] | -//! | [`tokio`](https://docs.rs/tokio/0.2/tokio/) | `tokio` | `"tokio-util"`, `"tokio-openssl"`, `"openssl"` | [`openssl`][openssl] | -//! -//! [rustls]: https://docs.rs/rustls/0.18.1/rustls/ -//! [native-tls]: https://docs.rs/native-tls/0.2.4/native_tls/ -//! [openssl]: https://docs.rs/openssl/0.10/openssl/ -//! -use futures_lite::{AsyncRead, AsyncWrite}; -use std::{future::Future, io::Result as IoResult, net::SocketAddr}; - -#[allow(unused_macros)] -macro_rules! connector_ctor { - (non_tls: $(#[$meta:meta])*) => { - #[doc = "Create a new"] - $(#[$meta])* - #[doc = "non-TLS connector that connects to the ***default Twitch*** address."] - pub fn twitch() -> ::std::io::Result { - Self::custom($crate::TWITCH_IRC_ADDRESS) - } - - #[doc = "Create a new"] - $(#[$meta])* - #[doc = "non-TLS connector with a custom address."] - pub fn custom(addrs: A) -> ::std::io::Result - where - A: ::std::net::ToSocketAddrs, - { - addrs.to_socket_addrs().map(|addrs| Self { - addrs: addrs.collect(), - }) - } - }; - - (tls: $(#[$meta:meta])*) => { - #[doc = "Create a new"] - $(#[$meta])* - #[doc = "TLS connector that connects to the ***default Twitch*** address."] - pub fn twitch() -> ::std::io::Result { - Self::custom($crate::TWITCH_IRC_ADDRESS_TLS, $crate::TWITCH_TLS_DOMAIN) - } - - - #[doc = "Create a new"] - $(#[$meta])* - #[doc = "TLS connector with a custom address and TLS domain."] - pub fn custom(addrs: A, domain: D) -> ::std::io::Result - where - A: ::std::net::ToSocketAddrs, - D: Into<::std::string::String>, - { - let tls_domain = domain.into(); - addrs.to_socket_addrs().map(|addrs| Self { - addrs: addrs.collect(), - tls_domain, - }) - } - }; -} - -#[cfg(feature = "async-io")] -/// Connector for using an [`async_io`](https://docs.rs/async-io/latest/async_io/) wrapper over [`std::net::TcpStream`](https://doc.rust-lang.org/std/net/struct.TcpStream.html) -pub mod async_io; - -#[cfg(feature = "async-io")] -#[doc(inline)] -pub use self::async_io::Connector as AsyncIoConnector; - -#[cfg(all(feature = "async-io", feature = "async-tls"))] -#[doc(inline)] -pub use self::async_io::ConnectorTls as AsyncIoConnectorTls; - -#[cfg(feature = "async-std")] -/// Connector for using an [`async_std::net::TcpStream`](https://docs.rs/async-std/latest/async_std/net/struct.TcpStream.html) -pub mod async_std; - -#[cfg(feature = "async-std")] -#[doc(inline)] -pub use self::async_std::Connector as AsyncStdConnector; - -#[cfg(all(feature = "async-std", feature = "async-tls"))] -#[doc(inline)] -pub use self::async_std::ConnectorTls as AsyncStdConnectorTls; - -#[cfg(feature = "smol")] -/// Connector for using a [`smol::Async`](https://docs.rs/smol/latest/smol/struct.Async.html) wrapper over [`std::net::TcpStream`](https://doc.rust-lang.org/std/net/struct.TcpStream.html) -pub mod smol; - -#[cfg(feature = "smol")] -#[doc(inline)] -pub use self::smol::Connector as SmolConnector; - -#[cfg(all(feature = "smol", feature = "async-tls"))] -#[doc(inline)] -pub use self::smol::ConnectorTls as SmolConnectorTls; - -#[cfg(all(feature = "tokio", feature = "tokio-util"))] -/// Connector for using a [`tokio::net::TcpStream`](https://docs.rs/tokio/0.2/tokio/net/struct.TcpStream.html) -pub mod tokio; - -#[cfg(all(feature = "tokio", feature = "tokio-util"))] -#[doc(inline)] -pub use self::tokio::Connector as TokioConnector; - -#[cfg(all( - feature = "tokio", - feature = "tokio-util", - feature = "tokio-rustls", - feature = "webpki-roots" -))] -#[doc(inline)] -pub use self::tokio::ConnectorRustTls as TokioConnectorRustTls; - -#[cfg(all( - feature = "tokio", - feature = "tokio-util", - feature = "tokio-native-tls", - feature = "native-tls" -))] -#[doc(inline)] -pub use self::tokio::ConnectorNativeTls as TokioConnectorNativeTls; - -#[cfg(all( - feature = "tokio", - feature = "tokio-util", - feature = "tokio-openssl", - feature = "openssl" -))] -#[doc(inline)] -pub use self::tokio::ConnectorOpenSsl as TokioConnectorOpenSsl; - -/// The connector trait. This is used to abstract out runtimes. -/// -/// You can implement this on your own type to provide a custom connection behavior. -pub trait Connector: Send + Sync + Clone { - /// Output IO type returned by calling `connect` - /// - /// This type must implement `futures::io::AsyncRead` and `futures::io::AsyncWrite` - type Output: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static; - /// The `connect` method. This should return a boxed future of a `std::io::Result` of the `Output` type. - /// - /// e.g. `Box::pin(async move { std::net::TcpStream::connect("someaddr") })` - fn connect(&mut self) -> crate::BoxedFuture>; -} - -// This is used because smol/async_io uses an indv. SocketAddr for their connect -// instead of the normal ToSocketAddrs trait -// -// thus this will be dead if those features aren't enabled. -#[allow(dead_code)] -async fn try_connect(addrs: &[SocketAddr], connect: F) -> IoResult -where - F: Fn(SocketAddr) -> R + Send, - R: Future> + Send, - T: Send, -{ - let mut last = None; - for addr in addrs { - let fut = connect(*addr); - match fut.await { - Ok(socket) => return Ok(socket), - Err(err) => last.replace(err), - }; - } - - match last { - Some(last) => Err(last), - None => Err(std::io::Error::new( - std::io::ErrorKind::ConnectionRefused, - "cannot connect with any provided address", - )), - } -} - -mod required { - #[cfg(all( - feature = "async-tls", - not(any(feature = "async-io", feature = "async-std", feature = "smol")) - ))] - compile_error! { - "'async-io' or 'async-std' or 'smol' must be enabled when 'async-tls' is enabled" - } - - #[cfg(all(feature = "tokio", not(feature = "tokio-util")))] - compile_error! { - "'tokio-util' must be enabled when 'tokio' is enabled" - } - - #[cfg(all( - feature = "tokio-native-tls", - not(all(feature = "tokio", feature = "tokio-util", feature = "native-tls")) - ))] - compile_error! { - "'tokio', 'tokio-util' and 'native-tls' must be enabled when 'tokio-native-tls' is enabled" - } - - #[cfg(all( - feature = "tokio-rustls", - not(all(feature = "tokio", feature = "tokio-util", feature = "webpki-roots")) - ))] - compile_error! { - "'tokio', 'tokio-util' and 'webpki-roots' must be enabled when 'tokio-rustls' is enabled" - } - - #[cfg(all( - feature = "tokio-openssl", - not(all(feature = "tokio", feature = "tokio-util", feature = "openssl")) - ))] - compile_error! { - "'tokio', 'tokio-util' and 'openssl' must be enabled when 'tokio-openssl' is enabled" - } -} - -#[cfg(test)] -#[allow(dead_code)] -mod testing { - use crate::connector::Connector as ConnectorTrait; - use futures_lite::{AsyncRead, AsyncWrite}; - - pub fn assert_connector() {} - pub fn assert_type_is_read_write() {} - pub fn assert_obj_is_sane(_obj: T) - where - T: ConnectorTrait, - T::Output: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static, - for<'a> &'a T::Output: AsyncRead + AsyncWrite + Send + Sync + Unpin, - { - } -} diff --git a/src/connector/async_io/mod.rs b/src/connector/async_io/mod.rs deleted file mode 100644 index bf2d13f..0000000 --- a/src/connector/async_io/mod.rs +++ /dev/null @@ -1,13 +0,0 @@ -use crate::connector::try_connect; -use crate::BoxedFuture; - -type TcpStream = async_io::Async; - -mod non_tls; -pub use non_tls::*; - -#[cfg(feature = "async-tls")] -mod tls; - -#[cfg(feature = "async-tls")] -pub use tls::*; diff --git a/src/connector/async_io/non_tls.rs b/src/connector/async_io/non_tls.rs deleted file mode 100644 index f13aa1a..0000000 --- a/src/connector/async_io/non_tls.rs +++ /dev/null @@ -1,38 +0,0 @@ -use super::*; - -/// A `async_io` connector. This does not use TLS -#[derive(Debug, Clone, PartialEq)] -pub struct Connector { - addrs: Vec, -} - -impl Connector { - connector_ctor!(non_tls: - /// [`async_io`](https://docs.rs/async-io/latest/async_io/) - ); -} - -impl crate::connector::Connector for Connector { - type Output = TcpStream; - - fn connect(&mut self) -> BoxedFuture> { - let addrs = self.addrs.clone(); - let fut = async move { try_connect(&*addrs, TcpStream::connect).await }; - Box::pin(fut) - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn assert_connector_trait_is_fulfilled() { - use crate::connector::testing::*; - use crate::connector::Connector as C; - - assert_connector::(); - assert_type_is_read_write::<::Output>(); - assert_obj_is_sane(Connector::twitch().unwrap()); - } -} diff --git a/src/connector/async_io/tls.rs b/src/connector/async_io/tls.rs deleted file mode 100644 index 56e7435..0000000 --- a/src/connector/async_io/tls.rs +++ /dev/null @@ -1,46 +0,0 @@ -use super::*; -use std::io::Result; - -/// A `async_io` connector that uses `async-tls` (a `rustls` wrapper). This uses TLS. -#[derive(Debug, Clone, PartialEq)] -pub struct ConnectorTls { - addrs: Vec, - tls_domain: String, -} - -impl ConnectorTls { - connector_ctor!(tls: - /// [`async_io`](https://docs.rs/async-io/latest/async_io/) - ); -} - -impl crate::connector::Connector for ConnectorTls { - type Output = async_dup::Mutex>; - - fn connect(&mut self) -> BoxedFuture> { - let this = self.clone(); - let fut = async move { - let stream = try_connect(&*this.addrs, TcpStream::connect).await?; - async_tls::TlsConnector::new() - .connect(this.tls_domain, stream) - .await - .map(async_dup::Mutex::new) - }; - Box::pin(fut) - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn assert_connector_trait_is_fulfilled() { - use crate::connector::testing::*; - use crate::connector::Connector as C; - - assert_connector::(); - assert_type_is_read_write::<::Output>(); - assert_obj_is_sane(ConnectorTls::twitch().unwrap()); - } -} diff --git a/src/connector/async_std/mod.rs b/src/connector/async_std/mod.rs deleted file mode 100644 index d21eb3a..0000000 --- a/src/connector/async_std/mod.rs +++ /dev/null @@ -1,10 +0,0 @@ -use crate::BoxedFuture; - -mod non_tls; -pub use non_tls::*; - -#[cfg(feature = "async-tls")] -mod tls; - -#[cfg(feature = "async-tls")] -pub use tls::*; diff --git a/src/connector/async_std/non_tls.rs b/src/connector/async_std/non_tls.rs deleted file mode 100644 index bfb8d58..0000000 --- a/src/connector/async_std/non_tls.rs +++ /dev/null @@ -1,38 +0,0 @@ -use super::*; - -/// A `async_std` connector. This does not use TLS -#[derive(Debug, Clone, PartialEq)] -pub struct Connector { - addrs: Vec, -} - -impl Connector { - connector_ctor!(non_tls: - /// [`async-std`](https://docs.rs/async-std/latest/async_std/) - ); -} - -impl crate::connector::Connector for Connector { - type Output = async_std::net::TcpStream; - - fn connect(&mut self) -> BoxedFuture> { - let addrs = self.addrs.clone(); - let fut = async move { async_std::net::TcpStream::connect(&*addrs).await }; - Box::pin(fut) - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn assert_connector_trait_is_fulfilled() { - use crate::connector::testing::*; - use crate::connector::Connector as C; - - assert_connector::(); - assert_type_is_read_write::<::Output>(); - assert_obj_is_sane(Connector::twitch().unwrap()); - } -} diff --git a/src/connector/async_std/tls.rs b/src/connector/async_std/tls.rs deleted file mode 100644 index 70532bb..0000000 --- a/src/connector/async_std/tls.rs +++ /dev/null @@ -1,49 +0,0 @@ -use super::*; - -/// A `async_std` connector that uses `async-tls` (a `rustls` wrapper). This uses TLS. -/// -/// To use this type, ensure you set up the 'TLS Domain' in the configuration. -/// -/// The crate provides the 'TLS domain' for Twitch in the root of this crate. -#[derive(Debug, Clone, PartialEq)] -pub struct ConnectorTls { - addrs: Vec, - tls_domain: String, -} - -impl ConnectorTls { - connector_ctor!(tls: - /// [`async-std`](https://docs.rs/async-std/latest/async_std/) - ); -} - -impl crate::connector::Connector for ConnectorTls { - type Output = async_dup::Mutex>; - - fn connect(&mut self) -> BoxedFuture> { - let this = self.clone(); - let fut = async move { - let stream = async_std::net::TcpStream::connect(&*this.addrs).await?; - async_tls::TlsConnector::new() - .connect(this.tls_domain, stream) - .await - .map(async_dup::Mutex::new) - }; - Box::pin(fut) - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn assert_connector_trait_is_fulfilled() { - use crate::connector::testing::*; - use crate::connector::Connector as C; - - assert_connector::(); - assert_type_is_read_write::<::Output>(); - assert_obj_is_sane(ConnectorTls::twitch().unwrap()); - } -} diff --git a/src/connector/smol/mod.rs b/src/connector/smol/mod.rs deleted file mode 100644 index c0e5ddd..0000000 --- a/src/connector/smol/mod.rs +++ /dev/null @@ -1,13 +0,0 @@ -use crate::connector::try_connect; -use crate::BoxedFuture; - -type TcpStream = smol::Async; - -mod non_tls; -pub use non_tls::*; - -#[cfg(feature = "async-tls")] -mod tls; - -#[cfg(feature = "async-tls")] -pub use tls::*; diff --git a/src/connector/smol/non_tls.rs b/src/connector/smol/non_tls.rs deleted file mode 100644 index 9b66047..0000000 --- a/src/connector/smol/non_tls.rs +++ /dev/null @@ -1,38 +0,0 @@ -use super::*; - -/// A `smol` connector. This does not use TLS -#[derive(Debug, Clone, PartialEq)] -pub struct Connector { - addrs: Vec, -} - -impl Connector { - connector_ctor!(non_tls: - /// [`smol`](https://docs.rs/smol/latest/smol/) - ); -} - -impl crate::connector::Connector for Connector { - type Output = TcpStream; - - fn connect(&mut self) -> BoxedFuture> { - let addrs = self.addrs.clone(); - let fut = async move { try_connect(&*addrs, TcpStream::connect).await }; - Box::pin(fut) - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn assert_connector_trait_is_fulfilled() { - use crate::connector::testing::*; - use crate::connector::Connector as C; - - assert_connector::(); - assert_type_is_read_write::<::Output>(); - assert_obj_is_sane(Connector::twitch().unwrap()); - } -} diff --git a/src/connector/smol/tls.rs b/src/connector/smol/tls.rs deleted file mode 100644 index bb3271e..0000000 --- a/src/connector/smol/tls.rs +++ /dev/null @@ -1,48 +0,0 @@ -use super::*; - -/// A `smol` connector that uses `async-tls` (a `rustls` wrapper). This uses TLS. -/// -/// To use this type, ensure you set up the 'TLS Domain' in the -/// configuration. The crate provides the 'TLS domain' for Twitch in the root of this crate. -#[derive(Debug, Clone, PartialEq)] -pub struct ConnectorTls { - addrs: Vec, - tls_domain: String, -} - -impl ConnectorTls { - connector_ctor!(tls: - /// [`smol`](https://docs.rs/smol/latest/smol/) - ); -} - -impl crate::connector::Connector for ConnectorTls { - type Output = async_dup::Mutex>; - - fn connect(&mut self) -> BoxedFuture> { - let this = self.clone(); - let fut = async move { - let stream = try_connect(&*this.addrs, TcpStream::connect).await?; - async_tls::TlsConnector::new() - .connect(this.tls_domain, stream) - .await - .map(async_dup::Mutex::new) - }; - Box::pin(fut) - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn assert_connector_trait_is_fulfilled() { - use crate::connector::testing::*; - use crate::connector::Connector as C; - - assert_connector::(); - assert_type_is_read_write::<::Output>(); - assert_obj_is_sane(ConnectorTls::twitch().unwrap()); - } -} diff --git a/src/connector/tokio/mod.rs b/src/connector/tokio/mod.rs deleted file mode 100644 index f805974..0000000 --- a/src/connector/tokio/mod.rs +++ /dev/null @@ -1,22 +0,0 @@ -use crate::BoxedFuture; - -mod non_tls; -pub use non_tls::*; - -#[cfg(all(feature = "tokio-native-tls", feature = "native-tls"))] -mod native_tls; - -#[cfg(all(feature = "tokio-native-tls", feature = "native-tls"))] -pub use self::native_tls::*; - -#[cfg(all(feature = "tokio-rustls", feature = "webpki-roots"))] -mod rustls; - -#[cfg(all(feature = "tokio-rustls", feature = "webpki-roots"))] -pub use rustls::*; - -#[cfg(all(feature = "tokio-openssl", feature = "openssl"))] -mod openssl; - -#[cfg(all(feature = "tokio-openssl", feature = "openssl"))] -pub use self::openssl::*; diff --git a/src/connector/tokio/native_tls.rs b/src/connector/tokio/native_tls.rs deleted file mode 100644 index aa3e86a..0000000 --- a/src/connector/tokio/native_tls.rs +++ /dev/null @@ -1,61 +0,0 @@ -use super::*; - -/// A `tokio` connector that uses `tokio-native-tls` (a `native-tls` wrapper). This uses TLS. -/// -/// To use this type, ensure you set up the 'TLS Domain' in the configuration. -/// -/// The crate provides the 'TLS domain' for Twitch in the root of this crate. -#[derive(Debug, Clone, PartialEq)] -pub struct ConnectorNativeTls { - addrs: Vec, - tls_domain: String, -} - -impl ConnectorNativeTls { - connector_ctor!(tls: - /// [`tokio`](https://docs.rs/tokio/0.2/tokio/) (using [`tokio-native-tls`](https://docs.rs/tokio-native-tls/latest/tokio_native_tls/)) - ); -} - -type CloneStream = async_dup::Mutex>; -type Stream = tokio_native_tls::TlsStream; - -impl crate::connector::Connector for ConnectorNativeTls { - type Output = CloneStream; - - fn connect(&mut self) -> BoxedFuture> { - let this = self.clone(); - - let fut = async move { - use tokio_util::compat::Tokio02AsyncReadCompatExt as _; - - let connector: tokio_native_tls::TlsConnector = ::native_tls::TlsConnector::new() - .map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err))? - .into(); - - let stream = tokio::net::TcpStream::connect(&*this.addrs).await?; - let stream = connector - .connect(&this.tls_domain, stream) - .await - .map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err))?; - - Ok(async_dup::Mutex::new(stream.compat())) - }; - Box::pin(fut) - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn assert_connector_trait_is_fulfilled() { - use crate::connector::testing::*; - use crate::connector::Connector as C; - - assert_connector::(); - assert_type_is_read_write::<::Output>(); - assert_obj_is_sane(ConnectorNativeTls::twitch().unwrap()); - } -} diff --git a/src/connector/tokio/non_tls.rs b/src/connector/tokio/non_tls.rs deleted file mode 100644 index d59e885..0000000 --- a/src/connector/tokio/non_tls.rs +++ /dev/null @@ -1,42 +0,0 @@ -use super::*; - -/// A `tokio` connector. This does not use TLS -#[derive(Debug, Clone, PartialEq)] -pub struct Connector { - addrs: Vec, -} - -impl Connector { - connector_ctor!(non_tls: - /// [`tokio`](https://docs.rs/tokio/0.2/tokio/) - ); -} - -impl crate::connector::Connector for Connector { - type Output = async_dup::Mutex>; - - fn connect(&mut self) -> BoxedFuture> { - let addrs = self.addrs.clone(); - let fut = async move { - use tokio_util::compat::Tokio02AsyncReadCompatExt as _; - let stream = tokio::net::TcpStream::connect(&*addrs).await?; - Ok(async_dup::Mutex::new(stream.compat())) - }; - Box::pin(fut) - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn assert_connector_trait_is_fulfilled() { - use crate::connector::testing::*; - use crate::connector::Connector as C; - - assert_connector::(); - assert_type_is_read_write::<::Output>(); - assert_obj_is_sane(Connector::twitch().unwrap()); - } -} diff --git a/src/connector/tokio/openssl.rs b/src/connector/tokio/openssl.rs deleted file mode 100644 index ce090ab..0000000 --- a/src/connector/tokio/openssl.rs +++ /dev/null @@ -1,62 +0,0 @@ -use super::*; - -use std::io::{Error, ErrorKind}; - -/// A `tokio` connector that uses `tokio-openssl` (an `openssl` wrapper). This uses TLS. -/// -/// To use this type, ensure you set up the 'TLS Domain' in the configuration. -/// -/// The crate provides the 'TLS domain' for Twitch in the root of this crate. -#[derive(Debug, Clone, PartialEq)] -pub struct ConnectorOpenSsl { - addrs: Vec, - tls_domain: String, -} - -impl ConnectorOpenSsl { - connector_ctor!(tls: - /// [`tokio`](https://docs.rs/tokio/0.2/tokio/) (using [`tokio-openssl`](https://docs.rs/tokio_openssl/latest/tokio_openssl/)) - ); -} - -type CloneStream = async_dup::Mutex>; -type Stream = tokio_openssl::SslStream; - -impl crate::connector::Connector for ConnectorOpenSsl { - type Output = CloneStream; - - fn connect(&mut self) -> BoxedFuture> { - let this = self.clone(); - - let fut = async move { - use tokio_util::compat::Tokio02AsyncReadCompatExt as _; - - let config = ::openssl::ssl::SslConnector::builder(::openssl::ssl::SslMethod::tls()) - .and_then(|c| c.build().configure()) - .map_err(|err| Error::new(ErrorKind::Other, err))?; - - let stream = tokio::net::TcpStream::connect(&*this.addrs).await?; - let stream = tokio_openssl::connect(config, &this.tls_domain, stream) - .await - .map_err(|err| Error::new(ErrorKind::Other, err))?; - - Ok(async_dup::Mutex::new(stream.compat())) - }; - Box::pin(fut) - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn assert_connector_trait_is_fulfilled() { - use crate::connector::testing::*; - use crate::connector::Connector as C; - - assert_connector::(); - assert_type_is_read_write::<::Output>(); - assert_obj_is_sane(ConnectorOpenSsl::twitch().unwrap()); - } -} diff --git a/src/connector/tokio/rustls.rs b/src/connector/tokio/rustls.rs deleted file mode 100644 index 85fb7ff..0000000 --- a/src/connector/tokio/rustls.rs +++ /dev/null @@ -1,61 +0,0 @@ -use super::*; - -/// A `tokio` connector that uses `tokio-rustls` (a `rustls` wrapper). This uses TLS. -/// -/// To use this type, ensure you set up the 'TLS Domain' in the configuration. -/// -/// The crate provides the 'TLS domain' for Twitch in the root of this crate. -#[derive(Debug, Clone, PartialEq)] -pub struct ConnectorRustTls { - addrs: Vec, - tls_domain: String, -} - -impl ConnectorRustTls { - connector_ctor!(tls: - /// [`tokio`](https://docs.rs/tokio/0.2/tokio/) (using [`tokio-rustls`](https://docs.rs/tokio-rustls/latest/tokio_rustls/)) - ); -} - -impl crate::connector::Connector for ConnectorRustTls { - type Output = async_dup::Mutex< - tokio_util::compat::Compat>, - >; - - fn connect(&mut self) -> BoxedFuture> { - let this = self.clone(); - let fut = async move { - use tokio_util::compat::Tokio02AsyncReadCompatExt as _; - let domain = tokio_rustls::webpki::DNSNameRef::try_from_ascii_str(&this.tls_domain) - .map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err))?; - - let connector: tokio_rustls::TlsConnector = std::sync::Arc::new({ - let mut c = tokio_rustls::rustls::ClientConfig::new(); - c.root_store - .add_server_trust_anchors(&webpki_roots::TLS_SERVER_ROOTS); - c - }) - .into(); - - let stream = tokio::net::TcpStream::connect(&*this.addrs).await?; - let stream = connector.connect(domain, stream).await?; - Ok(async_dup::Mutex::new(stream.compat())) - }; - Box::pin(fut) - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn assert_connector_trait_is_fulfilled() { - use crate::connector::testing::*; - use crate::connector::Connector as C; - - assert_connector::(); - assert_type_is_read_write::<::Output>(); - assert_obj_is_sane(ConnectorRustTls::twitch().unwrap()); - } -} diff --git a/src/lib.rs b/src/lib.rs index be46bea..ff5bac2 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -66,8 +66,8 @@ For just encoding messages: macro_rules! cfg_async { ($($item:item)*) => { $( - #[cfg(feature = "async")] - #[cfg_attr(docsrs, doc(cfg(feature = "async")))] + // #[cfg(feature = "async")] + // #[cfg_attr(docsrs, doc(cfg(feature = "async")))] $item )* }; @@ -112,7 +112,6 @@ cfg_async! { pub type Writer = crate::writer::AsyncWriter; } -cfg_async! { pub mod connector; } cfg_async! { pub mod writer; } cfg_async! { pub mod channel; } diff --git a/src/runner/async_runner.rs b/src/runner/async_runner.rs index 311e4a0..10af540 100644 --- a/src/runner/async_runner.rs +++ b/src/runner/async_runner.rs @@ -2,7 +2,6 @@ cfg_async! { use crate::{ channel::Receiver, commands, - connector::Connector, encoder::AsyncEncoder, messages::{Capability, Commands, MessageId}, rate_limit::{RateClass, RateLimit}, From 640e0b09bfc6240b9066e9825093e8c81e2e68ab Mon Sep 17 00:00:00 2001 From: museun Date: Sun, 6 Dec 2020 01:25:35 -0500 Subject: [PATCH 2/7] upgrade deps --- Cargo.toml | 26 +++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index a7b7daf..d366de5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -34,37 +34,37 @@ async = [ [dependencies] # logging support -log = { version = "0.4", optional = true, features = ["std"] } +log = { version = "0.4.11", optional = true, features = ["std"] } # just the futures traits -futures-lite = { version = "1.8", optional = true } +futures-lite = { version = "1.11.2", optional = true } # field pin projection -pin-project-lite = { version = "0.1", optional = true } +pin-project-lite = { version = "0.2.0", optional = true } # cloneable async writes -async-dup = { version = "1.2", optional = true } +async-dup = { version = "1.2.2", optional = true } # message passing -async-channel = { version = "1.4", optional = true } +async-channel = { version = "1.5.1", optional = true } # for timing out futures -futures-timer = { version = "3.0", optional = true } +futures-timer = { version = "3.0.2", optional = true } # for 'fairness' in the main loop -fastrand = { version = "1.3", optional = true } +fastrand = { version = "1.4.0", optional = true } # for optional serialization and deserialization -serde = { version = "1.0", features = ["derive"], optional = true } +serde = { version = "1.0.118", features = ["derive"], optional = true } # for some test utilities -async-mutex = { version = "1.4", optional = true } +async-mutex = { version = "1.4.0", optional = true } [dev-dependencies] -anyhow = "1.0" -async-executor = { version = "1.3", default-features = false } -serde_json = "1.0" -rmp-serde = "0.14" +anyhow = "1.0.35" +async-executor = { version = "1.4.0", default-features = false } +serde_json = "1.0.60" +rmp-serde = "0.14.4" # [[example]] # name = "message_parse" From 0ed0609f103dca4116b66f2662ec8484ad2ff67e Mon Sep 17 00:00:00 2001 From: museun Date: Sun, 6 Dec 2020 01:27:13 -0500 Subject: [PATCH 3/7] remove test utilities --- .gitignore | 2 +- Cargo.toml | 7 - src/irc/tag_indices.rs | 6 +- src/lib.rs | 3 - src/test/conn.rs | 157 --------------------- src/test/mod.rs | 13 -- src/test/str.rs | 2 - src/test/tags_builder.rs | 285 --------------------------------------- 8 files changed, 4 insertions(+), 471 deletions(-) delete mode 100644 src/test/conn.rs delete mode 100644 src/test/mod.rs delete mode 100644 src/test/str.rs delete mode 100644 src/test/tags_builder.rs diff --git a/.gitignore b/.gitignore index a1cf6a8..7a4e4d6 100644 --- a/.gitignore +++ b/.gitignore @@ -10,4 +10,4 @@ Cargo.lock *.log *.html - +.trash* diff --git a/Cargo.toml b/Cargo.toml index d366de5..4661813 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,10 +17,6 @@ all-features = true [features] default = ["async"] -testing = [ - "async", - "async-mutex", -] async = [ "async-channel", @@ -57,9 +53,6 @@ fastrand = { version = "1.4.0", optional = true } # for optional serialization and deserialization serde = { version = "1.0.118", features = ["derive"], optional = true } -# for some test utilities -async-mutex = { version = "1.4.0", optional = true } - [dev-dependencies] anyhow = "1.0.35" async-executor = { version = "1.4.0", default-features = false } diff --git a/src/irc/tag_indices.rs b/src/irc/tag_indices.rs index f832b28..28a807c 100644 --- a/src/irc/tag_indices.rs +++ b/src/irc/tag_indices.rs @@ -22,7 +22,7 @@ impl std::fmt::Debug for TagIndices { impl TagIndices { /// Build indices from this tags fragment /// - /// The fragment should be in the form of `'@k1=v2;k2=v2'` + /// The fragment should be in the form of `'@k1=v2;k2=v2'` pub fn build_indices(input: &str) -> Result { if !input.starts_with('@') { return Ok(Self::default()); @@ -60,12 +60,12 @@ impl TagIndices { // NOTE: this isn't public because they don't verify 'data' is the same as the built-indices data pub(crate) fn get_unescaped<'a>(&'a self, key: &str) -> Option> { - self.get(key).map(crate::test::unescape_str) + self.get(key).map(crate::irc::tags::unescape_str) } // NOTE: this isn't public because they don't verify 'data' is the same as the built-indices data pub(crate) fn get<'a>(&'a self, key: &str) -> Option<&'a str> { - let key = crate::test::escape_str(key); + let key = crate::irc::tags::escape_str(key); self.map .iter() .find_map(|(k, v)| if &key == k { Some(&**v) } else { None }) diff --git a/src/lib.rs b/src/lib.rs index ff5bac2..2ad0c03 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -127,9 +127,6 @@ pub mod messages; pub mod irc; pub use irc::{IrcMessage, MessageError}; -/// Helpful testing utilities -pub mod test; - #[doc(inline)] pub use irc::{FromIrcMessage, IntoIrcMessage}; diff --git a/src/test/conn.rs b/src/test/conn.rs deleted file mode 100644 index 80a4dd0..0000000 --- a/src/test/conn.rs +++ /dev/null @@ -1,157 +0,0 @@ -use std::{ - future::Future, - io::{Error, ErrorKind, Result}, - pin::Pin, - sync::Arc, - task::{Context, Poll}, -}; - -use async_mutex::Mutex; -use futures_lite::io::*; - -use crate::connector::Connector; - -/// A test connection that you can use to insert into and read messages from. -#[derive(Default, Debug, Clone)] -pub struct TestConn { - read: Arc>>>, - write: Arc>>>, -} - -fn take_cursor(cursor: &mut Cursor) -> T { - let out = std::mem::take(cursor.get_mut()); - cursor.set_position(0); - out -} - -impl TestConn { - /// Create a new TestConn - pub fn new() -> Self { - Self::default() - } - - /// Reset the instance and returning a clone - pub fn reset(&self) -> Self { - futures_lite::future::block_on(async { - take_cursor(&mut *self.read.lock().await); - take_cursor(&mut *self.write.lock().await); - }); - - self.clone() - } - - /// Write `data` to the underlying buffers. - /// - /// Whatever uses `AsyncRead` on this type will read from this buffer - pub async fn write_data(&self, data: impl AsRef<[u8]>) { - let mut read = self.read.lock().await; - let p = read.position(); - read.write_all(data.as_ref()).await.unwrap(); - read.set_position(p); - } - - /// Read all of the lines written via `AsyncWrite` - pub async fn read_all_lines(&self) -> Result> { - let data = take_cursor(&mut *self.write.lock().await); - Ok(String::from_utf8(data) - .map_err(|err| Error::new(ErrorKind::Other, err))? - .lines() - .map(|s| format!("{}\r\n", s)) - .collect()) - } - - /// Read the first line written via an `AsyncWrite` - pub async fn read_line(&self) -> Result { - let mut write = self.write.lock().await; - - write.set_position(0); - let mut line = Vec::new(); - let mut buf = [0_u8; 1]; // speed doesn't matter. - - while !line.ends_with(b"\r\n") { - write.read_exact(&mut buf).await?; - line.extend_from_slice(&buf); - } - - String::from_utf8(line).map_err(|err| Error::new(ErrorKind::Other, err)) - } -} - -macro_rules! impls { - ($($ty:ty)*) => { - $( - impl AsyncRead for $ty { - fn poll_read( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &mut [u8], - ) -> Poll> { - let this = self.get_mut(); - - let fut = this.read.lock(); - futures_lite::pin!(fut); - - let mut guard = futures_lite::ready!(fut.poll(cx)); - let guard = &mut *guard; - futures_lite::pin!(guard); - guard.poll_read(cx, buf) - } - } - - impl AsyncWrite for $ty { - fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll> { - let this = self.get_mut(); - - let fut = this.write.lock(); - futures_lite::pin!(fut); - - let mut guard = futures_lite::ready!(fut.poll(cx)); - guard.get_mut().extend_from_slice(buf); - - let fut = guard.seek(std::io::SeekFrom::Current(buf.len() as _)); - futures_lite::pin!(fut); - if let Err(err) = futures_lite::ready!(fut.poll(cx)) { - return Poll::Ready(Err(err)) - } - - Poll::Ready(Ok(buf.len())) - } - - fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { - Poll::Ready(Ok(())) - } - - fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { - Poll::Ready(Ok(())) - } - } - )* - }; -} - -impls! { - &TestConn - TestConn -} - -/// A [Connector] that uses the [TestConn] -/// -/// Generally you'll pre-fill the 'read' buffers via -/// [connector.conn.write_data()](TestConn::write_data()) and then clone the [TestConnector] and give a -/// copy to the [AsyncRunner](crate::AsyncRunner) -/// -/// Once the [AsyncRunner](crate::AsyncRunner) has written to the [TestConn]. You can read what was written via the [TestConn::read_all_lines] method. -#[derive(Default, Debug, Clone)] -pub struct TestConnector { - /// The [TestConn]. You can read/write to this while the [AsyncRunner](crate::AsyncRunner) has the connector - pub conn: TestConn, -} - -impl Connector for TestConnector { - type Output = TestConn; - - fn connect(&mut self) -> crate::BoxedFuture> { - let conn = self.conn.clone(); - Box::pin(async move { Ok(conn) }) - } -} diff --git a/src/test/mod.rs b/src/test/mod.rs deleted file mode 100644 index cf2dae4..0000000 --- a/src/test/mod.rs +++ /dev/null @@ -1,13 +0,0 @@ -mod str; -pub use self::str::*; - -mod tags_builder; -pub use tags_builder::{BuilderError, TagsBuilder, UserTags}; - -#[cfg(feature = "testing")] -#[cfg_attr(docsrs, doc(cfg(feature = "testing")))] -mod conn; - -#[cfg(feature = "testing")] -#[cfg_attr(docsrs, doc(cfg(feature = "testing")))] -pub use conn::{TestConn, TestConnector}; diff --git a/src/test/str.rs b/src/test/str.rs deleted file mode 100644 index 48934b2..0000000 --- a/src/test/str.rs +++ /dev/null @@ -1,2 +0,0 @@ -#[doc(inline)] -pub use crate::irc::tags::{escape_str, unescape_str}; diff --git a/src/test/tags_builder.rs b/src/test/tags_builder.rs deleted file mode 100644 index bd93159..0000000 --- a/src/test/tags_builder.rs +++ /dev/null @@ -1,285 +0,0 @@ -use std::borrow::Cow; -use std::collections::HashMap; - -use crate::irc::{TagIndices, Tags}; -use crate::MaybeOwned; - -#[derive(Debug)] -#[non_exhaustive] -#[allow(missing_copy_implementations)] -/// An error returned by the Tags builder -pub enum BuilderError { - /// An empty key was provided - EmptyKey, - /// An empty set of tags was provided - EmptyTags, -} - -impl std::fmt::Display for BuilderError { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - Self::EmptyKey => f.write_str("an empty key was provided"), - Self::EmptyTags => f.write_str("an empty set of tags was provided"), - } - } -} - -impl std::error::Error for BuilderError {} - -/// A builder for Tags -- this is useful for testing -/// -/// ```rust -/// # use twitchchat::irc::{Tags, TagIndices}; -/// # use twitchchat::twitch::{color::RGB, Color}; -/// use twitchchat::test::TagsBuilder; -/// -/// // create a builder -/// let user_tags = TagsBuilder::new() -/// // and add some key-values -/// .add("color", "#F0F0F0") -/// .add("display-name", "some-fancy-name") -/// // it'll escape both keys and values -/// .add("my-message", "my\nmessage\nspans\nmultiple\nlines") -/// // and return a type you can keep around -/// .build() -/// // or an error if you provided empty keys / or no keys -/// .unwrap(); -/// -/// // get the 'normal' tags from this type -/// let tags = user_tags.as_tags(); -/// -/// let color = tags.get_parsed::<_, Color>("color").unwrap(); -/// assert_eq!(color.rgb, RGB(0xF0, 0xF0, 0xF0)); -/// -/// assert_eq!(tags.get("display-name").unwrap(), "some-fancy-name"); -/// -/// // if the value was escaped, get will returned the escaped string -/// assert_eq!(tags.get("my-message").unwrap(), r"my\nmessage\nspans\nmultiple\nlines"); -/// -/// // you can get the unescaped value with `get_unescaped` -/// assert_eq!(tags.get_unescaped("my-message").unwrap(), "my\nmessage\nspans\nmultiple\nlines"); -/// ``` -#[derive(Default, Debug, Clone)] -pub struct TagsBuilder<'a> { - // the spec says 'last' key wins, and the order is irrelevant. - // so lets just use a hashmap - tags: HashMap, Cow<'a, str>>, -} - -impl<'a> TagsBuilder<'a> { - /// Create a new TagsBuilder - pub fn new() -> Self { - Self::default() - } - - /// Add this `key` with this `value` to the builder - /// - /// # NOTE - /// `value` can be empty. - /// `key` will replace any previous keys - /// - pub fn add(mut self, key: K, value: V) -> Self - where - K: Into>, - V: Into>, - { - self.tags.insert(key.into(), value.into()); - self - } - - /// Merge these pre-parsed tags into this collection - /// - /// # NOTE - /// This'll override any previously set keys. - pub fn merge(mut self, tags: &Tags<'_>) -> Self { - self.tags.extend( - tags.iter() - .map(|(k, v)| (Cow::Owned(k.to_owned()), Cow::Owned(v.to_owned()))), - ); - self - } - - /// Build the tags reference string and its indices. - /// - /// # Errors - /// If any empty keys were found, or no keys at all then an error will be returned. - pub fn build(self) -> Result { - use std::fmt::Write as _; - if self.tags.is_empty() { - return Err(BuilderError::EmptyTags); - } - - let mut buf = String::from("@"); - - for (i, (k, v)) in self.tags.iter().enumerate() { - if k.is_empty() { - return Err(BuilderError::EmptyKey); - } - - if i > 0 { - buf.push(';') - } - - write!( - &mut buf, - "{key}={val}", - key = super::escape_str(k), - val = super::escape_str(v) - ) - .expect("memory for string allocation"); - } - - let indices = TagIndices::build_indices(&buf).map_err(|err| match err { - crate::MessageError::MissingTagKey(_) => BuilderError::EmptyKey, - crate::MessageError::MissingTagValue(_) => BuilderError::EmptyTags, - _ => unreachable!(), - })?; - - Ok(UserTags { - data: buf.into(), - indices, - }) - } -} - -/// Tags built by the user -#[derive(Clone, Debug, PartialEq)] -pub struct UserTags { - /// The rendered string - /// - /// This is in the form of '@key=val;key=val' without the trailing space, but with the leading '@' - pub data: MaybeOwned<'static>, - /// Indices of tags in the string - pub indices: TagIndices, -} - -impl UserTags { - /// Get these tags as the 'normal' tags type - pub fn as_tags(&self) -> Tags<'_> { - Tags::from_data_indices(&self.data, &self.indices) - } -} - -#[cfg(test)] -mod tests { - use std::collections::BTreeMap; - use {super::*, crate::irc::tags}; - - #[test] - fn escape() { - let tests = &[ - // chars - (";", r"\:"), - (" ", r"\s"), - (r"\", r"\\"), - (r"\n", r"\\n"), - (r"\r", r"\\r"), - // strings - (r"\\", r"\\\\"), - ("hello;", r"hello\:"), - (" hello world", r"\shello\sworld"), - (r"\something", r"\\something"), - (r"the_end\n", r"the_end\\n"), - (r"the_win_end\r", r"the_win_end\\r"), - ]; - for (input, expected) in tests { - assert_eq!(tags::escape_str(*input), *expected) - } - - let tests = &["dont_escape+me", "foo=1234"]; - for test in tests { - assert_eq!(tags::escape_str(test), *test) - } - } - - #[test] - fn tags_builder() { - let mut builder = TagsBuilder::new(); - - let tests = &[("hello", "world"), ("and", "another thing"), ("len", "42")]; - for (k, v) in tests { - builder = builder.add(*k, *v); - } - - let user_tags = builder.build().unwrap(); - let tags = user_tags.as_tags(); - - for (k, v) in tests { - assert_eq!(tags.get_unescaped(k).unwrap(), *v) - } - - assert_eq!(tags.get_parsed::<_, i32>("len").unwrap(), 42); - - assert!(matches!( - TagsBuilder::new().build().unwrap_err(), - BuilderError::EmptyTags - )); - - let user_tags = TagsBuilder::new().add("empty", "").build().unwrap(); - let tags = user_tags.as_tags(); - assert_eq!(tags.get_unescaped("empty").unwrap(), ""); - - for escaped in &[" ", r"\n", r"\r", r"\", r"hello;"] { - let user_tags = TagsBuilder::new().add(*escaped, "").build().unwrap(); - let tags = user_tags.as_tags(); - assert_eq!(tags.get_unescaped(*escaped).unwrap(), ""); - } - - assert!(matches!( - TagsBuilder::new().add("", "hello").build().unwrap_err(), - BuilderError::EmptyKey - )); - } - - #[test] - fn merge() { - use crate::FromIrcMessage as _; - - let msg = "@badge-info=;badges=broadcaster/1;color=#FF69B4;display-name=museun;emote-only=1;emotes=25:0-4,6-10/81274:12-17;flags=;id=4e160a53-5482-4764-ba28-f224cd59a51f;mod=0;room-id=23196011;subscriber=0;tmi-sent-ts=1601079032426;turbo=0;user-id=23196011;user-type= :museun!museun@museun.tmi.twitch.tv PRIVMSG #museun :Kappa Kappa VoHiYo\r\n"; - let msg = crate::IrcMessage::parse(crate::MaybeOwned::Borrowed(msg)).unwrap(); - let pm = crate::messages::Privmsg::from_irc(msg).unwrap(); - let tags = pm.tags(); - - { - let user_tags = TagsBuilder::new().merge(&tags).build().unwrap(); - let new_tags = user_tags.as_tags(); - - // this ensures they are sorted the same for the tests - let old = tags.iter().collect::>(); - let new = new_tags.iter().collect::>(); - assert_eq!(old, new) - } - - // merging overrides previously set tags - { - let user_tags = TagsBuilder::new() - .add("color", "#FF00FF") - .merge(&tags) - .build() - .unwrap(); - let tags = user_tags.as_tags(); - assert_eq!(tags.get_unescaped("color").unwrap(), "#FF69B4"); - } - - // adding overrides previously set tags - { - let user_tags = TagsBuilder::new() - .merge(&tags) - .add("color", "#FF00FF") - .build() - .unwrap(); - let tags = user_tags.as_tags(); - assert_eq!(tags.get_unescaped("color").unwrap(), "#FF00FF"); - } - - { - let user_tags = TagsBuilder::new() - .add("color", "#FF00FF") - .add("color", "#FF0000") - .build() - .unwrap(); - let tags = user_tags.as_tags(); - assert_eq!(tags.get_unescaped("color").unwrap(), "#FF0000"); - } - } -} From 0c99f60dffae71ead01187483e48caa6d2cef62c Mon Sep 17 00:00:00 2001 From: museun Date: Sun, 6 Dec 2020 01:30:27 -0500 Subject: [PATCH 4/7] remove async_runner --- src/lib.rs | 1 - src/runner/async_runner.rs | 680 ------------------------------------- src/runner/mod.rs | 5 - 3 files changed, 686 deletions(-) delete mode 100644 src/runner/async_runner.rs diff --git a/src/lib.rs b/src/lib.rs index 2ad0c03..1a08a97 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -117,7 +117,6 @@ cfg_async! { pub mod channel; } pub mod runner; pub use runner::{Error as RunnerError, Status}; -cfg_async! { pub use runner::AsyncRunner; } pub mod rate_limit; diff --git a/src/runner/async_runner.rs b/src/runner/async_runner.rs deleted file mode 100644 index 10af540..0000000 --- a/src/runner/async_runner.rs +++ /dev/null @@ -1,680 +0,0 @@ -cfg_async! { -use crate::{ - channel::Receiver, - commands, - encoder::AsyncEncoder, - messages::{Capability, Commands, MessageId}, - rate_limit::{RateClass, RateLimit}, - twitch::UserConfig, - util::{Notify, NotifyHandle}, - writer::{AsyncWriter, MpscWriter}, - AsyncDecoder, DecodeError, Encodable, FromIrcMessage, IrcMessage, -}; - -use super::{ - channel::Channels, - timeout::{TimeoutState, RATE_LIMIT_WINDOW, TIMEOUT, WINDOW}, - Capabilities, Channel, Error, Identity, Status, StepResult, -}; - -use futures_lite::{AsyncRead, AsyncWrite, AsyncWriteExt, Stream}; -use std::{ - collections::{HashSet, VecDeque}, - pin::Pin, - task::{Context, Poll}, -}; - -/// An asynchronous runner -pub struct AsyncRunner { - /// You identity that Twitch gives when you connected - pub identity: Identity, - - channels: Channels, - - activity_rx: Receiver<()>, - writer_rx: Receiver>, - - notify: Notify, - // why don't we use this? - notify_handle: NotifyHandle, - - timeout_state: TimeoutState, - - decoder: AsyncDecoder>, - encoder: AsyncEncoder>, - - writer: AsyncWriter, - global_rate_limit: RateLimit, - - missed_messages: VecDeque>, -} - -impl std::fmt::Debug for AsyncRunner { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("AsyncRunner { .. }").finish() - } -} - -impl AsyncRunner { - /// Connect with the provided connector and the provided UserConfig - /// - /// This returns the Runner with your identity set. - pub async fn connect(connector: C, user_config: &UserConfig) -> Result - where - C: Connector, - for<'a> &'a C::Output: AsyncRead + AsyncWrite + Send + Sync + Unpin, - { - log::debug!("connecting"); - let mut stream = { connector }.connect().await?; - log::debug!("connection established"); - - log::debug!("registering"); - let mut buf = vec![]; - commands::register(user_config).encode(&mut buf)?; - stream.write_all(&buf).await?; - log::debug!("registered"); - - let read = async_dup::Arc::new(stream); - let write = read.clone(); - - let read: Box = Box::new(read); - let write: Box = Box::new(write); - - let mut decoder = AsyncDecoder::new(read); - let mut encoder = AsyncEncoder::new(write); - - log::debug!("waiting for the connection to be ready"); - let mut missed_messages = VecDeque::new(); - let identity = Self::wait_for_ready( - &mut decoder, - &mut encoder, - user_config, - &mut missed_messages, - ) - .await?; - log::debug!("connection is ready: {:?}", identity); - - let (writer_tx, writer_rx) = crate::channel::unbounded(); - let (notify, notify_handle) = Notify::new(); - let (activity_tx, activity_rx) = crate::channel::bounded(32); - - let writer = AsyncWriter::new(MpscWriter::new(writer_tx), activity_tx); - - let timeout_state = TimeoutState::Start; - let channels = Channels::default(); - - let global_rate_limit = RateLimit::from_class(RateClass::Regular); - - Ok(Self { - identity, - channels, - - activity_rx, - writer_rx, - - notify, - notify_handle, - - timeout_state, - - decoder, - encoder, - - writer, - global_rate_limit, - - missed_messages, - }) - } - - /// Check whether you're on this channel - pub fn is_on_channel(&self, channel: &str) -> bool { - self.channels.is_on(channel) - } - - /// Get a specific channel. - /// - /// This is useful for changing the rate limit/state manually. - pub fn get_channel_mut(&mut self, channel: &str) -> Option<&mut Channel> { - self.channels.get_mut(channel) - } - - /// Get a clonable writer you can use - pub fn writer(&self) -> AsyncWriter { - self.writer.clone() - } - - /// Get a handle that you can trigger a normal 'quit'. - /// - /// You can also do `AsyncWriter::quit`. - pub fn quit_handle(&self) -> NotifyHandle { - self.notify_handle.clone() - } - - /// Join `channel` and wait for it to complete - pub async fn join(&mut self, channel: &str) -> Result<(), Error> { - if self.is_on_channel(channel) { - return Err(Error::AlreadyOnChannel { - channel: channel.to_string(), - }); - } - - log::debug!("joining '{}'", channel); - self.encoder.encode(commands::join(channel)).await?; - - let channel = crate::commands::Channel::new(channel).to_string(); - log::debug!("waiting for a response"); - - let mut queue = VecDeque::new(); - - let status = self - .wait_for(&mut queue, |msg, this| match msg { - // check to see if it was us that joined the channel - Commands::Join(msg) => { - Ok(msg.channel() == channel && msg.name() == this.identity.username()) - } - - // check to see if we were banned - Commands::Notice(msg) if matches!(msg.msg_id(), Some(MessageId::MsgBanned)) => { - Err(Error::BannedFromChannel { - channel: msg.channel().to_string(), - }) - } - - _ => Ok(false), - }) - .await?; - - if let Some(status) = status { - match status { - Status::Quit | Status::Eof => return Err(Error::UnexpectedEof), - _ => unimplemented!(), - } - } - - self.missed_messages.extend(queue); - - log::debug!("joined '{}'", channel); - - Ok(()) - } - - /// Part `channel` and wait for it to complete - pub async fn part(&mut self, channel: &str) -> Result<(), Error> { - if !self.is_on_channel(channel) { - return Err(Error::NotOnChannel { - channel: channel.to_string(), - }); - } - - log::debug!("leaving '{}'", channel); - self.encoder.encode(commands::part(channel)).await?; - - let channel = crate::commands::Channel::new(channel).to_string(); - log::debug!("waiting for a response"); - - let mut queue = VecDeque::new(); - - let status = self - .wait_for(&mut queue, |msg, this| match msg { - // check to see if it was us that left the channel - Commands::Part(msg) => { - Ok(msg.channel() == channel && msg.name() == this.identity.username()) - } - _ => Ok(false), - }) - .await?; - - if let Some(status) = status { - match status { - Status::Quit | Status::Eof => return Err(Error::UnexpectedEof), - _ => unimplemented!(), - } - } - log::debug!("left '{}'", channel); - - self.missed_messages.extend(queue); - - Ok(()) - } - - /// Get the next message. You'll usually want to call this in a loop - pub async fn next_message(&mut self) -> Result, Error> { - use crate::util::{Either::*, FutExt as _}; - - loop { - match self.step().await? { - StepResult::Nothing => continue, - StepResult::Status(Status::Quit) => { - if let Left(_notified) = self.notify.wait().now_or_never().await { - // close everything - self.writer_rx.close(); - self.activity_rx.close(); - - // and then drain any remaining items - while self.available_queued_messages() > 0 { - self.drain_queued_messages().await?; - futures_lite::future::yield_now().await; - } - - // and finally send the quit - self.encoder.encode(commands::raw("QUIT\r\n")).await?; - - // and signal that we've quit - break Ok(Status::Quit); - } - } - StepResult::Status(status) => break Ok(status), - } - } - } - - /// Single step the loop. This is useful for testing. - pub async fn step(&mut self) -> Result, Error> { - use crate::util::*; - use crate::IntoOwned as _; - - if let Some(msg) = self.missed_messages.pop_front() { - return Ok(StepResult::Status(Status::Message(msg))); - } - - let select = self - .decoder - .read_message() - .either(self.activity_rx.recv()) - .either(self.writer_rx.recv()) - .either(self.notify.wait()) - .either(super::timeout::next_delay()) - .await; - - match select { - Left(Left(Left(Left(msg)))) => { - let msg = match msg { - Err(DecodeError::Eof) => { - log::info!("got an EOF, exiting main loop"); - return Ok(StepResult::Status(Status::Eof)); - } - Err(err) => { - log::warn!("read an error: {}", err); - return Err(err.into()); - } - Ok(msg) => msg, - }; - - self.timeout_state = TimeoutState::activity(); - - let all = Commands::from_irc(msg) // - .expect("msg identity conversion should be upheld") - .into_owned(); - - self.check_messages(&all).await?; - - return Ok(StepResult::Status(Status::Message(all))); - } - - Left(Left(Left(Right(Some(_activity))))) => { - self.timeout_state = TimeoutState::activity(); - } - - Left(Left(Right(Some(write_data)))) => { - // TODO provide a 'bytes' flavored parser - let msg = std::str::from_utf8(&*write_data).map_err(Error::InvalidUtf8)?; - let res = crate::irc::parse_one(msg) // - .expect("encoder should produce valid IRC messages"); - let msg = res.1; - - if let crate::irc::IrcMessage::PRIVMSG = msg.get_command() { - if let Some(ch) = msg.nth_arg(0) { - if !self.channels.is_on(ch) { - self.channels.add(ch) - } - - let ch = self.channels.get_mut(ch).unwrap(); - if ch.rated_limited_at.map(|s| s.elapsed()) > Some(RATE_LIMIT_WINDOW) { - ch.reset_rate_limit(); - } - - ch.rate_limited.enqueue(write_data) - } - } - } - - Left(Right(_notified)) => return Ok(StepResult::Status(Status::Quit)), - - Right(_timeout) => { - log::info!("idle connection detected, sending a ping"); - let ts = timestamp().to_string(); - self.encoder.encode(commands::ping(&ts)).await?; - self.timeout_state = TimeoutState::waiting_for_pong(); - } - - _ => { - return Ok(StepResult::Status(Status::Eof)); - } - } - - match self.timeout_state { - TimeoutState::WaitingForPong(dt) => { - if dt.elapsed() > TIMEOUT { - log::warn!("PING timeout detected, exiting"); - return Err(Error::TimedOut); - } - } - TimeoutState::Activity(dt) => { - if dt.elapsed() > WINDOW { - log::warn!("idle connectiond detected, sending a PING"); - let ts = timestamp().to_string(); - self.encoder.encode(crate::commands::ping(&ts)).await?; - self.timeout_state = TimeoutState::waiting_for_pong(); - } - } - TimeoutState::Start => {} - } - - log::trace!("draining messages"); - self.drain_queued_messages().await?; - - Ok(StepResult::Nothing) - } - - async fn check_messages(&mut self, all: &Commands<'static>) -> Result<(), Error> { - use {Commands::*, TimeoutState::*}; - - log::trace!("< {}", all.raw().escape_debug()); - - match &all { - Ping(msg) => { - let token = msg.token(); - log::debug!( - "got a ping from the server. responding with token '{}'", - token - ); - self.encoder.encode(commands::pong(token)).await?; - self.timeout_state = TimeoutState::activity(); - } - - Pong(..) if matches!(self.timeout_state, WaitingForPong {..}) => { - self.timeout_state = TimeoutState::activity() - } - - Join(msg) if msg.name() == self.identity.username() => { - log::debug!("starting tracking channel for '{}'", msg.channel()); - self.channels.add(msg.channel()); - } - - Part(msg) if msg.name() == self.identity.username() => { - log::debug!("stopping tracking of channel '{}'", msg.channel()); - self.channels.remove(msg.channel()); - } - - RoomState(msg) => { - if let Some(dur) = msg.is_slow_mode() { - if let Some(ch) = self.channels.get_mut(msg.channel()) { - ch.enable_slow_mode(dur) - } - } - } - - Notice(msg) => { - let ch = self.channels.get_mut(msg.channel()); - match (msg.msg_id(), ch) { - // we should enable slow mode - (Some(MessageId::SlowOn), Some(ch)) => ch.enable_slow_mode(30), - // we should disable slow mode - (Some(MessageId::SlowOff), Some(ch)) => ch.disable_slow_mode(), - // we've been rate limited on the channel - (Some(MessageId::MsgRatelimit), Some(ch)) => ch.set_rate_limited(), - // we cannot join/send to the channel because we're banned - (Some(MessageId::MsgBanned), ..) => self.channels.remove(msg.channel()), - _ => {} - } - } - - Reconnect(_) => return Err(Error::ShouldReconnect), - - _ => {} - } - - Ok(()) - } -} - -impl AsyncRunner { - async fn wait_for( - &mut self, - missed: &mut VecDeque>, - func: F, - ) -> Result>, Error> - where - F: Fn(&Commands<'static>, &Self) -> Result + Send + Sync, - { - loop { - match self.step().await? { - StepResult::Status(Status::Message(msg)) => { - if func(&msg, self)? { - break Ok(None); - } - missed.push_back(msg); - } - StepResult::Status(d) => return Ok(Some(d)), - StepResult::Nothing => continue, - } - } - } - - fn available_queued_messages(&self) -> usize { - self.channels - .map - .values() - .map(|s| s.rate_limited.queue.len()) - .sum() - } - - async fn drain_queued_messages(&mut self) -> std::io::Result<()> { - let enc = &mut self.encoder; - let limit = &mut self.global_rate_limit.get_available_tokens(); - - let start = *limit; - - // for each channel, try to take up to 'limit' tokens - for channel in self.channels.map.values_mut() { - if channel.rated_limited_at.map(|s| s.elapsed()) > Some(RATE_LIMIT_WINDOW) { - channel.reset_rate_limit(); - } - - // drain until we're out of messages, or tokens - channel - .rate_limited - .drain_until_blocked(&channel.name, limit, enc) - .await?; - - let left = std::cmp::max(start, *limit); - let right = std::cmp::min(start, *limit); - - let diff = left - right; - - if *limit == 0 { - log::warn!(target: "twitchchat::rate_limit", "global rate limit hit while draining '{}'", &channel.name); - break; - } - - // and throttle the global one - match self.global_rate_limit.consume(diff) { - // use the new remaining amount of tokens - Ok(rem) => *limit = rem, - - // we're globally rate limited, so just return - Err(..) => { - log::warn!(target: "twitchchat::rate_limit", "global rate limit hit while draining '{}'", &channel.name); - break; - } - } - } - - Ok(()) - } - - async fn wait_for_ready( - decoder: &mut AsyncDecoder, - encoder: &mut AsyncEncoder, - user_config: &UserConfig, - missed_messages: &mut VecDeque>, - ) -> Result - where - R: AsyncRead + Send + Sync + Unpin, - W: AsyncWrite + Send + Sync + Unpin, - { - use crate::IntoOwned as _; - - let is_anonymous = user_config.is_anonymous(); - - let mut looking_for: HashSet<_> = user_config.capabilities.iter().collect(); - let mut caps = Capabilities::default(); - let mut our_name = None; - - use crate::twitch::Capability as TwitchCap; - // Twitch says we'll be getting a GlobalUserState if we just send the - // Tags capability - // - // This is false. Twitch will only send GlobalUserState if we've sent - // the Commands capability and atleast 1 other capability. - // - // That other capability doesn't have to be Tags, interestingly enough. - // So a combination of both 'Commands' and 'Membership' will produce an - // empty GlobalUserState - // - // We'll check for both Tags and Commands - // - let will_be_getting_global_user_state_hopefully = - user_config.capabilities.contains(&TwitchCap::Tags) && - user_config.capabilities.contains(&TwitchCap::Commands); - - let identity = loop { - let msg: IrcMessage<'_> = decoder.read_message().await?; - - // this should always be infallible. its not marked infallible - // because of the 'non-exhaustive' attribute - use Commands::*; - let commands = Commands::from_irc(msg)?; - - // this is the simpliest way. and this'll only clone like 9 messages - missed_messages.push_back(commands.clone().into_owned()); - - match commands { - Ready(msg) => { - our_name.replace(msg.username().to_string()); - - // if we aren't going to be receiving tags, then we - // won't be looking for any more messages - - // if we're anonymous, we won't get GLOBALUSERSTATE even - // if we do send Tags - if is_anonymous { - break Identity::Anonymous { caps }; - } - - // if we're not looking for any more caps and we won't be - // getting a GlobalUserState just give them the basic - // Identity - if looking_for.is_empty() && !will_be_getting_global_user_state_hopefully { - break Identity::Basic { - name: our_name.take().unwrap(), - caps, - }; - } - } - - Cap(msg) => match msg.capability() { - Capability::Acknowledged(name) => { - use crate::twitch::Capability as Cap; - - let cap = match Cap::maybe_from_str(name) { - Some(cap) => cap, - // Twitch sent us an unknown capability - None => { - caps.unknown.insert(name.to_string()); - continue; - } - }; - - *match cap { - Cap::Tags => &mut caps.tags, - Cap::Membership => &mut caps.membership, - Cap::Commands => &mut caps.commands, - } = true; - - looking_for.remove(&cap); - } - - Capability::NotAcknowledged(name) => { - return Err(Error::InvalidCap { - cap: name.to_string(), - }) - } - }, - - // NOTE: This will only be sent when there's both Commands and atleast one other CAP requested - GlobalUserState(msg) => { - // TODO: this is so shitty. - let id = match msg.user_id { - Some(id) => id.parse().unwrap(), - // XXX: we can get this message without any tags - None => { - break Identity::Basic { - name: our_name.take().unwrap(), - caps, - }; - } - }; - - break Identity::Full { - // these unwraps should be safe because we'll have all of the TAGs here - name: our_name.unwrap(), - user_id: id, - display_name: msg.display_name.map(|s| s.to_string()), - color: msg.color, - caps, - }; - - } - - // Reply to any PINGs while waiting. Although Twitch doesn't - // currently send a PING for spoof detection on initial - // handshake, one day they may. Most IRC servers do this - // already - Ping(msg) => encoder.encode(commands::pong(msg.token())).await?, - - _ => { - // we have our name, but we won't be getting GlobalUserState and we've got all of our Caps - if our_name.is_some() && !will_be_getting_global_user_state_hopefully && looking_for.is_empty() { - break Identity::Basic { - name: our_name.take().unwrap(), - caps, - }; - } - } - }; - }; - - Ok(identity) - } -} - -impl Stream for AsyncRunner { - type Item = Commands<'static>; - - fn poll_next(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll> { - use std::future::Future; - let fut = self.get_mut().next_message(); - futures_lite::pin!(fut); - - match futures_lite::ready!(fut.poll(ctx)) { - Ok(status) => match status { - Status::Message(msg) => Poll::Ready(Some(msg)), - Status::Quit | Status::Eof => Poll::Ready(None), - }, - Err(..) => Poll::Ready(None), - } - } -} -} diff --git a/src/runner/mod.rs b/src/runner/mod.rs index 3fb631e..3133c81 100644 --- a/src/runner/mod.rs +++ b/src/runner/mod.rs @@ -34,11 +34,6 @@ cfg_async! { pub use channel::Channel; } -cfg_async! { - mod async_runner; - pub use async_runner::AsyncRunner; -} - cfg_async! { #[doc(inline)] pub use crate::util::NotifyHandle; From 373d5a4994a06f8a2a6918a558cf14c82dbecbe7 Mon Sep 17 00:00:00 2001 From: museun Date: Sun, 6 Dec 2020 01:30:42 -0500 Subject: [PATCH 5/7] remove examples --- examples/async_io_demo.rs | 85 -------------------- examples/async_std_demo.rs | 84 ------------------- examples/include/mod.rs | 97 ---------------------- examples/message_parse.rs | 161 ------------------------------------- examples/simple_bot.rs | 148 ---------------------------------- examples/smol_demo.rs | 84 ------------------- examples/tokio_demo.rs | 83 ------------------- 7 files changed, 742 deletions(-) delete mode 100644 examples/async_io_demo.rs delete mode 100644 examples/async_std_demo.rs delete mode 100644 examples/include/mod.rs delete mode 100644 examples/message_parse.rs delete mode 100644 examples/simple_bot.rs delete mode 100644 examples/smol_demo.rs delete mode 100644 examples/tokio_demo.rs diff --git a/examples/async_io_demo.rs b/examples/async_io_demo.rs deleted file mode 100644 index a0d99b2..0000000 --- a/examples/async_io_demo.rs +++ /dev/null @@ -1,85 +0,0 @@ -// NOTE: this demo requires `--feature async-io`. -use twitchchat::{commands, connector, runner::AsyncRunner, UserConfig}; - -// this is a helper module to reduce code deduplication -mod include; -use crate::include::{channels_to_join, get_user_config, main_loop}; - -async fn connect(user_config: &UserConfig, channels: &[String]) -> anyhow::Result { - // create a connector using ``async_io``, this connects to Twitch. - // you can provide a different address with `custom` - // this can fail if DNS resolution cannot happen - let connector = connector::async_io::Connector::twitch()?; - - println!("we're connecting!"); - // create a new runner. this is a provided async 'main loop' - // this method will block until you're ready - let mut runner = AsyncRunner::connect(connector, user_config).await?; - println!("..and we're connected"); - - // and the identity Twitch gave you - println!("our identity: {:#?}", runner.identity); - - for channel in channels { - // the runner itself has 'blocking' join/part to ensure you join/leave a channel. - // these two methods return whether the connection was closed early. - // we'll ignore it for this demo - println!("attempting to join '{}'", channel); - let _ = runner.join(channel).await?; - println!("joined '{}'!", channel); - } - - Ok(runner) -} - -fn main() -> anyhow::Result<()> { - // create a user configuration - let user_config = get_user_config()?; - // get some channels to join from the environment - let channels = channels_to_join()?; - - // any executor would work, we'll use async_executor so can spawn tasks - let executor = async_executor::Executor::new(); - futures_lite::future::block_on(executor.run(async { - // connect and join the provided channels - let runner = connect(&user_config, &channels).await?; - - // you can get a handle to shutdown the runner - let quit_handle = runner.quit_handle(); - - // you can get a clonable writer - let mut writer = runner.writer(); - - // spawn something off in the background that'll exit in 10 seconds - executor - .spawn({ - let mut writer = writer.clone(); - let channels = channels.clone(); - async move { - println!("in 10 seconds we'll exit"); - async_io::Timer::after(std::time::Duration::from_secs(10)).await; - - // send one final message to all channels - for channel in channels { - let cmd = commands::privmsg(&channel, "goodbye, world"); - writer.encode(cmd).await.unwrap(); - } - - println!("sending quit signal"); - quit_handle.notify().await; - } - }) - .detach(); - - // you can encode all sorts of 'commands' - for channel in &channels { - writer - .encode(commands::privmsg(channel, "hello world!")) - .await?; - } - - println!("starting main loop"); - // your 'main loop'. you'll just call next_message() until you're done - main_loop(runner).await - })) -} diff --git a/examples/async_std_demo.rs b/examples/async_std_demo.rs deleted file mode 100644 index 662fd72..0000000 --- a/examples/async_std_demo.rs +++ /dev/null @@ -1,84 +0,0 @@ -// NOTE: this demo requires `--features="async-std async-std/attributes"`. -use twitchchat::{ - commands, connector, messages, - runner::{AsyncRunner, Status}, - UserConfig, -}; - -// this is a helper module to reduce code deduplication -mod include; -use crate::include::{channels_to_join, get_user_config, main_loop}; - -async fn connect(user_config: &UserConfig, channels: &[String]) -> anyhow::Result { - // create a connector using ``async_std``, this connects to Twitch. - // you can provide a different address with `custom` - let connector = connector::async_std::Connector::twitch()?; - - println!("we're connecting!"); - // create a new runner. this is a provided async 'main loop' - // this method will block until you're ready - let mut runner = AsyncRunner::connect(connector, user_config).await?; - println!("..and we're connected"); - - // and the identity Twitch gave you - println!("our identity: {:#?}", runner.identity); - - for channel in channels { - // the runner itself has 'blocking' join/part to ensure you join/leave a channel. - // these two methods return whether the connection was closed early. - // we'll ignore it for this demo - println!("attempting to join '{}'", channel); - let _ = runner.join(&channel).await?; - println!("joined '{}'!", channel); - } - - Ok(runner) -} - -#[async_std::main] -async fn main() -> anyhow::Result<()> { - // create a user configuration - let user_config = get_user_config()?; - // get some channels to join from the environment - let channels = channels_to_join()?; - - // connect and join the provided channels - let runner = connect(&user_config, &channels).await?; - - // you can get a handle to shutdown the runner - let quit_handle = runner.quit_handle(); - - // you can get a clonable writer - let mut writer = runner.writer(); - - // spawn something off in the background that'll exit in 10 seconds - async_std::task::spawn({ - let mut writer = writer.clone(); - let channels = channels.clone(); - async move { - println!("in 10 seconds we'll exit"); - async_std::task::sleep(std::time::Duration::from_secs(10)).await; - - // send one final message to all channels - for channel in channels { - let cmd = commands::privmsg(&channel, "goodbye, world"); - writer.encode(cmd).await.unwrap(); - } - - println!("sending quit signal"); - assert!(quit_handle.notify().await); - } - }); - - // you can encode all sorts of 'commands' - - for channel in &channels { - writer - .encode(commands::privmsg(channel, "hello world!")) - .await?; - } - - println!("starting main loop"); - // your 'main loop'. you'll just call next_message() until you're done - main_loop(runner).await -} diff --git a/examples/include/mod.rs b/examples/include/mod.rs deleted file mode 100644 index 8925582..0000000 --- a/examples/include/mod.rs +++ /dev/null @@ -1,97 +0,0 @@ -#![allow(dead_code)] -use anyhow::Context as _; -use twitchchat::{messages, AsyncRunner, Status, UserConfig}; - -// some helpers for the demo -fn get_env_var(key: &str) -> anyhow::Result { - std::env::var(key).with_context(|| format!("please set `{}`", key)) -} - -pub fn get_user_config() -> anyhow::Result { - let name = get_env_var("TWITCH_NAME")?; - let token = get_env_var("TWITCH_TOKEN")?; - - // you need a `UserConfig` to connect to Twitch - let config = UserConfig::builder() - // the name of the associated twitch account - .name(name) - // and the provided OAuth token - .token(token) - // and enable all of the advanced message signaling from Twitch - .enable_all_capabilities() - .build()?; - - Ok(config) -} - -// channels can be either in the form of '#museun' or 'museun'. the crate will internally add the missing # -pub fn channels_to_join() -> anyhow::Result> { - let channels = get_env_var("TWITCH_CHANNEL")? - .split(',') - .map(ToString::to_string) - .collect(); - Ok(channels) -} - -// a 'main loop' -pub async fn main_loop(mut runner: AsyncRunner) -> anyhow::Result<()> { - loop { - match runner.next_message().await? { - // this is the parsed message -- across all channels (and notifications from Twitch) - Status::Message(msg) => { - handle_message(msg).await; - } - - // you signaled a quit - Status::Quit => { - println!("we signaled we wanted to quit"); - break; - } - // the connection closed normally - Status::Eof => { - println!("we got a 'normal' eof"); - break; - } - } - } - - Ok(()) -} - -// you can generally ignore the lifetime for these types. -async fn handle_message(msg: messages::Commands<'_>) { - use messages::Commands::*; - // All sorts of messages - match msg { - // This is the one users send to channels - Privmsg(msg) => println!("[{}] {}: {}", msg.channel(), msg.name(), msg.data()), - - // This one is special, if twitch adds any new message - // types, this will catch it until future releases of - // this crate add them. - Raw(_) => {} - - // These happen when you initially connect - IrcReady(_) => {} - Ready(_) => {} - Cap(_) => {} - - // and a bunch of other messages you may be interested in - ClearChat(_) => {} - ClearMsg(_) => {} - GlobalUserState(_) => {} - HostTarget(_) => {} - Join(_) => {} - Notice(_) => {} - Part(_) => {} - Ping(_) => {} - Pong(_) => {} - Reconnect(_) => {} - RoomState(_) => {} - UserNotice(_) => {} - UserState(_) => {} - Whisper(_) => {} - - _ => {} - } -} diff --git a/examples/message_parse.rs b/examples/message_parse.rs deleted file mode 100644 index 2c6917c..0000000 --- a/examples/message_parse.rs +++ /dev/null @@ -1,161 +0,0 @@ -use twitchchat::{ - messages, - // for `from_irc()` - FromIrcMessage as _, - // for into_owned() - IntoOwned as _, -}; - -fn main() { - // show off the low-level parser - parse_demo(); - - // this provides a 'reader'/'iterator' instead of just a boring parser - decoder_demo(); - - // and block on a future for the async decoder - futures_lite::future::block_on(decoder_async_demo()) -} - -fn parse_demo() { - let input = - "@key1=val1;key2=true;key3=42 :sender!sender@server PRIVMSG #museun :this is a test\r\n"; - - // you can get an iterator of messages that borrow from the input string - for msg in twitchchat::irc::parse(input) { - let msg: twitchchat::IrcMessage<'_> = msg.unwrap(); - // you can get the raw string back - assert_eq!(msg.get_raw(), input); - - // you can parse it into a specific type. e.g. a PRIVMSG. - // this continues to borrow from the original string slice - let pm = messages::Privmsg::from_irc(msg).unwrap(); - assert_eq!(pm.channel(), "#museun"); - - // you can consume the parsed message to get the raw string back out. - // this gives you a MaybeOwned<'a> because the type can be converted to an owned state (e.g. static); - let msg: twitchchat::maybe_owned::MaybeOwned<'_> = pm.into_inner(); - - // `MaybeOwned<'a>` can be used as a `&'a str`. - let msg = twitchchat::irc::parse(&*msg) - .next() - .map(|s| s.unwrap()) - .unwrap(); - - // parse it as an Commands, which wraps all of the provided messages - let all = messages::Commands::from_irc(msg).unwrap(); - assert!(matches!(all, messages::Commands::Privmsg{..})); - - // this is still borrowing from the 'input' from above. - let all: messages::Commands<'_> = all; - - // to turn it into an 'owned' version (e.g. a 'static lifetime) - let all = all.into_owned(); - let _all: messages::Commands<'static> = all; - } - - // double the string for the test - let old_len = input.len(); - let input = input.repeat(2); - - // you can also parse a 'single' message in a streaming fashion - // this returns a pos > 0 if the index of the start of the next possible message - let (pos, msg_a) = twitchchat::irc::parse_one(&input).unwrap(); - assert_eq!(pos, old_len); - - // and parse the rest of the message - // this returns a pos if 0 if this was the last message - let (pos, msg_b) = twitchchat::irc::parse_one(&input[pos..]).unwrap(); - assert_eq!(pos, 0); - - // and it should've parsed the same message twice - assert_eq!(msg_a, msg_b); - - // and you can get the a tags 'view' from the message, if any tags were provided - let msg = messages::Privmsg::from_irc(msg_a).unwrap(); - // you can get the string value for a key - assert_eq!(msg.tags().get("key1").unwrap(), "val1"); - // or it as a 'truthy' value - assert_eq!(msg.tags().get_as_bool("key2"), true); - // or as a FromStr parsed value - assert_eq!(msg.tags().get_parsed::<_, i32>("key3").unwrap(), 42); - - // you can convert a parsed message into an Commands easily by using From/Into; - let all: messages::Commands<'_> = msg_b.into(); - assert!(matches!(all, messages::Commands::Raw{..})); -} - -fn decoder_demo() { - let input = - "@key1=val1;key2=true;key3=42 :sender!sender@server PRIVMSG #museun :this is a test\r\n"; - - let source = input.repeat(5); - // Cursor> impl std::io::Read. using it for this demo - let reader = std::io::Cursor::new(source.into_bytes()); - - // you can make a decoder over an std::io::Read - let mut decoder = twitchchat::Decoder::new(reader); - - // you use use read_message than the 'msg' is borrowed until the next call of 'read_message' - while let Ok(_msg) = decoder.read_message() { - // msg is borrowed from the decoder here - } - - // you can get the inner reader out - let mut reader = decoder.into_inner(); - // seek back to the beginning for this demo - reader.set_position(0); - - { - // you can also just give it a &mut Reader - let _decoder = twitchchat::Decoder::new(&mut reader); - // which will drop the decoder here and you'll still have the 'reader' from above - } - - // the decoder is also an iterator. - // when using the iterator you'll get an 'owned' message back. - for msg in twitchchat::Decoder::new(&mut reader) { - // and msg is owned here ('static) - // error if it failed to parse, or an IO error. - let _msg: messages::IrcMessage<'static> = msg.unwrap(); - } -} - -// all of the Sync is also applicable to the Async version. -async fn decoder_async_demo() { - use futures_lite::StreamExt as _; // for 'next' on the Stream - - let input = - "@key1=val1;key2=true;key3=42 :sender!sender@server PRIVMSG #museun :this is a test\r\n"; - - let source = input.repeat(5); - // Cursor> impl std::io::Read. using it for this demo - let reader = futures_lite::io::Cursor::new(source.into_bytes()); - - // you can make a decoder over an std::io::Read - let mut decoder = twitchchat::AsyncDecoder::new(reader); - - // you use use read_message than the 'msg' is borrowed until the next call of 'read_message' - while let Ok(_msg) = decoder.read_message().await { - // msg is borrowed from the decoder here - } - - // you can get the inner reader out - let mut reader = decoder.into_inner(); - // seek back to the beginning for this demo - reader.set_position(0); - - { - // you can also just give it a &mut Reader - let _decoder = twitchchat::AsyncDecoder::new(&mut reader); - // which will drop the decoder here and you'll still have the 'reader' from above - } - - // the decoder is also an Stream. - // when using the Stream you'll get an 'owned' message back. - while let Some(msg) = twitchchat::AsyncDecoder::new(&mut reader).next().await { - // and msg is owned here ('static) - // error if it failed to parse, or an IO error. - let _msg: messages::IrcMessage<'static> = msg.unwrap(); - } -} diff --git a/examples/simple_bot.rs b/examples/simple_bot.rs deleted file mode 100644 index fa3705a..0000000 --- a/examples/simple_bot.rs +++ /dev/null @@ -1,148 +0,0 @@ -// note this uses `smol`. you can use `tokio` or `async_std` or `async_io` if you prefer. -// this is a helper module to reduce code deduplication -// extensions to the Privmsg type -use twitchchat::PrivmsgExt as _; -use twitchchat::{ - messages::{Commands, Privmsg}, - runner::{AsyncRunner, NotifyHandle, Status}, - UserConfig, -}; - -// this is a helper module to reduce code deduplication -mod include; -use crate::include::{channels_to_join, get_user_config}; - -use std::collections::HashMap; - -fn main() -> anyhow::Result<()> { - // you'll need a user configuration - let user_config = get_user_config()?; - // and some channels to join - let channels = channels_to_join()?; - - let start = std::time::Instant::now(); - - let mut bot = Bot::default() - .with_command("!hello", |args: Args| { - let output = format!("hello {}!", args.msg.name()); - // We can 'reply' to this message using a writer + our output message - args.writer.reply(args.msg, &output).unwrap(); - }) - .with_command("!uptime", move |args: Args| { - let output = format!("its been running for {:.2?}", start.elapsed()); - // We can send a message back (without quoting the sender) using a writer + our output message - args.writer.say(args.msg, &output).unwrap(); - }) - .with_command("!quit", move |args: Args| { - // because we're using sync stuff, turn async into sync with smol! - smol::block_on(async move { - // calling this will cause read_message() to eventually return Status::Quit - args.quit.notify().await - }); - }); - - // run the bot in the executor - smol::block_on(async move { bot.run(&user_config, &channels).await }) -} - -struct Args<'a, 'b: 'a> { - msg: &'a Privmsg<'b>, - writer: &'a mut twitchchat::Writer, - quit: NotifyHandle, -} - -trait Command: Send + Sync { - fn handle(&mut self, args: Args<'_, '_>); -} - -impl Command for F -where - F: Fn(Args<'_, '_>), - F: Send + Sync, -{ - fn handle(&mut self, args: Args<'_, '_>) { - (self)(args) - } -} - -#[derive(Default)] -struct Bot { - commands: HashMap>, -} - -impl Bot { - // add this command to the bot - fn with_command(mut self, name: impl Into, cmd: impl Command + 'static) -> Self { - self.commands.insert(name.into(), Box::new(cmd)); - self - } - - // run the bot until its done - async fn run(&mut self, user_config: &UserConfig, channels: &[String]) -> anyhow::Result<()> { - // this can fail if DNS resolution cannot happen - let connector = twitchchat::connector::smol::Connector::twitch()?; - - let mut runner = AsyncRunner::connect(connector, user_config).await?; - println!("connecting, we are: {}", runner.identity.username()); - - for channel in channels { - println!("joining: {}", channel); - if let Err(err) = runner.join(channel).await { - eprintln!("error while joining '{}': {}", channel, err); - } - } - - // if you store this somewhere, you can quit the bot gracefully - // let quit = runner.quit_handle(); - - println!("starting main loop"); - self.main_loop(&mut runner).await - } - - // the main loop of the bot - async fn main_loop(&mut self, runner: &mut AsyncRunner) -> anyhow::Result<()> { - // this is clonable, but we can just share it via &mut - // this is rate-limited writer - let mut writer = runner.writer(); - // this is clonable, but using it consumes it. - // this is used to 'quit' the main loop - let quit = runner.quit_handle(); - - loop { - // this drives the internal state of the crate - match runner.next_message().await? { - // if we get a Privmsg (you'll get an Commands enum for all messages received) - Status::Message(Commands::Privmsg(pm)) => { - // see if its a command and do stuff with it - if let Some(cmd) = Self::parse_command(pm.data()) { - if let Some(command) = self.commands.get_mut(cmd) { - println!("dispatching to: {}", cmd.escape_debug()); - - let args = Args { - msg: &pm, - writer: &mut writer, - quit: quit.clone(), - }; - - command.handle(args); - } - } - } - // stop if we're stopping - Status::Quit | Status::Eof => break, - // ignore the rest - Status::Message(..) => continue, - } - } - - println!("end of main loop"); - Ok(()) - } - - fn parse_command(input: &str) -> Option<&str> { - if !input.starts_with('!') { - return None; - } - input.splitn(2, ' ').next() - } -} diff --git a/examples/smol_demo.rs b/examples/smol_demo.rs deleted file mode 100644 index bf016fa..0000000 --- a/examples/smol_demo.rs +++ /dev/null @@ -1,84 +0,0 @@ -// NOTE: this demo requires `--feature smol`. -use twitchchat::{commands, connector, runner::AsyncRunner, UserConfig}; - -// this is a helper module to reduce code deduplication -mod include; -use crate::include::{channels_to_join, get_user_config, main_loop}; - -async fn connect(user_config: &UserConfig, channels: &[String]) -> anyhow::Result { - // create a connector using ``smol``, this connects to Twitch. - // you can provide a different address with `custom` - // this can fail if DNS resolution cannot happen - let connector = connector::smol::Connector::twitch()?; - - println!("we're connecting!"); - // create a new runner. this is a provided async 'main loop' - // this method will block until you're ready - let mut runner = AsyncRunner::connect(connector, user_config).await?; - println!("..and we're connected"); - - // and the identity Twitch gave you - println!("our identity: {:#?}", runner.identity); - - for channel in channels { - // the runner itself has 'blocking' join/part to ensure you join/leave a channel. - // these two methods return whether the connection was closed early. - // we'll ignore it for this demo - println!("attempting to join '{}'", channel); - let _ = runner.join(channel).await?; - println!("joined '{}'!", channel); - } - - Ok(runner) -} - -fn main() -> anyhow::Result<()> { - let fut = async move { - // create a user configuration - let user_config = get_user_config()?; - // get some channels to join from the environment - let channels = channels_to_join()?; - - // connect and join the provided channels - let runner = connect(&user_config, &channels).await?; - - // you can get a handle to shutdown the runner - let quit_handle = runner.quit_handle(); - - // you can get a clonable writer - let mut writer = runner.writer(); - - // spawn something off in the background that'll exit in 10 seconds - smol::spawn({ - let mut writer = writer.clone(); - let channels = channels.clone(); - async move { - println!("in 10 seconds we'll exit"); - smol::Timer::after(std::time::Duration::from_secs(10)).await; - - // send one final message to all channels - for channel in channels { - let cmd = commands::privmsg(&channel, "goodbye, world"); - writer.encode(cmd).await.unwrap(); - } - - println!("sending quit signal"); - quit_handle.notify().await; - } - }) - .detach(); - - // you can encode all sorts of 'commands' - for channel in &channels { - writer - .encode(commands::privmsg(channel, "hello world!")) - .await?; - } - - println!("starting main loop"); - // your 'main loop'. you'll just call next_message() until you're done - main_loop(runner).await - }; - - smol::block_on(fut) -} diff --git a/examples/tokio_demo.rs b/examples/tokio_demo.rs deleted file mode 100644 index 26bc7f6..0000000 --- a/examples/tokio_demo.rs +++ /dev/null @@ -1,83 +0,0 @@ -// NOTE: this demo requires `--features="tokio/full tokio-util"`. -use twitchchat::{ - commands, connector, messages, - runner::{AsyncRunner, Status}, - UserConfig, -}; - -// this is a helper module to reduce code deduplication -mod include; -use crate::include::{channels_to_join, get_user_config, main_loop}; - -async fn connect(user_config: &UserConfig, channels: &[String]) -> anyhow::Result { - // create a connector using ``tokio``, this connects to Twitch. - // you can provide a different address with `custom` - let connector = connector::tokio::Connector::twitch()?; - - println!("we're connecting!"); - // create a new runner. this is a provided async 'main loop' - // this method will block until you're ready - let mut runner = AsyncRunner::connect(connector, user_config).await?; - println!("..and we're connected"); - - // and the identity Twitch gave you - println!("our identity: {:#?}", runner.identity); - - for channel in channels { - // the runner itself has 'blocking' join/part to ensure you join/leave a channel. - // these two methods return whether the connection was closed early. - // we'll ignore it for this demo - println!("attempting to join '{}'", channel); - let _ = runner.join(&channel).await?; - println!("joined '{}'!", channel); - } - - Ok(runner) -} - -#[tokio::main] -async fn main() -> anyhow::Result<()> { - // create a user configuration - let user_config = get_user_config()?; - // get some channels to join from the environment - let channels = channels_to_join()?; - - // connect and join the provided channels - let runner = connect(&user_config, &channels).await?; - - // you can get a handle to shutdown the runner - let quit_handle = runner.quit_handle(); - - // you can get a clonable writer - let mut writer = runner.writer(); - - // spawn something off in the background that'll exit in 10 seconds - tokio::spawn({ - let mut writer = writer.clone(); - let channels = channels.clone(); - async move { - println!("in 10 seconds we'll exit"); - tokio::time::delay_for(std::time::Duration::from_secs(10)).await; - - // send one final message to all channels - for channel in channels { - let cmd = commands::privmsg(&channel, "goodbye, world"); - writer.encode(cmd).await.unwrap(); - } - - println!("sending quit signal"); - quit_handle.notify().await; - } - }); - - // you can encode all sorts of 'commands' - for channel in &channels { - writer - .encode(commands::privmsg(channel, "hello world!")) - .await?; - } - - println!("starting main loop"); - // your 'main loop'. you'll just call next_message() until you're done - main_loop(runner).await -} From 9b374c75d9129debc1682af0e2340bae7a3ac2a4 Mon Sep 17 00:00:00 2001 From: museun Date: Sun, 6 Dec 2020 01:34:45 -0500 Subject: [PATCH 6/7] move macro to macros --- src/lib.rs | 10 ---------- src/macros.rs | 10 ++++++++++ 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 1a08a97..886de6d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -63,16 +63,6 @@ For just encoding messages: */ -macro_rules! cfg_async { - ($($item:item)*) => { - $( - // #[cfg(feature = "async")] - // #[cfg_attr(docsrs, doc(cfg(feature = "async")))] - $item - )* - }; -} - /// The Twitch IRC address for non-TLS connections pub const TWITCH_IRC_ADDRESS: &str = "irc.chat.twitch.tv:6667"; diff --git a/src/macros.rs b/src/macros.rs index dac5b7d..1c1181d 100644 --- a/src/macros.rs +++ b/src/macros.rs @@ -1,3 +1,13 @@ +macro_rules! cfg_async { + ($($item:item)*) => { + $( + #[cfg(feature = "async")] + #[cfg_attr(docsrs, doc(cfg(feature = "async")))] + $item + )* + }; +} + macro_rules! into_owned { ($ty:ident { $($field:ident),* $(,)? }) => { impl<'a> $crate::IntoOwned<'a> for $ty<'a> { From 827db672bdc2f0a55a07a7fc0d24e4261870da99 Mon Sep 17 00:00:00 2001 From: museun Date: Sun, 6 Dec 2020 01:36:04 -0500 Subject: [PATCH 7/7] remove rate limiter --- src/lib.rs | 2 - src/rate_limit.rs | 205 --------------------------------------- src/runner/channel.rs | 113 --------------------- src/runner/mod.rs | 9 -- src/runner/rate_limit.rs | 65 ------------- 5 files changed, 394 deletions(-) delete mode 100644 src/rate_limit.rs delete mode 100644 src/runner/channel.rs delete mode 100644 src/runner/rate_limit.rs diff --git a/src/lib.rs b/src/lib.rs index 886de6d..5cb2f7e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -108,8 +108,6 @@ cfg_async! { pub mod channel; } pub mod runner; pub use runner::{Error as RunnerError, Status}; -pub mod rate_limit; - pub mod commands; pub mod messages; diff --git a/src/rate_limit.rs b/src/rate_limit.rs deleted file mode 100644 index a20c3d1..0000000 --- a/src/rate_limit.rs +++ /dev/null @@ -1,205 +0,0 @@ -// TODO actually write tests for this -#![allow(dead_code)] -/*! -A simple leaky-bucket style token-based rate limiter -*/ - -use std::time::{Duration, Instant}; - -/// A preset number of tokens as described by Twitch -#[non_exhaustive] -#[derive(Copy, Clone, Debug)] -pub enum RateClass { - /// `20` per `30` seconds - Regular, - /// `100` per `30` seconds - Moderator, - /// `50` per `30` seconds - Known, - /// `7500` per `30` seconds - Verified, -} - -impl Default for RateClass { - fn default() -> Self { - Self::Regular - } -} - -impl RateClass { - /// Number of tickets available for this class - pub fn tickets(self) -> u64 { - match self { - Self::Regular => 20, - Self::Moderator => 100, - Self::Known => 50, - Self::Verified => 7500, - } - } - - /// Period specified by Twitch - pub const fn period() -> Duration { - Duration::from_secs(30) - } -} - -/// A leaky-bucket style token-based rate limiter -#[derive(Debug, Clone)] -pub struct RateLimit { - cap: u64, - bucket: Bucket, -} - -impl Default for RateLimit { - fn default() -> Self { - Self::from_class(<_>::default()) - } -} - -impl RateLimit { - /// Overwrite the current capacity with this value - pub fn set_cap(&mut self, cap: u64) { - self.cap = cap - } - - /// Overwrite the current period with this value - pub fn set_period(&mut self, period: Duration) { - self.bucket.period = period; - } - - /// Get the current capacity with this value - pub fn get_cap(&self) -> u64 { - self.cap - } - - /// Get the current period with this value - pub fn get_period(&self) -> Duration { - self.bucket.period - } - - /// Create a rate limit from a RateClass - pub fn from_class(rate_class: RateClass) -> Self { - Self::full(rate_class.tickets(), RateClass::period()) - } - - /// Create a new rate limiter of `capacity` with an `initial` number of - /// token and the `period` between refills - pub fn new(cap: u64, initial: u64, period: Duration) -> Self { - Self { - cap, - bucket: Bucket::new(cap, initial, period), - } - } - - /// Create a new rate limiter that is pre-filled - /// - /// `cap` is the number of total tokens available - /// - /// `period` is how long it'll take to refill all of the tokens - pub fn full(cap: u64, period: Duration) -> Self { - Self { - cap, - bucket: Bucket::new(cap, cap, period), - } - } - - /// Create am empty rate limiter - /// - /// `cap` is the number of total tokens available - /// - /// `period` is how long it'll take to refill all of the tokens - /// - /// This will block, at first, atleast one `period` until its filled - pub fn empty(cap: u64, period: Duration) -> Self { - Self { - cap, - bucket: Bucket::new(cap, 0, period), - } - } - - /// Get the current available tokens - pub fn get_available_tokens(&self) -> u64 { - self.bucket.tokens - } - - /// Tries to get the current RateClass. - pub fn get_current_rate_class(&self) -> Option { - const DUR: Duration = Duration::from_secs(30); - - let class = match (self.get_cap(), self.get_period()) { - (20, DUR) => RateClass::Regular, - (50, DUR) => RateClass::Known, - (100, DUR) => RateClass::Moderator, - (7500, DUR) => RateClass::Verified, - _ => return None, - }; - Some(class) - } - - /// Consume a specific ammount of tokens - /// - /// # Returns - /// * Successful consumption (e.g. not blocking) will return how many tokens - /// are left - /// * Failure to consume (e.g. out of tokens) will return a Duration of when - /// the bucket will be refilled - pub fn consume(&mut self, tokens: u64) -> Result { - let Self { bucket, .. } = self; - - let now = Instant::now(); - if let Some(n) = bucket.refill(now) { - bucket.tokens = std::cmp::min(bucket.tokens + n, self.cap); - } - - if tokens <= bucket.tokens { - bucket.tokens -= tokens; - bucket.backoff = 0; - return Ok(bucket.tokens); - } - - let prev = bucket.tokens; - Err(bucket.estimate(tokens - prev, now)) - } -} - -#[derive(Debug, Clone)] -struct Bucket { - tokens: u64, - backoff: u32, - next: Instant, - last: Instant, - quantum: u64, - period: Duration, -} - -impl Bucket { - fn new(tokens: u64, initial: u64, period: Duration) -> Self { - let now = Instant::now(); - Self { - tokens: initial, - backoff: 0, - next: now + period, - last: now, - quantum: tokens, - period, - } - } - - fn refill(&mut self, now: Instant) -> Option { - if now < self.next { - return None; - } - - let last = now.duration_since(self.last); - let periods = last.as_nanos().checked_div(self.period.as_nanos())? as u64; - self.last += self.period * (periods as u32); - self.next = self.last + self.period; - (periods * self.quantum).into() - } - - fn estimate(&mut self, tokens: u64, now: Instant) -> Duration { - let until = self.next.duration_since(now); - let periods = (tokens.checked_add(self.quantum).unwrap() - 1) / self.quantum; - until + self.period * (periods as u32 - 1) - } -} diff --git a/src/runner/channel.rs b/src/runner/channel.rs deleted file mode 100644 index 202be44..0000000 --- a/src/runner/channel.rs +++ /dev/null @@ -1,113 +0,0 @@ -cfg_async! { -use super::rate_limit::{PreviousRate, RateLimitedEncoder}; -use crate::rate_limit::{RateClass, RateLimit}; -use std::{ - collections::{HashMap, VecDeque}, - time::Duration, -}; - -/// A channel that you are on. -/// -/// This is exposed for 'advanced' users who want to modify the rate limiter. -/// -/// # Warning -/// You shouldn't need to touch this unless you have a good reason to do so. -/// -/// Improperly using this could result in Twitch disconnecting you, at best and -/// a ban at worst. -pub struct Channel { - pub(crate) name: String, - pub(crate) rate_limited: RateLimitedEncoder, - pub(crate) previous: Option, - pub(crate) rated_limited_at: Option, -} - -impl std::fmt::Debug for Channel { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("Channel").field("name", &self.name).finish() - } -} - -impl Channel { - pub(crate) fn new(name: String) -> Self { - let rate_limit = RateLimit::from_class(RateClass::Regular); - let rate_limited = RateLimitedEncoder { - rate_limit, - queue: VecDeque::new(), - }; - Self { - name, - rate_limited, - previous: None, - rated_limited_at: None, - } - } - - /// Set the [RateClass] for this channel - pub fn set_rate_class(&mut self, rate_class: RateClass) { - self.rate_limited.rate_limit = RateLimit::from_class(rate_class); - self.rated_limited_at.take(); - } - - /// Mark this channel as being under slow mode for `duration` - pub fn enable_slow_mode(&mut self, duration: u64) { - let rate = &mut self.rate_limited.rate_limit; - self.previous.replace(PreviousRate { - cap: rate.get_cap(), - period: rate.get_period(), - }); - - rate.set_period(Duration::from_secs(duration)) - } - - /// Mark this channel as not being in slow mode - pub fn disable_slow_mode(&mut self) { - let PreviousRate { cap, period } = self.previous.take().unwrap_or_default(); - let rate = &mut self.rate_limited.rate_limit; - rate.set_cap(cap); - rate.set_period(period); - } - - /// Mark that you've been rate limited on this channel - pub fn set_rate_limited(&mut self) { - self.rate_limited.rate_limit.set_cap(1); - self.rated_limited_at.replace(std::time::Instant::now()); - } - - /// Reset to the default rate class - pub fn reset_rate_limit(&mut self) { - let PreviousRate { cap, period } = self.previous.take().unwrap_or_default(); - self.rate_limited.rate_limit = RateLimit::full(cap, period); - self.rated_limited_at.take(); - } -} - -#[derive(Debug, Default)] -pub struct Channels { - pub map: HashMap, -} - -impl Channels { - pub fn is_on(&self, name: &str) -> bool { - self.map.contains_key(name) - } - - pub fn get_mut(&mut self, name: &str) -> Option<&mut Channel> { - self.map.get_mut(name) - } - - pub fn add(&mut self, name: &str) { - // we already have this channel (there was a sync issue) - if self.map.contains_key(name) { - return; - } - - let channel = Channel::new(name.to_string()); - self.map.insert(name.to_string(), channel); - } - - pub fn remove(&mut self, name: &str) { - self.map.remove(name); - } -} -} diff --git a/src/runner/mod.rs b/src/runner/mod.rs index 3133c81..61af5d7 100644 --- a/src/runner/mod.rs +++ b/src/runner/mod.rs @@ -25,15 +25,6 @@ pub use error::Error; #[allow(dead_code)] mod timeout; -cfg_async! { - mod rate_limit; -} - -cfg_async! { - mod channel; - pub use channel::Channel; -} - cfg_async! { #[doc(inline)] pub use crate::util::NotifyHandle; diff --git a/src/runner/rate_limit.rs b/src/runner/rate_limit.rs deleted file mode 100644 index af39248..0000000 --- a/src/runner/rate_limit.rs +++ /dev/null @@ -1,65 +0,0 @@ -use crate::rate_limit::{RateClass, RateLimit}; -use futures_lite::{AsyncWrite, AsyncWriteExt}; -use std::{collections::VecDeque, time::Duration}; - -pub struct RateLimitedEncoder { - pub(crate) rate_limit: RateLimit, - pub(crate) queue: VecDeque>, -} - -impl RateLimitedEncoder { - pub async fn drain_until_blocked( - &mut self, - name: &str, - limit: &mut u64, - sink: &mut W, - ) -> std::io::Result<()> - where - W: AsyncWrite + Send + Sync + Unpin + ?Sized, - { - while let Some(data) = self.queue.pop_front() { - match self.rate_limit.consume(1) { - Ok(..) => { - *limit = limit.saturating_sub(1); - log::trace!( - target: "twitchchat::encoder", - "> {}", - std::str::from_utf8(&*data).unwrap().escape_debug() - ); - sink.write_all(&*data).await?; - } - Err(..) => { - log::warn!( - target: "twitchchat::rate_limit", - "local rate limit for '{}' hit", - name - ); - break; - } - } - if *limit == 0 { - break; - } - } - - Ok(()) - } - - pub fn enqueue(&mut self, msg: Box<[u8]>) { - self.queue.push_back(msg); - } -} - -pub struct PreviousRate { - pub cap: u64, - pub period: Duration, -} - -impl Default for PreviousRate { - fn default() -> Self { - Self { - cap: RateClass::Regular.tickets(), - period: RateClass::period(), - } - } -}