diff --git a/include/mp/proxy-io.h b/include/mp/proxy-io.h index c298257..3594708 100644 --- a/include/mp/proxy-io.h +++ b/include/mp/proxy-io.h @@ -340,6 +340,22 @@ class EventLoop //! External context pointer. void* m_context; + + //! Hook called when ProxyServer::makeThread() is called. + std::function testing_hook_makethread; + + //! Hook called on the worker thread inside makeThread(), after the thread + //! context is set up and thread_context promise is fulfilled, but before it + //! starts waiting for requests. + std::function testing_hook_makethread_created; + + //! Hook called on the worker thread when it starts to execute an async + //! request. Used by tests to control timing or inject behavior at this + //! point in execution. + std::function testing_hook_async_request_start; + + //! Hook called on the worker thread just before returning results. + std::function testing_hook_async_request_done; }; //! Single element task queue used to handle recursive capnp calls. (If the diff --git a/include/mp/type-context.h b/include/mp/type-context.h index 72c3963..9c7f21b 100644 --- a/include/mp/type-context.h +++ b/include/mp/type-context.h @@ -61,8 +61,6 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn& std::is_same::value, kj::Promise>::type { - const auto& params = server_context.call_context.getParams(); - Context::Reader context_arg = Accessor::get(params); auto& server = server_context.proxy_server; int req = server_context.req; // Keep a reference to the ProxyServer instance by assigning it to the self @@ -74,8 +72,8 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn& auto self = server.thisCap(); auto invoke = [self = kj::mv(self), call_context = kj::mv(server_context.call_context), &server, req, fn, args...](CancelMonitor& cancel_monitor) mutable { MP_LOG(*server.m_context.loop, Log::Debug) << "IPC server executing request #" << req; - const auto& params = call_context.getParams(); - 
Context::Reader context_arg = Accessor::get(params); + EventLoop& loop = *server.m_context.loop; + if (loop.testing_hook_async_request_start) loop.testing_hook_async_request_start(); ServerContext server_context{server, call_context, req}; { // Before invoking the function, store a reference to the @@ -127,6 +125,8 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn& server_context.request_canceled = true; }; // Update requests_threads map if not canceled. + const auto& params = call_context.getParams(); + Context::Reader context_arg = Accessor::get(params); std::tie(request_thread, inserted) = SetThread( GuardedRef{thread_context.waiter->m_mutex, request_threads}, server.m_context.connection, [&] { return context_arg.getCallbackThread(); }); @@ -153,6 +153,15 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn& // the disconnect handler trying to destroy the thread // client object. server.m_context.loop->sync([&] { + // Clear cancellation callback. At this point the + // method invocation finished and the result is + // either being returned, or discarded if a + // cancellation happened. So we do not need to be + // notified of cancellations after this point. Also + // we do not want to be notified because + // cancel_mutex and server_context could be out of + // scope when it happens. + cancel_monitor.m_on_cancel = nullptr; auto self_dispose{kj::mv(self)}; if (erase_thread) { // Look up the thread again without using existing @@ -183,12 +192,15 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn& } // End of scope: if KJ_DEFER was reached, it runs here } + if (loop.testing_hook_async_request_done) loop.testing_hook_async_request_done(); return call_context; }; // Lookup Thread object specified by the client. The specified thread should // be a local Thread::Server object, but it needs to be looked up // asynchronously with getLocalServer(). 
+ const auto& params = server_context.call_context.getParams(); + Context::Reader context_arg = Accessor::get(params); auto thread_client = context_arg.getThread(); auto result = server.m_context.connection->m_threads.getLocalServer(thread_client) .then([&server, invoke = kj::mv(invoke), req](const kj::Maybe& perhaps) mutable { diff --git a/src/mp/proxy.cpp b/src/mp/proxy.cpp index da22ae6..f36e19f 100644 --- a/src/mp/proxy.cpp +++ b/src/mp/proxy.cpp @@ -411,13 +411,16 @@ ProxyServer::ProxyServer(Connection& connection) : m_connection(conne kj::Promise ProxyServer::makeThread(MakeThreadContext context) { + if (m_connection.m_loop->testing_hook_makethread) m_connection.m_loop->testing_hook_makethread(); const std::string from = context.getParams().getName(); std::promise thread_context; std::thread thread([&thread_context, from, this]() { - g_thread_context.thread_name = ThreadName(m_connection.m_loop->m_exe_name) + " (from " + from + ")"; + EventLoop& loop{*m_connection.m_loop}; + g_thread_context.thread_name = ThreadName(loop.m_exe_name) + " (from " + from + ")"; g_thread_context.waiter = std::make_unique(); - thread_context.set_value(&g_thread_context); Lock lock(g_thread_context.waiter->m_mutex); + thread_context.set_value(&g_thread_context); + if (loop.testing_hook_makethread_created) loop.testing_hook_makethread_created(); // Wait for shutdown signal from ProxyServer destructor (signal // is just waiter getting set to null.) 
g_thread_context.waiter->wait(lock, [] { return !g_thread_context.waiter; }); diff --git a/test/mp/test/test.cpp b/test/mp/test/test.cpp index bf41663..4f71a55 100644 --- a/test/mp/test/test.cpp +++ b/test/mp/test/test.cpp @@ -9,6 +9,7 @@ #include #include #include +#include #include #include #include @@ -63,6 +64,7 @@ class TestSetup { public: std::function server_disconnect; + std::function server_disconnect_later; std::function client_disconnect; std::promise>> client_promise; std::unique_ptr> client; @@ -88,6 +90,10 @@ class TestSetup return capnp::Capability::Client(kj::mv(server_proxy)); }); server_disconnect = [&] { loop.sync([&] { server_connection.reset(); }); }; + server_disconnect_later = [&] { + assert(std::this_thread::get_id() == loop.m_thread_id); + loop.m_task_set->add(kj::evalLater([&] { server_connection.reset(); })); + }; // Set handler to destroy the server when the client disconnects. This // is ignored if server_disconnect() is called instead. server_connection->onDisconnect([&] { server_connection.reset(); }); @@ -325,6 +331,99 @@ KJ_TEST("Calling IPC method, disconnecting and blocking during the call") signal.set_value(); } +KJ_TEST("Worker thread destroyed before it is initialized") +{ + // Regression test for bitcoin/bitcoin#34711, bitcoin/bitcoin#34756 + // where worker thread is destroyed before it starts. + // + // The test works by using the `makethread` hook to start a disconnect as + // soon as ProxyServer::makeThread is called, and using the + // `makethread_created` hook to sleep 10ms after the thread is created but + // before it starts waiting, so without the bugfix, + // ProxyServer::~ProxyServer would run and destroy the waiter, + // causing a SIGSEGV in the worker thread after the sleep.
+ TestSetup setup; + ProxyClient* foo = setup.client.get(); + foo->initThreadMap(); + setup.server->m_impl->m_fn = [] {}; + + EventLoop& loop = *setup.server->m_context.connection->m_loop; + loop.testing_hook_makethread = [&] { + setup.server_disconnect_later(); + }; + loop.testing_hook_makethread_created = [&] { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + }; + + bool disconnected{false}; + try { + foo->callFnAsync(); + } catch (const std::runtime_error& e) { + KJ_EXPECT(std::string_view{e.what()} == "IPC client method call interrupted by disconnect."); + disconnected = true; + } + KJ_EXPECT(disconnected); +} + +KJ_TEST("Calling async IPC method, with server disconnect racing the call") +{ + // Regression test for bitcoin/bitcoin#34777 heap-use-after-free where + // an async request is canceled before it starts to execute. + // + // Use testing_hook_async_request_start to trigger a disconnect from the + // worker thread as soon as it begins to execute an async request. Without + // the bugfix, the worker thread would trigger a SIGSEGV after this by + // calling call_context.getParams(). + TestSetup setup; + ProxyClient* foo = setup.client.get(); + foo->initThreadMap(); + setup.server->m_impl->m_fn = [] {}; + + EventLoop& loop = *setup.server->m_context.connection->m_loop; + loop.testing_hook_async_request_start = [&] { + setup.server_disconnect(); + // Sleep is necessary to let the event loop fully clean up after the + // disconnect and trigger the SIGSEGV.
+ std::this_thread::sleep_for(std::chrono::milliseconds(10)); + }; + + try { + foo->callFnAsync(); + KJ_EXPECT(false); + } catch (const std::runtime_error& e) { + KJ_EXPECT(std::string_view{e.what()} == "IPC client method call interrupted by disconnect."); + } +} + +KJ_TEST("Calling async IPC method, with server disconnect after cleanup") +{ + // Regression test for bitcoin/bitcoin#34782 stack-use-after-return where + // an async request is canceled after it finishes executing but before the + // response is sent. + // + // Use testing_hook_async_request_done to trigger a disconnect from the + // worker thread after it executes an async request but before it returns. + // Without the bugfix, the m_on_cancel callback would be called at this + // point accessing the cancel_mutex stack variable that had gone out of + // scope. + TestSetup setup; + ProxyClient* foo = setup.client.get(); + foo->initThreadMap(); + setup.server->m_impl->m_fn = [] {}; + + EventLoop& loop = *setup.server->m_context.connection->m_loop; + loop.testing_hook_async_request_done = [&] { + setup.server_disconnect(); + }; + + try { + foo->callFnAsync(); + KJ_EXPECT(false); + } catch (const std::runtime_error& e) { + KJ_EXPECT(std::string_view{e.what()} == "IPC client method call interrupted by disconnect."); + } +} + KJ_TEST("Make simultaneous IPC calls on single remote thread") { TestSetup setup;