diff --git a/src/main/java/com/qyft/gd/device/client/TcpClient.java b/src/main/java/com/qyft/gd/device/client/TcpClient.java index 4f38b2b..a406fea 100644 --- a/src/main/java/com/qyft/gd/device/client/TcpClient.java +++ b/src/main/java/com/qyft/gd/device/client/TcpClient.java @@ -1,19 +1,30 @@ package com.qyft.gd.device.client; +import cn.hutool.json.JSONUtil; +import com.qyft.gd.device.common.jsonrpc.JsonRpcRequest; import com.qyft.gd.device.config.TcpConfig; import com.qyft.gd.device.handler.DeviceMessageHandler; +import com.qyft.gd.device.model.bo.DeviceFeedback; import io.netty.bootstrap.Bootstrap; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.util.CharsetUtil; import jakarta.annotation.PostConstruct; import lombok.RequiredArgsConstructor; +import lombok.Setter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import java.net.InetSocketAddress; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; @@ -23,14 +34,16 @@ public class TcpClient { private static final Logger log = LoggerFactory.getLogger(TcpClient.class); private final TcpConfig tcpConfig; - private final DeviceMessageHandler deviceMessageHandler; + private DeviceMessageHandler deviceMessageHandler; private final EventLoopGroup group = new NioEventLoopGroup(); private Channel channel; private Bootstrap bootstrap; - @PostConstruct - public void init() { + private final Map> responseMap = new ConcurrentHashMap<>(); + + public void setDeviceMessageHandler(DeviceMessageHandler handler) { + this.deviceMessageHandler = handler; if (tcpConfig.isEnable()) { connect(); } @@ -77,11 +90,40 @@ public class TcpClient { } } - public void sendCommand(String command) { + public void send(String request) { if (channel != null && channel.isActive()) { - channel.writeAndFlush(command); + ByteBuf byteBuf = Unpooled.copiedBuffer(request, CharsetUtil.UTF_8); + channel.writeAndFlush(byteBuf); + } else { + log.error("TCP服务未连接,无法发送请求: {}", request); + } + } + + + public DeviceFeedback sendCommand(JsonRpcRequest request) { + if (request.getId() == null) { + request.setId(UUID.randomUUID().toString()); + } + CompletableFuture future = new CompletableFuture<>(); + responseMap.put(request.getId(), future); + try { + this.send(JSONUtil.toJsonStr(request)); + return future.get(tcpConfig.getFeedbackTimeout(), TimeUnit.MILLISECONDS); // 等待 FEEDBACK 响应 + } catch (Exception e) { + log.error("发送TCP指令错误 {}", JSONUtil.toJsonStr(request), e); + } finally { + responseMap.remove(request.getId()); //确保完成后移除 + } + return null; + } + + public void handleTcpResponse(DeviceFeedback deviceFeedback) { + String requestId = deviceFeedback.getId(); + CompletableFuture future = responseMap.remove(requestId); + if (future != null) { + future.complete(deviceFeedback); } else { - log.error("TCP服务未连接,无法发送指令: {}", command); + log.error("未找到 requestId: {} 对应的等待请求", requestId); } } } diff --git a/src/main/java/com/qyft/gd/device/common/jsonrpc/JsonRpcRequest.java b/src/main/java/com/qyft/gd/device/common/jsonrpc/JsonRpcRequest.java new file mode 100644 index 0000000..ebda331 --- /dev/null +++ b/src/main/java/com/qyft/gd/device/common/jsonrpc/JsonRpcRequest.java @@ -0,0 +1,24 @@ +package com.qyft.gd.device.common.jsonrpc; + +import lombok.Data; + +import java.util.List; + +/** + * TCP JSON RPC请求 + */ +@Data +public class JsonRpcRequest { + /** + * 请求id + */ + private String id; + /** + * 请求方法 + */ + private String method; + /** + * 请求参数 + */ + private List params; +} diff --git a/src/main/java/com/qyft/gd/device/common/jsonrpc/JsonRpcResponse.java b/src/main/java/com/qyft/gd/device/common/jsonrpc/JsonRpcResponse.java new file mode 100644 index 0000000..ec7122b --- /dev/null +++ b/src/main/java/com/qyft/gd/device/common/jsonrpc/JsonRpcResponse.java @@ -0,0 +1,16 @@ +package com.qyft.gd.device.common.jsonrpc; + +import cn.hutool.json.JSONObject; +import lombok.Data; + +@Data +public class JsonRpcResponse { + /** + * 数据类型 + */ + private String type; + /** + * 数据 + */ + private JSONObject data; +} diff --git a/src/main/java/com/qyft/gd/device/config/TcpConfig.java b/src/main/java/com/qyft/gd/device/config/TcpConfig.java index 5ddb328..d200354 100644 --- a/src/main/java/com/qyft/gd/device/config/TcpConfig.java +++ b/src/main/java/com/qyft/gd/device/config/TcpConfig.java @@ -28,5 +28,10 @@ public class TcpConfig { * 连接超时时间(毫秒) */ private int timeout; + /** + * 指令反馈超时时间 + */ + private int feedbackTimeout; + } diff --git a/src/main/java/com/qyft/gd/device/controller/TestController.java b/src/main/java/com/qyft/gd/device/controller/TestController.java new file mode 100644 index 0000000..f8282f2 --- /dev/null +++ b/src/main/java/com/qyft/gd/device/controller/TestController.java @@ -0,0 +1,29 @@ +package com.qyft.gd.device.controller; + +import com.qyft.gd.device.service.DeviceService; +import com.qyft.gd.system.common.result.Result; +import io.swagger.v3.oas.annotations.Operation; +import io.swagger.v3.oas.annotations.tags.Tag; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +@Tag(name = "测试") +@RestController +@RequestMapping("/api/test") +@RequiredArgsConstructor +@Slf4j +public class TestController { + private final DeviceService deviceService; + + @Operation(summary = "开门") + @GetMapping("/openDoor") + public Result openDoor() { + deviceService.openDoor(); + return Result.success(); + } + + +} diff --git a/src/main/java/com/qyft/gd/device/handler/DeviceMessageHandler.java b/src/main/java/com/qyft/gd/device/handler/DeviceMessageHandler.java index aece38f..ea89ea5 100644 --- a/src/main/java/com/qyft/gd/device/handler/DeviceMessageHandler.java +++ b/src/main/java/com/qyft/gd/device/handler/DeviceMessageHandler.java @@ -1,44 +1,61 @@ package com.qyft.gd.device.handler; -import cn.hutool.json.JSONObject; import cn.hutool.json.JSONUtil; +import com.qyft.gd.device.client.TcpClient; import com.qyft.gd.device.common.constant.TcpMessageType; +import com.qyft.gd.device.common.jsonrpc.JsonRpcResponse; +import com.qyft.gd.device.model.bo.DeviceFeedback; import com.qyft.gd.device.model.bo.DeviceStatus; import com.qyft.gd.device.service.DeviceStateService; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.util.CharsetUtil; +import jakarta.annotation.PostConstruct; import lombok.RequiredArgsConstructor; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; +@Slf4j @Component @ChannelHandler.Sharable @RequiredArgsConstructor -public class DeviceMessageHandler extends SimpleChannelInboundHandler { - - private static final Logger log = LoggerFactory.getLogger(DeviceMessageHandler.class); +public class DeviceMessageHandler extends ChannelInboundHandlerAdapter { private final DeviceStateService deviceStateService; + private final TcpClient tcpClient; + + @PostConstruct + public void init() { + tcpClient.setDeviceMessageHandler(this); + } + @Override - protected void channelRead0(ChannelHandlerContext ctx, String msg) { - try { - // 解析 JSON - JSONObject json = JSONUtil.parseObj(msg); - String type = json.getStr("type"); + public void channelActive(ChannelHandlerContext ctx) { + System.out.println("client ctx =" + ctx); + ctx.writeAndFlush(Unpooled.copiedBuffer("Hello, Server!", CharsetUtil.UTF_8)); + } - if(TcpMessageType.STATUS.equals(type)){//设备状态 - DeviceStatus deviceStatus = json.toBean(DeviceStatus.class); + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) { + ByteBuf buf = (ByteBuf) msg; + String serverMsg = buf.toString(CharsetUtil.UTF_8); + log.info("TCP服务消息:{}", serverMsg); + try { + JsonRpcResponse jsonRpcResponse = JSONUtil.toBean(serverMsg, JsonRpcResponse.class); + if (TcpMessageType.STATUS.equals(jsonRpcResponse.getType())) {//设备状态 + DeviceStatus deviceStatus = JSONUtil.toBean(jsonRpcResponse.getData(), DeviceStatus.class); deviceStateService.updateDeviceStatus(deviceStatus); // 更新设备状态 - log.info("设备状态已更新: {}", json.toStringPretty()); - }else if(TcpMessageType.ALARM.equals(type)){//设备报警 - - }else if(TcpMessageType.FEEDBACK.equals(type)){//设备指令反馈 + } else if (TcpMessageType.ALARM.equals(jsonRpcResponse.getType())) {//设备报警 + } else if (TcpMessageType.FEEDBACK.equals(jsonRpcResponse.getType())) {//设备指令反馈 + DeviceFeedback deviceFeedback = JSONUtil.toBean(jsonRpcResponse.getData(), DeviceFeedback.class); + tcpClient.handleTcpResponse(deviceFeedback); } } catch (Exception e) { - log.error("TCP数据处理错误: {}, error: {}", msg, e.getMessage(), e); + log.error("TCP数据处理错误: {}, error: {}", serverMsg, e.getMessage(), e); } } } \ No newline at end of file diff --git a/src/main/java/com/qyft/gd/device/model/bo/DeviceFeedback.java b/src/main/java/com/qyft/gd/device/model/bo/DeviceFeedback.java new file mode 100644 index 0000000..9ab4108 --- /dev/null +++ b/src/main/java/com/qyft/gd/device/model/bo/DeviceFeedback.java @@ -0,0 +1,26 @@ +package com.qyft.gd.device.model.bo; + +import lombok.Data; + +/** + * 设备当前状态 + */ +@Data +public class DeviceFeedback { + /** + * 请求id + */ + private String id; + /** + * 请求数据 + */ + private Object result; + + private DeviceFeedbackError error; + + @Data + static class DeviceFeedbackError{ + private String code; + private String message; + } +} diff --git a/src/main/java/com/qyft/gd/device/service/DeviceService.java b/src/main/java/com/qyft/gd/device/service/DeviceService.java index 3f5273e..ede2e8f 100644 --- a/src/main/java/com/qyft/gd/device/service/DeviceService.java +++ b/src/main/java/com/qyft/gd/device/service/DeviceService.java @@ -1,10 +1,184 @@ package com.qyft.gd.device.service; +import com.qyft.gd.device.client.TcpClient; +import com.qyft.gd.device.common.jsonrpc.JsonRpcRequest; +import com.qyft.gd.device.model.bo.DeviceFeedback; import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +@Slf4j @Service @RequiredArgsConstructor public class DeviceService { + private final TcpClient tcpClient; + + /** + * 移动导轨机械臂的导轨 + * + * @param distance 移动距离 + */ + public boolean moveRailArmRail(double distance) { + return true; + } + + // 移动导轨机械臂的关节 + + /** + * 移动导轨机械臂的关节 + * + * @param joint1 关节1角度 + * @param joint2 关节2角度 + * @param distance 移动距离 + */ + public boolean moveRailArmJoint(double joint1, double joint2, double distance) { + return true; + } + + /** + * 导轨机械臂运动到指定点位 + * + * @param x 坐标x + * @param y 坐标y + * @param z 坐标z + */ + public boolean moveRailArmToPoint(int x, int y, int z) { + return true; + } + + /** + * 设置导轨机械臂的速度 + * + * @param speed 速度值 + */ + public boolean setRailArmSpeed(int speed) { + return true; + } + + /** + * 开门 + */ + public boolean openDoor() { + JsonRpcRequest request = new JsonRpcRequest(); + request.setMethod("openDoor"); + List list = new ArrayList<>(); + list.add("1"); + list.add("2"); + list.add("3"); + request.setParams(list); + DeviceFeedback deviceFeedback = tcpClient.sendCommand(request); + return true; + } + + /** + * 关门 + */ + public boolean closeDoor() { + return true; + } + + /** + * 移动加液机械臂的关节 + * + * @param joint1 关节1角度 + * @param joint2 关节2角度 + */ + public boolean moveLiquidArmJoint(double joint1, double joint2) { + return true; + } + + /** + * 加液机械臂运动到指定点位 + * + * @param x 坐标x + * @param y 坐标y + * @param z 坐标z + */ + public boolean moveLiquidArmToPoint(int x, int y, int z) { + return true; + } + + /** + * 设置加液机械臂的速度 + * + * @param speed 速度值 + */ + public boolean setLiquidArmSpeed(int speed) { + return true; + } + + /** + * 加液 + * + * @param pumpId 泵id + * @param volume 液体体积 + */ + public boolean addLiquid(int pumpId, int volume) { + return true; + } + + + /** + * 设置加液泵流量 + * + * @param pumpId 泵id + * @param flowRate 流量值 + */ + public boolean setFlowRate(int pumpId, int flowRate) { + return true; + } + + /** + * 开始摇匀 + */ + public boolean startShaking() { + return true; + } + + /** + * 停止摇匀 + */ + public boolean stopShaking() { + return true; + } + + /** + * 设置摇匀速度 + * + * @param speed 速度值 + */ + public boolean setShakingSpeed(int speed) { + return true; + } + + /** + * 抬起托盘到指定高度 + * + * @param distance 距离 + */ + public boolean moveTrayToHeight(double distance) { + return true; + } + + /** + * 设置托盘抬起速度 + * + * @param speed 速度值 + */ + public boolean setTraySpeed(int speed) { + return true; + } + + /** + * 拍照 + */ + public boolean takePhoto() { + return true; + } } diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index fa70249..af9e1ec 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -35,8 +35,9 @@ jwt: #与设备TCP链接 tcp: enable: false # 是否开启 TCP 连接 - host: 192.168.1.100 + host: 127.0.0.1 port: 9000 reconnect: 5000 # 断线重连间隔(单位:毫秒) timeout: 10000 # 连接超时时间(单位:毫秒) + feedback-timeout: 500000