You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

151 lines
4.6 KiB

2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
  1. #include "xsync_udp_factory_impl.hpp"
  2. #include "zqthread.hpp"
  3. //
  4. #include <winsock2.h>
  5. //
  6. #include <Windows.h>
  7. #pragma comment(lib, "ws2_32")
  8. using namespace iflytop;
  9. /*******************************************************************************
  10. * XSUDP *
  11. *******************************************************************************/
  12. class XSUDP : public I_XSUDP {
  13. uint32_t m_ip;
  14. int m_localport;
  15. struct sockaddr_in localadd = {};
  16. int m_sock_fd = 0;
  17. unique_ptr<ZQThread> m_zq_thread;
  18. onMessage_t m_onMessage;
  19. char* m_rxbuf = nullptr;
  20. size_t m_rxbufsize = 0;
  21. public:
  22. virtual xs_error_code_t initialize(string ip, int localport) override;
  23. virtual xs_error_code_t sendto(const XsyncNetAdd& to, const char* data, int32_t length, int32_t* sendlength) override;
  24. virtual xs_error_code_t receive(char* data, int32_t& length, XsyncNetAdd& from, int overtimems) override;
  25. virtual xs_error_code_t startReceive(onMessage_t onMessage) override;
  26. virtual xs_error_code_t stopReceive() override;
  27. virtual ~XSUDP();
  28. };
  29. const char* fmtip(uint32_t ip) {
  30. static char ipstr[16];
  31. sprintf(ipstr, "%d.%d.%d.%d", (ip >> 24) & 0xff, (ip >> 16) & 0xff, (ip >> 8) & 0xff, ip & 0xff);
  32. return ipstr;
  33. }
  34. xs_error_code_t XSUDP::initialize(string ip, int localport) {
  35. localadd.sin_family = AF_INET;
  36. localadd.sin_addr.s_addr = inet_addr(ip.c_str());
  37. localadd.sin_port = htons(localport);
  38. // 创建客户端用于通信的Socket
  39. m_sock_fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
  40. if (m_sock_fd < 0) return kxs_ec_socket_fail;
  41. int ret = bind(m_sock_fd, (struct sockaddr*)&localadd, sizeof(localadd));
  42. if (ret < 0) return kxs_ec_bind_fail;
  43. return kxs_ec_success;
  44. }
  45. xs_error_code_t XSUDP::sendto(const XsyncNetAdd& to, const char* data, int32_t length, int32_t* sendlength) {
  46. struct sockaddr_in sockaddr;
  47. sockaddr.sin_family = AF_INET;
  48. sockaddr.sin_addr.s_addr = inet_addr(to.ip.c_str());
  49. sockaddr.sin_port = htons(to.port);
  50. int ret = ::sendto(m_sock_fd, data, length, 0, (struct sockaddr*)&sockaddr, sizeof(sockaddr));
  51. if (sendlength) *sendlength = ret;
  52. if (ret >= 0) {
  53. return kxs_ec_success;
  54. }
  55. return kxs_ec_send_fail;
  56. }
  57. xs_error_code_t XSUDP::receive(char* data, int32_t& length, XsyncNetAdd& from, int overtimems) {
  58. struct sockaddr_in sockaddr = {0};
  59. timeval timeout;
  60. timeout.tv_sec = overtimems;
  61. timeout.tv_usec = 0;
  62. if (setsockopt(m_sock_fd, SOL_SOCKET, SO_RCVTIMEO, (const char*)&timeout, sizeof(timeout)) == -1) {
  63. return kxs_ec_setsockopt_rx_timeout_fail;
  64. }
  65. int senderAddressLen = sizeof(sockaddr);
  66. int ret = ::recvfrom(m_sock_fd, (char*)data, length, 0, (struct sockaddr*)&sockaddr, &senderAddressLen);
  67. length = ret;
  68. if (ret < 0) {
  69. // if (errno == EWOULDBLOCK || errno == EAGAIN) {
  70. return kxs_ec_overtime;
  71. // } else {
  72. // return kxs_ec_receive_fail;
  73. // }
  74. }
  75. // inet_ntop(AF_INET, &(sockaddr.sin_addr), ip, INET_ADDRSTRLEN);
  76. uint32_t ip = ntohl(sockaddr.sin_addr.s_addr);
  77. from.ip = string(fmtip(ip));
  78. from.port = ntohs(sockaddr.sin_port);
  79. return kxs_ec_success;
  80. }
  81. xs_error_code_t XSUDP::startReceive(onMessage_t onMessage) {
  82. m_onMessage = onMessage;
  83. if (m_zq_thread) {
  84. return kxs_ec_success;
  85. }
  86. m_rxbuf = (char*)malloc(10240);
  87. m_rxbufsize = 10240;
  88. m_zq_thread.reset(new ZQThread("udplistener_thread", [this]() {
  89. while (!m_zq_thread->isTryExit()) {
  90. memset(m_rxbuf, 0, m_rxbufsize);
  91. int32_t length = m_rxbufsize;
  92. XsyncNetAdd from;
  93. xs_error_code_t ret = receive(m_rxbuf, length, from, 1000);
  94. if (ret == kxs_ec_success) {
  95. if (m_onMessage) m_onMessage(from, (uint8_t*)m_rxbuf, length);
  96. }
  97. }
  98. }));
  99. m_zq_thread->start();
  100. }
  101. xs_error_code_t XSUDP::stopReceive() {
  102. if (m_zq_thread) {
  103. m_zq_thread->quit();
  104. m_zq_thread->wait();
  105. m_zq_thread.reset(nullptr);
  106. }
  107. if (!m_rxbuf) {
  108. free(m_rxbuf);
  109. m_rxbuf = nullptr;
  110. m_rxbufsize = 0;
  111. }
  112. }
  113. XSUDP::~XSUDP() {
  114. stopReceive();
  115. if (m_sock_fd > 0) {
  116. closesocket(m_sock_fd);
  117. m_sock_fd = -1;
  118. }
  119. }
  120. /*******************************************************************************
  121. * xSyncUdpFactoryImpl *
  122. *******************************************************************************/
  123. void XSyncUdpFactoryImpl::initialize() {}
  124. XSyncUdpFactoryImpl* XSyncUdpFactoryImpl::Ins() {
  125. static XSyncUdpFactoryImpl instance;
  126. return &instance;
  127. }
  128. shared_ptr<I_XSUDP> XSyncUdpFactoryImpl::createXSUDP() { return make_shared<XSUDP>(); }