Browse Source

修改xsync指令接收回执方式为异步接收

master
zhaohe 1 year ago
parent
commit
5592dbb769
  1. 91
      src/xsync_v2.cpp

91
src/xsync_v2.cpp

@ -49,6 +49,48 @@ using namespace std;
I_XSUDPFactory *g_xsync_udp_factory = nullptr;
class RxContext {
public:
std::recursive_mutex lock_;
uint8_t rxpacket[1024] = {0};
int32_t rxpacket_length = 0;
bool m_isready = false;
bool m_isWait = false;
int32_t m_expectIndex = 0;
void setRxPacket(uint8_t *data, int32_t length) {
lock_guard<recursive_mutex> lock(lock_);
memcpy(rxpacket, data, length);
rxpacket_length = length;
m_isready = true;
}
void clear() {
lock_guard<recursive_mutex> lock(lock_);
memset(rxpacket, 0, sizeof(rxpacket));
rxpacket_length = 0;
m_isready = false;
m_isWait = false;
m_expectIndex = 0;
}
void setWait(bool wait, int32_t waitIndex) {
lock_guard<recursive_mutex> lock(lock_);
m_isWait = wait;
m_expectIndex = waitIndex;
}
bool isReady() { return m_isready; }
bool isWait() { return m_isWait; }
int32_t getExpectIndex() { return m_expectIndex; }
uint8_t *data() { return rxpacket; }
size_t size() { return rxpacket_length; }
};
class Xsync : public IXsync {
private:
/* data */
@ -75,6 +117,8 @@ class Xsync : public IXsync {
bool destoryflag = false;
int64_t m_last_receive_packet_tp = 0;
RxContext rxContext;
private:
void _setNetworkState(bool connected, string ip) {
lock_guard<recursive_mutex> lock(connectStatelock_);
@ -116,6 +160,19 @@ class Xsync : public IXsync {
// 寄存器读写UDP
xsync_reg_udp = g_xsync_udp_factory->createXSUDP();
ecode = xsync_reg_udp->initialize("0.0.0.0", IFLYTOP_XSYNC_SERVICE_PC_PORT);
xsync_reg_udp->startReceive([this](XsyncNetAdd &from, uint8_t *data, size_t length) {
iflytop_xsync_packet_header_t *rx_data = (iflytop_xsync_packet_header_t *)data;
if (!rxContext.isWait()) {
return;
}
if (rx_data->index != rxContext.getExpectIndex()) {
return;
}
rxContext.setRxPacket(data, length);
});
if (ecode != kxs_ec_success) goto err;
// TIMECODE上报消息接收UDP
@ -741,7 +798,7 @@ xs_error_code_t Xsync::readMac(string &mac) {
return kxs_ec_success;
}
#if 0
xs_error_code_t Xsync::xsync_send_cmd_block(iflytop_xsync_packet_header_t *cmd, iflytop_xsync_packet_header_t *rx_data, int32_t buffersize, int32_t overtime_ms) {
lock_guard<recursive_mutex> lock(lock_);
@ -774,6 +831,38 @@ xs_error_code_t Xsync::xsync_send_cmd_block(iflytop_xsync_packet_header_t *cmd,
return (xs_error_code_t)rx_data->data[0];
}
#endif
xs_error_code_t Xsync::xsync_send_cmd_block(iflytop_xsync_packet_header_t *cmd, iflytop_xsync_packet_header_t *rx_data, int32_t buffersize, int32_t overtime_ms) {
lock_guard<recursive_mutex> lock(lock_);
string xsyncip = _getXsyncIp();
m_xsync_reg_udp->clearRxBuffer();
if (!m_xsync_reg_udp) return kxs_ec_device_offline;
if (xsyncip.empty()) return kxs_ec_device_offline;
cmd->index = txpacket_index++;
rxContext.clear();
rxContext.setWait(true, cmd->index);
XsyncNetAdd toadd = {xsyncip, IFLYTOP_XSYNC_SERVICE_XSYNC_PORT};
xs_error_code_t ecode = //
m_xsync_reg_udp->sendto(toadd, (const char *)cmd, sizeof(iflytop_xsync_packet_header_t) + cmd->ndata * 4, nullptr);
if (ecode != kxs_ec_success) {
return ecode;
}
for (int32_t i = 0; i < overtime_ms; i++) {
if (rxContext.isReady()) {
memcpy(rx_data, rxContext.data(), rxContext.size());
return (xs_error_code_t)rx_data->data[0];
}
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
return kxs_ec_overtime;
}
xs_error_code_t Xsync::reg_write(uint32_t regadd, uint32_t regvalue, int32_t overtime_ms) { //
uint32_t readbak = 0;

Loading…
Cancel
Save