| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248 |
- using Aliyun.Acs.Core;
- using Aliyun.Acs.Dybaseapi.Model.V20170525;
- using Aliyun.MNS;
- using Aliyun.MNS.Model;
- using Microsoft.Extensions.Caching.Redis;
- using Microsoft.Extensions.Caching.Distributed;
- using System;
- using System.Collections.Generic;
- using System.Text;
- using System.Threading;
- using Newtonsoft.Json.Linq;
- using Newtonsoft.Json;
- using System.Linq;
- using Newtonsoft.Json.Converters;
- namespace SmsReceive
- {
- public class ReceiveTask
- {
// Monitor object; Handle() locks on it while it reads and refreshes its token/queue caches.
private readonly object o = new object();

// Value passed to Thread.Sleep between two polling iterations of Handle().
private int sleepTime = 50;

/// <summary>Name given to this task by its creator.</summary>
public string name { get; private set; }

/// <summary>Message type sent with the token request for the queue.</summary>
public string messageType { get; private set; }

/// <summary>Name of the MNS queue this task reads from.</summary>
public string queueName { get; private set; }

/// <summary>Endpoint handed to the MNS client when it is created.</summary>
public string mnsAccountEndpoint { get; private set; }

/// <summary>Task identifier; nothing in this class assigns it, so it keeps its default value.</summary>
public int TaskID { get; private set; }

/// <summary>ACS client used to request queue access tokens.</summary>
public IAcsClient acsClient { get; private set; }

// Redis cache supplied by the caller; only the commented-out sample consumer refers to it.
public RedisCache redisCache;

/// <summary>
/// Creates a receive task bound to one message type and one queue.
/// </summary>
/// <param name="name">Name of the task.</param>
/// <param name="messageType">Message type used when requesting a queue token.</param>
/// <param name="queueName">Name of the queue to read from.</param>
/// <param name="mnsAccountEndpoint">Endpoint passed to the MNS client.</param>
/// <param name="acsClient">ACS client used to request queue tokens.</param>
/// <param name="redisCache">Redis cache made available to the message consumer.</param>
public ReceiveTask(string name, string messageType, string queueName, string mnsAccountEndpoint, IAcsClient acsClient, RedisCache redisCache)
{
    this.acsClient = acsClient;
    this.redisCache = redisCache;
    this.mnsAccountEndpoint = mnsAccountEndpoint;
    this.queueName = queueName;
    this.messageType = messageType;
    this.name = name;
}
// Safety margin in seconds: a cached token with less than two minutes left is
// fetched again, to tolerate clock differences with the server.
private readonly long bufferTime = 60 * 2;

// Last token obtained, keyed by message type.
private readonly Dictionary<string, QueryTokenForMnsQueueResponse.QueryTokenForMnsQueue_MessageTokenDTO> tokenMap = new Dictionary<string, QueryTokenForMnsQueueResponse.QueryTokenForMnsQueue_MessageTokenDTO>();

// Last queue handle obtained, keyed by queue name.
private readonly Dictionary<string, Queue> queueMap = new Dictionary<string, Queue>();

/// <summary>
/// Processes messages: loops forever, receiving up to 16 messages per iteration
/// from the queue, consuming each one and then sleeping for <c>sleepTime</c>.
/// Any exception raised during an iteration is written to the console and the
/// loop carries on, so this method never returns.
/// </summary>
public void Handle()
{
    while (true)
    {
        try
        {
            Queue queue = AcquireQueue();
            BatchReceiveMessageResponse response = queue.BatchReceiveMessage(16);

            // Guard both the response and its list so a missing list is not
            // reported as a NullReferenceException on every iteration.
            List<Message> messages = response != null ? response.Messages : null;
            if (messages != null)
            {
                foreach (Message msg in messages)
                {
                    ConsumeMessage(queue, msg);
                }
            }
        }
        catch (Exception e)
        {
            // Plain concatenation is null-safe; calling ToString() on a null
            // Message here would throw out of the handler and end the loop.
            System.Console.WriteLine("Handle exception:" + e.Message);
        }
        Thread.Sleep(sleepTime);
    }
}

/// <summary>
/// Returns the cached queue handle when the cached token still has at least
/// <c>bufferTime</c> seconds left; otherwise requests a new token, builds a new
/// MNS client and queue handle from it, caches both and returns the new handle.
/// </summary>
/// <exception cref="InvalidOperationException">The token request returned no token.</exception>
private Queue AcquireQueue()
{
    lock (o)
    {
        QueryTokenForMnsQueueResponse.QueryTokenForMnsQueue_MessageTokenDTO token;
        Queue queue;
        tokenMap.TryGetValue(messageType, out token);
        queueMap.TryGetValue(queueName, out queue);

        bool tokenUsable = token != null
            && (Convert.ToDateTime(token.ExpireTime) - DateTime.Now).TotalSeconds >= bufferTime;
        if (tokenUsable && queue != null)
        {
            return queue;
        }

        token = GetTokenByMessageType(acsClient, messageType);
        if (token == null)
        {
            throw new InvalidOperationException("No MNS queue token returned for message type: " + messageType);
        }

        IMNS client = new Aliyun.MNS.MNSClient(token.AccessKeyId, token.AccessKeySecret, mnsAccountEndpoint, token.SecurityToken);
        queue = client.GetNativeQueue(queueName);

        // Indexer assignment replaces an existing entry or adds a new one.
        tokenMap[messageType] = token;
        queueMap[queueName] = queue;
        return queue;
    }
}

/// <summary>
/// Decodes the Base64 body of one message as UTF-8, prints it, and deletes the
/// message from the queue. A failure is printed and swallowed so the remaining
/// messages of the batch are still handled; the message is then not deleted.
/// </summary>
private void ConsumeMessage(Queue queue, Message msg)
{
    try
    {
        string orgStr = Encoding.UTF8.GetString(Convert.FromBase64String(msg.Body));
        System.Console.WriteLine(orgStr);

        // TODO: the actual consumption logic is left for the customer to implement.
        // Sample kept for reference:
        //var orgmodel = JObject.Parse(orgStr.Replace(" ", ""));
        //if (orgmodel["success"].ToString().ToLower() != "true")
        //{
        //    var sendliststr = redisCache.GetString("T_Sms_Send");
        //    var sendlist = new List<T_Sms_Send>();
        //    if (!string.IsNullOrEmpty(sendliststr))
        //    {
        //        sendlist = JsonConvert.DeserializeObject<List<T_Sms_Send>>(sendliststr);
        //    }
        //    var send = new T_Sms_Send();
        //    send.Id = sendlist.Count > 0 ? sendlist.Max(p => p.Id) + 1 : 1;
        //    send.BizId = orgmodel["biz_id"].ToString();
        //    send.PhoneNumber = orgmodel["phone_number"].ToString();
        //    send.SendTime = DateTime.Now;
        //    send.ReportTime = DateTime.Parse(orgmodel["report_time"].ToString());
        //    send.SendTime= DateTime.Parse(orgmodel["send_time"].ToString());
        //    send.SmsSize = Int32.Parse(orgmodel["sms_size"].ToString());
        //    send.State = 2;
        //    send.ErrCode= orgmodel["err_code"].ToString();
        //    send.ErrMsg = orgmodel["err_msg"].ToString();
        //    sendlist.Add(send);
        //    var timeConverter = new IsoDateTimeConverter { DateTimeFormat = "yyyy-MM-dd HH:mm:ss" };
        //    redisCache.SetString("T_Sms_Send", JsonConvert.SerializeObject(sendlist, timeConverter));
        //}

        // Delete the message only after it was consumed successfully.
        queue.DeleteMessage(msg.ReceiptHandle);
    }
    catch (Exception e)
    {
        System.Console.WriteLine("exception:" + e.Message);
    }
}
/// <summary>
/// Sends a token query for the given message type through the supplied ACS
/// client and returns the token object carried by the response.
/// </summary>
/// <param name="acsClient">Client used to send the request.</param>
/// <param name="messageType">Message type placed on the request.</param>
/// <returns>The <c>MessageTokenDTO</c> member of the response.</returns>
public QueryTokenForMnsQueueResponse.QueryTokenForMnsQueue_MessageTokenDTO GetTokenByMessageType(IAcsClient acsClient, String messageType)
{
    var tokenRequest = new QueryTokenForMnsQueueRequest { MessageType = messageType };
    return acsClient.GetAcsResponse(tokenRequest).MessageTokenDTO;
}
/// <summary>
/// Message record: one SMS send entry together with its delivery receipt data.
/// </summary>
public partial class T_Sms_Send
{
    public T_Sms_Send()
    {
    }

    #region Model

    /// <summary>Auto-increment id.</summary>
    public int Id { get; set; }

    /// <summary>Mobile phone number.</summary>
    public string PhoneNumber { get; set; }

    /// <summary>Send receipt id.</summary>
    public string BizId { get; set; }

    /// <summary>Time the message was sent.</summary>
    public DateTime? SendTime { get; set; }

    /// <summary>Time the receipt was reported.</summary>
    public DateTime? ReportTime { get; set; }

    /// <summary>Send state. 1: waiting for receipt, 2: send failed, 3: send succeeded.</summary>
    public int State { get; set; }

    /// <summary>Error code.</summary>
    public string ErrCode { get; set; }

    /// <summary>Error message.</summary>
    public string ErrMsg { get; set; }

    /// <summary>
    /// SMS size: 140 bytes count as one SMS; a message longer than 140 bytes is
    /// split into several SMS when sent.
    /// </summary>
    public int SmsSize { get; set; }

    #endregion Model
}
- }
- }
|