Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions common/source/EventThread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,13 @@ EventThread::EventThread(std::string threadName) : m_kThreadName(std::move(threa

EventThread::~EventThread()
{
{
std::unique_lock<std::mutex> locker(m_lock);

m_shutdown = true;

m_cond.notify_all();

locker.unlock();
}

if (m_thread.joinable())
m_thread.join();
Expand Down
34 changes: 26 additions & 8 deletions common/source/LinuxUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,23 @@ constexpr gid_t kNoGroupChange = -1; // -1 means chown() won't change the group
uid_t getFileOwnerId(const std::string &fileOwner)
{
uid_t ownerId = kNoOwnerChange;
const size_t kBufferSize = sysconf(_SC_GETPW_R_SIZE_MAX);
if (!fileOwner.empty() && kBufferSize > 0)
long buffersize = sysconf(_SC_GETPW_R_SIZE_MAX);
size_t BufferSize = 0;
if (buffersize == -1)
{
RIALTO_COMMON_LOG_SYS_WARN(errno, "Invalid Buffer Size '%s'", fileOwner.c_str());
}
if (buffersize > 0)
{
BufferSize = static_cast<size_t>(buffersize);
}
if (!fileOwner.empty() && BufferSize > 0)
{
errno = 0;
passwd passwordStruct{};
passwd *passwordResult = nullptr;
char buffer[kBufferSize];
int result = getpwnam_r(fileOwner.c_str(), &passwordStruct, buffer, kBufferSize, &passwordResult);
std::vector<char> buffer(BufferSize);
int result = getpwnam_r(fileOwner.c_str(), &passwordStruct, buffer.data(), BufferSize, &passwordResult);
if (result == 0 && passwordResult)
{
ownerId = passwordResult->pw_uid;
Expand All @@ -55,14 +64,23 @@ uid_t getFileOwnerId(const std::string &fileOwner)
gid_t getFileGroupId(const std::string &fileGroup)
{
gid_t groupId = kNoGroupChange;
const size_t kBufferSize = sysconf(_SC_GETPW_R_SIZE_MAX);
if (!fileGroup.empty() && kBufferSize > 0)
long buffersize = sysconf(_SC_GETPW_R_SIZE_MAX);
size_t BufferSize = 0;
if (buffersize == -1)
{
RIALTO_COMMON_LOG_SYS_WARN(errno, "Invalid Buffer Size '%s'", fileGroup.c_str());
}
if (buffersize > 0)
{
BufferSize = static_cast<size_t>(buffersize);
}
if (!fileGroup.empty() && BufferSize > 0)
{
errno = 0;
group groupStruct{};
group *groupResult = nullptr;
char buffer[kBufferSize];
int result = getgrnam_r(fileGroup.c_str(), &groupStruct, buffer, kBufferSize, &groupResult);
std::vector<char> buffer(BufferSize);
int result = getgrnam_r(fileGroup.c_str(), &groupStruct, buffer.data(), BufferSize, &groupResult);
if (result == 0 && groupResult)
{
groupId = groupResult->gr_gid;
Expand Down
10 changes: 8 additions & 2 deletions common/source/Timer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,15 +59,21 @@ Timer::Timer(const std::chrono::milliseconds &timeout, const std::function<void(
{
do
{
bool Callback = false;
{
std::unique_lock<std::mutex> lock{m_mutex};
if (!m_cv.wait_for(lock, m_timeout, [this]() { return !m_active; }))
{
if (m_active && m_callback)
{
lock.unlock();
m_callback();
Callback = true;
}
}
}
if (Callback)
{
m_callback();
}
} while (timerType == TimerType::PERIODIC && m_active);
m_active = false;
});
Expand Down
11 changes: 0 additions & 11 deletions ipc/client/source/IpcChannelImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -590,9 +590,6 @@ void ChannelImpl::processTimeoutEvent()
updateTimeoutTimer();
}

// drop the lock and now terminate the timed out method calls
locker.unlock();

for (auto &call : timedOuts)
{
completeWithError(&call, "Timed out");
Expand Down Expand Up @@ -736,9 +733,6 @@ void ChannelImpl::processReplyFromServer(const transport::MethodCallReply &reply
// update the timeout timer now a method call has been processed
updateTimeoutTimer();

// can now drop the lock
locker.unlock();

// this is an actual reply so try and read it
if (!methodCall.response->ParseFromString(reply.reply_message()))
{
Expand Down Expand Up @@ -787,9 +781,6 @@ void ChannelImpl::processErrorFromServer(const transport::MethodCallError &error
// update the timeout timer now a method call has been processed
updateTimeoutTimer();

// can now drop the lock
locker.unlock();

RIALTO_IPC_LOG_DEBUG("error{ serial %" PRIu64 " } - %s", kSerialId, error.error_reason().c_str());

// complete the call with an error
Expand Down Expand Up @@ -1162,12 +1153,10 @@ void ChannelImpl::CallMethod(const google::protobuf::MethodDescriptor *method, /

if (m_sock < 0)
{
locker.unlock();
completeWithError(&methodCall, "Not connected");
}
else if (sendmsg(m_sock, header, MSG_NOSIGNAL) != static_cast<ssize_t>(kRequiredDataLen))
{
locker.unlock();
completeWithError(&methodCall, "Failed to send message");
}
else
Expand Down
34 changes: 26 additions & 8 deletions ipc/common/source/NamedSocket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -268,14 +268,23 @@ bool NamedSocket::getSocketLock()
uid_t NamedSocket::getSocketOwnerId(const std::string &socketOwner) const
{
uid_t ownerId = kNoOwnerChange;
const size_t kBufferSize = sysconf(_SC_GETPW_R_SIZE_MAX);
if (!socketOwner.empty() && kBufferSize > 0)
long buffersize = sysconf(_SC_GETPW_R_SIZE_MAX);
size_t BufferSize = 0;
if (buffersize == -1)
{
RIALTO_IPC_LOG_SYS_WARN(errno, "Invalid Buffer Size '%s'", socketOwner.c_str());
}
if (buffersize > 0)
{
BufferSize = static_cast<size_t>(buffersize);
}
if (!socketOwner.empty() && BufferSize > 0)
{
errno = 0;
passwd passwordStruct{};
passwd *passwordResult = nullptr;
char buffer[kBufferSize];
int result = getpwnam_r(socketOwner.c_str(), &passwordStruct, buffer, kBufferSize, &passwordResult);
std::vector<char> buffer(BufferSize);
int result = getpwnam_r(socketOwner.c_str(), &passwordStruct, buffer.data(), BufferSize, &passwordResult);
if (result == 0 && passwordResult)
{
ownerId = passwordResult->pw_uid;
Expand All @@ -291,14 +300,23 @@ uid_t NamedSocket::getSocketOwnerId(const std::string &socketOwner) const
gid_t NamedSocket::getSocketGroupId(const std::string &socketGroup) const
{
gid_t groupId = kNoGroupChange;
const size_t kBufferSize = sysconf(_SC_GETPW_R_SIZE_MAX);
if (!socketGroup.empty() && kBufferSize > 0)
long buffersize = sysconf(_SC_GETPW_R_SIZE_MAX);
size_t BufferSize = 0;
if (buffersize == -1)
{
RIALTO_IPC_LOG_SYS_WARN(errno, "Invalid Buffer Size '%s'", socketGroup.c_str());
}
if (buffersize > 0)
{
BufferSize = static_cast<size_t>(buffersize);
}
if (!socketGroup.empty() && BufferSize > 0)
{
errno = 0;
group groupStruct{};
group *groupResult = nullptr;
char buffer[kBufferSize];
int result = getgrnam_r(socketGroup.c_str(), &groupStruct, buffer, kBufferSize, &groupResult);
std::vector<char> buffer(BufferSize);
int result = getgrnam_r(socketGroup.c_str(), &groupStruct, buffer.data(), BufferSize, &groupResult);
if (result == 0 && groupResult)
{
groupId = groupResult->gr_gid;
Expand Down
44 changes: 24 additions & 20 deletions ipc/server/source/IpcServerImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -277,8 +277,8 @@ bool ServerImpl::addSocket(const std::string &socketPath,
}

// store the client connected / disconnected callbacks
socket.connectedCb = clientConnectedCb;
socket.disconnectedCb = clientDisconnectedCb;
socket.connectedCb = std::move(clientConnectedCb);
socket.disconnectedCb = std::move(clientDisconnectedCb);

// add to the internal map
std::lock_guard<std::mutex> locker(m_socketsLock);
Expand Down Expand Up @@ -323,8 +323,8 @@ bool ServerImpl::addSocket(int fd, std::function<void(const std::shared_ptr<ICli
}

// store the client connected / disconnected callbacks
socket.connectedCb = clientConnectedCb;
socket.disconnectedCb = clientDisconnectedCb;
socket.connectedCb = std::move(clientConnectedCb);
socket.disconnectedCb = std::move(clientDisconnectedCb);

// add to the internal map
std::lock_guard<std::mutex> locker(m_socketsLock);
Expand Down Expand Up @@ -616,6 +616,11 @@ void ServerImpl::processNewConnection(uint64_t socketId)
{
RIALTO_IPC_LOG_DEBUG("processing new connection");

int clientSock = 0;
std::string SockPath;
std::function<void(const std::shared_ptr<IClient> &)> connectedCb;
std::function<void(const std::shared_ptr<IClient> &)> disconnectedCb;
{
std::unique_lock<std::mutex> socketLocker(m_socketsLock);

// find matching socket object
Expand All @@ -632,21 +637,20 @@ void ServerImpl::processNewConnection(uint64_t socketId)
struct sockaddr clientAddr = {0};
socklen_t clientAddrLen = sizeof(clientAddr);

int clientSock = accept4(kSocket.sockFd, &clientAddr, &clientAddrLen, SOCK_NONBLOCK | SOCK_CLOEXEC);
clientSock = accept4(kSocket.sockFd, &clientAddr, &clientAddrLen, SOCK_NONBLOCK | SOCK_CLOEXEC);
if (clientSock < 0)
{
RIALTO_IPC_LOG_SYS_ERROR(errno, "failed to accept client connection");
return;
}

const std::string kSockPath = kSocket.sockPath;
std::function<void(const std::shared_ptr<IClient> &)> connectedCb = kSocket.connectedCb;
std::function<void(const std::shared_ptr<IClient> &)> disconnectedCb = kSocket.disconnectedCb;

socketLocker.unlock();
SockPath = kSocket.sockPath;
connectedCb = kSocket.connectedCb;
disconnectedCb = kSocket.disconnectedCb;
}

// attempt to add the socket to the client list
auto client = addClientSocket(clientSock, kSockPath, std::move(disconnectedCb));
auto client = addClientSocket(clientSock, SockPath, std::move(disconnectedCb));
if (!client)
{
close(clientSock);
Expand Down Expand Up @@ -736,6 +740,9 @@ static std::vector<FileDescriptor> readMessageFds(const struct msghdr *msg, size
*/
void ServerImpl::processClientSocket(uint64_t clientId, unsigned events)
{
int SockFd = 0;
std::shared_ptr<ClientImpl> clientObj = nullptr;
{
// take the lock while accessing the client list
std::unique_lock<std::mutex> locker(m_clientsLock);

Expand All @@ -754,13 +761,11 @@ void ServerImpl::processClientSocket(uint64_t clientId, unsigned events)
}

// get the socket that corresponds to the client connection
const int kSockFd = it->second.sock;
SockFd = it->second.sock;

// get the client object
std::shared_ptr<ClientImpl> clientObj = it->second.client;

// can safely release the lock now we have the clientId and client object
locker.unlock();
clientObj = it->second.client;
}

// if there was an error disconnect the socket
if (events & EPOLLERR)
Expand All @@ -786,7 +791,7 @@ void ServerImpl::processClientSocket(uint64_t clientId, unsigned events)
msg.msg_controllen = sizeof(m_recvCtrlBuf);

// read one message
ssize_t rd = TEMP_FAILURE_RETRY(recvmsg(kSockFd, &msg, MSG_CMSG_CLOEXEC));
ssize_t rd = TEMP_FAILURE_RETRY(recvmsg(SockFd, &msg, MSG_CMSG_CLOEXEC));
if (rd < 0)
{
if (errno != EWOULDBLOCK)
Expand Down Expand Up @@ -1303,9 +1308,10 @@ bool ServerImpl::isClientConnected(uint64_t clientId) const
*/
void ServerImpl::disconnectClient(uint64_t clientId)
{
{
std::unique_lock<std::mutex> locker(m_clientsLock);
m_condemnedClients.insert(clientId);
locker.unlock();
}

wakeEventLoop();
}
Expand Down Expand Up @@ -1407,8 +1413,6 @@ bool ServerImpl::sendEvent(uint64_t clientId, const std::shared_ptr<google::prot
return false;
}

locker.unlock();

RIALTO_IPC_LOG_DEBUG("event{ %s } - { %s }", eventMessage->GetTypeName().c_str(),
eventMessage->ShortDebugString().c_str());

Expand Down
2 changes: 1 addition & 1 deletion logging/source/EnvVariableParser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ std::vector<std::string> split(std::string s, const std::string &delimiter)
result.push_back(s.substr(0, pos));
s.erase(0, pos + delimiter.length());
}
result.push_back(s);
result.push_back(std::move(s));
return result;
}

Expand Down
2 changes: 1 addition & 1 deletion media/client/ipc/include/MediaPipelineIpc.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ class MediaPipelineIpc : public IMediaPipelineIpc, public IpcModule

bool setVideoWindow(uint32_t x, uint32_t y, uint32_t width, uint32_t height) override;

bool play() override;
bool play(bool &async) override;

bool pause() override;

Expand Down
4 changes: 3 additions & 1 deletion media/client/ipc/interface/IMediaPipelineIpc.h
Original file line number Diff line number Diff line change
Expand Up @@ -126,9 +126,11 @@ class IMediaPipelineIpc
/**
* @brief Request play on the playback session.
*
* @param[out] async : True if play method call is asynchronous
*
* @retval true on success.
*/
virtual bool play() = 0;
virtual bool play(bool &async) = 0;

/**
* @brief Request pause on the playback session.
Expand Down
2 changes: 1 addition & 1 deletion media/client/ipc/source/IpcClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ std::shared_ptr<ipc::IBlockingClosure> IpcClient::createBlockingClosure()
// check which thread we're being called from, this determines if we pump
// event loop from within the wait() method or not
if (m_ipcThread.get_id() == std::this_thread::get_id())
return m_blockingClosureFactory->createBlockingClosurePoll(ipcChannel);
return m_blockingClosureFactory->createBlockingClosurePoll(std::move(ipcChannel));
else
return m_blockingClosureFactory->createBlockingClosureSemaphore();
}
Expand Down
4 changes: 3 additions & 1 deletion media/client/ipc/source/MediaPipelineIpc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@ bool MediaPipelineIpc::setVideoWindow(uint32_t x, uint32_t y, uint32_t width, ui
return true;
}

bool MediaPipelineIpc::play()
bool MediaPipelineIpc::play(bool &async)
{
if (!reattachChannelIfRequired())
{
Expand All @@ -375,6 +375,8 @@ bool MediaPipelineIpc::play()
return false;
}

async = response.async();

return true;
}

Expand Down
2 changes: 1 addition & 1 deletion media/client/main/include/MediaPipeline.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ class MediaPipeline : public IMediaPipelineAndIControlClient, public IMediaPipel

bool allSourcesAttached() override;

bool play() override;
bool play(bool &async) override;

bool pause() override;

Expand Down
2 changes: 1 addition & 1 deletion media/client/main/include/MediaPipelineProxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class MediaPipelineProxy : public IMediaPipelineAndIControlClient

bool allSourcesAttached() override { return m_mediaPipeline->allSourcesAttached(); }

bool play() override { return m_mediaPipeline->play(); }
bool play(bool &async) override { return m_mediaPipeline->play(async); }

bool pause() override { return m_mediaPipeline->pause(); }

Expand Down
2 changes: 1 addition & 1 deletion media/client/main/source/ClientController.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ void ClientController::changeStateAndNotifyClients(ApplicationState state)
std::shared_ptr<IControlClient> clientLocked{client.lock()};
if (clientLocked)
{
currentClients.push_back(clientLocked);
currentClients.push_back(std::move(clientLocked));
}
else
{
Expand Down
Loading
Loading