shared_rwlock\sys/rwlock_inner.rs
1// SPDX-License-Identifier: Apache-2.0 OR MIT
2//
3// # Forked rust (ver. 1.84.0)
4// See: https://github.com/rust-lang/rust/blob/1.84.0/library/std/src/sys/sync/rwlock/futex.rs
5// See Rust license detail: https://github.com/rust-lang/rust/pull/43498
6
7#[cfg(target_os = "windows")]
8use super::windows::futex::{Futex, Primitive, futex_wait, futex_wake, futex_wake_all};
9use core::sync::atomic::Ordering::{Acquire, Relaxed, Release};
10
/// Reader-writer lock state designed to be placed in shared memory; every update to the
/// lock bookkeeping goes through atomic operations on the two futex words below.
///
/// Instances are not expected to come from `Self::new`: per the original notes they are
/// obtained by reinterpreting shared memory (`core::mem::transmute`).
///
/// `#[repr(C)]` makes the declaration order the in-memory order, so the byte offsets
/// annotated on the reserved fields are guaranteed instead of depending on the unspecified
/// default Rust layout (which is free to reorder fields).
///
/// # Safety
/// The behavior when other threads directly tamper with this memory is undefined.
#[repr(C)]
pub struct RwLock {
    // The state consists of a 30-bit reader counter, a 'readers waiting' flag, and a
    // 'writers waiting' flag.
    // Bits 0..30:
    //   0: Unlocked
    //   1..=0x3FFF_FFFE: Locked by N readers
    //   0x3FFF_FFFF: Write locked
    // Bit 30: Readers are waiting on this futex.
    // Bit 31: Writers are waiting on the writer_notify futex.
    state: Futex,
    // The 'condition variable' to notify writers through.
    // Incremented on every signal.
    writer_notify: Futex,

    // Reserved padding; the trailing comment on each field is its intended byte offset.
    _reserved8: u64,  // 0x8
    _reserved10: u64, // 0x10
    _reserved18: u64, // 0x18
    _reserved20: u64, // 0x20
    _reserved28: u64, // 0x28
    _reserved30: u64, // 0x30

    // _reserved38: u64, // 0x38: Wrapper field(poison: bool + u8 + u32)
}
// The lock must fill exactly the first 56 bytes of a 64-byte slot; the remaining 8 bytes
// (offset 0x38) are left for the wrapper field noted above.
const _: () = assert!(core::mem::size_of::<RwLock>() == 64 - 8);
40
41const READ_LOCKED: Primitive = 1;
42const MASK: Primitive = (1 << 30) - 1;
43const WRITE_LOCKED: Primitive = MASK;
44const DOWNGRADE: Primitive = READ_LOCKED.wrapping_sub(WRITE_LOCKED); // READ_LOCKED - WRITE_LOCKED
45const MAX_READERS: Primitive = MASK - 1;
46const READERS_WAITING: Primitive = 1 << 30;
47const WRITERS_WAITING: Primitive = 1 << 31;
48
49#[inline]
50const fn is_unlocked(state: Primitive) -> bool {
51 state & MASK == 0
52}
53
54#[inline]
55const fn is_write_locked(state: Primitive) -> bool {
56 state & MASK == WRITE_LOCKED
57}
58
59#[inline]
60const fn has_readers_waiting(state: Primitive) -> bool {
61 state & READERS_WAITING != 0
62}
63
64#[inline]
65const fn has_writers_waiting(state: Primitive) -> bool {
66 state & WRITERS_WAITING != 0
67}
68
69#[inline]
70const fn is_read_lockable(state: Primitive) -> bool {
71 // This also returns false if the counter could overflow if we tried to read lock it.
72 //
73 // We don't allow read-locking if there's readers waiting, even if the lock is unlocked
74 // and there's no writers waiting. The only situation when this happens is after unlocking,
75 // at which point the unlocking thread might be waking up writers, which have priority over readers.
76 // The unlocking thread will clear the readers waiting bit and wake up readers, if necessary.
77 state & MASK < MAX_READERS && !has_readers_waiting(state) && !has_writers_waiting(state)
78}
79
80#[inline]
81const fn is_read_lockable_after_wakeup(state: Primitive) -> bool {
82 // We make a special case for checking if we can read-lock _after_ a reader thread that went to
83 // sleep has been woken up by a call to `downgrade`.
84 //
85 // `downgrade` will wake up all readers and place the lock in read mode. Thus, there should be
86 // no readers waiting and the lock should be read-locked (not write-locked or unlocked).
87 //
88 // Note that we do not check if any writers are waiting. This is because a call to `downgrade`
89 // implies that the caller wants other readers to read the value protected by the lock. If we
90 // did not allow readers to acquire the lock before writers after a `downgrade`, then only the
91 // original writer would be able to read the value, thus defeating the purpose of `downgrade`.
92 state & MASK < MAX_READERS
93 && !has_readers_waiting(state)
94 && !is_write_locked(state)
95 && !is_unlocked(state)
96}
97
98#[inline]
99const fn has_reached_max_readers(state: Primitive) -> bool {
100 state & MASK == MAX_READERS
101}
102
103impl RwLock {
104 #[allow(unused)]
105 #[inline]
106 pub const fn new() -> Self {
107 Self {
108 state: Futex::new(0),
109 writer_notify: Futex::new(0),
110 _reserved8: 0,
111 _reserved10: 0,
112 _reserved18: 0,
113 _reserved20: 0,
114 _reserved28: 0,
115 _reserved30: 0,
116 }
117 }
118
119 #[inline]
120 pub fn try_read(&self) -> bool {
121 self.state
122 .fetch_update(Acquire, Relaxed, |s| is_read_lockable(s).then(|| s + READ_LOCKED))
123 .is_ok()
124 }
125
126 #[inline]
127 pub fn read(&self) {
128 let state = self.state.load(Relaxed);
129 if !is_read_lockable(state)
130 || self
131 .state
132 .compare_exchange_weak(state, state + READ_LOCKED, Acquire, Relaxed)
133 .is_err()
134 {
135 self.read_contended();
136 }
137 }
138
139 /// # Safety
140 ///
141 /// The `RwLock` must be read-locked (N readers) in order to call this.
142 #[inline]
143 pub unsafe fn read_unlock(&self) {
144 let state = self.state.fetch_sub(READ_LOCKED, Release) - READ_LOCKED;
145
146 // It's impossible for a reader to be waiting on a read-locked RwLock,
147 // except if there is also a writer waiting.
148 debug_assert!(!has_readers_waiting(state) || has_writers_waiting(state));
149
150 // Wake up a writer if we were the last reader and there's a writer waiting.
151 if is_unlocked(state) && has_writers_waiting(state) {
152 self.wake_writer_or_readers(state);
153 }
154 }
155
156 #[cold]
157 fn read_contended(&self) {
158 let mut has_slept = false;
159 let mut state = self.spin_read();
160
161 loop {
162 // If we have just been woken up, first check for a `downgrade` call.
163 // Otherwise, if we can read-lock it, lock it.
164 if (has_slept && is_read_lockable_after_wakeup(state)) || is_read_lockable(state) {
165 match self.state.compare_exchange_weak(state, state + READ_LOCKED, Acquire, Relaxed)
166 {
167 Ok(_) => return, // Locked!
168 Err(s) => {
169 state = s;
170 continue;
171 }
172 }
173 }
174
175 // Check for overflow.
176 assert!(!has_reached_max_readers(state), "too many active read locks on RwLock");
177
178 // Make sure the readers waiting bit is set before we go to sleep.
179 if !has_readers_waiting(state) {
180 if let Err(s) =
181 self.state.compare_exchange(state, state | READERS_WAITING, Relaxed, Relaxed)
182 {
183 state = s;
184 continue;
185 }
186 }
187
188 // Wait for the state to change.
189 futex_wait(&self.state, state | READERS_WAITING, None);
190 has_slept = true;
191
192 // Spin again after waking up.
193 state = self.spin_read();
194 }
195 }
196
197 #[inline]
198 pub fn try_write(&self) -> bool {
199 self.state
200 .fetch_update(Acquire, Relaxed, |s| is_unlocked(s).then(|| s + WRITE_LOCKED))
201 .is_ok()
202 }
203
204 #[inline]
205 pub fn write(&self) {
206 if self.state.compare_exchange_weak(0, WRITE_LOCKED, Acquire, Relaxed).is_err() {
207 self.write_contended();
208 }
209 }
210
211 /// # Safety
212 ///
213 /// The `RwLock` must be write-locked (single writer) in order to call this.
214 #[inline]
215 pub unsafe fn write_unlock(&self) {
216 let state = self.state.fetch_sub(WRITE_LOCKED, Release) - WRITE_LOCKED;
217
218 debug_assert!(is_unlocked(state));
219
220 if has_writers_waiting(state) || has_readers_waiting(state) {
221 self.wake_writer_or_readers(state);
222 }
223 }
224
225 /// # Safety
226 ///
227 /// The `RwLock` must be write-locked (single writer) in order to call this.
228 #[inline]
229 pub unsafe fn downgrade(&self) {
230 // Removes all write bits and adds a single read bit.
231 let state = self.state.fetch_add(DOWNGRADE, Release);
232 debug_assert!(is_write_locked(state), "RwLock must be write locked to call `downgrade`");
233
234 if has_readers_waiting(state) {
235 // Since we had the exclusive lock, nobody else can unset this bit.
236 self.state.fetch_sub(READERS_WAITING, Relaxed);
237 futex_wake_all(&self.state);
238 }
239 }
240
241 #[cold]
242 fn write_contended(&self) {
243 let mut state = self.spin_write();
244
245 let mut other_writers_waiting = 0;
246
247 loop {
248 // If it's unlocked, we try to lock it.
249 if is_unlocked(state) {
250 match self.state.compare_exchange_weak(
251 state,
252 state | WRITE_LOCKED | other_writers_waiting,
253 Acquire,
254 Relaxed,
255 ) {
256 Ok(_) => return, // Locked!
257 Err(s) => {
258 state = s;
259 continue;
260 }
261 }
262 }
263
264 // Set the waiting bit indicating that we're waiting on it.
265 if !has_writers_waiting(state) {
266 if let Err(s) =
267 self.state.compare_exchange(state, state | WRITERS_WAITING, Relaxed, Relaxed)
268 {
269 state = s;
270 continue;
271 }
272 }
273
274 // Other writers might be waiting now too, so we should make sure
275 // we keep that bit on once we manage lock it.
276 other_writers_waiting = WRITERS_WAITING;
277
278 // Examine the notification counter before we check if `state` has changed,
279 // to make sure we don't miss any notifications.
280 let seq = self.writer_notify.load(Acquire);
281
282 // Don't go to sleep if the lock has become available,
283 // or if the writers waiting bit is no longer set.
284 state = self.state.load(Relaxed);
285 if is_unlocked(state) || !has_writers_waiting(state) {
286 continue;
287 }
288
289 // Wait for the state to change.
290 futex_wait(&self.writer_notify, seq, None);
291
292 // Spin again after waking up.
293 state = self.spin_write();
294 }
295 }
296
297 /// Wakes up waiting threads after unlocking.
298 ///
299 /// If both are waiting, this will wake up only one writer, but will fall
300 /// back to waking up readers if there was no writer to wake up.
301 #[cold]
302 fn wake_writer_or_readers(&self, mut state: Primitive) {
303 assert!(is_unlocked(state));
304
305 // The readers waiting bit might be turned on at any point now,
306 // since readers will block when there's anything waiting.
307 // Writers will just lock the lock though, regardless of the waiting bits,
308 // so we don't have to worry about the writer waiting bit.
309 //
310 // If the lock gets locked in the meantime, we don't have to do
311 // anything, because then the thread that locked the lock will take
312 // care of waking up waiters when it unlocks.
313
314 // If only writers are waiting, wake one of them up.
315 if state == WRITERS_WAITING {
316 match self.state.compare_exchange(state, 0, Relaxed, Relaxed) {
317 Ok(_) => {
318 self.wake_writer();
319 return;
320 }
321 Err(s) => {
322 // Maybe some readers are now waiting too. So, continue to the next `if`.
323 state = s;
324 }
325 }
326 }
327
328 // If both writers and readers are waiting, leave the readers waiting
329 // and only wake up one writer.
330 if state == READERS_WAITING + WRITERS_WAITING {
331 if self.state.compare_exchange(state, READERS_WAITING, Relaxed, Relaxed).is_err() {
332 // The lock got locked. Not our problem anymore.
333 return;
334 }
335 if self.wake_writer() {
336 return;
337 }
338 // No writers were actually blocked on futex_wait, so we continue
339 // to wake up readers instead, since we can't be sure if we notified a writer.
340 state = READERS_WAITING;
341 }
342
343 // If readers are waiting, wake them all up.
344 if state == READERS_WAITING
345 && self.state.compare_exchange(state, 0, Relaxed, Relaxed).is_ok()
346 {
347 futex_wake_all(&self.state);
348 }
349 }
350
351 /// This wakes one writer and returns true if we woke up a writer that was
352 /// blocked on futex_wait.
353 ///
354 /// If this returns false, it might still be the case that we notified a
355 /// writer that was about to go to sleep.
356 fn wake_writer(&self) -> bool {
357 self.writer_notify.fetch_add(1, Release);
358 futex_wake(&self.writer_notify)
359 // Note that FreeBSD and DragonFlyBSD don't tell us whether they woke
360 // up any threads or not, and always return `false` here. That still
361 // results in correct behavior: it just means readers get woken up as
362 // well in case both readers and writers were waiting.
363 }
364
365 /// Spin for a while, but stop directly at the given condition.
366 #[inline]
367 fn spin_until(&self, f: impl Fn(Primitive) -> bool) -> Primitive {
368 let mut spin = 100; // Chosen by fair dice roll.
369 loop {
370 let state = self.state.load(Relaxed);
371 if f(state) || spin == 0 {
372 return state;
373 }
374 core::hint::spin_loop();
375 spin -= 1;
376 }
377 }
378
379 #[inline]
380 fn spin_write(&self) -> Primitive {
381 // Stop spinning when it's unlocked or when there's waiting writers, to keep things somewhat fair.
382 self.spin_until(|state| is_unlocked(state) || has_writers_waiting(state))
383 }
384
385 #[inline]
386 fn spin_read(&self) -> Primitive {
387 // Stop spinning when it's unlocked or read locked, or when there's waiting threads.
388 self.spin_until(|state| {
389 !is_write_locked(state) || has_readers_waiting(state) || has_writers_waiting(state)
390 })
391 }
392}