aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKai Sommerfeld <kai.sommerfeld@gmx.com>2016-12-21 07:18:13 +0100
committerKai Sommerfeld <kai.sommerfeld@gmx.com>2016-12-24 14:52:18 +0100
commitd22bcb6cf8b145a1ade611a438bd739b3fe15599 (patch)
tree31b04c3020d726b0738bb534c6c64a5553239a8d
parentf8846290ebe25a1cf3ff0f2e2343e8eb440c093d (diff)
[PVR] Fix PVR manager job queue concurrency problems.
-rw-r--r--xbmc/pvr/PVRManager.cpp169
-rw-r--r--xbmc/pvr/PVRManager.h35
2 files changed, 116 insertions, 88 deletions
diff --git a/xbmc/pvr/PVRManager.cpp b/xbmc/pvr/PVRManager.cpp
index 5fde45a24f..5ea4d7f818 100644
--- a/xbmc/pvr/PVRManager.cpp
+++ b/xbmc/pvr/PVRManager.cpp
@@ -75,10 +75,87 @@ using namespace KODI::MESSAGING;
using KODI::MESSAGING::HELPERS::DialogResponse;
+CPVRManagerJobQueue::CPVRManagerJobQueue()
+: m_triggerEvent(false),
+m_bStopped(true)
+{
+}
+
+void CPVRManagerJobQueue::Start()
+{
+ CSingleLock lock(m_critSection);
+ m_bStopped = false;
+ m_triggerEvent.Set();
+}
+
+void CPVRManagerJobQueue::Stop()
+{
+ CSingleLock lock(m_critSection);
+ m_bStopped = true;
+ m_triggerEvent.Reset();
+}
+
+void CPVRManagerJobQueue::Clear()
+{
+ CSingleLock lock(m_critSection);
+ for (CJob *updateJob : m_pendingUpdates)
+ delete updateJob;
+
+ m_pendingUpdates.clear();
+ m_triggerEvent.Set();
+}
+
+void CPVRManagerJobQueue::AppendJob(CJob * job)
+{
+ CSingleLock lock(m_critSection);
+
+ // check for another pending job of given type...
+ for (CJob *updateJob : m_pendingUpdates)
+ {
+ if (!strcmp(updateJob->GetType(), job->GetType()))
+ {
+ delete job;
+ return;
+ }
+ }
+
+ m_pendingUpdates.push_back(job);
+ m_triggerEvent.Set();
+}
+
+void CPVRManagerJobQueue::ExecutePendingJobs()
+{
+ std::vector<CJob *> pendingUpdates;
+
+ {
+ CSingleLock lock(m_critSection);
+
+ if (m_bStopped)
+ return;
+
+ pendingUpdates = std::move(m_pendingUpdates);
+ m_triggerEvent.Reset();
+ }
+
+ CJob *job = nullptr;
+ while (!pendingUpdates.empty())
+ {
+ job = pendingUpdates.front();
+ pendingUpdates.erase(pendingUpdates.begin());
+
+ job->DoWork();
+ delete job;
+ }
+}
+
+bool CPVRManagerJobQueue::WaitForJobs(unsigned int milliSeconds)
+{
+ return m_triggerEvent.WaitMSec(milliSeconds);
+}
+
CPVRManager::CPVRManager(void) :
CThread("PVRManager"),
m_addons(new CPVRClients),
- m_triggerEvent(true),
m_currentFile(NULL),
m_bFirstStart(true),
m_bIsSwitchingChannels(false),
@@ -253,6 +330,10 @@ void CPVRManager::OnSettingAction(const CSetting *setting)
void CPVRManager::Clear(void)
{
+ g_application.UnregisterActionListener(&CPVRActionListener::GetInstance());
+
+ m_pendingUpdates.Clear();
+
CSingleLock lock(m_critSection);
m_guiInfo.reset();
@@ -261,22 +342,11 @@ void CPVRManager::Clear(void)
m_channelGroups.reset();
m_parentalTimer.reset();
m_database.reset();
- m_triggerEvent.Set();
m_currentFile = NULL;
m_bIsSwitchingChannels = false;
m_bEpgsCreated = false;
- for (unsigned int iJobPtr = 0; iJobPtr < m_pendingUpdates.size(); iJobPtr++)
- delete m_pendingUpdates.at(iJobPtr);
- m_pendingUpdates.clear();
-
- /* unregister application action listener */
- {
- CSingleExit exit(m_critSection);
- g_application.UnregisterActionListener(&CPVRActionListener::GetInstance());
- }
-
HideProgressDialog();
}
@@ -331,6 +401,8 @@ void CPVRManager::Start()
ResetProperties();
SetState(ManagerStateStarting);
+ m_pendingUpdates.Start();
+
m_database->Open();
/* create the pvrmanager thread, which will ensure that all data will be loaded */
@@ -346,6 +418,8 @@ void CPVRManager::Stop(void)
SetState(ManagerStateStopping);
+ m_pendingUpdates.Stop();
+
/* stop the EPG updater, since it might be using the pvr add-ons */
g_EpgContainer.Stop();
@@ -362,6 +436,7 @@ void CPVRManager::Stop(void)
SetState(ManagerStateInterrupted);
StopThread();
+
if (m_guiInfo)
m_guiInfo->Stop();
@@ -483,7 +558,7 @@ void CPVRManager::Process(void)
/* execute the next pending jobs if there are any */
try
{
- ExecutePendingJobs();
+ m_pendingUpdates.ExecutePendingJobs();
}
catch (...)
{
@@ -492,7 +567,7 @@ void CPVRManager::Process(void)
}
if (IsStarted() && !bRestart)
- m_triggerEvent.WaitMSec(1000);
+ m_pendingUpdates.WaitForJobs(1000);
}
if (IsStarted())
@@ -1532,11 +1607,7 @@ bool CPVRManager::PerformChannelSwitch(const CPVRChannelPtr &channel, bool bPrev
}
// announce OnStop and OnPlay. yes, this ain't pretty
- {
- CSingleLock lock(m_critSectionTriggers);
- m_pendingUpdates.push_back(new CPVRChannelSwitchJob(previousFile, m_currentFile));
- }
- m_triggerEvent.Set();
+ m_pendingUpdates.AppendJob(new CPVRChannelSwitchJob(previousFile, m_currentFile));
return bSwitched;
}
@@ -1788,60 +1859,29 @@ void CPVRManager::SearchMissingChannelIcons(void)
m_channelGroups->SearchMissingChannelIcons();
}
-bool CPVRManager::IsJobPending(const char *strJobName) const
-{
- bool bReturn(false);
- CSingleLock lock(m_critSectionTriggers);
- for (unsigned int iJobPtr = 0; IsStarted() && iJobPtr < m_pendingUpdates.size(); iJobPtr++)
- {
- if (!strcmp(m_pendingUpdates.at(iJobPtr)->GetType(), strJobName))
- {
- bReturn = true;
- break;
- }
- }
-
- return bReturn;
-}
-
-void CPVRManager::QueueJob(CJob *job)
-{
- CSingleLock lock(m_critSectionTriggers);
- if (!IsStarted() || IsJobPending(job->GetType()))
- {
- delete job;
- return;
- }
-
- m_pendingUpdates.push_back(job);
-
- lock.Leave();
- m_triggerEvent.Set();
-}
-
void CPVRManager::TriggerEpgsCreate(void)
{
- QueueJob(new CPVREpgsCreateJob());
+ m_pendingUpdates.AppendJob(new CPVREpgsCreateJob());
}
void CPVRManager::TriggerRecordingsUpdate(void)
{
- QueueJob(new CPVRRecordingsUpdateJob());
+ m_pendingUpdates.AppendJob(new CPVRRecordingsUpdateJob());
}
void CPVRManager::TriggerTimersUpdate(void)
{
- QueueJob(new CPVRTimersUpdateJob());
+ m_pendingUpdates.AppendJob(new CPVRTimersUpdateJob());
}
void CPVRManager::TriggerChannelsUpdate(void)
{
- QueueJob(new CPVRChannelsUpdateJob());
+ m_pendingUpdates.AppendJob(new CPVRChannelsUpdateJob());
}
void CPVRManager::TriggerChannelGroupsUpdate(void)
{
- QueueJob(new CPVRChannelGroupsUpdateJob());
+ m_pendingUpdates.AppendJob(new CPVRChannelGroupsUpdateJob());
}
void CPVRManager::TriggerSearchMissingChannelIcons(void)
@@ -1854,25 +1894,6 @@ void CPVRManager::ConnectionStateChange(CPVRClient *client, std::string connectS
CJobManager::GetInstance().AddJob(new CPVRClientConnectionJob(client, connectString, state, message), NULL);
}
-void CPVRManager::ExecutePendingJobs(void)
-{
- CSingleLock lock(m_critSectionTriggers);
-
- while (!m_pendingUpdates.empty())
- {
- CJob *job = m_pendingUpdates.at(0);
- m_pendingUpdates.erase(m_pendingUpdates.begin());
- lock.Leave();
-
- job->DoWork();
- delete job;
-
- lock.Enter();
- }
-
- m_triggerEvent.Reset();
-}
-
bool CPVRChannelSwitchJob::DoWork(void)
{
// announce OnStop and delete m_previous when done
diff --git a/xbmc/pvr/PVRManager.h b/xbmc/pvr/PVRManager.h
index 651adcf9b3..6efdccb053 100644
--- a/xbmc/pvr/PVRManager.h
+++ b/xbmc/pvr/PVRManager.h
@@ -90,6 +90,26 @@ namespace PVR
typedef std::shared_ptr<PVR::CPVRChannelGroup> CPVRChannelGroupPtr;
+ class CPVRManagerJobQueue
+ {
+ public:
+ CPVRManagerJobQueue();
+
+ void Start();
+ void Stop();
+ void Clear();
+
+ void AppendJob(CJob * job);
+ void ExecutePendingJobs();
+ bool WaitForJobs(unsigned int milliSeconds);
+
+ private:
+ CCriticalSection m_critSection;
+ CEvent m_triggerEvent;
+ std::vector<CJob *> m_pendingUpdates;
+ bool m_bStopped;
+ };
+
class CPVRManager : public ISettingCallback, private CThread, public Observable, public ANNOUNCEMENT::IAnnouncer
{
friend class CPVRClients;
@@ -635,17 +655,6 @@ private:
*/
bool ContinueLastChannel(void);
- void ExecutePendingJobs(void);
-
- bool IsJobPending(const char *strJobName) const;
-
- /*!
- * @brief Adds the job to the list of pending jobs unless an identical
- * job is already queued
- * @param job the job
- */
- void QueueJob(CJob *job);
-
enum ManagerState
{
ManagerStateError = 0,
@@ -673,9 +682,7 @@ private:
std::unique_ptr<CPVRGUIInfo> m_guiInfo; /*!< pointer to the guiinfo data */
//@}
- CCriticalSection m_critSectionTriggers; /*!< critical section for triggered updates */
- CEvent m_triggerEvent; /*!< triggers an update */
- std::vector<CJob *> m_pendingUpdates; /*!< vector of pending pvr updates */
+ CPVRManagerJobQueue m_pendingUpdates; /*!< vector of pending pvr updates */
CFileItem * m_currentFile; /*!< the PVR file that is currently playing */
CPVRDatabasePtr m_database; /*!< the database for all PVR related data */