WebSocketServer.java 9.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257
  1. package com.kmall.admin.websocket;
  2. import com.alibaba.fastjson.JSONObject;
  3. import com.kmall.admin.entity.OrderEntity;
  4. import com.kmall.admin.entity.PickUpCodeEntity;
  5. import com.kmall.admin.service.OrderService;
  6. import com.kmall.admin.service.PickUpCodeService;
  7. import com.kmall.api.util.SendMsgUtil;
  8. import com.kmall.common.utils.R;
  9. import org.slf4j.Logger;
  10. import org.slf4j.LoggerFactory;
  11. import org.springframework.beans.factory.annotation.Autowired;
  12. import org.springframework.stereotype.Component;
  13. import javax.websocket.*;
  14. import javax.websocket.server.PathParam;
  15. import javax.websocket.server.ServerEndpoint;
  16. import java.io.IOException;
  17. import java.text.SimpleDateFormat;
  18. import java.util.*;
  19. import java.util.concurrent.CopyOnWriteArraySet;
  20. import java.util.concurrent.atomic.AtomicInteger;
  21. /**
  22. * WebSocket服务端
  23. *
  24. * 新建一个CopyOnWriteArraySet SessionSet 用于接收当前会话的Session,方便进行推送消息。单机版实现到这里就可以
  25. * 集群版(多个ws节点)还需要借助mysql或者redis等进行处理,改造对应的sendMessage方法即可
  26. *
  27. * @author Scott Chen
  28. * @date 2019-11-06
  29. */
  30. // 前端发起的启动WebSocket服务地址
  31. @ServerEndpoint(value = "/ws/server/{param}")
  32. @Component
  33. public class WebSocketServer {
  34. private static final Logger logger = LoggerFactory.getLogger(WebSocketServer.class);
  35. private static final AtomicInteger OnlineCount = new AtomicInteger(0);
  36. private static CopyOnWriteArraySet<Session> SessionSet ;
  37. // 存放取货码,key是收银员编号
  38. private static Map<String, List<PickUpCodeEntity>> pickUpCodeMap ;
  39. private static List<PickUpCodeEntity> pickUpList ;
  40. static SimpleDateFormat sdf = null;
  41. static {
  42. // concurrent包的线程安全Set,用来存放每个客户端对应的Session对象。
  43. SessionSet = new CopyOnWriteArraySet<Session>();
  44. pickUpCodeMap = new HashMap<>();
  45. pickUpList = new ArrayList<>();
  46. sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  47. }
  48. private static PickUpCodeService pickUpCodeService;
  49. private static OrderService orderService;
  50. @Autowired
  51. public void setOptionService(PickUpCodeService pickUpCodeService,OrderService orderService) {
  52. WebSocketServer.pickUpCodeService = pickUpCodeService;
  53. WebSocketServer.orderService = orderService;
  54. }
  55. public static void delete(String sessionId, Integer storeId, PickUpCodeEntity pickUpCodeEntity) throws IOException {
  56. sendMessage(sessionId,"delete",storeId+"");
  57. }
  58. /**
  59. * 连接建立成功调用的方法
  60. */
  61. @OnOpen
  62. public void onOpen(Session session,@PathParam(value="param")String param ) {
  63. List<PickUpCodeEntity> pickUpCodeEntities = pickUpCodeMap.get(param);
  64. // TODO
  65. // if(pickUpCodeEntities == null){
  66. // 查询所有的取货码
  67. Map<String,Object> dataParam = new HashMap<>();
  68. dataParam.put("pickUpCodeCreatetime",new SimpleDateFormat("yyyy-MM-dd").format(new Date()));
  69. dataParam.put("storeId",param);
  70. pickUpList = pickUpCodeService.queryList(dataParam);
  71. pickUpCodeMap.put(param,pickUpList);
  72. // }
  73. SessionSet.add(session);
  74. logger.info("WebSocket连接请求 session id:{}", session.getId());
  75. // 在线数加1
  76. int cnt = OnlineCount.incrementAndGet();
  77. logger.info("WebSocket当前连接数为:{}", cnt);
  78. sendMessage(session, "服务端返回WebSocket连接成功",param);
  79. }
  80. /**
  81. * 连接关闭调用的方法
  82. */
  83. @OnClose
  84. public void onClose(Session session) {
  85. SessionSet.remove(session);
  86. logger.info("WebSocket连接关闭 session id:{}", session.getId());
  87. int cnt = OnlineCount.decrementAndGet();
  88. logger.info("WebSocket连接关闭,当前连接数为:{}", cnt);
  89. }
  90. /**
  91. * 收到客户端消息后调用的方法
  92. *
  93. * @param message 客户端发送过来的消息
  94. */
  95. @OnMessage
  96. public void onMessage(String message, Session session) {
  97. logger.info("来自客户端[{}]的消息:[{}]", session.getId(), message);
  98. logger.info("Server开始广播推送客户端[{}]发来的新消息", session.getId());
  99. // sendMessage(session, message);
  100. broadcastInfo(message + ",【消息产生于:" + session.getId() + "】");
  101. }
  102. /**
  103. * 出现错误
  104. *
  105. * @param session
  106. * @param error
  107. */
  108. @OnError
  109. public void onError(Session session, Throwable error) {
  110. logger.error("发生错误:{},Session ID: {}", error.getMessage(), session.getId());
  111. error.printStackTrace();
  112. }
  113. /**
  114. * 发送消息,实践表明,每次浏览器刷新,session会发生变化。
  115. *
  116. * @param session
  117. * @param message
  118. */
  119. public static void sendMessage(Session session, String message,String picNo) {
  120. try {
  121. // List<PickUpCodeEntity> pickUpCodeEntities = new ArrayList<>();
  122. // logger.info(picNo);
  123. // if(picNo != null){
  124. // pickUpCodeEntities = pickUpCodeMap.get(picNo);
  125. // }
  126. Map<String,Object> data = new HashMap<>();
  127. data.put("delayResponse",false);
  128. /**
  129. * 查询是否有卡单的情况
  130. * 步骤:
  131. * 1.查询所有状态为201且下单时间超过5分钟的订单
  132. * 2.判断订单数是否超过10个,如果超过10个,则去查询oms是否有该订单,如果有该订单,是否存在回执
  133. * 3.循环查询,如果第一个有回执,则查询第二个,直到查询到最后一个,如果都有回执,则无异常,如果其中有一个单无回执,则证明海关可能卡单
  134. * 4.如果海关卡单,给页面一个卡单提示,并且发送短信给客服相关人员
  135. */
  136. try{
  137. // 1.查询所有状态为201且下单时间超过5分钟的订单
  138. // List<OrderEntity> all201Order = orderService.queryAll201Order();
  139. // if(all201Order != null && all201Order.size() >= 10){
  140. // // 循环查询oms是否有回执
  141. // for (OrderEntity orderEntity : all201Order) {
  142. // List<String> resultMessage = orderService.queryInveResponse(orderEntity.getOrderSn());
  143. // if(resultMessage == null || resultMessage.size() == 0){
  144. // String sendTemplate = "【CW惠州门店】时间:" + sdf.format(new Date()) + ",订单出现卡单情况!";
  145. ////
  146. //// SendMsgUtil.sendMsg("18825104165", sendTemplate);
  147. //// SendMsgUtil.sendMsg("", sendTemplate);
  148. //
  149. // // 如果能进这里,证明卡回执了
  150. // data.put("delayResponse",true);
  151. // }
  152. // }
  153. // }
  154. }catch (Exception e){
  155. }
  156. if("服务端返回WebSocket连接成功".equalsIgnoreCase(message)){
  157. data.put("sessionId",session.getId());
  158. data.put("pickUpCodeList",pickUpList);
  159. String info = JSONObject.toJSONString(data);
  160. session.getBasicRemote().sendText(info);
  161. // logger.info("发送条形码给前端,内容:" + info);
  162. }else{
  163. if("broadcast".equalsIgnoreCase(message)){
  164. Map<String,Object> dataParam = new HashMap<>();
  165. dataParam.put("pickUpCodeCreatetime",new SimpleDateFormat("yyyy-MM-dd").format(new Date()));
  166. List<PickUpCodeEntity> pickUpList = pickUpCodeService.queryList(dataParam);
  167. data.put("pickUpCodeList", pickUpList);
  168. String info = JSONObject.toJSONString(data);
  169. session.getBasicRemote().sendText(info);
  170. // logger.info("发送条形码给前端,内容:" + info);
  171. }else {
  172. // 根据订单id查询条形码,返回给前端页面
  173. PickUpCodeEntity pickUpCodeEntity = pickUpCodeService.queryObject(message);
  174. pickUpList.add(pickUpCodeEntity);
  175. data.put("pickUpCodeList", pickUpList);
  176. String info = JSONObject.toJSONString(data);
  177. session.getBasicRemote().sendText(info);
  178. // logger.info("发送条形码给前端,内容:" + info);
  179. }
  180. }
  181. } catch (IOException e) {
  182. logger.error("发送消息出错:{}", e.getMessage());
  183. e.printStackTrace();
  184. }
  185. }
  186. /**
  187. * 群发消息
  188. *
  189. * @param message
  190. * @throws IOException
  191. */
  192. public static void broadcastInfo(String message) {
  193. logger.info("Server broadcast info....");
  194. for (Session session : SessionSet) {
  195. if (session.isOpen()) {
  196. logger.info("broadcast send session id is " + session.getId());
  197. sendMessage(session, message,null);
  198. }
  199. }
  200. }
  201. /**
  202. * 指定Session发送消息
  203. *
  204. * @param sessionId
  205. * @param message
  206. * @throws IOException
  207. */
  208. public static void sendMessage(String sessionId, String message,String storeId) throws IOException {
  209. logger.info("request session id is " + sessionId);
  210. Session session = null;
  211. for (Session s : SessionSet) {
  212. if (s.getId().equals(sessionId)) {
  213. session = s;
  214. break;
  215. }
  216. }
  217. if (session != null) {
  218. sendMessage(session, message,storeId);
  219. logger.info("sessionId:"+session.getId());
  220. } else {
  221. logger.warn("没有找到你指定ID的会话:{}", sessionId);
  222. }
  223. }
  224. }