Skip to content

Commit 1826e8c

Browse files
committed
feat(sync):daemon support for advanced sync
1 parent 869d65e commit 1826e8c

File tree

1 file changed

+97
-6
lines changed

1 file changed

+97
-6
lines changed

aw-sync/src/main.rs

Lines changed: 97 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,10 @@
22
// - [x] Setup local sync bucket
33
// - [x] Import local buckets and sync events from aw-server (either through API or through creating a read-only Datastore)
44
// - [x] Import buckets and sync events from remotes
5-
// - [ ] Add CLI arguments
5+
// - [x] Add CLI arguments
66
// - [x] For which local server to use
77
// - [x] For which sync dir to use
8-
// - [ ] Date to start syncing from
8+
// - [x] Date to start syncing from
99

1010
#[macro_use]
1111
extern crate log;
@@ -60,7 +60,28 @@ struct Opts {
6060
enum Commands {
6161
/// Daemon subcommand
6262
/// Starts aw-sync as a daemon, which will sync every 5 minutes.
63-
Daemon {},
63+
Daemon {
64+
/// Use advanced sync mode
65+
#[clap(long)]
66+
advanced: bool,
67+
68+
/// Date to start syncing from.
69+
/// If not specified, start from beginning.
70+
/// Format: YYYY-MM-DD
71+
#[clap(long, value_parser=parse_start_date)]
72+
start_date: Option<DateTime<Utc>>,
73+
74+
/// Specify buckets to sync using a comma-separated list.
75+
/// If not specified, all buckets will be synced.
76+
#[clap(long, value_parser=parse_list)]
77+
buckets: Option<Vec<String>>,
78+
79+
/// Full path to sync db file
80+
/// Useful for syncing buckets from a specific db file in the sync directory.
81+
/// Must be a valid absolute path to a file in the sync directory.
82+
#[clap(long)]
83+
sync_db: Option<PathBuf>,
84+
},
6485

6586
/// Sync subcommand (basic)
6687
///
@@ -140,11 +161,27 @@ fn main() -> Result<(), Box<dyn Error>> {
140161
let client = AwClient::new(&opts.host, port, "aw-sync")?;
141162

142163
// if opts.command is None, then we're using the default subcommand (Sync)
143-
match opts.command.unwrap_or(Commands::Daemon {}) {
164+
match opts.command.unwrap_or(Commands::Daemon {
165+
advanced: false,
166+
start_date: None,
167+
buckets: None,
168+
sync_db: None,
169+
}) {
144170
// Start daemon
145-
Commands::Daemon {} => {
171+
Commands::Daemon {
172+
advanced,
173+
start_date,
174+
buckets,
175+
sync_db,
176+
} => {
146177
info!("Starting daemon...");
147-
daemon(&client)?;
178+
if advanced {
179+
info!("Using advanced sync mode");
180+
daemon_advanced(&client, start_date, buckets, sync_db)?;
181+
} else {
182+
info!("Using basic sync mode");
183+
daemon(&client)?;
184+
}
148185
}
149186
// Perform basic sync
150187
Commands::Sync { host } => {
@@ -167,6 +204,7 @@ fn main() -> Result<(), Box<dyn Error>> {
167204
sync_wrapper::push(&client)?
168205
}
169206
// Perform two-way sync
207+
// Only way to sync non-window buckets
170208
Commands::SyncAdvanced {
171209
start_date,
172210
buckets,
@@ -247,3 +285,56 @@ fn daemon_sync_cycle(client: &AwClient) -> Result<(), Box<dyn Error>> {
247285

248286
Ok(())
249287
}
288+
289+
fn daemon_advanced(
290+
client: &AwClient,
291+
start_date: Option<DateTime<Utc>>,
292+
buckets: Option<Vec<String>>,
293+
sync_db: Option<PathBuf>,
294+
) -> Result<(), Box<dyn Error>> {
295+
let (tx, rx) = channel();
296+
297+
ctrlc::set_handler(move || {
298+
let _ = tx.send(());
299+
})?;
300+
301+
let sync_dir = dirs::get_sync_dir()?;
302+
if let Some(db_path) = &sync_db {
303+
info!("Using sync db: {}", &db_path.display());
304+
305+
if !db_path.is_absolute() {
306+
Err("Sync db path must be absolute")?
307+
}
308+
if !db_path.starts_with(&sync_dir) {
309+
Err("Sync db path must be in sync directory")?
310+
}
311+
}
312+
313+
let sync_spec = sync::SyncSpec {
314+
path: sync_dir,
315+
path_db: sync_db,
316+
buckets,
317+
start: start_date,
318+
};
319+
320+
loop {
321+
if let Err(e) = sync::sync_run(client, &sync_spec, sync::SyncMode::Both) {
322+
error!("Error during sync cycle: {}", e);
323+
return Err(e);
324+
}
325+
326+
info!("Advanced sync pass done, sleeping for 5 minutes");
327+
328+
match rx.recv_timeout(Duration::from_secs(300)) {
329+
Ok(_) | Err(RecvTimeoutError::Disconnected) => {
330+
info!("Termination signal received, shutting down.");
331+
break;
332+
}
333+
Err(RecvTimeoutError::Timeout) => {
334+
// Continue the loop if the timeout occurs
335+
}
336+
}
337+
}
338+
339+
Ok(())
340+
}

0 commit comments

Comments
 (0)