From 808d14861ae0228613d85c2c7eb986daad0f51c5 Mon Sep 17 00:00:00 2001 From: zhaohe Date: Mon, 13 Nov 2023 15:08:25 +0800 Subject: [PATCH] =?UTF-8?q?=E5=89=8D=E7=AB=AF=E6=B6=88=E6=81=AF=E9=80=9A?= =?UTF-8?q?=E8=BF=87=E5=B7=A5=E4=BD=9C=E9=98=9F=E5=88=97=E8=BF=9B=E8=A1=8C?= =?UTF-8?q?=E5=A4=84=E7=90=86=EF=BC=8C=E4=BF=9D=E8=AF=81=E7=BA=BF=E6=80=A7?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../iflytop_front_end_service.cpp | 22 +++++++++++++++------- .../iflytop_front_end_service.hpp | 13 ++++++++----- src/iflytop/core/components/jobs/work_queue.cpp | 20 ++++++++++---------- src/iflytop/core/components/jobs/work_queue.hpp | 9 ++++++--- 4 files changed, 39 insertions(+), 25 deletions(-) diff --git a/src/iflytop/components/iflytop_front_end_service/iflytop_front_end_service.cpp b/src/iflytop/components/iflytop_front_end_service/iflytop_front_end_service.cpp index c6d1553..7e7a2b0 100644 --- a/src/iflytop/components/iflytop_front_end_service/iflytop_front_end_service.cpp +++ b/src/iflytop/components/iflytop_front_end_service/iflytop_front_end_service.cpp @@ -13,17 +13,24 @@ void IflytopFrontEndService::initialize(string ip) { logger->info("IflytopFrontEndService initialize {}:{}", ip, 19001); logger->info("IflytopFrontEndService initialize {}:{}", ip, 19002); + m_work_queue.reset(new WorkQueue("IflytopFrontEndService-WQ")); + m_work_queue->startWork(); + m_server.reset(new WebSocketServer(19001, ip)); m_server->setOnConnectionCallback([this](weak_ptr webSocket, shared_ptr connectionState) { logger->info("Remote ip: {}", connectionState->getRemoteIp()); auto ws = webSocket.lock(); if (!ws) return; ws->setOnMessageCallback([this, webSocket, connectionState](const ix::WebSocketMessagePtr& msg) { - try { - onMessageCallback(webSocket, connectionState, msg); - } catch (const std::exception& e) { - logger->error("catch exception,onMessageCallback error: {}", e.what()); - } + shared_ptr messageSPtr = make_shared(*msg); + + m_work_queue->enQueue([this, webSocket, connectionState, messageSPtr]() { + try { + onMessageCallback(webSocket, connectionState, messageSPtr); + } catch (const std::exception& e) { + logger->error("catch exception,onMessageCallback error: {}", e.what()); + } + }); }); }); @@ -102,7 +109,8 @@ void IflytopFrontEndService::sendReport(json& report) { } } -void IflytopFrontEndService::onMessageCallback(weak_ptr webSocket, shared_ptr connectionState, const ix::WebSocketMessagePtr& msg) { +void IflytopFrontEndService::onMessageCallback(weak_ptr webSocket, shared_ptr connectionState, + shared_ptr msg) { if (msg->type == ix::WebSocketMessageType::Open) { logger->info("New connection"); logger->info("id : {}", connectionState->getId()); @@ -125,7 +133,7 @@ void IflytopFrontEndService::onMessageCallback(weak_ptr webSocket, sh onDisconnect(webSocket, msg); } } -void IflytopFrontEndService::processRxMessage(weak_ptr webSocket, const ix::WebSocketMessagePtr& msg) { +void IflytopFrontEndService::processRxMessage(weak_ptr webSocket, shared_ptr msg) { if (msg->binary) { logger->warn("receive binary message,ignore"); return; diff --git a/src/iflytop/components/iflytop_front_end_service/iflytop_front_end_service.hpp b/src/iflytop/components/iflytop_front_end_service/iflytop_front_end_service.hpp index 2f0515e..62295d3 100644 --- a/src/iflytop/components/iflytop_front_end_service/iflytop_front_end_service.hpp +++ b/src/iflytop/components/iflytop_front_end_service/iflytop_front_end_service.hpp @@ -18,6 +18,7 @@ #include "iflytop/components/simple_udp/simple_udp.hpp" #include "iflytop/core/basic/nod/nod.hpp" #include "iflytop/core/spdlogfactory/logger.hpp" +#include "iflytoplinuxsdk/src/iflytop/core/components/jobs/work_queue.hpp" /** * @brief * @@ -42,13 +43,15 @@ class IflytopFrontEndService : public enable_shared_from_this webSocket, const ix::WebSocketMessagePtr& msg)> onMessage; - nod::signal webSocket, const ix::WebSocketMessagePtr& msg)> onConnect; - nod::signal webSocket, const ix::WebSocketMessagePtr& msg)> onDisconnect; + nod::signal webSocket, shared_ptr msg)> onConnect; + nod::signal webSocket, shared_ptr msg)> onDisconnect; nod::signal webSocket, json& cmd, json& receipt)> onMessage; nod::signal onUdpCmdMessage; + unique_ptr m_work_queue; + private: // // WebSocketServer的使用参考:http://192.168.1.3:3000/z3rd_lib/IXWebSocket/src/branch/master/docs/usage.md @@ -65,10 +68,10 @@ class IflytopFrontEndService : public enable_shared_from_thissendto(from, data, len); } + void sendToUDP(struct sockaddr_in* from, const char* data, size_t len) { m_udp->sendto(from, data, len); } private: - void onMessageCallback(weak_ptr webSocket, shared_ptr connectionState, const ix::WebSocketMessagePtr& msg); - void processRxMessage(weak_ptr webSocket, const ix::WebSocketMessagePtr& msg); + void onMessageCallback(weak_ptr webSocket, shared_ptr connectionState, shared_ptr msg); + void processRxMessage(weak_ptr webSocket, shared_ptr msg); }; } // namespace iflytop \ No newline at end of file diff --git a/src/iflytop/core/components/jobs/work_queue.cpp b/src/iflytop/core/components/jobs/work_queue.cpp index d3ba123..6d84c40 100644 --- a/src/iflytop/core/components/jobs/work_queue.cpp +++ b/src/iflytop/core/components/jobs/work_queue.cpp @@ -29,9 +29,15 @@ using namespace iflytop; using namespace core; using namespace std; -WorkQueue::WorkQueue(string name) { +WorkQueue::WorkQueue(string name) { name_ = name; }; +void WorkQueue::clear() { + function call_; + while (queue_.try_dequeue(call_)) { + } +}; +void WorkQueue::startWork() { is_running_ = true; - thread_.reset(new Thread(name, [this]() { + thread_.reset(new Thread(name_, [this]() { while (1) { if (!is_running_) { break; @@ -41,15 +47,9 @@ WorkQueue::WorkQueue(string name) { if (call_) call_(); } })); -}; -void WorkQueue::clear() { - function call_; - while (queue_.try_dequeue(call_)) { - /*clear queue*/ - } -}; +} size_t WorkQueue::getSize() { return queue_.size_approx(); } -void WorkQueue::enQueue(function func) { queue_.enqueue(func); } +void WorkQueue::enQueue(function func) { queue_.enqueue(func); } WorkQueue::~WorkQueue() { is_running_ = false; queue_.enqueue(nullptr); diff --git a/src/iflytop/core/components/jobs/work_queue.hpp b/src/iflytop/core/components/jobs/work_queue.hpp index ce7984c..9743cf8 100644 --- a/src/iflytop/core/components/jobs/work_queue.hpp +++ b/src/iflytop/core/components/jobs/work_queue.hpp @@ -28,15 +28,18 @@ namespace iflytop { namespace core { using namespace std; class WorkQueue { - bool is_running_ = false; + string name_; + bool is_running_ = false; unique_ptr thread_; + moodycamel::BlockingConcurrentQueue> queue_; public: WorkQueue(string name); - void clear(); + void startWork(); + void clear(); size_t getSize(); - void enQueue(function func); + void enQueue(function func); ~WorkQueue(); private: