Browse Source

update

master
zhaohe 2 years ago
parent
commit
f424993e14
  1. 20
      mainwindow.cpp
  2. 52
      src/xsync_udp_factory_impl.cpp
  3. 1
      src/xsync_udp_factory_impl.hpp
  4. 5
      src/zqthread.cpp
  5. 4
      src/zqthread.hpp

20
mainwindow.cpp

@ -120,22 +120,12 @@ MainWindow::~MainWindow() { delete ui; }
void MainWindow::mainWindowsRun() { // void MainWindow::mainWindowsRun() { //
XSyncUdpFactoryImpl::Ins()->initialize(); XSyncUdpFactoryImpl::Ins()->initialize();
auto xsudp = XSyncUdpFactoryImpl::Ins()->createXSUDP();
auto xsudp = XSyncUdpFactoryImpl::Ins()->createXSUDP();
XS_ASSERT(xsudp->initialize("0.0.0.0", 9999)); XS_ASSERT(xsudp->initialize("0.0.0.0", 9999));
xsudp->startReceive([this, xsudp](XsyncNetAdd &from, uint8_t *data, size_t length) {
ZLOGI(TAG, "receive from <%s:%d> (%d) :%s", from.ip.c_str(), from.port, data, length);
xsudp->sendto(from, "hello\n", 5, NULL);
});
while (true) {
XS_ASSERT(xsudp->sendto(XsyncNetAdd("127.0.0.1", 9973), "hello\n", 5, NULL));
ZQThread::sleep(1);
XsyncNetAdd from;
char rxcache[1024] = {0};
int rxlength = 1024;
auto ecode = xsudp->receive(rxcache, rxlength, from, 5000);
if (ecode == kxs_ec_success) {
ZLOGI(TAG, "receive from %s:%d %s(%d)", from.ip.c_str(), from.port, rxcache, rxlength);
} else {
ZLOGE(TAG, "receive fail,ecode:%d,%d", ecode, rxlength);
}
}
} }

52
src/xsync_udp_factory_impl.cpp

@ -1,21 +1,34 @@
#include "xsync_udp_factory_impl.hpp" #include "xsync_udp_factory_impl.hpp"
#include "zqthread.hpp"
//
#include <winsock2.h> #include <winsock2.h>
// //
#include <Windows.h> #include <Windows.h>
#pragma comment(lib, "ws2_32") #pragma comment(lib, "ws2_32")
using namespace iflytop; using namespace iflytop;
/*******************************************************************************
* XSUDP *
*******************************************************************************/
class XSUDP : public I_XSUDP { class XSUDP : public I_XSUDP {
uint32_t m_ip; uint32_t m_ip;
int m_localport; int m_localport;
struct sockaddr_in localadd = {};
int m_sock_fd = 0;
struct sockaddr_in localadd = {};
int m_sock_fd = 0;
unique_ptr<ZQThread> m_zq_thread;
onMessage_t m_onMessage;
char* m_rxbuf = nullptr;
size_t m_rxbufsize = 0;
public: public:
virtual xs_error_code_t initialize(string ip, int localport) override; virtual xs_error_code_t initialize(string ip, int localport) override;
virtual xs_error_code_t sendto(const XsyncNetAdd& to, const char* data, int32_t length, int32_t* sendlength) override; virtual xs_error_code_t sendto(const XsyncNetAdd& to, const char* data, int32_t length, int32_t* sendlength) override;
virtual xs_error_code_t receive(char* data, int32_t& length, XsyncNetAdd& from, int overtimems) override; virtual xs_error_code_t receive(char* data, int32_t& length, XsyncNetAdd& from, int overtimems) override;
virtual xs_error_code_t startReceive(onMessage_t onMessage) override;
;
}; };
const char* fmtip(uint32_t ip) { const char* fmtip(uint32_t ip) {
@ -60,18 +73,15 @@ xs_error_code_t XSUDP::receive(char* data, int32_t& length, XsyncNetAdd& from, i
return kxs_ec_setsockopt_rx_timeout_fail; return kxs_ec_setsockopt_rx_timeout_fail;
} }
// sockaddr.sin_family = AF_INET;
// sockaddr.sin_addr.s_addr = inet_addr(from.ip.c_str());
// sockaddr.sin_port = htons(from.port);
int senderAddressLen = sizeof(sockaddr); int senderAddressLen = sizeof(sockaddr);
int ret = ::recvfrom(m_sock_fd, (char*)data, length, 0, (struct sockaddr*)&sockaddr, &senderAddressLen); int ret = ::recvfrom(m_sock_fd, (char*)data, length, 0, (struct sockaddr*)&sockaddr, &senderAddressLen);
length = ret; length = ret;
if (ret < 0) { if (ret < 0) {
if (errno == EWOULDBLOCK || errno == EAGAIN) {
return kxs_ec_overtime;
} else {
return kxs_ec_receive_fail;
}
// if (errno == EWOULDBLOCK || errno == EAGAIN) {
return kxs_ec_overtime;
// } else {
// return kxs_ec_receive_fail;
// }
} }
// inet_ntop(AF_INET, &(sockaddr.sin_addr), ip, INET_ADDRSTRLEN); // inet_ntop(AF_INET, &(sockaddr.sin_addr), ip, INET_ADDRSTRLEN);
@ -82,6 +92,25 @@ xs_error_code_t XSUDP::receive(char* data, int32_t& length, XsyncNetAdd& from, i
return kxs_ec_success; return kxs_ec_success;
} }
xs_error_code_t XSUDP::startReceive(onMessage_t onMessage) {
m_onMessage = onMessage;
m_rxbuf = (char*)malloc(10240);
m_rxbufsize = 10240;
m_zq_thread.reset(new ZQThread("udplistener_thread", [this]() {
while (!m_zq_thread->isTryExit()) {
memset(m_rxbuf, 0, m_rxbufsize);
int32_t length = m_rxbufsize;
XsyncNetAdd from;
xs_error_code_t ret = receive(m_rxbuf, length, from, 1000);
if (ret == kxs_ec_success) {
if (m_onMessage) m_onMessage(from, (uint8_t*)m_rxbuf, length);
}
}
}));
m_zq_thread->start();
}
/******************************************************************************* /*******************************************************************************
* xSyncUdpFactoryImpl * * xSyncUdpFactoryImpl *
*******************************************************************************/ *******************************************************************************/
@ -93,5 +122,4 @@ XSyncUdpFactoryImpl* XSyncUdpFactoryImpl::Ins() {
return &instance; return &instance;
} }
shared_ptr<I_XSUDPListener> XSyncUdpFactoryImpl::createXSUDPListener() { return nullptr; }
shared_ptr<I_XSUDP> XSyncUdpFactoryImpl::createXSUDP() { return make_shared<XSUDP>(); }
shared_ptr<I_XSUDP> XSyncUdpFactoryImpl::createXSUDP() { return make_shared<XSUDP>(); }

1
src/xsync_udp_factory_impl.hpp

@ -23,7 +23,6 @@ class XSyncUdpFactoryImpl : public I_XSyncUDPFactory {
static XSyncUdpFactoryImpl* Ins(); static XSyncUdpFactoryImpl* Ins();
void initialize(); void initialize();
virtual shared_ptr<I_XSUDPListener> createXSUDPListener() override;
virtual shared_ptr<I_XSUDP> createXSUDP() override; virtual shared_ptr<I_XSUDP> createXSUDP() override;
}; };

5
src/zqthread.cpp

@ -10,3 +10,8 @@ ZQThread::ZQThread(const char* name, function<void()> cb) {
void ZQThread::run() { void ZQThread::run() {
if (cb_) cb_(); if (cb_) cb_();
} }
void ZQThread::quit() {
tryexitflag_ = true;
QThread::quit();
}

4
src/zqthread.hpp

@ -22,9 +22,13 @@ class ZQThread : public QThread {
private: private:
QString name_; QString name_;
function<void()> cb_; function<void()> cb_;
bool tryexitflag_ = false;
public: public:
ZQThread(const char* name, function<void()> cb); ZQThread(const char* name, function<void()> cb);
void quit();
bool isTryExit() { return tryexitflag_; }
protected: protected:
virtual void run() override; virtual void run() override;

Loading…
Cancel
Save