parking_lot_core\thread_parker\windows/
keyed_event.rs

1// Copyright 2016 Amanieu d'Antras
2//
3// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or
4// http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
5// http://opensource.org/licenses/MIT>, at your option. This file may not be
6// copied, modified, or distributed except according to those terms.
7
8use core::{
9    ffi,
10    mem::{self, MaybeUninit},
11    ptr,
12};
13use std::sync::atomic::{AtomicUsize, Ordering};
14use std::time::Instant;
15
16const STATE_UNPARKED: usize = 0;
17const STATE_PARKED: usize = 1;
18const STATE_TIMED_OUT: usize = 2;
19
20use super::bindings::*;
21
22#[allow(non_snake_case)]
23pub struct KeyedEvent {
24    handle: HANDLE,
25    NtReleaseKeyedEvent: extern "system" fn(
26        EventHandle: HANDLE,
27        Key: *mut ffi::c_void,
28        Alertable: BOOLEAN,
29        Timeout: *mut i64,
30    ) -> NTSTATUS,
31    NtWaitForKeyedEvent: extern "system" fn(
32        EventHandle: HANDLE,
33        Key: *mut ffi::c_void,
34        Alertable: BOOLEAN,
35        Timeout: *mut i64,
36    ) -> NTSTATUS,
37}
38
39impl KeyedEvent {
40    #[inline]
41    unsafe fn wait_for(&self, key: *mut ffi::c_void, timeout: *mut i64) -> NTSTATUS {
42        (self.NtWaitForKeyedEvent)(self.handle, key, false.into(), timeout)
43    }
44
45    #[inline]
46    unsafe fn release(&self, key: *mut ffi::c_void) -> NTSTATUS {
47        (self.NtReleaseKeyedEvent)(self.handle, key, false.into(), ptr::null_mut())
48    }
49
50    #[allow(non_snake_case)]
51    pub fn create() -> Option<KeyedEvent> {
52        let ntdll = unsafe { GetModuleHandleA(b"ntdll.dll\0".as_ptr()) };
53        if ntdll == 0 {
54            return None;
55        }
56
57        let NtCreateKeyedEvent =
58            unsafe { GetProcAddress(ntdll, b"NtCreateKeyedEvent\0".as_ptr())? };
59        let NtReleaseKeyedEvent =
60            unsafe { GetProcAddress(ntdll, b"NtReleaseKeyedEvent\0".as_ptr())? };
61        let NtWaitForKeyedEvent =
62            unsafe { GetProcAddress(ntdll, b"NtWaitForKeyedEvent\0".as_ptr())? };
63
64        let NtCreateKeyedEvent: extern "system" fn(
65            KeyedEventHandle: *mut HANDLE,
66            DesiredAccess: u32,
67            ObjectAttributes: *mut ffi::c_void,
68            Flags: u32,
69        ) -> NTSTATUS = unsafe { mem::transmute(NtCreateKeyedEvent) };
70        let mut handle = MaybeUninit::uninit();
71        let status = NtCreateKeyedEvent(
72            handle.as_mut_ptr(),
73            GENERIC_READ | GENERIC_WRITE,
74            ptr::null_mut(),
75            0,
76        );
77        if status != STATUS_SUCCESS {
78            return None;
79        }
80
81        Some(KeyedEvent {
82            handle: unsafe { handle.assume_init() },
83            NtReleaseKeyedEvent: unsafe { mem::transmute(NtReleaseKeyedEvent) },
84            NtWaitForKeyedEvent: unsafe { mem::transmute(NtWaitForKeyedEvent) },
85        })
86    }
87
88    #[inline]
89    pub fn prepare_park(&'static self, key: &AtomicUsize) {
90        key.store(STATE_PARKED, Ordering::Relaxed);
91    }
92
93    #[inline]
94    pub fn timed_out(&'static self, key: &AtomicUsize) -> bool {
95        key.load(Ordering::Relaxed) == STATE_TIMED_OUT
96    }
97
98    #[inline]
99    pub unsafe fn park(&'static self, key: &AtomicUsize) {
100        let status = self.wait_for(key as *const _ as *mut ffi::c_void, ptr::null_mut());
101        debug_assert_eq!(status, STATUS_SUCCESS);
102    }
103
104    #[inline]
105    pub unsafe fn park_until(&'static self, key: &AtomicUsize, timeout: Instant) -> bool {
106        let now = Instant::now();
107        if timeout <= now {
108            // If another thread unparked us, we need to call
109            // NtWaitForKeyedEvent otherwise that thread will stay stuck at
110            // NtReleaseKeyedEvent.
111            if key.swap(STATE_TIMED_OUT, Ordering::Relaxed) == STATE_UNPARKED {
112                self.park(key);
113                return true;
114            }
115            return false;
116        }
117
118        // NT uses a timeout in units of 100ns. We use a negative value to
119        // indicate a relative timeout based on a monotonic clock.
120        let diff = timeout - now;
121        let value = (diff.as_secs() as i64)
122            .checked_mul(-10000000)
123            .and_then(|x| x.checked_sub((diff.subsec_nanos() as i64 + 99) / 100));
124
125        let mut nt_timeout = match value {
126            Some(x) => x,
127            None => {
128                // Timeout overflowed, just sleep indefinitely
129                self.park(key);
130                return true;
131            }
132        };
133
134        let status = self.wait_for(key as *const _ as *mut ffi::c_void, &mut nt_timeout);
135        if status == STATUS_SUCCESS {
136            return true;
137        }
138        debug_assert_eq!(status, STATUS_TIMEOUT);
139
140        // If another thread unparked us, we need to call NtWaitForKeyedEvent
141        // otherwise that thread will stay stuck at NtReleaseKeyedEvent.
142        if key.swap(STATE_TIMED_OUT, Ordering::Relaxed) == STATE_UNPARKED {
143            self.park(key);
144            return true;
145        }
146        false
147    }
148
149    #[inline]
150    pub unsafe fn unpark_lock(&'static self, key: &AtomicUsize) -> UnparkHandle {
151        // If the state was STATE_PARKED then we need to wake up the thread
152        if key.swap(STATE_UNPARKED, Ordering::Relaxed) == STATE_PARKED {
153            UnparkHandle {
154                key: key,
155                keyed_event: self,
156            }
157        } else {
158            UnparkHandle {
159                key: ptr::null(),
160                keyed_event: self,
161            }
162        }
163    }
164}
165
166impl Drop for KeyedEvent {
167    #[inline]
168    fn drop(&mut self) {
169        unsafe {
170            let ok = CloseHandle(self.handle);
171            debug_assert_eq!(ok, true.into());
172        }
173    }
174}
175
176// Handle for a thread that is about to be unparked. We need to mark the thread
177// as unparked while holding the queue lock, but we delay the actual unparking
178// until after the queue lock is released.
179pub struct UnparkHandle {
180    key: *const AtomicUsize,
181    keyed_event: &'static KeyedEvent,
182}
183
184impl UnparkHandle {
185    // Wakes up the parked thread. This should be called after the queue lock is
186    // released to avoid blocking the queue for too long.
187    #[inline]
188    pub unsafe fn unpark(self) {
189        if !self.key.is_null() {
190            let status = self.keyed_event.release(self.key as *mut ffi::c_void);
191            debug_assert_eq!(status, STATUS_SUCCESS);
192        }
193    }
194}