Browse Source

修改udp实现类为C++标准库的实现方式

master
zhaohe 1 year ago
parent
commit
b7ea6f2bbc
  1. 220
      bak/xsync_udp_factory_impl.cpp
  2. 29
      bak/xsync_udp_factory_impl.hpp
  3. 2
      libxsync
  4. 128
      src/xsync_udp_factory_impl.cpp

220
bak/xsync_udp_factory_impl.cpp

@ -0,0 +1,220 @@
#include "xsync_udp_factory_impl.hpp"
#include "zqthread.hpp"
//
#include <winsock2.h>
//
#include <Windows.h>
#define ENABLE_LOG
#ifdef ENABLE_LOG
#include "../src/logger.hpp"
#endif
#define TAG "XSYNC_UDP"
#pragma comment(lib, "ws2_32")
using namespace xsync;
using namespace iflytop;
/*******************************************************************************
* XSUDP *
*******************************************************************************/
#define USB_NO_BLOCK_UDP 0
class XSUDP : public I_XSUDP {
uint32_t m_ip;
int m_localport;
struct sockaddr_in localadd = {};
int m_sock_fd = 0;
unique_ptr<ZQThread> m_zq_thread;
onMessage_t m_onMessage;
char* m_rxbuf = nullptr;
size_t m_rxbufsize = 0;
public:
virtual xs_error_code_t initialize(string ip, int localport) override;
virtual xs_error_code_t sendto(const XsyncNetAdd& to, const char* data, int32_t length, int32_t* sendlength) override;
virtual xs_error_code_t receive(char* data, int32_t& length, XsyncNetAdd& from, int overtimems) override;
virtual xs_error_code_t startReceive(onMessage_t onMessage) override;
virtual xs_error_code_t stopReceive() override;
virtual xs_error_code_t clearRxBuffer() override;
virtual ~XSUDP();
};
const char* fmtip(uint32_t ip) {
static char ipstr[16];
sprintf(ipstr, "%d.%d.%d.%d", (ip >> 24) & 0xff, (ip >> 16) & 0xff, (ip >> 8) & 0xff, ip & 0xff);
return ipstr;
}
xs_error_code_t XSUDP::initialize(string ip, int localport) {
localadd.sin_family = AF_INET;
localadd.sin_addr.s_addr = inet_addr(ip.c_str());
localadd.sin_port = htons(localport);
// 创建客户端用于通信的Socket
m_sock_fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
if (m_sock_fd < 0) return kxs_ec_socket_fail;
int ret = bind(m_sock_fd, (struct sockaddr*)&localadd, sizeof(localadd));
if (ret < 0) return kxs_ec_bind_fail;
#if USB_NO_BLOCK_UDP
u_long mode = 1;
if (ioctlsocket(m_sock_fd, FIONBIO, &mode) != NO_ERROR) {
return kxs_ec_setsockopt_rx_timeout_fail;
}
#endif
return kxs_ec_success;
}
xs_error_code_t XSUDP::sendto(const XsyncNetAdd& to, const char* data, int32_t length, int32_t* sendlength) {
struct sockaddr_in sockaddr;
sockaddr.sin_family = AF_INET;
sockaddr.sin_addr.s_addr = inet_addr(to.ip.c_str());
sockaddr.sin_port = htons(to.port);
int ret = ::sendto(m_sock_fd, data, length, 0, (struct sockaddr*)&sockaddr, sizeof(sockaddr));
if (sendlength) *sendlength = ret;
if (ret >= 0) {
return kxs_ec_success;
}
return kxs_ec_send_fail;
}
xs_error_code_t XSUDP::receive(char* data, int32_t& length, XsyncNetAdd& from, int overtimems) {
#if USB_NO_BLOCK_UDP
struct sockaddr_in sockaddr = {0};
int32_t overtime_10ms = overtimems / 10;
int ret = 0;
overtime_10ms += 1;
for (size_t i = 0;; i++) {
int senderAddressLen = sizeof(sockaddr);
ret = ::recvfrom(m_sock_fd, (char*)data, length, 0, (struct sockaddr*)&sockaddr, &senderAddressLen);
length = ret;
if (ret > 0) break;
if (i >= overtime_10ms) break;
Sleep(10);
}
if (ret <= 0) {
return kxs_ec_overtime;
}
uint32_t ip = ntohl(sockaddr.sin_addr.s_addr);
from.ip = string(fmtip(ip));
from.port = ntohs(sockaddr.sin_port);
return kxs_ec_success;
#else
struct sockaddr_in sockaddr = {0};
timeval timeout;
timeout.tv_sec = overtimems; // 这个结构体的单位是ms,不是秒
timeout.tv_usec = 0;
if (timeout.tv_sec == 0) {
timeout.tv_sec = 1;
}
if (setsockopt(m_sock_fd, SOL_SOCKET, SO_RCVTIMEO, (const char*)&timeout, sizeof(timeout)) == -1) {
return kxs_ec_setsockopt_rx_timeout_fail;
}
int senderAddressLen = sizeof(sockaddr);
int ret = ::recvfrom(m_sock_fd, (char*)data, length, 0, (struct sockaddr*)&sockaddr, &senderAddressLen);
length = ret;
data[length] = 0;
if (ret < 0) {
// ZLOGI(TAG, "recvfrom error %d %s", ret, strerror(errno));
// if (errno == EWOULDBLOCK || errno == EAGAIN) {
return kxs_ec_overtime;
// } else {
// return kxs_ec_receive_fail;
// }
}
// inet_ntop(AF_INET, &(sockaddr.sin_addr), ip, INET_ADDRSTRLEN);
uint32_t ip = ntohl(sockaddr.sin_addr.s_addr);
from.ip = string(fmtip(ip));
from.port = ntohs(sockaddr.sin_port);
return kxs_ec_success;
#endif
}
xs_error_code_t XSUDP::startReceive(onMessage_t onMessage) {
m_onMessage = onMessage;
if (m_zq_thread) {
return kxs_ec_success;
}
m_rxbuf = (char*)malloc(10240);
m_rxbufsize = 10240;
m_zq_thread.reset(new ZQThread("udplistener_thread", [this]() {
while (!m_zq_thread->isTryExit()) {
memset(m_rxbuf, 0, m_rxbufsize);
int32_t length = m_rxbufsize;
XsyncNetAdd from;
xs_error_code_t ret = receive(m_rxbuf, length, from, 1000);
if (ret == kxs_ec_success) {
if (m_onMessage) m_onMessage(from, (uint8_t*)m_rxbuf, length);
}
}
}));
m_zq_thread->start();
return kxs_ec_success;
}
xs_error_code_t XSUDP::stopReceive() {
if (m_zq_thread) {
m_zq_thread->quit();
m_zq_thread->wait();
m_zq_thread.reset(nullptr);
}
if (!m_rxbuf) {
free(m_rxbuf);
m_rxbuf = nullptr;
m_rxbufsize = 0;
}
}
xs_error_code_t XSUDP::clearRxBuffer() {
#if USB_NO_BLOCK_UDP
char buf[1024];
int32_t length = 1024;
XsyncNetAdd from;
xs_error_code_t ret = kxs_ec_success;
while (ret == kxs_ec_success) {
ret = receive(buf, length, from, 1);
}
#endif
return kxs_ec_success;
}
XSUDP::~XSUDP() {
stopReceive();
if (m_sock_fd > 0) {
closesocket(m_sock_fd);
m_sock_fd = -1;
}
}
/*******************************************************************************
* xSyncUdpFactoryImpl *
*******************************************************************************/
void XSyncUdpFactoryImpl::initialize() {}
XSyncUdpFactoryImpl* XSyncUdpFactoryImpl::Ins() {
static XSyncUdpFactoryImpl instance;
return &instance;
}
shared_ptr<I_XSUDP> XSyncUdpFactoryImpl::createXSUDP() { return make_shared<XSUDP>(); }

29
bak/xsync_udp_factory_impl.hpp

@ -0,0 +1,29 @@
#pragma once
#include <fstream>
#include <functional>
#include <iostream>
#include <list>
#include <map>
#include <memory>
#include <set>
#include <sstream>
#include <string>
#include <vector>
#include "xsync_v2.hpp"
namespace xsync {
using namespace std;
class XSyncUdpFactoryImpl : public I_XSUDPFactory {
private:
/* data */
XSyncUdpFactoryImpl(){};
public:
static XSyncUdpFactoryImpl* Ins();
void initialize();
virtual shared_ptr<I_XSUDP> createXSUDP() override;
};
} // namespace xsync

2
libxsync

@ -1 +1 @@
Subproject commit 7b7c6a3b5442f59c6dd14715eedd7275b00b0765
Subproject commit 5592dbb769b0b9262e534615d169ec52ab79bc2c

128
src/xsync_udp_factory_impl.cpp

@ -1,37 +1,30 @@
#include "xsync_udp_factory_impl.hpp" #include "xsync_udp_factory_impl.hpp"
#include "zqthread.hpp"
//
#include <winsock2.h>
//
#include <Windows.h> #include <Windows.h>
#include <winsock2.h>
#include <thread>
#define ENABLE_LOG
#ifdef ENABLE_LOG
#include "../src/logger.hpp"
#endif
#define TAG "XSYNC_UDP"
#pragma comment(lib, "ws2_32") #pragma comment(lib, "ws2_32")
using namespace xsync; using namespace xsync;
using namespace iflytop;
/*******************************************************************************
* XSUDP *
*******************************************************************************/
#define USB_NO_BLOCK_UDP 0
const char* fmtip(uint32_t ip) {
static char ipstr[16];
sprintf(ipstr, "%d.%d.%d.%d", (ip >> 24) & 0xff, (ip >> 16) & 0xff, (ip >> 8) & 0xff, ip & 0xff);
return ipstr;
}
class XSUDP : public I_XSUDP { class XSUDP : public I_XSUDP {
uint32_t m_ip; uint32_t m_ip;
int m_localport; int m_localport;
struct sockaddr_in localadd = {};
int m_sock_fd = 0;
unique_ptr<ZQThread> m_zq_thread;
onMessage_t m_onMessage;
struct sockaddr_in localadd = {};
int m_sock_fd = 0;
unique_ptr<thread> m_thread;
onMessage_t m_onMessage;
char* m_rxbuf = nullptr;
size_t m_rxbufsize = 0;
char* m_rxbuf = nullptr;
size_t m_rxbufsize = 0;
bool m_tryStopFlag = false;
public: public:
virtual xs_error_code_t initialize(string ip, int localport) override; virtual xs_error_code_t initialize(string ip, int localport) override;
@ -43,12 +36,6 @@ class XSUDP : public I_XSUDP {
virtual ~XSUDP(); virtual ~XSUDP();
}; };
const char* fmtip(uint32_t ip) {
static char ipstr[16];
sprintf(ipstr, "%d.%d.%d.%d", (ip >> 24) & 0xff, (ip >> 16) & 0xff, (ip >> 8) & 0xff, ip & 0xff);
return ipstr;
}
xs_error_code_t XSUDP::initialize(string ip, int localport) { xs_error_code_t XSUDP::initialize(string ip, int localport) {
localadd.sin_family = AF_INET; localadd.sin_family = AF_INET;
localadd.sin_addr.s_addr = inet_addr(ip.c_str()); localadd.sin_addr.s_addr = inet_addr(ip.c_str());
@ -58,15 +45,8 @@ xs_error_code_t XSUDP::initialize(string ip, int localport) {
m_sock_fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); m_sock_fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
if (m_sock_fd < 0) return kxs_ec_socket_fail; if (m_sock_fd < 0) return kxs_ec_socket_fail;
int ret = bind(m_sock_fd, (struct sockaddr*)&localadd, sizeof(localadd));
int ret = ::bind(m_sock_fd, (struct sockaddr*)&localadd, sizeof(localadd));
if (ret < 0) return kxs_ec_bind_fail; if (ret < 0) return kxs_ec_bind_fail;
#if USB_NO_BLOCK_UDP
u_long mode = 1;
if (ioctlsocket(m_sock_fd, FIONBIO, &mode) != NO_ERROR) {
return kxs_ec_setsockopt_rx_timeout_fail;
}
#endif
return kxs_ec_success; return kxs_ec_success;
} }
@ -83,31 +63,6 @@ xs_error_code_t XSUDP::sendto(const XsyncNetAdd& to, const char* data, int32_t l
return kxs_ec_send_fail; return kxs_ec_send_fail;
} }
xs_error_code_t XSUDP::receive(char* data, int32_t& length, XsyncNetAdd& from, int overtimems) { xs_error_code_t XSUDP::receive(char* data, int32_t& length, XsyncNetAdd& from, int overtimems) {
#if USB_NO_BLOCK_UDP
struct sockaddr_in sockaddr = {0};
int32_t overtime_10ms = overtimems / 10;
int ret = 0;
overtime_10ms += 1;
for (size_t i = 0;; i++) {
int senderAddressLen = sizeof(sockaddr);
ret = ::recvfrom(m_sock_fd, (char*)data, length, 0, (struct sockaddr*)&sockaddr, &senderAddressLen);
length = ret;
if (ret > 0) break;
if (i >= overtime_10ms) break;
Sleep(10);
}
if (ret <= 0) {
return kxs_ec_overtime;
}
uint32_t ip = ntohl(sockaddr.sin_addr.s_addr);
from.ip = string(fmtip(ip));
from.port = ntohs(sockaddr.sin_port);
return kxs_ec_success;
#else
struct sockaddr_in sockaddr = {0}; struct sockaddr_in sockaddr = {0};
timeval timeout; timeval timeout;
@ -127,34 +82,28 @@ xs_error_code_t XSUDP::receive(char* data, int32_t& length, XsyncNetAdd& from, i
data[length] = 0; data[length] = 0;
if (ret < 0) { if (ret < 0) {
// ZLOGI(TAG, "recvfrom error %d %s", ret, strerror(errno));
// if (errno == EWOULDBLOCK || errno == EAGAIN) {
return kxs_ec_overtime; return kxs_ec_overtime;
// } else {
// return kxs_ec_receive_fail;
// }
} }
// inet_ntop(AF_INET, &(sockaddr.sin_addr), ip, INET_ADDRSTRLEN);
uint32_t ip = ntohl(sockaddr.sin_addr.s_addr); uint32_t ip = ntohl(sockaddr.sin_addr.s_addr);
from.ip = string(fmtip(ip)); from.ip = string(fmtip(ip));
from.port = ntohs(sockaddr.sin_port); from.port = ntohs(sockaddr.sin_port);
return kxs_ec_success; return kxs_ec_success;
#endif
} }
xs_error_code_t XSUDP::startReceive(onMessage_t onMessage) { xs_error_code_t XSUDP::startReceive(onMessage_t onMessage) {
m_onMessage = onMessage; m_onMessage = onMessage;
if (m_zq_thread) {
if (m_thread) {
return kxs_ec_success; return kxs_ec_success;
} }
m_rxbuf = (char*)malloc(10240);
m_rxbufsize = 10240;
m_rxbuf = (char*)malloc(10240);
m_rxbufsize = 10240;
m_tryStopFlag = false;
m_zq_thread.reset(new ZQThread("udplistener_thread", [this]() {
while (!m_zq_thread->isTryExit()) {
m_thread.reset(new thread([this]() {
while (!m_tryStopFlag) {
memset(m_rxbuf, 0, m_rxbufsize); memset(m_rxbuf, 0, m_rxbufsize);
int32_t length = m_rxbufsize; int32_t length = m_rxbufsize;
XsyncNetAdd from; XsyncNetAdd from;
@ -164,15 +113,14 @@ xs_error_code_t XSUDP::startReceive(onMessage_t onMessage) {
} }
} }
})); }));
m_zq_thread->start();
return kxs_ec_success; return kxs_ec_success;
} }
xs_error_code_t XSUDP::stopReceive() { xs_error_code_t XSUDP::stopReceive() {
if (m_zq_thread) {
m_zq_thread->quit();
m_zq_thread->wait();
m_zq_thread.reset(nullptr);
if (m_thread) {
m_tryStopFlag = true;
m_thread->join();
m_thread.reset(nullptr);
} }
if (!m_rxbuf) { if (!m_rxbuf) {
@ -180,41 +128,31 @@ xs_error_code_t XSUDP::stopReceive() {
m_rxbuf = nullptr; m_rxbuf = nullptr;
m_rxbufsize = 0; m_rxbufsize = 0;
} }
return kxs_ec_success;
} }
xs_error_code_t XSUDP::clearRxBuffer() { xs_error_code_t XSUDP::clearRxBuffer() {
#if USB_NO_BLOCK_UDP
char buf[1024];
int32_t length = 1024;
XsyncNetAdd from;
xs_error_code_t ret = kxs_ec_success;
while (ret == kxs_ec_success) {
ret = receive(buf, length, from, 1);
}
#endif
// xs_error_code_t ret = receive(m_rxbuf, length, from, 1000);
// char rxbuf[1024] = {0};
// int32_t len = 0;
// xs_error_code_t ret;
// ret = receive(rxbuf, len, 2);
return kxs_ec_success; return kxs_ec_success;
} }
XSUDP::~XSUDP() { XSUDP::~XSUDP() {
stopReceive(); stopReceive();
if (m_sock_fd > 0) { if (m_sock_fd > 0) {
closesocket(m_sock_fd); closesocket(m_sock_fd);
m_sock_fd = -1; m_sock_fd = -1;
} }
} }
/*******************************************************************************
* xSyncUdpFactoryImpl *
*******************************************************************************/
void XSyncUdpFactoryImpl::initialize() {} void XSyncUdpFactoryImpl::initialize() {}
XSyncUdpFactoryImpl* XSyncUdpFactoryImpl::Ins() { XSyncUdpFactoryImpl* XSyncUdpFactoryImpl::Ins() {
static XSyncUdpFactoryImpl instance; static XSyncUdpFactoryImpl instance;
return &instance; return &instance;
} }
shared_ptr<I_XSUDP> XSyncUdpFactoryImpl::createXSUDP() { return make_shared<XSUDP>(); } shared_ptr<I_XSUDP> XSyncUdpFactoryImpl::createXSUDP() { return make_shared<XSUDP>(); }
Loading…
Cancel
Save