NeoPZ
TPZThreadPool.h
Go to the documentation of this file.
1 #ifndef TPZTHREADPOOL_H
2 #define TPZTHREADPOOL_H
3 
4 #include <vector>
5 #include "TPZPriorityQueue.h"
6 #include <limits>
7 
8 #include <cstdarg>
9 #include <functional>
10 
11 #include "tpzautopointer.h"
12 
13 #include <thread>
14 #include <future>
15 #include <condition_variable>
16 #include <mutex>
17 #include <memory>
18 #include "TPZTask.h"
19 #include "TPZReschedulableTask.h"
20 #include "TPZTaskGroup.h"
21 
24 public:
26  static TPZThreadPool &globalInstance();
27 
29  // @return a std::future<void> object that allows to block the calling thread if necessary (by calling wait())
30  std::shared_future<void> run(const int priority, TPZAutoPointer<std::packaged_task<void(void) > > &task, TPZTaskGroup *taskGroup = NULL);
31 
33 
34  void reschedule(const int priority, TPZAutoPointer<TPZReschedulableTask> &task);
35  std::shared_future<void> runNow(TPZAutoPointer<TPZReschedulableTask> &task);
36 
37  template<typename... Args>
38  std::shared_future<void> run(const int priority, TPZTaskGroup *taskGroup, std::function<void(Args...) > func, Args... args) {
39  checkForMaxAndMinPriority(priority);
40  TPZAutoPointer < std::packaged_task<void(void) >> task(new std::packaged_task<void(void) >(std::bind(func, args...)));
41  return run(priority, task, taskGroup);
42  }
43 
44  template<typename... Args>
45  TPZAutoPointer<TPZReschedulableTask> runReschedulable(const int priority, TPZTaskGroup *taskGroup, std::function<void(Args...) > func, Args... args) {
46  checkForMaxAndMinPriority(priority);
47  TPZAutoPointer < std::packaged_task<void(void) >> task(new std::packaged_task<void(void) >(std::bind(func, args...)));
48  TPZAutoPointer<TPZReschedulableTask> rescTask(new TPZReschedulableTask(priority, task, taskGroup));
49  run(rescTask);
50  return rescTask;
51  }
52 
54  void SetNumThreads(const unsigned numThreads);
55  int maxPriority() const;
56  int minPriority() const;
57 
59  int threadCount() const;
60 
61 private:
62  TPZThreadPool();
63  int ActualThreadCount() const;
64  void threadsLoop();
65  void updatePriorities();
67  TPZAutoPointer<TPZTask> appendTaskToQueue(const int priority, TPZAutoPointer<std::packaged_task<void(void) >> &task, const bool system_task, TPZTaskGroup *taskGroup = NULL);
68  void checkForMaxAndMinPriority(const int priority);
70 
72  // The only use of this method is to run a thread_join task when the number of threads was decreased
73  template<typename... Args>
74  std::shared_future<void> runSystemTask(const int priority, std::function<void(Args...) > func, Args... args) {
75  checkForMaxAndMinPriority(priority);
76  TPZAutoPointer < std::packaged_task<void(void) >> task(new std::packaged_task<void(void) >(std::bind(func, args...)));
77  std::shared_future<void> fut = task->get_future().share();
78  appendTaskToQueue(priority, task, true, NULL);
79  return fut;
80  }
81 
83  std::vector<std::thread> mThreads;
86  unsigned int mThreadsToDelete;
87  unsigned int mZombieThreads;
88  bool mStop;
89  TPZPriorityQueue<TPZAutoPointer<TPZTask>, std::vector<TPZAutoPointer<TPZTask>>, TPZTaskOrdering> mTasksQueue;
90  std::condition_variable mTaskAvailableCond;
93 
94 };
95 
96 #endif // TPZTHREADPOOL_H
pthread_mutex_t mutex
Semaphore which controls multiple threads.
TPZAutoPointer< TPZReschedulableTask > runReschedulable(const int priority, TPZTaskGroup *taskGroup, std::function< void(Args...) > func, Args... args)
Definition: TPZThreadPool.h:45
unsigned int mThreadsToDelete
Definition: TPZThreadPool.h:86
int ActualThreadCount() const
std::condition_variable mTaskAvailableCond
Definition: TPZThreadPool.h:90
Administers tasks that will be executed asynchronously.
Definition: TPZThreadPool.h:23
std::mutex mThreadsMutex
one mutex to synchronize access to the data structures
Definition: TPZThreadPool.h:85
int threadCount() const
std::shared_future< void > runSystemTask(const int priority, std::function< void(Args...) > func, Args... args)
Submits and processes a "maximum priority" system task.
Definition: TPZThreadPool.h:74
void appendTaskToQueue(TPZAutoPointer< TPZTask > &task)
static TPZThreadPool & globalInstance()
TPZPriorityQueue< TPZAutoPointer< TPZTask >, std::vector< TPZAutoPointer< TPZTask > >, TPZTaskOrdering > mTasksQueue
Definition: TPZThreadPool.h:89
int maxPriority() const
void updatePriorities()
void reschedule(const int priority, TPZAutoPointer< TPZReschedulableTask > &task)
std::shared_future< void > run(const int priority, TPZAutoPointer< std::packaged_task< void(void) > > &task, TPZTaskGroup *taskGroup=NULL)
submits a task to be executed by TPZThreadPool
void checkForMaxAndMinPriority(const int priority)
Contains declaration of the TPZAutoPointer class which has Increment and Decrement actions are mutexe...
unsigned int mZombieThreads
Definition: TPZThreadPool.h:87
void SetNumThreads(const unsigned numThreads)
sets the number of threads to be executed simultaneously
Simple struct needed by std::priority_queue for ordering the items.
Definition: TPZTask.h:48
std::shared_future< void > run(const int priority, TPZTaskGroup *taskGroup, std::function< void(Args...) > func, Args... args)
Definition: TPZThreadPool.h:38
int minPriority() const
std::vector< std::thread > mThreads
vector of thread objects
Definition: TPZThreadPool.h:83
std::shared_future< void > runNow(TPZAutoPointer< TPZReschedulableTask > &task)
This class implements a reference counter mechanism to administer a dynamically allocated object...