package com.kmall.admin.websocket; import com.alibaba.fastjson.JSONObject; import com.kmall.admin.entity.OrderEntity; import com.kmall.admin.entity.PickUpCodeEntity; import com.kmall.admin.haikong.constant.Constants; import com.kmall.admin.service.OrderService; import com.kmall.admin.service.PickUpCodeService; import com.kmall.api.util.SendMsgUtil; import com.kmall.common.utils.R; import com.kmall.manager.manager.redis.JedisUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import javax.websocket.*; import javax.websocket.server.PathParam; import javax.websocket.server.ServerEndpoint; import java.io.IOException; import java.text.SimpleDateFormat; import java.util.*; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.atomic.AtomicInteger; /** * WebSocket服务端 * * 新建一个CopyOnWriteArraySet SessionSet 用于接收当前会话的Session,方便进行推送消息。单机版实现到这里就可以 * 集群版(多个ws节点)还需要借助mysql或者redis等进行处理,改造对应的sendMessage方法即可 * * @author Scott Chen * @date 2019-11-06 */ // 前端发起的启动WebSocket服务地址 @ServerEndpoint(value = "/ws/server/{param}") @Component public class WebSocketServer { private static final Logger logger = LoggerFactory.getLogger(WebSocketServer.class); private static final AtomicInteger OnlineCount = new AtomicInteger(0); private static CopyOnWriteArraySet SessionSet ; // 存放小票码,key是收银员编号 private static Map> pickUpCodeMap ; private static List pickUpList ; static SimpleDateFormat sdf = null; static { // concurrent包的线程安全Set,用来存放每个客户端对应的Session对象。 SessionSet = new CopyOnWriteArraySet(); pickUpCodeMap = new HashMap<>(); pickUpList = new ArrayList<>(); sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); } private static PickUpCodeService pickUpCodeService; private static OrderService orderService; @Autowired public void setOptionService(PickUpCodeService pickUpCodeService,OrderService orderService) { WebSocketServer.pickUpCodeService = pickUpCodeService; WebSocketServer.orderService = orderService; } public static void delete(String sessionId, Integer storeId, PickUpCodeEntity pickUpCodeEntity) throws IOException { sendMessage(sessionId,"delete",storeId+""); } /** * 连接建立成功调用的方法 */ @OnOpen public void onOpen(Session session,@PathParam(value="param")String param ) { List pickUpCodeEntities = pickUpCodeMap.get(param); // TODO // if(pickUpCodeEntities == null){ // 查询所有的小票码 Map dataParam = new HashMap<>(); dataParam.put("pickUpCodeCreatetime",new SimpleDateFormat("yyyy-MM-dd").format(new Date())); dataParam.put("storeId",param); JedisUtil.del(Constants.WAREHOUSE_STOCK_MAP_KEY + "_" + param); pickUpList = pickUpCodeService.queryList(dataParam); pickUpCodeMap.put(param,pickUpList); // } SessionSet.add(session); logger.info("WebSocket连接请求 session id:{}", session.getId()); // 在线数加1 int cnt = OnlineCount.incrementAndGet(); logger.info("WebSocket当前连接数为:{}", cnt); sendMessage(session, "服务端返回WebSocket连接成功",param); } /** * 连接关闭调用的方法 */ @OnClose public void onClose(Session session) { SessionSet.remove(session); logger.info("WebSocket连接关闭 session id:{}", session.getId()); int cnt = OnlineCount.decrementAndGet(); logger.info("WebSocket连接关闭,当前连接数为:{}", cnt); } /** * 收到客户端消息后调用的方法 * * @param message 客户端发送过来的消息 */ @OnMessage public void onMessage(String message, Session session) { logger.info("来自客户端[{}]的消息:[{}]", session.getId(), message); logger.info("Server开始广播推送客户端[{}]发来的新消息", session.getId()); // sendMessage(session, message); broadcastInfo(message + ",【消息产生于:" + session.getId() + "】"); } /** * 出现错误 * * @param session * @param error */ @OnError public void onError(Session session, Throwable error) { logger.error("发生错误:{},Session ID: {}", error.getMessage(), session.getId()); error.printStackTrace(); } /** * 发送消息,实践表明,每次浏览器刷新,session会发生变化。 * * @param session * @param message */ public static void sendMessage(Session session, String message,String picNo) { try { // List pickUpCodeEntities = new ArrayList<>(); // logger.info(picNo); // if(picNo != null){ // pickUpCodeEntities = pickUpCodeMap.get(picNo); // } Map data = new HashMap<>(); data.put("delayResponse",false); /** * 查询是否有卡单的情况 * 步骤: * 1.查询所有状态为201且下单时间超过5分钟的订单 * 2.判断订单数是否超过10个,如果超过10个,则去查询oms是否有该订单,如果有该订单,是否存在回执 * 3.循环查询,如果第一个有回执,则查询第二个,直到查询到最后一个,如果都有回执,则无异常,如果其中有一个单无回执,则证明海关可能卡单 * 4.如果海关卡单,给页面一个卡单提示,并且发送短信给客服相关人员 */ try{ // 1.查询所有状态为201且下单时间超过5分钟的订单 // List all201Order = orderService.queryAll201Order(); // if(all201Order != null && all201Order.size() >= 10){ // // 循环查询oms是否有回执 // for (OrderEntity orderEntity : all201Order) { // List resultMessage = orderService.queryInveResponse(orderEntity.getOrderSn()); // if(resultMessage == null || resultMessage.size() == 0){ // String sendTemplate = "【CW惠州门店】时间:" + sdf.format(new Date()) + ",订单出现卡单情况!"; //// //// SendMsgUtil.sendMsg("18825104165", sendTemplate); //// SendMsgUtil.sendMsg("", sendTemplate); // // // 如果能进这里,证明卡回执了 // data.put("delayResponse",true); // } // } // } }catch (Exception e){ } if("服务端返回WebSocket连接成功".equalsIgnoreCase(message)){ data.put("sessionId",session.getId()); data.put("pickUpCodeList",pickUpList); String info = JSONObject.toJSONString(data); session.getBasicRemote().sendText(info); // logger.info("发送条形码给前端,内容:" + info); }else{ if("broadcast".equalsIgnoreCase(message)){ Map dataParam = new HashMap<>(); dataParam.put("pickUpCodeCreatetime",new SimpleDateFormat("yyyy-MM-dd").format(new Date())); List pickUpList = pickUpCodeService.queryList(dataParam); data.put("pickUpCodeList", pickUpList); String info = JSONObject.toJSONString(data); session.getBasicRemote().sendText(info); // logger.info("发送条形码给前端,内容:" + info); }else { // 根据订单id查询条形码,返回给前端页面 PickUpCodeEntity pickUpCodeEntity = pickUpCodeService.queryObject(message); pickUpList.add(pickUpCodeEntity); data.put("pickUpCodeList", pickUpList); String info = JSONObject.toJSONString(data); session.getBasicRemote().sendText(info); // logger.info("发送条形码给前端,内容:" + info); } } } catch (IOException e) { logger.error("发送消息出错:{}", e.getMessage()); e.printStackTrace(); } } /** * 群发消息 * * @param message * @throws IOException */ public static void broadcastInfo(String message) { logger.info("Server broadcast info...."); for (Session session : SessionSet) { if (session.isOpen()) { logger.info("broadcast send session id is " + session.getId()); sendMessage(session, message,null); } } } /** * 指定Session发送消息 * * @param sessionId * @param message * @throws IOException */ public static void sendMessage(String sessionId, String message,String storeId) throws IOException { logger.info("request session id is " + sessionId); Session session = null; for (Session s : SessionSet) { if (s.getId().equals(sessionId)) { session = s; break; } } if (session != null) { sendMessage(session, message,storeId); logger.info("sessionId:"+session.getId()); } else { logger.warn("没有找到你指定ID的会话:{}", sessionId); } } }