WebSocketServer.java 7.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223
  1. package com.kmall.admin.websocket;
  2. import com.alibaba.fastjson.JSONObject;
  3. import com.kmall.admin.entity.PickUpCodeEntity;
  4. import com.kmall.admin.service.PickUpCodeService;
  5. import org.slf4j.Logger;
  6. import org.slf4j.LoggerFactory;
  7. import org.springframework.beans.factory.annotation.Autowired;
  8. import org.springframework.stereotype.Component;
  9. import javax.websocket.*;
  10. import javax.websocket.server.PathParam;
  11. import javax.websocket.server.ServerEndpoint;
  12. import java.io.IOException;
  13. import java.text.SimpleDateFormat;
  14. import java.util.*;
  15. import java.util.concurrent.CopyOnWriteArraySet;
  16. import java.util.concurrent.atomic.AtomicInteger;
  17. /**
  18. * WebSocket服务端
  19. *
  20. * 新建一个CopyOnWriteArraySet SessionSet 用于接收当前会话的Session,方便进行推送消息。单机版实现到这里就可以
  21. * 集群版(多个ws节点)还需要借助mysql或者redis等进行处理,改造对应的sendMessage方法即可
  22. *
  23. * @author Scott Chen
  24. * @date 2019-11-06
  25. */
  26. // 前端发起的启动WebSocket服务地址
  27. @ServerEndpoint(value = "/ws/server/{param}")
  28. @Component
  29. public class WebSocketServer {
  30. private static final Logger logger = LoggerFactory.getLogger(WebSocketServer.class);
  31. private static final AtomicInteger OnlineCount = new AtomicInteger(0);
  32. private static CopyOnWriteArraySet<Session> SessionSet ;
  33. // 存放取货码,key是收银员编号
  34. private static Map<String, List<PickUpCodeEntity>> pickUpCodeMap ;
  35. private static List<PickUpCodeEntity> pickUpList ;
  36. static {
  37. // concurrent包的线程安全Set,用来存放每个客户端对应的Session对象。
  38. SessionSet = new CopyOnWriteArraySet<Session>();
  39. pickUpCodeMap = new HashMap<>();
  40. pickUpList = new ArrayList<>();
  41. }
  42. private static PickUpCodeService pickUpCodeService;
  43. @Autowired
  44. public void setOptionService(PickUpCodeService pickUpCodeService) {
  45. WebSocketServer.pickUpCodeService = pickUpCodeService;
  46. }
  47. public static void delete(String sessionId, Integer storeId, PickUpCodeEntity pickUpCodeEntity) throws IOException {
  48. // List<PickUpCodeEntity> pickUpCodeEntities = pickUpCodeMap.get(storeId+"");
  49. // for(PickUpCodeEntity pick : pickUpList){
  50. // if(pick.getPickUpCodeSn().equalsIgnoreCase(pickUpCodeEntity.getPickUpCodeSn())){
  51. // pickUpList.remove(pick);
  52. // }
  53. // }
  54. sendMessage(sessionId,"delete",storeId+"");
  55. }
  56. /**
  57. * 连接建立成功调用的方法
  58. */
  59. @OnOpen
  60. public void onOpen(Session session,@PathParam(value="param")String param ) {
  61. List<PickUpCodeEntity> pickUpCodeEntities = pickUpCodeMap.get(param);
  62. // TODO
  63. // if(pickUpCodeEntities == null){
  64. // 查询所有的取货码
  65. Map<String,Object> dataParam = new HashMap<>();
  66. dataParam.put("pickUpCodeCreatetime",new SimpleDateFormat("yyyy-MM-dd").format(new Date()));
  67. // dataParam.put("pickUpCodeStatus","0");
  68. pickUpList = pickUpCodeService.queryList(dataParam);
  69. pickUpCodeMap.put(param,pickUpList);
  70. // }
  71. SessionSet.add(session);
  72. logger.info("WebSocket连接请求 session id:{}", session.getId());
  73. // 在线数加1
  74. int cnt = OnlineCount.incrementAndGet();
  75. logger.info("WebSocket当前连接数为:{}", cnt);
  76. sendMessage(session, "服务端返回WebSocket连接成功",param);
  77. }
  78. /**
  79. * 连接关闭调用的方法
  80. */
  81. @OnClose
  82. public void onClose(Session session) {
  83. SessionSet.remove(session);
  84. logger.info("WebSocket连接关闭 session id:{}", session.getId());
  85. int cnt = OnlineCount.decrementAndGet();
  86. logger.info("WebSocket连接关闭,当前连接数为:{}", cnt);
  87. }
  88. /**
  89. * 收到客户端消息后调用的方法
  90. *
  91. * @param message 客户端发送过来的消息
  92. */
  93. @OnMessage
  94. public void onMessage(String message, Session session) {
  95. logger.info("来自客户端[{}]的消息:[{}]", session.getId(), message);
  96. logger.info("Server开始广播推送客户端[{}]发来的新消息", session.getId());
  97. // sendMessage(session, message);
  98. broadcastInfo(message + ",【消息产生于:" + session.getId() + "】");
  99. }
  100. /**
  101. * 出现错误
  102. *
  103. * @param session
  104. * @param error
  105. */
  106. @OnError
  107. public void onError(Session session, Throwable error) {
  108. logger.error("发生错误:{},Session ID: {}", error.getMessage(), session.getId());
  109. error.printStackTrace();
  110. }
  111. /**
  112. * 发送消息,实践表明,每次浏览器刷新,session会发生变化。
  113. *
  114. * @param session
  115. * @param message
  116. */
  117. public static void sendMessage(Session session, String message,String picNo) {
  118. try {
  119. // List<PickUpCodeEntity> pickUpCodeEntities = new ArrayList<>();
  120. logger.info(picNo);
  121. // if(picNo != null){
  122. // pickUpCodeEntities = pickUpCodeMap.get(picNo);
  123. // }
  124. Map<String,Object> data = new HashMap<>();
  125. if("服务端返回WebSocket连接成功".equalsIgnoreCase(message)){
  126. data.put("sessionId",session.getId());
  127. data.put("pickUpCodeList",pickUpList);
  128. String info = JSONObject.toJSONString(data);
  129. session.getBasicRemote().sendText(info);
  130. // logger.info("发送条形码给前端,内容:" + info);
  131. }else{
  132. if("broadcast".equalsIgnoreCase(message)){
  133. Map<String,Object> dataParam = new HashMap<>();
  134. dataParam.put("pickUpCodeCreatetime",new SimpleDateFormat("yyyy-MM-dd").format(new Date()));
  135. List<PickUpCodeEntity> pickUpList = pickUpCodeService.queryList(dataParam);
  136. data.put("pickUpCodeList", pickUpList);
  137. String info = JSONObject.toJSONString(data);
  138. session.getBasicRemote().sendText(info);
  139. // logger.info("发送条形码给前端,内容:" + info);
  140. }else {
  141. // 根据订单id查询条形码,返回给前端页面
  142. PickUpCodeEntity pickUpCodeEntity = pickUpCodeService.queryObject(message);
  143. pickUpList.add(pickUpCodeEntity);
  144. data.put("pickUpCodeList", pickUpList);
  145. String info = JSONObject.toJSONString(data);
  146. session.getBasicRemote().sendText(info);
  147. // logger.info("发送条形码给前端,内容:" + info);
  148. }
  149. }
  150. } catch (IOException e) {
  151. logger.error("发送消息出错:{}", e.getMessage());
  152. e.printStackTrace();
  153. }
  154. }
  155. /**
  156. * 群发消息
  157. *
  158. * @param message
  159. * @throws IOException
  160. */
  161. public static void broadcastInfo(String message) {
  162. logger.info("Server broadcast info....");
  163. for (Session session : SessionSet) {
  164. if (session.isOpen()) {
  165. logger.info("broadcast send session id is " + session.getId());
  166. sendMessage(session, message,null);
  167. }
  168. }
  169. }
  170. /**
  171. * 指定Session发送消息
  172. *
  173. * @param sessionId
  174. * @param message
  175. * @throws IOException
  176. */
  177. public static void sendMessage(String sessionId, String message,String storeId) throws IOException {
  178. logger.info("request session id is " + sessionId);
  179. Session session = null;
  180. for (Session s : SessionSet) {
  181. if (s.getId().equals(sessionId)) {
  182. session = s;
  183. break;
  184. }
  185. }
  186. if (session != null) {
  187. sendMessage(session, message,storeId);
  188. logger.info("sessionId:"+session.getId());
  189. } else {
  190. logger.warn("没有找到你指定ID的会话:{}", sessionId);
  191. }
  192. }
  193. }