Browse Source

添加线程池和工作队列

disinfection_machine
zhaohe 3 years ago
parent
commit
98d930131f
  1. 221
      core/components/jobs/thread_pool_task_scheduler.cpp
  2. 106
      core/components/jobs/thread_pool_task_scheduler.hpp
  3. 57
      core/components/jobs/work_queue.cpp
  4. 46
      core/components/jobs/work_queue.hpp
  5. 3
      module.cmake

221
core/components/jobs/thread_pool_task_scheduler.cpp

@ -0,0 +1,221 @@
//
// Created by zhaohe on 19-5-21.
//
#include "thread_pool_task_scheduler.hpp"
using namespace std;
using namespace iflytop;
using namespace core;
using namespace chrono;
ThreadPoolTaskScheduler::ThreadPoolTaskScheduler(const string& name, int numThread)
: name(name), numThread(numThread) {}
void ThreadPoolTaskScheduler::initialize() {
for (int32_t i = 0; i < numThread; ++i) {
unique_ptr<ThreadWrapper> threadWrapper(new ThreadWrapper());
threadWrapper->timerThread = i == 0;
threadWrapper->sleepd = false;
ThreadWrapper* threadWrapperP = threadWrapper.get();
threadWrapper->thread.reset(new Thread(fmt::format("{}-{}{}", "ThreadPoolTaskScheduler", name, i),
[this, threadWrapperP]() { threadProcessFunction(threadWrapperP); }));
threads.push_back(move(threadWrapper));
}
}
void ThreadPoolTaskScheduler::pushTask(const string& debugInfo, function<void(void)> work) {
shared_ptr<Task> newTask(new Task());
newTask->work = work;
newTask->debugInfo = debugInfo;
tasks.enqueue(newTask);
/************************************************/
// 如有有正在休息的线程,只唤醒正在休息的线程即可
// 否则唤醒全部线程,让所有线程竞争新的Task
bool waked = false;
for (auto& threadW : threads) {
if (threadW->sleepd) {
threadW->thread->wake();
waked = true;
break;
}
}
if (!waked)
for (auto& threadW : threads) threadW->thread->wake();
}
void ThreadPoolTaskScheduler::threadProcessFunction(ThreadWrapper* threadWrapper) {
shared_ptr<Task> task;
ThisThread thisThread;
while (!thisThread.getExitFlag()) {
threadWrapper->sleepd = false;
if (tasks.try_dequeue(task)) {
if (task && task->work) {
try {
logger->debug("Do {} task", task->debugInfo);
task->work();
} catch (const std::exception& e) {
logger->error("catch exception on Task {}\nexception:{}", task->debugInfo, e.what());
}
}
}
if (threadWrapper->timerThread) {
timerProcess();
}
if (tasks.size_approx() != 0) {
continue;
}
threadWrapper->sleepd = true;
if (threadWrapper->timerThread) {
sleepUntilNextTimer();
} else {
thisThread.sleep();
}
}
}
/**
* @brief Set the Interval object
*
* @param work
* @param interval_ms
* @param doItAtOnce
* @return int
*/
int ThreadPoolTaskScheduler::setInterval(function<void(void)> work, int32_t interval_ms, bool doItAtOnce) {
return setTimer(work, interval_ms, doItAtOnce, false);
}
/**
* @brief Set the Timeout object
*
* @param work
* @param timeout_ms
* @return int
*/
int ThreadPoolTaskScheduler::setTimeout(function<void(void)> work, int32_t timeout_ms) {
return setTimer(work, timeout_ms, false, true);
}
/**
* @brief
*
* @param id
*/
void ThreadPoolTaskScheduler::stopTimer(uint32_t& id) {
std::lock_guard<std::mutex> lock(lockMapAndVector);
for (auto& each : newTimers) {
if (each->id == id) each->stoped = true;
}
for (auto& each : timers) {
auto& timer = each.second;
if (timer->id == id) timer->stoped = true;
}
id = 0;
};
int ThreadPoolTaskScheduler::setTimer(function<void(void)> work, int32_t interval_ms, bool doItAtOnce,
bool doItOnlyOneTime) {
std::lock_guard<std::mutex> lock(lockMapAndVector);
shared_ptr<Timer> timer(new Timer());
timer->doItOnlyOneTime = doItOnlyOneTime;
if (doItAtOnce) {
timer->elapsedTime = tu_steady().now();
} else {
timer->elapsedTime = tu_steady().addms(interval_ms);
}
timer->id = timerId;
timer->interval = interval_ms;
timer->work = work;
newTimers.push_back(timer);
timerId++;
/********************************/
// 唤醒Timer线程
for (auto& threadW : threads) {
if (threadW->timerThread) {
threadW->thread->wake();
break;
}
}
return timer->id;
}
void ThreadPoolTaskScheduler::timerProcess() {
vector<uint32_t> disableTimer;
/**********************处理新Timer************/
{
std::lock_guard<std::mutex> lock(lockMapAndVector);
for (auto& each : newTimers) {
timers[each->id] = each;
}
newTimers.clear();
disableTimer.clear();
for (auto& each : timers) {
auto& timer = each.second;
if (timer->stoped) disableTimer.push_back(each.first);
}
for (auto& each : disableTimer) timers.erase(each);
}
/********************************************/
auto now = tu_steady().now();
for (auto& each : timers) {
auto& timer = each.second;
if (timer->stoped) {
continue;
}
if (timer->hasDoneOnce && timer->doItOnlyOneTime) {
disableTimer.push_back(each.first);
continue;
}
if (now.time_since_epoch() > timer->elapsedTime.time_since_epoch()) {
if (timer->work) timer->work();
timer->elapsedTime = tu_steady().addms(now, timer->interval);
timer->hasDoneOnce = true;
}
}
/********************************************/
}
void ThreadPoolTaskScheduler::sleepUntilNextTimer() {
ThisThread thisThread;
time_point<steady_clock> nextwakeTime;
bool first = false;
bool hasSetedNextwakeTime = false;
for (auto& each : timers) {
auto& timer = each.second;
if (timer->stoped) {
continue;
}
if (first) {
nextwakeTime = timer->elapsedTime;
first = true;
hasSetedNextwakeTime = true;
}
if (timer->elapsedTime < nextwakeTime) {
nextwakeTime = timer->elapsedTime;
hasSetedNextwakeTime = true;
}
}
if (hasSetedNextwakeTime) {
int32_t sleepTime = tu_steady().dToMs(tu_steady().now() - nextwakeTime);
thisThread.sleepForMs(sleepTime);
} else {
thisThread.sleep();
}
}

106
core/components/jobs/thread_pool_task_scheduler.hpp

@ -0,0 +1,106 @@
//
// Created by zhaohe on 19-5-21.
//
#pragma once
#include <fstream>
#include <iostream>
#include <list>
#include <map>
#include <memory>
#include <memory>
#include <mutex>
#include <queue>
#include <set>
#include <sstream>
#include <string>
#include <vector>
// #include "zwtimecpp/core/base/object.hpp"
#include "iflytopcpp/core/spdlogfactory/logger.hpp"
#include "iflytopcpp/core/thread/thread.hpp"
#include "iflytopcpp/core/basic/concurrentqueue/concurrentqueue.h"
#include "iflytopcpp/core/components/time_util.hpp"
#define CODE_LOCATION (fmt::format("{}:{}", __FILE__, __LINE__))
namespace iflytop {
namespace core {
using namespace std;
using namespace moodycamel;
using namespace chrono;
/**
* 线
*
* 线
*
*/
class ThreadPoolTaskScheduler {
ENABLE_LOGGER(ThreadPoolTaskScheduler)
private:
class ThreadWrapper {
public:
unique_ptr<Thread> thread;
bool sleepd = false;
bool timerThread = false;
private:
};
class Task {
public:
function<void(void)> work;
string debugInfo;
private:
};
class Timer {
public:
uint32_t id;
int32_t interval; //周期
bool doItOnlyOneTime = false; //是否运行一次
time_point<steady_clock> elapsedTime; //下次执行时间
function<void(void)> work; //回调
bool stoped = false; //已经停止
bool hasDoneOnce = false; //已经执行过一次
private:
};
private:
vector<unique_ptr<ThreadWrapper>> threads;
string name;
int numThread = 0;
ConcurrentQueue<shared_ptr<Task>> tasks;
std::mutex lock_;
/**********************timer**********************/
map<uint32_t, shared_ptr<Timer>> timers;
uint32_t timerId = {1};
vector<shared_ptr<Timer>> newTimers;
shared_ptr<Timer> nextTimer;
atomic_bool timerStateUpdate = {false};
std::mutex lockMapAndVector;
public:
ThreadPoolTaskScheduler(const string& name, int num);
void initialize();
void pushTask(const string& debugInfo, function<void(void)> work);
int setInterval(function<void(void)> work, int32_t interval_ms,
bool doItAtOnce = false);
int setTimeout(function<void(void)> work, int32_t timeout_ms);
void stopTimer(uint32_t& id);
~ThreadPoolTaskScheduler() {
// for (auto& thread : threads) thread->join();
}
private:
int setTimer(function<void(void)> work, int32_t interval_ms, bool doItAtOnce,
bool doItOnlyOneTime);
void threadProcessFunction(ThreadWrapper*threadWrapper);
void timerProcess();
void sleepUntilNextTimer();
};
}
}

57
core/components/jobs/work_queue.cpp

@ -0,0 +1,57 @@
/*
* File: work_queue.hpp
* Project: worker_queue
* File Created: Thursday, 18th October 2018 5:23:19 pm
* Author: zhaohe (1013909206@qq.com)
* -----
* Last Modified: Thursday, 18th October 2018 5:23:25 pm
* Modified By: zhaohe (1013909206@qq.com>)
* -----
* Copyright 2017 - 2018 zwsd, zwsd
*/
#include "work_queue.hpp"
#include <fstream>
#include <functional>
#include <iostream>
#include <list>
#include <map>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <vector>
#include "iflytopcpp/core/basic/concurrentqueue/blockingconcurrentqueue.h"
#include "iflytopcpp/core/thread/thread.hpp"
using namespace iflytop;
using namespace core;
using namespace std;
WorkQueue::WorkQueue(string name) {
is_running_ = true;
thread_.reset(new Thread(name, [this]() {
while (1) {
if (!is_running_) {
break;
}
function<void(void)> call_;
queue_.wait_dequeue(call_);
if (call_) call_();
}
}));
};
void WorkQueue::clear() {
function<void(void)> call_;
while (queue_.try_dequeue(call_)) {
/*clear queue*/
}
};
size_t WorkQueue::getSize() { return queue_.size_approx(); }
void WorkQueue::enQueue(function<void(void)> func) { queue_.enqueue(func); }
WorkQueue::~WorkQueue() {
is_running_ = false;
queue_.enqueue(nullptr);
if (thread_) thread_->join();
}

46
core/components/jobs/work_queue.hpp

@ -0,0 +1,46 @@
/*
* File: work_queue.hpp
* Project: worker_queue
* File Created: Thursday, 18th October 2018 5:23:19 pm
* Author: zhaohe (1013909206@qq.com)
* -----
* Last Modified: Thursday, 18th October 2018 5:23:25 pm
* Modified By: zhaohe (1013909206@qq.com>)
* -----
* Copyright 2017 - 2018 zwsd, zwsd
*/
#pragma once
#include <fstream>
#include <functional>
#include <iostream>
#include <list>
#include <map>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <vector>
#include "iflytopcpp/core/basic/concurrentqueue/blockingconcurrentqueue.h"
#include "iflytopcpp/core/thread/thread.hpp"
namespace iflytop {
namespace core {
using namespace std;
class WorkQueue {
bool is_running_ = false;
unique_ptr<Thread> thread_;
moodycamel::BlockingConcurrentQueue<function<void(void)>> queue_;
public:
WorkQueue(string name);
void clear();
size_t getSize();
void enQueue(function<void(void)> func);
~WorkQueue();
private:
};
} // namespace core
} // namespace iflytop

3
module.cmake

@ -16,6 +16,9 @@ set(DEP_SRC
dep/iflytopcpp/core/thread/thread.cpp dep/iflytopcpp/core/thread/thread.cpp
dep/iflytopcpp/core/zexception/zexception.cpp dep/iflytopcpp/core/zexception/zexception.cpp
dep/iflytopcpp/core/components/jobs/work_queue.cpp
dep/iflytopcpp/core/components/jobs/thread_pool_task_scheduler.cpp
dep/iflytopcpp/core/components/uart/uart.cpp dep/iflytopcpp/core/components/uart/uart.cpp
dep/iflytopcpp/core/components/modbus/modbus.cpp dep/iflytopcpp/core/components/modbus/modbus.cpp
dep/iflytopcpp/core/components/modbus/zmodbus_common.c dep/iflytopcpp/core/components/modbus/zmodbus_common.c

Loading…
Cancel
Save