using Aliyun.Acs.Core; using Aliyun.Acs.Dybaseapi.Model.V20170525; using Aliyun.MNS; using Aliyun.MNS.Model; using Microsoft.Extensions.Caching.Redis; using Microsoft.Extensions.Caching.Distributed; using System; using System.Collections.Generic; using System.Text; using System.Threading; using Newtonsoft.Json.Linq; using Newtonsoft.Json; using System.Linq; using Newtonsoft.Json.Converters; namespace SmsReceive { public class ReceiveTask { private object o = new object(); private int sleepTime = 50; public String name { get; private set; } public String messageType { get; private set; } public String queueName { get; private set; } public String mnsAccountEndpoint { get; private set; } public int TaskID { get; private set; } public IAcsClient acsClient { get; private set; } public RedisCache redisCache = null; public ReceiveTask(String name, String messageType, String queueName, String mnsAccountEndpoint, IAcsClient acsClient, RedisCache redisCache) { this.name = name; this.messageType = messageType; this.queueName = queueName; this.mnsAccountEndpoint = mnsAccountEndpoint; this.acsClient = acsClient; this.redisCache = redisCache; } long bufferTime = 60 * 2;//过期时间小于2分钟则重新获取,防止服务器时间误差 Dictionary tokenMap = new Dictionary(); Dictionary queueMap = new Dictionary(); /// 处理消息 public void Handle() { while (true) { try { QueryTokenForMnsQueueResponse.QueryTokenForMnsQueue_MessageTokenDTO token = null; Queue queue = null; lock (o) { if (tokenMap.ContainsKey(messageType)) { token = tokenMap[messageType]; } if (queueMap.ContainsKey(queueName)) { queue = queueMap[queueName]; } TimeSpan ts = new TimeSpan(0); if (token != null) { DateTime b = Convert.ToDateTime(token.ExpireTime); DateTime c = Convert.ToDateTime(DateTime.Now); ts = b - c; } if (token == null || ts.TotalSeconds < bufferTime || queue == null) { token = GetTokenByMessageType(acsClient, messageType); IMNS client = new Aliyun.MNS.MNSClient(token.AccessKeyId, token.AccessKeySecret, mnsAccountEndpoint, token.SecurityToken); queue = client.GetNativeQueue(queueName); if (tokenMap.ContainsKey(messageType)) { tokenMap.Remove(messageType); } if (queueMap.ContainsKey(queueName)) { queueMap.Remove(queueName); } tokenMap.Add(messageType, token); queueMap.Add(queueName, queue); } } BatchReceiveMessageResponse batchReceiveMessageResponse = queue.BatchReceiveMessage(16); if (batchReceiveMessageResponse != null) { List messages = batchReceiveMessageResponse.Messages; foreach (var msg in messages) { try { byte[] outputb = Convert.FromBase64String(msg.Body); string orgStr = Encoding.UTF8.GetString(outputb); System.Console.WriteLine(orgStr); ////TODO 具体消费逻辑,待客户自己实现. //var orgmodel = JObject.Parse(orgStr.Replace(" ", "")); //if (orgmodel["success"].ToString().ToLower() != "true") //{ // var sendliststr = redisCache.GetString("T_Sms_Send"); // var sendlist = new List(); // if (!string.IsNullOrEmpty(sendliststr)) // { // sendlist = JsonConvert.DeserializeObject>(sendliststr); // } // var send = new T_Sms_Send(); // send.Id = sendlist.Count > 0 ? sendlist.Max(p => p.Id) + 1 : 1; // send.BizId = orgmodel["biz_id"].ToString(); // send.PhoneNumber = orgmodel["phone_number"].ToString(); // send.SendTime = DateTime.Now; // send.ReportTime = DateTime.Parse(orgmodel["report_time"].ToString()); // send.SendTime= DateTime.Parse(orgmodel["send_time"].ToString()); // send.SmsSize = Int32.Parse(orgmodel["sms_size"].ToString()); // send.State = 2; // send.ErrCode= orgmodel["err_code"].ToString(); // send.ErrMsg = orgmodel["err_msg"].ToString(); // sendlist.Add(send); // var timeConverter = new IsoDateTimeConverter { DateTimeFormat = "yyyy-MM-dd HH:mm:ss" }; // redisCache.SetString("T_Sms_Send", JsonConvert.SerializeObject(sendlist, timeConverter)); //} //消费成功的前提下删除消息 queue.DeleteMessage(msg.ReceiptHandle); } catch (Exception e) { System.Console.WriteLine("exception:" + e.Message.ToString()); } } } } catch (Exception e) { System.Console.WriteLine("Handle exception:" + e.Message.ToString()); } Thread.Sleep(sleepTime); } } public QueryTokenForMnsQueueResponse.QueryTokenForMnsQueue_MessageTokenDTO GetTokenByMessageType(IAcsClient acsClient, String messageType) { QueryTokenForMnsQueueRequest request = new QueryTokenForMnsQueueRequest(); request.MessageType = messageType; QueryTokenForMnsQueueResponse queryTokenForMnsQueueResponse = acsClient.GetAcsResponse(request); QueryTokenForMnsQueueResponse.QueryTokenForMnsQueue_MessageTokenDTO token = queryTokenForMnsQueueResponse.MessageTokenDTO; return token; } /// /// 消息类 /// public partial class T_Sms_Send { public T_Sms_Send() { } #region Model private int _id; private string _phonenumber; private string _bizid; private DateTime? _sendtime; private DateTime? _reporttime; private int _state; private string _errcode; private string _errmsg; private int _smssize; /// /// 自增id /// public int Id { set { _id = value; } get { return _id; } } /// /// 手机号码 /// public string PhoneNumber { set { _phonenumber = value; } get { return _phonenumber; } } /// /// 发送回执ID /// public string BizId { set { _bizid = value; } get { return _bizid; } } /// /// 发送时间 /// public DateTime? SendTime { set { _sendtime = value; } get { return _sendtime; } } /// /// 回执时间 /// public DateTime? ReportTime { set { _reporttime = value; } get { return _reporttime; } } /// /// 发送状态 1:等待回执,2:发送失败,3:发送成功 /// public int State { set { _state = value; } get { return _state; } } /// /// 错误码 /// public string ErrCode { set { _errcode = value; } get { return _errcode; } } /// /// 错误信息 /// public string ErrMsg { set { _errmsg = value; } get { return _errmsg; } } /// /// 短信大小--140字节算一条短信,短信长度超过140字节时会拆分成多条短信发送 /// public int SmsSize { set { _smssize = value; } get { return _smssize; } } #endregion Model } } }