Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions common/source/EventThread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,13 @@ EventThread::EventThread(std::string threadName) : m_kThreadName(std::move(threa

EventThread::~EventThread()
{
std::unique_lock<std::mutex> locker(m_lock);
{
std::unique_lock<std::mutex> locker(m_lock);

m_shutdown = true;
m_shutdown = true;

m_cond.notify_all();

locker.unlock();
m_cond.notify_all();
}

if (m_thread.joinable())
m_thread.join();
Expand Down
16 changes: 11 additions & 5 deletions common/source/Timer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,15 +59,21 @@ Timer::Timer(const std::chrono::milliseconds &timeout, const std::function<void(
{
do
{
std::unique_lock<std::mutex> lock{m_mutex};
if (!m_cv.wait_for(lock, m_timeout, [this]() { return !m_active; }))
bool Callback = false;
{
if (m_active && m_callback)
std::unique_lock<std::mutex> lock{m_mutex};
if (!m_cv.wait_for(lock, m_timeout, [this]() { return !m_active; }))
{
lock.unlock();
m_callback();
if (m_active && m_callback)
{
Callback = true;
}
}
}
if (Callback)
{
m_callback();
}
} while (timerType == TimerType::PERIODIC && m_active);
m_active = false;
});
Expand Down
11 changes: 0 additions & 11 deletions ipc/client/source/IpcChannelImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -590,9 +590,6 @@ void ChannelImpl::processTimeoutEvent()
updateTimeoutTimer();
}

// drop the lock and now terminate the timed out method calls
locker.unlock();

for (auto &call : timedOuts)
{
completeWithError(&call, "Timed out");
Expand Down Expand Up @@ -736,9 +733,6 @@ void ChannelImpl::processReplyFromServer(const transport::MethodCallReply &reply
// update the timeout timer now a method call has been processed
updateTimeoutTimer();

// can now drop the lock
locker.unlock();

// this is an actual reply so try and read it
if (!methodCall.response->ParseFromString(reply.reply_message()))
{
Expand Down Expand Up @@ -787,9 +781,6 @@ void ChannelImpl::processErrorFromServer(const transport::MethodCallError &error
// update the timeout timer now a method call has been processed
updateTimeoutTimer();

// can now drop the lock
locker.unlock();

RIALTO_IPC_LOG_DEBUG("error{ serial %" PRIu64 " } - %s", kSerialId, error.error_reason().c_str());

// complete the call with an error
Expand Down Expand Up @@ -1162,12 +1153,10 @@ void ChannelImpl::CallMethod(const google::protobuf::MethodDescriptor *method, /

if (m_sock < 0)
{
locker.unlock();
completeWithError(&methodCall, "Not connected");
}
else if (sendmsg(m_sock, header, MSG_NOSIGNAL) != static_cast<ssize_t>(kRequiredDataLen))
{
locker.unlock();
completeWithError(&methodCall, "Failed to send message");
}
else
Expand Down
102 changes: 53 additions & 49 deletions ipc/server/source/IpcServerImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -616,37 +616,41 @@ void ServerImpl::processNewConnection(uint64_t socketId)
{
RIALTO_IPC_LOG_DEBUG("processing new connection");

std::unique_lock<std::mutex> socketLocker(m_socketsLock);

// find matching socket object
auto it = m_sockets.find(socketId);
if (it == m_sockets.end())
int clientSock = 0;
std::string SockPath;
std::function<void(const std::shared_ptr<IClient> &)> connectedCb;
std::function<void(const std::shared_ptr<IClient> &)> disconnectedCb;
{
RIALTO_IPC_LOG_ERROR("failed to find listening socket with id %" PRIu64, socketId);
return;
}
std::unique_lock<std::mutex> socketLocker(m_socketsLock);

const Socket &kSocket = it->second;
// find matching socket object
auto it = m_sockets.find(socketId);
if (it == m_sockets.end())
{
RIALTO_IPC_LOG_ERROR("failed to find listening socket with id %" PRIu64, socketId);
return;
}

// accept the connection from the client
struct sockaddr clientAddr = {0};
socklen_t clientAddrLen = sizeof(clientAddr);
const Socket &kSocket = it->second;

int clientSock = accept4(kSocket.sockFd, &clientAddr, &clientAddrLen, SOCK_NONBLOCK | SOCK_CLOEXEC);
if (clientSock < 0)
{
RIALTO_IPC_LOG_SYS_ERROR(errno, "failed to accept client connection");
return;
}
// accept the connection from the client
struct sockaddr clientAddr = {0};
socklen_t clientAddrLen = sizeof(clientAddr);

const std::string kSockPath = kSocket.sockPath;
std::function<void(const std::shared_ptr<IClient> &)> connectedCb = kSocket.connectedCb;
std::function<void(const std::shared_ptr<IClient> &)> disconnectedCb = kSocket.disconnectedCb;
clientSock = accept4(kSocket.sockFd, &clientAddr, &clientAddrLen, SOCK_NONBLOCK | SOCK_CLOEXEC);
if (clientSock < 0)
{
RIALTO_IPC_LOG_SYS_ERROR(errno, "failed to accept client connection");
return;
}

socketLocker.unlock();
SockPath = kSocket.sockPath;
connectedCb = kSocket.connectedCb;
disconnectedCb = kSocket.disconnectedCb;
}

// attempt to add the socket to the client list
auto client = addClientSocket(clientSock, kSockPath, std::move(disconnectedCb));
auto client = addClientSocket(clientSock, SockPath, std::move(disconnectedCb));
if (!client)
{
close(clientSock);
Expand Down Expand Up @@ -736,31 +740,32 @@ static std::vector<FileDescriptor> readMessageFds(const struct msghdr *msg, size
*/
void ServerImpl::processClientSocket(uint64_t clientId, unsigned events)
{
// take the lock while accessing the client list
std::unique_lock<std::mutex> locker(m_clientsLock);

auto it = m_clients.find(clientId);
if (it == m_clients.end())
int SockFd = 0;
std::shared_ptr<ClientImpl> clientObj = nullptr;
{
// should never happen
RIALTO_IPC_LOG_ERROR("received an event from a socket with no matching client");
return;
}
// take the lock while accessing the client list
std::unique_lock<std::mutex> locker(m_clientsLock);

// check if the client is marked for closure, if so then just ignore the data
if (m_condemnedClients.count(clientId) != 0)
{
return;
}
auto it = m_clients.find(clientId);
if (it == m_clients.end())
{
// should never happen
RIALTO_IPC_LOG_ERROR("received an event from a socket with no matching client");
return;
}

// get the socket that corresponds to the client connection
const int kSockFd = it->second.sock;
// check if the client is marked for closure, if so then just ignore the data
if (m_condemnedClients.count(clientId) != 0)
{
return;
}

// get the client object
std::shared_ptr<ClientImpl> clientObj = it->second.client;
// get the socket that corresponds to the client connection
SockFd = it->second.sock;

// can safely release the lock now we have the clientId and client object
locker.unlock();
// get the client object
clientObj = it->second.client;
}

// if there was an error disconnect the socket
if (events & EPOLLERR)
Expand All @@ -786,7 +791,7 @@ void ServerImpl::processClientSocket(uint64_t clientId, unsigned events)
msg.msg_controllen = sizeof(m_recvCtrlBuf);

// read one message
ssize_t rd = TEMP_FAILURE_RETRY(recvmsg(kSockFd, &msg, MSG_CMSG_CLOEXEC));
ssize_t rd = TEMP_FAILURE_RETRY(recvmsg(SockFd, &msg, MSG_CMSG_CLOEXEC));
if (rd < 0)
{
if (errno != EWOULDBLOCK)
Expand Down Expand Up @@ -1303,9 +1308,10 @@ bool ServerImpl::isClientConnected(uint64_t clientId) const
*/
void ServerImpl::disconnectClient(uint64_t clientId)
{
std::unique_lock<std::mutex> locker(m_clientsLock);
m_condemnedClients.insert(clientId);
locker.unlock();
{
std::unique_lock<std::mutex> locker(m_clientsLock);
m_condemnedClients.insert(clientId);
}

wakeEventLoop();
}
Expand Down Expand Up @@ -1407,8 +1413,6 @@ bool ServerImpl::sendEvent(uint64_t clientId, const std::shared_ptr<google::prot
return false;
}

locker.unlock();

RIALTO_IPC_LOG_DEBUG("event{ %s } - { %s }", eventMessage->GetTypeName().c_str(),
eventMessage->ShortDebugString().c_str());

Expand Down
14 changes: 10 additions & 4 deletions media/client/main/source/MediaPipeline.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -545,12 +545,18 @@ bool MediaPipeline::flush(int32_t sourceId, bool resetTime, bool &async)
{
RIALTO_CLIENT_LOG_DEBUG("entry:");

std::unique_lock<std::mutex> flushLock{m_flushMutex};
if (m_mediaPipelineIpc->flush(sourceId, resetTime, async))
bool clearData = false;
{
m_attachedSources.setFlushing(sourceId, true);
flushLock.unlock();
std::unique_lock<std::mutex> flushLock{m_flushMutex};
if (m_mediaPipelineIpc->flush(sourceId, resetTime, async))
{
m_attachedSources.setFlushing(sourceId, true);
clearData = true;
}
}

if (clearData)
{
// Clear all need datas for flushed source
std::lock_guard<std::mutex> lock{m_needDataRequestMapMutex};
for (auto it = m_needDataRequestMap.begin(); it != m_needDataRequestMap.end();)
Expand Down
2 changes: 1 addition & 1 deletion media/server/gstplayer/source/GstGenericPlayer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1371,7 +1371,7 @@ bool GstGenericPlayer::setSyncOff()

if (m_glibWrapper->gObjectClassFindProperty(G_OBJECT_GET_CLASS(decoder), "sync-off"))
{
gboolean syncOffGboolean{decoder ? TRUE : FALSE};
gboolean syncOffGboolean{syncOff ? TRUE : FALSE};
m_glibWrapper->gObjectSet(decoder, "sync-off", syncOffGboolean, nullptr);
result = true;
}
Expand Down
4 changes: 2 additions & 2 deletions media/server/main/source/MediaKeySession.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ void MediaKeySession::onProcessChallenge(const char url[], const uint8_t challen
{
std::string urlStr = url;
std::vector<unsigned char> challengeVec = std::vector<unsigned char>{challenge, challenge + challengeLength};
auto task = [&, urlStr, challengeVec]()
auto task = [&, urlStr = std::move(urlStr), challengeVec = std::move(challengeVec)]()
{
std::shared_ptr<IMediaKeysClient> client = m_mediaKeysClient.lock();
if (client)
Expand All @@ -409,7 +409,7 @@ void MediaKeySession::onProcessChallenge(const char url[], const uint8_t challen
void MediaKeySession::onKeyUpdated(const uint8_t keyId[], const uint8_t keyIdLength)
{
std::vector<unsigned char> keyIdVec = std::vector<unsigned char>{keyId, keyId + keyIdLength};
auto task = [&, keyIdVec]()
auto task = [&, keyIdVec = std::move(keyIdVec)]()
{
std::shared_ptr<IMediaKeysClient> client = m_mediaKeysClient.lock();
if (client)
Expand Down
Loading