Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/ci-linux.yml
Original file line number Diff line number Diff line change
Expand Up @@ -264,5 +264,6 @@ jobs:
|| { echo "ERROR: failed to override protobuf version in MODULE.bazel to ${TEST_PROTOBUF_VERSION}"; exit 1; }
- run: |
bazel test --action_env=CC=clang --config=rdma \
--define with_bthread_tracer=true \
--define with_babylon_counter=true \
//test/... --test_arg=--gtest_filter=-RdmaRpcTest.*
2 changes: 1 addition & 1 deletion src/brpc/controller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ void Controller::ResetNonPods() {
if (auto span = _span.lock()) {
Span::Submit(span, butil::cpuwide_time_us());
}
_span.reset();
_error_text.clear();
_remote_side = butil::EndPoint();
_local_side = butil::EndPoint();
Expand Down Expand Up @@ -240,7 +241,6 @@ void Controller::ResetNonPods() {
void Controller::ResetPods() {
// NOTE: Make the sequence of assignments same with the order that they're
// defined in header. Better for cpu cache and faster for lookup.
_span.reset();
_flags = 0;
#ifndef BAIDU_INTERNAL
set_pb_bytes_to_base64(true);
Expand Down
19 changes: 17 additions & 2 deletions src/brpc/global.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include <signal.h>

#include "butil/build_config.h" // OS_LINUX
#include "butil/debug/leak_annotations.h"
// Naming services
#ifdef BAIDU_INTERNAL
#include "brpc/policy/baidu_naming_service.h"
Expand Down Expand Up @@ -216,6 +217,14 @@ static int GetRunningServerCount(void*) {

// Update global stuff periodically.
static void* GlobalUpdate(void*) {
// This bthread runs for the whole process lifetime and never returns, so
// the local objects below live until the process exits and their
// destructors never run. They are reachable from this bthread's stack, so
// the objects themselves are not reported as leaks, but the heap buffers
// they allocate while exposing themselves (variable names, watched path)
// would be. Disable leak detection only around their construction and
// re-enable it right after.
ANNOTATE_MEMORY_LEAK_DISABLE();
// Expose variables.
bvar::PassiveStatus<int64_t> var_iobuf_block_count(
"iobuf_block_count", GetIOBufBlockCount, NULL);
Expand All @@ -232,7 +241,9 @@ static void* GlobalUpdate(void*) {
"rpc_server_count", GetRunningServerCount, NULL);

butil::FileWatcher fw;
if (fw.init_from_not_exist(DUMMY_SERVER_PORT_FILE) < 0) {
const int fw_rc = fw.init_from_not_exist(DUMMY_SERVER_PORT_FILE);
ANNOTATE_MEMORY_LEAK_ENABLE();
if (fw_rc < 0) {
LOG(FATAL) << "Fail to init FileWatcher on `" << DUMMY_SERVER_PORT_FILE << "'";
return NULL;
}
Expand Down Expand Up @@ -270,7 +281,11 @@ static void* GlobalUpdate(void*) {
}
}

SocketMapList(&conns);
{
// See detail above.
ANNOTATE_SCOPED_MEMORY_LEAK;
SocketMapList(&conns);
}
const int64_t now_ms = butil::cpuwide_time_ms();
for (size_t i = 0; i < conns.size(); ++i) {
SocketUniquePtr ptr;
Expand Down
7 changes: 7 additions & 0 deletions src/brpc/rdma/block_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -615,10 +615,17 @@ void DestroyBlockPool() {
node = tmp;
}
g_info->idle_list[i][j] = NULL;
// Release the per-bucket mutexes allocated in InitBlockPool.
delete g_info->lock[i][j];
g_info->lock[i][j] = NULL;
}
}
delete g_info;
g_info = NULL;
delete g_dump_mutex;
g_dump_mutex = NULL;
delete g_tls_info_mutex;
g_tls_info_mutex = NULL;
for (int i = 0; i < g_region_num; ++i) {
if (g_regions[i].start == 0) {
break;
Expand Down
84 changes: 47 additions & 37 deletions src/brpc/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -500,107 +500,107 @@ Server::~Server() {

int Server::AddBuiltinServices() {
// Firstly add services shown in tabs.
if (AddBuiltinService(new (std::nothrow) StatusService)) {
if (AddBuiltinService(new StatusService)) {
LOG(ERROR) << "Fail to add StatusService";
return -1;
}
if (AddBuiltinService(new (std::nothrow) VarsService)) {
if (AddBuiltinService(new VarsService)) {
LOG(ERROR) << "Fail to add VarsService";
return -1;
}
if (AddBuiltinService(new (std::nothrow) ConnectionsService)) {
if (AddBuiltinService(new ConnectionsService)) {
LOG(ERROR) << "Fail to add ConnectionsService";
return -1;
}
if (AddBuiltinService(new (std::nothrow) FlagsService)) {
if (AddBuiltinService(new FlagsService)) {
LOG(ERROR) << "Fail to add FlagsService";
return -1;
}
if (AddBuiltinService(new (std::nothrow) RpczService)) {
if (AddBuiltinService(new RpczService)) {
LOG(ERROR) << "Fail to add RpczService";
return -1;
}
if (AddBuiltinService(new (std::nothrow) HotspotsService)) {
if (AddBuiltinService(new HotspotsService)) {
LOG(ERROR) << "Fail to add HotspotsService";
return -1;
}
if (AddBuiltinService(new (std::nothrow) IndexService)) {
if (AddBuiltinService(new IndexService)) {
LOG(ERROR) << "Fail to add IndexService";
return -1;
}

// Add other services.
if (AddBuiltinService(new (std::nothrow) VersionService(this))) {
if (AddBuiltinService(new VersionService(this))) {
LOG(ERROR) << "Fail to add VersionService";
return -1;
}
if (AddBuiltinService(new (std::nothrow) HealthService)) {
if (AddBuiltinService(new HealthService)) {
LOG(ERROR) << "Fail to add HealthService";
return -1;
}
if (AddBuiltinService(new (std::nothrow) ProtobufsService(this))) {
if (AddBuiltinService(new ProtobufsService(this))) {
LOG(ERROR) << "Fail to add ProtobufsService";
return -1;
}
if (AddBuiltinService(new (std::nothrow) BadMethodService)) {
if (AddBuiltinService(new BadMethodService)) {
LOG(ERROR) << "Fail to add BadMethodService";
return -1;
}
if (AddBuiltinService(new (std::nothrow) ListService(this))) {
if (AddBuiltinService(new ListService(this))) {
LOG(ERROR) << "Fail to add ListService";
return -1;
}
if (AddBuiltinService(new (std::nothrow) PrometheusMetricsService)) {
if (AddBuiltinService(new PrometheusMetricsService)) {
LOG(ERROR) << "Fail to add MetricsService";
return -1;
}
if (FLAGS_enable_threads_service &&
AddBuiltinService(new (std::nothrow) ThreadsService)) {
AddBuiltinService(new ThreadsService)) {
LOG(ERROR) << "Fail to add ThreadsService";
return -1;
}
if (AddBuiltinService(new (std::nothrow) MemoryService)) {
if (AddBuiltinService(new MemoryService)) {
LOG(ERROR) << "Fail to add MemoryService";
return -1;
}

#if !BRPC_WITH_GLOG
if (AddBuiltinService(new (std::nothrow) VLogService)) {
if (AddBuiltinService(new VLogService)) {
LOG(ERROR) << "Fail to add VLogService";
return -1;
}
#endif

if (AddBuiltinService(new (std::nothrow) PProfService)) {
if (AddBuiltinService(new PProfService)) {
LOG(ERROR) << "Fail to add PProfService";
return -1;
}
if (FLAGS_enable_dir_service &&
AddBuiltinService(new (std::nothrow) DirService)) {
AddBuiltinService(new DirService)) {
LOG(ERROR) << "Fail to add DirService";
return -1;
}
if (AddBuiltinService(new (std::nothrow) BthreadsService)) {
if (AddBuiltinService(new BthreadsService)) {
LOG(ERROR) << "Fail to add BthreadsService";
return -1;
}
if (AddBuiltinService(new (std::nothrow) IdsService)) {
if (AddBuiltinService(new IdsService)) {
LOG(ERROR) << "Fail to add IdsService";
return -1;
}
if (AddBuiltinService(new (std::nothrow) SocketsService)) {
if (AddBuiltinService(new SocketsService)) {
LOG(ERROR) << "Fail to add SocketsService";
return -1;
}
if (AddBuiltinService(new (std::nothrow) GetFaviconService)) {
if (AddBuiltinService(new GetFaviconService)) {
LOG(ERROR) << "Fail to add GetFaviconService";
return -1;
}
if (AddBuiltinService(new (std::nothrow) GetJsService)) {
if (AddBuiltinService(new GetJsService)) {
LOG(ERROR) << "Fail to add GetJsService";
return -1;
}
if (AddBuiltinService(new (std::nothrow) GrpcHealthCheckService)) {
if (AddBuiltinService(new GrpcHealthCheckService)) {
LOG(ERROR) << "Fail to add GrpcHealthCheckService";
return -1;
}
Expand Down Expand Up @@ -927,18 +927,20 @@ int Server::StartInternal(const butil::EndPoint& endpoint,
_session_local_data_pool->Reserve(_options.reserved_session_local_data);
}

// Leak of `_keytable_pool' and others is by design.
// See comments in Server::Join() for details.
// Instruct LeakSanitizer to ignore the designated memory leak.
ANNOTATE_SCOPED_MEMORY_LEAK;
// Init _keytable_pool always. If the server was stopped before, the pool
// should be destroyed in Join().
_keytable_pool = new bthread_keytable_pool_t;
if (bthread_keytable_pool_init(_keytable_pool) != 0) {
LOG(ERROR) << "Fail to init _keytable_pool";
delete _keytable_pool;
_keytable_pool = NULL;
return -1;
{
// Leak of `_keytable_pool' and others is by design.
// See comments in Server::Join() for details.
// Instruct LeakSanitizer to ignore the designated memory leak.
ANNOTATE_SCOPED_MEMORY_LEAK;
// Init _keytable_pool always. If the server was stopped before, the pool
// should be destroyed in Join().
_keytable_pool = new bthread_keytable_pool_t;
if (bthread_keytable_pool_init(_keytable_pool) != 0) {
LOG(ERROR) << "Fail to init _keytable_pool";
delete _keytable_pool;
_keytable_pool = NULL;
return -1;
}
}

if (_options.thread_local_data_factory) {
Expand Down Expand Up @@ -1635,7 +1637,15 @@ int Server::AddService(google::protobuf::Service* service,
int Server::AddBuiltinService(google::protobuf::Service* service) {
ServiceOptions options;
options.ownership = SERVER_OWNS_SERVICE;
return AddServiceInternal(service, true, options);
int rc = AddServiceInternal(service, true, options);
if (rc != 0) {
// AddServiceInternal does not take ownership of `service' on failure:
// for builtin services the only failure paths (name/fullname conflict)
// return before the service is inserted into any map. Delete it here to
// avoid leaking the object allocated by the caller.
delete service;
}
return rc;
}

void Server::RemoveMethodsOf(google::protobuf::Service* service) {
Expand Down
9 changes: 9 additions & 0 deletions src/brpc/socket_map.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "butil/time.h"
#include "butil/scoped_lock.h"
#include "butil/logging.h"
#include "butil/debug/leak_annotations.h"
#include "brpc/log.h"
#include "brpc/protocol.h"
#include "brpc/input_messenger.h"
Expand Down Expand Up @@ -384,11 +385,19 @@ void* SocketMap::RunWatchConnections(void* arg) {
}

void SocketMap::WatchConnections() {
// This bthread of SocketMap Singleton runs for the whole process lifetime and
// never returns, so the local objects below live until the process exits and
// their destructors never run. They are reachable from this bthread's stack,
// so the objects themselves are not reported as leaks, but the heap buffers
// they allocate while exposing themselves (variable names, watched path) would
// be. Disable leak detection only around their construction and re-enable it
// right after
std::vector<SocketId> main_sockets;
std::vector<SocketId> pooled_sockets;
std::vector<SocketMapKey> orphan_sockets;
const uint64_t CHECK_INTERVAL_US = 1000000UL;
while (bthread_usleep(CHECK_INTERVAL_US) == 0) {
ANNOTATE_SCOPED_MEMORY_LEAK;
// NOTE: save the gflag which may be reloaded at any time.
Comment thread
chenBright marked this conversation as resolved.
const int idle_seconds = _options.idle_timeout_second_dynamic ?
*_options.idle_timeout_second_dynamic
Expand Down
5 changes: 1 addition & 4 deletions src/brpc/span.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -337,14 +337,11 @@ void Span::ResetServerSpanName(const std::string& full_method_name) {
void Span::submit(int64_t cpuwide_us) {
// Note: this method is not called for client-side spans.
EndAsParent();
SpanContainer* container = new(std::nothrow) SpanContainer(shared_from_this());
// If memory allocation fails, the server span will not be submitted for persistence.
// The server span will be destroyed later when its shared_ptr refcount drops to zero
// Child spans (held in _client_list) will also be destroyed when
// their refcounts reach zero.
if (container) {
container->submit(cpuwide_us);
}
(new SpanContainer(shared_from_this()))->submit(cpuwide_us);
}

void Span::Annotate(const char* fmt, ...) {
Expand Down
23 changes: 13 additions & 10 deletions src/bthread/task_group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -417,10 +417,22 @@ void TaskGroup::task_runner(intptr_t skip_remained) {
<< m->stat.cputime_ns / 1000000.0 << "ms";
}

// Clean up span if it exists. This must be done before keytable cleanup
// because span cleanup may use bthread local storage (e.g. logging,
// which allocates bthread-local stream arrays via bthread_setspecific).
// If span cleanup ran after keytable cleanup, such allocations would
// re-populate the keytable and never be reclaimed, causing memory leak.
LocalStorage* tls_bls_ptr = BAIDU_GET_PTR_VOLATILE_THREAD_LOCAL(tls_bls);
if (tls_bls_ptr->rpcz_parent_span && g_rpcz_parent_span_dtor) {
g_rpcz_parent_span_dtor(tls_bls_ptr->rpcz_parent_span);
tls_bls_ptr = BAIDU_GET_PTR_VOLATILE_THREAD_LOCAL(tls_bls);
tls_bls_ptr->rpcz_parent_span = NULL;
m->local_storage.rpcz_parent_span = NULL;
}

// Clean tls variables, must be done before changing version_butex
// otherwise another thread just joined this thread may not see side
// effects of destructing tls variables.
LocalStorage* tls_bls_ptr = BAIDU_GET_PTR_VOLATILE_THREAD_LOCAL(tls_bls);
KeyTable* kt = tls_bls_ptr->keytable;
if (kt != NULL) {
return_keytable(m->attr.keytable_pool, kt);
Expand All @@ -430,15 +442,6 @@ void TaskGroup::task_runner(intptr_t skip_remained) {
m->local_storage.keytable = NULL; // optional
}

// Clean up span if it exists. This must be done after keytable cleanup
// because span cleanup may use bthread local storage.
tls_bls_ptr = BAIDU_GET_PTR_VOLATILE_THREAD_LOCAL(tls_bls);
if (tls_bls_ptr->rpcz_parent_span && g_rpcz_parent_span_dtor) {
g_rpcz_parent_span_dtor(tls_bls_ptr->rpcz_parent_span);
tls_bls_ptr->rpcz_parent_span = NULL;
m->local_storage.rpcz_parent_span = NULL;
}

// During running the function in TaskMeta and deleting the KeyTable in
// return_KeyTable, the group is probably changed.
g = BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_task_group);
Expand Down
4 changes: 3 additions & 1 deletion src/bthread/timer_thread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -445,7 +445,9 @@ void TimerThread::run() {
}

void TimerThread::stop_and_join() {
_stop.store(true, butil::memory_order_relaxed);
if (_stop.exchange(true, butil::memory_order_relaxed)) {
return;
}
if (_started) {
{
BAIDU_SCOPED_LOCK(_mutex);
Expand Down
Loading
Loading