diff --git a/core/components/jobs/thread_pool_task_scheduler.cpp b/core/components/jobs/thread_pool_task_scheduler.cpp new file mode 100644 index 0000000..606b790 --- /dev/null +++ b/core/components/jobs/thread_pool_task_scheduler.cpp @@ -0,0 +1,221 @@ +// +// Created by zhaohe on 19-5-21. +// + +#include "thread_pool_task_scheduler.hpp" +using namespace std; +using namespace iflytop; +using namespace core; +using namespace chrono; + +ThreadPoolTaskScheduler::ThreadPoolTaskScheduler(const string& name, int numThread) + : name(name), numThread(numThread) {} +void ThreadPoolTaskScheduler::initialize() { + for (int32_t i = 0; i < numThread; ++i) { + unique_ptr threadWrapper(new ThreadWrapper()); + threadWrapper->timerThread = i == 0; + threadWrapper->sleepd = false; + + ThreadWrapper* threadWrapperP = threadWrapper.get(); + threadWrapper->thread.reset(new Thread(fmt::format("{}-{}{}", "ThreadPoolTaskScheduler", name, i), + [this, threadWrapperP]() { threadProcessFunction(threadWrapperP); })); + + threads.push_back(move(threadWrapper)); + } +} +void ThreadPoolTaskScheduler::pushTask(const string& debugInfo, function work) { + shared_ptr newTask(new Task()); + newTask->work = work; + newTask->debugInfo = debugInfo; + tasks.enqueue(newTask); + + /************************************************/ + // 如有有正在休息的线程,只唤醒正在休息的线程即可 + // 否则唤醒全部线程,让所有线程竞争新的Task + bool waked = false; + for (auto& threadW : threads) { + if (threadW->sleepd) { + threadW->thread->wake(); + waked = true; + break; + } + } + + if (!waked) + for (auto& threadW : threads) threadW->thread->wake(); +} + +void ThreadPoolTaskScheduler::threadProcessFunction(ThreadWrapper* threadWrapper) { + shared_ptr task; + ThisThread thisThread; + while (!thisThread.getExitFlag()) { + threadWrapper->sleepd = false; + + if (tasks.try_dequeue(task)) { + if (task && task->work) { + try { + logger->debug("Do {} task", task->debugInfo); + task->work(); + } catch (const std::exception& e) { + logger->error("catch exception on Task {}\nexception:{}", task->debugInfo, e.what()); + } + } + } + + if (threadWrapper->timerThread) { + timerProcess(); + } + + if (tasks.size_approx() != 0) { + continue; + } + + threadWrapper->sleepd = true; + if (threadWrapper->timerThread) { + sleepUntilNextTimer(); + } else { + thisThread.sleep(); + } + } +} + +/** + * @brief Set the Interval object + * + * @param work + * @param interval_ms + * @param doItAtOnce + * @return int + */ +int ThreadPoolTaskScheduler::setInterval(function work, int32_t interval_ms, bool doItAtOnce) { + return setTimer(work, interval_ms, doItAtOnce, false); +} +/** + * @brief Set the Timeout object + * + * @param work + * @param timeout_ms + * @return int + */ +int ThreadPoolTaskScheduler::setTimeout(function work, int32_t timeout_ms) { + return setTimer(work, timeout_ms, false, true); +} +/** + * @brief 停止定时器 + * + * @param id + */ +void ThreadPoolTaskScheduler::stopTimer(uint32_t& id) { + std::lock_guard lock(lockMapAndVector); + for (auto& each : newTimers) { + if (each->id == id) each->stoped = true; + } + + for (auto& each : timers) { + auto& timer = each.second; + if (timer->id == id) timer->stoped = true; + } + + id = 0; +}; + +int ThreadPoolTaskScheduler::setTimer(function work, int32_t interval_ms, bool doItAtOnce, + bool doItOnlyOneTime) { + std::lock_guard lock(lockMapAndVector); + shared_ptr timer(new Timer()); + timer->doItOnlyOneTime = doItOnlyOneTime; + if (doItAtOnce) { + timer->elapsedTime = tu_steady().now(); + } else { + timer->elapsedTime = tu_steady().addms(interval_ms); + } + timer->id = timerId; + timer->interval = interval_ms; + timer->work = work; + newTimers.push_back(timer); + timerId++; + + /********************************/ + // 唤醒Timer线程 + for (auto& threadW : threads) { + if (threadW->timerThread) { + threadW->thread->wake(); + break; + } + } + + return timer->id; +} + +void ThreadPoolTaskScheduler::timerProcess() { + vector disableTimer; + + /**********************处理新Timer************/ + { + std::lock_guard lock(lockMapAndVector); + for (auto& each : newTimers) { + timers[each->id] = each; + } + newTimers.clear(); + + disableTimer.clear(); + for (auto& each : timers) { + auto& timer = each.second; + if (timer->stoped) disableTimer.push_back(each.first); + } + for (auto& each : disableTimer) timers.erase(each); + } + + /********************************************/ + auto now = tu_steady().now(); + for (auto& each : timers) { + auto& timer = each.second; + if (timer->stoped) { + continue; + } + + if (timer->hasDoneOnce && timer->doItOnlyOneTime) { + disableTimer.push_back(each.first); + continue; + } + + if (now.time_since_epoch() > timer->elapsedTime.time_since_epoch()) { + if (timer->work) timer->work(); + timer->elapsedTime = tu_steady().addms(now, timer->interval); + timer->hasDoneOnce = true; + } + } + + /********************************************/ +} + +void ThreadPoolTaskScheduler::sleepUntilNextTimer() { + ThisThread thisThread; + time_point nextwakeTime; + bool first = false; + bool hasSetedNextwakeTime = false; + for (auto& each : timers) { + auto& timer = each.second; + if (timer->stoped) { + continue; + } + + if (first) { + nextwakeTime = timer->elapsedTime; + first = true; + hasSetedNextwakeTime = true; + } + + if (timer->elapsedTime < nextwakeTime) { + nextwakeTime = timer->elapsedTime; + hasSetedNextwakeTime = true; + } + } + + if (hasSetedNextwakeTime) { + int32_t sleepTime = tu_steady().dToMs(tu_steady().now() - nextwakeTime); + thisThread.sleepForMs(sleepTime); + } else { + thisThread.sleep(); + } +} \ No newline at end of file diff --git a/core/components/jobs/thread_pool_task_scheduler.hpp b/core/components/jobs/thread_pool_task_scheduler.hpp new file mode 100644 index 0000000..ad88c15 --- /dev/null +++ b/core/components/jobs/thread_pool_task_scheduler.hpp @@ -0,0 +1,106 @@ +// +// Created by zhaohe on 19-5-21. +// + +#pragma once +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +// #include "zwtimecpp/core/base/object.hpp" +#include "iflytopcpp/core/spdlogfactory/logger.hpp" +#include "iflytopcpp/core/thread/thread.hpp" +#include "iflytopcpp/core/basic/concurrentqueue/concurrentqueue.h" +#include "iflytopcpp/core/components/time_util.hpp" + +#define CODE_LOCATION (fmt::format("{}:{}", __FILE__, __LINE__)) + +namespace iflytop { +namespace core { +using namespace std; +using namespace moodycamel; +using namespace chrono; +/** + * 线程池 + * 任务调度 + * 线程池 + * 定时任务 + */ +class ThreadPoolTaskScheduler { + ENABLE_LOGGER(ThreadPoolTaskScheduler) + + private: + class ThreadWrapper { + public: + unique_ptr thread; + bool sleepd = false; + bool timerThread = false; + private: + }; + + class Task { + public: + function work; + string debugInfo; + + private: + }; + + class Timer { + public: + uint32_t id; + int32_t interval; //周期 + bool doItOnlyOneTime = false; //是否运行一次 + time_point elapsedTime; //下次执行时间 + function work; //回调 + bool stoped = false; //已经停止 + bool hasDoneOnce = false; //已经执行过一次 + private: + }; + + private: + vector> threads; + string name; + int numThread = 0; + ConcurrentQueue> tasks; + std::mutex lock_; + + /**********************timer**********************/ + map> timers; + uint32_t timerId = {1}; + vector> newTimers; + shared_ptr nextTimer; + atomic_bool timerStateUpdate = {false}; + std::mutex lockMapAndVector; + + public: + ThreadPoolTaskScheduler(const string& name, int num); + void initialize(); + void pushTask(const string& debugInfo, function work); + + int setInterval(function work, int32_t interval_ms, + bool doItAtOnce = false); + int setTimeout(function work, int32_t timeout_ms); + void stopTimer(uint32_t& id); + + ~ThreadPoolTaskScheduler() { + // for (auto& thread : threads) thread->join(); + } + + private: + int setTimer(function work, int32_t interval_ms, bool doItAtOnce, + bool doItOnlyOneTime); + void threadProcessFunction(ThreadWrapper*threadWrapper); + void timerProcess(); + void sleepUntilNextTimer(); +}; +} +} \ No newline at end of file diff --git a/core/components/jobs/work_queue.cpp b/core/components/jobs/work_queue.cpp new file mode 100644 index 0000000..b7cdb52 --- /dev/null +++ b/core/components/jobs/work_queue.cpp @@ -0,0 +1,57 @@ +/* + * File: work_queue.hpp + * Project: worker_queue + * File Created: Thursday, 18th October 2018 5:23:19 pm + * Author: zhaohe (1013909206@qq.com) + * ----- + * Last Modified: Thursday, 18th October 2018 5:23:25 pm + * Modified By: zhaohe (1013909206@qq.com>) + * ----- + * Copyright 2017 - 2018 zwsd, zwsd + */ +#include "work_queue.hpp" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "iflytopcpp/core/basic/concurrentqueue/blockingconcurrentqueue.h" +#include "iflytopcpp/core/thread/thread.hpp" + +using namespace iflytop; +using namespace core; +using namespace std; + +WorkQueue::WorkQueue(string name) { + is_running_ = true; + thread_.reset(new Thread(name, [this]() { + while (1) { + if (!is_running_) { + break; + } + function call_; + queue_.wait_dequeue(call_); + if (call_) call_(); + } + })); +}; +void WorkQueue::clear() { + function call_; + while (queue_.try_dequeue(call_)) { + /*clear queue*/ + } +}; +size_t WorkQueue::getSize() { return queue_.size_approx(); } +void WorkQueue::enQueue(function func) { queue_.enqueue(func); } +WorkQueue::~WorkQueue() { + is_running_ = false; + queue_.enqueue(nullptr); + if (thread_) thread_->join(); +} diff --git a/core/components/jobs/work_queue.hpp b/core/components/jobs/work_queue.hpp new file mode 100644 index 0000000..73344b3 --- /dev/null +++ b/core/components/jobs/work_queue.hpp @@ -0,0 +1,46 @@ +/* + * File: work_queue.hpp + * Project: worker_queue + * File Created: Thursday, 18th October 2018 5:23:19 pm + * Author: zhaohe (1013909206@qq.com) + * ----- + * Last Modified: Thursday, 18th October 2018 5:23:25 pm + * Modified By: zhaohe (1013909206@qq.com>) + * ----- + * Copyright 2017 - 2018 zwsd, zwsd + */ +#pragma once +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "iflytopcpp/core/basic/concurrentqueue/blockingconcurrentqueue.h" +#include "iflytopcpp/core/thread/thread.hpp" + +namespace iflytop { +namespace core { +using namespace std; +class WorkQueue { + bool is_running_ = false; + unique_ptr thread_; + moodycamel::BlockingConcurrentQueue> queue_; + + public: + WorkQueue(string name); + void clear(); + size_t getSize(); + void enQueue(function func); + ~WorkQueue(); + + private: +}; +} // namespace core + +} // namespace iflytop diff --git a/module.cmake b/module.cmake index fbcf807..296e12a 100644 --- a/module.cmake +++ b/module.cmake @@ -16,6 +16,9 @@ set(DEP_SRC dep/iflytopcpp/core/thread/thread.cpp dep/iflytopcpp/core/zexception/zexception.cpp + dep/iflytopcpp/core/components/jobs/work_queue.cpp + dep/iflytopcpp/core/components/jobs/thread_pool_task_scheduler.cpp + dep/iflytopcpp/core/components/uart/uart.cpp dep/iflytopcpp/core/components/modbus/modbus.cpp dep/iflytopcpp/core/components/modbus/zmodbus_common.c