Convert map_async from being async to being callback-based (#2698)

Connor Fitzgerald 2022-05-31 11:22:21 -04:00 committed by GitHub
parent 8063edc648
commit 32af4f5607
22 changed files with 281 additions and 383 deletions

Cargo.lock generated (12 lines changed)
View File

@@ -582,6 +582,17 @@ version = "0.3.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0c09fd04b7e4073ac7156a9539b57a484a8ea920f79c7c675d05d289ab6110d3"
[[package]]
name = "futures-intrusive"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "62007592ac46aa7c2b6416f7deb9a8a8f63a01e0f1d6e1787d5630170db2b63e"
dependencies = [
"futures-core",
"lock_api",
"parking_lot 0.11.2",
]
[[package]]
name = "futures-io"
version = "0.3.21"
@@ -2039,6 +2050,7 @@ dependencies = [
"console_log",
"ddsfile",
"env_logger",
"futures-intrusive",
"glam",
"js-sys",
"log",

View File

@@ -83,36 +83,27 @@ pub async fn op_webgpu_buffer_get_map_async(
.get::<super::WebGpuDevice>(device_rid)?;
device = device_resource.0;
let boxed_sender = Box::new(sender);
let sender_ptr = Box::into_raw(boxed_sender) as *mut u8;
extern "C" fn buffer_map_future_wrapper(
status: wgpu_core::resource::BufferMapAsyncStatus,
user_data: *mut u8,
) {
let sender_ptr = user_data as *mut oneshot::Sender<Result<(), AnyError>>;
let boxed_sender = unsafe { Box::from_raw(sender_ptr) };
boxed_sender
let callback = Box::new(move |status| {
sender
.send(match status {
wgpu_core::resource::BufferMapAsyncStatus::Success => Ok(()),
_ => unreachable!(), // TODO
})
.unwrap();
}
});
// TODO(lucacasonato): error handling
let maybe_err = gfx_select!(buffer => instance.buffer_map_async(
buffer,
offset..(offset + size),
wgpu_core::resource::BufferMapOperation {
host: match mode {
1 => wgpu_core::device::HostMap::Read,
2 => wgpu_core::device::HostMap::Write,
_ => unreachable!(),
},
callback: buffer_map_future_wrapper,
user_data: sender_ptr,
}
buffer,
offset..(offset + size),
wgpu_core::resource::BufferMapOperation {
host: match mode {
1 => wgpu_core::device::HostMap::Read,
2 => wgpu_core::device::HostMap::Write,
_ => unreachable!(),
},
callback: wgpu_core::resource::BufferMapCallback::from_rust(callback),
}
))
.err();

View File

@@ -14,7 +14,7 @@ use std::{
fs::{read_to_string, File},
io::{Read, Seek, SeekFrom},
path::{Path, PathBuf},
ptr, slice,
slice,
};
#[derive(serde::Deserialize)]
@@ -55,7 +55,7 @@ struct Test<'a> {
actions: Vec<wgc::device::trace::Action<'a>>,
}
extern "C" fn map_callback(status: wgc::resource::BufferMapAsyncStatus, _user_data: *mut u8) {
fn map_callback(status: wgc::resource::BufferMapAsyncStatus) {
match status {
wgc::resource::BufferMapAsyncStatus::Success => (),
_ => panic!("Unable to map"),
@@ -112,8 +112,9 @@ impl Test<'_> {
expect.offset .. expect.offset+expect.data.len() as wgt::BufferAddress,
wgc::resource::BufferMapOperation {
host: wgc::device::HostMap::Read,
callback: map_callback,
user_data: ptr::null_mut(),
callback: wgc::resource::BufferMapCallback::from_rust(
Box::new(map_callback)
),
}
))
.unwrap();

View File

@@ -440,15 +440,18 @@ impl<A: hal::Api> LifetimeTracker<A> {
}
}
pub fn add_work_done_closure(&mut self, closure: SubmittedWorkDoneClosure) -> bool {
pub fn add_work_done_closure(
&mut self,
closure: SubmittedWorkDoneClosure,
) -> Option<SubmittedWorkDoneClosure> {
match self.active.last_mut() {
Some(active) => {
active.work_done_closures.push(closure);
true
None
}
// Note: we can't immediately invoke the closure, since it assumes
// nothing is currently locked in the hubs.
None => false,
None => Some(closure),
}
}
}
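The new return type makes the "no active submission" case explicit: instead of a bool, the caller gets the closure back and is responsible for firing it outside the hub locks. A minimal sketch of the call-site contract (mirroring the queue code later in this diff; `lifetime_tracker` and `closure` assumed in scope):

    if let Some(closure) = lifetime_tracker.add_work_done_closure(closure) {
        // No submission is in flight, so the work is trivially done;
        // fire the closure now, with no hub locks held.
        closure.call();
    }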

View File

@@ -141,14 +141,14 @@ impl UserClosures {
self.submissions.extend(other.submissions);
}
unsafe fn fire(self) {
//Note: this logic is specifically moved out of `handle_mapping()` in order to
fn fire(self) {
// Note: this logic is specifically moved out of `handle_mapping()` in order to
// have nothing locked by the time we execute users callback code.
for (operation, status) in self.mappings {
(operation.callback)(status, operation.user_data);
operation.callback.call(status);
}
for closure in self.submissions {
(closure.callback)(closure.user_data);
closure.call();
}
}
}
@@ -4968,9 +4968,9 @@ impl<G: GlobalIdentityHandlerFactory> Global<G> {
.map_err(|_| DeviceError::Invalid)?
.maintain(hub, force_wait, &mut token)?
};
unsafe {
closures.fire();
}
closures.fire();
Ok(queue_empty)
}
@@ -5048,9 +5048,7 @@ impl<G: GlobalIdentityHandlerFactory> Global<G> {
self.poll_devices::<hal::api::Gles>(force_wait, &mut closures)? && all_queue_empty;
}
unsafe {
closures.fire();
}
closures.fire();
Ok(all_queue_empty)
}
@@ -5157,7 +5155,7 @@ impl<G: GlobalIdentityHandlerFactory> Global<G> {
return Err(resource::BufferAccessError::AlreadyMapped);
}
resource::BufferMapState::Waiting(_) => {
op.call_error();
op.callback.call_error();
return Ok(());
}
resource::BufferMapState::Idle => {
@@ -5374,9 +5372,7 @@ impl<G: GlobalIdentityHandlerFactory> Global<G> {
//Note: outside inner function so no locks are held when calling the callback
let closure = self.buffer_unmap_inner::<A>(buffer_id)?;
if let Some((operation, status)) = closure {
unsafe {
(operation.callback)(status, operation.user_data);
}
operation.callback.call(status);
}
Ok(())
}
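All of these call sites now funnel through the safe `fire`, whose whole point is that user callbacks run with no hub locks held. A standalone sketch of the pattern (illustrative only; a `Mutex<Vec<...>>` stands in for the real hub state):

    use std::sync::Mutex;

    struct Tracker {
        pending: Mutex<Vec<Box<dyn FnOnce() + Send>>>,
    }

    impl Tracker {
        fn maintain(&self) {
            // Drain the ready closures while the lock is held...
            let ready: Vec<_> = self.pending.lock().unwrap().drain(..).collect();
            // ...then invoke them after the guard has been dropped, so a
            // callback that re-enters the API cannot deadlock on this lock.
            for callback in ready {
                callback();
            }
        }
    }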

View File

@@ -28,16 +28,56 @@ use thiserror::Error;
/// without a concrete moment of when it can be cleared.
const WRITE_COMMAND_BUFFERS_PER_POOL: usize = 64;
pub type OnSubmittedWorkDoneCallback = unsafe extern "C" fn(user_data: *mut u8);
#[repr(C)]
#[derive(Clone, Copy, Debug)]
pub struct SubmittedWorkDoneClosure {
pub callback: OnSubmittedWorkDoneCallback,
pub user_data: *mut u8,
pub struct SubmittedWorkDoneClosureC {
callback: unsafe extern "C" fn(user_data: *mut u8),
user_data: *mut u8,
}
unsafe impl Send for SubmittedWorkDoneClosure {}
unsafe impl Sync for SubmittedWorkDoneClosure {}
unsafe impl Send for SubmittedWorkDoneClosureC {}
pub struct SubmittedWorkDoneClosure {
// We wrap this so creating the enum in the C variant can be unsafe,
// allowing our call function to be safe.
inner: SubmittedWorkDoneClosureInner,
}
enum SubmittedWorkDoneClosureInner {
Rust {
callback: Box<dyn FnOnce() + Send + 'static>,
},
C {
inner: SubmittedWorkDoneClosureC,
},
}
impl SubmittedWorkDoneClosure {
pub fn from_rust(callback: Box<dyn FnOnce() + Send + 'static>) -> Self {
Self {
inner: SubmittedWorkDoneClosureInner::Rust { callback },
}
}
/// # Safety
///
/// - The callback pointer must be valid to call with the provided user_data pointer.
/// - Both pointers must point to 'static data as the callback may happen at an unspecified time.
pub unsafe fn from_c(inner: SubmittedWorkDoneClosureC) -> Self {
Self {
inner: SubmittedWorkDoneClosureInner::C { inner },
}
}
pub(crate) fn call(self) {
match self.inner {
SubmittedWorkDoneClosureInner::Rust { callback } => callback(),
// SAFETY: the contract of the call to from_c says that this unsafe is sound.
SubmittedWorkDoneClosureInner::C { inner } => unsafe {
(inner.callback)(inner.user_data)
},
}
}
}
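On the Rust side, construction is just boxing a closure; only the C path needs `unsafe`. A hypothetical sketch of the safe variant (note that `call` is `pub(crate)`, so this is crate-internal code):

    let closure = SubmittedWorkDoneClosure::from_rust(Box::new(|| {
        log::info!("submitted work is done");
    }));
    // `call` consumes `self`, so the closure can fire at most once.
    closure.call();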
struct StagingData<A: hal::Api> {
buffer: A::Buffer,
@@ -932,9 +972,8 @@ impl<G: GlobalIdentityHandlerFactory> Global<G> {
};
// the closures should execute with nothing locked!
unsafe {
callbacks.fire();
}
callbacks.fire();
Ok(())
}
@@ -957,7 +996,7 @@ impl<G: GlobalIdentityHandlerFactory> Global<G> {
closure: SubmittedWorkDoneClosure,
) -> Result<(), InvalidQueue> {
//TODO: flush pending writes
let added = {
let closure_opt = {
let hub = A::hub(self);
let mut token = Token::root();
let (device_guard, mut token) = hub.devices.read(&mut token);
@@ -966,10 +1005,8 @@ impl<G: GlobalIdentityHandlerFactory> Global<G> {
Err(_) => return Err(InvalidQueue),
}
};
if !added {
unsafe {
(closure.callback)(closure.user_data);
}
if let Some(closure) = closure_opt {
closure.call();
}
Ok(())
}

View File

@@ -23,7 +23,6 @@ pub enum BufferMapAsyncStatus {
ContextLost,
}
#[derive(Debug)]
pub(crate) enum BufferMapState<A: hal::Api> {
/// Mapped at creation.
Init {
@@ -46,27 +45,65 @@ pub(crate) enum BufferMapState<A: hal::Api> {
unsafe impl<A: hal::Api> Send for BufferMapState<A> {}
unsafe impl<A: hal::Api> Sync for BufferMapState<A> {}
pub type BufferMapCallback = unsafe extern "C" fn(status: BufferMapAsyncStatus, userdata: *mut u8);
#[repr(C)]
#[derive(Debug)]
pub struct BufferMapCallbackC {
callback: unsafe extern "C" fn(status: BufferMapAsyncStatus, user_data: *mut u8),
user_data: *mut u8,
}
unsafe impl Send for BufferMapCallbackC {}
pub struct BufferMapCallback {
// We wrap this so creating the enum in the C variant can be unsafe,
// allowing our call function to be safe.
inner: BufferMapCallbackInner,
}
enum BufferMapCallbackInner {
Rust {
callback: Box<dyn FnOnce(BufferMapAsyncStatus) + Send + 'static>,
},
C {
inner: BufferMapCallbackC,
},
}
impl BufferMapCallback {
pub fn from_rust(callback: Box<dyn FnOnce(BufferMapAsyncStatus) + Send + 'static>) -> Self {
Self {
inner: BufferMapCallbackInner::Rust { callback },
}
}
/// # Safety
///
/// - The callback pointer must be valid to call with the provided user_data pointer.
/// - Both pointers must point to 'static data as the callback may happen at an unspecified time.
pub unsafe fn from_c(inner: BufferMapCallbackC) -> Self {
Self {
inner: BufferMapCallbackInner::C { inner },
}
}
pub(crate) fn call(self, status: BufferMapAsyncStatus) {
match self.inner {
BufferMapCallbackInner::Rust { callback } => callback(status),
// SAFETY: the contract of the call to from_c says that this unsafe is sound.
BufferMapCallbackInner::C { inner } => unsafe {
(inner.callback)(status, inner.user_data)
},
}
}
pub(crate) fn call_error(self) {
log::error!("wgpu_buffer_map_async failed: buffer mapping is pending");
self.call(BufferMapAsyncStatus::Error);
}
}
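Bindings now hand `wgpu-core` a boxed closure instead of a function pointer plus `user_data`. A sketch of the Rust-side construction, mirroring the deno and player changes above (illustrative only):

    let op = wgc::resource::BufferMapOperation {
        host: wgc::device::HostMap::Read,
        callback: wgc::resource::BufferMapCallback::from_rust(Box::new(|status| {
            match status {
                wgc::resource::BufferMapAsyncStatus::Success => log::info!("buffer mapped"),
                _ => log::error!("buffer map failed"),
            }
        })),
    };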
pub struct BufferMapOperation {
pub host: HostMap,
pub callback: BufferMapCallback,
pub user_data: *mut u8,
}
//TODO: clarify if/why this is needed here
unsafe impl Send for BufferMapOperation {}
unsafe impl Sync for BufferMapOperation {}
impl BufferMapOperation {
pub(crate) fn call_error(self) {
log::error!("wgpu_buffer_map_async failed: buffer mapping is pending");
unsafe {
(self.callback)(BufferMapAsyncStatus::Error, self.user_data);
}
}
}
#[derive(Clone, Debug, Error)]
@@ -105,7 +142,6 @@ pub enum BufferAccessError {
},
}
#[derive(Debug)]
pub(crate) struct BufferPendingMapping {
pub range: Range<wgt::BufferAddress>,
pub op: BufferMapOperation,
@@ -115,7 +151,6 @@ pub(crate) struct BufferPendingMapping {
pub type BufferDescriptor<'a> = wgt::BufferDescriptor<Label<'a>>;
#[derive(Debug)]
pub struct Buffer<A: hal::Api> {
pub(crate) raw: Option<A::Buffer>,
pub(crate) device_id: Stored<DeviceId>,

View File

@@ -123,6 +123,7 @@ bitflags = "1"
bytemuck = { version = "1.4", features = ["derive"] }
glam = "0.20.2"
ddsfile = "0.5"
futures-intrusive = "0.4"
log = "0.4"
# Opt out of noise's "default-features" to avoid "image" feature as a dependency count optimization.
# This will not be required in the next release since it has been removed from the default feature in https://github.com/Razaekel/noise-rs/commit/1af9e1522236b2c584fb9a02150c9c67a5e6bb04#diff-2e9d962a08321605940b5a657135052fbcef87b5e360662bb527c96d9a615542

View File

@@ -126,7 +126,9 @@ async fn create_png(
) {
// Note that we're not calling `.await` here.
let buffer_slice = output_buffer.slice(..);
let buffer_future = buffer_slice.map_async(wgpu::MapMode::Read);
// Sets the buffer up for mapping, sending over the result of the mapping back to us when it is finished.
let (sender, receiver) = futures_intrusive::channel::shared::oneshot_channel();
buffer_slice.map_async(wgpu::MapMode::Read, move |v| sender.send(v).unwrap());
// Poll the device in a blocking manner so that our future resolves.
// In an actual application, `device.poll(...)` should
@@ -138,7 +140,7 @@ async fn create_png(
return;
}
if let Ok(()) = buffer_future.await {
if let Some(Ok(())) = receiver.receive().await {
let padded_buffer = buffer_slice.get_mapped_range();
let mut png_encoder = png::Encoder::new(
@@ -214,18 +216,15 @@ mod tests {
#[test]
fn ensure_generated_data_matches_expected() {
pollster::block_on(assert_generated_data_matches_expected());
assert_generated_data_matches_expected();
}
async fn assert_generated_data_matches_expected() {
fn assert_generated_data_matches_expected() {
let (device, output_buffer, dimensions) =
create_red_image_with_dimensions(100usize, 200usize).await;
let buffer_slice = output_buffer.slice(..);
let buffer_future = buffer_slice.map_async(wgpu::MapMode::Read);
buffer_slice.map_async(wgpu::MapMode::Read, |_| ());
device.poll(wgpu::Maintain::Wait);
buffer_future
.await
.expect("failed to map buffer slice for capture test");
let padded_buffer = buffer_slice.get_mapped_range();
let expected_buffer_size = dimensions.padded_bytes_per_row * dimensions.height;
assert_eq!(padded_buffer.len(), expected_buffer_size);

View File

@@ -517,7 +517,7 @@ pub fn test<E: Example>(mut params: FrameworkRefTest) {
ctx.queue.submit(Some(cmd_buf.finish()));
let dst_buffer_slice = dst_buffer.slice(..);
let _ = dst_buffer_slice.map_async(wgpu::MapMode::Read);
dst_buffer_slice.map_async(wgpu::MapMode::Read, |_| ());
ctx.device.poll(wgpu::Maintain::Wait);
let bytes = dst_buffer_slice.get_mapped_range().to_vec();

View File

@@ -147,8 +147,9 @@ async fn execute_gpu_inner(
// Note that we're not calling `.await` here.
let buffer_slice = staging_buffer.slice(..);
// Gets the future representing when `staging_buffer` can be read from
let buffer_future = buffer_slice.map_async(wgpu::MapMode::Read);
// Sets the buffer up for mapping, sending over the result of the mapping back to us when it is finished.
let (sender, receiver) = futures_intrusive::channel::shared::oneshot_channel();
buffer_slice.map_async(wgpu::MapMode::Read, move |v| sender.send(v).unwrap());
// Poll the device in a blocking manner so that our future resolves.
// In an actual application, `device.poll(...)` should
@@ -156,7 +157,7 @@ async fn execute_gpu_inner(
device.poll(wgpu::Maintain::Wait);
// Awaits until `buffer_future` can be read from
if let Ok(()) = buffer_future.await {
if let Some(Ok(())) = receiver.receive().await {
// Gets contents of buffer
let data = buffer_slice.get_mapped_range();
// Since contents are got in bytes, this converts these bytes back to u32
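Condensed, the read-back pattern shared by these examples looks like this (a sketch for the body of an async fn; `device` and `staging_buffer` assumed in scope, with `futures-intrusive` from the dev-dependencies added above):

    let buffer_slice = staging_buffer.slice(..);
    let (sender, receiver) = futures_intrusive::channel::shared::oneshot_channel();
    buffer_slice.map_async(wgpu::MapMode::Read, move |result| {
        // Keep the callback tiny: just forward the result to the async side.
        sender.send(result).unwrap();
    });
    device.poll(wgpu::Maintain::Wait); // drives the mapping to completion on native
    if let Some(Ok(())) = receiver.receive().await {
        let data = buffer_slice.get_mapped_range();
        // ... read `data`, then drop it before unmapping ...
    }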

View File

@@ -380,11 +380,11 @@ impl framework::Example for Example {
queue.submit(Some(init_encoder.finish()));
if let Some(ref query_sets) = query_sets {
// We can ignore the future as we're about to wait for the device.
let _ = query_sets
// We can ignore the callback as we're about to wait for the device.
query_sets
.data_buffer
.slice(..)
.map_async(wgpu::MapMode::Read);
.map_async(wgpu::MapMode::Read, |_| ());
// Wait for device to be done rendering mipmaps
device.poll(wgpu::Maintain::Wait);
// This is guaranteed to be ready.

View File

@@ -398,7 +398,7 @@ impl framework::Example for Skybox {
view: &wgpu::TextureView,
device: &wgpu::Device,
queue: &wgpu::Queue,
spawner: &framework::Spawner,
_spawner: &framework::Spawner,
) {
let mut encoder =
device.create_command_encoder(&wgpu::CommandEncoderDescriptor { label: None });
@@ -457,8 +457,7 @@ impl framework::Example for Skybox {
queue.submit(std::iter::once(encoder.finish()));
let belt_future = self.staging_belt.recall();
spawner.spawn_local(belt_future);
self.staging_belt.recall();
}
}

View File

@@ -1,8 +1,8 @@
use crate::{
backend::native_gpu_future, AdapterInfo, BindGroupDescriptor, BindGroupLayoutDescriptor,
BindingResource, BufferBinding, CommandEncoderDescriptor, ComputePassDescriptor,
ComputePipelineDescriptor, DownlevelCapabilities, Features, Label, Limits, LoadOp, MapMode,
Operations, PipelineLayoutDescriptor, RenderBundleEncoderDescriptor, RenderPipelineDescriptor,
AdapterInfo, BindGroupDescriptor, BindGroupLayoutDescriptor, BindingResource, BufferBinding,
CommandEncoderDescriptor, ComputePassDescriptor, ComputePipelineDescriptor,
DownlevelCapabilities, Features, Label, Limits, LoadOp, MapMode, Operations,
PipelineLayoutDescriptor, RenderBundleEncoderDescriptor, RenderPipelineDescriptor,
SamplerDescriptor, ShaderModuleDescriptor, ShaderModuleDescriptorSpirV, ShaderSource,
SurfaceStatus, TextureDescriptor, TextureFormat, TextureViewDescriptor,
};
@@ -805,8 +805,6 @@ impl crate::Context for Context {
#[allow(clippy::type_complexity)]
type RequestDeviceFuture =
Ready<Result<(Self::DeviceId, Self::QueueId), crate::RequestDeviceError>>;
type MapAsyncFuture = native_gpu_future::GpuFuture<Result<(), crate::BufferAsyncError>>;
type OnSubmittedWorkDoneFuture = native_gpu_future::GpuFuture<()>;
type PopErrorScopeFuture = Ready<Option<crate::Error>>;
fn init(backends: wgt::Backends) -> Self {
@@ -1622,28 +1620,20 @@ impl crate::Context for Context {
buffer: &Self::BufferId,
mode: MapMode,
range: Range<wgt::BufferAddress>,
) -> Self::MapAsyncFuture {
let (future, completion) = native_gpu_future::new_gpu_future();
extern "C" fn buffer_map_future_wrapper(
status: wgc::resource::BufferMapAsyncStatus,
user_data: *mut u8,
) {
let completion =
unsafe { native_gpu_future::GpuFutureCompletion::from_raw(user_data as _) };
completion.complete(match status {
wgc::resource::BufferMapAsyncStatus::Success => Ok(()),
_ => Err(crate::BufferAsyncError),
})
}
callback: impl FnOnce(Result<(), crate::BufferAsyncError>) + Send + 'static,
) {
let operation = wgc::resource::BufferMapOperation {
host: match mode {
MapMode::Read => wgc::device::HostMap::Read,
MapMode::Write => wgc::device::HostMap::Write,
},
callback: buffer_map_future_wrapper,
user_data: completion.into_raw() as _,
callback: wgc::resource::BufferMapCallback::from_rust(Box::new(|status| {
let res = match status {
wgc::resource::BufferMapAsyncStatus::Success => Ok(()),
_ => Err(crate::BufferAsyncError),
};
callback(res);
})),
};
let global = &self.0;
@@ -1651,7 +1641,6 @@ impl crate::Context for Context {
Ok(()) => (),
Err(cause) => self.handle_error_nolabel(&buffer.error_sink, cause, "Buffer::map_async"),
}
future
}
fn buffer_get_mapped_range(
@@ -2216,26 +2205,15 @@ impl crate::Context for Context {
fn queue_on_submitted_work_done(
&self,
queue: &Self::QueueId,
) -> Self::OnSubmittedWorkDoneFuture {
let (future, completion) = native_gpu_future::new_gpu_future();
extern "C" fn submitted_work_done_future_wrapper(user_data: *mut u8) {
let completion =
unsafe { native_gpu_future::GpuFutureCompletion::from_raw(user_data as _) };
completion.complete(())
}
let closure = wgc::device::queue::SubmittedWorkDoneClosure {
callback: submitted_work_done_future_wrapper,
user_data: completion.into_raw() as _,
};
callback: Box<dyn FnOnce() + Send + 'static>,
) {
let closure = wgc::device::queue::SubmittedWorkDoneClosure::from_rust(callback);
let global = &self.0;
let res = wgc::gfx_select!(queue => global.queue_on_submitted_work_done(*queue, closure));
if let Err(cause) = res {
self.handle_error_fatal(cause, "Queue::on_submitted_work_done");
}
future
}
fn device_start_capture(&self, device: &Self::DeviceId) {

View File

@@ -7,6 +7,3 @@ pub(crate) use web::{BufferMappedRange, Context};
mod direct;
#[cfg(any(not(target_arch = "wasm32"), feature = "webgl"))]
pub(crate) use direct::{BufferMappedRange, Context};
#[cfg(any(not(target_arch = "wasm32"), feature = "webgl"))]
mod native_gpu_future;

View File

@@ -1,143 +0,0 @@
//! Futures that can be resolved when the GPU completes a task.
//!
//! This module defines the [`GpuFuture`] and [`GpuFutureCompletion`]
//! types, which `wgpu` uses to communicate to users when GPU
//! operations have completed, and when resources are ready to access.
//! This is only used by the `direct` back end, not on the web.
//!
//! The life cycle of a `GpuFuture` is as follows:
//!
//! - Calling [`new_gpu_future`] constructs a paired `GpuFuture` and
//! `GpuFutureCompletion`.
//!
//! - Calling [`complete(v)`] on a `GpuFutureCompletion` marks its
//! paired `GpuFuture` as ready with value `v`. This also wakes
//! the most recent [`Waker`] the future was polled with, if any.
//!
//! - Polling a `GpuFuture` either returns `v` if it is ready, or
//! saves the `Waker` passed to [`Future::poll`], to be awoken
//! when `complete` is called on the paired `GpuFutureCompletion`.
//!
//! ## Communicating with `wgpu_core`
//!
//! The `wgpu_core` crate uses various specialized callback types,
//! like [`wgpu_core::resource::BufferMapOperation`] for reporting
//! buffers that are ready to map, or
//! [`wgpu_core::device::queue::SubmittedWorkDoneClosure`] for
//! reporting the completion of submitted commands. To support FFI
//! bindings, these are unsafe, low-level structures that usually have
//! a function pointer and an untyped, raw "closure" pointer.
//!
//! Calling [`GpuFutureCompletion::into_raw`] returns a raw opaque
//! pointer suitable for use as the "closure" pointer in `wgpu_core`'s
//! callbacks. The [`GpuFutureCompletion::from_raw`] converts such a
//! raw opaque pointer back into a [`GpuFutureCompletion`]. See the
//! direct back end's implementation of [`Context::buffer_map_async`]
//! for an example of this.
//!
//! [`complete(v)`]: GpuFutureCompletion::complete
//! [`Waker`]: std::task::Waker
//! [`Future::poll`]: std::future::Future::poll
//! [`wgpu_core::resource::BufferMapOperation`]: https://docs.rs/wgpu-core/latest/wgpu_core/resource/struct.BufferMapOperation.html
//! [`wgpu_core::device::queue::SubmittedWorkDoneClosure`]: https://docs.rs/wgpu-core/latest/wgpu_core/device/queue/struct.SubmittedWorkDoneClosure.html
//! [`Context::buffer_map_async`]: crate::Context::buffer_map_async
use parking_lot::Mutex;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Waker};
/// The current state of a `GpuFuture`.
enum WakerOrResult<T> {
/// The last [`Waker`] used to poll this future, if any.
///
/// [`Waker`]: std::task::Waker
Waker(Waker),
/// The value this future resolves to, if it is ready.
Result(T),
}
/// The shared state of a [`GpuFuture`] and its [`GpuFutureCompletion`].
///
/// Polling the future when it is not yet ready stores the [`Waker`]
/// here; completing the future when it has not yet been polled stores
/// the value here. See [`WakerOrResult`] for details.
type GpuFutureData<T> = Mutex<Option<WakerOrResult<T>>>;
/// A [`Future`] that will be ready when some sort of GPU activity has finished.
///
/// Call [`new_gpu_future`] to create a `GpuFuture`, along with a
/// paired `GpuFutureCompletion` that can be used to mark it as ready.
pub struct GpuFuture<T> {
data: Arc<GpuFutureData<T>>,
}
/// An opaque type used for pointers to a [`GpuFutureCompletion`]'s guts.
pub enum OpaqueData {}
//TODO: merge this with `GpuFuture` and avoid `Arc` on the data.
/// A completion handle to set the result on a [`GpuFuture`].
pub struct GpuFutureCompletion<T> {
data: Arc<GpuFutureData<T>>,
}
impl<T> Future for GpuFuture<T> {
type Output = T;
fn poll(self: Pin<&mut Self>, context: &mut Context) -> Poll<Self::Output> {
let mut waker_or_result = self.into_ref().get_ref().data.lock();
match waker_or_result.take() {
Some(WakerOrResult::Result(res)) => Poll::Ready(res),
_ => {
*waker_or_result = Some(WakerOrResult::Waker(context.waker().clone()));
Poll::Pending
}
}
}
}
impl<T> GpuFutureCompletion<T> {
/// Mark our paired [`GpuFuture`] as ready, with the given `value`.
pub fn complete(self, value: T) {
let mut waker_or_result = self.data.lock();
match waker_or_result.replace(WakerOrResult::Result(value)) {
Some(WakerOrResult::Waker(waker)) => waker.wake(),
None => {}
Some(WakerOrResult::Result(_)) => {
// Drop before panicking. Not sure if this is necessary, but it makes me feel better.
drop(waker_or_result);
unreachable!()
}
};
}
/// Convert this `GpuFutureCompletion` into a raw pointer for `wgpu_core` to hold.
pub(crate) fn into_raw(self) -> *mut OpaqueData {
Arc::into_raw(self.data) as _
}
/// Convert a raw pointer returned by [`into_raw`] back into a `GpuFutureCompletion`.
///
/// [`into_raw`]: GpuFutureCompletion::into_raw
pub(crate) unsafe fn from_raw(this: *mut OpaqueData) -> Self {
Self {
data: Arc::from_raw(this as _),
}
}
}
/// Construct a fresh [`GpuFuture`] and a paired [`GpuFutureCompletion`].
///
/// See the module docs for details.
pub(crate) fn new_gpu_future<T>() -> (GpuFuture<T>, GpuFutureCompletion<T>) {
let data = Arc::new(Mutex::new(None));
(
GpuFuture {
data: Arc::clone(&data),
},
GpuFutureCompletion { data },
)
}

View File

@@ -1,10 +1,12 @@
#![allow(clippy::type_complexity)]
use std::{
cell::RefCell,
fmt,
future::Future,
ops::Range,
pin::Pin,
rc::Rc,
task::{self, Poll},
};
use wasm_bindgen::{prelude::*, JsCast};
@@ -935,10 +937,6 @@ fn future_request_device(
.map_err(|_| crate::RequestDeviceError)
}
fn future_map_async(result: JsFutureResult) -> Result<(), crate::BufferAsyncError> {
result.map(|_| ()).map_err(|_| crate::BufferAsyncError)
}
fn future_pop_error_scope(result: JsFutureResult) -> Option<crate::Error> {
match result {
Ok(js_value) if js_value.is_object() => {
@@ -1006,12 +1004,6 @@ impl crate::Context for Context {
wasm_bindgen_futures::JsFuture,
fn(JsFutureResult) -> Result<(Self::DeviceId, Self::QueueId), crate::RequestDeviceError>,
>;
type MapAsyncFuture = MakeSendFuture<
wasm_bindgen_futures::JsFuture,
fn(JsFutureResult) -> Result<(), crate::BufferAsyncError>,
>;
type OnSubmittedWorkDoneFuture =
MakeSendFuture<wasm_bindgen_futures::JsFuture, fn(JsFutureResult) -> ()>;
type PopErrorScopeFuture =
MakeSendFuture<wasm_bindgen_futures::JsFuture, fn(JsFutureResult) -> Option<crate::Error>>;
@@ -1743,17 +1735,30 @@ impl crate::Context for Context {
buffer: &Self::BufferId,
mode: crate::MapMode,
range: Range<wgt::BufferAddress>,
) -> Self::MapAsyncFuture {
callback: impl FnOnce(Result<(), crate::BufferAsyncError>) + Send + 'static,
) {
let map_promise = buffer.0.map_async_with_f64_and_f64(
map_map_mode(mode),
range.start as f64,
(range.end - range.start) as f64,
);
MakeSendFuture::new(
wasm_bindgen_futures::JsFuture::from(map_promise),
future_map_async,
)
// Both the 'success' and 'rejected' closures need access to callback, but only one
// of them will ever run. We have them both hold a reference to an `Rc<RefCell<Option<impl FnOnce...>>>`,
// and then take ownership of callback when invoked.
//
// We also only need Rc's because these will only ever be called on our thread.
let rc_callback = Rc::new(RefCell::new(Some(callback)));
let rc_callback_clone = rc_callback.clone();
let closure_success = wasm_bindgen::closure::Closure::once(move |_| {
rc_callback.borrow_mut().take().unwrap()(Ok(()))
});
let closure_rejected = wasm_bindgen::closure::Closure::once(move |_| {
rc_callback_clone.borrow_mut().take().unwrap()(Err(crate::BufferAsyncError))
});
let _ = map_promise.then2(&closure_success, &closure_rejected);
}
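The same share-then-take trick in isolation (illustrative; a `bool` stands in for the real `Result`):

    use std::{cell::RefCell, rc::Rc};

    let callback = |ok: bool| println!("mapped: {}", ok);
    let slot = Rc::new(RefCell::new(Some(callback)));
    let slot_clone = slot.clone();
    let on_success = move || (slot.borrow_mut().take().unwrap())(true);
    let on_rejected = move || (slot_clone.borrow_mut().take().unwrap())(false);
    // Exactly one of the two runs; `take()` ensures the FnOnce is consumed
    // at most once even though both closures share ownership of it.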
fn buffer_get_mapped_range(
@@ -2221,7 +2226,8 @@ impl crate::Context for Context {
fn queue_on_submitted_work_done(
&self,
_queue: &Self::QueueId,
) -> Self::OnSubmittedWorkDoneFuture {
_callback: Box<dyn FnOnce() + Send + 'static>,
) {
unimplemented!()
}

View File

@@ -191,8 +191,6 @@ trait Context: Debug + Send + Sized + Sync {
type RequestAdapterFuture: Future<Output = Option<Self::AdapterId>> + Send;
type RequestDeviceFuture: Future<Output = Result<(Self::DeviceId, Self::QueueId), RequestDeviceError>>
+ Send;
type MapAsyncFuture: Future<Output = Result<(), BufferAsyncError>> + Send;
type OnSubmittedWorkDoneFuture: Future<Output = ()> + Send;
type PopErrorScopeFuture: Future<Output = Option<Error>> + Send;
fn init(backends: Backends) -> Self;
@@ -336,7 +334,11 @@ trait Context: Debug + Send + Sized + Sync {
buffer: &Self::BufferId,
mode: MapMode,
range: Range<BufferAddress>,
) -> Self::MapAsyncFuture;
// Note: we keep this as an `impl` through the context because the native backend
// needs to wrap it with a wrapping closure. queue_on_submitted_work_done doesn't
// need this wrapping closure, so can be made a Box immediately.
callback: impl FnOnce(Result<(), BufferAsyncError>) + Send + 'static,
);
fn buffer_get_mapped_range(
&self,
buffer: &Self::BufferId,
@@ -495,7 +497,12 @@ trait Context: Debug + Send + Sized + Sync {
fn queue_on_submitted_work_done(
&self,
queue: &Self::QueueId,
) -> Self::OnSubmittedWorkDoneFuture;
// Note: we force the caller to box this because neither backend needs to
// wrap the callback and this prevents us from needing to make more functions
// generic than we have to. `buffer_map_async` needs to be wrapped on the native
// backend, so we don't box until after it has been wrapped.
callback: Box<dyn FnOnce() + Send + 'static>,
);
fn device_start_capture(&self, device: &Self::DeviceId);
fn device_stop_capture(&self, device: &Self::DeviceId);
@@ -2379,20 +2386,20 @@ impl Buffer {
}
impl<'a> BufferSlice<'a> {
//TODO: fn slice(&self) -> Self
/// Map the buffer. Buffer is ready to map once the future is resolved.
/// Map the buffer. Buffer is ready to map once the callback is called.
///
/// For the future to complete, `device.poll(...)` must be called elsewhere in the runtime, possibly integrated
/// into an event loop, run on a separate thread, or continually polled in the same task runtime that this
/// future will be run on.
/// For the callback to complete, either `queue.submit(..)`, `instance.poll_all(..)`, or `device.poll(..)`
/// must be called elsewhere in the runtime, possibly integrated into an event loop or run on a separate thread.
///
/// It's expected that wgpu will eventually supply its own event loop infrastructure that will be easy to integrate
/// into other event loops, like winit's.
/// The callback will be called on the thread that first calls one of the above functions after the GPU work
/// has completed. There are no restrictions on the code you can run in the callback; however, on native the
/// call to the function will not complete until the callback returns, so prefer keeping callbacks short
/// and using them to set flags, send messages, etc.
pub fn map_async(
&self,
mode: MapMode,
) -> impl Future<Output = Result<(), BufferAsyncError>> + Send {
callback: impl FnOnce(Result<(), BufferAsyncError>) + Send + 'static,
) {
let mut mc = self.buffer.map_context.lock();
assert_eq!(
mc.initial_range,
@@ -2411,6 +2418,7 @@ impl<'a> BufferSlice<'a> {
&self.buffer.id,
mode,
self.offset..end,
callback,
)
}
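Per the note above about keeping callbacks short, a callback that merely records the outcome might look like this (sketch; `buffer` assumed in scope):

    use std::sync::{
        atomic::{AtomicBool, Ordering},
        Arc,
    };

    let mapped = Arc::new(AtomicBool::new(false));
    let flag = mapped.clone();
    buffer.slice(..).map_async(wgpu::MapMode::Read, move |result| {
        // Do the minimum here; the main loop checks `mapped` later.
        if result.is_ok() {
            flag.store(true, Ordering::Release);
        }
    });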
@@ -3383,10 +3391,19 @@ impl Queue {
Context::queue_get_timestamp_period(&*self.context, &self.id)
}
/// Returns a future that resolves once all the work submitted by this point
/// is done processing on GPU.
pub fn on_submitted_work_done(&self) -> impl Future<Output = ()> + Send {
Context::queue_on_submitted_work_done(&*self.context, &self.id)
/// Registers a callback to run when the previous call to `submit` finishes running on the GPU. This callback
/// being called implies that all mapped-buffer callbacks attached to the same submission have also
/// been called.
///
/// For the callback to complete, either `queue.submit(..)`, `instance.poll_all(..)`, or `device.poll(..)`
/// must be called elsewhere in the runtime, possibly integrated into an event loop or run on a separate thread.
///
/// The callback will be called on the thread that first calls one of the above functions after the GPU work
/// has completed. There are no restrictions on the code you can run in the callback; however, on native the
/// call to the function will not complete until the callback returns, so prefer keeping callbacks short
/// and using them to set flags, send messages, etc.
pub fn on_submitted_work_done(&self, callback: impl FnOnce() + Send + 'static) {
Context::queue_on_submitted_work_done(&*self.context, &self.id, Box::new(callback))
}
}
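One way to consume the callback form is to bridge it back to a blocking wait with a channel (sketch; `queue`, `device`, and `command_buffer` assumed in scope):

    let (sender, receiver) = std::sync::mpsc::channel();
    queue.submit(Some(command_buffer));
    queue.on_submitted_work_done(move || sender.send(()).unwrap());
    device.poll(wgpu::Maintain::Wait); // fires the callback once the GPU is done
    receiver.recv().unwrap();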

View File

@@ -3,44 +3,10 @@ use crate::{
CommandEncoder, Device, MapMode,
};
use std::fmt;
use std::pin::Pin;
use std::task::{self, Poll};
use std::{future::Future, sync::mpsc};
// Given a vector of futures, poll each in parallel until all are ready.
struct Join<F> {
futures: Vec<Option<F>>,
}
impl<F: Future<Output = ()>> Future for Join<F> {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut task::Context) -> Poll<Self::Output> {
// This is safe because we have no Drop implementation to violate the Pin requirements and
// do not provide any means of moving the inner futures.
let all_ready = unsafe {
// Poll all remaining futures, removing all that are ready
self.get_unchecked_mut().futures.iter_mut().all(|opt| {
if let Some(future) = opt {
if Pin::new_unchecked(future).poll(cx) == Poll::Ready(()) {
*opt = None;
}
}
opt.is_none()
})
};
if all_ready {
Poll::Ready(())
} else {
Poll::Pending
}
}
}
use std::sync::{mpsc, Arc};
struct Chunk {
buffer: Buffer,
buffer: Arc<Buffer>,
size: BufferAddress,
offset: BufferAddress,
}
@@ -116,12 +82,12 @@ impl StagingBelt {
} else {
let size = self.chunk_size.max(size.get());
Chunk {
buffer: device.create_buffer(&BufferDescriptor {
buffer: Arc::new(device.create_buffer(&BufferDescriptor {
label: Some("(wgpu internal) StagingBelt staging buffer"),
size,
usage: BufferUsages::MAP_WRITE | BufferUsages::COPY_SRC,
mapped_at_creation: true,
}),
})),
size,
offset: 0,
}
@@ -158,31 +124,23 @@ impl StagingBelt {
/// Recall all of the closed buffers back to be reused.
///
/// This has to be called after the command encoders written to `write_buffer` are submitted!
pub fn recall(&mut self) -> impl Future<Output = ()> + Send {
pub fn recall(&mut self) {
while let Ok(mut chunk) = self.receiver.try_recv() {
chunk.offset = 0;
self.free_chunks.push(chunk);
}
let sender = &self.sender;
let futures = self
.closed_chunks
.drain(..)
.map(|chunk| {
let sender = sender.clone();
let async_buffer = chunk.buffer.slice(..).map_async(MapMode::Write);
Some(async move {
// The result is ignored
async_buffer.await.ok();
// The only possible error is the other side disconnecting, which is fine
for chunk in self.closed_chunks.drain(..) {
let sender = sender.clone();
chunk
.buffer
.clone()
.slice(..)
.map_async(MapMode::Write, move |_| {
let _ = sender.send(chunk);
})
})
.collect::<Vec<_>>();
Join { futures }
});
}
}
}

View File

@@ -6,7 +6,7 @@ mod encoder;
mod indirect;
mod init;
use std::future::Future;
use std::sync::Arc;
use std::{
borrow::Cow,
mem::{align_of, size_of},
@@ -70,7 +70,7 @@ pub fn make_spirv_raw(data: &[u8]) -> Cow<[u32]> {
}
/// CPU accessible buffer used to download data back from the GPU.
pub struct DownloadBuffer(super::Buffer, super::BufferMappedRange);
pub struct DownloadBuffer(Arc<super::Buffer>, super::BufferMappedRange);
impl DownloadBuffer {
/// Asynchronously read the contents of a buffer.
@@ -78,18 +78,19 @@ impl DownloadBuffer {
device: &super::Device,
queue: &super::Queue,
buffer: &super::BufferSlice,
) -> impl Future<Output = Result<Self, super::BufferAsyncError>> + Send {
callback: impl FnOnce(Result<Self, super::BufferAsyncError>) + Send + 'static,
) {
let size = match buffer.size {
Some(size) => size.into(),
None => buffer.buffer.map_context.lock().total_size - buffer.offset,
};
let download = device.create_buffer(&super::BufferDescriptor {
let download = Arc::new(device.create_buffer(&super::BufferDescriptor {
size,
usage: super::BufferUsages::COPY_DST | super::BufferUsages::MAP_READ,
mapped_at_creation: false,
label: None,
});
}));
let mut encoder =
device.create_command_encoder(&super::CommandEncoderDescriptor { label: None });
@@ -97,13 +98,22 @@ impl DownloadBuffer {
let command_buffer: super::CommandBuffer = encoder.finish();
queue.submit(Some(command_buffer));
let fut = download.slice(..).map_async(super::MapMode::Read);
async move {
fut.await?;
let mapped_range =
super::Context::buffer_get_mapped_range(&*download.context, &download.id, 0..size);
Ok(Self(download, mapped_range))
}
download
.clone()
.slice(..)
.map_async(super::MapMode::Read, move |result| {
if let Err(e) = result {
callback(Err(e));
return;
}
let mapped_range = super::Context::buffer_get_mapped_range(
&*download.context,
&download.id,
0..size,
);
callback(Ok(Self(download, mapped_range)));
});
}
}
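A hypothetical call site for the callback-based `read_buffer` (sketch; assumes the returned `DownloadBuffer` dereferences to the mapped bytes, and that `device`, `queue`, and `buffer` are in scope):

    wgpu::util::DownloadBuffer::read_buffer(
        &device,
        &queue,
        &buffer.slice(..),
        |result| match result {
            Ok(download) => log::info!("read back {} bytes", download.len()),
            Err(e) => log::error!("read back failed: {:?}", e),
        },
    );
    device.poll(wgpu::Maintain::Wait);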

View File

@@ -123,7 +123,7 @@ fn pulling_common(
ctx.queue.submit(Some(encoder.finish()));
let slice = buffer.slice(..);
let _ = slice.map_async(wgpu::MapMode::Read);
slice.map_async(wgpu::MapMode::Read, |_| ());
ctx.device.poll(wgpu::Maintain::Wait);
let data: Vec<u32> = bytemuck::cast_slice(&*slice.get_mapped_range()).to_vec();

View File

@@ -282,7 +282,7 @@ fn copy_texture_to_buffer(
fn assert_buffer_is_zero(readback_buffer: &wgpu::Buffer, device: &wgpu::Device) {
{
let buffer_slice = readback_buffer.slice(..);
let _ = buffer_slice.map_async(wgpu::MapMode::Read);
buffer_slice.map_async(wgpu::MapMode::Read, |_| ());
device.poll(wgpu::Maintain::Wait);
let buffer_view = buffer_slice.get_mapped_range();