123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224 |
- package com.kmall.admin.websocket;
- import com.alibaba.fastjson.JSONObject;
- import com.kmall.admin.entity.PickUpCodeEntity;
- import com.kmall.admin.service.PickUpCodeService;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Component;
- import javax.websocket.*;
- import javax.websocket.server.PathParam;
- import javax.websocket.server.ServerEndpoint;
- import java.io.IOException;
- import java.text.SimpleDateFormat;
- import java.util.*;
- import java.util.concurrent.CopyOnWriteArraySet;
- import java.util.concurrent.atomic.AtomicInteger;
- /**
- * WebSocket服务端
- *
- * 新建一个CopyOnWriteArraySet SessionSet 用于接收当前会话的Session,方便进行推送消息。单机版实现到这里就可以
- * 集群版(多个ws节点)还需要借助mysql或者redis等进行处理,改造对应的sendMessage方法即可
- *
- * @author Scott Chen
- * @date 2019-11-06
- */
- // 前端发起的启动WebSocket服务地址
- @ServerEndpoint(value = "/ws/server/{param}")
- @Component
- public class WebSocketServer {
- private static final Logger logger = LoggerFactory.getLogger(WebSocketServer.class);
- private static final AtomicInteger OnlineCount = new AtomicInteger(0);
- private static CopyOnWriteArraySet<Session> SessionSet ;
- // 存放取货码,key是收银员编号
- private static Map<String, List<PickUpCodeEntity>> pickUpCodeMap ;
- private static List<PickUpCodeEntity> pickUpList ;
- static {
- // concurrent包的线程安全Set,用来存放每个客户端对应的Session对象。
- SessionSet = new CopyOnWriteArraySet<Session>();
- pickUpCodeMap = new HashMap<>();
- pickUpList = new ArrayList<>();
- }
- private static PickUpCodeService pickUpCodeService;
- @Autowired
- public void setOptionService(PickUpCodeService pickUpCodeService) {
- WebSocketServer.pickUpCodeService = pickUpCodeService;
- }
- public static void delete(String sessionId, Integer storeId, PickUpCodeEntity pickUpCodeEntity) throws IOException {
- // List<PickUpCodeEntity> pickUpCodeEntities = pickUpCodeMap.get(storeId+"");
- // for(PickUpCodeEntity pick : pickUpList){
- // if(pick.getPickUpCodeSn().equalsIgnoreCase(pickUpCodeEntity.getPickUpCodeSn())){
- // pickUpList.remove(pick);
- // }
- // }
- sendMessage(sessionId,"delete",storeId+"");
- }
- /**
- * 连接建立成功调用的方法
- */
- @OnOpen
- public void onOpen(Session session,@PathParam(value="param")String param ) {
- List<PickUpCodeEntity> pickUpCodeEntities = pickUpCodeMap.get(param);
- // TODO
- // if(pickUpCodeEntities == null){
- // 查询所有的取货码
- Map<String,Object> dataParam = new HashMap<>();
- dataParam.put("pickUpCodeCreatetime",new SimpleDateFormat("yyyy-MM-dd").format(new Date()));
- // dataParam.put("pickUpCodeStatus","0");
- pickUpList = pickUpCodeService.queryList(dataParam);
- pickUpCodeMap.put(param,pickUpList);
- // }
- SessionSet.add(session);
- logger.info("WebSocket连接请求 session id:{}", session.getId());
- // 在线数加1
- int cnt = OnlineCount.incrementAndGet();
- logger.info("WebSocket当前连接数为:{}", cnt);
- sendMessage(session, "服务端返回WebSocket连接成功",param);
- }
- /**
- * 连接关闭调用的方法
- */
- @OnClose
- public void onClose(Session session) {
- SessionSet.remove(session);
- logger.info("WebSocket连接关闭 session id:{}", session.getId());
- int cnt = OnlineCount.decrementAndGet();
- logger.info("WebSocket连接关闭,当前连接数为:{}", cnt);
- }
- /**
- * 收到客户端消息后调用的方法
- *
- * @param message 客户端发送过来的消息
- */
- @OnMessage
- public void onMessage(String message, Session session) {
- logger.info("来自客户端[{}]的消息:[{}]", session.getId(), message);
- logger.info("Server开始广播推送客户端[{}]发来的新消息", session.getId());
- // sendMessage(session, message);
- broadcastInfo(message + ",【消息产生于:" + session.getId() + "】");
- }
- /**
- * 出现错误
- *
- * @param session
- * @param error
- */
- @OnError
- public void onError(Session session, Throwable error) {
- logger.error("发生错误:{},Session ID: {}", error.getMessage(), session.getId());
- error.printStackTrace();
- }
- /**
- * 发送消息,实践表明,每次浏览器刷新,session会发生变化。
- *
- * @param session
- * @param message
- */
- public static void sendMessage(Session session, String message,String picNo) {
- try {
- // List<PickUpCodeEntity> pickUpCodeEntities = new ArrayList<>();
- logger.info(picNo);
- // if(picNo != null){
- // pickUpCodeEntities = pickUpCodeMap.get(picNo);
- // }
- Map<String,Object> data = new HashMap<>();
- if("服务端返回WebSocket连接成功".equalsIgnoreCase(message)){
- data.put("sessionId",session.getId());
- data.put("pickUpCodeList",pickUpList);
- String info = JSONObject.toJSONString(data);
- session.getBasicRemote().sendText(info);
- logger.info("发送条形码给前端,内容:" + info);
- }else{
- if("broadcast".equalsIgnoreCase(message)){
- Map<String,Object> dataParam = new HashMap<>();
- dataParam.put("pickUpCodeCreatetime",new SimpleDateFormat("yyyy-MM-dd").format(new Date()));
- List<PickUpCodeEntity> pickUpList = pickUpCodeService.queryList(dataParam);
- data.put("pickUpCodeList", pickUpList);
- String info = JSONObject.toJSONString(data);
- session.getBasicRemote().sendText(info);
- logger.info("发送条形码给前端,内容:" + info);
- }else {
- // 根据订单id查询条形码,返回给前端页面
- PickUpCodeEntity pickUpCodeEntity = pickUpCodeService.queryObject(message);
- pickUpList.add(pickUpCodeEntity);
- data.put("pickUpCodeList", pickUpList);
- String info = JSONObject.toJSONString(data);
- session.getBasicRemote().sendText(info);
- logger.info("发送条形码给前端,内容:" + info);
- }
- }
- } catch (IOException e) {
- logger.error("发送消息出错:{}", e.getMessage());
- e.printStackTrace();
- }
- }
- /**
- * 群发消息
- *
- * @param message
- * @throws IOException
- */
- public static void broadcastInfo(String message) {
- logger.info("Server broadcast info....");
- for (Session session : SessionSet) {
- if (session.isOpen()) {
- logger.info("broadcast send session id is " + session.getId());
- sendMessage(session, message,null);
- }
- }
- }
- /**
- * 指定Session发送消息
- *
- * @param sessionId
- * @param message
- * @throws IOException
- */
- public static void sendMessage(String sessionId, String message,String storeId) throws IOException {
- logger.info("request session id is " + sessionId);
- Session session = null;
- for (Session s : SessionSet) {
- if (s.getId().equals(sessionId)) {
- session = s;
- break;
- }
- }
- if (session != null) {
- sendMessage(session, message,storeId);
- logger.info("sessionId:"+session.getId());
- } else {
- logger.warn("没有找到你指定ID的会话:{}", sessionId);
- }
- }
- }
|