shared_rwlock\sys/
rwlock_inner.rs

1// SPDX-License-Identifier: Apache-2.0 OR MIT
2//
3// # Forked rust (ver. 1.84.0)
4// See: https://github.com/rust-lang/rust/blob/1.84.0/library/std/src/sys/sync/rwlock/futex.rs
5// See Rust license detail: https://github.com/rust-lang/rust/pull/43498
6
7#[cfg(target_os = "windows")]
8use super::windows::futex::{Futex, Primitive, futex_wait, futex_wake, futex_wake_all};
9use core::sync::atomic::Ordering::{Acquire, Relaxed, Release};
10
11/// It exists in the SharedMemory situation and atomically edits the data involved in the lock.
12///
13/// No `Self::new` to create by `core::mem::transmute` SharedMemory.
14///
15/// # Safety
16/// The behavior when other threads directly tamper with this memory is undefined.
17pub struct RwLock {
18    // The state consists of a 30-bit reader counter, a 'readers waiting' flag, and a 'writers waiting' flag.
19    // Bits 0..30:
20    //   0: Unlocked
21    //   1..=0x3FFF_FFFE: Locked by N readers
22    //   0x3FFF_FFFF: Write locked
23    // Bit 30: Readers are waiting on this futex.
24    // Bit 31: Writers are waiting on the writer_notify futex.
25    state: Futex,
26    // The 'condition variable' to notify writers through.
27    // Incremented on every signal.
28    writer_notify: Futex,
29
30    _reserved8: u64,  // 0x8
31    _reserved10: u64, // 0x10
32    _reserved18: u64, // 0x18
33    _reserved20: u64, // 0x20
34    _reserved28: u64, // 0x28
35    _reserved30: u64, // 0x30
36
37                      // _reserved38: u64, // 0x38: Wrapper field(poison: bool + u8 + u32)
38}
39const _: () = assert!(core::mem::size_of::<RwLock>() == 64 - 8);
40
41const READ_LOCKED: Primitive = 1;
42const MASK: Primitive = (1 << 30) - 1;
43const WRITE_LOCKED: Primitive = MASK;
44const DOWNGRADE: Primitive = READ_LOCKED.wrapping_sub(WRITE_LOCKED); // READ_LOCKED - WRITE_LOCKED
45const MAX_READERS: Primitive = MASK - 1;
46const READERS_WAITING: Primitive = 1 << 30;
47const WRITERS_WAITING: Primitive = 1 << 31;
48
49#[inline]
50const fn is_unlocked(state: Primitive) -> bool {
51    state & MASK == 0
52}
53
54#[inline]
55const fn is_write_locked(state: Primitive) -> bool {
56    state & MASK == WRITE_LOCKED
57}
58
59#[inline]
60const fn has_readers_waiting(state: Primitive) -> bool {
61    state & READERS_WAITING != 0
62}
63
64#[inline]
65const fn has_writers_waiting(state: Primitive) -> bool {
66    state & WRITERS_WAITING != 0
67}
68
69#[inline]
70const fn is_read_lockable(state: Primitive) -> bool {
71    // This also returns false if the counter could overflow if we tried to read lock it.
72    //
73    // We don't allow read-locking if there's readers waiting, even if the lock is unlocked
74    // and there's no writers waiting. The only situation when this happens is after unlocking,
75    // at which point the unlocking thread might be waking up writers, which have priority over readers.
76    // The unlocking thread will clear the readers waiting bit and wake up readers, if necessary.
77    state & MASK < MAX_READERS && !has_readers_waiting(state) && !has_writers_waiting(state)
78}
79
80#[inline]
81const fn is_read_lockable_after_wakeup(state: Primitive) -> bool {
82    // We make a special case for checking if we can read-lock _after_ a reader thread that went to
83    // sleep has been woken up by a call to `downgrade`.
84    //
85    // `downgrade` will wake up all readers and place the lock in read mode. Thus, there should be
86    // no readers waiting and the lock should be read-locked (not write-locked or unlocked).
87    //
88    // Note that we do not check if any writers are waiting. This is because a call to `downgrade`
89    // implies that the caller wants other readers to read the value protected by the lock. If we
90    // did not allow readers to acquire the lock before writers after a `downgrade`, then only the
91    // original writer would be able to read the value, thus defeating the purpose of `downgrade`.
92    state & MASK < MAX_READERS
93        && !has_readers_waiting(state)
94        && !is_write_locked(state)
95        && !is_unlocked(state)
96}
97
98#[inline]
99const fn has_reached_max_readers(state: Primitive) -> bool {
100    state & MASK == MAX_READERS
101}
102
103impl RwLock {
104    #[allow(unused)]
105    #[inline]
106    pub const fn new() -> Self {
107        Self {
108            state: Futex::new(0),
109            writer_notify: Futex::new(0),
110            _reserved8: 0,
111            _reserved10: 0,
112            _reserved18: 0,
113            _reserved20: 0,
114            _reserved28: 0,
115            _reserved30: 0,
116        }
117    }
118
119    #[inline]
120    pub fn try_read(&self) -> bool {
121        self.state
122            .fetch_update(Acquire, Relaxed, |s| is_read_lockable(s).then(|| s + READ_LOCKED))
123            .is_ok()
124    }
125
126    #[inline]
127    pub fn read(&self) {
128        let state = self.state.load(Relaxed);
129        if !is_read_lockable(state)
130            || self
131                .state
132                .compare_exchange_weak(state, state + READ_LOCKED, Acquire, Relaxed)
133                .is_err()
134        {
135            self.read_contended();
136        }
137    }
138
139    /// # Safety
140    ///
141    /// The `RwLock` must be read-locked (N readers) in order to call this.
142    #[inline]
143    pub unsafe fn read_unlock(&self) {
144        let state = self.state.fetch_sub(READ_LOCKED, Release) - READ_LOCKED;
145
146        // It's impossible for a reader to be waiting on a read-locked RwLock,
147        // except if there is also a writer waiting.
148        debug_assert!(!has_readers_waiting(state) || has_writers_waiting(state));
149
150        // Wake up a writer if we were the last reader and there's a writer waiting.
151        if is_unlocked(state) && has_writers_waiting(state) {
152            self.wake_writer_or_readers(state);
153        }
154    }
155
    /// Slow path of `read`: spins briefly, then alternates between trying to add a reader
    /// and sleeping on the `state` futex until the read lock is acquired.
    ///
    /// # Panics
    ///
    /// Panics if the reader counter has already reached `MAX_READERS`.
    #[cold]
    fn read_contended(&self) {
        // Set once this thread has gone through `futex_wait` at least once; it enables the
        // more permissive `is_read_lockable_after_wakeup` check below.
        let mut has_slept = false;
        let mut state = self.spin_read();

        loop {
            // If we have just been woken up, first check for a `downgrade` call.
            // Otherwise, if we can read-lock it, lock it.
            if (has_slept && is_read_lockable_after_wakeup(state)) || is_read_lockable(state) {
                match self.state.compare_exchange_weak(state, state + READ_LOCKED, Acquire, Relaxed)
                {
                    Ok(_) => return, // Locked!
                    Err(s) => {
                        // Retry with the value the failed exchange observed.
                        state = s;
                        continue;
                    }
                }
            }

            // Check for overflow.
            assert!(!has_reached_max_readers(state), "too many active read locks on RwLock");

            // Make sure the readers waiting bit is set before we go to sleep.
            if !has_readers_waiting(state) {
                if let Err(s) =
                    self.state.compare_exchange(state, state | READERS_WAITING, Relaxed, Relaxed)
                {
                    // The state changed under us; re-evaluate from the top.
                    state = s;
                    continue;
                }
            }

            // Wait for the state to change.
            // The expected value passed here is the state with the readers waiting bit set.
            futex_wait(&self.state, state | READERS_WAITING, None);
            has_slept = true;

            // Spin again after waking up.
            state = self.spin_read();
        }
    }
196
197    #[inline]
198    pub fn try_write(&self) -> bool {
199        self.state
200            .fetch_update(Acquire, Relaxed, |s| is_unlocked(s).then(|| s + WRITE_LOCKED))
201            .is_ok()
202    }
203
204    #[inline]
205    pub fn write(&self) {
206        if self.state.compare_exchange_weak(0, WRITE_LOCKED, Acquire, Relaxed).is_err() {
207            self.write_contended();
208        }
209    }
210
211    /// # Safety
212    ///
213    /// The `RwLock` must be write-locked (single writer) in order to call this.
214    #[inline]
215    pub unsafe fn write_unlock(&self) {
216        let state = self.state.fetch_sub(WRITE_LOCKED, Release) - WRITE_LOCKED;
217
218        debug_assert!(is_unlocked(state));
219
220        if has_writers_waiting(state) || has_readers_waiting(state) {
221            self.wake_writer_or_readers(state);
222        }
223    }
224
225    /// # Safety
226    ///
227    /// The `RwLock` must be write-locked (single writer) in order to call this.
228    #[inline]
229    pub unsafe fn downgrade(&self) {
230        // Removes all write bits and adds a single read bit.
231        let state = self.state.fetch_add(DOWNGRADE, Release);
232        debug_assert!(is_write_locked(state), "RwLock must be write locked to call `downgrade`");
233
234        if has_readers_waiting(state) {
235            // Since we had the exclusive lock, nobody else can unset this bit.
236            self.state.fetch_sub(READERS_WAITING, Relaxed);
237            futex_wake_all(&self.state);
238        }
239    }
240
    /// Slow path of `write`: spins briefly, then alternates between trying to take the write
    /// lock and sleeping on the `writer_notify` futex until the lock is acquired.
    #[cold]
    fn write_contended(&self) {
        let mut state = self.spin_write();

        // Becomes `WRITERS_WAITING` once this thread has been through the waiting path, so
        // that the flag is OR-ed back in when the lock is finally taken.
        let mut other_writers_waiting = 0;

        loop {
            // If it's unlocked, we try to lock it.
            if is_unlocked(state) {
                match self.state.compare_exchange_weak(
                    state,
                    state | WRITE_LOCKED | other_writers_waiting,
                    Acquire,
                    Relaxed,
                ) {
                    Ok(_) => return, // Locked!
                    Err(s) => {
                        // Retry with the value the failed exchange observed.
                        state = s;
                        continue;
                    }
                }
            }

            // Set the waiting bit indicating that we're waiting on it.
            if !has_writers_waiting(state) {
                if let Err(s) =
                    self.state.compare_exchange(state, state | WRITERS_WAITING, Relaxed, Relaxed)
                {
                    // The state changed under us; re-evaluate from the top.
                    state = s;
                    continue;
                }
            }

            // Other writers might be waiting now too, so we should make sure
            // we keep that bit on once we manage to lock it.
            other_writers_waiting = WRITERS_WAITING;

            // Examine the notification counter before we check if `state` has changed,
            // to make sure we don't miss any notifications.
            let seq = self.writer_notify.load(Acquire);

            // Don't go to sleep if the lock has become available,
            // or if the writers waiting bit is no longer set.
            state = self.state.load(Relaxed);
            if is_unlocked(state) || !has_writers_waiting(state) {
                continue;
            }

            // Wait for the state to change.
            // `seq` is the counter value read above, passed as the expected value.
            futex_wait(&self.writer_notify, seq, None);

            // Spin again after waking up.
            state = self.spin_write();
        }
    }
296
    /// Wakes up waiting threads after unlocking.
    ///
    /// If both are waiting, this will wake up only one writer, but will fall
    /// back to waking up readers if there was no writer to wake up.
    ///
    /// `state` is the value the caller observed right after unlocking.
    ///
    /// # Panics
    ///
    /// Panics if the lock counter in `state` is not unlocked.
    #[cold]
    fn wake_writer_or_readers(&self, mut state: Primitive) {
        assert!(is_unlocked(state));

        // The readers waiting bit might be turned on at any point now,
        // since readers will block when there's anything waiting.
        // Writers will just lock the lock though, regardless of the waiting bits,
        // so we don't have to worry about the writer waiting bit.
        //
        // If the lock gets locked in the meantime, we don't have to do
        // anything, because then the thread that locked the lock will take
        // care of waking up waiters when it unlocks.

        // If only writers are waiting, wake one of them up.
        if state == WRITERS_WAITING {
            // Clear the writers waiting bit before signalling.
            match self.state.compare_exchange(state, 0, Relaxed, Relaxed) {
                Ok(_) => {
                    self.wake_writer();
                    return;
                }
                Err(s) => {
                    // Maybe some readers are now waiting too. So, continue to the next `if`.
                    state = s;
                }
            }
        }

        // If both writers and readers are waiting, leave the readers waiting
        // and only wake up one writer.
        if state == READERS_WAITING + WRITERS_WAITING {
            if self.state.compare_exchange(state, READERS_WAITING, Relaxed, Relaxed).is_err() {
                // The lock got locked. Not our problem anymore.
                return;
            }
            if self.wake_writer() {
                return;
            }
            // No writers were actually blocked on futex_wait, so we continue
            // to wake up readers instead, since we can't be sure if we notified a writer.
            state = READERS_WAITING;
        }

        // If readers are waiting, wake them all up.
        // The readers waiting bit is cleared first; if that exchange fails nothing is woken here.
        if state == READERS_WAITING
            && self.state.compare_exchange(state, 0, Relaxed, Relaxed).is_ok()
        {
            futex_wake_all(&self.state);
        }
    }
350
351    /// This wakes one writer and returns true if we woke up a writer that was
352    /// blocked on futex_wait.
353    ///
354    /// If this returns false, it might still be the case that we notified a
355    /// writer that was about to go to sleep.
356    fn wake_writer(&self) -> bool {
357        self.writer_notify.fetch_add(1, Release);
358        futex_wake(&self.writer_notify)
359        // Note that FreeBSD and DragonFlyBSD don't tell us whether they woke
360        // up any threads or not, and always return `false` here. That still
361        // results in correct behavior: it just means readers get woken up as
362        // well in case both readers and writers were waiting.
363    }
364
365    /// Spin for a while, but stop directly at the given condition.
366    #[inline]
367    fn spin_until(&self, f: impl Fn(Primitive) -> bool) -> Primitive {
368        let mut spin = 100; // Chosen by fair dice roll.
369        loop {
370            let state = self.state.load(Relaxed);
371            if f(state) || spin == 0 {
372                return state;
373            }
374            core::hint::spin_loop();
375            spin -= 1;
376        }
377    }
378
379    #[inline]
380    fn spin_write(&self) -> Primitive {
381        // Stop spinning when it's unlocked or when there's waiting writers, to keep things somewhat fair.
382        self.spin_until(|state| is_unlocked(state) || has_writers_waiting(state))
383    }
384
385    #[inline]
386    fn spin_read(&self) -> Primitive {
387        // Stop spinning when it's unlocked or read locked, or when there's waiting threads.
388        self.spin_until(|state| {
389            !is_write_locked(state) || has_readers_waiting(state) || has_writers_waiting(state)
390        })
391    }
392}