Skip to content

Commit

Permalink
Add ntrip client [main branch] [CPP-952] (#1088)
Browse files Browse the repository at this point in the history
Ports NTRIP client support to the main branch:
#1033
  • Loading branch information
adrian-kong authored Jun 26, 2023
1 parent 5fc0bb4 commit 1aebe7b
Show file tree
Hide file tree
Showing 31 changed files with 1,081 additions and 27 deletions.
1 change: 1 addition & 0 deletions .gitattributes
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
"console_backend/tests/data/*" filter=lfs diff=lfs merge=lfs -text
installers/Windows/NSIS/*.zip filter=lfs diff=lfs merge=lfs -text
binaries/** filter=lfs diff=lfs merge=lfs -text
9 changes: 8 additions & 1 deletion Makefile.toml
Original file line number Diff line number Diff line change
Expand Up @@ -311,8 +311,12 @@ args = [
command = "${DIST_PYTHON}"
args = ["-m", "pip", "install", "flit"]

[tasks.check-git-status]
command = "git"
args = ["status"]

[tasks.build-frontend-wheel]
dependencies = ["dist-install-pip-flit"]
dependencies = ["dist-install-pip-flit", "check-git-status"]
command = "${DIST_PYTHON}"
args = ["-m", "flit", "build", "--no-setup-py"]

Expand Down Expand Up @@ -572,6 +576,9 @@ args = ["-m", "pip", "install", "console_backend/dist/${BACKEND_WHEEL}"]
script_runner = "@duckscript"
script = '''
cp src/main/resources py311-dist/
os = os_family
glob_cp binaries/${os}/rtcm3tosbp* py311-dist/binaries/${os}
'''

[tasks.build-dist-freeze]
Expand Down
3 changes: 3 additions & 0 deletions binaries/linux/rtcm3tosbp
Git LFS file not shown
3 changes: 3 additions & 0 deletions binaries/mac/rtcm3tosbp
Git LFS file not shown
3 changes: 3 additions & 0 deletions binaries/windows/rtcm3tosbp.exe
Git LFS file not shown
4 changes: 4 additions & 0 deletions console_backend/src/cli_options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,10 @@ pub struct CliOptions {
#[clap(long)]
pub enable_map: bool,

/// Enable ntrip client
#[clap(long)]
pub enable_ntrip: bool,

/// Path to a yaml file containing known piksi settings.
#[clap(long)]
pub settings_yaml: Option<PathBuf>,
Expand Down
2 changes: 2 additions & 0 deletions console_backend/src/common_constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,8 @@ pub enum Keys {
NOTIFICATION,
#[strum(serialize = "SOLUTION_LINE")]
SOLUTION_LINE,
#[strum(serialize = "NTRIP_DISPLAY")]
NTRIP_DISPLAY,
}

#[derive(Clone, Debug, Display, EnumString, EnumVariantNames, Eq, Hash, PartialEq)]
Expand Down
1 change: 1 addition & 0 deletions console_backend/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ fn conn_manager_thd(
ConnectionState::Connected {
conn: conn.clone(),
stop_token,
msg_sender: msg_sender.clone(),
},
&client_sender,
);
Expand Down
23 changes: 14 additions & 9 deletions console_backend/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ pub mod fft_monitor;
pub mod fileio;
pub mod fusion_status_flags;
pub mod log_panel;
pub mod ntrip_output;
pub mod output;
pub mod piksi_tools_constants;
pub mod process_messages;
Expand All @@ -47,9 +48,9 @@ pub mod updater;
pub mod utils;
pub mod watch;

use crate::client_sender::BoxedClientSender;
use crate::shared_state::SharedState;
use crate::status_bar::StatusBar;
use std::sync::Mutex;

use crate::tabs::{
advanced_tab::{
advanced_imu_tab::AdvancedImuTab, advanced_magnetometer_tab::AdvancedMagnetometerTab,
Expand All @@ -69,6 +70,8 @@ use crate::tabs::{
},
update_tab::UpdateTab,
};
use crate::types::MsgSender;
use std::sync::Mutex;

#[global_allocator]
static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc;
Expand All @@ -89,13 +92,14 @@ struct Tabs {
pub status_bar: Mutex<StatusBar>,
pub update: Mutex<UpdateTab>,
pub settings: Option<SettingsTab>, // settings only enabled on TCP / Serial
pub shared_state: SharedState,
}

impl Tabs {
fn new(
shared_state: shared_state::SharedState,
client_sender: client_sender::BoxedClientSender,
msg_sender: types::MsgSender,
shared_state: SharedState,
client_sender: BoxedClientSender,
msg_sender: MsgSender,
) -> Self {
Self {
main: MainTab::new(shared_state.clone(), client_sender.clone()).into(),
Expand Down Expand Up @@ -139,15 +143,16 @@ impl Tabs {
)
.into(),
status_bar: StatusBar::new(shared_state.clone()).into(),
update: UpdateTab::new(shared_state).into(),
update: UpdateTab::new(shared_state.clone()).into(),
settings: None,
shared_state,
}
}

fn with_settings(
shared_state: shared_state::SharedState,
client_sender: client_sender::BoxedClientSender,
msg_sender: types::MsgSender,
shared_state: SharedState,
client_sender: BoxedClientSender,
msg_sender: MsgSender,
) -> Self {
let mut tabs = Self::new(
shared_state.clone(),
Expand Down
103 changes: 103 additions & 0 deletions console_backend/src/ntrip_output.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
use crate::tabs::advanced_tab::ntrip_tab::OutputType;
use crate::types::Result;
use crate::utils::pythonhome_dir;
use anyhow::Context;
use crossbeam::channel::Receiver;
use log::{error, info};
use std::io::Write;
use std::process::{Command, Stdio};
use std::{io, thread};

pub struct MessageConverter {
in_rx: Receiver<Vec<u8>>,
output_type: OutputType,
}

impl MessageConverter {
pub fn new(in_rx: Receiver<Vec<u8>>, output_type: OutputType) -> Self {
Self { in_rx, output_type }
}

pub fn start<W: Write + Send + 'static>(&mut self, out: W) -> Result<()> {
match self.output_type {
OutputType::RTCM => self.output_rtcm(out),
OutputType::SBP => self.output_sbp(out),
}
}

/// Just redirects directly to writer
fn output_rtcm<W: Write + Send + 'static>(&mut self, mut out: W) -> Result<()> {
let in_rx = self.in_rx.clone();
thread::spawn(move || loop {
if let Ok(data) = in_rx.recv() {
if let Err(e) = out.write(&data) {
error!("failed to write to device {e}");
}
}
});
Ok(())
}

/// Runs rtcm3tosbp converter
fn output_sbp<W: Write + Send + 'static>(&mut self, mut out: W) -> Result<()> {
let mut child = if cfg!(target_os = "windows") {
let mut cmd = Command::new("cmd");
let rtcm = pythonhome_dir()?
.join("binaries")
.join("windows")
.join("rtcm3tosbp.exe")
.to_string_lossy()
.to_string();
info!("running rtcm3tosbp from \"{}\"", rtcm);
cmd.args(["/C", &rtcm]);
cmd
} else if cfg!(target_os = "macos") {
let mut cmd = Command::new("sh");
let rtcm = pythonhome_dir()?
.join("binaries")
.join("mac")
.join("rtcm3tosbp")
.to_string_lossy()
.to_string();
info!("running rtcm3tosbp from \"{}\"", rtcm);
cmd.args(["-c", &rtcm]);
cmd
} else {
let mut cmd = Command::new("sh");
let rtcm = pythonhome_dir()?
.join("binaries")
.join("linux")
.join("rtcm3tosbp")
.to_string_lossy()
.to_string();
info!("running rtcm3tosbp from \"{}\"", rtcm);
cmd.args(["-c", &rtcm]);
cmd
};
let mut child = child
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.stderr(Stdio::null())
.spawn()
.context("rtcm converter process failed")?;

let mut child_in = child.stdin.take().context("rtcm3tosbp stdin missing")?;
let mut child_out = child.stdout.take().context("rtcm3tosbp stdout missing")?;
let in_rx = self.in_rx.clone();

thread::spawn(move || {
if let Err(e) = io::copy(&mut child_out, &mut out) {
error!("failed to write to device {e}");
}
});
thread::spawn(move || {
while let Ok(data) = in_rx.recv() {
if let Err(e) = child_in.write_all(&data) {
error!("failed to write to rtcm3tosbp {e}")
}
}
});
thread::spawn(move || child.wait());
Ok(())
}
}
7 changes: 6 additions & 1 deletion console_backend/src/process_messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,12 @@ fn register_events(link: sbp::link::Link<Tabs>) {
.lock()
.unwrap()
.handle_pos_llh(msg.clone());
tabs.status_bar.lock().unwrap().handle_pos_llh(msg);
tabs.status_bar.lock().unwrap().handle_pos_llh(msg.clone());

// ntrip tab dynamic position
let mut guard = tabs.shared_state.lock();
let ntrip = &mut guard.ntrip_tab;
ntrip.set_last_data(msg);
});
link.register(|tabs: &Tabs, msg: MsgPosLlhCov| {
tabs.solution_position
Expand Down
51 changes: 50 additions & 1 deletion console_backend/src/server_recv_thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ use crate::errors::{
};
use crate::log_panel::LogLevel;
use crate::output::CsvLogging;
use crate::shared_state::{AdvancedNetworkingState, SharedState};
use crate::shared_state::{AdvancedNetworkingState, ConnectionState, SharedState};
use crate::tabs::advanced_tab::ntrip_tab::NtripOptions;
use crate::tabs::{
settings_tab::SaveRequest, solution_tab::LatLonUnits, update_tab::UpdateTabUpdate,
};
Expand Down Expand Up @@ -344,6 +345,54 @@ pub fn server_recv_thread(
.expect(CAP_N_PROTO_DESERIALIZATION_FAILURE);
shared_state.switch_tab(curr_tab);
}
m::message::NtripConnect(Ok(cv_in)) => {
let url = cv_in
.get_url()
.expect(CAP_N_PROTO_DESERIALIZATION_FAILURE)
.to_string();
let usr = cv_in
.get_username()
.expect(CAP_N_PROTO_DESERIALIZATION_FAILURE)
.to_string();
let pwd = cv_in
.get_password()
.expect(CAP_N_PROTO_DESERIALIZATION_FAILURE)
.to_string();
let gga_period = cv_in.get_gga_period();
let output_type = cv_in
.get_output_type()
.expect(CAP_N_PROTO_DESERIALIZATION_FAILURE)
.to_string();
let position: Option<(f64, f64, f64)> = match cv_in.get_position().which() {
Ok(m::ntrip_connect::position::Pos(Ok(pos))) => {
Some((pos.get_lat(), pos.get_lon(), pos.get_alt()))
}
Err(e) => {
error!("{}", e);
None
}
_ => None,
};
let mut guard = shared_state.lock();
let heartbeat = guard.heartbeat_data.clone();
match guard.conn.get() {
ConnectionState::Connected { msg_sender, .. } => {
let options = NtripOptions::new(
url,
usr,
pwd,
position,
gga_period,
&output_type,
);
guard.ntrip_tab.connect(msg_sender, heartbeat, options);
}
_ => error!("ntrip unable to find connected device"),
}
}
m::message::NtripDisconnect(Ok(_)) => {
shared_state.lock().ntrip_tab.disconnect();
}
_ => {
error!("unknown message from front-end");
}
Expand Down
13 changes: 12 additions & 1 deletion console_backend/src/shared_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,11 @@ use crate::log_panel::LogLevel;
use crate::output::{CsvLogging, CsvSerializer};
use crate::process_messages::StopToken;
use crate::shared_state::EventType::Refresh;
use crate::tabs::advanced_tab::ntrip_tab::NtripState;
use crate::tabs::{settings_tab, solution_tab::LatLonUnits, update_tab::UpdateTabUpdate};
use crate::utils::{send_conn_state, OkOrLog};
use crate::watch::{WatchReceiver, Watched};
use crate::{common_constants::ConnectionType, connection::Connection};
use crate::{common_constants::ConnectionType, connection::Connection, MsgSender};
use crate::{
common_constants::{self as cc, SbpLogging},
status_bar::Heartbeat,
Expand Down Expand Up @@ -347,6 +348,13 @@ impl SharedState {
self.lock().heartbeat_data.clone()
}

pub fn msg_sender(&self) -> Option<MsgSender> {
match self.connection() {
ConnectionState::Connected { msg_sender, .. } => Some(msg_sender),
_ => None,
}
}

pub fn set_check_visibility(&self, check_visibility: Vec<String>) {
let mut guard = self.lock();
guard.tracking_tab.signals_tab.check_visibility = check_visibility;
Expand Down Expand Up @@ -379,6 +387,7 @@ impl Clone for SharedState {
pub struct SharedStateInner {
pub(crate) logging_bar: LoggingBarState,
pub(crate) log_panel: LogPanelState,
pub(crate) ntrip_tab: NtripState,
pub(crate) tracking_tab: TrackingTabState,
pub(crate) connection_history: ConnectionHistory,
pub(crate) conn: Watched<ConnectionState>,
Expand Down Expand Up @@ -408,6 +417,7 @@ impl SharedStateInner {
SharedStateInner {
logging_bar: LoggingBarState::new(log_directory),
log_panel: LogPanelState::new(),
ntrip_tab: NtripState::default(),
tracking_tab: TrackingTabState::new(),
debug: false,
connection_history,
Expand Down Expand Up @@ -922,6 +932,7 @@ pub enum ConnectionState {
Connected {
conn: Connection,
stop_token: StopToken,
msg_sender: MsgSender,
},

/// Attempting to connect
Expand Down
Loading

0 comments on commit 1aebe7b

Please sign in to comment.