Fix agent borrow mut panic (#981)

This commit is contained in:
Justin Starry 2020-03-01 16:15:07 +08:00 committed by GitHub
parent afb5e9d7ac
commit c991c7c7d9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -262,7 +262,8 @@ impl<AGN: Agent> LocalAgent<AGN> {
fn create_bridge(&mut self, callback: Option<Callback<AGN::Output>>) -> ContextBridge<AGN> { fn create_bridge(&mut self, callback: Option<Callback<AGN::Output>>) -> ContextBridge<AGN> {
let respondable = callback.is_some(); let respondable = callback.is_some();
let id: usize = self.slab.borrow_mut().insert(callback); let mut slab = self.slab.borrow_mut();
let id: usize = slab.insert(callback);
let id = HandlerId::new(id, respondable); let id = HandlerId::new(id, respondable);
ContextBridge { ContextBridge {
scope: self.scope.clone(), scope: self.scope.clone(),
@ -289,7 +290,8 @@ impl Discoverer for Context {
fn spawn_or_join<AGN: Agent>(callback: Option<Callback<AGN::Output>>) -> Box<dyn Bridge<AGN>> { fn spawn_or_join<AGN: Agent>(callback: Option<Callback<AGN::Output>>) -> Box<dyn Bridge<AGN>> {
let mut scope_to_init = None; let mut scope_to_init = None;
let bridge = LOCAL_AGENTS_POOL.with(|pool| { let bridge = LOCAL_AGENTS_POOL.with(|pool| {
match pool.borrow_mut().entry::<LocalAgent<AGN>>() { let mut pool = pool.borrow_mut();
match pool.entry::<LocalAgent<AGN>>() {
anymap::Entry::Occupied(mut entry) => { anymap::Entry::Occupied(mut entry) => {
// TODO(#940): Insert callback! // TODO(#940): Insert callback!
entry.get_mut().create_bridge(callback) entry.get_mut().create_bridge(callback)
@ -335,10 +337,19 @@ fn locate_callback_and_respond<AGN: Agent>(
id: HandlerId, id: HandlerId,
output: AGN::Output, output: AGN::Output,
) { ) {
match slab.borrow().get(id.raw_id()).cloned() { let callback = {
Some(Some(callback)) => callback.emit(output), let slab = slab.borrow();
Some(None) => warn!("The Id of the handler: {}, while present in the slab, is not associated with a callback.", id.raw_id()), match slab.get(id.raw_id()).cloned() {
None => warn!("Id of handler does not exist in the slab: {}.", id.raw_id()), Some(callback) => callback,
None => {
warn!("Id of handler does not exist in the slab: {}.", id.raw_id());
return;
}
}
};
match callback {
Some(callback) => callback.emit(output),
None => warn!("The Id of the handler: {}, while present in the slab, is not associated with a callback.", id.raw_id()),
} }
} }
@ -356,24 +367,30 @@ impl<AGN: Agent> Bridge<AGN> for ContextBridge<AGN> {
impl<AGN: Agent> Drop for ContextBridge<AGN> { impl<AGN: Agent> Drop for ContextBridge<AGN> {
fn drop(&mut self) { fn drop(&mut self) {
LOCAL_AGENTS_POOL.with(|pool| { let terminate_worker = LOCAL_AGENTS_POOL.with(|pool| {
let mut pool = pool.borrow_mut();
let terminate_worker = { let terminate_worker = {
if let Some(launched) = pool.borrow_mut().get_mut::<LocalAgent<AGN>>() { if let Some(launched) = pool.get_mut::<LocalAgent<AGN>>() {
launched.remove_bridge(self) launched.remove_bridge(self)
} else { } else {
false false
} }
}; };
let upd = AgentLifecycleEvent::Disconnected(self.id);
self.scope.send(upd);
if terminate_worker { if terminate_worker {
let upd = AgentLifecycleEvent::Destroy; pool.remove::<LocalAgent<AGN>>();
self.scope.send(upd);
pool.borrow_mut().remove::<LocalAgent<AGN>>();
} }
terminate_worker
}); });
let upd = AgentLifecycleEvent::Disconnected(self.id);
self.scope.send(upd);
if terminate_worker {
let upd = AgentLifecycleEvent::Destroy;
self.scope.send(upd);
}
} }
} }
@ -535,7 +552,8 @@ impl<AGN: Agent> RemoteAgent<AGN> {
fn create_bridge(&mut self, callback: Option<Callback<AGN::Output>>) -> PublicBridge<AGN> { fn create_bridge(&mut self, callback: Option<Callback<AGN::Output>>) -> PublicBridge<AGN> {
let respondable = callback.is_some(); let respondable = callback.is_some();
let id: usize = self.slab.borrow_mut().insert(callback); let mut slab = self.slab.borrow_mut();
let id: usize = slab.insert(callback);
let id = HandlerId::new(id, respondable); let id = HandlerId::new(id, respondable);
PublicBridge { PublicBridge {
worker: self.worker.clone(), worker: self.worker.clone(),
@ -564,7 +582,8 @@ pub struct Public;
impl Discoverer for Public { impl Discoverer for Public {
fn spawn_or_join<AGN: Agent>(callback: Option<Callback<AGN::Output>>) -> Box<dyn Bridge<AGN>> { fn spawn_or_join<AGN: Agent>(callback: Option<Callback<AGN::Output>>) -> Box<dyn Bridge<AGN>> {
let bridge = REMOTE_AGENTS_POOL.with(|pool| { let bridge = REMOTE_AGENTS_POOL.with(|pool| {
match pool.borrow_mut().entry::<RemoteAgent<AGN>>() { let mut pool = pool.borrow_mut();
match pool.entry::<RemoteAgent<AGN>>() {
anymap::Entry::Occupied(mut entry) => { anymap::Entry::Occupied(mut entry) => {
// TODO(#945): Insert callback! // TODO(#945): Insert callback!
entry.get_mut().create_bridge(callback) entry.get_mut().create_bridge(callback)
@ -581,13 +600,13 @@ impl Discoverer for Public {
match msg { match msg {
FromWorker::WorkerLoaded => { FromWorker::WorkerLoaded => {
// TODO(#944): Send `Connected` message // TODO(#944): Send `Connected` message
let _ = REMOTE_AGENTS_LOADED.with(|local| { REMOTE_AGENTS_LOADED.with(|loaded| {
local.borrow_mut().insert(TypeId::of::<AGN>()) let _ = loaded.borrow_mut().insert(TypeId::of::<AGN>());
}); });
REMOTE_AGENTS_EARLY_MSGS_QUEUE.with(|local| {
if let Some(msgs) = REMOTE_AGENTS_EARLY_MSGS_QUEUE.with(|queue| {
local.borrow_mut().get_mut(&TypeId::of::<AGN>()) let mut queue = queue.borrow_mut();
{ if let Some(msgs) = queue.get_mut(&TypeId::of::<AGN>()) {
for msg in msgs.drain(..) { for msg in msgs.drain(..) {
cfg_match! { cfg_match! {
feature = "std_web" => ({ feature = "std_web" => ({
@ -654,18 +673,15 @@ impl<AGN: Agent> fmt::Debug for PublicBridge<AGN> {
impl<AGN: Agent> PublicBridge<AGN> { impl<AGN: Agent> PublicBridge<AGN> {
fn worker_is_loaded(&self) -> bool { fn worker_is_loaded(&self) -> bool {
REMOTE_AGENTS_LOADED.with(|local| local.borrow().contains(&TypeId::of::<AGN>())) REMOTE_AGENTS_LOADED.with(|loaded| loaded.borrow().contains(&TypeId::of::<AGN>()))
} }
fn msg_to_queue(&self, msg: Vec<u8>) { fn msg_to_queue(&self, msg: Vec<u8>) {
REMOTE_AGENTS_EARLY_MSGS_QUEUE.with(|local| { REMOTE_AGENTS_EARLY_MSGS_QUEUE.with(|queue| {
match local.borrow_mut().entry(TypeId::of::<AGN>()) { let mut queue = queue.borrow_mut();
match queue.entry(TypeId::of::<AGN>()) {
hash_map::Entry::Vacant(record) => { hash_map::Entry::Vacant(record) => {
record.insert({ record.insert(vec![msg]);
let mut v = Vec::new();
v.push(msg);
v
});
} }
hash_map::Entry::Occupied(ref mut record) => { hash_map::Entry::Occupied(ref mut record) => {
record.get_mut().push(msg); record.get_mut().push(msg);
@ -700,36 +716,45 @@ impl<AGN: Agent> Bridge<AGN> for PublicBridge<AGN> {
if self.worker_is_loaded() { if self.worker_is_loaded() {
send_to_remote::<AGN>(&self.worker, msg); send_to_remote::<AGN>(&self.worker, msg);
} else { } else {
let msg = msg.pack(); self.msg_to_queue(msg.pack());
self.msg_to_queue(msg);
} }
} }
} }
impl<AGN: Agent> Drop for PublicBridge<AGN> { impl<AGN: Agent> Drop for PublicBridge<AGN> {
fn drop(&mut self) { fn drop(&mut self) {
REMOTE_AGENTS_POOL.with(|pool| { let terminate_worker = REMOTE_AGENTS_POOL.with(|pool| {
let mut pool = pool.borrow_mut();
let terminate_worker = { let terminate_worker = {
if let Some(launched) = pool.borrow_mut().get_mut::<RemoteAgent<AGN>>() { if let Some(launched) = pool.get_mut::<RemoteAgent<AGN>>() {
launched.remove_bridge(self) launched.remove_bridge(self)
} else { } else {
false false
} }
}; };
let upd = ToWorker::Disconnected(self.id);
send_to_remote::<AGN>(&self.worker, upd);
if terminate_worker { if terminate_worker {
let upd = ToWorker::Destroy; pool.remove::<RemoteAgent<AGN>>();
send_to_remote::<AGN>(&self.worker, upd);
pool.borrow_mut().remove::<RemoteAgent<AGN>>();
REMOTE_AGENTS_LOADED.with(|pool| {
pool.borrow_mut().remove(&TypeId::of::<AGN>());
});
REMOTE_AGENTS_EARLY_MSGS_QUEUE.with(|pool| {
pool.borrow_mut().remove(&TypeId::of::<AGN>());
});
} }
terminate_worker
}); });
let disconnected = ToWorker::Disconnected(self.id);
send_to_remote::<AGN>(&self.worker, disconnected);
if terminate_worker {
let destroy = ToWorker::Destroy;
send_to_remote::<AGN>(&self.worker, destroy);
REMOTE_AGENTS_LOADED.with(|loaded| {
loaded.borrow_mut().remove(&TypeId::of::<AGN>());
});
REMOTE_AGENTS_EARLY_MSGS_QUEUE.with(|queue| {
queue.borrow_mut().remove(&TypeId::of::<AGN>());
});
}
} }
} }