diff --git a/include/mp/proxy-io.h b/include/mp/proxy-io.h index c298257..b98baed 100644 --- a/include/mp/proxy-io.h +++ b/include/mp/proxy-io.h @@ -461,6 +461,11 @@ class Connection //! ThreadMap.makeThread) used to service requests to clients. ::capnp::CapabilityServerSet<Thread> m_threads; + //! Hook called on the worker thread inside makeThread, right after + //! set_value. Used by tests to verify the waiter mutex is held when + //! the thread context is published. + std::function<void()> testing_hook_makethread; + //! Canceler for canceling promises that we want to discard when the //! connection is destroyed. This is used to interrupt method calls that are //! still executing at time of disconnection. diff --git a/include/mp/proxy.h b/include/mp/proxy.h index c55380c..e108080 100644 --- a/include/mp/proxy.h +++ b/include/mp/proxy.h @@ -70,6 +70,11 @@ struct ProxyContext Connection* connection; EventLoopRef loop; CleanupList cleanup_fns; + //! Hook called on the worker thread just before loop->sync() in PassField + //! for Context arguments. Used by tests to inject precise disconnect timing. + std::function<void()> testing_hook_before_sync; + //! Hook called on the worker thread just before returning results. 
+ std::function<void()> testing_hook_after_cleanup; ProxyContext(Connection* connection); }; diff --git a/include/mp/type-context.h b/include/mp/type-context.h index 72c3963..a74c0b9 100644 --- a/include/mp/type-context.h +++ b/include/mp/type-context.h @@ -61,8 +61,6 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn& std::is_same::value, kj::Promise>::type { - const auto& params = server_context.call_context.getParams(); - Context::Reader context_arg = Accessor::get(params); auto& server = server_context.proxy_server; int req = server_context.req; // Keep a reference to the ProxyServer instance by assigning it to the self
+ const auto& params = call_context.getParams(); + Context::Reader context_arg = Accessor::get(params); std::tie(request_thread, inserted) = SetThread( GuardedRef{thread_context.waiter->m_mutex, request_threads}, server.m_context.connection, [&] { return context_arg.getCallbackThread(); }); @@ -153,6 +155,15 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn& // the disconnect handler trying to destroy the thread // client object. server.m_context.loop->sync([&] { + // Clear cancellation callback. At this point the + // method invocation finished and the result is + // either being returned, or discarded if a + // cancellation happened. So we do not need to be + // notified of cancellations after this point. Also + // we do not want to be notified because + // cancel_mutex and server_context could be out of + // scope when it happens. + cancel_monitor.m_on_cancel = nullptr; auto self_dispose{kj::mv(self)}; if (erase_thread) { // Look up the thread again without using existing @@ -183,12 +194,15 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn& } // End of scope: if KJ_DEFER was reached, it runs here } + if (testing_hook_after_cleanup) testing_hook_after_cleanup(); return call_context; }; // Lookup Thread object specified by the client. The specified thread should // be a local Thread::Server object, but it needs to be looked up // asynchronously with getLocalServer(). 
+ const auto& params = server_context.call_context.getParams(); + Context::Reader context_arg = Accessor::get(params); auto thread_client = context_arg.getThread(); auto result = server.m_context.connection->m_threads.getLocalServer(thread_client) .then([&server, invoke = kj::mv(invoke), req](const kj::Maybe<Thread::Server&>& perhaps) mutable { diff --git a/src/mp/proxy.cpp b/src/mp/proxy.cpp index da22ae6..e7ea2c8 100644 --- a/src/mp/proxy.cpp +++ b/src/mp/proxy.cpp @@ -416,8 +416,9 @@ kj::Promise<void> ProxyServer<ThreadMap>::makeThread(MakeThreadContext context) std::thread thread([&thread_context, from, this]() { g_thread_context.thread_name = ThreadName(m_connection.m_loop->m_exe_name) + " (from " + from + ")"; g_thread_context.waiter = std::make_unique<Waiter>(); - thread_context.set_value(&g_thread_context); Lock lock(g_thread_context.waiter->m_mutex); + thread_context.set_value(&g_thread_context); + if (this->m_connection.testing_hook_makethread) this->m_connection.testing_hook_makethread(); // Wait for shutdown signal from ProxyServer destructor (signal // is just waiter getting set to null.) g_thread_context.waiter->wait(lock, [] { return !g_thread_context.waiter; }); diff --git a/test/mp/test/test.cpp b/test/mp/test/test.cpp index bf41663..83296e1 100644 --- a/test/mp/test/test.cpp +++ b/test/mp/test/test.cpp @@ -21,6 +21,7 @@ #include #include #include +#include #include #include #include @@ -325,6 +326,112 @@ KJ_TEST("Calling IPC method, disconnecting and blocking during the call") signal.set_value(); } +KJ_TEST("Worker thread destroyed before it is initialized") +{ + // Regression test for bitcoin/bitcoin#34711, bitcoin/bitcoin#34756 + // (worker thread destroyed before it acquires the waiter mutex). The + // fix acquires the lock before calling set_value so the + // ProxyServer destructor cannot null the waiter while the + // worker is between set_value and Lock. + // + // The testing_hook_makethread fires right after set_value in + // makeThread's worker thread. 
A checker thread uses try_lock to + // verify the waiter mutex is held at that point. With the fix + // (Lock before set_value) the mutex is held, so try_lock fails. + // Without the fix (set_value before Lock) the hook fires before + // Lock, so try_lock succeeds, indicating the race window exists. + TestSetup setup; + ProxyClient<messages::FooInterface>* foo = setup.client.get(); + foo->initThreadMap(); + setup.server->m_impl->m_fn = [] {}; + + std::promise<std::mutex*> mutex_promise; + std::promise<void> check_done; + Connection* conn = setup.server->m_context.connection; + conn->testing_hook_makethread = [&] { + mutex_promise.set_value(&g_thread_context.waiter->m_mutex.m_mutex); + check_done.get_future().wait(); + }; + + std::atomic<bool> lock_was_held{false}; + std::thread check_thread{[&] { + std::mutex* m = mutex_promise.get_future().get(); + bool locked = m->try_lock(); + if (locked) m->unlock(); + lock_was_held = !locked; + check_done.set_value(); + }}; + + foo->callFnAsync(); + check_thread.join(); + KJ_EXPECT(lock_was_held); +} + +KJ_TEST("Calling async IPC method, with server disconnect racing the call") +{ + // Regression test for bitcoin/bitcoin#34777 (heap-use-after-free where + // getParams() was called on the worker thread after the event loop thread + // freed the RpcCallContext on disconnect). The fix moves getParams() inside + // loop->sync() so it always runs on the event loop thread. + // + // Use testing_hook_before_sync to pause the worker thread just before it + // enters loop->sync(), then disconnect the server from a separate thread. 
+ TestSetup setup; + ProxyClient<messages::FooInterface>* foo = setup.client.get(); + foo->initThreadMap(); + setup.server->m_impl->m_fn = [] {}; + + std::promise<void> worker_ready; + std::promise<void> disconnect_done; + auto disconnect_done_future = disconnect_done.get_future().share(); + setup.server->m_context.testing_hook_before_sync = [&worker_ready, disconnect_done_future] { + worker_ready.set_value(); + disconnect_done_future.wait(); + }; + + std::thread disconnect_thread{[&] { + worker_ready.get_future().wait(); + setup.server_disconnect(); + disconnect_done.set_value(); + }}; + + try { + foo->callFnAsync(); + KJ_EXPECT(false); + } catch (const std::runtime_error& e) { + KJ_EXPECT(std::string_view{e.what()} == "IPC client method call interrupted by disconnect."); + } + disconnect_thread.join(); +} + +KJ_TEST("Calling async IPC method, with server disconnect after cleanup") +{ + // Regression test for bitcoin/bitcoin#34782 (stack-use-after-return where + // the m_on_cancel callback accessed stack-local cancel_mutex and + // server_context after the invoke lambda's inner scope exited). The fix + // clears m_on_cancel in the cleanup loop->sync() so it is null by the + // time the scope exits. + // + // Use testing_hook_after_cleanup to trigger a server disconnect after the + // inner scope exits (cancel_mutex destroyed). Without the fix, the + // disconnect fires m_on_cancel which accesses the destroyed mutex. + TestSetup setup; + ProxyClient<messages::FooInterface>* foo = setup.client.get(); + foo->initThreadMap(); + setup.server->m_impl->m_fn = [] {}; + + setup.server->m_context.testing_hook_after_cleanup = [&] { + setup.server_disconnect(); + }; + + try { + foo->callFnAsync(); + KJ_EXPECT(false); + } catch (const std::runtime_error& e) { + KJ_EXPECT(std::string_view{e.what()} == "IPC client method call interrupted by disconnect."); + } +} + KJ_TEST("Make simultaneous IPC calls on single remote thread") { TestSetup setup;