Browse Source

前端消息通过工作队列进行处理,保证线性

disinfection_machine
zhaohe 2 years ago
parent
commit
808d14861a
  1. 22
      src/iflytop/components/iflytop_front_end_service/iflytop_front_end_service.cpp
  2. 13
      src/iflytop/components/iflytop_front_end_service/iflytop_front_end_service.hpp
  3. 20
      src/iflytop/core/components/jobs/work_queue.cpp
  4. 9
      src/iflytop/core/components/jobs/work_queue.hpp

22
src/iflytop/components/iflytop_front_end_service/iflytop_front_end_service.cpp

@ -13,17 +13,24 @@ void IflytopFrontEndService::initialize(string ip) {
logger->info("IflytopFrontEndService initialize {}:{}", ip, 19001);
logger->info("IflytopFrontEndService initialize {}:{}", ip, 19002);
m_work_queue.reset(new WorkQueue("IflytopFrontEndService-WQ"));
m_work_queue->startWork();
m_server.reset(new WebSocketServer(19001, ip));
m_server->setOnConnectionCallback([this](weak_ptr<WebSocket> webSocket, shared_ptr<ConnectionState> connectionState) {
logger->info("Remote ip: {}", connectionState->getRemoteIp());
auto ws = webSocket.lock();
if (!ws) return;
ws->setOnMessageCallback([this, webSocket, connectionState](const ix::WebSocketMessagePtr& msg) {
try {
onMessageCallback(webSocket, connectionState, msg);
} catch (const std::exception& e) {
logger->error("catch exception,onMessageCallback error: {}", e.what());
}
shared_ptr<ix::WebSocketMessage> messageSPtr = make_shared<ix::WebSocketMessage>(*msg);
m_work_queue->enQueue([this, webSocket, connectionState, messageSPtr]() {
try {
onMessageCallback(webSocket, connectionState, messageSPtr);
} catch (const std::exception& e) {
logger->error("catch exception,onMessageCallback error: {}", e.what());
}
});
});
});
@ -102,7 +109,8 @@ void IflytopFrontEndService::sendReport(json& report) {
}
}
void IflytopFrontEndService::onMessageCallback(weak_ptr<WebSocket> webSocket, shared_ptr<ConnectionState> connectionState, const ix::WebSocketMessagePtr& msg) {
void IflytopFrontEndService::onMessageCallback(weak_ptr<WebSocket> webSocket, shared_ptr<ConnectionState> connectionState,
shared_ptr<ix::WebSocketMessage> msg) {
if (msg->type == ix::WebSocketMessageType::Open) {
logger->info("New connection");
logger->info("id : {}", connectionState->getId());
@ -125,7 +133,7 @@ void IflytopFrontEndService::onMessageCallback(weak_ptr<WebSocket> webSocket, sh
onDisconnect(webSocket, msg);
}
}
void IflytopFrontEndService::processRxMessage(weak_ptr<WebSocket> webSocket, const ix::WebSocketMessagePtr& msg) {
void IflytopFrontEndService::processRxMessage(weak_ptr<WebSocket> webSocket, shared_ptr<ix::WebSocketMessage> msg) {
if (msg->binary) {
logger->warn("receive binary message,ignore");
return;

13
src/iflytop/components/iflytop_front_end_service/iflytop_front_end_service.hpp

@ -18,6 +18,7 @@
#include "iflytop/components/simple_udp/simple_udp.hpp"
#include "iflytop/core/basic/nod/nod.hpp"
#include "iflytop/core/spdlogfactory/logger.hpp"
#include "iflytoplinuxsdk/src/iflytop/core/components/jobs/work_queue.hpp"
/**
* @brief
*
@ -42,13 +43,15 @@ class IflytopFrontEndService : public enable_shared_from_this<IflytopFrontEndSer
public:
// nod::signal<void(weak_ptr<WebSocket> webSocket, const ix::WebSocketMessagePtr& msg)> onMessage;
nod::signal<void(weak_ptr<WebSocket> webSocket, const ix::WebSocketMessagePtr& msg)> onConnect;
nod::signal<void(weak_ptr<WebSocket> webSocket, const ix::WebSocketMessagePtr& msg)> onDisconnect;
nod::signal<void(weak_ptr<WebSocket> webSocket, shared_ptr<ix::WebSocketMessage> msg)> onConnect;
nod::signal<void(weak_ptr<WebSocket> webSocket, shared_ptr<ix::WebSocketMessage> msg)> onDisconnect;
nod::signal<void(weak_ptr<WebSocket> webSocket, json& cmd, json& receipt)> onMessage;
nod::signal<void(struct sockaddr_in* from, char* data, size_t len)> onUdpCmdMessage;
unique_ptr<WorkQueue> m_work_queue;
private:
//
// WebSocketServer的使用参考:http://192.168.1.3:3000/z3rd_lib/IXWebSocket/src/branch/master/docs/usage.md
@ -65,10 +68,10 @@ class IflytopFrontEndService : public enable_shared_from_this<IflytopFrontEndSer
// void sendMessage(const string& message);
void sendReport(json& report);
void sendToUDP(struct sockaddr_in* from,const char* data, size_t len) { m_udp->sendto(from, data, len); }
void sendToUDP(struct sockaddr_in* from, const char* data, size_t len) { m_udp->sendto(from, data, len); }
private:
void onMessageCallback(weak_ptr<WebSocket> webSocket, shared_ptr<ConnectionState> connectionState, const ix::WebSocketMessagePtr& msg);
void processRxMessage(weak_ptr<WebSocket> webSocket, const ix::WebSocketMessagePtr& msg);
void onMessageCallback(weak_ptr<WebSocket> webSocket, shared_ptr<ConnectionState> connectionState, shared_ptr<ix::WebSocketMessage> msg);
void processRxMessage(weak_ptr<WebSocket> webSocket, shared_ptr<ix::WebSocketMessage> msg);
};
} // namespace iflytop

20
src/iflytop/core/components/jobs/work_queue.cpp

@ -29,9 +29,15 @@ using namespace iflytop;
using namespace core;
using namespace std;
WorkQueue::WorkQueue(string name) {
WorkQueue::WorkQueue(string name) { name_ = name; };
void WorkQueue::clear() {
function<void(void)> call_;
while (queue_.try_dequeue(call_)) {
}
};
void WorkQueue::startWork() {
is_running_ = true;
thread_.reset(new Thread(name, [this]() {
thread_.reset(new Thread(name_, [this]() {
while (1) {
if (!is_running_) {
break;
@ -41,15 +47,9 @@ WorkQueue::WorkQueue(string name) {
if (call_) call_();
}
}));
};
void WorkQueue::clear() {
function<void(void)> call_;
while (queue_.try_dequeue(call_)) {
/*clear queue*/
}
};
}
size_t WorkQueue::getSize() { return queue_.size_approx(); }
void WorkQueue::enQueue(function<void(void)> func) { queue_.enqueue(func); }
void WorkQueue::enQueue(function<void(void)> func) { queue_.enqueue(func); }
WorkQueue::~WorkQueue() {
is_running_ = false;
queue_.enqueue(nullptr);

9
src/iflytop/core/components/jobs/work_queue.hpp

@ -28,15 +28,18 @@ namespace iflytop {
namespace core {
using namespace std;
class WorkQueue {
bool is_running_ = false;
string name_;
bool is_running_ = false;
unique_ptr<Thread> thread_;
moodycamel::BlockingConcurrentQueue<function<void(void)>> queue_;
public:
WorkQueue(string name);
void clear();
void startWork();
void clear();
size_t getSize();
void enQueue(function<void(void)> func);
void enQueue(function<void(void)> func);
~WorkQueue();
private:

Loading…
Cancel
Save