1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381
  
     | 
    
      /*
 *  Copyright (C) 2005-2018 Team Kodi
 *  This file is part of Kodi - https://kodi.tv
 *
 *  SPDX-License-Identifier: GPL-2.0-or-later
 *  See LICENSES/README.md for more information.
 */
#pragma once
#include "Job.h"
#include "threads/CriticalSection.h"
#include "threads/Thread.h"
#include <queue>
#include <string>
#include <vector>
class CJobManager;
class CJobWorker : public CThread
{
public:
  explicit CJobWorker(CJobManager *manager);
  ~CJobWorker() override;
  void Process() override;
private:
  CJobManager  *m_jobManager;
};
template<typename F>
class CLambdaJob : public CJob
{
public:
  CLambdaJob(F&& f) : m_f(std::forward<F>(f)) {}
  bool DoWork() override
  {
    m_f();
    return true;
  }
  bool operator==(const CJob *job) const override
  {
    return this == job;
  };
private:
  F m_f;
};
/*!
 \ingroup jobs
 \brief Job Queue class to handle a queue of unique jobs to be processed sequentially
 Holds a queue of jobs to be processed sequentially, either first in,first out
 or last in, first out.  Jobs are unique, so queueing multiple copies of the same job
 (based on the CJob::operator==) will not add additional jobs.
 Classes should subclass this class and override OnJobCallback should they require
 information from the job.
 \sa CJob and IJobCallback
 */
class CJobQueue: public IJobCallback
{
  class CJobPointer
  {
  public:
    explicit CJobPointer(CJob *job)
    {
      m_job = job;
      m_id = 0;
    };
    void CancelJob();
    void FreeJob()
    {
      delete m_job;
      m_job = NULL;
    };
    bool operator==(const CJob *job) const
    {
      if (m_job)
        return *m_job == job;
      return false;
    };
    CJob *m_job;
    unsigned int m_id;
  };
public:
  /*!
   \brief CJobQueue constructor
   \param lifo whether the queue should be processed last in first out or first in first out.  Defaults to false (first in first out)
   \param jobsAtOnce number of jobs at once to process.  Defaults to 1.
   \param priority priority of this queue.
   \sa CJob
   */
  CJobQueue(bool lifo = false, unsigned int jobsAtOnce = 1, CJob::PRIORITY priority = CJob::PRIORITY_LOW);
  /*!
   \brief CJobQueue destructor
   Cancels any in-process jobs, and destroys the job queue.
   \sa CJob
   */
  ~CJobQueue() override;
  /*!
   \brief Add a job to the queue
   On completion of the job, destruction of the job queue or in case the job could not be added successfully, the CJob object will be destroyed.
   \param job a pointer to the job to add. The job should be subclassed from CJob.
   \return True if the job was added successfully, false otherwise.
   In case of failure, the passed CJob object will be deleted before returning from this method.
   \sa CJob
   */
  bool AddJob(CJob *job);
  /*!
   \brief Add a function f to this job queue
   */
  template<typename F>
  void Submit(F&& f)
  {
    AddJob(new CLambdaJob<F>(std::forward<F>(f)));
  }
  /*!
   \brief Cancel a job in the queue
   Cancels a job in the queue. Any job currently being processed may complete after this
   call has completed, but OnJobComplete will not be performed. If the job is only queued
   then it will be removed from the queue and deleted.
   \param job a pointer to the job to cancel. The job should be subclassed from CJob.
   \sa CJob
   */
  void CancelJob(const CJob *job);
  /*!
   \brief Cancel all jobs in the queue
   Removes all jobs from the queue. Any job currently being processed may complete after this
   call has completed, but OnJobComplete will not be performed.
   \sa CJob
   */
  void CancelJobs();
  /*!
   \brief Check whether the queue is processing a job
   */
  bool IsProcessing() const;
  /*!
   \brief The callback used when a job completes.
   CJobQueue implementation will cleanup the internal processing queue and then queue the next
   job at the job manager, if any.
   \param jobID the unique id of the job (as retrieved from CJobManager::AddJob)
   \param success the result from the DoWork call
   \param job the job that has been processed.
   \sa CJobManager, IJobCallback and CJob
   */
  void OnJobComplete(unsigned int jobID, bool success, CJob *job) override;
  /*!
   \brief The callback used when a job will be aborted.
   CJobQueue implementation will cleanup the internal processing queue and then queue the next
   job at the job manager, if any.
   \param jobID the unique id of the job (as retrieved from CJobManager::AddJob)
   \param job the job that has been aborted.
   \sa CJobManager, IJobCallback and CJob
   */
  void OnJobAbort(unsigned int jobID, CJob* job) override;
protected:
  /*!
   \brief Returns if we still have jobs waiting to be processed
   NOTE: This function does not take into account the jobs that are currently processing
   */
  bool QueueEmpty() const;
private:
  void OnJobNotify(CJob* job);
  void QueueNextJob();
  typedef std::deque<CJobPointer> Queue;
  typedef std::vector<CJobPointer> Processing;
  Queue m_jobQueue;
  Processing m_processing;
  unsigned int m_jobsAtOnce;
  CJob::PRIORITY m_priority;
  mutable CCriticalSection m_section;
  bool m_lifo;
};
/*!
 \ingroup jobs
 \brief Job Manager class for scheduling asynchronous jobs.
 Controls asynchronous job execution, by allowing clients to add and cancel jobs.
 Should be accessed via CServiceBroker::GetJobManager().  Jobs are allocated based
 on priority levels.  Lower priority jobs are executed only if there are sufficient
 spare worker threads free to allow for higher priority jobs that may arise.
 \sa CJob and IJobCallback
 */
class CJobManager final
{
  class CWorkItem
  {
  public:
    CWorkItem(CJob *job, unsigned int id, CJob::PRIORITY priority, IJobCallback *callback)
    {
      m_job = job;
      m_id = id;
      m_callback = callback;
      m_priority = priority;
    }
    bool operator==(unsigned int jobID) const
    {
      return m_id == jobID;
    };
    bool operator==(const CJob *job) const
    {
      return m_job == job;
    };
    void FreeJob()
    {
      delete m_job;
      m_job = NULL;
    };
    void Cancel()
    {
      m_callback = NULL;
    };
    CJob         *m_job;
    unsigned int  m_id;
    IJobCallback *m_callback;
    CJob::PRIORITY m_priority;
  };
public:
  CJobManager();
  /*!
   \brief Add a job to the threaded job manager.
   On completion or abort of the job or in case the job could not be added successfully, the CJob object will be destroyed.
   \param job a pointer to the job to add. The job should be subclassed from CJob
   \param callback a pointer to an IJobCallback instance to receive job progress and completion notices.
   \param priority the priority that this job should run at.
   \return On success, a unique identifier for this job, to be used with other interaction, 0 otherwise.
   In case of failure, the passed CJob object will be deleted before returning from this method.
   \sa CJob, IJobCallback, CancelJob()
   */
  unsigned int AddJob(CJob *job, IJobCallback *callback, CJob::PRIORITY priority = CJob::PRIORITY_LOW);
  /*!
   \brief Add a function f to this job manager for asynchronously execution.
   */
  template<typename F>
  void Submit(F&& f, CJob::PRIORITY priority = CJob::PRIORITY_LOW)
  {
    AddJob(new CLambdaJob<F>(std::forward<F>(f)), nullptr, priority);
  }
  /*!
   \brief Add a function f to this job manager for asynchronously execution.
   */
  template<typename F>
  void Submit(F&& f, IJobCallback *callback, CJob::PRIORITY priority = CJob::PRIORITY_LOW)
  {
    AddJob(new CLambdaJob<F>(std::forward<F>(f)), callback, priority);
  }
  /*!
   \brief Cancel a job with the given id.
   \param jobID the id of the job to cancel, retrieved previously from AddJob()
   \sa AddJob()
   */
  void CancelJob(unsigned int jobID);
  /*!
   \brief Cancel all remaining jobs, preparing for shutdown
   Should be called prior to destroying any objects that may be being used as callbacks
   \sa CancelJob(), AddJob()
   */
  void CancelJobs();
  /*!
   \brief Re-start accepting jobs again
   Called after calling CancelJobs() to allow this manager to accept more jobs
   \throws std::logic_error if the manager was not previously cancelled
   \sa CancelJobs()
   */
  void Restart();
  /*!
   \brief Checks to see if any jobs of a specific type are currently processing.
   \param type Job type to search for
   \return Number of matching jobs
   */
  int IsProcessing(const std::string &type) const;
  /*!
   \brief Suspends queueing of jobs with priority PRIORITY_LOW_PAUSABLE until unpaused
   Useful to (for ex) stop queuing thumb jobs during video start/playback.
   Does not affect currently processing jobs, use IsProcessing to see if any need to be waited on
   \sa UnPauseJobs()
   */
  void PauseJobs();
  /*!
   \brief Resumes queueing of (previously paused) jobs with priority PRIORITY_LOW_PAUSABLE
   \sa PauseJobs()
   */
  void UnPauseJobs();
  /*!
   \brief Checks to see if any jobs with specific priority are currently processing.
   \param priority to search for
   \return true if processing jobs, else returns false
   */
  bool IsProcessing(const CJob::PRIORITY &priority) const;
protected:
  friend class CJobWorker;
  friend class CJob;
  friend class CJobQueue;
  /*!
   \brief Get a new job to process. Blocks until a new job is available, or a timeout has occurred.
   \sa CJob
   */
  CJob* GetNextJob();
  /*!
   \brief Callback from CJobWorker after a job has completed.
   Calls IJobCallback::OnJobComplete(), and then destroys job.
   \param job a pointer to the calling subclassed CJob instance.
   \param success the result from the DoWork call
   \sa IJobCallback, CJob
   */
  void  OnJobComplete(bool success, CJob *job);
  /*!
   \brief Callback from CJob to report progress and check for cancellation.
   Checks for cancellation, and calls IJobCallback::OnJobProgress().
   \param progress amount of processing performed to date, out of total.
   \param total total amount of processing.
   \param job pointer to the calling subclassed CJob instance.
   \return true if the job has been cancelled, else returns false.
   \sa IJobCallback, CJob
   */
  bool  OnJobProgress(unsigned int progress, unsigned int total, const CJob *job) const;
private:
  CJobManager(const CJobManager&) = delete;
  CJobManager const& operator=(CJobManager const&) = delete;
  /*! \brief Pop a job off the job queue and add to the processing queue ready to process
   \return the job to process, NULL if no jobs are available
   */
  CJob *PopJob();
  void StartWorkers(CJob::PRIORITY priority);
  void RemoveWorker(const CJobWorker *worker);
  static unsigned int GetMaxWorkers(CJob::PRIORITY priority);
  unsigned int m_jobCounter;
  typedef std::deque<CWorkItem>    JobQueue;
  typedef std::vector<CWorkItem>   Processing;
  typedef std::vector<CJobWorker*> Workers;
  JobQueue   m_jobQueue[CJob::PRIORITY_DEDICATED + 1];
  bool       m_pauseJobs;
  Processing m_processing;
  Workers    m_workers;
  mutable CCriticalSection m_section;
  CEvent           m_jobEvent;
  bool             m_running;
};
 
     |