diff --git a/src/xsync_v2.cpp b/src/xsync_v2.cpp index 6d56474..aa49479 100644 --- a/src/xsync_v2.cpp +++ b/src/xsync_v2.cpp @@ -49,6 +49,48 @@ using namespace std; I_XSUDPFactory *g_xsync_udp_factory = nullptr; +class RxContext { + public: + std::recursive_mutex lock_; + + uint8_t rxpacket[1024] = {0}; + int32_t rxpacket_length = 0; + bool m_isready = false; + bool m_isWait = false; + + int32_t m_expectIndex = 0; + + void setRxPacket(uint8_t *data, int32_t length) { + lock_guard lock(lock_); + memcpy(rxpacket, data, length); + rxpacket_length = length; + m_isready = true; + } + + void clear() { + lock_guard lock(lock_); + memset(rxpacket, 0, sizeof(rxpacket)); + rxpacket_length = 0; + m_isready = false; + m_isWait = false; + m_expectIndex = 0; + } + + void setWait(bool wait, int32_t waitIndex) { + lock_guard lock(lock_); + m_isWait = wait; + m_expectIndex = waitIndex; + } + + bool isReady() { return m_isready; } + bool isWait() { return m_isWait; } + + int32_t getExpectIndex() { return m_expectIndex; } + + uint8_t *data() { return rxpacket; } + size_t size() { return rxpacket_length; } +}; + class Xsync : public IXsync { private: /* data */ @@ -75,6 +117,8 @@ class Xsync : public IXsync { bool destoryflag = false; int64_t m_last_receive_packet_tp = 0; + RxContext rxContext; + private: void _setNetworkState(bool connected, string ip) { lock_guard lock(connectStatelock_); @@ -116,6 +160,19 @@ class Xsync : public IXsync { // 寄存器读写UDP xsync_reg_udp = g_xsync_udp_factory->createXSUDP(); ecode = xsync_reg_udp->initialize("0.0.0.0", IFLYTOP_XSYNC_SERVICE_PC_PORT); + xsync_reg_udp->startReceive([this](XsyncNetAdd &from, uint8_t *data, size_t length) { + iflytop_xsync_packet_header_t *rx_data = (iflytop_xsync_packet_header_t *)data; + if (!rxContext.isWait()) { + return; + } + + if (rx_data->index != rxContext.getExpectIndex()) { + return; + } + + rxContext.setRxPacket(data, length); + }); + if (ecode != kxs_ec_success) goto err; // TIMECODE上报消息接收UDP @@ -741,7 +798,7 @@ xs_error_code_t Xsync::readMac(string &mac) { return kxs_ec_success; } - +#if 0 xs_error_code_t Xsync::xsync_send_cmd_block(iflytop_xsync_packet_header_t *cmd, iflytop_xsync_packet_header_t *rx_data, int32_t buffersize, int32_t overtime_ms) { lock_guard lock(lock_); @@ -774,6 +831,38 @@ xs_error_code_t Xsync::xsync_send_cmd_block(iflytop_xsync_packet_header_t *cmd, return (xs_error_code_t)rx_data->data[0]; } +#endif + +xs_error_code_t Xsync::xsync_send_cmd_block(iflytop_xsync_packet_header_t *cmd, iflytop_xsync_packet_header_t *rx_data, int32_t buffersize, int32_t overtime_ms) { + lock_guard lock(lock_); + + string xsyncip = _getXsyncIp(); + m_xsync_reg_udp->clearRxBuffer(); + + if (!m_xsync_reg_udp) return kxs_ec_device_offline; + if (xsyncip.empty()) return kxs_ec_device_offline; + + cmd->index = txpacket_index++; + + rxContext.clear(); + rxContext.setWait(true, cmd->index); + + XsyncNetAdd toadd = {xsyncip, IFLYTOP_XSYNC_SERVICE_XSYNC_PORT}; + xs_error_code_t ecode = // + m_xsync_reg_udp->sendto(toadd, (const char *)cmd, sizeof(iflytop_xsync_packet_header_t) + cmd->ndata * 4, nullptr); + if (ecode != kxs_ec_success) { + return ecode; + } + + for (int32_t i = 0; i < overtime_ms; i++) { + if (rxContext.isReady()) { + memcpy(rx_data, rxContext.data(), rxContext.size()); + return (xs_error_code_t)rx_data->data[0]; + } + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } + return kxs_ec_overtime; +} xs_error_code_t Xsync::reg_write(uint32_t regadd, uint32_t regvalue, int32_t overtime_ms) { // uint32_t readbak = 0;