diff --git a/common/source/EventThread.cpp b/common/source/EventThread.cpp index 1ad60188b..08d5366f2 100644 --- a/common/source/EventThread.cpp +++ b/common/source/EventThread.cpp @@ -54,13 +54,13 @@ EventThread::EventThread(std::string threadName) : m_kThreadName(std::move(threa EventThread::~EventThread() { - std::unique_lock locker(m_lock); + { + std::unique_lock locker(m_lock); - m_shutdown = true; + m_shutdown = true; - m_cond.notify_all(); - - locker.unlock(); + m_cond.notify_all(); + } if (m_thread.joinable()) m_thread.join(); diff --git a/common/source/Timer.cpp b/common/source/Timer.cpp index ae109a692..6c377a340 100644 --- a/common/source/Timer.cpp +++ b/common/source/Timer.cpp @@ -59,15 +59,21 @@ Timer::Timer(const std::chrono::milliseconds &timeout, const std::function lock{m_mutex}; - if (!m_cv.wait_for(lock, m_timeout, [this]() { return !m_active; })) + bool Callback = false; { - if (m_active && m_callback) + std::unique_lock lock{m_mutex}; + if (!m_cv.wait_for(lock, m_timeout, [this]() { return !m_active; })) { - lock.unlock(); - m_callback(); + if (m_active && m_callback) + { + Callback = true; + } } } + if (Callback) + { + m_callback(); + } } while (timerType == TimerType::PERIODIC && m_active); m_active = false; }); diff --git a/ipc/client/source/IpcChannelImpl.cpp b/ipc/client/source/IpcChannelImpl.cpp index 85c6fea52..7b068b2b6 100644 --- a/ipc/client/source/IpcChannelImpl.cpp +++ b/ipc/client/source/IpcChannelImpl.cpp @@ -590,9 +590,6 @@ void ChannelImpl::processTimeoutEvent() updateTimeoutTimer(); } - // drop the lock and now terminate the timed out method calls - locker.unlock(); - for (auto &call : timedOuts) { completeWithError(&call, "Timed out"); @@ -736,9 +733,6 @@ void ChannelImpl::processReplyFromServer(const transport::MethodCallReply &reply // update the timeout timer now a method call has been processed updateTimeoutTimer(); - // can now drop the lock - locker.unlock(); - // this is an actual reply so try and read it if (!methodCall.response->ParseFromString(reply.reply_message())) { @@ -787,9 +781,6 @@ void ChannelImpl::processErrorFromServer(const transport::MethodCallError &error // update the timeout timer now a method call has been processed updateTimeoutTimer(); - // can now drop the lock - locker.unlock(); - RIALTO_IPC_LOG_DEBUG("error{ serial %" PRIu64 " } - %s", kSerialId, error.error_reason().c_str()); // complete the call with an error @@ -1162,12 +1153,10 @@ void ChannelImpl::CallMethod(const google::protobuf::MethodDescriptor *method, / if (m_sock < 0) { - locker.unlock(); completeWithError(&methodCall, "Not connected"); } else if (sendmsg(m_sock, header, MSG_NOSIGNAL) != static_cast(kRequiredDataLen)) { - locker.unlock(); completeWithError(&methodCall, "Failed to send message"); } else diff --git a/ipc/server/source/IpcServerImpl.cpp b/ipc/server/source/IpcServerImpl.cpp index f6cd2da73..230e67227 100644 --- a/ipc/server/source/IpcServerImpl.cpp +++ b/ipc/server/source/IpcServerImpl.cpp @@ -616,37 +616,41 @@ void ServerImpl::processNewConnection(uint64_t socketId) { RIALTO_IPC_LOG_DEBUG("processing new connection"); - std::unique_lock socketLocker(m_socketsLock); - - // find matching socket object - auto it = m_sockets.find(socketId); - if (it == m_sockets.end()) + int clientSock = 0; + std::string SockPath; + std::function &)> connectedCb; + std::function &)> disconnectedCb; { - RIALTO_IPC_LOG_ERROR("failed to find listening socket with id %" PRIu64, socketId); - return; - } + std::unique_lock socketLocker(m_socketsLock); - const Socket &kSocket = it->second; + // find matching socket object + auto it = m_sockets.find(socketId); + if (it == m_sockets.end()) + { + RIALTO_IPC_LOG_ERROR("failed to find listening socket with id %" PRIu64, socketId); + return; + } - // accept the connection from the client - struct sockaddr clientAddr = {0}; - socklen_t clientAddrLen = sizeof(clientAddr); + const Socket &kSocket = it->second; - int clientSock = accept4(kSocket.sockFd, &clientAddr, &clientAddrLen, SOCK_NONBLOCK | SOCK_CLOEXEC); - if (clientSock < 0) - { - RIALTO_IPC_LOG_SYS_ERROR(errno, "failed to accept client connection"); - return; - } + // accept the connection from the client + struct sockaddr clientAddr = {0}; + socklen_t clientAddrLen = sizeof(clientAddr); - const std::string kSockPath = kSocket.sockPath; - std::function &)> connectedCb = kSocket.connectedCb; - std::function &)> disconnectedCb = kSocket.disconnectedCb; + clientSock = accept4(kSocket.sockFd, &clientAddr, &clientAddrLen, SOCK_NONBLOCK | SOCK_CLOEXEC); + if (clientSock < 0) + { + RIALTO_IPC_LOG_SYS_ERROR(errno, "failed to accept client connection"); + return; + } - socketLocker.unlock(); + SockPath = kSocket.sockPath; + connectedCb = kSocket.connectedCb; + disconnectedCb = kSocket.disconnectedCb; + } // attempt to add the socket to the client list - auto client = addClientSocket(clientSock, kSockPath, std::move(disconnectedCb)); + auto client = addClientSocket(clientSock, SockPath, std::move(disconnectedCb)); if (!client) { close(clientSock); @@ -736,31 +740,32 @@ static std::vector readMessageFds(const struct msghdr *msg, size */ void ServerImpl::processClientSocket(uint64_t clientId, unsigned events) { - // take the lock while accessing the client list - std::unique_lock locker(m_clientsLock); - - auto it = m_clients.find(clientId); - if (it == m_clients.end()) + int SockFd = 0; + std::shared_ptr clientObj = nullptr; { - // should never happen - RIALTO_IPC_LOG_ERROR("received an event from a socket with no matching client"); - return; - } + // take the lock while accessing the client list + std::unique_lock locker(m_clientsLock); - // check if the client is marked for closure, if so then just ignore the data - if (m_condemnedClients.count(clientId) != 0) - { - return; - } + auto it = m_clients.find(clientId); + if (it == m_clients.end()) + { + // should never happen + RIALTO_IPC_LOG_ERROR("received an event from a socket with no matching client"); + return; + } - // get the socket that corresponds to the client connection - const int kSockFd = it->second.sock; + // check if the client is marked for closure, if so then just ignore the data + if (m_condemnedClients.count(clientId) != 0) + { + return; + } - // get the client object - std::shared_ptr clientObj = it->second.client; + // get the socket that corresponds to the client connection + SockFd = it->second.sock; - // can safely release the lock now we have the clientId and client object - locker.unlock(); + // get the client object + clientObj = it->second.client; + } // if there was an error disconnect the socket if (events & EPOLLERR) @@ -786,7 +791,7 @@ void ServerImpl::processClientSocket(uint64_t clientId, unsigned events) msg.msg_controllen = sizeof(m_recvCtrlBuf); // read one message - ssize_t rd = TEMP_FAILURE_RETRY(recvmsg(kSockFd, &msg, MSG_CMSG_CLOEXEC)); + ssize_t rd = TEMP_FAILURE_RETRY(recvmsg(SockFd, &msg, MSG_CMSG_CLOEXEC)); if (rd < 0) { if (errno != EWOULDBLOCK) @@ -1303,9 +1308,10 @@ bool ServerImpl::isClientConnected(uint64_t clientId) const */ void ServerImpl::disconnectClient(uint64_t clientId) { - std::unique_lock locker(m_clientsLock); - m_condemnedClients.insert(clientId); - locker.unlock(); + { + std::unique_lock locker(m_clientsLock); + m_condemnedClients.insert(clientId); + } wakeEventLoop(); } @@ -1407,8 +1413,6 @@ bool ServerImpl::sendEvent(uint64_t clientId, const std::shared_ptrGetTypeName().c_str(), eventMessage->ShortDebugString().c_str()); diff --git a/media/client/main/source/MediaPipeline.cpp b/media/client/main/source/MediaPipeline.cpp index 41731d60b..271b2d4af 100644 --- a/media/client/main/source/MediaPipeline.cpp +++ b/media/client/main/source/MediaPipeline.cpp @@ -545,12 +545,18 @@ bool MediaPipeline::flush(int32_t sourceId, bool resetTime, bool &async) { RIALTO_CLIENT_LOG_DEBUG("entry:"); - std::unique_lock flushLock{m_flushMutex}; - if (m_mediaPipelineIpc->flush(sourceId, resetTime, async)) + bool clearData = false; { - m_attachedSources.setFlushing(sourceId, true); - flushLock.unlock(); + std::unique_lock flushLock{m_flushMutex}; + if (m_mediaPipelineIpc->flush(sourceId, resetTime, async)) + { + m_attachedSources.setFlushing(sourceId, true); + clearData = true; + } + } + if (clearData) + { // Clear all need datas for flushed source std::lock_guard lock{m_needDataRequestMapMutex}; for (auto it = m_needDataRequestMap.begin(); it != m_needDataRequestMap.end();) diff --git a/media/server/gstplayer/source/GstGenericPlayer.cpp b/media/server/gstplayer/source/GstGenericPlayer.cpp index 1e6f9f0f7..c24f0d10c 100644 --- a/media/server/gstplayer/source/GstGenericPlayer.cpp +++ b/media/server/gstplayer/source/GstGenericPlayer.cpp @@ -1371,7 +1371,7 @@ bool GstGenericPlayer::setSyncOff() if (m_glibWrapper->gObjectClassFindProperty(G_OBJECT_GET_CLASS(decoder), "sync-off")) { - gboolean syncOffGboolean{decoder ? TRUE : FALSE}; + gboolean syncOffGboolean{syncOff ? TRUE : FALSE}; m_glibWrapper->gObjectSet(decoder, "sync-off", syncOffGboolean, nullptr); result = true; } diff --git a/media/server/main/source/MediaKeySession.cpp b/media/server/main/source/MediaKeySession.cpp index 414856cd8..11016a4e9 100644 --- a/media/server/main/source/MediaKeySession.cpp +++ b/media/server/main/source/MediaKeySession.cpp @@ -387,7 +387,7 @@ void MediaKeySession::onProcessChallenge(const char url[], const uint8_t challen { std::string urlStr = url; std::vector challengeVec = std::vector{challenge, challenge + challengeLength}; - auto task = [&, urlStr, challengeVec]() + auto task = [&, urlStr = std::move(urlStr), challengeVec = std::move(challengeVec)]() { std::shared_ptr client = m_mediaKeysClient.lock(); if (client) @@ -409,7 +409,7 @@ void MediaKeySession::onProcessChallenge(const char url[], const uint8_t challen void MediaKeySession::onKeyUpdated(const uint8_t keyId[], const uint8_t keyIdLength) { std::vector keyIdVec = std::vector{keyId, keyId + keyIdLength}; - auto task = [&, keyIdVec]() + auto task = [&, keyIdVec = std::move(keyIdVec)]() { std::shared_ptr client = m_mediaKeysClient.lock(); if (client)