Skip to content

Commit e4cbc0c

Browse files
committed
feat(sync):daemon support for advanced sync
1 parent c640979 commit e4cbc0c

File tree

3 files changed

+145
-82
lines changed

3 files changed

+145
-82
lines changed

aw-sync/src/main.rs

Lines changed: 135 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,10 @@
22
// - [x] Setup local sync bucket
33
// - [x] Import local buckets and sync events from aw-server (either through API or through creating a read-only Datastore)
44
// - [x] Import buckets and sync events from remotes
5-
// - [ ] Add CLI arguments
5+
// - [x] Add CLI arguments
66
// - [x] For which local server to use
77
// - [x] For which sync dir to use
8-
// - [ ] Date to start syncing from
8+
// - [x] Date to start syncing from
99

1010
#[macro_use]
1111
extern crate log;
@@ -60,35 +60,45 @@ struct Opts {
6060
enum Commands {
6161
/// Daemon subcommand
6262
/// Starts aw-sync as a daemon, which will sync every 5 minutes.
63-
Daemon {},
63+
Daemon {
64+
/// Date to start syncing from.
65+
/// If not specified, start from beginning.
66+
/// Format: YYYY-MM-DD
67+
#[clap(long, value_parser=parse_start_date)]
68+
start_date: Option<DateTime<Utc>>,
69+
70+
/// Specify buckets to sync using a comma-separated list.
71+
/// By default, all buckets are synced.
72+
#[clap(long)]
73+
buckets: Option<String>,
74+
75+
/// Full path to sync db file
76+
/// Useful for syncing buckets from a specific db file in the sync directory.
77+
/// Must be a valid absolute path to a file in the sync directory.
78+
#[clap(long)]
79+
sync_db: Option<PathBuf>,
80+
},
6481

65-
/// Sync subcommand (basic)
82+
/// Sync subcommand
6683
///
67-
/// Pulls remote buckets then pushes local buckets.
84+
/// Syncs data between local aw-server and sync directory.
85+
/// First pulls remote buckets from the sync directory to the local aw-server.
86+
/// Then pushes local buckets from the aw-server to the local sync directory.
6887
Sync {
6988
/// Host(s) to pull from, comma separated. Will pull from all hosts if not specified.
7089
#[clap(long, value_parser=parse_list)]
7190
host: Option<Vec<String>>,
72-
},
7391

74-
/// Sync subcommand (advanced)
75-
///
76-
/// Pulls remote buckets then pushes local buckets.
77-
/// First pulls remote buckets in the sync directory to the local aw-server.
78-
/// Then pushes local buckets from the aw-server to the local sync directory.
79-
#[clap(arg_required_else_help = true)]
80-
SyncAdvanced {
8192
/// Date to start syncing from.
8293
/// If not specified, start from beginning.
83-
/// NOTE: might be unstable, as count cannot be used to verify integrity of sync.
8494
/// Format: YYYY-MM-DD
8595
#[clap(long, value_parser=parse_start_date)]
8696
start_date: Option<DateTime<Utc>>,
8797

8898
/// Specify buckets to sync using a comma-separated list.
89-
/// If not specified, all buckets will be synced.
90-
#[clap(long, value_parser=parse_list)]
91-
buckets: Option<Vec<String>>,
99+
/// By default, all buckets are synced.
100+
#[clap(long)]
101+
buckets: Option<String>,
92102

93103
/// Mode to sync in. Can be "push", "pull", or "both".
94104
/// Defaults to "both".
@@ -111,6 +121,13 @@ fn parse_start_date(arg: &str) -> Result<DateTime<Utc>, chrono::ParseError> {
111121
}
112122

113123
fn parse_list(arg: &str) -> Result<Vec<String>, clap::Error> {
124+
// If the argument is empty or just whitespace, return an empty Vec
125+
// This handles the case when --buckets is used without a value
126+
if arg.trim().is_empty() {
127+
return Ok(vec![]);
128+
}
129+
130+
// Otherwise, split by comma as usual
114131
Ok(arg.split(',').map(|s| s.to_string()).collect())
115132
}
116133

@@ -139,60 +156,94 @@ fn main() -> Result<(), Box<dyn Error>> {
139156

140157
let client = AwClient::new(&opts.host, port, "aw-sync")?;
141158

142-
// if opts.command is None, then we're using the default subcommand (Sync)
143-
match opts.command.unwrap_or(Commands::Daemon {}) {
159+
// if opts.command is None, then we're using the default subcommand (Daemon)
160+
match opts.command.unwrap_or(Commands::Daemon {
161+
start_date: None,
162+
buckets: None,
163+
sync_db: None,
164+
}) {
144165
// Start daemon
145-
Commands::Daemon {} => {
166+
Commands::Daemon {
167+
start_date,
168+
buckets,
169+
sync_db,
170+
} => {
146171
info!("Starting daemon...");
147-
daemon(&client)?;
148-
}
149-
// Perform basic sync
150-
Commands::Sync { host } => {
151-
// Pull
152-
match host {
153-
Some(hosts) => {
154-
for host in hosts.iter() {
155-
info!("Pulling from host: {}", host);
156-
sync_wrapper::pull(host, &client)?;
157-
}
158-
}
159-
None => {
160-
info!("Pulling from all hosts");
161-
sync_wrapper::pull_all(&client)?;
162-
}
163-
}
164172

165-
// Push
166-
info!("Pushing local data");
167-
sync_wrapper::push(&client)?
173+
// Use an empty vector to sync all buckets for these cases:
174+
// 1. When --buckets '*' is supplied
175+
// 2. When no bucket argument is provided (default)
176+
let effective_buckets = if buckets.as_deref() == Some("*") || buckets.is_none() {
177+
Some(vec![])
178+
} else if let Some(buckets_str) = buckets {
179+
Some(buckets_str.split(',').map(|s| s.to_string()).collect())
180+
} else {
181+
None
182+
};
183+
184+
daemon(&client, start_date, effective_buckets, sync_db)?;
168185
}
169-
// Perform two-way sync
170-
Commands::SyncAdvanced {
186+
// Perform sync
187+
Commands::Sync {
188+
host,
171189
start_date,
172190
buckets,
173191
mode,
174192
sync_db,
175193
} => {
176-
let sync_dir = dirs::get_sync_dir()?;
177-
if let Some(db_path) = &sync_db {
178-
info!("Using sync db: {}", &db_path.display());
194+
// Use an empty vector to sync all buckets for these cases:
195+
// 1. When --buckets '*' is supplied
196+
// 2. When no bucket argument is provided (default)
197+
let effective_buckets = if buckets.as_deref() == Some("*") || buckets.is_none() {
198+
Some(vec![])
199+
} else if let Some(buckets_str) = buckets {
200+
Some(buckets_str.split(',').map(|s| s.to_string()).collect())
201+
} else {
202+
None
203+
};
179204

180-
if !db_path.is_absolute() {
181-
Err("Sync db path must be absolute")?
182-
}
183-
if !db_path.starts_with(&sync_dir) {
184-
Err("Sync db path must be in sync directory")?
205+
// If advanced options are provided, use advanced sync mode
206+
if start_date.is_some() || effective_buckets.is_some() || sync_db.is_some() {
207+
let sync_dir = dirs::get_sync_dir()?;
208+
if let Some(db_path) = &sync_db {
209+
info!("Using sync db: {}", &db_path.display());
210+
211+
if !db_path.is_absolute() {
212+
Err("Sync db path must be absolute")?
213+
}
214+
if !db_path.starts_with(&sync_dir) {
215+
Err("Sync db path must be in sync directory")?
216+
}
185217
}
186-
}
187218

188-
let sync_spec = sync::SyncSpec {
189-
path: sync_dir,
190-
path_db: sync_db,
191-
buckets,
192-
start: start_date,
193-
};
219+
let sync_spec = sync::SyncSpec {
220+
path: sync_dir,
221+
path_db: sync_db,
222+
buckets: effective_buckets,
223+
start: start_date,
224+
};
225+
226+
sync::sync_run(&client, &sync_spec, mode)?
227+
} else {
228+
// Simple host-based sync mode (backwards compatibility)
229+
// Pull
230+
match host {
231+
Some(hosts) => {
232+
for host in hosts.iter() {
233+
info!("Pulling from host: {}", host);
234+
sync_wrapper::pull(host, &client)?;
235+
}
236+
}
237+
None => {
238+
info!("Pulling from all hosts");
239+
sync_wrapper::pull_all(&client)?;
240+
}
241+
}
194242

195-
sync::sync_run(&client, &sync_spec, mode)?
243+
// Push
244+
info!("Pushing local data");
245+
sync_wrapper::push(&client)?
246+
}
196247
}
197248

198249
// List all buckets
@@ -207,23 +258,45 @@ fn main() -> Result<(), Box<dyn Error>> {
207258
Ok(())
208259
}
209260

210-
fn daemon(client: &AwClient) -> Result<(), Box<dyn Error>> {
261+
fn daemon(
262+
client: &AwClient,
263+
start_date: Option<DateTime<Utc>>,
264+
buckets: Option<Vec<String>>,
265+
sync_db: Option<PathBuf>,
266+
) -> Result<(), Box<dyn Error>> {
211267
let (tx, rx) = channel();
212268

213269
ctrlc::set_handler(move || {
214270
let _ = tx.send(());
215271
})?;
216272

273+
let sync_dir = dirs::get_sync_dir()?;
274+
if let Some(db_path) = &sync_db {
275+
info!("Using sync db: {}", &db_path.display());
276+
277+
if !db_path.is_absolute() {
278+
Err("Sync db path must be absolute")?
279+
}
280+
if !db_path.starts_with(&sync_dir) {
281+
Err("Sync db path must be in sync directory")?
282+
}
283+
}
284+
285+
let sync_spec = sync::SyncSpec {
286+
path: sync_dir,
287+
buckets,
288+
path_db: sync_db,
289+
start: start_date,
290+
};
291+
217292
loop {
218-
if let Err(e) = daemon_sync_cycle(client) {
293+
if let Err(e) = sync::sync_run(client, &sync_spec, sync::SyncMode::Both) {
219294
error!("Error during sync cycle: {}", e);
220-
// Re-throw the error
221295
return Err(e);
222296
}
223297

224298
info!("Sync pass done, sleeping for 5 minutes");
225299

226-
// Wait for either the sleep duration or a termination signal
227300
match rx.recv_timeout(Duration::from_secs(300)) {
228301
Ok(_) | Err(RecvTimeoutError::Disconnected) => {
229302
info!("Termination signal received, shutting down.");
@@ -237,13 +310,3 @@ fn daemon(client: &AwClient) -> Result<(), Box<dyn Error>> {
237310

238311
Ok(())
239312
}
240-
241-
fn daemon_sync_cycle(client: &AwClient) -> Result<(), Box<dyn Error>> {
242-
info!("Pulling from all hosts");
243-
sync_wrapper::pull_all(client)?;
244-
245-
info!("Pushing local data");
246-
sync_wrapper::push(client)?;
247-
248-
Ok(())
249-
}

aw-sync/src/sync.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -247,12 +247,18 @@ pub fn sync_datastores(
247247
.get_buckets()
248248
.unwrap()
249249
.iter_mut()
250-
// If buckets vec isn't empty, filter out buckets not in the buckets vec
250+
// Only filter buckets if specific bucket IDs are provided
251251
.filter(|tup| {
252252
let bucket = &tup.1;
253253
if let Some(buckets) = &sync_spec.buckets {
254-
buckets.iter().any(|b_id| b_id == &bucket.id)
254+
// If "*" is in the buckets list or no buckets specified, sync all buckets
255+
if buckets.iter().any(|b_id| b_id == "*") || buckets.is_empty() {
256+
true
257+
} else {
258+
buckets.iter().any(|b_id| b_id == &bucket.id)
259+
}
255260
} else {
261+
// By default, sync all buckets
256262
true
257263
}
258264
})

aw-sync/src/sync_wrapper.rs

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -48,10 +48,7 @@ pub fn pull(host: &str, client: &AwClient) -> Result<(), Box<dyn Error>> {
4848
let sync_spec = SyncSpec {
4949
path: sync_dir.clone(),
5050
path_db: Some(db.path().clone()),
51-
buckets: Some(vec![
52-
format!("aw-watcher-window_{}", host),
53-
format!("aw-watcher-afk_{}", host),
54-
]),
51+
buckets: None, // Sync all buckets by default
5552
start: None,
5653
};
5754
sync_run(client, &sync_spec, SyncMode::Pull)?;
@@ -67,10 +64,7 @@ pub fn push(client: &AwClient) -> Result<(), Box<dyn Error>> {
6764
let sync_spec = SyncSpec {
6865
path: sync_dir,
6966
path_db: None,
70-
buckets: Some(vec![
71-
format!("aw-watcher-window_{}", client.hostname),
72-
format!("aw-watcher-afk_{}", client.hostname),
73-
]),
67+
buckets: None, // Sync all buckets by default
7468
start: None,
7569
};
7670
sync_run(client, &sync_spec, SyncMode::Push)?;

0 commit comments

Comments
 (0)