#include "xsync_udp_factory_impl.hpp"

#include <cstdint>
#include <cstdio>
#include <cstring>
#include <memory>
#include <string>
#include <vector>

#include "zqthread.hpp"
// NOTE(review): the two commented-out includes below lost their header names before this
// file was reviewed; they are kept as found.
// #include
// #include
#define ENABLE_LOG
#ifdef ENABLE_LOG
#include "../src/logger.hpp"
#endif

#define TAG "XSYNC_UDP"

#pragma comment(lib, "ws2_32")
using namespace xsync;
using namespace iflytop;

/*******************************************************************************
 *                                    XSUDP                                    *
 *******************************************************************************/
// When set to 1 the socket is switched to non-blocking mode and receive() polls it
// every 10 ms; when 0 (current setting) receive() blocks with an SO_RCVTIMEO timeout.
#define USB_NO_BLOCK_UDP 0

/**
 * UDP endpoint built on Winsock.
 *
 * Supports synchronous send/receive and an optional background listener thread
 * (startReceive/stopReceive) that forwards every received datagram to a callback.
 */
class XSUDP : public I_XSUDP {
  // Payload capacity of the listener buffer; one extra byte is allocated on top of this
  // so that receive() always has room for its trailing NUL.
  static constexpr size_t kRxBufferSize = 10240;

  uint32_t m_ip = 0;         // local address in network byte order, as bound by initialize()
  int      m_localport = 0;  // local port in host byte order, as bound by initialize()

  struct sockaddr_in localadd = {};
  SOCKET             m_sock_fd = INVALID_SOCKET;

  std::unique_ptr<ZQThread> m_zq_thread;  // listener thread, null while not listening
  onMessage_t               m_onMessage;  // callback invoked from the listener thread
  std::vector<char>         m_rxbuf;      // listener receive buffer, empty while not listening

 public:
  virtual xs_error_code_t initialize(std::string ip, int localport) override;
  virtual xs_error_code_t sendto(const XsyncNetAdd& to, const char* data, int32_t length, int32_t* sendlength) override;
  virtual xs_error_code_t receive(char* data, int32_t& length, XsyncNetAdd& from, int overtimems) override;
  virtual xs_error_code_t startReceive(onMessage_t onMessage) override;
  virtual xs_error_code_t stopReceive() override;
  virtual xs_error_code_t clearRxBuffer() override;

  virtual ~XSUDP();

 private:
  /** Closes the socket if it is open and marks it invalid. Safe to call repeatedly. */
  void closeSocket();
};

/**
 * Formats a host-byte-order IPv4 address as dotted decimal ("a.b.c.d").
 *
 * The result lives in a per-thread buffer: it stays valid until the next call to
 * fmtip() on the same thread, so copy it if it must be kept. A per-thread buffer is
 * used because receive() runs both on the listener thread and on caller threads.
 */
const char* fmtip(uint32_t ip) {
  static thread_local char ipstr[16];  // "255.255.255.255" + NUL
  std::snprintf(ipstr, sizeof(ipstr), "%u.%u.%u.%u",          //
                static_cast<unsigned>((ip >> 24) & 0xff),      //
                static_cast<unsigned>((ip >> 16) & 0xff),      //
                static_cast<unsigned>((ip >> 8) & 0xff),       //
                static_cast<unsigned>(ip & 0xff));
  return ipstr;
}

void XSUDP::closeSocket() {
  if (m_sock_fd != INVALID_SOCKET) {
    closesocket(m_sock_fd);
    m_sock_fd = INVALID_SOCKET;
  }
}

/**
 * Creates the UDP socket and binds it to ip:localport.
 *
 * @param ip         local IPv4 address in dotted-decimal form
 * @param localport  local UDP port
 * @return kxs_ec_success, kxs_ec_socket_fail or kxs_ec_bind_fail. On failure no socket
 *         is left open.
 */
xs_error_code_t XSUDP::initialize(std::string ip, int localport) {
  // Release a socket left over from an earlier initialize() instead of leaking it.
  closeSocket();

  localadd.sin_family      = AF_INET;
  localadd.sin_addr.s_addr = inet_addr(ip.c_str());
  localadd.sin_port        = htons(localport);
  m_ip                     = localadd.sin_addr.s_addr;
  m_localport              = localport;

  // Create the socket used for communication.
  m_sock_fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
  if (m_sock_fd == INVALID_SOCKET) return kxs_ec_socket_fail;

  if (bind(m_sock_fd, reinterpret_cast<struct sockaddr*>(&localadd), sizeof(localadd)) == SOCKET_ERROR) {
    closeSocket();
    return kxs_ec_bind_fail;
  }

#if USB_NO_BLOCK_UDP
  u_long mode = 1;
  if (ioctlsocket(m_sock_fd, FIONBIO, &mode) != NO_ERROR) {
    closeSocket();
    return kxs_ec_setsockopt_rx_timeout_fail;
  }
#endif

  return kxs_ec_success;
}

/**
 * Sends one datagram to the given peer.
 *
 * @param to          destination address (dotted-decimal ip + port)
 * @param data        payload
 * @param length      payload size in bytes
 * @param sendlength  optional out-parameter; receives the raw ::sendto result
 *                    (bytes sent, or a negative value on failure)
 * @return kxs_ec_success, or kxs_ec_send_fail when ::sendto reports an error
 */
xs_error_code_t XSUDP::sendto(const XsyncNetAdd& to, const char* data, int32_t length, int32_t* sendlength) {
  struct sockaddr_in dest = {};
  dest.sin_family         = AF_INET;
  dest.sin_addr.s_addr    = inet_addr(to.ip.c_str());
  dest.sin_port           = htons(to.port);

  const int sent = ::sendto(m_sock_fd, data, length, 0, reinterpret_cast<struct sockaddr*>(&dest), sizeof(dest));
  if (sendlength) *sendlength = sent;

  return sent >= 0 ? kxs_ec_success : kxs_ec_send_fail;
}

/**
 * Receives one datagram, waiting at most overtimems milliseconds.
 *
 * @param data        destination buffer
 * @param length      in: capacity of data in bytes; out: raw ::recvfrom result
 *                    (bytes received, or a negative value on failure)
 * @param from        out: sender address, filled in only on success
 * @param overtimems  timeout in milliseconds (0 is treated as 1 in blocking mode)
 * @return kxs_ec_success, kxs_ec_setsockopt_rx_timeout_fail, or kxs_ec_overtime. Every
 *         ::recvfrom failure is reported as kxs_ec_overtime, not only timeouts.
 *
 * In blocking mode the payload is NUL-terminated whenever the buffer has a spare byte
 * for it; pass a capacity one smaller than the real buffer to guarantee termination.
 */
xs_error_code_t XSUDP::receive(char* data, int32_t& length, XsyncNetAdd& from, int overtimems) {
  // length is overwritten with the result, so remember the caller's capacity first.
  const int32_t      capacity = length;
  struct sockaddr_in sender   = {};

#if USB_NO_BLOCK_UDP
  const int32_t max_polls = overtimems / 10 + 1;
  int           ret       = 0;
  for (int32_t i = 0;; i++) {
    int senderAddressLen = sizeof(sender);
    ret = ::recvfrom(m_sock_fd, data, capacity, 0, reinterpret_cast<struct sockaddr*>(&sender), &senderAddressLen);
    if (ret > 0) break;
    if (i >= max_polls) break;
    Sleep(10);
  }
  length = ret;
  if (ret <= 0) {
    return kxs_ec_overtime;
  }
#else
  // Winsock takes SO_RCVTIMEO as a DWORD in milliseconds (not a timeval). A value of 0
  // would mean "block forever", so it is bumped to the shortest real timeout.
  DWORD timeout_ms = static_cast<DWORD>(overtimems);
  if (timeout_ms == 0) {
    timeout_ms = 1;
  }
  if (setsockopt(m_sock_fd, SOL_SOCKET, SO_RCVTIMEO, reinterpret_cast<const char*>(&timeout_ms), sizeof(timeout_ms)) == SOCKET_ERROR) {
    return kxs_ec_setsockopt_rx_timeout_fail;
  }

  int       senderAddressLen = sizeof(sender);
  const int ret = ::recvfrom(m_sock_fd, data, capacity, 0, reinterpret_cast<struct sockaddr*>(&sender), &senderAddressLen);
  length        = ret;
  if (ret < 0) {
    // ZLOGI(TAG, "recvfrom error %d %s", ret, strerror(errno));
    return kxs_ec_overtime;
  }
  // Terminate only when it cannot write outside the caller's buffer.
  if (ret < capacity) {
    data[ret] = 0;
  }
#endif

  from.ip   = std::string(fmtip(ntohl(sender.sin_addr.s_addr)));
  from.port = ntohs(sender.sin_port);
  return kxs_ec_success;
}

/**
 * Installs the message callback and starts the background listener thread.
 *
 * If the listener is already running only the callback is replaced. The callback is
 * invoked on the listener thread; the data pointer it receives is only valid for the
 * duration of the call.
 *
 * @return kxs_ec_success
 */
xs_error_code_t XSUDP::startReceive(onMessage_t onMessage) {
  m_onMessage = onMessage;
  if (m_zq_thread) {
    return kxs_ec_success;
  }

  // One byte more than the advertised capacity keeps receive()'s NUL inside the buffer.
  m_rxbuf.assign(kRxBufferSize + 1, 0);

  m_zq_thread.reset(new ZQThread("udplistener_thread", [this]() {
    while (!m_zq_thread->isTryExit()) {
      std::memset(m_rxbuf.data(), 0, m_rxbuf.size());

      int32_t     length = static_cast<int32_t>(kRxBufferSize);
      XsyncNetAdd from;
      // The 1000 ms timeout bounds how long stopReceive() waits for the loop to notice quit().
      if (receive(m_rxbuf.data(), length, from, 1000) == kxs_ec_success) {
        if (m_onMessage) m_onMessage(from, reinterpret_cast<uint8_t*>(m_rxbuf.data()), length);
      }
    }
  }));
  m_zq_thread->start();
  return kxs_ec_success;
}

/**
 * Stops the listener thread (if any), waits for it to finish and releases the
 * receive buffer. Safe to call when the listener is not running.
 *
 * @return kxs_ec_success
 */
xs_error_code_t XSUDP::stopReceive() {
  if (m_zq_thread) {
    m_zq_thread->quit();
    m_zq_thread->wait();
    m_zq_thread.reset(nullptr);
  }

  // The thread is gone, so nothing references the buffer any more; give the memory back.
  std::vector<char>().swap(m_rxbuf);
  return kxs_ec_success;
}

/**
 * Discards datagrams already queued on the socket.
 *
 * Only does work in non-blocking mode (USB_NO_BLOCK_UDP == 1); otherwise it is a no-op.
 *
 * @return kxs_ec_success
 */
xs_error_code_t XSUDP::clearRxBuffer() {
#if USB_NO_BLOCK_UDP
  char            buf[1024];
  XsyncNetAdd     from;
  xs_error_code_t ret = kxs_ec_success;
  while (ret == kxs_ec_success) {
    // receive() overwrites length with the byte count, so restore the capacity each time.
    int32_t length = static_cast<int32_t>(sizeof(buf));
    ret            = receive(buf, length, from, 1);
  }
#endif
  return kxs_ec_success;
}

/** Stops the listener before closing the socket so the thread never sees a dead handle. */
XSUDP::~XSUDP() {
  stopReceive();
  closeSocket();
}

/*******************************************************************************
 *                             xSyncUdpFactoryImpl                             *
 *******************************************************************************/
/** Nothing to set up at present. */
void XSyncUdpFactoryImpl::initialize() {}

/** Returns the process-wide factory instance. */
XSyncUdpFactoryImpl* XSyncUdpFactoryImpl::Ins() {
  static XSyncUdpFactoryImpl instance;
  return &instance;
}

/**
 * Creates a new, uninitialized XSUDP endpoint; call initialize() on it before use.
 * NOTE(review): the return type is assumed to be shared_ptr<I_XSUDP> — confirm against
 * the declaration in xsync_udp_factory_impl.hpp.
 */
std::shared_ptr<I_XSUDP> XSyncUdpFactoryImpl::createXSUDP() { return std::make_shared<XSUDP>(); }