From b7ea6f2bbc0e01f81638c94216bb0c1a55e07ed5 Mon Sep 17 00:00:00 2001 From: zhaohe Date: Thu, 4 Apr 2024 10:57:50 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E6=94=B9udp=E5=AE=9E=E7=8E=B0?= =?UTF-8?q?=E7=B1=BB=E4=B8=BAC++=E6=A0=87=E5=87=86=E5=BA=93=E7=9A=84?= =?UTF-8?q?=E5=AE=9E=E7=8E=B0=E6=96=B9=E5=BC=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- bak/xsync_udp_factory_impl.cpp | 220 +++++++++++++++++++++++++++++++++++++++++ bak/xsync_udp_factory_impl.hpp | 29 ++++++ libxsync | 2 +- src/xsync_udp_factory_impl.cpp | 128 +++++++----------------- 4 files changed, 283 insertions(+), 96 deletions(-) create mode 100644 bak/xsync_udp_factory_impl.cpp create mode 100644 bak/xsync_udp_factory_impl.hpp diff --git a/bak/xsync_udp_factory_impl.cpp b/bak/xsync_udp_factory_impl.cpp new file mode 100644 index 0000000..2fb937c --- /dev/null +++ b/bak/xsync_udp_factory_impl.cpp @@ -0,0 +1,220 @@ +#include "xsync_udp_factory_impl.hpp" + +#include "zqthread.hpp" +// +#include +// +#include + +#define ENABLE_LOG +#ifdef ENABLE_LOG +#include "../src/logger.hpp" +#endif +#define TAG "XSYNC_UDP" +#pragma comment(lib, "ws2_32") +using namespace xsync; +using namespace iflytop; + +/******************************************************************************* + * XSUDP * + *******************************************************************************/ + +#define USB_NO_BLOCK_UDP 0 + +class XSUDP : public I_XSUDP { + uint32_t m_ip; + int m_localport; + + struct sockaddr_in localadd = {}; + int m_sock_fd = 0; + unique_ptr m_zq_thread; + onMessage_t m_onMessage; + + char* m_rxbuf = nullptr; + size_t m_rxbufsize = 0; + + public: + virtual xs_error_code_t initialize(string ip, int localport) override; + virtual xs_error_code_t sendto(const XsyncNetAdd& to, const char* data, int32_t length, int32_t* sendlength) override; + virtual xs_error_code_t receive(char* data, int32_t& length, XsyncNetAdd& from, int overtimems) override; + virtual xs_error_code_t startReceive(onMessage_t onMessage) override; + virtual xs_error_code_t stopReceive() override; + virtual xs_error_code_t clearRxBuffer() override; + virtual ~XSUDP(); +}; + +const char* fmtip(uint32_t ip) { + static char ipstr[16]; + sprintf(ipstr, "%d.%d.%d.%d", (ip >> 24) & 0xff, (ip >> 16) & 0xff, (ip >> 8) & 0xff, ip & 0xff); + return ipstr; +} + +xs_error_code_t XSUDP::initialize(string ip, int localport) { + localadd.sin_family = AF_INET; + localadd.sin_addr.s_addr = inet_addr(ip.c_str()); + localadd.sin_port = htons(localport); + + // 创建客户端用于通信的Socket + m_sock_fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); + if (m_sock_fd < 0) return kxs_ec_socket_fail; + + int ret = bind(m_sock_fd, (struct sockaddr*)&localadd, sizeof(localadd)); + if (ret < 0) return kxs_ec_bind_fail; + +#if USB_NO_BLOCK_UDP + u_long mode = 1; + if (ioctlsocket(m_sock_fd, FIONBIO, &mode) != NO_ERROR) { + return kxs_ec_setsockopt_rx_timeout_fail; + } +#endif + return kxs_ec_success; +} + +xs_error_code_t XSUDP::sendto(const XsyncNetAdd& to, const char* data, int32_t length, int32_t* sendlength) { + struct sockaddr_in sockaddr; + sockaddr.sin_family = AF_INET; + sockaddr.sin_addr.s_addr = inet_addr(to.ip.c_str()); + sockaddr.sin_port = htons(to.port); + int ret = ::sendto(m_sock_fd, data, length, 0, (struct sockaddr*)&sockaddr, sizeof(sockaddr)); + if (sendlength) *sendlength = ret; + if (ret >= 0) { + return kxs_ec_success; + } + return kxs_ec_send_fail; +} +xs_error_code_t XSUDP::receive(char* data, int32_t& length, XsyncNetAdd& from, int overtimems) { +#if USB_NO_BLOCK_UDP + struct sockaddr_in sockaddr = {0}; + int32_t overtime_10ms = overtimems / 10; + int ret = 0; + overtime_10ms += 1; + for (size_t i = 0;; i++) { + int senderAddressLen = sizeof(sockaddr); + ret = ::recvfrom(m_sock_fd, (char*)data, length, 0, (struct sockaddr*)&sockaddr, &senderAddressLen); + length = ret; + + if (ret > 0) break; + if (i >= overtime_10ms) break; + Sleep(10); + } + + if (ret <= 0) { + return kxs_ec_overtime; + } + + uint32_t ip = ntohl(sockaddr.sin_addr.s_addr); + from.ip = string(fmtip(ip)); + from.port = ntohs(sockaddr.sin_port); + + return kxs_ec_success; +#else + struct sockaddr_in sockaddr = {0}; + + timeval timeout; + timeout.tv_sec = overtimems; // 这个结构体的单位是ms,不是秒 + timeout.tv_usec = 0; + + if (timeout.tv_sec == 0) { + timeout.tv_sec = 1; + } + + if (setsockopt(m_sock_fd, SOL_SOCKET, SO_RCVTIMEO, (const char*)&timeout, sizeof(timeout)) == -1) { + return kxs_ec_setsockopt_rx_timeout_fail; + } + int senderAddressLen = sizeof(sockaddr); + int ret = ::recvfrom(m_sock_fd, (char*)data, length, 0, (struct sockaddr*)&sockaddr, &senderAddressLen); + length = ret; + data[length] = 0; + + if (ret < 0) { + // ZLOGI(TAG, "recvfrom error %d %s", ret, strerror(errno)); + // if (errno == EWOULDBLOCK || errno == EAGAIN) { + return kxs_ec_overtime; + // } else { + // return kxs_ec_receive_fail; + // } + } + + // inet_ntop(AF_INET, &(sockaddr.sin_addr), ip, INET_ADDRSTRLEN); + uint32_t ip = ntohl(sockaddr.sin_addr.s_addr); + from.ip = string(fmtip(ip)); + from.port = ntohs(sockaddr.sin_port); + + return kxs_ec_success; +#endif +} + +xs_error_code_t XSUDP::startReceive(onMessage_t onMessage) { + m_onMessage = onMessage; + + if (m_zq_thread) { + return kxs_ec_success; + } + m_rxbuf = (char*)malloc(10240); + m_rxbufsize = 10240; + + m_zq_thread.reset(new ZQThread("udplistener_thread", [this]() { + while (!m_zq_thread->isTryExit()) { + memset(m_rxbuf, 0, m_rxbufsize); + int32_t length = m_rxbufsize; + XsyncNetAdd from; + xs_error_code_t ret = receive(m_rxbuf, length, from, 1000); + if (ret == kxs_ec_success) { + if (m_onMessage) m_onMessage(from, (uint8_t*)m_rxbuf, length); + } + } + })); + m_zq_thread->start(); + return kxs_ec_success; +} + +xs_error_code_t XSUDP::stopReceive() { + if (m_zq_thread) { + m_zq_thread->quit(); + m_zq_thread->wait(); + m_zq_thread.reset(nullptr); + } + + if (!m_rxbuf) { + free(m_rxbuf); + m_rxbuf = nullptr; + m_rxbufsize = 0; + } +} + +xs_error_code_t XSUDP::clearRxBuffer() { +#if USB_NO_BLOCK_UDP + char buf[1024]; + int32_t length = 1024; + XsyncNetAdd from; + xs_error_code_t ret = kxs_ec_success; + + while (ret == kxs_ec_success) { + ret = receive(buf, length, from, 1); + } +#endif + + return kxs_ec_success; +} + +XSUDP::~XSUDP() { + stopReceive(); + + if (m_sock_fd > 0) { + closesocket(m_sock_fd); + m_sock_fd = -1; + } +} + +/******************************************************************************* + * xSyncUdpFactoryImpl * + *******************************************************************************/ + +void XSyncUdpFactoryImpl::initialize() {} + +XSyncUdpFactoryImpl* XSyncUdpFactoryImpl::Ins() { + static XSyncUdpFactoryImpl instance; + return &instance; +} + +shared_ptr XSyncUdpFactoryImpl::createXSUDP() { return make_shared(); } \ No newline at end of file diff --git a/bak/xsync_udp_factory_impl.hpp b/bak/xsync_udp_factory_impl.hpp new file mode 100644 index 0000000..42409f1 --- /dev/null +++ b/bak/xsync_udp_factory_impl.hpp @@ -0,0 +1,29 @@ +#pragma once +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "xsync_v2.hpp" +namespace xsync { +using namespace std; + +class XSyncUdpFactoryImpl : public I_XSUDPFactory { + private: + /* data */ + XSyncUdpFactoryImpl(){}; + + public: + static XSyncUdpFactoryImpl* Ins(); + + void initialize(); + virtual shared_ptr createXSUDP() override; +}; + +} // namespace xsync \ No newline at end of file diff --git a/libxsync b/libxsync index 7b7c6a3..5592dbb 160000 --- a/libxsync +++ b/libxsync @@ -1 +1 @@ -Subproject commit 7b7c6a3b5442f59c6dd14715eedd7275b00b0765 +Subproject commit 5592dbb769b0b9262e534615d169ec52ab79bc2c diff --git a/src/xsync_udp_factory_impl.cpp b/src/xsync_udp_factory_impl.cpp index 2fb937c..6b43330 100644 --- a/src/xsync_udp_factory_impl.cpp +++ b/src/xsync_udp_factory_impl.cpp @@ -1,37 +1,30 @@ #include "xsync_udp_factory_impl.hpp" -#include "zqthread.hpp" -// -#include -// #include +#include + +#include -#define ENABLE_LOG -#ifdef ENABLE_LOG -#include "../src/logger.hpp" -#endif -#define TAG "XSYNC_UDP" #pragma comment(lib, "ws2_32") using namespace xsync; -using namespace iflytop; - -/******************************************************************************* - * XSUDP * - *******************************************************************************/ - -#define USB_NO_BLOCK_UDP 0 +const char* fmtip(uint32_t ip) { + static char ipstr[16]; + sprintf(ipstr, "%d.%d.%d.%d", (ip >> 24) & 0xff, (ip >> 16) & 0xff, (ip >> 8) & 0xff, ip & 0xff); + return ipstr; +} class XSUDP : public I_XSUDP { uint32_t m_ip; int m_localport; - struct sockaddr_in localadd = {}; - int m_sock_fd = 0; - unique_ptr m_zq_thread; - onMessage_t m_onMessage; + struct sockaddr_in localadd = {}; + int m_sock_fd = 0; + unique_ptr m_thread; + onMessage_t m_onMessage; - char* m_rxbuf = nullptr; - size_t m_rxbufsize = 0; + char* m_rxbuf = nullptr; + size_t m_rxbufsize = 0; + bool m_tryStopFlag = false; public: virtual xs_error_code_t initialize(string ip, int localport) override; @@ -43,12 +36,6 @@ class XSUDP : public I_XSUDP { virtual ~XSUDP(); }; -const char* fmtip(uint32_t ip) { - static char ipstr[16]; - sprintf(ipstr, "%d.%d.%d.%d", (ip >> 24) & 0xff, (ip >> 16) & 0xff, (ip >> 8) & 0xff, ip & 0xff); - return ipstr; -} - xs_error_code_t XSUDP::initialize(string ip, int localport) { localadd.sin_family = AF_INET; localadd.sin_addr.s_addr = inet_addr(ip.c_str()); @@ -58,15 +45,8 @@ xs_error_code_t XSUDP::initialize(string ip, int localport) { m_sock_fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); if (m_sock_fd < 0) return kxs_ec_socket_fail; - int ret = bind(m_sock_fd, (struct sockaddr*)&localadd, sizeof(localadd)); + int ret = ::bind(m_sock_fd, (struct sockaddr*)&localadd, sizeof(localadd)); if (ret < 0) return kxs_ec_bind_fail; - -#if USB_NO_BLOCK_UDP - u_long mode = 1; - if (ioctlsocket(m_sock_fd, FIONBIO, &mode) != NO_ERROR) { - return kxs_ec_setsockopt_rx_timeout_fail; - } -#endif return kxs_ec_success; } @@ -83,31 +63,6 @@ xs_error_code_t XSUDP::sendto(const XsyncNetAdd& to, const char* data, int32_t l return kxs_ec_send_fail; } xs_error_code_t XSUDP::receive(char* data, int32_t& length, XsyncNetAdd& from, int overtimems) { -#if USB_NO_BLOCK_UDP - struct sockaddr_in sockaddr = {0}; - int32_t overtime_10ms = overtimems / 10; - int ret = 0; - overtime_10ms += 1; - for (size_t i = 0;; i++) { - int senderAddressLen = sizeof(sockaddr); - ret = ::recvfrom(m_sock_fd, (char*)data, length, 0, (struct sockaddr*)&sockaddr, &senderAddressLen); - length = ret; - - if (ret > 0) break; - if (i >= overtime_10ms) break; - Sleep(10); - } - - if (ret <= 0) { - return kxs_ec_overtime; - } - - uint32_t ip = ntohl(sockaddr.sin_addr.s_addr); - from.ip = string(fmtip(ip)); - from.port = ntohs(sockaddr.sin_port); - - return kxs_ec_success; -#else struct sockaddr_in sockaddr = {0}; timeval timeout; @@ -127,34 +82,28 @@ xs_error_code_t XSUDP::receive(char* data, int32_t& length, XsyncNetAdd& from, i data[length] = 0; if (ret < 0) { - // ZLOGI(TAG, "recvfrom error %d %s", ret, strerror(errno)); - // if (errno == EWOULDBLOCK || errno == EAGAIN) { return kxs_ec_overtime; - // } else { - // return kxs_ec_receive_fail; - // } } - // inet_ntop(AF_INET, &(sockaddr.sin_addr), ip, INET_ADDRSTRLEN); uint32_t ip = ntohl(sockaddr.sin_addr.s_addr); from.ip = string(fmtip(ip)); from.port = ntohs(sockaddr.sin_port); return kxs_ec_success; -#endif } xs_error_code_t XSUDP::startReceive(onMessage_t onMessage) { m_onMessage = onMessage; - if (m_zq_thread) { + if (m_thread) { return kxs_ec_success; } - m_rxbuf = (char*)malloc(10240); - m_rxbufsize = 10240; + m_rxbuf = (char*)malloc(10240); + m_rxbufsize = 10240; + m_tryStopFlag = false; - m_zq_thread.reset(new ZQThread("udplistener_thread", [this]() { - while (!m_zq_thread->isTryExit()) { + m_thread.reset(new thread([this]() { + while (!m_tryStopFlag) { memset(m_rxbuf, 0, m_rxbufsize); int32_t length = m_rxbufsize; XsyncNetAdd from; @@ -164,15 +113,14 @@ xs_error_code_t XSUDP::startReceive(onMessage_t onMessage) { } } })); - m_zq_thread->start(); return kxs_ec_success; } xs_error_code_t XSUDP::stopReceive() { - if (m_zq_thread) { - m_zq_thread->quit(); - m_zq_thread->wait(); - m_zq_thread.reset(nullptr); + if (m_thread) { + m_tryStopFlag = true; + m_thread->join(); + m_thread.reset(nullptr); } if (!m_rxbuf) { @@ -180,41 +128,31 @@ xs_error_code_t XSUDP::stopReceive() { m_rxbuf = nullptr; m_rxbufsize = 0; } + + return kxs_ec_success; } xs_error_code_t XSUDP::clearRxBuffer() { -#if USB_NO_BLOCK_UDP - char buf[1024]; - int32_t length = 1024; - XsyncNetAdd from; - xs_error_code_t ret = kxs_ec_success; - - while (ret == kxs_ec_success) { - ret = receive(buf, length, from, 1); - } -#endif + // xs_error_code_t ret = receive(m_rxbuf, length, from, 1000); + // char rxbuf[1024] = {0}; + // int32_t len = 0; + // xs_error_code_t ret; + // ret = receive(rxbuf, len, 2); return kxs_ec_success; } XSUDP::~XSUDP() { stopReceive(); - if (m_sock_fd > 0) { closesocket(m_sock_fd); m_sock_fd = -1; } } - -/******************************************************************************* - * xSyncUdpFactoryImpl * - *******************************************************************************/ - void XSyncUdpFactoryImpl::initialize() {} XSyncUdpFactoryImpl* XSyncUdpFactoryImpl::Ins() { static XSyncUdpFactoryImpl instance; return &instance; } - shared_ptr XSyncUdpFactoryImpl::createXSUDP() { return make_shared(); } \ No newline at end of file