package api.service.websocket; import api.entity.database.online.Message; import api.entity.database.system.Customer; import api.entity.input.online.MessageInput; import api.entity.view.online.MessageView; import api.service.online.IMessageService; import api.service.system.ICustomerService; import api.util.helper.*; import com.alibaba.fastjson2.JSON; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import lombok.extern.slf4j.Slf4j; import me.chanjar.weixin.common.bean.result.WxMediaUploadResult; import me.chanjar.weixin.common.error.WxErrorException; import me.chanjar.weixin.mp.api.WxMpService; import me.chanjar.weixin.mp.bean.kefu.WxMpKefuMessage; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import org.springframework.web.bind.annotation.RequestParam; import javax.annotation.PostConstruct; import javax.websocket.*; import javax.websocket.server.PathParam; import javax.websocket.server.ServerEndpoint; import java.io.*; import java.util.*; import java.util.concurrent.ConcurrentHashMap; @Slf4j @Component @ServerEndpoint("/ws/{source}/{userCode}") public class WebSocket { @Autowired private IMessageService messageService; @Autowired private ICustomerService customerService; @Autowired private WxMpService wxMpService; private static WebSocket webSocket; @PostConstruct public void init() { webSocket = this; } private Session session; private Integer source; private String userCode; private static Queue servicerQueue = new LinkedList<>(); private static ConcurrentHashMap kfsessionPool = new ConcurrentHashMap(); private static ConcurrentHashMap khsessionPool = new ConcurrentHashMap(); private static Map servicers = new HashMap<>(); private static Map customers = new HashMap<>(); private static Map khkfs = new HashMap<>(); /** * 链接成功调用的方法 */ @OnOpen public void onOpen(Session session, @PathParam(value = "source") Integer source, @PathParam(value = "userCode") String userCode) { try { this.session = session; this.source = source; this.userCode = userCode; boolean reply; if (source == 1) { reply=servicers.containsKey(userCode); if(!reply) { kfsessionPool.put(userCode, session); servicers.put(userCode, 1); servicerQueue.add(userCode); } } else { reply = customers.containsKey(userCode); if (!reply) { customers.put(userCode, source); khsessionPool.put(userCode, session); Long pid = null; List ps = session.getRequestParameterMap().get("project"); if (!ps.isEmpty()) pid = Long.valueOf(ps.get(0)); addCustomer(userCode, source, pid); } } if(reply) { MessageView mv1 = new MessageView(); mv1.setStatus("error"); mv1.setContent("签入失败,重复签入"); session.getAsyncRemote().sendText(JSON.toJSONString(mv1)); } } catch (Exception e) { log.error("open错误", e); } } /** * 链接关闭调用的方法 */ @OnClose public void onClose() { try { if (source == 1) { kfsessionPool.remove(userCode); servicerQueue.remove(userCode); servicers.remove(userCode); if(khkfs.values().contains(userCode)) { khkfs.values().removeIf(p -> p.equals(userCode)); } } else { customers.remove(userCode); String kf = khkfs.get(userCode); if (!StringHelper.isEmpty(kf)) { Session session1 = kfsessionPool.get(kf); if (session1 != null) { MessageView mv1 = new MessageView(); mv1.setStatus("error"); mv1.setContent("客户已下线"); mv1.setUser(userCode); session1.getAsyncRemote().sendText(JSON.toJSONString(mv1)); } khsessionPool.remove(userCode); khkfs.remove(userCode); } } } catch (Exception e) { log.error("close错误", e); } } /** * 收到客户端消息后调用的方法 * * @param message */ @OnMessage public void onMessage(String message) { MessageInput mv = JSON.parseObject(message, MessageInput.class); if (mv != null) { String type = mv.getType(); if(type.equals("heart")){ MessageView mvh = new MessageView(); mvh.setStatus("success"); mvh.setType(type); mvh.setContent(mv.getContent()); session.getAsyncRemote().sendText(JSON.toJSONString(mvh)); return; } String content = mv.getContent(); Message msg = new Message(); msg.setCreateTime(new Date()); msg.setStatus(0); msg.setMsgType(type); if ("text".equals(type)) { msg.setMsgContent(content); } if ("image".equals(type)) { msg.setMsgFile(content); } //客服发送消息 if (this.source == 1) { msg.setSend(2); msg.setKfUser(userCode); String kh = mv.getUser(); boolean exist = false; MessageView mv1 = new MessageView(); mv1.setUser(userCode); String en = khExist( userCode,kh); if (StringHelper.isEmpty(en)) { exist = true; msg.setKhUser(kh); Integer toSource = customers.get(kh); msg.setKhSource(toSource); //微信客户 if (toSource == 2) { WxMpKefuMessage wechatMessage = WxMpKefuMessage.TEXT().toUser(kh).content(content).build(); try { webSocket.wxMpService.getKefuService().sendKefuMessage(wechatMessage); } catch (WxErrorException e) { throw new RuntimeException(e); } } else { //其他客户 Session session1 = khsessionPool.get(kh); mv1.setStatus("success"); mv1.setType(type); mv1.setContent(content); session1.getAsyncRemote().sendText(JSON.toJSONString(mv1)); } } else { mv1.setStatus("error"); mv1.setContent(en); } if (!exist) { session.getAsyncRemote().sendText(JSON.toJSONString(mv1)); return; } } else { //客户发送消息 msg.setSend(1); msg.setKhUser(userCode); msg.setKhSource(source); String kf = khkfs.get(userCode); if (StringHelper.isEmpty(kf)||servicers.get(kf)==2) { kf = servicerQueue.poll(); if (!StringHelper.isEmpty(kf)) { servicerQueue.add(kf); khkfs.put(userCode, kf); } } if (!StringHelper.isEmpty(kf)) { Session session1 = kfsessionPool.get(kf); if (session1 != null) { msg.setKfUser(kf); MessageView mv1 = new MessageView(); mv1.setStatus("success"); mv1.setType(type); mv1.setContent(content); mv1.setUser(userCode); session1.getAsyncRemote().sendText(JSON.toJSONString(mv1)); } } } //webSocket.messageService.insert(msg); AsyncHelper.instance().execute(new TimerTask() { @Override public void run() { webSocket.messageService.insert(msg); } }); } } /** * 发送错误时的处理 * * @param error */ @OnError public void onError(Throwable error) { if (source == 1) { kfsessionPool.remove(userCode); servicerQueue.remove(userCode); servicers.remove(userCode); if(khkfs.values().contains(userCode)) { khkfs.values().removeIf(p -> p.equals(userCode)); } } else { customers.remove(userCode); String kf = khkfs.get(userCode); if (!StringHelper.isEmpty(kf)) { Session session1 = kfsessionPool.get(kf); if (session1 != null) { MessageView mv1 = new MessageView(); mv1.setStatus("error"); mv1.setContent("客户已下线"); mv1.setUser(userCode); session1.getAsyncRemote().sendText(JSON.toJSONString(mv1)); } khsessionPool.remove(userCode); khkfs.remove(userCode); } } error.printStackTrace(); } //客户转移 public String khTransfer(String kf,String kh) { String msg = "客户已下线"; if(!(servicers.containsKey(kf)&&kfsessionPool.containsKey(kf))){ msg = "此客服未签入"; } else if(servicers.get(kf)==2) { msg = "此客服忙碌"; } else if (!StringHelper.isEmpty(kh)) { if (customers.containsKey(kh)) { if (khkfs.containsKey(kh)) { if (khkfs.get(kh).equals(kf) ) { msg = "不能转移给自己"; } else { khkfs.replace(kh,kf); msg = ""; } } else { if (customers.get(kh) == 2||khsessionPool.containsKey(kh)) { khkfs.put(kh, kf); msg = ""; } } } else { LambdaQueryWrapper qw = new LambdaQueryWrapper(); qw.eq(Customer::getCustomerNo, kh).eq(Customer::getType,2); if (webSocket.customerService.exists(qw)) { Date sj=new Date(new Date().getTime()-(1000*60*60*48)); LambdaQueryWrapper qw1 = new LambdaQueryWrapper(); qw1.eq(Message::getKhUser,kh).eq(Message::getSend,2).gt(Message::getCreateTime,sj); if(webSocket.messageService.exists(qw1)) { customers.put(kh, 2); khkfs.put(kh, kf); msg = ""; } } } } else { msg = "客户不能为空"; } if(StringHelper.isEmpty(msg)){ Session session1=kfsessionPool.get(kf); MessageView mv1 = new MessageView(); mv1.setStatus("success"); mv1.setType("transfer"); mv1.setContent("转移客户"); mv1.setUser(kh); session1.getAsyncRemote().sendText(JSON.toJSONString(mv1)); } return msg; } //客户是否存在 public String khExist(String kf,String kh) { String msg = "客户已下线"; if(!(servicers.containsKey(kf)&&kfsessionPool.containsKey(kf))){ msg = "未签入"; } else if (!StringHelper.isEmpty(kh)) { if (customers.containsKey(kh)) { if (khkfs.containsKey(kh)) { if (khkfs.get(kh).equals(kf) ) { if (customers.get(kh) == 2) { msg = ""; } else { if (khsessionPool.containsKey(kh)) { msg = ""; } } } else { msg = "其他客服已接入"; } } else { if (customers.get(kh) == 2||khsessionPool.containsKey(kh)) { khkfs.put(kh, kf); msg = ""; } } } else { LambdaQueryWrapper qw = new LambdaQueryWrapper(); qw.eq(Customer::getCustomerNo, kh).eq(Customer::getType,2); if (webSocket.customerService.exists(qw)) { Date sj=new Date(new Date().getTime()-(1000*60*60*48)); LambdaQueryWrapper qw1 = new LambdaQueryWrapper(); qw1.eq(Message::getKhUser,kh).eq(Message::getSend,2).gt(Message::getCreateTime,sj); if(webSocket.messageService.exists(qw1)) { customers.put(kh, 2); khkfs.put(kh, kf); msg = ""; } } } } else { msg = "客户不能为空"; } return msg; } //客服发送图片消息 public void kfSendImageMessage(MessageInput mv,String kf) { String type = mv.getType(); String content = mv.getContent(); String kh = mv.getUser(); Message message = new Message(); message.setStatus(0); message.setMsgType(type); message.setMsgFile(content); message.setKhUser(kh); message.setSend(2); message.setCreateTime(new Date()); message.setKfUser(kf); Integer source = customers.get(kh); message.setKhSource(source); //发送给微信客户 if (source == 2) { File file = new File(content); if (file.exists()) { try { WxMediaUploadResult result = webSocket.wxMpService.getMaterialService().mediaUpload(type, file); String MediaId = result.getMediaId(); WxMpKefuMessage wechatMessage = WxMpKefuMessage.IMAGE().toUser(kh).mediaId(MediaId).build(); webSocket.wxMpService.getKefuService().sendKefuMessage(wechatMessage); message.setMsgMediaid(MediaId); } catch (WxErrorException e) { log.error("上传文件失败", e); throw new RuntimeException(e); } } } else { //发送其他客户 MessageView mv1 = new MessageView(); mv1.setStatus("success"); mv1.setType(type); mv1.setContent(content); khsessionPool.get(kh).getAsyncRemote().sendText(JSON.toJSONString(mv1)); } webSocket.messageService.insert(message); } // 微信信息转发给客服 public void toWxMessage(Map map) { String sendUser = map.get("FromUserName"); String msgType = map.get("MsgType"); Message message = new Message(); message.setSend(1); message.setKhUser(sendUser); message.setKhSource(2); message.setStatus(0); message.setMsgType(msgType); message.setCreateTime(new Date()); String content = ""; if ("text".equals(msgType)) { content = map.get("Content"); message.setMsgContent(content); } else if ("image".equals(msgType) || "voice".equals(msgType)) { String mediaId=map.get("MediaId"); message.setMsgMediaid(mediaId); String fn = System.currentTimeMillis()+"."; String fileName = fn + "jpg"; if ("voice".equals(msgType)) { message.setMsgFormat(map.get("Format")); fileName = fn + map.get("Format"); } //下载临时素材 try { String path = "files/online/" + DateHelper.getDate(); File pFile = new File(path); if (!pFile.exists()) pFile.mkdirs(); String filePath = pFile.getAbsolutePath() + "/" + fileName; File lFile = new File(filePath); if (!lFile.exists()) lFile.createNewFile(); File file = webSocket.wxMpService.getMaterialService().mediaDownload(mediaId); InputStream inputStream = new FileInputStream(file); byte[] buffer = new byte[1024]; FileOutputStream outputStream = new FileOutputStream(lFile); int length; while ((length = inputStream.read(buffer)) > 0) { outputStream.write(buffer, 0, length); } inputStream.close(); outputStream.close(); message.setMsgFile(path + "/" + fileName); content = path + "/" + fileName; } catch (WxErrorException e) { log.error("下载文件失败", e); throw new RuntimeException(e); } catch (IOException e) { log.error("下载文件失败", e); throw new RuntimeException(e); } } if (!customers.containsKey(sendUser)) { customers.put(sendUser, 2); addCustomer(sendUser,2,null); }; String kf = khkfs.get(sendUser); if (StringHelper.isEmpty(kf)||servicers.get(kf)==2) { kf = servicerQueue.poll(); if (!StringHelper.isEmpty(kf)) { servicerQueue.add(kf); khkfs.put(sendUser, kf); } } if (!StringHelper.isEmpty(kf)) { message.setKfUser(kf); Session session1 = kfsessionPool.get(kf); if (session1 != null) { MessageView mv = new MessageView(); mv.setStatus("success"); mv.setType(msgType); mv.setContent(content); mv.setUser(sendUser); session1.getAsyncRemote().sendText(JSON.toJSONString(mv)); } } webSocket.messageService.insert(message); } public void addCustomer(String userCode,Integer source,Long project) { LambdaQueryWrapper qw = new LambdaQueryWrapper(); qw.eq(Customer::getCustomerNo, userCode); if (!webSocket.customerService.exists(qw)) { Customer customer = new Customer(); customer.setCustomerNo(userCode); customer.setType(Long.valueOf(source)); customer.setProject(project); customer.setFCreatetime(new Date()); customer.setFIsdelete(0L); webSocket.customerService.insert(customer); } } //客服置忙 public void kfBusy(String userCode) { if (servicers.containsKey(userCode)) { servicers.replace(userCode, 2); } if (servicerQueue.contains(userCode)) { servicerQueue.remove(userCode); } } //客服置闲 public void kfFree(String userCode) { if (servicers.containsKey(userCode)) { servicers.replace(userCode, 1); } if (!servicerQueue.contains(userCode)) { servicerQueue.add(userCode); } } //获取信息 public Object GetMap(Integer type) { if (type == 1) { return servicers; } if (type == 2) { return customers; } if (type == 3) { return khkfs; } if (type == 4) { return servicerQueue; } return servicers; } }