From ccacf9798cf9a6331a8c2deba9c52316c01949cd Mon Sep 17 00:00:00 2001 From: Pascal Kuthe Date: Sun, 6 Aug 2023 03:44:19 +0200 Subject: [PATCH] cleanup memory ordering --- src/boxcar.rs | 28 +++++++++++++++------------- src/worker.rs | 5 +++-- 2 files changed, 18 insertions(+), 15 deletions(-) diff --git a/src/boxcar.rs b/src/boxcar.rs index 1d8ae1d..2e64c62 100644 --- a/src/boxcar.rs +++ b/src/boxcar.rs @@ -24,7 +24,7 @@ use std::alloc::Layout; use std::cell::UnsafeCell; use std::mem::MaybeUninit; -use std::sync::atomic::{AtomicBool, AtomicPtr, AtomicU32, AtomicU64, Ordering}; +use std::sync::atomic::{AtomicBool, AtomicPtr, AtomicU64, Ordering}; use std::{ptr, slice}; use crate::{Item, Utf32String}; @@ -41,8 +41,6 @@ pub(crate) struct Vec { inflight: AtomicU64, /// buckets of length 32, 64 .. 2^31 buckets: [Bucket; BUCKETS as usize], - /// the number of initialized elements in this vector - count: AtomicU32, /// the number of matcher columns in this vector, its absolutely critical that /// this remains constant and after initilaziaton (safety invariant) since /// it is used to calculate the Entry layout @@ -69,7 +67,6 @@ impl Vec { Vec { buckets: buckets.map(Bucket::new), inflight: AtomicU64::new(0), - count: AtomicU32::new(0), columns, } } @@ -78,8 +75,11 @@ impl Vec { } /// Returns the number of elements in the vector. + #[inline] pub fn count(&self) -> u32 { - self.count.load(Ordering::Acquire) + self.inflight + .load(Ordering::Acquire) + .min(MAX_ENTRIES as u64) as u32 } // Returns a reference to the element at the given index. @@ -87,19 +87,23 @@ impl Vec { // # Safety // // Entry at `index` must be initialized. 
+ #[inline] pub unsafe fn get_unchecked(&self, index: u32) -> Item<'_, T> { let location = Location::of(index); - // safety: caller guarantees the entry is initialized unsafe { let entries = self .buckets .get_unchecked(location.bucket as usize) .entries - .load(Ordering::Acquire); + .load(Ordering::Relaxed); debug_assert!(!entries.is_null()); let entry = Bucket::::get(entries, location.entry, self.columns); - debug_assert!((*entry).active.load(Ordering::Acquire)); + // this looks odd but is necessary to ensure cross + // thread synchronization (essentially acting as a memory barrier) + // since the caller must only guarantee that he has observed active on any thread + // but the current thread might still have an old value cached (although unlikely) + let _ = (*entry).active.load(Ordering::Acquire); Entry::read(entry, self.columns) } } @@ -114,7 +118,7 @@ impl Vec { .buckets .get_unchecked(location.bucket as usize) .entries - .load(Ordering::Acquire); + .load(Ordering::Relaxed); // bucket is uninitialized if entries.is_null() { @@ -175,8 +179,6 @@ impl Vec { (*entry).active.store(true, Ordering::Release); } - // increase the true count - self.count.fetch_add(1, Ordering::Release); index } @@ -275,7 +277,7 @@ impl<'v, T> Iterator for Iter<'v, T> { return None; } debug_assert!(self.idx < self.end, "huh {} {}", self.idx, self.end); - debug_assert!(self.end as u64 <= self.vec.inflight.load(Ordering::Acquire)); + debug_assert!(self.end as u64 <= self.vec.inflight.load(Ordering::Relaxed)); loop { let entries = unsafe { @@ -283,7 +285,7 @@ impl<'v, T> Iterator for Iter<'v, T> { .buckets .get_unchecked(self.location.bucket as usize) .entries - .load(Ordering::Acquire) + .load(Ordering::Relaxed) }; debug_assert!(self.location.bucket < BUCKETS); diff --git a/src/worker.rs b/src/worker.rs index 22df09b..73c81b6 100644 --- a/src/worker.rs +++ b/src/worker.rs @@ -161,7 +161,7 @@ impl Worker { // there are usually only very few in flight items (one for each writer) 
self.remove_in_flight_matches(); self.process_new_items_trivial(); - if self.should_notify.load(atomic::Ordering::Acquire) { + if self.should_notify.load(atomic::Ordering::Relaxed) { (self.notify)(); } return; @@ -184,6 +184,7 @@ impl Worker { .take_any_while(|_| !self.canceled.load(atomic::Ordering::Relaxed)) .for_each(|match_| { if match_.idx == u32::MAX { + debug_assert_eq!(match_.score, 0); unmatched.fetch_add(1, atomic::Ordering::Relaxed); return; } @@ -240,7 +241,7 @@ impl Worker { } else { self.matches .truncate(self.matches.len() - take(unmatched.get_mut()) as usize); - if self.should_notify.load(atomic::Ordering::Acquire) { + if self.should_notify.load(atomic::Ordering::Relaxed) { (self.notify)(); } }