在线客服

WebSocket.java 20KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549
  1. package api.service.websocket;
  2. import api.entity.database.online.Message;
  3. import api.entity.database.system.Customer;
  4. import api.entity.input.online.MessageInput;
  5. import api.entity.view.online.MessageView;
  6. import api.service.online.IMessageService;
  7. import api.service.system.ICustomerService;
  8. import api.util.helper.*;
  9. import com.alibaba.fastjson2.JSON;
  10. import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
  11. import lombok.extern.slf4j.Slf4j;
  12. import me.chanjar.weixin.common.bean.result.WxMediaUploadResult;
  13. import me.chanjar.weixin.common.error.WxErrorException;
  14. import me.chanjar.weixin.mp.api.WxMpService;
  15. import me.chanjar.weixin.mp.bean.kefu.WxMpKefuMessage;
  16. import org.springframework.beans.factory.annotation.Autowired;
  17. import org.springframework.stereotype.Component;
  18. import org.springframework.web.bind.annotation.RequestParam;
  19. import javax.annotation.PostConstruct;
  20. import javax.websocket.*;
  21. import javax.websocket.server.PathParam;
  22. import javax.websocket.server.ServerEndpoint;
  23. import java.io.*;
  24. import java.util.*;
  25. import java.util.concurrent.ConcurrentHashMap;
  26. @Slf4j
  27. @Component
  28. @ServerEndpoint("/ws/{source}/{userCode}")
  29. public class WebSocket {
  30. @Autowired
  31. private IMessageService messageService;
  32. @Autowired
  33. private ICustomerService customerService;
  34. @Autowired
  35. private WxMpService wxMpService;
  36. private static WebSocket webSocket;
  37. @PostConstruct
  38. public void init() {
  39. webSocket = this;
  40. }
  41. private Session session;
  42. private Integer source;
  43. private String userCode;
  44. private static Queue<String> servicerQueue = new LinkedList<>();
  45. private static ConcurrentHashMap<String, Session> kfsessionPool = new ConcurrentHashMap<String, Session>();
  46. private static ConcurrentHashMap<String, Session> khsessionPool = new ConcurrentHashMap<String, Session>();
  47. private static Map<String, Integer> servicers = new HashMap<>();
  48. private static Map<String, Integer> customers = new HashMap<>();
  49. private static Map<String, String> khkfs = new HashMap<>();
  50. /**
  51. * 链接成功调用的方法
  52. */
  53. @OnOpen
  54. public void onOpen(Session session, @PathParam(value = "source") Integer source, @PathParam(value = "userCode") String userCode) {
  55. try {
  56. this.session = session;
  57. this.source = source;
  58. this.userCode = userCode;
  59. boolean reply;
  60. if (source == 1) {
  61. reply=servicers.containsKey(userCode);
  62. if(!reply) {
  63. kfsessionPool.put(userCode, session);
  64. servicers.put(userCode, 1);
  65. servicerQueue.add(userCode);
  66. }
  67. } else {
  68. reply = customers.containsKey(userCode);
  69. if (!reply) {
  70. customers.put(userCode, source);
  71. khsessionPool.put(userCode, session);
  72. Long pid = null;
  73. List<String> ps = session.getRequestParameterMap().get("project");
  74. if (!ps.isEmpty()) pid = Long.valueOf(ps.get(0));
  75. addCustomer(userCode, source, pid);
  76. }
  77. }
  78. if(reply) {
  79. MessageView mv1 = new MessageView();
  80. mv1.setStatus("error");
  81. mv1.setContent("签入失败,重复签入");
  82. session.getAsyncRemote().sendText(JSON.toJSONString(mv1));
  83. }
  84. } catch (Exception e) {
  85. log.error("open错误", e);
  86. }
  87. }
  88. /**
  89. * 链接关闭调用的方法
  90. */
  91. @OnClose
  92. public void onClose() {
  93. try {
  94. if (source == 1) {
  95. kfsessionPool.remove(userCode);
  96. servicerQueue.remove(userCode);
  97. servicers.remove(userCode);
  98. if(khkfs.values().contains(userCode)) {
  99. khkfs.values().removeIf(p -> p.equals(userCode));
  100. }
  101. } else {
  102. customers.remove(userCode);
  103. String kf = khkfs.get(userCode);
  104. if (!StringHelper.isEmpty(kf)) {
  105. Session session1 = kfsessionPool.get(kf);
  106. if (session1 != null) {
  107. MessageView mv1 = new MessageView();
  108. mv1.setStatus("error");
  109. mv1.setContent("客户已下线");
  110. mv1.setUser(userCode);
  111. session1.getAsyncRemote().sendText(JSON.toJSONString(mv1));
  112. }
  113. khsessionPool.remove(userCode);
  114. khkfs.remove(userCode);
  115. }
  116. }
  117. } catch (Exception e) {
  118. log.error("close错误", e);
  119. }
  120. }
  121. /**
  122. * 收到客户端消息后调用的方法
  123. *
  124. * @param message
  125. */
  126. @OnMessage
  127. public void onMessage(String message) {
  128. MessageInput mv = JSON.parseObject(message, MessageInput.class);
  129. if (mv != null) {
  130. String type = mv.getType();
  131. if(type.equals("heart")){
  132. MessageView mvh = new MessageView();
  133. mvh.setStatus("success");
  134. mvh.setType(type);
  135. mvh.setContent(mv.getContent());
  136. session.getAsyncRemote().sendText(JSON.toJSONString(mvh));
  137. return;
  138. }
  139. String content = mv.getContent();
  140. Message msg = new Message();
  141. msg.setCreateTime(new Date());
  142. msg.setStatus(0);
  143. msg.setMsgType(type);
  144. if ("text".equals(type)) {
  145. msg.setMsgContent(content);
  146. }
  147. if ("image".equals(type)) {
  148. msg.setMsgFile(content);
  149. }
  150. //客服发送消息
  151. if (this.source == 1) {
  152. msg.setSend(2);
  153. msg.setKfUser(userCode);
  154. String kh = mv.getUser();
  155. boolean exist = false;
  156. MessageView mv1 = new MessageView();
  157. mv1.setUser(userCode);
  158. String en = khExist( userCode,kh);
  159. if (StringHelper.isEmpty(en)) {
  160. exist = true;
  161. msg.setKhUser(kh);
  162. Integer toSource = customers.get(kh);
  163. msg.setKhSource(toSource);
  164. //微信客户
  165. if (toSource == 2) {
  166. WxMpKefuMessage wechatMessage = WxMpKefuMessage.TEXT().toUser(kh).content(content).build();
  167. try {
  168. webSocket.wxMpService.getKefuService().sendKefuMessage(wechatMessage);
  169. } catch (WxErrorException e) {
  170. throw new RuntimeException(e);
  171. }
  172. } else {
  173. //其他客户
  174. Session session1 = khsessionPool.get(kh);
  175. mv1.setStatus("success");
  176. mv1.setType(type);
  177. mv1.setContent(content);
  178. session1.getAsyncRemote().sendText(JSON.toJSONString(mv1));
  179. }
  180. } else {
  181. mv1.setStatus("error");
  182. mv1.setContent(en);
  183. }
  184. if (!exist) {
  185. session.getAsyncRemote().sendText(JSON.toJSONString(mv1));
  186. return;
  187. }
  188. } else {
  189. //客户发送消息
  190. msg.setSend(1);
  191. msg.setKhUser(userCode);
  192. msg.setKhSource(source);
  193. String kf = khkfs.get(userCode);
  194. if (StringHelper.isEmpty(kf)||servicers.get(kf)==2) {
  195. kf = servicerQueue.poll();
  196. if (!StringHelper.isEmpty(kf)) {
  197. servicerQueue.add(kf);
  198. khkfs.put(userCode, kf);
  199. }
  200. }
  201. if (!StringHelper.isEmpty(kf)) {
  202. Session session1 = kfsessionPool.get(kf);
  203. if (session1 != null) {
  204. msg.setKfUser(kf);
  205. MessageView mv1 = new MessageView();
  206. mv1.setStatus("success");
  207. mv1.setType(type);
  208. mv1.setContent(content);
  209. mv1.setUser(userCode);
  210. session1.getAsyncRemote().sendText(JSON.toJSONString(mv1));
  211. }
  212. }
  213. }
  214. //webSocket.messageService.insert(msg);
  215. AsyncHelper.instance().execute(new TimerTask() {
  216. @Override
  217. public void run() {
  218. webSocket.messageService.insert(msg);
  219. }
  220. });
  221. }
  222. }
  223. /**
  224. * 发送错误时的处理
  225. *
  226. * @param error
  227. */
  228. @OnError
  229. public void onError(Throwable error) {
  230. if (source == 1) {
  231. kfsessionPool.remove(userCode);
  232. servicerQueue.remove(userCode);
  233. servicers.remove(userCode);
  234. if(khkfs.values().contains(userCode)) {
  235. khkfs.values().removeIf(p -> p.equals(userCode));
  236. }
  237. } else {
  238. customers.remove(userCode);
  239. String kf = khkfs.get(userCode);
  240. if (!StringHelper.isEmpty(kf)) {
  241. Session session1 = kfsessionPool.get(kf);
  242. if (session1 != null) {
  243. MessageView mv1 = new MessageView();
  244. mv1.setStatus("error");
  245. mv1.setContent("客户已下线");
  246. mv1.setUser(userCode);
  247. session1.getAsyncRemote().sendText(JSON.toJSONString(mv1));
  248. }
  249. khsessionPool.remove(userCode);
  250. khkfs.remove(userCode);
  251. }
  252. }
  253. error.printStackTrace();
  254. }
  255. //客户转移
  256. public String khTransfer(String kf,String kh) {
  257. String msg = "客户已下线";
  258. if(!(servicers.containsKey(kf)&&kfsessionPool.containsKey(kf))){
  259. msg = "此客服未签入";
  260. } else if(servicers.get(kf)==2) {
  261. msg = "此客服忙碌";
  262. } else if (!StringHelper.isEmpty(kh)) {
  263. if (customers.containsKey(kh)) {
  264. if (khkfs.containsKey(kh)) {
  265. if (khkfs.get(kh).equals(kf) ) {
  266. msg = "不能转移给自己";
  267. } else {
  268. khkfs.replace(kh,kf);
  269. msg = "";
  270. }
  271. } else {
  272. if (customers.get(kh) == 2||khsessionPool.containsKey(kh)) {
  273. khkfs.put(kh, kf);
  274. msg = "";
  275. }
  276. }
  277. } else {
  278. LambdaQueryWrapper<Customer> qw = new LambdaQueryWrapper();
  279. qw.eq(Customer::getCustomerNo, kh).eq(Customer::getType,2);
  280. if (webSocket.customerService.exists(qw)) {
  281. Date sj=new Date(new Date().getTime()-(1000*60*60*48));
  282. LambdaQueryWrapper<Message> qw1 = new LambdaQueryWrapper();
  283. qw1.eq(Message::getKhUser,kh).eq(Message::getSend,2).gt(Message::getCreateTime,sj);
  284. if(webSocket.messageService.exists(qw1)) {
  285. customers.put(kh, 2);
  286. khkfs.put(kh, kf);
  287. msg = "";
  288. }
  289. }
  290. }
  291. } else {
  292. msg = "客户不能为空";
  293. }
  294. if(StringHelper.isEmpty(msg)){
  295. Session session1=kfsessionPool.get(kf);
  296. MessageView mv1 = new MessageView();
  297. mv1.setStatus("success");
  298. mv1.setType("transfer");
  299. mv1.setContent("转移客户");
  300. mv1.setUser(kh);
  301. session1.getAsyncRemote().sendText(JSON.toJSONString(mv1));
  302. }
  303. return msg;
  304. }
  305. //客户是否存在
  306. public String khExist(String kf,String kh) {
  307. String msg = "客户已下线";
  308. if(!(servicers.containsKey(kf)&&kfsessionPool.containsKey(kf))){
  309. msg = "未签入";
  310. }
  311. else if (!StringHelper.isEmpty(kh)) {
  312. if (customers.containsKey(kh)) {
  313. if (khkfs.containsKey(kh)) {
  314. if (khkfs.get(kh).equals(kf) ) {
  315. if (customers.get(kh) == 2) {
  316. msg = "";
  317. } else {
  318. if (khsessionPool.containsKey(kh)) {
  319. msg = "";
  320. }
  321. }
  322. } else {
  323. msg = "其他客服已接入";
  324. }
  325. } else {
  326. if (customers.get(kh) == 2||khsessionPool.containsKey(kh)) {
  327. khkfs.put(kh, kf);
  328. msg = "";
  329. }
  330. }
  331. } else {
  332. LambdaQueryWrapper<Customer> qw = new LambdaQueryWrapper();
  333. qw.eq(Customer::getCustomerNo, kh).eq(Customer::getType,2);
  334. if (webSocket.customerService.exists(qw)) {
  335. Date sj=new Date(new Date().getTime()-(1000*60*60*48));
  336. LambdaQueryWrapper<Message> qw1 = new LambdaQueryWrapper();
  337. qw1.eq(Message::getKhUser,kh).eq(Message::getSend,2).gt(Message::getCreateTime,sj);
  338. if(webSocket.messageService.exists(qw1)) {
  339. customers.put(kh, 2);
  340. khkfs.put(kh, kf);
  341. msg = "";
  342. }
  343. }
  344. }
  345. } else {
  346. msg = "客户不能为空";
  347. }
  348. return msg;
  349. }
  350. //客服发送图片消息
  351. public void kfSendImageMessage(MessageInput mv,String kf) {
  352. String type = mv.getType();
  353. String content = mv.getContent();
  354. String kh = mv.getUser();
  355. Message message = new Message();
  356. message.setStatus(0);
  357. message.setMsgType(type);
  358. message.setMsgFile(content);
  359. message.setKhUser(kh);
  360. message.setSend(2);
  361. message.setCreateTime(new Date());
  362. message.setKfUser(kf);
  363. Integer source = customers.get(kh);
  364. message.setKhSource(source);
  365. //发送给微信客户
  366. if (source == 2) {
  367. File file = new File(content);
  368. if (file.exists()) {
  369. try {
  370. WxMediaUploadResult result = webSocket.wxMpService.getMaterialService().mediaUpload(type, file);
  371. String MediaId = result.getMediaId();
  372. WxMpKefuMessage wechatMessage = WxMpKefuMessage.IMAGE().toUser(kh).mediaId(MediaId).build();
  373. webSocket.wxMpService.getKefuService().sendKefuMessage(wechatMessage);
  374. message.setMsgMediaid(MediaId);
  375. } catch (WxErrorException e) {
  376. log.error("上传文件失败", e);
  377. throw new RuntimeException(e);
  378. }
  379. }
  380. } else {
  381. //发送其他客户
  382. MessageView mv1 = new MessageView();
  383. mv1.setStatus("success");
  384. mv1.setType(type);
  385. mv1.setContent(content);
  386. khsessionPool.get(kh).getAsyncRemote().sendText(JSON.toJSONString(mv1));
  387. }
  388. webSocket.messageService.insert(message);
  389. }
  390. // 微信信息转发给客服
  391. public void toWxMessage(Map<String, String> map) {
  392. String sendUser = map.get("FromUserName");
  393. String msgType = map.get("MsgType");
  394. Message message = new Message();
  395. message.setSend(1);
  396. message.setKhUser(sendUser);
  397. message.setKhSource(2);
  398. message.setStatus(0);
  399. message.setMsgType(msgType);
  400. message.setCreateTime(new Date());
  401. String content = "";
  402. if ("text".equals(msgType)) {
  403. content = map.get("Content");
  404. message.setMsgContent(content);
  405. } else if ("image".equals(msgType) || "voice".equals(msgType)) {
  406. String mediaId=map.get("MediaId");
  407. message.setMsgMediaid(mediaId);
  408. String fn = System.currentTimeMillis()+".";
  409. String fileName = fn + "jpg";
  410. if ("voice".equals(msgType)) {
  411. message.setMsgFormat(map.get("Format"));
  412. fileName = fn + map.get("Format");
  413. }
  414. //下载临时素材
  415. try {
  416. String path = "files/online/" + DateHelper.getDate();
  417. File pFile = new File(path);
  418. if (!pFile.exists()) pFile.mkdirs();
  419. String filePath = pFile.getAbsolutePath() + "/" + fileName;
  420. File lFile = new File(filePath);
  421. if (!lFile.exists()) lFile.createNewFile();
  422. File file = webSocket.wxMpService.getMaterialService().mediaDownload(mediaId);
  423. InputStream inputStream = new FileInputStream(file);
  424. byte[] buffer = new byte[1024];
  425. FileOutputStream outputStream = new FileOutputStream(lFile);
  426. int length;
  427. while ((length = inputStream.read(buffer)) > 0) {
  428. outputStream.write(buffer, 0, length);
  429. }
  430. inputStream.close();
  431. outputStream.close();
  432. message.setMsgFile(path + "/" + fileName);
  433. content = path + "/" + fileName;
  434. } catch (WxErrorException e) {
  435. log.error("下载文件失败", e);
  436. throw new RuntimeException(e);
  437. } catch (IOException e) {
  438. log.error("下载文件失败", e);
  439. throw new RuntimeException(e);
  440. }
  441. }
  442. if (!customers.containsKey(sendUser)) {
  443. customers.put(sendUser, 2);
  444. addCustomer(sendUser,2,null);
  445. };
  446. String kf = khkfs.get(sendUser);
  447. if (StringHelper.isEmpty(kf)||servicers.get(kf)==2) {
  448. kf = servicerQueue.poll();
  449. if (!StringHelper.isEmpty(kf)) {
  450. servicerQueue.add(kf);
  451. khkfs.put(sendUser, kf);
  452. }
  453. }
  454. if (!StringHelper.isEmpty(kf)) {
  455. message.setKfUser(kf);
  456. Session session1 = kfsessionPool.get(kf);
  457. if (session1 != null) {
  458. MessageView mv = new MessageView();
  459. mv.setStatus("success");
  460. mv.setType(msgType);
  461. mv.setContent(content);
  462. mv.setUser(sendUser);
  463. session1.getAsyncRemote().sendText(JSON.toJSONString(mv));
  464. }
  465. }
  466. webSocket.messageService.insert(message);
  467. }
  468. public void addCustomer(String userCode,Integer source,Long project) {
  469. LambdaQueryWrapper<Customer> qw = new LambdaQueryWrapper();
  470. qw.eq(Customer::getCustomerNo, userCode);
  471. if (!webSocket.customerService.exists(qw)) {
  472. Customer customer = new Customer();
  473. customer.setCustomerNo(userCode);
  474. customer.setType(Long.valueOf(source));
  475. customer.setProject(project);
  476. customer.setFCreatetime(new Date());
  477. customer.setFIsdelete(0L);
  478. webSocket.customerService.insert(customer);
  479. }
  480. }
  481. //客服置忙
  482. public void kfBusy(String userCode) {
  483. if (servicers.containsKey(userCode)) {
  484. servicers.replace(userCode, 2);
  485. }
  486. if (servicerQueue.contains(userCode)) {
  487. servicerQueue.remove(userCode);
  488. }
  489. }
  490. //客服置闲
  491. public void kfFree(String userCode) {
  492. if (servicers.containsKey(userCode)) {
  493. servicers.replace(userCode, 1);
  494. }
  495. if (!servicerQueue.contains(userCode)) {
  496. servicerQueue.add(userCode);
  497. }
  498. }
  499. //获取信息
  500. public Object GetMap(Integer type) {
  501. if (type == 1) {
  502. return servicers;
  503. }
  504. if (type == 2) {
  505. return customers;
  506. }
  507. if (type == 3) {
  508. return khkfs;
  509. }
  510. if (type == 4) {
  511. return servicerQueue;
  512. }
  513. return servicers;
  514. }
  515. }