Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(voyager): refactor message to be generic over contained data #1085

Merged
merged 4 commits into from
Jan 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions flake.nix
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,10 @@
nativeBuildInputs = [ config.treefmt.build.wrapper ]
++ lib.attrsets.attrValues config.treefmt.build.programs;
GOPRIVATE = "github.com/unionlabs/*";

shellHook = ''
alias voy-send-msg='curl localhost:65534/msg -H "content-type: application/json" -d'
'';
};

treefmt =
Expand Down
2 changes: 1 addition & 1 deletion lib/chain-utils/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ impl<T: Clone> Pool<T> {
const RETRY_SECONDS: u64 = 3;

tracing::warn!(
"high traffic in queue of {}, ran out of items! trying again in {RETRY_SECONDS} seconds",
"high traffic in pool of {}, ran out of items! trying again in {RETRY_SECONDS} seconds",
std::any::type_name::<T>()
);

Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

24 changes: 9 additions & 15 deletions lib/pg-queue/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,25 +125,19 @@ impl<T: DeserializeOwned + Serialize + Unpin + Send + Sync> Queue<T> {
.await?;
tx.commit().await?;
}
ProcessFlow::Success(new_msgs) => {
if !new_msgs.is_empty() {
let new_ids = query!(
ProcessFlow::Success(maybe_new_msg) => {
if let Some(new_msg) = maybe_new_msg {
let new_row = query!(
"INSERT INTO queue (item)
SELECT * FROM UNNEST($1::JSONB[])
VALUES ($1::JSONB)
RETURNING id",
&*new_msgs
.into_iter()
.map(|t| serde_json::to_value(t).expect(
"queue message should have infallible serialization"
))
.collect::<Vec<_>>(),
serde_json::to_value(new_msg)
.expect("queue message should have infallible serialization")
)
.fetch_all(tx.as_mut())
.fetch_one(tx.as_mut())
.await?;

for row in new_ids {
tracing::debug!(id = row.id, "inserted new messages");
}
tracing::debug!(id = new_row.id, "inserted new message");
}

tx.commit().await?;
Expand All @@ -164,7 +158,7 @@ impl<T: DeserializeOwned + Serialize + Unpin + Send + Sync> Queue<T> {
}

pub enum ProcessFlow<T> {
Success(Vec<T>),
Success(Option<T>),
Requeue,
Fail(String),
}
6 changes: 3 additions & 3 deletions tools/sqlx-cli/sqlx-cli.nix
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,17 @@
version = "0.7.1";
nativeBuildInputs = [ pkgs.pkg-config ];

buildPhase = "cargo build --release --locked --offline -p ${name}";
buildPhase = "cargo build --release --locked --offline -p ${name} --bin cargo-sqlx";
installPhase = ''
mkdir -p $out/bin
mv target/release/sqlx $out/bin/sqlx
mv target/release/cargo-sqlx $out/bin/cargo-sqlx
'';

buildInputs = [ rust.toolchains.nightly pkgs.openssl ];

src = srcWithVendoredSources { inherit name; originalSrc = "${sqlx}"; };

meta.mainProgram = "sqlx";
meta.mainProgram = "cargo-sqlx";
};
};
}
3 changes: 1 addition & 2 deletions voyager/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
#![recursion_limit = "256"]
#![feature(trait_alias, extract_if)]
#![feature(trait_alias)]
// #![warn(clippy::pedantic)]
#![allow(
// required due to return_position_impl_trait_in_trait false positives
Expand Down
Loading