// // Created by zhaohe on 19-5-21. // #include "thread.hpp" #include #include #include #include "iflytopcpp/core/basic/autodo.hpp" #include "iflytopcpp/core/basic/concurrentqueue/blockingconcurrentqueue.h" #include "iflytopcpp/core/basic/signal/signal.hpp" #include "iflytopcpp/core/zexception/zexception.hpp" #include "unistd.h" using namespace std; using namespace iflytop; using namespace core; using namespace moodycamel; static std::mutex m_threadinfolock; static map m_thread_map; Thread::Thread(std::function run) { Thread("notSet", run); } Thread::Thread(string name, function run) { this->run = run; this->name = name; if (this->run == nullptr) { throw zexception("Create Thread run == nullptr"); } bool threadInitialized = false; workThread.reset(new std::thread([&]() { this->id = pthread_self(); { std::lock_guard lock(m_threadinfolock); m_thread_map[this->id] = this; } threadInitialized = true; Autodo autodo([&]() { this->waitingForJoin = true; }); try { this->run(); } catch (const std::exception &exception) { logger->error("thread[{}] catch exception,{}", this->name, exception.what()); throw exception; } catch (...) { logger->error("thread[{}] catch unkown,exception", this->name); throw; } })); while (!threadInitialized) { usleep(1); } } void Thread::wake() { signal.notify(); } void Thread::join() { std::lock_guard lock(lock_); if (!hasJointd) { exitFlag = true; signal.notify(); workThread->join(); hasJointd = true; { std::lock_guard lock(m_threadinfolock); m_thread_map.erase(id); } } }; Thread::~Thread() { if (!hasJointd) { logger->error(fmt::format("thread {} unjoin ", name)); exit(-1); } } pthread_t Thread::getId() const { return id; } ThisThread::ThisThread() { { std::lock_guard lock(m_threadinfolock); if (m_thread_map.find(getId()) == m_thread_map.end()) { throw zexception(fmt::format("thread {} not reg thread info", getId())); } m_thread = m_thread_map[getId()]; } } bool ThisThread::getExitFlag() { return m_thread->getExitFlag(); } void ThisThread::sleep() { m_thread->sleep(); } void ThisThread::sleepForMs(int64_t ms) { m_thread->sleepForMs(ms); } void ThisThread::wake() { m_thread->wake(); }