| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549 |
- package api.service.websocket;
import api.entity.database.online.Message;
import api.entity.database.system.Customer;
import api.entity.input.online.MessageInput;
import api.entity.view.online.MessageView;
import api.service.online.IMessageService;
import api.service.system.ICustomerService;
import api.util.helper.*;
import com.alibaba.fastjson2.JSON;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import lombok.extern.slf4j.Slf4j;
import me.chanjar.weixin.common.bean.result.WxMediaUploadResult;
import me.chanjar.weixin.common.error.WxErrorException;
import me.chanjar.weixin.mp.api.WxMpService;
import me.chanjar.weixin.mp.bean.kefu.WxMpKefuMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.RequestParam;
import javax.annotation.PostConstruct;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.*;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
- @Slf4j
- @Component
- @ServerEndpoint("/ws/{source}/{userCode}")
- public class WebSocket {
- @Autowired
- private IMessageService messageService;
- @Autowired
- private ICustomerService customerService;
- @Autowired
- private WxMpService wxMpService;
- private static WebSocket webSocket;
- @PostConstruct
- public void init() {
- webSocket = this;
- }
- private Session session;
- private Integer source;
- private String userCode;
- private static Queue<String> servicerQueue = new LinkedList<>();
- private static ConcurrentHashMap<String, Session> kfsessionPool = new ConcurrentHashMap<String, Session>();
- private static ConcurrentHashMap<String, Session> khsessionPool = new ConcurrentHashMap<String, Session>();
- private static Map<String, Integer> servicers = new HashMap<>();
- private static Map<String, Integer> customers = new HashMap<>();
- private static Map<String, String> khkfs = new HashMap<>();
- /**
- * 链接成功调用的方法
- */
- @OnOpen
- public void onOpen(Session session, @PathParam(value = "source") Integer source, @PathParam(value = "userCode") String userCode) {
- try {
- this.session = session;
- this.source = source;
- this.userCode = userCode;
- boolean reply;
- if (source == 1) {
- reply=servicers.containsKey(userCode);
- if(!reply) {
- kfsessionPool.put(userCode, session);
- servicers.put(userCode, 1);
- servicerQueue.add(userCode);
- }
- } else {
- reply = customers.containsKey(userCode);
- if (!reply) {
- customers.put(userCode, source);
- khsessionPool.put(userCode, session);
- Long pid = null;
- List<String> ps = session.getRequestParameterMap().get("project");
- if (!ps.isEmpty()) pid = Long.valueOf(ps.get(0));
- addCustomer(userCode, source, pid);
- }
- }
- if(reply) {
- MessageView mv1 = new MessageView();
- mv1.setStatus("error");
- mv1.setContent("签入失败,重复签入");
- session.getAsyncRemote().sendText(JSON.toJSONString(mv1));
- }
- } catch (Exception e) {
- log.error("open错误", e);
- }
- }
- /**
- * 链接关闭调用的方法
- */
- @OnClose
- public void onClose() {
- try {
- if (source == 1) {
- kfsessionPool.remove(userCode);
- servicerQueue.remove(userCode);
- servicers.remove(userCode);
- if(khkfs.values().contains(userCode)) {
- khkfs.values().removeIf(p -> p.equals(userCode));
- }
- } else {
- customers.remove(userCode);
- String kf = khkfs.get(userCode);
- if (!StringHelper.isEmpty(kf)) {
- Session session1 = kfsessionPool.get(kf);
- if (session1 != null) {
- MessageView mv1 = new MessageView();
- mv1.setStatus("error");
- mv1.setContent("客户已下线");
- mv1.setUser(userCode);
- session1.getAsyncRemote().sendText(JSON.toJSONString(mv1));
- }
- khsessionPool.remove(userCode);
- khkfs.remove(userCode);
- }
- }
- } catch (Exception e) {
- log.error("close错误", e);
- }
- }
- /**
- * 收到客户端消息后调用的方法
- *
- * @param message
- */
- @OnMessage
- public void onMessage(String message) {
- MessageInput mv = JSON.parseObject(message, MessageInput.class);
- if (mv != null) {
- String type = mv.getType();
- if(type.equals("heart")){
- MessageView mvh = new MessageView();
- mvh.setStatus("success");
- mvh.setType(type);
- mvh.setContent(mv.getContent());
- session.getAsyncRemote().sendText(JSON.toJSONString(mvh));
- return;
- }
- String content = mv.getContent();
- Message msg = new Message();
- msg.setCreateTime(new Date());
- msg.setStatus(0);
- msg.setMsgType(type);
- if ("text".equals(type)) {
- msg.setMsgContent(content);
- }
- if ("image".equals(type)) {
- msg.setMsgFile(content);
- }
- //客服发送消息
- if (this.source == 1) {
- msg.setSend(2);
- msg.setKfUser(userCode);
- String kh = mv.getUser();
- boolean exist = false;
- MessageView mv1 = new MessageView();
- mv1.setUser(userCode);
- String en = khExist( userCode,kh);
- if (StringHelper.isEmpty(en)) {
- exist = true;
- msg.setKhUser(kh);
- Integer toSource = customers.get(kh);
- msg.setKhSource(toSource);
- //微信客户
- if (toSource == 2) {
- WxMpKefuMessage wechatMessage = WxMpKefuMessage.TEXT().toUser(kh).content(content).build();
- try {
- webSocket.wxMpService.getKefuService().sendKefuMessage(wechatMessage);
- } catch (WxErrorException e) {
- throw new RuntimeException(e);
- }
- } else {
- //其他客户
- Session session1 = khsessionPool.get(kh);
- mv1.setStatus("success");
- mv1.setType(type);
- mv1.setContent(content);
- session1.getAsyncRemote().sendText(JSON.toJSONString(mv1));
- }
- } else {
- mv1.setStatus("error");
- mv1.setContent(en);
- }
- if (!exist) {
- session.getAsyncRemote().sendText(JSON.toJSONString(mv1));
- return;
- }
- } else {
- //客户发送消息
- msg.setSend(1);
- msg.setKhUser(userCode);
- msg.setKhSource(source);
- String kf = khkfs.get(userCode);
- if (StringHelper.isEmpty(kf)||servicers.get(kf)==2) {
- kf = servicerQueue.poll();
- if (!StringHelper.isEmpty(kf)) {
- servicerQueue.add(kf);
- khkfs.put(userCode, kf);
- }
- }
- if (!StringHelper.isEmpty(kf)) {
- Session session1 = kfsessionPool.get(kf);
- if (session1 != null) {
- msg.setKfUser(kf);
- MessageView mv1 = new MessageView();
- mv1.setStatus("success");
- mv1.setType(type);
- mv1.setContent(content);
- mv1.setUser(userCode);
- session1.getAsyncRemote().sendText(JSON.toJSONString(mv1));
- }
- }
- }
- //webSocket.messageService.insert(msg);
- AsyncHelper.instance().execute(new TimerTask() {
- @Override
- public void run() {
- webSocket.messageService.insert(msg);
- }
- });
- }
- }
- /**
- * 发送错误时的处理
- *
- * @param error
- */
- @OnError
- public void onError(Throwable error) {
- if (source == 1) {
- kfsessionPool.remove(userCode);
- servicerQueue.remove(userCode);
- servicers.remove(userCode);
- if(khkfs.values().contains(userCode)) {
- khkfs.values().removeIf(p -> p.equals(userCode));
- }
- } else {
- customers.remove(userCode);
- String kf = khkfs.get(userCode);
- if (!StringHelper.isEmpty(kf)) {
- Session session1 = kfsessionPool.get(kf);
- if (session1 != null) {
- MessageView mv1 = new MessageView();
- mv1.setStatus("error");
- mv1.setContent("客户已下线");
- mv1.setUser(userCode);
- session1.getAsyncRemote().sendText(JSON.toJSONString(mv1));
- }
- khsessionPool.remove(userCode);
- khkfs.remove(userCode);
- }
- }
- error.printStackTrace();
- }
- //客户转移
- public String khTransfer(String kf,String kh) {
- String msg = "客户已下线";
- if(!(servicers.containsKey(kf)&&kfsessionPool.containsKey(kf))){
- msg = "此客服未签入";
- } else if(servicers.get(kf)==2) {
- msg = "此客服忙碌";
- } else if (!StringHelper.isEmpty(kh)) {
- if (customers.containsKey(kh)) {
- if (khkfs.containsKey(kh)) {
- if (khkfs.get(kh).equals(kf) ) {
- msg = "不能转移给自己";
- } else {
- khkfs.replace(kh,kf);
- msg = "";
- }
- } else {
- if (customers.get(kh) == 2||khsessionPool.containsKey(kh)) {
- khkfs.put(kh, kf);
- msg = "";
- }
- }
- } else {
- LambdaQueryWrapper<Customer> qw = new LambdaQueryWrapper();
- qw.eq(Customer::getCustomerNo, kh).eq(Customer::getType,2);
- if (webSocket.customerService.exists(qw)) {
- Date sj=new Date(new Date().getTime()-(1000*60*60*48));
- LambdaQueryWrapper<Message> qw1 = new LambdaQueryWrapper();
- qw1.eq(Message::getKhUser,kh).eq(Message::getSend,2).gt(Message::getCreateTime,sj);
- if(webSocket.messageService.exists(qw1)) {
- customers.put(kh, 2);
- khkfs.put(kh, kf);
- msg = "";
- }
- }
- }
- } else {
- msg = "客户不能为空";
- }
- if(StringHelper.isEmpty(msg)){
- Session session1=kfsessionPool.get(kf);
- MessageView mv1 = new MessageView();
- mv1.setStatus("success");
- mv1.setType("transfer");
- mv1.setContent("转移客户");
- mv1.setUser(kh);
- session1.getAsyncRemote().sendText(JSON.toJSONString(mv1));
- }
- return msg;
- }
- //客户是否存在
- public String khExist(String kf,String kh) {
- String msg = "客户已下线";
- if(!(servicers.containsKey(kf)&&kfsessionPool.containsKey(kf))){
- msg = "未签入";
- }
- else if (!StringHelper.isEmpty(kh)) {
- if (customers.containsKey(kh)) {
- if (khkfs.containsKey(kh)) {
- if (khkfs.get(kh).equals(kf) ) {
- if (customers.get(kh) == 2) {
- msg = "";
- } else {
- if (khsessionPool.containsKey(kh)) {
- msg = "";
- }
- }
- } else {
- msg = "其他客服已接入";
- }
- } else {
- if (customers.get(kh) == 2||khsessionPool.containsKey(kh)) {
- khkfs.put(kh, kf);
- msg = "";
- }
- }
- } else {
- LambdaQueryWrapper<Customer> qw = new LambdaQueryWrapper();
- qw.eq(Customer::getCustomerNo, kh).eq(Customer::getType,2);
- if (webSocket.customerService.exists(qw)) {
- Date sj=new Date(new Date().getTime()-(1000*60*60*48));
- LambdaQueryWrapper<Message> qw1 = new LambdaQueryWrapper();
- qw1.eq(Message::getKhUser,kh).eq(Message::getSend,2).gt(Message::getCreateTime,sj);
- if(webSocket.messageService.exists(qw1)) {
- customers.put(kh, 2);
- khkfs.put(kh, kf);
- msg = "";
- }
- }
- }
- } else {
- msg = "客户不能为空";
- }
- return msg;
- }
- //客服发送图片消息
- public void kfSendImageMessage(MessageInput mv,String kf) {
- String type = mv.getType();
- String content = mv.getContent();
- String kh = mv.getUser();
- Message message = new Message();
- message.setStatus(0);
- message.setMsgType(type);
- message.setMsgFile(content);
- message.setKhUser(kh);
- message.setSend(2);
- message.setCreateTime(new Date());
- message.setKfUser(kf);
- Integer source = customers.get(kh);
- message.setKhSource(source);
- //发送给微信客户
- if (source == 2) {
- File file = new File(content);
- if (file.exists()) {
- try {
- WxMediaUploadResult result = webSocket.wxMpService.getMaterialService().mediaUpload(type, file);
- String MediaId = result.getMediaId();
- WxMpKefuMessage wechatMessage = WxMpKefuMessage.IMAGE().toUser(kh).mediaId(MediaId).build();
- webSocket.wxMpService.getKefuService().sendKefuMessage(wechatMessage);
- message.setMsgMediaid(MediaId);
- } catch (WxErrorException e) {
- log.error("上传文件失败", e);
- throw new RuntimeException(e);
- }
- }
- } else {
- //发送其他客户
- MessageView mv1 = new MessageView();
- mv1.setStatus("success");
- mv1.setType(type);
- mv1.setContent(content);
- khsessionPool.get(kh).getAsyncRemote().sendText(JSON.toJSONString(mv1));
- }
- webSocket.messageService.insert(message);
- }
- // 微信信息转发给客服
- public void toWxMessage(Map<String, String> map) {
- String sendUser = map.get("FromUserName");
- String msgType = map.get("MsgType");
- Message message = new Message();
- message.setSend(1);
- message.setKhUser(sendUser);
- message.setKhSource(2);
- message.setStatus(0);
- message.setMsgType(msgType);
- message.setCreateTime(new Date());
- String content = "";
- if ("text".equals(msgType)) {
- content = map.get("Content");
- message.setMsgContent(content);
- } else if ("image".equals(msgType) || "voice".equals(msgType)) {
- String mediaId=map.get("MediaId");
- message.setMsgMediaid(mediaId);
- String fn = System.currentTimeMillis()+".";
- String fileName = fn + "jpg";
- if ("voice".equals(msgType)) {
- message.setMsgFormat(map.get("Format"));
- fileName = fn + map.get("Format");
- }
- //下载临时素材
- try {
- String path = "files/online/" + DateHelper.getDate();
- File pFile = new File(path);
- if (!pFile.exists()) pFile.mkdirs();
- String filePath = pFile.getAbsolutePath() + "/" + fileName;
- File lFile = new File(filePath);
- if (!lFile.exists()) lFile.createNewFile();
- File file = webSocket.wxMpService.getMaterialService().mediaDownload(mediaId);
- InputStream inputStream = new FileInputStream(file);
- byte[] buffer = new byte[1024];
- FileOutputStream outputStream = new FileOutputStream(lFile);
- int length;
- while ((length = inputStream.read(buffer)) > 0) {
- outputStream.write(buffer, 0, length);
- }
- inputStream.close();
- outputStream.close();
- message.setMsgFile(path + "/" + fileName);
- content = path + "/" + fileName;
- } catch (WxErrorException e) {
- log.error("下载文件失败", e);
- throw new RuntimeException(e);
- } catch (IOException e) {
- log.error("下载文件失败", e);
- throw new RuntimeException(e);
- }
- }
- if (!customers.containsKey(sendUser)) {
- customers.put(sendUser, 2);
- addCustomer(sendUser,2,null);
- };
- String kf = khkfs.get(sendUser);
- if (StringHelper.isEmpty(kf)||servicers.get(kf)==2) {
- kf = servicerQueue.poll();
- if (!StringHelper.isEmpty(kf)) {
- servicerQueue.add(kf);
- khkfs.put(sendUser, kf);
- }
- }
- if (!StringHelper.isEmpty(kf)) {
- message.setKfUser(kf);
- Session session1 = kfsessionPool.get(kf);
- if (session1 != null) {
- MessageView mv = new MessageView();
- mv.setStatus("success");
- mv.setType(msgType);
- mv.setContent(content);
- mv.setUser(sendUser);
- session1.getAsyncRemote().sendText(JSON.toJSONString(mv));
- }
- }
- webSocket.messageService.insert(message);
- }
- public void addCustomer(String userCode,Integer source,Long project) {
- LambdaQueryWrapper<Customer> qw = new LambdaQueryWrapper();
- qw.eq(Customer::getCustomerNo, userCode);
- if (!webSocket.customerService.exists(qw)) {
- Customer customer = new Customer();
- customer.setCustomerNo(userCode);
- customer.setType(Long.valueOf(source));
- customer.setProject(project);
- customer.setFCreatetime(new Date());
- customer.setFIsdelete(0L);
- webSocket.customerService.insert(customer);
- }
- }
- //客服置忙
- public void kfBusy(String userCode) {
- if (servicers.containsKey(userCode)) {
- servicers.replace(userCode, 2);
- }
- if (servicerQueue.contains(userCode)) {
- servicerQueue.remove(userCode);
- }
- }
- //客服置闲
- public void kfFree(String userCode) {
- if (servicers.containsKey(userCode)) {
- servicers.replace(userCode, 1);
- }
- if (!servicerQueue.contains(userCode)) {
- servicerQueue.add(userCode);
- }
- }
- //获取信息
- public Object GetMap(Integer type) {
- if (type == 1) {
- return servicers;
- }
- if (type == 2) {
- return customers;
- }
- if (type == 3) {
- return khkfs;
- }
- if (type == 4) {
- return servicerQueue;
- }
- return servicers;
- }
- }
|