From f424993e14ddec73edd8a34a7027ee7cce034cb9 Mon Sep 17 00:00:00 2001 From: zhaohe Date: Thu, 4 Jan 2024 20:30:45 +0800 Subject: [PATCH] update --- mainwindow.cpp | 22 +++++------------- src/xsync_udp_factory_impl.cpp | 52 ++++++++++++++++++++++++++++++++---------- src/xsync_udp_factory_impl.hpp | 1 - src/zqthread.cpp | 7 +++++- src/zqthread.hpp | 4 ++++ 5 files changed, 56 insertions(+), 30 deletions(-) diff --git a/mainwindow.cpp b/mainwindow.cpp index 67a11d0..b5f846f 100644 --- a/mainwindow.cpp +++ b/mainwindow.cpp @@ -120,22 +120,12 @@ MainWindow::~MainWindow() { delete ui; } void MainWindow::mainWindowsRun() { // XSyncUdpFactoryImpl::Ins()->initialize(); - auto xsudp = XSyncUdpFactoryImpl::Ins()->createXSUDP(); + auto xsudp = XSyncUdpFactoryImpl::Ins()->createXSUDP(); XS_ASSERT(xsudp->initialize("0.0.0.0", 9999)); - - while (true) { - XS_ASSERT(xsudp->sendto(XsyncNetAdd("127.0.0.1", 9973), "hello\n", 5, NULL)); - ZQThread::sleep(1); - - XsyncNetAdd from; - char rxcache[1024] = {0}; - int rxlength = 1024; - auto ecode = xsudp->receive(rxcache, rxlength, from, 5000); - if (ecode == kxs_ec_success) { - ZLOGI(TAG, "receive from %s:%d %s(%d)", from.ip.c_str(), from.port, rxcache, rxlength); - } else { - ZLOGE(TAG, "receive fail,ecode:%d,%d", ecode, rxlength); - } - } + xsudp->startReceive([this, xsudp](XsyncNetAdd &from, uint8_t *data, size_t length) { + ZLOGI(TAG, "receive from <%s:%d> (%d) :%s", from.ip.c_str(), from.port, data, length); + xsudp->sendto(from, "hello\n", 5, NULL); + }); + } diff --git a/src/xsync_udp_factory_impl.cpp b/src/xsync_udp_factory_impl.cpp index 96c76d3..e40786d 100644 --- a/src/xsync_udp_factory_impl.cpp +++ b/src/xsync_udp_factory_impl.cpp @@ -1,21 +1,34 @@ #include "xsync_udp_factory_impl.hpp" +#include "zqthread.hpp" +// #include // #include #pragma comment(lib, "ws2_32") using namespace iflytop; + +/******************************************************************************* + * XSUDP * + *******************************************************************************/ class XSUDP : public I_XSUDP { uint32_t m_ip; int m_localport; - struct sockaddr_in localadd = {}; - int m_sock_fd = 0; + struct sockaddr_in localadd = {}; + int m_sock_fd = 0; + unique_ptr m_zq_thread; + onMessage_t m_onMessage; + + char* m_rxbuf = nullptr; + size_t m_rxbufsize = 0; public: virtual xs_error_code_t initialize(string ip, int localport) override; virtual xs_error_code_t sendto(const XsyncNetAdd& to, const char* data, int32_t length, int32_t* sendlength) override; virtual xs_error_code_t receive(char* data, int32_t& length, XsyncNetAdd& from, int overtimems) override; + virtual xs_error_code_t startReceive(onMessage_t onMessage) override; + ; }; const char* fmtip(uint32_t ip) { @@ -60,18 +73,15 @@ xs_error_code_t XSUDP::receive(char* data, int32_t& length, XsyncNetAdd& from, i return kxs_ec_setsockopt_rx_timeout_fail; } - // sockaddr.sin_family = AF_INET; - // sockaddr.sin_addr.s_addr = inet_addr(from.ip.c_str()); - // sockaddr.sin_port = htons(from.port); int senderAddressLen = sizeof(sockaddr); int ret = ::recvfrom(m_sock_fd, (char*)data, length, 0, (struct sockaddr*)&sockaddr, &senderAddressLen); length = ret; if (ret < 0) { - if (errno == EWOULDBLOCK || errno == EAGAIN) { - return kxs_ec_overtime; - } else { - return kxs_ec_receive_fail; - } + // if (errno == EWOULDBLOCK || errno == EAGAIN) { + return kxs_ec_overtime; + // } else { + // return kxs_ec_receive_fail; + // } } // inet_ntop(AF_INET, &(sockaddr.sin_addr), ip, INET_ADDRSTRLEN); @@ -82,6 +92,25 @@ xs_error_code_t XSUDP::receive(char* data, int32_t& length, XsyncNetAdd& from, i return kxs_ec_success; } +xs_error_code_t XSUDP::startReceive(onMessage_t onMessage) { + m_onMessage = onMessage; + m_rxbuf = (char*)malloc(10240); + m_rxbufsize = 10240; + + m_zq_thread.reset(new ZQThread("udplistener_thread", [this]() { + while (!m_zq_thread->isTryExit()) { + memset(m_rxbuf, 0, m_rxbufsize); + int32_t length = m_rxbufsize; + XsyncNetAdd from; + xs_error_code_t ret = receive(m_rxbuf, length, from, 1000); + if (ret == kxs_ec_success) { + if (m_onMessage) m_onMessage(from, (uint8_t*)m_rxbuf, length); + } + } + })); + m_zq_thread->start(); +} + /******************************************************************************* * xSyncUdpFactoryImpl * *******************************************************************************/ @@ -93,5 +122,4 @@ XSyncUdpFactoryImpl* XSyncUdpFactoryImpl::Ins() { return &instance; } -shared_ptr XSyncUdpFactoryImpl::createXSUDPListener() { return nullptr; } -shared_ptr XSyncUdpFactoryImpl::createXSUDP() { return make_shared(); } \ No newline at end of file +shared_ptr XSyncUdpFactoryImpl::createXSUDP() { return make_shared(); } \ No newline at end of file diff --git a/src/xsync_udp_factory_impl.hpp b/src/xsync_udp_factory_impl.hpp index 41a11c3..d075ff3 100644 --- a/src/xsync_udp_factory_impl.hpp +++ b/src/xsync_udp_factory_impl.hpp @@ -23,7 +23,6 @@ class XSyncUdpFactoryImpl : public I_XSyncUDPFactory { static XSyncUdpFactoryImpl* Ins(); void initialize(); - virtual shared_ptr createXSUDPListener() override; virtual shared_ptr createXSUDP() override; }; diff --git a/src/zqthread.cpp b/src/zqthread.cpp index ac072a5..5444615 100644 --- a/src/zqthread.cpp +++ b/src/zqthread.cpp @@ -9,4 +9,9 @@ ZQThread::ZQThread(const char* name, function cb) { void ZQThread::run() { if (cb_) cb_(); -} \ No newline at end of file +} + +void ZQThread::quit() { + tryexitflag_ = true; + QThread::quit(); +} diff --git a/src/zqthread.hpp b/src/zqthread.hpp index 05c3751..938d76e 100644 --- a/src/zqthread.hpp +++ b/src/zqthread.hpp @@ -22,9 +22,13 @@ class ZQThread : public QThread { private: QString name_; function cb_; + bool tryexitflag_ = false; public: ZQThread(const char* name, function cb); + void quit(); + + bool isTryExit() { return tryexitflag_; } protected: virtual void run() override;