You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

92 lines
2.4 KiB

  1. //
  2. // Created by zhaohe on 19-5-21.
  3. //
  4. #include "thread.hpp"
  5. #include <atomic>
  6. #include <mutex>
  7. #include <thread>
  8. #include "iflytopcpp/core/basic/autodo.hpp"
  9. #include "iflytopcpp/core/basic/concurrentqueue/blockingconcurrentqueue.h"
  10. #include "iflytopcpp/core/basic/signal/signal.hpp"
  11. #include "iflytopcpp/core/zexception/zexception.hpp"
  12. #include "unistd.h"
  13. using namespace std;
  14. using namespace iflytop;
  15. using namespace core;
  16. using namespace moodycamel;
  17. static std::mutex m_threadinfolock;
  18. static map<pthread_t, Thread *> m_thread_map;
  19. Thread::Thread(std::function<void()> run) { Thread("notSet", run); }
  20. Thread::Thread(string name, function<void()> run) {
  21. this->run = run;
  22. this->name = name;
  23. if (this->run == nullptr) {
  24. throw zexception("Create Thread run == nullptr");
  25. }
  26. bool threadInitialized = false;
  27. workThread.reset(new std::thread([&]() {
  28. this->id = pthread_self();
  29. {
  30. std::lock_guard<std::mutex> lock(m_threadinfolock);
  31. m_thread_map[this->id] = this;
  32. }
  33. threadInitialized = true;
  34. Autodo autodo([&]() { this->waitingForJoin = true; });
  35. try {
  36. this->run();
  37. } catch (const std::exception &exception) {
  38. logger->error("thread[{}] catch exception,{}", this->name, exception.what());
  39. throw exception;
  40. } catch (...) {
  41. logger->error("thread[{}] catch unkown,exception", this->name);
  42. throw;
  43. }
  44. }));
  45. while (!threadInitialized) {
  46. usleep(1);
  47. }
  48. }
  49. void Thread::wake() { signal.notify(); }
  50. void Thread::join() {
  51. std::lock_guard<std::mutex> lock(lock_);
  52. if (!hasJointd) {
  53. exitFlag = true;
  54. signal.notify();
  55. workThread->join();
  56. hasJointd = true;
  57. {
  58. std::lock_guard<std::mutex> lock(m_threadinfolock);
  59. m_thread_map.erase(id);
  60. }
  61. }
  62. };
  63. Thread::~Thread() {
  64. if (!hasJointd) {
  65. logger->error(fmt::format("thread {} unjoin ", name));
  66. exit(-1);
  67. }
  68. }
  69. pthread_t Thread::getId() const { return id; }
  70. ThisThread::ThisThread() {
  71. {
  72. std::lock_guard<std::mutex> lock(m_threadinfolock);
  73. if (m_thread_map.find(getId()) == m_thread_map.end()) {
  74. throw zexception(fmt::format("thread {} not reg thread info", getId()));
  75. }
  76. m_thread = m_thread_map[getId()];
  77. }
  78. }
  79. bool ThisThread::getExitFlag() { return m_thread->getExitFlag(); }
  80. void ThisThread::sleep() { m_thread->sleep(); }
  81. void ThisThread::sleepForMs(int64_t ms) { m_thread->sleepForMs(ms); }
  82. void ThisThread::wake() { m_thread->wake(); }