Skip to content

Commit f746520

Browse files
committed
perf: remove the global GC lock and most spin locks
use a condition variable (condvar) instead of spin locks
1 parent 866e917 commit f746520

File tree

2 files changed

+66
-49
lines changed

2 files changed

+66
-49
lines changed

immix/src/collector.rs

Lines changed: 44 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,9 @@ use std::{
22
cell::RefCell,
33
ptr::drop_in_place,
44
sync::atomic::{AtomicPtr, Ordering},
5-
thread::yield_now,
65
};
76

87
use libc::malloc;
9-
use parking_lot::lock_api::{RawRwLock, RawRwLockRecursive};
108
use vector_map::VecMap;
119

1210
#[cfg(feature = "llvm_stackmap")]
@@ -15,7 +13,7 @@ use crate::{
1513
allocator::{GlobalAllocator, ThreadLocalAllocator},
1614
block::{Block, LineHeaderExt, ObjectType},
1715
gc_is_auto_collect_enabled, spin_until, HeaderExt, ENABLE_EVA, GC_COLLECTOR_COUNT, GC_ID,
18-
GC_MARKING, GC_MARK_WAITING, GC_RUNNING, GC_RW_LOCK, GC_SWEEPING, GC_SWEEPPING_NUM, LINE_SIZE,
16+
GC_MARKING, GC_MARK_COND, GC_RUNNING, GC_SWEEPING, GC_SWEEPPING_NUM, LINE_SIZE,
1917
NUM_LINES_PER_BLOCK, THRESHOLD_PROPORTION,
2018
};
2119

@@ -58,7 +56,10 @@ impl Drop for Collector {
5856
// println!("Collector {} is dropped", self.id);
5957
unsafe {
6058
if (*self.thread_local_allocator).is_live() {
61-
GC_COLLECTOR_COUNT.fetch_sub(1, Ordering::SeqCst);
59+
let mut v = GC_COLLECTOR_COUNT.lock();
60+
v.0 = v.0 - 1;
61+
drop(v);
62+
GC_MARK_COND.notify_all();
6263
drop_in_place(self.thread_local_allocator);
6364
}
6465
libc::free(self.thread_local_allocator as *mut libc::c_void);
@@ -75,7 +76,10 @@ impl Collector {
7576
/// ## Parameters
7677
/// * `heap_size` - heap size
7778
pub fn new(ga: &mut GlobalAllocator) -> Self {
78-
GC_COLLECTOR_COUNT.fetch_add(1, Ordering::SeqCst);
79+
let mut v = GC_COLLECTOR_COUNT.lock();
80+
v.0 = v.0 + 1;
81+
drop(v);
82+
GC_MARK_COND.notify_all();
7983
let id = GC_ID.fetch_add(1, Ordering::Relaxed);
8084
unsafe {
8185
let tla = ThreadLocalAllocator::new(ga);
@@ -108,7 +112,10 @@ impl Collector {
108112
}
109113

110114
pub fn unregister_current_thread(&self) {
111-
GC_COLLECTOR_COUNT.fetch_sub(1, Ordering::SeqCst);
115+
let mut v = GC_COLLECTOR_COUNT.lock();
116+
v.0 = v.0 - 1;
117+
drop(v);
118+
GC_MARK_COND.notify_all();
112119
unsafe {
113120
drop_in_place(self.thread_local_allocator);
114121
}
@@ -367,30 +374,29 @@ impl Collector {
367374
/// this mark function is __precise__
368375
pub fn mark(&self) {
369376
GC_RUNNING.store(true, Ordering::Release);
370-
let gcs = GC_COLLECTOR_COUNT.load(Ordering::Acquire);
371-
// println!("gc {}: mark start gcs {}", self.id,gcs);
372-
let v = GC_MARK_WAITING.fetch_add(1, Ordering::Acquire);
373-
if v + 1 != gcs {
374-
let mut i = 0;
375-
while !GC_MARKING.load(Ordering::Acquire) {
376-
// 防止 gc count 改变(一个线程在gc时消失了
377-
let gcs = GC_COLLECTOR_COUNT.load(Ordering::Acquire);
378-
if gcs == v + 1 {
377+
378+
let mut v = GC_COLLECTOR_COUNT.lock();
379+
let (count, mut waiting) = *v;
380+
waiting += 1;
381+
*v = (count, waiting);
382+
// println!("gc mark {}: waiting: {}, count: {}", self.id, waiting, count);
383+
if waiting != count {
384+
GC_MARK_COND.wait_while(&mut v, |(c, _)| {
385+
// 线程数量变化了?
386+
if waiting == *c {
379387
GC_MARKING.store(true, Ordering::Release);
380-
break;
381-
}
382-
core::hint::spin_loop();
383-
i += 1;
384-
if i % 100 == 0 {
385-
yield_now();
388+
GC_MARK_COND.notify_all();
389+
return false;
386390
}
387-
// if i % 10000000==0{
388-
// println!("gc run {:?}", GC_RUNNING.load(Ordering::Acquire))
389-
// }
390-
}
391+
!GC_MARKING.load(Ordering::Acquire)
392+
});
393+
drop(v);
391394
} else {
392395
GC_MARKING.store(true, Ordering::Release);
396+
drop(v);
397+
GC_MARK_COND.notify_all();
393398
}
399+
394400
#[cfg(feature = "shadow_stack")]
395401
{
396402
for (root, obj_type) in self.roots.iter() {
@@ -463,18 +469,25 @@ impl Collector {
463469
}
464470
}
465471

466-
let v = GC_MARK_WAITING.fetch_sub(1, Ordering::AcqRel);
467-
if v - 1 == 0 {
468-
GC_MARKING.store(false, Ordering::Release);
472+
let mut v = GC_COLLECTOR_COUNT.lock();
473+
let (count, mut waiting) = *v;
474+
waiting -= 1;
475+
*v = (count, waiting);
476+
// println!("gc {}: waiting: {}, count: {}", self.id, waiting, count);
477+
if waiting != 0 {
478+
GC_MARK_COND.wait_while(&mut v, |_| GC_MARKING.load(Ordering::Acquire));
469479
} else {
470-
spin_until!(!GC_MARKING.load(Ordering::Acquire));
480+
GC_MARKING.store(false, Ordering::Release);
481+
drop(v);
482+
GC_MARK_COND.notify_all();
471483
}
472484
}
473485

474486
/// # sweep
475487
///
476488
/// since we did synchronization in mark, we don't need to do synchronization again in sweep
477489
pub fn sweep(&self) -> usize {
490+
GC_RUNNING.store(false, Ordering::Release);
478491
GC_SWEEPPING_NUM.fetch_add(1, Ordering::AcqRel);
479492
GC_SWEEPING.store(true, Ordering::Release);
480493
let used = unsafe {
@@ -487,9 +500,6 @@ impl Collector {
487500
let v = GC_SWEEPPING_NUM.fetch_sub(1, Ordering::AcqRel);
488501
if v - 1 == 0 {
489502
GC_SWEEPING.store(false, Ordering::Release);
490-
GC_RUNNING.store(false, Ordering::Release);
491-
} else {
492-
spin_until!(!GC_SWEEPING.load(Ordering::Acquire));
493503
}
494504
used
495505
}
@@ -548,8 +558,6 @@ impl Collector {
548558
}
549559
(*self.mark_histogram).clear();
550560
}
551-
let lock = unsafe { GC_RW_LOCK.raw() };
552-
spin_until!(lock.try_lock_shared_recursive());
553561
let time = std::time::Instant::now();
554562
unsafe {
555563
self.thread_local_allocator
@@ -566,7 +574,6 @@ impl Collector {
566574
.as_mut()
567575
.unwrap()
568576
.set_collect_mode(false);
569-
lock.unlock_shared();
570577
}
571578
log::info!(
572579
"gc {} collect done, mark: {:?}, sweep: {:?}, used heap size: {} byte, total: {:?}",
@@ -658,6 +665,8 @@ mod tests {
658665
fn test_basic_multiple_thread_gc() {
659666
let _lock = THE_RESOURCE.lock();
660667
let mut handles = vec![];
668+
gc_disable_auto_collect();
669+
no_gc_thread();
661670
for _ in 0..10 {
662671
let t = std::thread::spawn(|| {
663672
SPACE.with(|gc| unsafe {

immix/src/lib.rs

Lines changed: 22 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
use std::{
22
cell::RefCell,
3-
sync::atomic::{AtomicBool, AtomicUsize, Ordering},
3+
sync::{
4+
atomic::{AtomicBool, AtomicUsize, Ordering},
5+
Arc,
6+
},
47
};
58

69
pub use int_enum::IntEnum;
@@ -24,17 +27,14 @@ pub use block::*;
2427
pub use collector::*;
2528
pub use consts::*;
2629

27-
use parking_lot::{lock_api::RawRwLock, RwLock};
30+
use parking_lot::{Condvar, Mutex};
2831
#[cfg(feature = "llvm_stackmap")]
2932
use rustc_hash::FxHashMap;
3033

3134
thread_local! {
3235
pub static SPACE: RefCell<Collector> = unsafe {
3336
// gc运行中的时候不能增加线程
34-
let l = GC_RW_LOCK.raw();
35-
spin_until!(l.try_lock_exclusive());
3637
let gc = Collector::new(GLOBAL_ALLOCATOR.0.as_mut().unwrap());
37-
l.unlock_exclusive();
3838
RefCell::new(gc)
3939
};
4040
}
@@ -156,16 +156,22 @@ pub fn gc_init(ptr: *mut u8) {
156156
/// during thread stucking, if a gc is triggered, it will skip waiting for this thread to
157157
/// reach a safe point
158158
pub fn thread_stuck_start() {
159-
GC_COLLECTOR_COUNT.fetch_sub(1, Ordering::SeqCst);
159+
let mut v = GC_COLLECTOR_COUNT.lock();
160+
v.0 = v.0 - 1;
161+
drop(v);
162+
GC_MARK_COND.notify_all();
160163
}
161164

162165
/// notify gc a thread is not stuck anymore
163166
///
164167
/// if a gc is triggered during thread stucking, this function
165168
/// will block until the gc is finished
166169
pub fn thread_stuck_end() {
167-
spin_until!(!GC_RUNNING.load(Ordering::Acquire));
168-
GC_COLLECTOR_COUNT.fetch_add(1, Ordering::SeqCst);
170+
let mut v = GC_COLLECTOR_COUNT.lock();
171+
GC_MARK_COND.wait_while(&mut v, |_| GC_RUNNING.load(Ordering::SeqCst));
172+
v.0 = v.0 + 1;
173+
drop(v);
174+
GC_MARK_COND.notify_all();
169175
}
170176

171177
/// # set evacuation
@@ -196,12 +202,18 @@ impl GAWrapper {
196202
/// collector count
197203
///
198204
/// should be the same as the number of threads
199-
static GC_COLLECTOR_COUNT: AtomicUsize = AtomicUsize::new(0);
205+
static GC_COLLECTOR_COUNT: Mutex<(usize, usize)> = Mutex::new((0, 0));
200206

201-
static GC_MARK_WAITING: AtomicUsize = AtomicUsize::new(0);
207+
// static GC_MARK_WAITING: AtomicUsize = AtomicUsize::new(0);
202208

203209
static GC_MARKING: AtomicBool = AtomicBool::new(false);
204210

211+
// static GC_MARK_COND: Arc< Condvar> = Arc::new( Condvar::new());
212+
213+
lazy_static! {
214+
static ref GC_MARK_COND: Arc<Condvar> = Arc::new(Condvar::new());
215+
}
216+
205217
static GC_SWEEPPING_NUM: AtomicUsize = AtomicUsize::new(0);
206218

207219
static GC_SWEEPING: AtomicBool = AtomicBool::new(false);
@@ -217,8 +229,4 @@ static GC_AUTOCOLLECT_ENABLE: AtomicBool = AtomicBool::new(true);
217229
#[cfg(not(feature = "auto_gc"))]
218230
static GC_AUTOCOLLECT_ENABLE: AtomicBool = AtomicBool::new(false);
219231

220-
lazy_static! {
221-
pub static ref GC_RW_LOCK: RwLock<()> = RwLock::new(());
222-
}
223-
224232
unsafe impl Sync for GAWrapper {}

0 commit comments

Comments
 (0)