diff options
author | Kai Sommerfeld <kai.sommerfeld@gmx.com> | 2016-12-21 07:18:13 +0100 |
---|---|---|
committer | Kai Sommerfeld <kai.sommerfeld@gmx.com> | 2016-12-24 14:52:18 +0100 |
commit | d22bcb6cf8b145a1ade611a438bd739b3fe15599 (patch) | |
tree | 31b04c3020d726b0738bb534c6c64a5553239a8d | |
parent | f8846290ebe25a1cf3ff0f2e2343e8eb440c093d (diff) |
[PVR] Fix PVR manager job queue concurrency problems.
-rw-r--r-- | xbmc/pvr/PVRManager.cpp | 169 | ||||
-rw-r--r-- | xbmc/pvr/PVRManager.h | 35 |
2 files changed, 116 insertions, 88 deletions
diff --git a/xbmc/pvr/PVRManager.cpp b/xbmc/pvr/PVRManager.cpp index 5fde45a24f..5ea4d7f818 100644 --- a/xbmc/pvr/PVRManager.cpp +++ b/xbmc/pvr/PVRManager.cpp @@ -75,10 +75,87 @@ using namespace KODI::MESSAGING; using KODI::MESSAGING::HELPERS::DialogResponse; +CPVRManagerJobQueue::CPVRManagerJobQueue() +: m_triggerEvent(false), +m_bStopped(true) +{ +} + +void CPVRManagerJobQueue::Start() +{ + CSingleLock lock(m_critSection); + m_bStopped = false; + m_triggerEvent.Set(); +} + +void CPVRManagerJobQueue::Stop() +{ + CSingleLock lock(m_critSection); + m_bStopped = true; + m_triggerEvent.Reset(); +} + +void CPVRManagerJobQueue::Clear() +{ + CSingleLock lock(m_critSection); + for (CJob *updateJob : m_pendingUpdates) + delete updateJob; + + m_pendingUpdates.clear(); + m_triggerEvent.Set(); +} + +void CPVRManagerJobQueue::AppendJob(CJob * job) +{ + CSingleLock lock(m_critSection); + + // check for another pending job of given type... + for (CJob *updateJob : m_pendingUpdates) + { + if (!strcmp(updateJob->GetType(), job->GetType())) + { + delete job; + return; + } + } + + m_pendingUpdates.push_back(job); + m_triggerEvent.Set(); +} + +void CPVRManagerJobQueue::ExecutePendingJobs() +{ + std::vector<CJob *> pendingUpdates; + + { + CSingleLock lock(m_critSection); + + if (m_bStopped) + return; + + pendingUpdates = std::move(m_pendingUpdates); + m_triggerEvent.Reset(); + } + + CJob *job = nullptr; + while (!pendingUpdates.empty()) + { + job = pendingUpdates.front(); + pendingUpdates.erase(pendingUpdates.begin()); + + job->DoWork(); + delete job; + } +} + +bool CPVRManagerJobQueue::WaitForJobs(unsigned int milliSeconds) +{ + return m_triggerEvent.WaitMSec(milliSeconds); +} + CPVRManager::CPVRManager(void) : CThread("PVRManager"), m_addons(new CPVRClients), - m_triggerEvent(true), m_currentFile(NULL), m_bFirstStart(true), m_bIsSwitchingChannels(false), @@ -253,6 +330,10 @@ void CPVRManager::OnSettingAction(const CSetting *setting) void CPVRManager::Clear(void) { + g_application.UnregisterActionListener(&CPVRActionListener::GetInstance()); + + m_pendingUpdates.Clear(); + CSingleLock lock(m_critSection); m_guiInfo.reset(); @@ -261,22 +342,11 @@ void CPVRManager::Clear(void) m_channelGroups.reset(); m_parentalTimer.reset(); m_database.reset(); - m_triggerEvent.Set(); m_currentFile = NULL; m_bIsSwitchingChannels = false; m_bEpgsCreated = false; - for (unsigned int iJobPtr = 0; iJobPtr < m_pendingUpdates.size(); iJobPtr++) - delete m_pendingUpdates.at(iJobPtr); - m_pendingUpdates.clear(); - - /* unregister application action listener */ - { - CSingleExit exit(m_critSection); - g_application.UnregisterActionListener(&CPVRActionListener::GetInstance()); - } - HideProgressDialog(); } @@ -331,6 +401,8 @@ void CPVRManager::Start() ResetProperties(); SetState(ManagerStateStarting); + m_pendingUpdates.Start(); + m_database->Open(); /* create the pvrmanager thread, which will ensure that all data will be loaded */ @@ -346,6 +418,8 @@ void CPVRManager::Stop(void) SetState(ManagerStateStopping); + m_pendingUpdates.Stop(); + /* stop the EPG updater, since it might be using the pvr add-ons */ g_EpgContainer.Stop(); @@ -362,6 +436,7 @@ void CPVRManager::Stop(void) SetState(ManagerStateInterrupted); StopThread(); + if (m_guiInfo) m_guiInfo->Stop(); @@ -483,7 +558,7 @@ void CPVRManager::Process(void) /* execute the next pending jobs if there are any */ try { - ExecutePendingJobs(); + m_pendingUpdates.ExecutePendingJobs(); } catch (...) { @@ -492,7 +567,7 @@ void CPVRManager::Process(void) } if (IsStarted() && !bRestart) - m_triggerEvent.WaitMSec(1000); + m_pendingUpdates.WaitForJobs(1000); } if (IsStarted()) @@ -1532,11 +1607,7 @@ bool CPVRManager::PerformChannelSwitch(const CPVRChannelPtr &channel, bool bPrev } // announce OnStop and OnPlay. yes, this ain't pretty - { - CSingleLock lock(m_critSectionTriggers); - m_pendingUpdates.push_back(new CPVRChannelSwitchJob(previousFile, m_currentFile)); - } - m_triggerEvent.Set(); + m_pendingUpdates.AppendJob(new CPVRChannelSwitchJob(previousFile, m_currentFile)); return bSwitched; } @@ -1788,60 +1859,29 @@ void CPVRManager::SearchMissingChannelIcons(void) m_channelGroups->SearchMissingChannelIcons(); } -bool CPVRManager::IsJobPending(const char *strJobName) const -{ - bool bReturn(false); - CSingleLock lock(m_critSectionTriggers); - for (unsigned int iJobPtr = 0; IsStarted() && iJobPtr < m_pendingUpdates.size(); iJobPtr++) - { - if (!strcmp(m_pendingUpdates.at(iJobPtr)->GetType(), strJobName)) - { - bReturn = true; - break; - } - } - - return bReturn; -} - -void CPVRManager::QueueJob(CJob *job) -{ - CSingleLock lock(m_critSectionTriggers); - if (!IsStarted() || IsJobPending(job->GetType())) - { - delete job; - return; - } - - m_pendingUpdates.push_back(job); - - lock.Leave(); - m_triggerEvent.Set(); -} - void CPVRManager::TriggerEpgsCreate(void) { - QueueJob(new CPVREpgsCreateJob()); + m_pendingUpdates.AppendJob(new CPVREpgsCreateJob()); } void CPVRManager::TriggerRecordingsUpdate(void) { - QueueJob(new CPVRRecordingsUpdateJob()); + m_pendingUpdates.AppendJob(new CPVRRecordingsUpdateJob()); } void CPVRManager::TriggerTimersUpdate(void) { - QueueJob(new CPVRTimersUpdateJob()); + m_pendingUpdates.AppendJob(new CPVRTimersUpdateJob()); } void CPVRManager::TriggerChannelsUpdate(void) { - QueueJob(new CPVRChannelsUpdateJob()); + m_pendingUpdates.AppendJob(new CPVRChannelsUpdateJob()); } void CPVRManager::TriggerChannelGroupsUpdate(void) { - QueueJob(new CPVRChannelGroupsUpdateJob()); + m_pendingUpdates.AppendJob(new CPVRChannelGroupsUpdateJob()); } void CPVRManager::TriggerSearchMissingChannelIcons(void) @@ -1854,25 +1894,6 @@ void CPVRManager::ConnectionStateChange(CPVRClient *client, std::string connectS CJobManager::GetInstance().AddJob(new CPVRClientConnectionJob(client, connectString, state, message), NULL); } -void CPVRManager::ExecutePendingJobs(void) -{ - CSingleLock lock(m_critSectionTriggers); - - while (!m_pendingUpdates.empty()) - { - CJob *job = m_pendingUpdates.at(0); - m_pendingUpdates.erase(m_pendingUpdates.begin()); - lock.Leave(); - - job->DoWork(); - delete job; - - lock.Enter(); - } - - m_triggerEvent.Reset(); -} - bool CPVRChannelSwitchJob::DoWork(void) { // announce OnStop and delete m_previous when done diff --git a/xbmc/pvr/PVRManager.h b/xbmc/pvr/PVRManager.h index 651adcf9b3..6efdccb053 100644 --- a/xbmc/pvr/PVRManager.h +++ b/xbmc/pvr/PVRManager.h @@ -90,6 +90,26 @@ namespace PVR typedef std::shared_ptr<PVR::CPVRChannelGroup> CPVRChannelGroupPtr; + class CPVRManagerJobQueue + { + public: + CPVRManagerJobQueue(); + + void Start(); + void Stop(); + void Clear(); + + void AppendJob(CJob * job); + void ExecutePendingJobs(); + bool WaitForJobs(unsigned int milliSeconds); + + private: + CCriticalSection m_critSection; + CEvent m_triggerEvent; + std::vector<CJob *> m_pendingUpdates; + bool m_bStopped; + }; + class CPVRManager : public ISettingCallback, private CThread, public Observable, public ANNOUNCEMENT::IAnnouncer { friend class CPVRClients; @@ -635,17 +655,6 @@ private: */ bool ContinueLastChannel(void); - void ExecutePendingJobs(void); - - bool IsJobPending(const char *strJobName) const; - - /*! - * @brief Adds the job to the list of pending jobs unless an identical - * job is already queued - * @param job the job - */ - void QueueJob(CJob *job); - enum ManagerState { ManagerStateError = 0, @@ -673,9 +682,7 @@ private: std::unique_ptr<CPVRGUIInfo> m_guiInfo; /*!< pointer to the guiinfo data */ //@} - CCriticalSection m_critSectionTriggers; /*!< critical section for triggered updates */ - CEvent m_triggerEvent; /*!< triggers an update */ - std::vector<CJob *> m_pendingUpdates; /*!< vector of pending pvr updates */ + CPVRManagerJobQueue m_pendingUpdates; /*!< vector of pending pvr updates */ CFileItem * m_currentFile; /*!< the PVR file that is currently playing */ CPVRDatabasePtr m_database; /*!< the database for all PVR related data */ |