短信平台

ReceiveTask.cs 10KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248
  1. using Aliyun.Acs.Core;
  2. using Aliyun.Acs.Dybaseapi.Model.V20170525;
  3. using Aliyun.MNS;
  4. using Aliyun.MNS.Model;
  5. using Microsoft.Extensions.Caching.Redis;
  6. using Microsoft.Extensions.Caching.Distributed;
  7. using System;
  8. using System.Collections.Generic;
  9. using System.Text;
  10. using System.Threading;
  11. using Newtonsoft.Json.Linq;
  12. using Newtonsoft.Json;
  13. using System.Linq;
  14. using Newtonsoft.Json.Converters;
  15. namespace SmsReceive
  16. {
  17. public class ReceiveTask
  18. {
  19. private object o = new object();
  20. private int sleepTime = 50;
  21. public String name { get; private set; }
  22. public String messageType { get; private set; }
  23. public String queueName { get; private set; }
  24. public String mnsAccountEndpoint { get; private set; }
  25. public int TaskID { get; private set; }
  26. public IAcsClient acsClient { get; private set; }
  27. public RedisCache redisCache = null;
  28. public ReceiveTask(String name, String messageType, String queueName, String mnsAccountEndpoint, IAcsClient acsClient, RedisCache redisCache)
  29. {
  30. this.name = name;
  31. this.messageType = messageType;
  32. this.queueName = queueName;
  33. this.mnsAccountEndpoint = mnsAccountEndpoint;
  34. this.acsClient = acsClient;
  35. this.redisCache = redisCache;
  36. }
  37. long bufferTime = 60 * 2;//过期时间小于2分钟则重新获取,防止服务器时间误差
  38. Dictionary<string, QueryTokenForMnsQueueResponse.QueryTokenForMnsQueue_MessageTokenDTO> tokenMap = new Dictionary<string, QueryTokenForMnsQueueResponse.QueryTokenForMnsQueue_MessageTokenDTO>();
  39. Dictionary<string, Queue> queueMap = new Dictionary<string, Queue>();
  40. /// 处理消息
  41. public void Handle()
  42. {
  43. while (true)
  44. {
  45. try
  46. {
  47. QueryTokenForMnsQueueResponse.QueryTokenForMnsQueue_MessageTokenDTO token = null;
  48. Queue queue = null;
  49. lock (o)
  50. {
  51. if (tokenMap.ContainsKey(messageType))
  52. {
  53. token = tokenMap[messageType];
  54. }
  55. if (queueMap.ContainsKey(queueName))
  56. {
  57. queue = queueMap[queueName];
  58. }
  59. TimeSpan ts = new TimeSpan(0);
  60. if (token != null)
  61. {
  62. DateTime b = Convert.ToDateTime(token.ExpireTime);
  63. DateTime c = Convert.ToDateTime(DateTime.Now);
  64. ts = b - c;
  65. }
  66. if (token == null || ts.TotalSeconds < bufferTime || queue == null)
  67. {
  68. token = GetTokenByMessageType(acsClient, messageType);
  69. IMNS client = new Aliyun.MNS.MNSClient(token.AccessKeyId, token.AccessKeySecret, mnsAccountEndpoint, token.SecurityToken);
  70. queue = client.GetNativeQueue(queueName);
  71. if (tokenMap.ContainsKey(messageType))
  72. {
  73. tokenMap.Remove(messageType);
  74. }
  75. if (queueMap.ContainsKey(queueName))
  76. {
  77. queueMap.Remove(queueName);
  78. }
  79. tokenMap.Add(messageType, token);
  80. queueMap.Add(queueName, queue);
  81. }
  82. }
  83. BatchReceiveMessageResponse batchReceiveMessageResponse = queue.BatchReceiveMessage(16);
  84. if (batchReceiveMessageResponse != null)
  85. {
  86. List<Message> messages = batchReceiveMessageResponse.Messages;
  87. foreach (var msg in messages)
  88. {
  89. try
  90. {
  91. byte[] outputb = Convert.FromBase64String(msg.Body);
  92. string orgStr = Encoding.UTF8.GetString(outputb);
  93. System.Console.WriteLine(orgStr);
  94. ////TODO 具体消费逻辑,待客户自己实现.
  95. //var orgmodel = JObject.Parse(orgStr.Replace("&nbsp;", ""));
  96. //if (orgmodel["success"].ToString().ToLower() != "true")
  97. //{
  98. // var sendliststr = redisCache.GetString("T_Sms_Send");
  99. // var sendlist = new List<T_Sms_Send>();
  100. // if (!string.IsNullOrEmpty(sendliststr))
  101. // {
  102. // sendlist = JsonConvert.DeserializeObject<List<T_Sms_Send>>(sendliststr);
  103. // }
  104. // var send = new T_Sms_Send();
  105. // send.Id = sendlist.Count > 0 ? sendlist.Max(p => p.Id) + 1 : 1;
  106. // send.BizId = orgmodel["biz_id"].ToString();
  107. // send.PhoneNumber = orgmodel["phone_number"].ToString();
  108. // send.SendTime = DateTime.Now;
  109. // send.ReportTime = DateTime.Parse(orgmodel["report_time"].ToString());
  110. // send.SendTime= DateTime.Parse(orgmodel["send_time"].ToString());
  111. // send.SmsSize = Int32.Parse(orgmodel["sms_size"].ToString());
  112. // send.State = 2;
  113. // send.ErrCode= orgmodel["err_code"].ToString();
  114. // send.ErrMsg = orgmodel["err_msg"].ToString();
  115. // sendlist.Add(send);
  116. // var timeConverter = new IsoDateTimeConverter { DateTimeFormat = "yyyy-MM-dd HH:mm:ss" };
  117. // redisCache.SetString("T_Sms_Send", JsonConvert.SerializeObject(sendlist, timeConverter));
  118. //}
  119. //消费成功的前提下删除消息
  120. queue.DeleteMessage(msg.ReceiptHandle);
  121. }
  122. catch (Exception e)
  123. {
  124. System.Console.WriteLine("exception:" + e.Message.ToString());
  125. }
  126. }
  127. }
  128. }
  129. catch (Exception e)
  130. {
  131. System.Console.WriteLine("Handle exception:" + e.Message.ToString());
  132. }
  133. Thread.Sleep(sleepTime);
  134. }
  135. }
  136. public QueryTokenForMnsQueueResponse.QueryTokenForMnsQueue_MessageTokenDTO GetTokenByMessageType(IAcsClient acsClient, String messageType)
  137. {
  138. QueryTokenForMnsQueueRequest request = new QueryTokenForMnsQueueRequest();
  139. request.MessageType = messageType;
  140. QueryTokenForMnsQueueResponse queryTokenForMnsQueueResponse = acsClient.GetAcsResponse(request);
  141. QueryTokenForMnsQueueResponse.QueryTokenForMnsQueue_MessageTokenDTO token = queryTokenForMnsQueueResponse.MessageTokenDTO;
  142. return token;
  143. }
  144. /// <summary>
  145. /// 消息类
  146. /// </summary>
  147. public partial class T_Sms_Send
  148. {
  149. public T_Sms_Send()
  150. { }
  151. #region Model
  152. private int _id;
  153. private string _phonenumber;
  154. private string _bizid;
  155. private DateTime? _sendtime;
  156. private DateTime? _reporttime;
  157. private int _state;
  158. private string _errcode;
  159. private string _errmsg;
  160. private int _smssize;
  161. /// <summary>
  162. /// 自增id
  163. /// </summary>
  164. public int Id
  165. {
  166. set { _id = value; }
  167. get { return _id; }
  168. }
  169. /// <summary>
  170. /// 手机号码
  171. /// </summary>
  172. public string PhoneNumber
  173. {
  174. set { _phonenumber = value; }
  175. get { return _phonenumber; }
  176. }
  177. /// <summary>
  178. /// 发送回执ID
  179. /// </summary>
  180. public string BizId
  181. {
  182. set { _bizid = value; }
  183. get { return _bizid; }
  184. }
  185. /// <summary>
  186. /// 发送时间
  187. /// </summary>
  188. public DateTime? SendTime
  189. {
  190. set { _sendtime = value; }
  191. get { return _sendtime; }
  192. }
  193. /// <summary>
  194. /// 回执时间
  195. /// </summary>
  196. public DateTime? ReportTime
  197. {
  198. set { _reporttime = value; }
  199. get { return _reporttime; }
  200. }
  201. /// <summary>
  202. /// 发送状态 1:等待回执,2:发送失败,3:发送成功
  203. /// </summary>
  204. public int State
  205. {
  206. set { _state = value; }
  207. get { return _state; }
  208. }
  209. /// <summary>
  210. /// 错误码
  211. /// </summary>
  212. public string ErrCode
  213. {
  214. set { _errcode = value; }
  215. get { return _errcode; }
  216. }
  217. /// <summary>
  218. /// 错误信息
  219. /// </summary>
  220. public string ErrMsg
  221. {
  222. set { _errmsg = value; }
  223. get { return _errmsg; }
  224. }
  225. /// <summary>
  226. /// 短信大小--140字节算一条短信,短信长度超过140字节时会拆分成多条短信发送
  227. /// </summary>
  228. public int SmsSize
  229. {
  230. set { _smssize = value; }
  231. get { return _smssize; }
  232. }
  233. #endregion Model
  234. }
  235. }
  236. }