Browse Source

TCP对接

master
白凤吉 6 months ago
parent
commit
1bd400b2c2
  1. 54
      src/main/java/com/qyft/gd/device/client/TcpClient.java
  2. 24
      src/main/java/com/qyft/gd/device/common/jsonrpc/JsonRpcRequest.java
  3. 16
      src/main/java/com/qyft/gd/device/common/jsonrpc/JsonRpcResponse.java
  4. 5
      src/main/java/com/qyft/gd/device/config/TcpConfig.java
  5. 29
      src/main/java/com/qyft/gd/device/controller/TestController.java
  6. 55
      src/main/java/com/qyft/gd/device/handler/DeviceMessageHandler.java
  7. 26
      src/main/java/com/qyft/gd/device/model/bo/DeviceFeedback.java
  8. 174
      src/main/java/com/qyft/gd/device/service/DeviceService.java
  9. 3
      src/main/resources/application.yml

54
src/main/java/com/qyft/gd/device/client/TcpClient.java

@ -1,19 +1,30 @@
package com.qyft.gd.device.client;
import cn.hutool.json.JSONUtil;
import com.qyft.gd.device.common.jsonrpc.JsonRpcRequest;
import com.qyft.gd.device.config.TcpConfig;
import com.qyft.gd.device.handler.DeviceMessageHandler;
import com.qyft.gd.device.model.bo.DeviceFeedback;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.CharsetUtil;
import jakarta.annotation.PostConstruct;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
@ -23,14 +34,16 @@ public class TcpClient {
private static final Logger log = LoggerFactory.getLogger(TcpClient.class);
private final TcpConfig tcpConfig;
private final DeviceMessageHandler deviceMessageHandler;
private DeviceMessageHandler deviceMessageHandler;
private final EventLoopGroup group = new NioEventLoopGroup();
private Channel channel;
private Bootstrap bootstrap;
@PostConstruct
public void init() {
private final Map<String, CompletableFuture<DeviceFeedback>> responseMap = new ConcurrentHashMap<>();
public void setDeviceMessageHandler(DeviceMessageHandler handler) {
this.deviceMessageHandler = handler;
if (tcpConfig.isEnable()) {
connect();
}
@ -77,11 +90,40 @@ public class TcpClient {
}
}
public void sendCommand(String command) {
public void send(String request) {
if (channel != null && channel.isActive()) {
channel.writeAndFlush(command);
ByteBuf byteBuf = Unpooled.copiedBuffer(request, CharsetUtil.UTF_8);
channel.writeAndFlush(byteBuf);
} else {
log.error("TCP服务未连接,无法发送请求: {}", request);
}
}
public DeviceFeedback sendCommand(JsonRpcRequest request) {
if (request.getId() == null) {
request.setId(UUID.randomUUID().toString());
}
CompletableFuture<DeviceFeedback> future = new CompletableFuture<>();
responseMap.put(request.getId(), future);
try {
this.send(JSONUtil.toJsonStr(request));
return future.get(tcpConfig.getFeedbackTimeout(), TimeUnit.MILLISECONDS); // 等待 FEEDBACK 响应
} catch (Exception e) {
log.error("发送TCP指令错误 {}", JSONUtil.toJsonStr(request), e);
} finally {
responseMap.remove(request.getId()); //确保完成后移除
}
return null;
}
public void handleTcpResponse(DeviceFeedback deviceFeedback) {
String requestId = deviceFeedback.getId();
CompletableFuture<DeviceFeedback> future = responseMap.remove(requestId);
if (future != null) {
future.complete(deviceFeedback);
} else {
log.error("TCP服务未连接,无法发送指令: {}", command);
log.error("未找到 requestId: {} 对应的等待请求", requestId);
}
}
}

24
src/main/java/com/qyft/gd/device/common/jsonrpc/JsonRpcRequest.java

@ -0,0 +1,24 @@
package com.qyft.gd.device.common.jsonrpc;
import lombok.Data;
import java.util.List;
/**
* TCP JSON RPC请求
*/
@Data
public class JsonRpcRequest {
/**
* 请求id
*/
private String id;
/**
* 请求方法
*/
private String method;
/**
* 请求参数
*/
private List<String> params;
}

16
src/main/java/com/qyft/gd/device/common/jsonrpc/JsonRpcResponse.java

@ -0,0 +1,16 @@
package com.qyft.gd.device.common.jsonrpc;
import cn.hutool.json.JSONObject;
import lombok.Data;
@Data
public class JsonRpcResponse {
/**
* 数据类型
*/
private String type;
/**
* 数据
*/
private JSONObject data;
}

5
src/main/java/com/qyft/gd/device/config/TcpConfig.java

@ -28,5 +28,10 @@ public class TcpConfig {
* 连接超时时间毫秒
*/
private int timeout;
/**
* 指令反馈超时时间
*/
private int feedbackTimeout;
}

29
src/main/java/com/qyft/gd/device/controller/TestController.java

@ -0,0 +1,29 @@
package com.qyft.gd.device.controller;
import com.qyft.gd.device.service.DeviceService;
import com.qyft.gd.system.common.result.Result;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@Tag(name = "测试")
@RestController
@RequestMapping("/api/test")
@RequiredArgsConstructor
@Slf4j
public class TestController {
private final DeviceService deviceService;
@Operation(summary = "开门")
@GetMapping("/openDoor")
public Result<String> openDoor() {
deviceService.openDoor();
return Result.success();
}
}

55
src/main/java/com/qyft/gd/device/handler/DeviceMessageHandler.java

@ -1,44 +1,61 @@
package com.qyft.gd.device.handler;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import com.qyft.gd.device.client.TcpClient;
import com.qyft.gd.device.common.constant.TcpMessageType;
import com.qyft.gd.device.common.jsonrpc.JsonRpcResponse;
import com.qyft.gd.device.model.bo.DeviceFeedback;
import com.qyft.gd.device.model.bo.DeviceStatus;
import com.qyft.gd.device.service.DeviceStateService;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
import jakarta.annotation.PostConstruct;
import lombok.RequiredArgsConstructor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@Slf4j
@Component
@ChannelHandler.Sharable
@RequiredArgsConstructor
public class DeviceMessageHandler extends SimpleChannelInboundHandler<String> {
private static final Logger log = LoggerFactory.getLogger(DeviceMessageHandler.class);
public class DeviceMessageHandler extends ChannelInboundHandlerAdapter {
private final DeviceStateService deviceStateService;
private final TcpClient tcpClient;
@PostConstruct
public void init() {
tcpClient.setDeviceMessageHandler(this);
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) {
try {
// 解析 JSON
JSONObject json = JSONUtil.parseObj(msg);
String type = json.getStr("type");
public void channelActive(ChannelHandlerContext ctx) {
System.out.println("client ctx =" + ctx);
ctx.writeAndFlush(Unpooled.copiedBuffer("Hello, Server!", CharsetUtil.UTF_8));
}
if(TcpMessageType.STATUS.equals(type)){//设备状态
DeviceStatus deviceStatus = json.toBean(DeviceStatus.class);
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf buf = (ByteBuf) msg;
String serverMsg = buf.toString(CharsetUtil.UTF_8);
log.info("TCP服务消息:{}", serverMsg);
try {
JsonRpcResponse jsonRpcResponse = JSONUtil.toBean(serverMsg, JsonRpcResponse.class);
if (TcpMessageType.STATUS.equals(jsonRpcResponse.getType())) {//设备状态
DeviceStatus deviceStatus = JSONUtil.toBean(jsonRpcResponse.getData(), DeviceStatus.class);
deviceStateService.updateDeviceStatus(deviceStatus); // 更新设备状态
log.info("设备状态已更新: {}", json.toStringPretty());
}else if(TcpMessageType.ALARM.equals(type)){//设备报警
}else if(TcpMessageType.FEEDBACK.equals(type)){//设备指令反馈
} else if (TcpMessageType.ALARM.equals(jsonRpcResponse.getType())) {//设备报警
} else if (TcpMessageType.FEEDBACK.equals(jsonRpcResponse.getType())) {//设备指令反馈
DeviceFeedback deviceFeedback = JSONUtil.toBean(jsonRpcResponse.getData(), DeviceFeedback.class);
tcpClient.handleTcpResponse(deviceFeedback);
}
} catch (Exception e) {
log.error("TCP数据处理错误: {}, error: {}", msg, e.getMessage(), e);
log.error("TCP数据处理错误: {}, error: {}", serverMsg, e.getMessage(), e);
}
}
}

26
src/main/java/com/qyft/gd/device/model/bo/DeviceFeedback.java

@ -0,0 +1,26 @@
package com.qyft.gd.device.model.bo;
import lombok.Data;
/**
* 设备当前状态
*/
@Data
public class DeviceFeedback {
/**
* 请求id
*/
private String id;
/**
* 请求数据
*/
private Object result;
private DeviceFeedbackError error;
@Data
static class DeviceFeedbackError{
private String code;
private String message;
}
}

174
src/main/java/com/qyft/gd/device/service/DeviceService.java

@ -1,10 +1,184 @@
package com.qyft.gd.device.service;
import com.qyft.gd.device.client.TcpClient;
import com.qyft.gd.device.common.jsonrpc.JsonRpcRequest;
import com.qyft.gd.device.model.bo.DeviceFeedback;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@Slf4j
@Service
@RequiredArgsConstructor
public class DeviceService {
private final TcpClient tcpClient;
/**
* 移动导轨机械臂的导轨
*
* @param distance 移动距离
*/
public boolean moveRailArmRail(double distance) {
return true;
}
// 移动导轨机械臂的关节
/**
* 移动导轨机械臂的关节
*
* @param joint1 关节1角度
* @param joint2 关节2角度
* @param distance 移动距离
*/
public boolean moveRailArmJoint(double joint1, double joint2, double distance) {
return true;
}
/**
* 导轨机械臂运动到指定点位
*
* @param x 坐标x
* @param y 坐标y
* @param z 坐标z
*/
public boolean moveRailArmToPoint(int x, int y, int z) {
return true;
}
/**
* 设置导轨机械臂的速度
*
* @param speed 速度值
*/
public boolean setRailArmSpeed(int speed) {
return true;
}
/**
* 开门
*/
public boolean openDoor() {
JsonRpcRequest request = new JsonRpcRequest();
request.setMethod("openDoor");
List<String> list = new ArrayList<>();
list.add("1");
list.add("2");
list.add("3");
request.setParams(list);
DeviceFeedback deviceFeedback = tcpClient.sendCommand(request);
return true;
}
/**
* 关门
*/
public boolean closeDoor() {
return true;
}
/**
* 移动加液机械臂的关节
*
* @param joint1 关节1角度
* @param joint2 关节2角度
*/
public boolean moveLiquidArmJoint(double joint1, double joint2) {
return true;
}
/**
* 加液机械臂运动到指定点位
*
* @param x 坐标x
* @param y 坐标y
* @param z 坐标z
*/
public boolean moveLiquidArmToPoint(int x, int y, int z) {
return true;
}
/**
* 设置加液机械臂的速度
*
* @param speed 速度值
*/
public boolean setLiquidArmSpeed(int speed) {
return true;
}
/**
* 加液
*
* @param pumpId 泵id
* @param volume 液体体积
*/
public boolean addLiquid(int pumpId, int volume) {
return true;
}
/**
* 设置加液泵流量
*
* @param pumpId 泵id
* @param flowRate 流量值
*/
public boolean setFlowRate(int pumpId, int flowRate) {
return true;
}
/**
* 开始摇匀
*/
public boolean startShaking() {
return true;
}
/**
* 停止摇匀
*/
public boolean stopShaking() {
return true;
}
/**
* 设置摇匀速度
*
* @param speed 速度值
*/
public boolean setShakingSpeed(int speed) {
return true;
}
/**
* 抬起托盘到指定高度
*
* @param distance 距离
*/
public boolean moveTrayToHeight(double distance) {
return true;
}
/**
* 设置托盘抬起速度
*
* @param speed 速度值
*/
public boolean setTraySpeed(int speed) {
return true;
}
/**
* 拍照
*/
public boolean takePhoto() {
return true;
}
}

3
src/main/resources/application.yml

@ -35,8 +35,9 @@ jwt:
#与设备TCP链接
tcp:
enable: false # 是否开启 TCP 连接
host: 192.168.1.100
host: 127.0.0.1
port: 9000
reconnect: 5000 # 断线重连间隔(单位:毫秒)
timeout: 10000 # 连接超时时间(单位:毫秒)
feedback-timeout: 500000
Loading…
Cancel
Save