aboutsummaryrefslogtreecommitdiff
path: root/src/utils/JobManager.h
blob: 1df2287207364d7f23bcf711fb93d91856b54c61 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
#pragma once
/*
 *      Copyright (C) 2005-2013 Team XBMC
 *      http://xbmc.org
 *
 *  This Program is free software; you can redistribute it and/or modify
 *  it under the terms of the GNU General Public License as published by
 *  the Free Software Foundation; either version 2, or (at your option)
 *  any later version.
 *
 *  This Program is distributed in the hope that it will be useful,
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 *  GNU General Public License for more details.
 *
 *  You should have received a copy of the GNU General Public License
 *  along with XBMC; see the file COPYING.  If not, see
 *  <http://www.gnu.org/licenses/>.
 *
 */

#include <queue>
#include <vector>
#include <string>
#include "threads/CriticalSection.h"
#include "threads/Thread.h"
#include "Job.h"

class CJobManager;

class CJobWorker : public CThread
{
public:
  CJobWorker(CJobManager *manager);
  virtual ~CJobWorker();

  void Process();
private:
  CJobManager  *m_jobManager;
};

/*!
 \ingroup jobs
 \brief Job Queue class to handle a queue of unique jobs to be processed sequentially

 Holds a queue of jobs to be processed sequentially, either first in,first out
 or last in, first out.  Jobs are unique, so queueing multiple copies of the same job
 (based on the CJob::operator==) will not add additional jobs.

 Classes should subclass this class and override OnJobCallback should they require
 information from the job.

 \sa CJob and IJobCallback
 */
class CJobQueue: public IJobCallback
{
  class CJobPointer
  {
  public:
    CJobPointer(CJob *job)
    {
      m_job = job;
      m_id = 0;
    };
    void CancelJob();
    void FreeJob()
    {
      delete m_job;
      m_job = NULL;
    };
    bool operator==(const CJob *job) const
    {
      if (m_job)
        return *m_job == job;
      return false;
    };
    CJob *m_job;
    unsigned int m_id;
  };
public:
  /*!
   \brief CJobQueue constructor
   \param lifo whether the queue should be processed last in first out or first in first out.  Defaults to false (first in first out)
   \param jobsAtOnce number of jobs at once to process.  Defaults to 1.
   \param priority priority of this queue.
   \sa CJob
   */
  CJobQueue(bool lifo = false, unsigned int jobsAtOnce = 1, CJob::PRIORITY priority = CJob::PRIORITY_LOW);

  /*!
   \brief CJobQueue destructor
   Cancels any in-process jobs, and destroys the job queue.
   \sa CJob
   */
  virtual ~CJobQueue();

  /*!
   \brief Add a job to the queue
   On completion of the job (or destruction of the job queue) the CJob object will be destroyed.
   \param job a pointer to the job to add. The job should be subclassed from CJob.
   \sa CJob
   */
  void AddJob(CJob *job);

  /*!
   \brief Cancel a job in the queue
   Cancels a job in the queue. Any job currently being processed may complete after this
   call has completed, but OnJobComplete will not be performed. If the job is only queued
   then it will be removed from the queue and deleted.
   \param job a pointer to the job to cancel. The job should be subclassed from CJob.
   \sa CJob
   */
  void CancelJob(const CJob *job);

  /*!
   \brief Cancel all jobs in the queue
   Removes all jobs from the queue. Any job currently being processed may complete after this
   call has completed, but OnJobComplete will not be performed.
   \sa CJob
   */
  void CancelJobs();

  /*!
   \brief The callback used when a job completes.

   OnJobComplete is called at the completion of the CJob::DoWork function, and is used
   to return information to the caller on the result of the job.  On returning from this function
   the CJobManager will destroy this job.

   Subclasses should override this function if they wish to transfer information from the job prior
   to it's deletion.  They must then call this base class function, which will move on to the next
   job.

   \sa CJobManager, IJobCallback and  CJob
   */
  virtual void OnJobComplete(unsigned int jobID, bool success, CJob *job);

protected:
  /*!
   \brief Returns if we still have jobs waiting to be processed
   NOTE: This function does not take into account the jobs that are currently processing 
   */
  bool QueueEmpty() const;
  
private:
  void QueueNextJob();

  typedef std::deque<CJobPointer> Queue;
  typedef std::vector<CJobPointer> Processing;
  Queue m_jobQueue;
  Processing m_processing;

  unsigned int m_jobsAtOnce;
  CJob::PRIORITY m_priority;
  CCriticalSection m_section;
  bool m_lifo;
};

/*!
 \ingroup jobs
 \brief Job Manager class for scheduling asynchronous jobs.

 Controls asynchronous job execution, by allowing clients to add and cancel jobs.
 Should be accessed via CJobManager::GetInstance().  Jobs are allocated based on
 priority levels.  Lower priority jobs are executed only if there are sufficient
 spare worker threads free to allow for higher priority jobs that may arise.

 \sa CJob and IJobCallback
 */
class CJobManager
{
  class CWorkItem
  {
  public:
    CWorkItem(CJob *job, unsigned int id, CJob::PRIORITY priority, IJobCallback *callback)
    {
      m_job = job;
      m_id = id;
      m_callback = callback;
      m_priority = priority;
    }
    bool operator==(unsigned int jobID) const
    {
      return m_id == jobID;
    };
    bool operator==(const CJob *job) const
    {
      return m_job == job;
    };
    void FreeJob()
    {
      delete m_job;
      m_job = NULL;
    };
    void Cancel()
    {
      m_callback = NULL;
    };
    CJob         *m_job;
    unsigned int  m_id;
    IJobCallback *m_callback;
    CJob::PRIORITY m_priority;
  };

public:
  /*!
   \brief The only way through which the global instance of the CJobManager should be accessed.
   \return the global instance.
   */
  static CJobManager &GetInstance();

  /*!
   \brief Add a job to the threaded job manager.
   \param job a pointer to the job to add. The job should be subclassed from CJob
   \param callback a pointer to an IJobCallback instance to receive job progress and completion notices.
   \param priority the priority that this job should run at.
   \return a unique identifier for this job, to be used with other interaction
   \sa CJob, IJobCallback, CancelJob()
   */
  unsigned int AddJob(CJob *job, IJobCallback *callback, CJob::PRIORITY priority = CJob::PRIORITY_LOW);

  /*!
   \brief Cancel a job with the given id.
   \param jobID the id of the job to cancel, retrieved previously from AddJob()
   \sa AddJob()
   */
  void CancelJob(unsigned int jobID);

  /*!
   \brief Cancel all remaining jobs, preparing for shutdown
   Should be called prior to destroying any objects that may be being used as callbacks
   \sa CancelJob(), AddJob()
   */
  void CancelJobs();

  /*!
   \brief Re-start accepting jobs again
   Called after calling CancelJobs() to allow this manager to accept more jobs
   \throws std::logic_error if the manager was not previously cancelled
   \sa CancelJobs()
   */
  void Restart();

  /*!
   \brief Checks to see if any jobs of a specific type are currently processing.
   \param type Job type to search for
   \return Number of matching jobs
   */
  int IsProcessing(const std::string &type) const;

  /*!
   \brief Suspends queueing of jobs with priority PRIORITY_LOW_PAUSABLE until unpaused
   Useful to (for ex) stop queuing thumb jobs during video start/playback.
   Does not affect currently processing jobs, use IsProcessing to see if any need to be waited on
   \sa UnPauseJobs()
   */
  void PauseJobs();

  /*!
   \brief Resumes queueing of (previously paused) jobs with priority PRIORITY_LOW_PAUSABLE
   \sa PauseJobs()
   */
  void UnPauseJobs();

  /*!
   \brief Checks to see if any jobs with specific priority are currently processing.
   \param priority to search for
   \return true if processing jobs, else returns false
   */
  bool IsProcessing(const CJob::PRIORITY &priority) const;

protected:
  friend class CJobWorker;
  friend class CJob;

  /*!
   \brief Get a new job to process. Blocks until a new job is available, or a timeout has occurred.
   \param worker a pointer to the current CJobWorker instance requesting a job.
   \sa CJob
   */
  CJob *GetNextJob(const CJobWorker *worker);

  /*!
   \brief Callback from CJobWorker after a job has completed.
   Calls IJobCallback::OnJobComplete(), and then destroys job.
   \param job a pointer to the calling subclassed CJob instance.
   \param success the result from the DoWork call
   \sa IJobCallback, CJob
   */
  void  OnJobComplete(bool success, CJob *job);

  /*!
   \brief Callback from CJob to report progress and check for cancellation.
   Checks for cancellation, and calls IJobCallback::OnJobProgress().
   \param progress amount of processing performed to date, out of total.
   \param total total amount of processing.
   \param job pointer to the calling subclassed CJob instance.
   \return true if the job has been cancelled, else returns false.
   \sa IJobCallback, CJob
   */
  bool  OnJobProgress(unsigned int progress, unsigned int total, const CJob *job) const;

private:
  // private construction, and no assignements; use the provided singleton methods
  CJobManager();
  CJobManager(const CJobManager&);
  CJobManager const& operator=(CJobManager const&);
  virtual ~CJobManager();

  /*! \brief Pop a job off the job queue and add to the processing queue ready to process
   \return the job to process, NULL if no jobs are available
   */
  CJob *PopJob();

  void StartWorkers(CJob::PRIORITY priority);
  void RemoveWorker(const CJobWorker *worker);
  static unsigned int GetMaxWorkers(CJob::PRIORITY priority);

  unsigned int m_jobCounter;

  typedef std::deque<CWorkItem>    JobQueue;
  typedef std::vector<CWorkItem>   Processing;
  typedef std::vector<CJobWorker*> Workers;

  JobQueue   m_jobQueue[CJob::PRIORITY_HIGH+1];
  bool       m_pauseJobs;
  Processing m_processing;
  Workers    m_workers;

  CCriticalSection m_section;
  CEvent           m_jobEvent;
  bool             m_running;
};