diff --git a/.github/workflows/ci-linux.yml b/.github/workflows/ci-linux.yml index 06c5872156..03e35f0ce8 100644 --- a/.github/workflows/ci-linux.yml +++ b/.github/workflows/ci-linux.yml @@ -264,5 +264,6 @@ jobs: || { echo "ERROR: failed to override protobuf version in MODULE.bazel to ${TEST_PROTOBUF_VERSION}"; exit 1; } - run: | bazel test --action_env=CC=clang --config=rdma \ + --define with_bthread_tracer=true \ --define with_babylon_counter=true \ //test/... --test_arg=--gtest_filter=-RdmaRpcTest.* diff --git a/src/brpc/controller.cpp b/src/brpc/controller.cpp index 15c8c91887..377d3cf183 100644 --- a/src/brpc/controller.cpp +++ b/src/brpc/controller.cpp @@ -186,6 +186,7 @@ void Controller::ResetNonPods() { if (auto span = _span.lock()) { Span::Submit(span, butil::cpuwide_time_us()); } + _span.reset(); _error_text.clear(); _remote_side = butil::EndPoint(); _local_side = butil::EndPoint(); @@ -240,7 +241,6 @@ void Controller::ResetNonPods() { void Controller::ResetPods() { // NOTE: Make the sequence of assignments same with the order that they're // defined in header. Better for cpu cache and faster for lookup. - _span.reset(); _flags = 0; #ifndef BAIDU_INTERNAL set_pb_bytes_to_base64(true); diff --git a/src/brpc/global.cpp b/src/brpc/global.cpp index 90f19cd5bc..641cb3977a 100644 --- a/src/brpc/global.cpp +++ b/src/brpc/global.cpp @@ -28,6 +28,7 @@ #include #include "butil/build_config.h" // OS_LINUX +#include "butil/debug/leak_annotations.h" // Naming services #ifdef BAIDU_INTERNAL #include "brpc/policy/baidu_naming_service.h" @@ -216,6 +217,14 @@ static int GetRunningServerCount(void*) { // Update global stuff periodically. static void* GlobalUpdate(void*) { + // This bthread runs for the whole process lifetime and never returns, so + // the local objects below live until the process exits and their + // destructors never run. They are reachable from this bthread's stack, so + // the objects themselves are not reported as leaks, but the heap buffers + // they allocate while exposing themselves (variable names, watched path) + // would be. Disable leak detection only around their construction and + // re-enable it right after. + ANNOTATE_MEMORY_LEAK_DISABLE(); // Expose variables. bvar::PassiveStatus var_iobuf_block_count( "iobuf_block_count", GetIOBufBlockCount, NULL); @@ -232,7 +241,9 @@ static void* GlobalUpdate(void*) { "rpc_server_count", GetRunningServerCount, NULL); butil::FileWatcher fw; - if (fw.init_from_not_exist(DUMMY_SERVER_PORT_FILE) < 0) { + const int fw_rc = fw.init_from_not_exist(DUMMY_SERVER_PORT_FILE); + ANNOTATE_MEMORY_LEAK_ENABLE(); + if (fw_rc < 0) { LOG(FATAL) << "Fail to init FileWatcher on `" << DUMMY_SERVER_PORT_FILE << "'"; return NULL; } @@ -270,7 +281,11 @@ static void* GlobalUpdate(void*) { } } - SocketMapList(&conns); + { + // See detail above. + ANNOTATE_SCOPED_MEMORY_LEAK; + SocketMapList(&conns); + } const int64_t now_ms = butil::cpuwide_time_ms(); for (size_t i = 0; i < conns.size(); ++i) { SocketUniquePtr ptr; diff --git a/src/brpc/rdma/block_pool.cpp b/src/brpc/rdma/block_pool.cpp index 36c763ec13..d8dbb8abda 100644 --- a/src/brpc/rdma/block_pool.cpp +++ b/src/brpc/rdma/block_pool.cpp @@ -615,10 +615,17 @@ void DestroyBlockPool() { node = tmp; } g_info->idle_list[i][j] = NULL; + // Release the per-bucket mutexes allocated in InitBlockPool. + delete g_info->lock[i][j]; + g_info->lock[i][j] = NULL; } } delete g_info; g_info = NULL; + delete g_dump_mutex; + g_dump_mutex = NULL; + delete g_tls_info_mutex; + g_tls_info_mutex = NULL; for (int i = 0; i < g_region_num; ++i) { if (g_regions[i].start == 0) { break; diff --git a/src/brpc/server.cpp b/src/brpc/server.cpp index 57f665da91..6e1f9e8840 100644 --- a/src/brpc/server.cpp +++ b/src/brpc/server.cpp @@ -500,107 +500,107 @@ Server::~Server() { int Server::AddBuiltinServices() { // Firstly add services shown in tabs. - if (AddBuiltinService(new (std::nothrow) StatusService)) { + if (AddBuiltinService(new StatusService)) { LOG(ERROR) << "Fail to add StatusService"; return -1; } - if (AddBuiltinService(new (std::nothrow) VarsService)) { + if (AddBuiltinService(new VarsService)) { LOG(ERROR) << "Fail to add VarsService"; return -1; } - if (AddBuiltinService(new (std::nothrow) ConnectionsService)) { + if (AddBuiltinService(new ConnectionsService)) { LOG(ERROR) << "Fail to add ConnectionsService"; return -1; } - if (AddBuiltinService(new (std::nothrow) FlagsService)) { + if (AddBuiltinService(new FlagsService)) { LOG(ERROR) << "Fail to add FlagsService"; return -1; } - if (AddBuiltinService(new (std::nothrow) RpczService)) { + if (AddBuiltinService(new RpczService)) { LOG(ERROR) << "Fail to add RpczService"; return -1; } - if (AddBuiltinService(new (std::nothrow) HotspotsService)) { + if (AddBuiltinService(new HotspotsService)) { LOG(ERROR) << "Fail to add HotspotsService"; return -1; } - if (AddBuiltinService(new (std::nothrow) IndexService)) { + if (AddBuiltinService(new IndexService)) { LOG(ERROR) << "Fail to add IndexService"; return -1; } // Add other services. - if (AddBuiltinService(new (std::nothrow) VersionService(this))) { + if (AddBuiltinService(new VersionService(this))) { LOG(ERROR) << "Fail to add VersionService"; return -1; } - if (AddBuiltinService(new (std::nothrow) HealthService)) { + if (AddBuiltinService(new HealthService)) { LOG(ERROR) << "Fail to add HealthService"; return -1; } - if (AddBuiltinService(new (std::nothrow) ProtobufsService(this))) { + if (AddBuiltinService(new ProtobufsService(this))) { LOG(ERROR) << "Fail to add ProtobufsService"; return -1; } - if (AddBuiltinService(new (std::nothrow) BadMethodService)) { + if (AddBuiltinService(new BadMethodService)) { LOG(ERROR) << "Fail to add BadMethodService"; return -1; } - if (AddBuiltinService(new (std::nothrow) ListService(this))) { + if (AddBuiltinService(new ListService(this))) { LOG(ERROR) << "Fail to add ListService"; return -1; } - if (AddBuiltinService(new (std::nothrow) PrometheusMetricsService)) { + if (AddBuiltinService(new PrometheusMetricsService)) { LOG(ERROR) << "Fail to add MetricsService"; return -1; } if (FLAGS_enable_threads_service && - AddBuiltinService(new (std::nothrow) ThreadsService)) { + AddBuiltinService(new ThreadsService)) { LOG(ERROR) << "Fail to add ThreadsService"; return -1; } - if (AddBuiltinService(new (std::nothrow) MemoryService)) { + if (AddBuiltinService(new MemoryService)) { LOG(ERROR) << "Fail to add MemoryService"; return -1; } #if !BRPC_WITH_GLOG - if (AddBuiltinService(new (std::nothrow) VLogService)) { + if (AddBuiltinService(new VLogService)) { LOG(ERROR) << "Fail to add VLogService"; return -1; } #endif - if (AddBuiltinService(new (std::nothrow) PProfService)) { + if (AddBuiltinService(new PProfService)) { LOG(ERROR) << "Fail to add PProfService"; return -1; } if (FLAGS_enable_dir_service && - AddBuiltinService(new (std::nothrow) DirService)) { + AddBuiltinService(new DirService)) { LOG(ERROR) << "Fail to add DirService"; return -1; } - if (AddBuiltinService(new (std::nothrow) BthreadsService)) { + if (AddBuiltinService(new BthreadsService)) { LOG(ERROR) << "Fail to add BthreadsService"; return -1; } - if (AddBuiltinService(new (std::nothrow) IdsService)) { + if (AddBuiltinService(new IdsService)) { LOG(ERROR) << "Fail to add IdsService"; return -1; } - if (AddBuiltinService(new (std::nothrow) SocketsService)) { + if (AddBuiltinService(new SocketsService)) { LOG(ERROR) << "Fail to add SocketsService"; return -1; } - if (AddBuiltinService(new (std::nothrow) GetFaviconService)) { + if (AddBuiltinService(new GetFaviconService)) { LOG(ERROR) << "Fail to add GetFaviconService"; return -1; } - if (AddBuiltinService(new (std::nothrow) GetJsService)) { + if (AddBuiltinService(new GetJsService)) { LOG(ERROR) << "Fail to add GetJsService"; return -1; } - if (AddBuiltinService(new (std::nothrow) GrpcHealthCheckService)) { + if (AddBuiltinService(new GrpcHealthCheckService)) { LOG(ERROR) << "Fail to add GrpcHealthCheckService"; return -1; } @@ -927,18 +927,20 @@ int Server::StartInternal(const butil::EndPoint& endpoint, _session_local_data_pool->Reserve(_options.reserved_session_local_data); } - // Leak of `_keytable_pool' and others is by design. - // See comments in Server::Join() for details. - // Instruct LeakSanitizer to ignore the designated memory leak. - ANNOTATE_SCOPED_MEMORY_LEAK; - // Init _keytable_pool always. If the server was stopped before, the pool - // should be destroyed in Join(). - _keytable_pool = new bthread_keytable_pool_t; - if (bthread_keytable_pool_init(_keytable_pool) != 0) { - LOG(ERROR) << "Fail to init _keytable_pool"; - delete _keytable_pool; - _keytable_pool = NULL; - return -1; + { + // Leak of `_keytable_pool' and others is by design. + // See comments in Server::Join() for details. + // Instruct LeakSanitizer to ignore the designated memory leak. + ANNOTATE_SCOPED_MEMORY_LEAK; + // Init _keytable_pool always. If the server was stopped before, the pool + // should be destroyed in Join(). + _keytable_pool = new bthread_keytable_pool_t; + if (bthread_keytable_pool_init(_keytable_pool) != 0) { + LOG(ERROR) << "Fail to init _keytable_pool"; + delete _keytable_pool; + _keytable_pool = NULL; + return -1; + } } if (_options.thread_local_data_factory) { @@ -1635,7 +1637,15 @@ int Server::AddService(google::protobuf::Service* service, int Server::AddBuiltinService(google::protobuf::Service* service) { ServiceOptions options; options.ownership = SERVER_OWNS_SERVICE; - return AddServiceInternal(service, true, options); + int rc = AddServiceInternal(service, true, options); + if (rc != 0) { + // AddServiceInternal does not take ownership of `service' on failure: + // for builtin services the only failure paths (name/fullname conflict) + // return before the service is inserted into any map. Delete it here to + // avoid leaking the object allocated by the caller. + delete service; + } + return rc; } void Server::RemoveMethodsOf(google::protobuf::Service* service) { diff --git a/src/brpc/socket_map.cpp b/src/brpc/socket_map.cpp index 8d934f5827..1562e0a3b9 100644 --- a/src/brpc/socket_map.cpp +++ b/src/brpc/socket_map.cpp @@ -22,6 +22,7 @@ #include "butil/time.h" #include "butil/scoped_lock.h" #include "butil/logging.h" +#include "butil/debug/leak_annotations.h" #include "brpc/log.h" #include "brpc/protocol.h" #include "brpc/input_messenger.h" @@ -384,11 +385,19 @@ void* SocketMap::RunWatchConnections(void* arg) { } void SocketMap::WatchConnections() { + // This bthread of SocketMap Singleton runs for the whole process lifetime and + // never returns, so the local objects below live until the process exits and + // their destructors never run. They are reachable from this bthread's stack, + // so the objects themselves are not reported as leaks, but the heap buffers + // they allocate while exposing themselves (variable names, watched path) would + // be. Disable leak detection only around their construction and re-enable it + // right after std::vector main_sockets; std::vector pooled_sockets; std::vector orphan_sockets; const uint64_t CHECK_INTERVAL_US = 1000000UL; while (bthread_usleep(CHECK_INTERVAL_US) == 0) { + ANNOTATE_SCOPED_MEMORY_LEAK; // NOTE: save the gflag which may be reloaded at any time. const int idle_seconds = _options.idle_timeout_second_dynamic ? *_options.idle_timeout_second_dynamic diff --git a/src/brpc/span.cpp b/src/brpc/span.cpp index 3a53f33a39..1863f01a9f 100644 --- a/src/brpc/span.cpp +++ b/src/brpc/span.cpp @@ -337,14 +337,11 @@ void Span::ResetServerSpanName(const std::string& full_method_name) { void Span::submit(int64_t cpuwide_us) { // Note: this method is not called for client-side spans. EndAsParent(); - SpanContainer* container = new(std::nothrow) SpanContainer(shared_from_this()); // If memory allocation fails, the server span will not be submitted for persistence. // The server span will be destroyed later when its shared_ptr refcount drops to zero // Child spans (held in _client_list) will also be destroyed when // their refcounts reach zero. - if (container) { - container->submit(cpuwide_us); - } + (new SpanContainer(shared_from_this()))->submit(cpuwide_us); } void Span::Annotate(const char* fmt, ...) { diff --git a/src/bthread/task_group.cpp b/src/bthread/task_group.cpp index c0804c9a63..d94393f561 100644 --- a/src/bthread/task_group.cpp +++ b/src/bthread/task_group.cpp @@ -417,10 +417,22 @@ void TaskGroup::task_runner(intptr_t skip_remained) { << m->stat.cputime_ns / 1000000.0 << "ms"; } + // Clean up span if it exists. This must be done before keytable cleanup + // because span cleanup may use bthread local storage (e.g. logging, + // which allocates bthread-local stream arrays via bthread_setspecific). + // If span cleanup ran after keytable cleanup, such allocations would + // re-populate the keytable and never be reclaimed, causing memory leak. + LocalStorage* tls_bls_ptr = BAIDU_GET_PTR_VOLATILE_THREAD_LOCAL(tls_bls); + if (tls_bls_ptr->rpcz_parent_span && g_rpcz_parent_span_dtor) { + g_rpcz_parent_span_dtor(tls_bls_ptr->rpcz_parent_span); + tls_bls_ptr = BAIDU_GET_PTR_VOLATILE_THREAD_LOCAL(tls_bls); + tls_bls_ptr->rpcz_parent_span = NULL; + m->local_storage.rpcz_parent_span = NULL; + } + // Clean tls variables, must be done before changing version_butex // otherwise another thread just joined this thread may not see side // effects of destructing tls variables. - LocalStorage* tls_bls_ptr = BAIDU_GET_PTR_VOLATILE_THREAD_LOCAL(tls_bls); KeyTable* kt = tls_bls_ptr->keytable; if (kt != NULL) { return_keytable(m->attr.keytable_pool, kt); @@ -430,15 +442,6 @@ void TaskGroup::task_runner(intptr_t skip_remained) { m->local_storage.keytable = NULL; // optional } - // Clean up span if it exists. This must be done after keytable cleanup - // because span cleanup may use bthread local storage. - tls_bls_ptr = BAIDU_GET_PTR_VOLATILE_THREAD_LOCAL(tls_bls); - if (tls_bls_ptr->rpcz_parent_span && g_rpcz_parent_span_dtor) { - g_rpcz_parent_span_dtor(tls_bls_ptr->rpcz_parent_span); - tls_bls_ptr->rpcz_parent_span = NULL; - m->local_storage.rpcz_parent_span = NULL; - } - // During running the function in TaskMeta and deleting the KeyTable in // return_KeyTable, the group is probably changed. g = BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_task_group); diff --git a/src/bthread/timer_thread.cpp b/src/bthread/timer_thread.cpp index 813104a452..a7ebfa4c22 100644 --- a/src/bthread/timer_thread.cpp +++ b/src/bthread/timer_thread.cpp @@ -445,7 +445,9 @@ void TimerThread::run() { } void TimerThread::stop_and_join() { - _stop.store(true, butil::memory_order_relaxed); + if (_stop.exchange(true, butil::memory_order_relaxed)) { + return; + } if (_started) { { BAIDU_SCOPED_LOCK(_mutex); diff --git a/src/butil/debug/leak_annotations.h b/src/butil/debug/leak_annotations.h index 47387acbb7..bcbaa00ed3 100644 --- a/src/butil/debug/leak_annotations.h +++ b/src/butil/debug/leak_annotations.h @@ -26,16 +26,13 @@ extern "C" { void __lsan_disable(); void __lsan_enable(); void __lsan_ignore_object(const void *p); - -// Invoke leak detection immediately. If leaks are found, the process will exit. -void __lsan_do_leak_check(); } // extern "C" class ScopedLeakSanitizerDisabler { public: ScopedLeakSanitizerDisabler() { __lsan_disable(); } ~ScopedLeakSanitizerDisabler() { __lsan_enable(); } -private: + DISALLOW_COPY_AND_ASSIGN(ScopedLeakSanitizerDisabler); }; @@ -44,11 +41,17 @@ class ScopedLeakSanitizerDisabler { #define ANNOTATE_LEAKING_OBJECT_PTR(X) __lsan_ignore_object(X) +// Manually pair these to mark allocations made in between as intentional non-leaks. +#define ANNOTATE_MEMORY_LEAK_DISABLE() __lsan_disable() +#define ANNOTATE_MEMORY_LEAK_ENABLE() __lsan_enable() + #else // If neither HeapChecker nor LSan are used, the annotations should be no-ops. #define ANNOTATE_SCOPED_MEMORY_LEAK ((void)0) #define ANNOTATE_LEAKING_OBJECT_PTR(X) ((void)(X)) +#define ANNOTATE_MEMORY_LEAK_DISABLE() ((void)0) +#define ANNOTATE_MEMORY_LEAK_ENABLE() ((void)0) #endif diff --git a/src/butil/find_cstr.h b/src/butil/find_cstr.h index fd9971399a..103f9e70a1 100644 --- a/src/butil/find_cstr.h +++ b/src/butil/find_cstr.h @@ -24,6 +24,7 @@ #include #include #include "butil/thread_local.h" +#include "butil/debug/leak_annotations.h" // Find c-string in maps with std::string as keys without memory allocations. // Example: @@ -57,6 +58,11 @@ struct StringMapThreadLocalTemp { } inline std::string* get_string(const char* key) { + // This thread-local string (and any buffer it reallocates) is reclaimed + // via thread_atexit when the thread exits. If a thread is still alive at + // leak-check time the allocation would be reported; it is a thread-local + // cache, so mark its allocations as intentional non-leaks. + ANNOTATE_SCOPED_MEMORY_LEAK; if (!initialized) { initialized = true; std::string* tmp = new (buf) std::string(key); @@ -70,6 +76,8 @@ struct StringMapThreadLocalTemp { } inline std::string* get_string(const char* key, size_t length) { + // See the note in the other get_string overload. + ANNOTATE_SCOPED_MEMORY_LEAK; if (!initialized) { initialized = true; std::string* tmp = new (buf) std::string(key, length); diff --git a/src/butil/lazy_instance.h b/src/butil/lazy_instance.h index e1daeb5894..b300faf4d7 100644 --- a/src/butil/lazy_instance.h +++ b/src/butil/lazy_instance.h @@ -39,7 +39,6 @@ #include "butil/atomicops.h" #include "butil/base_export.h" -#include "butil/basictypes.h" #include "butil/debug/leak_annotations.h" #include "butil/logging.h" #include "butil/memory/aligned_memory.h" diff --git a/src/butil/logging.cc b/src/butil/logging.cc index e4964d0ecf..ad6e821ac1 100644 --- a/src/butil/logging.cc +++ b/src/butil/logging.cc @@ -69,6 +69,7 @@ typedef pthread_mutex_t* MutexHandle; #include "butil/debug/alias.h" #include "butil/debug/debugger.h" #include "butil/debug/stack_trace.h" +#include "butil/debug/leak_annotations.h" #include "butil/posix/eintr_wrapper.h" #include "butil/strings/string_util.h" #include "butil/strings/stringprintf.h" @@ -619,6 +620,7 @@ void AsyncLogger::Log(LogInfo&& log_info) { DoLog(log_info); return; } + ANNOTATE_SCOPED_MEMORY_LEAK; log_req->log_info = std::move(log_info); LogImpl(log_req); } diff --git a/src/butil/memory/singleton.h b/src/butil/memory/singleton.h index f76a8317b5..ff132bc48e 100644 --- a/src/butil/memory/singleton.h +++ b/src/butil/memory/singleton.h @@ -24,8 +24,6 @@ #include "butil/base_export.h" #include "butil/memory/aligned_memory.h" #include "butil/third_party/dynamic_annotations/dynamic_annotations.h" -#include "butil/threading/thread_restrictions.h" -#include "butil/debug/leak_annotations.h" namespace butil { namespace internal { @@ -267,13 +265,8 @@ class Singleton { butil::subtle::Release_Store( &instance_, reinterpret_cast(newval)); - if (newval != NULL) { - if (Traits::kRegisterAtExit) { - butil::AtExitManager::RegisterCallback(OnExit, NULL); - } else { - // Instruct LeakSanitizer to ignore the designated memory leak. - ANNOTATE_LEAKING_OBJECT_PTR(newval); - } + if (newval != NULL && Traits::kRegisterAtExit) { + butil::AtExitManager::RegisterCallback(OnExit, NULL); } return newval; diff --git a/src/butil/object_pool_inl.h b/src/butil/object_pool_inl.h index cc6b411c14..14310db1c6 100644 --- a/src/butil/object_pool_inl.h +++ b/src/butil/object_pool_inl.h @@ -26,6 +26,8 @@ #include // pthread_mutex_t #include // std::max, std::min #include + +#include "class_name.h" #include "butil/atomicops.h" // butil::atomic #include "butil/macros.h" // BAIDU_CACHELINE_ALIGNMENT #include "butil/scoped_lock.h" // BAIDU_SCOPED_LOCK @@ -357,12 +359,62 @@ class BAIDU_CACHELINE_ALIGNMENT ObjectPool { ObjectPool() { _free_chunks.reserve(OP_INITIAL_FREE_LIST_SIZE); pthread_mutex_init(&_free_chunks_mutex, NULL); +#if defined(BUTIL_USE_ASAN) && \ + !defined(BAIDU_CLEAR_OBJECT_POOL_AFTER_ALL_THREADS_QUIT) + // Objects returned to the pool stay ASan-poisoned (see return_object()). + // LeakSanitizer skips poisoned memory when scanning for live pointers + // (its `use_poisoned' option is off by default), so heap memory that is + // only reachable through pointers stored inside pooled objects (e.g. + // std::string buffers owned by cached protobuf messages) would be + // falsely reported as leaked at process exit. Un-poison all pooled + // objects right before LSan runs. LSan registers its leak check via + // atexit() very early during sanitizer init, so this handler (registered + // lazily on the first singleton creation) runs before it because atexit + // handlers execute in LIFO order. + // Not needed when BAIDU_CLEAR_OBJECT_POOL_AFTER_ALL_THREADS_QUIT is + // defined: clear_from_destructor_of_local_pool() then destructs and + // frees all pooled objects after the last thread quits. + if (ObjectPoolWithASanPoison::value) { + atexit(unpoison_all_objects_before_leak_check); + } +#endif } ~ObjectPool() { pthread_mutex_destroy(&_free_chunks_mutex); } +#if defined(BUTIL_USE_ASAN) && \ + !defined(BAIDU_CLEAR_OBJECT_POOL_AFTER_ALL_THREADS_QUIT) + // Un-poison every constructed object still held by the pool so that + // LeakSanitizer can follow the pointers inside them. Only un-poisons, does + // not destruct: the pool intentionally keeps objects alive for reuse, and + // they remain reachable from the singleton, so they are not real leaks. + static void unpoison_all_objects_before_leak_check() { + if (NULL == _singleton.load(butil::memory_order_consume)) { + return; + } + const size_t ngroup = _ngroup.load(butil::memory_order_acquire); + for (size_t i = 0; i < ngroup; ++i) { + BlockGroup* bg = _block_groups[i].load(butil::memory_order_consume); + if (NULL == bg) { + break; + } + const size_t nblock = std::min( + bg->nblock.load(butil::memory_order_relaxed), OP_GROUP_NBLOCK); + for (size_t j = 0; j < nblock; ++j) { + Block* b = bg->blocks[j].load(butil::memory_order_consume); + if (NULL == b) { + continue; + } + for (size_t k = 0; k < b->nitem; ++k) { + asan_unpoison_memory_region((T*)&b->items[k]); + } + } + } + } +#endif + // Create a Block and append it to right-most BlockGroup. static Block* add_block(size_t* index) { Block* const new_block = new(std::nothrow) Block; diff --git a/test/BUILD.bazel b/test/BUILD.bazel index 3efe804c4d..70fda21d4f 100644 --- a/test/BUILD.bazel +++ b/test/BUILD.bazel @@ -262,6 +262,7 @@ generate_unittests( # on the same port and fail. size=large is the only safe lever here. per_test_size = { "brpc_channel_unittest.cpp": "large", + "brpc_load_balancer_unittest.cpp": "large", }, ) diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index df8fedce49..cd1494086a 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -216,6 +216,11 @@ if(BRPC_WITH_GLOG) target_link_libraries(brpc-shared-debug ${GLOG_LIB}) endif() +# AddressSanitizer is incompatible with tcmalloc/gperftools, do not link it when WITH_ASAN is on +if(WITH_ASAN) + set(GPERFTOOLS_LIBRARIES "") +endif() + # test_butil add_executable(test_butil ${TEST_BUTIL_SOURCES} ${CMAKE_CURRENT_BINARY_DIR}/iobuf.pb.cc) diff --git a/test/brpc_alpn_protocol_unittest.cpp b/test/brpc_alpn_protocol_unittest.cpp index 9c1506710b..3e0fd1381d 100644 --- a/test/brpc_alpn_protocol_unittest.cpp +++ b/test/brpc_alpn_protocol_unittest.cpp @@ -99,13 +99,17 @@ class ALPNTest : public testing::Test { // SSL handshake. SSL* ssl = brpc::CreateSSLSession(ssl_ctx, 0, cli_fd, false); EXPECT_NE(nullptr, ssl); - EXPECT_EQ(1, SSL_do_handshake(ssl)); + EXPECT_EQ(1, SSL_do_handshake(ssl)); // Get handshake result. const unsigned char* select_alpn = nullptr; unsigned int len = 0; SSL_get0_alpn_selected(ssl, &select_alpn, &len); - return std::string(reinterpret_cast(select_alpn), len); + std::string result(reinterpret_cast(select_alpn), len); + + SSL_free(ssl); + SSL_CTX_free(ssl_ctx); + return result; } private: diff --git a/test/brpc_event_dispatcher_unittest.cpp b/test/brpc_event_dispatcher_unittest.cpp index 5c0aa064d8..dcca305fef 100644 --- a/test/brpc_event_dispatcher_unittest.cpp +++ b/test/brpc_event_dispatcher_unittest.cpp @@ -202,6 +202,10 @@ struct BAIDU_CACHELINE_ALIGNMENT SocketExtra : public brpc::SocketUser { times = 0; } + ~SocketExtra() { + free(buf); + } + void BeforeRecycle(brpc::Socket* m) override { pthread_mutex_lock(&rel_fd_mutex); rel_fd.push_back(m->fd()); @@ -293,6 +297,7 @@ void* client_thread(void* arg) { } } } + free(buf); EXPECT_EQ(0, close(m->fd)); return NULL; } @@ -320,6 +325,7 @@ TEST_F(EventDispatcherTest, dispatch_tasks) { pthread_t cth[NCLIENT]; ClientMeta* cm[NCLIENT]; SocketExtra* sm[NCLIENT]; + brpc::SocketId socket_ids[NCLIENT]; for (size_t i = 0; i < NCLIENT; ++i) { ASSERT_EQ(0, socketpair(AF_UNIX, SOCK_STREAM, 0, fds + 2 * i)); @@ -334,6 +340,7 @@ TEST_F(EventDispatcherTest, dispatch_tasks) { options.on_edge_triggered_events = SocketExtra::OnEdgeTriggeredEvents; ASSERT_EQ(0, brpc::Socket::Create(options, &socket_id)); + socket_ids[i] = socket_id; cm[i] = new ClientMeta; cm[i]->fd = fds[i * 2 + 1]; cm[i]->times = 0; @@ -387,6 +394,16 @@ TEST_F(EventDispatcherTest, dispatch_tasks) { #ifdef BUTIL_RESOURCE_POOL_NEED_FREE_ITEM_NUM ASSERT_EQ(NCLIENT, info.free_item_num - old_info.free_item_num); #endif + + // Release sockets (SocketExtra::BeforeRecycle deletes the user) and the + // per-client metadata to avoid leaking them. + for (size_t i = 0; i < NCLIENT; ++i) { + brpc::SocketUniquePtr s; + if (brpc::Socket::Address(socket_ids[i], &s) == 0) { + s->SetFailed(); + } + delete cm[i]; + } } // Unique identifier of a EventPipe. diff --git a/test/brpc_http_rpc_protocol_unittest.cpp b/test/brpc_http_rpc_protocol_unittest.cpp index c7022ed283..3f3290bdc4 100644 --- a/test/brpc_http_rpc_protocol_unittest.cpp +++ b/test/brpc_http_rpc_protocol_unittest.cpp @@ -1772,6 +1772,9 @@ TEST_F(HttpTest, http2_goaway_sanity) { butil::Status st = socket_message->AppendAndDestroySelf(&dummy, _h2_client_sock.get()); ASSERT_EQ(st.error_code(), brpc::ELOGOFF); ASSERT_TRUE(st.error_data().ends_with("the connection just issued GOAWAY")); + // Release the reference held by stream_user_data (which is normally released + // by Controller::Call::OnComplete) to avoid leaking the H2UnsentRequest. + h2_req->DestroyStreamUserData(_h2_client_sock, &cntl, 0, false); } class AfterRecevingGoAway : public ::google::protobuf::Closure { diff --git a/test/brpc_input_messenger_unittest.cpp b/test/brpc_input_messenger_unittest.cpp index 812c499a57..ae4afb6fb1 100644 --- a/test/brpc_input_messenger_unittest.cpp +++ b/test/brpc_input_messenger_unittest.cpp @@ -143,6 +143,7 @@ void* client_thread(void* arg) { } } } + free(buf); return NULL; } @@ -217,5 +218,8 @@ TEST_F(MessengerTest, dispatch_tasks) { messenger[i].StopAccept(0); } sleep(1); + for (size_t i = 0; i < NCLIENT; ++i) { + delete cm[i]; + } LOG(WARNING) << "begin to exit!!!!"; } diff --git a/test/brpc_load_balancer_unittest.cpp b/test/brpc_load_balancer_unittest.cpp index 76ad005eac..757f9c9bcc 100644 --- a/test/brpc_load_balancer_unittest.cpp +++ b/test/brpc_load_balancer_unittest.cpp @@ -911,9 +911,11 @@ TEST_F(LoadBalancerTest, weighted_round_robin_no_valid_server) { brpc::ServerId id(8888); brpc::SocketOptions options; options.remote_side = dummy; - options.user = new SaveRecycle; id.tag = weight[i]; if (i < 2) { + // `user` is owned by the Socket; only allocate it when a Socket is + // actually created, otherwise it would leak. + options.user = new SaveRecycle; ASSERT_EQ(0, brpc::Socket::Create(options, &id.id)); } EXPECT_TRUE(wrrlb.AddServer(id)); @@ -1107,14 +1109,14 @@ TEST_F(LoadBalancerTest, revived_from_all_failed_sanity) { "10.92.115.19:8832", "10.42.122.201:8833", }; - brpc::LoadBalancer* lb = NULL; + std::unique_ptr lb; int rand = butil::fast_rand_less_than(2); if (rand == 0) { brpc::policy::RandomizedLoadBalancer rlb; - lb = rlb.New("min_working_instances=2 hold_seconds=2"); + lb.reset(rlb.New("min_working_instances=2 hold_seconds=2")); } else if (rand == 1) { brpc::policy::RoundRobinLoadBalancer rrlb; - lb = rrlb.New("min_working_instances=2 hold_seconds=2"); + lb.reset(rrlb.New("min_working_instances=2 hold_seconds=2")); } brpc::SocketUniquePtr ptr[2]; for (size_t i = 0; i < ARRAY_SIZE(servers); ++i) { diff --git a/test/brpc_proto_unittest.cpp b/test/brpc_proto_unittest.cpp index 052a0671fc..88ed40dac9 100644 --- a/test/brpc_proto_unittest.cpp +++ b/test/brpc_proto_unittest.cpp @@ -19,6 +19,7 @@ #include +#include #include #include #include @@ -58,7 +59,7 @@ TEST(ProtoTest, proto) { meta.set_correlation_id(123); std::string data; ASSERT_TRUE(meta.SerializeToString(&data)); - Message *msg = factory.GetPrototype(new_desc)->New(); + std::unique_ptr msg(factory.GetPrototype(new_desc)->New()); ASSERT_TRUE(msg != NULL); ASSERT_TRUE(msg->ParseFromString(data)); ASSERT_TRUE(msg->SerializeToString(&data)); diff --git a/test/brpc_protobuf_json_unittest.cpp b/test/brpc_protobuf_json_unittest.cpp index aa73fbd8f2..b5ad0bb2dd 100644 --- a/test/brpc_protobuf_json_unittest.cpp +++ b/test/brpc_protobuf_json_unittest.cpp @@ -1136,7 +1136,7 @@ TEST_F(ProtobufJsonTest, pb_to_json_control_char_case) { person->set_id(100); char ch = 0x01; - char* name = new char[17]; + char name[17]; memcpy(name, "baidu ", 6); name[6] = ch; char c = 0x08; diff --git a/test/brpc_redis_cluster_unittest.cpp b/test/brpc_redis_cluster_unittest.cpp index 3047159cca..34a6d9ba6e 100644 --- a/test/brpc_redis_cluster_unittest.cpp +++ b/test/brpc_redis_cluster_unittest.cpp @@ -424,20 +424,26 @@ class KVCommandHandler : public brpc::RedisCommandHandler { class ClusterRedisService : public brpc::RedisService { public: - explicit ClusterRedisService(NodeData* data) { - AddCommandHandler("asking", new AskingHandler()); - AddCommandHandler("cluster", new ClusterCommandHandler(data->meta)); - - KVCommandHandler* handler = new KVCommandHandler(data); - AddCommandHandler("ping", handler); - AddCommandHandler("get", handler); - AddCommandHandler("set", handler); - AddCommandHandler("del", handler); - AddCommandHandler("exists", handler); - AddCommandHandler("unlink", handler); - AddCommandHandler("eval", handler); - AddCommandHandler("evalsha", handler); + explicit ClusterRedisService(NodeData* data) + : _asking_handler(new AskingHandler()) + , _cluster_handler(new ClusterCommandHandler(data->meta)) + , _kv_handler(new KVCommandHandler(data)) { + AddCommandHandler("asking", _asking_handler.get()); + AddCommandHandler("cluster", _cluster_handler.get()); + AddCommandHandler("ping", _kv_handler.get()); + AddCommandHandler("get", _kv_handler.get()); + AddCommandHandler("set", _kv_handler.get()); + AddCommandHandler("del", _kv_handler.get()); + AddCommandHandler("exists", _kv_handler.get()); + AddCommandHandler("unlink", _kv_handler.get()); + AddCommandHandler("eval", _kv_handler.get()); + AddCommandHandler("evalsha", _kv_handler.get()); } + +private: + std::unique_ptr _asking_handler; + std::unique_ptr _cluster_handler; + std::unique_ptr _kv_handler; }; class Done : public google::protobuf::Closure { diff --git a/test/brpc_redis_unittest.cpp b/test/brpc_redis_unittest.cpp index 80706b0623..a12cc4b1d7 100644 --- a/test/brpc_redis_unittest.cpp +++ b/test/brpc_redis_unittest.cpp @@ -17,6 +17,7 @@ #include +#include #include #include #include @@ -1131,24 +1132,26 @@ class IncrCommandHandler : public brpc::RedisCommandHandler { TEST_F(RedisTest, server_sanity) { std::string password = GeneratePassword(); + std::unique_ptr redis_auth_holder( + new brpc::policy::RedisAuthenticator(password)); + RedisServiceImpl* rsimpl = new RedisServiceImpl(password); + std::unique_ptr gh(new GetCommandHandler(rsimpl)); + std::unique_ptr sh(new SetCommandHandler(rsimpl)); + std::unique_ptr ah(new AuthCommandHandler(rsimpl)); + std::unique_ptr ih(new IncrCommandHandler(rsimpl)); brpc::Server server; brpc::ServerOptions server_options; - RedisServiceImpl* rsimpl = new RedisServiceImpl(password); - GetCommandHandler *gh = new GetCommandHandler(rsimpl); - SetCommandHandler *sh = new SetCommandHandler(rsimpl); - AuthCommandHandler *ah = new AuthCommandHandler(rsimpl); - IncrCommandHandler *ih = new IncrCommandHandler(rsimpl); - rsimpl->AddCommandHandler("get", gh); - rsimpl->AddCommandHandler("set", sh); - rsimpl->AddCommandHandler("incr", ih); - rsimpl->AddCommandHandler("auth", ah); + rsimpl->AddCommandHandler("get", gh.get()); + rsimpl->AddCommandHandler("set", sh.get()); + rsimpl->AddCommandHandler("incr", ih.get()); + rsimpl->AddCommandHandler("auth", ah.get()); server_options.redis_service = rsimpl; brpc::PortRange pr(8081, 8900); ASSERT_EQ(0, server.Start("127.0.0.1", pr, &server_options)); brpc::ChannelOptions options; options.protocol = brpc::PROTOCOL_REDIS; - options.auth = new brpc::policy::RedisAuthenticator(password); + options.auth = redis_auth_holder.get(); brpc::Channel channel; ASSERT_EQ(0, channel.Init("127.0.0.1", server.listen_address().port, &options)); @@ -1225,13 +1228,15 @@ void* incr_thread(void* arg) { TEST_F(RedisTest, server_concurrency) { std::string password = GeneratePassword(); int N = 10; + std::unique_ptr redis_auth_holder( + new brpc::policy::RedisAuthenticator(password)); + RedisServiceImpl* rsimpl = new RedisServiceImpl(password); + std::unique_ptr ah(new AuthCommandHandler(rsimpl)); + std::unique_ptr ih(new IncrCommandHandler(rsimpl)); brpc::Server server; brpc::ServerOptions server_options; - RedisServiceImpl* rsimpl = new RedisServiceImpl(password); - AuthCommandHandler *ah = new AuthCommandHandler(rsimpl); - IncrCommandHandler *ih = new IncrCommandHandler(rsimpl); - rsimpl->AddCommandHandler("incr", ih); - rsimpl->AddCommandHandler("auth", ah); + rsimpl->AddCommandHandler("incr", ih.get()); + rsimpl->AddCommandHandler("auth", ah.get()); server_options.redis_service = rsimpl; brpc::PortRange pr(8081, 8900); ASSERT_EQ(0, server.Start("0.0.0.0", pr, &server_options)); @@ -1239,7 +1244,7 @@ TEST_F(RedisTest, server_concurrency) { brpc::ChannelOptions options; options.protocol = brpc::PROTOCOL_REDIS; options.connection_type = "pooled"; - options.auth = new brpc::policy::RedisAuthenticator(password); + options.auth = redis_auth_holder.get(); std::vector bths; std::vector channels; for (int i = 0; i < N; ++i) { @@ -1311,21 +1316,28 @@ class MultiCommandHandler : public brpc::RedisCommandHandler { TEST_F(RedisTest, server_command_continue) { std::string password = GeneratePassword(); + std::unique_ptr redis_auth_holder( + new brpc::policy::RedisAuthenticator(password)); + RedisServiceImpl* rsimpl = new RedisServiceImpl(password); + std::unique_ptr ah(new AuthCommandHandler(rsimpl)); + std::unique_ptr gh(new GetCommandHandler(rsimpl)); + std::unique_ptr sh(new SetCommandHandler(rsimpl)); + std::unique_ptr ih(new IncrCommandHandler(rsimpl)); + std::unique_ptr mh(new MultiCommandHandler); brpc::Server server; brpc::ServerOptions server_options; - RedisServiceImpl* rsimpl = new RedisServiceImpl(password); - rsimpl->AddCommandHandler("auth", new AuthCommandHandler(rsimpl)); - rsimpl->AddCommandHandler("get", new GetCommandHandler(rsimpl)); - rsimpl->AddCommandHandler("set", new SetCommandHandler(rsimpl)); - rsimpl->AddCommandHandler("incr", new IncrCommandHandler(rsimpl)); - rsimpl->AddCommandHandler("multi", new MultiCommandHandler); + rsimpl->AddCommandHandler("auth", ah.get()); + rsimpl->AddCommandHandler("get", gh.get()); + rsimpl->AddCommandHandler("set", sh.get()); + rsimpl->AddCommandHandler("incr", ih.get()); + rsimpl->AddCommandHandler("multi", mh.get()); server_options.redis_service = rsimpl; brpc::PortRange pr(8081, 8900); ASSERT_EQ(0, server.Start("127.0.0.1", pr, &server_options)); brpc::ChannelOptions options; options.protocol = brpc::PROTOCOL_REDIS; - options.auth = new brpc::policy::RedisAuthenticator(password); + options.auth = redis_auth_holder.get(); brpc::Channel channel; ASSERT_EQ(0, channel.Init("127.0.0.1", server.listen_address().port, &options)); { @@ -1388,23 +1400,26 @@ TEST_F(RedisTest, server_command_continue) { TEST_F(RedisTest, server_handle_pipeline) { std::string password = GeneratePassword(); + std::unique_ptr redis_auth_holder( + new brpc::policy::RedisAuthenticator(password)); + RedisServiceImpl* rsimpl = new RedisServiceImpl(password); + std::unique_ptr getch(new GetCommandHandler(rsimpl, true)); + std::unique_ptr setch(new SetCommandHandler(rsimpl, true)); + std::unique_ptr authch(new AuthCommandHandler(rsimpl)); + std::unique_ptr multich(new MultiCommandHandler); brpc::Server server; brpc::ServerOptions server_options; - RedisServiceImpl* rsimpl = new RedisServiceImpl(password); - GetCommandHandler* getch = new GetCommandHandler(rsimpl, true); - SetCommandHandler* setch = new SetCommandHandler(rsimpl, true); - AuthCommandHandler* authch = new AuthCommandHandler(rsimpl); - rsimpl->AddCommandHandler("auth", authch); - rsimpl->AddCommandHandler("get", getch); - rsimpl->AddCommandHandler("set", setch); - rsimpl->AddCommandHandler("multi", new MultiCommandHandler); + rsimpl->AddCommandHandler("auth", authch.get()); + rsimpl->AddCommandHandler("get", getch.get()); + rsimpl->AddCommandHandler("set", setch.get()); + rsimpl->AddCommandHandler("multi", multich.get()); server_options.redis_service = rsimpl; brpc::PortRange pr(8081, 8900); ASSERT_EQ(0, server.Start("127.0.0.1", pr, &server_options)); brpc::ChannelOptions options; options.protocol = brpc::PROTOCOL_REDIS; - options.auth = new brpc::policy::RedisAuthenticator(password); + options.auth = redis_auth_holder.get(); brpc::Channel channel; ASSERT_EQ(0, channel.Init("127.0.0.1", server.listen_address().port, &options)); diff --git a/test/brpc_socket_unittest.cpp b/test/brpc_socket_unittest.cpp index 9851287411..a283ffcf59 100644 --- a/test/brpc_socket_unittest.cpp +++ b/test/brpc_socket_unittest.cpp @@ -28,6 +28,7 @@ #include "butil/time.h" #include "butil/macros.h" #include "butil/fd_utility.h" +#include "butil/debug/leak_annotations.h" #include #include "bthread/unstable.h" #include "bthread/task_control.h" @@ -209,7 +210,9 @@ class MyErrorMessage : public brpc::SocketMessage { explicit MyErrorMessage(const butil::Status& st) : _status(st) {} private: butil::Status AppendAndDestroySelf(butil::IOBuf*, brpc::Socket*) { - return _status; + butil::Status st = _status; + delete this; + return st; }; butil::Status _status; }; @@ -328,7 +331,9 @@ class MyConnect : public brpc::AppConnect { TEST_F(SocketTest, single_threaded_connect_and_write) { // FIXME(gejun): Messenger has to be new otherwise quitting may crash. + // It is intentionally never deleted; mark it so it is not a reported leak. brpc::Acceptor* messenger = new brpc::Acceptor; + ANNOTATE_LEAKING_OBJECT_PTR(messenger); const brpc::InputMessageHandler pairs[] = { { brpc::policy::ParseHuluMessage, EchoProcessHuluRequest, NULL, NULL, "dummy_hulu" } @@ -659,7 +664,9 @@ TEST_F(SocketTest, app_level_health_check) { TEST_F(SocketTest, health_check) { // FIXME(gejun): Messenger has to be new otherwise quitting may crash. + // It is intentionally never deleted; mark it so it is not a reported leak. brpc::Acceptor* messenger = new brpc::Acceptor; + ANNOTATE_LEAKING_OBJECT_PTR(messenger); brpc::SocketId id = 8888; butil::EndPoint point(butil::IP_ANY, 7878); @@ -976,13 +983,14 @@ void* reader(void* void_arg) { ssize_t nr = read(arg->fd, buf, LEN); if (nr < 0) { printf("Fail to read, %m\n"); - return NULL; + break; } else if (nr == 0) { printf("Far end closed\n"); - return NULL; + break; } arg->nread += nr; } + free(buf); return NULL; } @@ -1233,7 +1241,9 @@ TEST_F(SocketTest, keepalive) { } TEST_F(SocketTest, keepalive_input_message) { + // It is intentionally never deleted; mark it so it is not a reported leak. brpc::Acceptor* messenger = new brpc::Acceptor; + ANNOTATE_LEAKING_OBJECT_PTR(messenger); int listening_fd = -1; butil::EndPoint point(butil::IP_ANY, 7878); for (int i = 0; i < 100; ++i) { @@ -1424,7 +1434,9 @@ void CheckTCPUserTimeout(int fd, int expect_tcp_user_timeout) { } TEST_F(SocketTest, tcp_user_timeout) { + // It is intentionally never deleted; mark it so it is not a reported leak. brpc::Acceptor* messenger = new brpc::Acceptor; + ANNOTATE_LEAKING_OBJECT_PTR(messenger); int listening_fd = -1; butil::EndPoint point(butil::IP_ANY, 7878); for (int i = 0; i < 100; ++i) { diff --git a/test/brpc_ssl_unittest.cpp b/test/brpc_ssl_unittest.cpp index 00fe705edb..6512eea4a6 100644 --- a/test/brpc_ssl_unittest.cpp +++ b/test/brpc_ssl_unittest.cpp @@ -339,6 +339,7 @@ void CheckCert(const char* cname, const char* cert) { std::vector cnames; brpc::ExtractHostnames(x509, &cnames); ASSERT_EQ(cert, cnames[0]) << x509; + X509_free(x509); } std::string GetRawPemString(const char* fname) { @@ -495,6 +496,11 @@ TEST_F(SSLTest, ssl_perf) { ASSERT_EQ(0, pthread_create(&spid, NULL, ssl_perf_server , serv_ssl)); ASSERT_EQ(0, pthread_join(cpid, NULL)); ASSERT_EQ(0, pthread_join(spid, NULL)); + + SSL_free(cli_ssl); + SSL_free(serv_ssl); + SSL_CTX_free(cli_ctx); + SSL_CTX_free(serv_ctx); close(clifd); close(servfd); } diff --git a/test/bthread_dispatcher_unittest.cpp b/test/bthread_dispatcher_unittest.cpp index 669d9c4eeb..411c392e62 100644 --- a/test/bthread_dispatcher_unittest.cpp +++ b/test/bthread_dispatcher_unittest.cpp @@ -118,9 +118,18 @@ void* epoll_thread(void* arg) { while (!server_stop) { #if defined(OS_LINUX) - const int n = epoll_wait(em->epfd, e, ARRAY_SIZE(e), -1); + // Use a finite timeout so the loop can observe server_stop without + // relying on an external fd to wake up epoll_wait. + const int n = epoll_wait(em->epfd, e, ARRAY_SIZE(e), 100); + if (n == 0) { + continue; + } #elif defined(OS_MACOSX) - const int n = kevent(em->epfd, NULL, 0, e, ARRAY_SIZE(e), NULL); + timespec ts = { 0, 100L * 1000L * 1000L }; + const int n = kevent(em->epfd, NULL, 0, e, ARRAY_SIZE(e), &ts); + if (n == 0) { + continue; + } #endif if (server_stop) { break; @@ -298,15 +307,9 @@ TEST(DispatcherTest, dispatch_tasks) { pthread_join(cth[i], NULL); } server_stop = true; + // epoll_thread polls server_stop with a finite timeout, so it exits on its + // own without needing an external fd to wake up epoll_wait. for (size_t i = 0; i < NEPOLL; ++i) { -#if defined(OS_LINUX) - epoll_event evt = { EPOLLOUT, { NULL } }; - ASSERT_EQ(0, epoll_ctl(epfd[i], EPOLL_CTL_ADD, 0, &evt)); -#elif defined(OS_MACOSX) - struct kevent kqueue_event; - EV_SET(&kqueue_event, 0, EVFILT_WRITE, EV_ADD | EV_ENABLE, 0, 0, NULL); - ASSERT_EQ(0, kevent(epfd[i], &kqueue_event, 1, NULL, 0, NULL)); -#endif #ifdef RUN_EPOLL_IN_BTHREAD bthread_join(eth[i], NULL); #else @@ -315,5 +318,18 @@ TEST(DispatcherTest, dispatch_tasks) { } bthread::stop_and_join_epoll_threads(); bthread_usleep(100000); + + for (size_t i = 0; i < NCLIENT; ++i) { + free(sm[i]->buf); + delete sm[i]; + delete cm[i]; + } + for (size_t i = 0; i < NEPOLL; ++i) { + delete em[i]; + close(epfd[i]); + } + for (size_t i = 0; i < 2 * NCLIENT; ++i) { + close(fds[i]); + } } } // namespace diff --git a/test/bthread_fd_unittest.cpp b/test/bthread_fd_unittest.cpp index fac6a4f2bd..1f68ffc4bf 100644 --- a/test/bthread_fd_unittest.cpp +++ b/test/bthread_fd_unittest.cpp @@ -23,6 +23,7 @@ #include #include #include +#include #include "gperftools_helper.h" #include "butil/time.h" #include "butil/macros.h" @@ -252,7 +253,9 @@ TEST(FDTest, ping_pong) { #else pthread_t cth[NCLIENT]; #endif - ClientMeta* cm[NCLIENT]; + std::unique_ptr cm[NCLIENT]; + std::unique_ptr sm[NCLIENT]; + std::unique_ptr em_arr[NEPOLL]; for (size_t i = 0; i < NEPOLL; ++i) { #if defined(OS_LINUX) @@ -266,7 +269,8 @@ TEST(FDTest, ping_pong) { for (size_t i = 0; i < NCLIENT; ++i) { ASSERT_EQ(0, socketpair(AF_UNIX, SOCK_STREAM, 0, fds + 2 * i)); //printf("Created fd=%d,%d i=%lu\n", fds[2*i], fds[2*i+1], i); - SocketMeta* m = new SocketMeta; + sm[i].reset(new SocketMeta); + SocketMeta* m = sm[i].get(); m->fd = fds[i * 2]; m->epfd = epfd[fmix32(i) % NEPOLL]; ASSERT_EQ(0, fcntl(m->fd, F_SETFL, fcntl(m->fd, F_GETFL, 0) | O_NONBLOCK)); @@ -293,15 +297,15 @@ TEST(FDTest, ping_pong) { #elif defined(OS_MACOSX) ASSERT_EQ(0, kevent(m->epfd, &kqueue_event, 1, NULL, 0, NULL)); #endif - cm[i] = new ClientMeta; + cm[i].reset(new ClientMeta); cm[i]->fd = fds[i * 2 + 1]; cm[i]->count = i; cm[i]->times = REP; #ifdef RUN_CLIENT_IN_BTHREAD butil::make_non_blocking(cm[i]->fd); - ASSERT_EQ(0, bthread_start_urgent(&cth[i], NULL, client_thread, cm[i])); + ASSERT_EQ(0, bthread_start_urgent(&cth[i], NULL, client_thread, cm[i].get())); #else - ASSERT_EQ(0, pthread_create(&cth[i], NULL, client_thread, cm[i])); + ASSERT_EQ(0, pthread_create(&cth[i], NULL, client_thread, cm[i].get())); #endif } @@ -310,7 +314,8 @@ TEST(FDTest, ping_pong) { tm.start(); for (size_t i = 0; i < NEPOLL; ++i) { - EpollMeta *em = new EpollMeta; + em_arr[i].reset(new EpollMeta); + EpollMeta* em = em_arr[i].get(); em->epfd = epfd[i]; #ifdef RUN_EPOLL_IN_BTHREAD ASSERT_EQ(0, bthread_start_urgent(ð[i], epoll_thread, em, NULL); diff --git a/test/bthread_key_unittest.cpp b/test/bthread_key_unittest.cpp index 4319fb4180..92f4aacace 100644 --- a/test/bthread_key_unittest.cpp +++ b/test/bthread_key_unittest.cpp @@ -16,6 +16,7 @@ // under the License. #include // std::sort +#include // std::unique_ptr #include #include "butil/atomicops.h" #include @@ -447,7 +448,8 @@ static void usleep_thread_impl(PoolData2* data) { } static void* usleep_thread(void* args) { - usleep_thread_impl((PoolData2*)args); + std::unique_ptr data((PoolData2*)args); + usleep_thread_impl(data.get()); return NULL; } diff --git a/test/iobuf_unittest.cpp b/test/iobuf_unittest.cpp index 82112045ad..18d312ebce 100644 --- a/test/iobuf_unittest.cpp +++ b/test/iobuf_unittest.cpp @@ -1788,6 +1788,9 @@ TEST_F(IOBufTest, acquire_tls_block) { b = butil::iobuf::acquire_tls_block(); ASSERT_EQ(0, butil::iobuf::get_tls_block_count()); ASSERT_NE(butil::iobuf::block_cap(b), butil::iobuf::block_size(b)); + // acquire_tls_block() transfers ownership of a non-full block to the + // caller; return it to TLS so it is not leaked. + butil::iobuf::release_tls_block_chain(b); } TEST_F(IOBufTest, reserve_aligned) { diff --git a/test/object_pool_unittest.cpp b/test/object_pool_unittest.cpp index cfc891aff7..6ab321c4e3 100644 --- a/test/object_pool_unittest.cpp +++ b/test/object_pool_unittest.cpp @@ -184,9 +184,13 @@ TEST_F(ObjectPoolTest, get_int) { tm.stop(); printf("get a int takes %.1fns\n", tm.n_elapsed()/(double)N); + std::vector> new_ints; + new_ints.reserve(N); tm.start(); for (size_t i = 0; i < N; ++i) { - *(new int) = i; + int* pi = new int; + *pi = i; + new_ints.emplace_back(pi); } tm.stop(); printf("new a int takes %" PRId64 "ns\n", tm.n_elapsed()/N); diff --git a/test/run_tests.sh b/test/run_tests.sh index 510a2c70ba..ced0297686 100755 --- a/test/run_tests.sh +++ b/test/run_tests.sh @@ -23,10 +23,11 @@ test_num=0 failed_test="" rc=0 test_bins="test_butil test_bvar bthread*unittest brpc*unittest" +export ASAN_OPTIONS="detect_leaks=1:detect_stack_use_after_return=1" for test_bin in $test_bins; do test_num=$((test_num + 1)) >&2 echo "[runtest] $test_bin" - ASAN_OPTIONS="detect_leaks=0:detect_stack_use_after_return=1" ./$test_bin + ./$test_bin # If ASan abort without detailed call stack of new/delete, # try to disable fast_unwind_on_malloc, which would be a performance killer. # ASAN_OPTIONS="fast_unwind_on_malloc=0:detect_leaks=0" ./$test_bin diff --git a/test/thread_key_unittest.cpp b/test/thread_key_unittest.cpp index 758a6791ef..61b8518d9e 100644 --- a/test/thread_key_unittest.cpp +++ b/test/thread_key_unittest.cpp @@ -17,6 +17,7 @@ #include #include +#include #include "butil/thread_key.h" #include "butil/fast_rand.h" @@ -230,9 +231,12 @@ void* ThreadKeyFunc(void* arg) { auto thread_key_arg = (ThreadKeyArg*)arg; auto thread_keys = thread_key_arg->thread_keys; std::vector expects(thread_keys.size(), 0); + std::vector> owned_data; + owned_data.reserve(thread_keys.size()); for (auto key : thread_keys) { EXPECT_TRUE(butil::thread_getspecific(*key) == NULL); - EXPECT_EQ(0, butil::thread_setspecific(*key, new int(0))); + owned_data.emplace_back(new int(0)); + EXPECT_EQ(0, butil::thread_setspecific(*key, owned_data.back().get())); EXPECT_EQ(*(static_cast(butil::thread_getspecific(*key))), 0); } while (!g_stopped) { @@ -262,14 +266,17 @@ TEST(ThreadLocalTest, thread_key_multi_thread) { g_stopped = false; g_deleted = false; std::vector thread_keys; + std::vector> owned_data; int key_num = 20480; + owned_data.reserve(key_num); for (int i = 0; i < key_num; ++i) { thread_keys.push_back(new ThreadKey()); ASSERT_EQ(0, butil::thread_key_create(*thread_keys.back(), [](void* data) { delete static_cast(data); })); ASSERT_TRUE(butil::thread_getspecific(*thread_keys.back()) == NULL); - ASSERT_EQ(0, butil::thread_setspecific(*thread_keys.back(), new int(0))); + owned_data.emplace_back(new int(0)); + ASSERT_EQ(0, butil::thread_setspecific(*thread_keys.back(), owned_data.back().get())); ASSERT_EQ(*(static_cast(butil::thread_getspecific(*thread_keys.back()))), 0); } const int thread_num = 8;