linux版本中间件

RabbitmqClient.h 4.4KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107
  1. #pragma once
  2. #include <cstdlib>
  3. #include <cstring>
  4. #include <string>
  5. #include <vector>
  6. #include <atomic>
  7. #include <functional>
  8. #include <amqp.h>
  9. #include <amqp_tcp_socket.h>
  10. #include <thread>
  11. using RecvCallBack = std::function<void(const std::string&)>;
  12. class CRabbitmqClient
  13. {
  14. public:
  15. CRabbitmqClient();
  16. ~CRabbitmqClient();
  17. /*
  18. * 连接RabbitMQ Server
  19. * @param [in] strExchange:交换器名称
  20. * @param [in] strType:交换器类型 ,常见的如 fanout(广播) direct(点对点) topic(订阅)
  21. * @param [in] isPassive:检测exchange是否存在。false:不存在会创建,true:不存在不会创建
  22. * @param [in] isDurable:是否永久化。永久化:将队列信息写入磁盘,RabbitMQ Server重启后,队列不会丢失
  23. * @param [in] isAutoDelete:当没有队列和交换器绑定时,交换器是否自动删除
  24. * @param [in] internal: 设置是否内置的, true表示是内置的交换器, 客户端程序无法直接发送消息到这个交换器中, 只能通过交换器路由到交换器这个方式
  25. * @returns 设置结果
  26. */
  27. int Connect(const std::string &strHostname, int iPort, const std::string &strUser, const std::string &strPasswd);
  28. int Disconnect();
  29. /*
  30. * 声明交换器
  31. * @param [in] strExchange:交换器名称
  32. * @param [in] strType:交换器类型 ,常见的如 fanout(广播) direct(点对点) topic(订阅)
  33. * @param [in] isPassive:检测exchange是否存在。false:不存在会创建,true:不存在不会创建
  34. * @param [in] isDurable:是否永久化。永久化:将队列信息写入磁盘,RabbitMQ Server重启后,队列不会丢失
  35. * @param [in] isAutoDelete:当没有队列和交换器绑定时,交换器是否自动删除
  36. * @param [in] internal: 设置是否内置的, true表示是内置的交换器, 客户端程序无法直接发送消息到这个交换器中, 只能通过交换器路由到交换器这个方式
  37. * @returns 声明结果
  38. */
  39. int ExchangeDeclare(const std::string &strExchange, const std::string& strType, bool isPassive = false,
  40. bool isDurable = false, bool isAutoDelete = false, int internal = 0);
  41. /*
  42. * 声明队列
  43. * @param [in] strQueueName:队列名称
  44. * @param [in] isPassive:检测exchange是否存在。false:不存在会创建,true:不存在不会创建
  45. * @param [in] isDurable:是否永久化。永久化:将队列信息写入磁盘,RabbitMQ Server重启后,队列不会丢失
  46. * @param [in] isExclusive:只有自己的用户对该队列可见
  47. * @param [in] isAutoDelete:当没有队列和交换器绑定时,交换器是否自动删除
  48. * @returns 声明结果
  49. */
  50. int QueueDelare(const std::string& strQueueName, bool isPassive = false,
  51. bool isDurable = false, bool isExclusive = false, bool isAutoDelete = false);
  52. /*
  53. * 将队列绑定到交换器
  54. * @param [in] strQueueName:队列名称
  55. * @param [in] strExchange:交换器名称
  56. * @param [in] strBindKey:路由键
  57. * @returns 绑定结果
  58. */
  59. int QueueBind(const std::string &strQueueName, const std::string &strExchange, const std::string &strBindKey);
  60. int QueueUnbind(const std::string &strQueueName, const std::string &strExchange, const std::string &strBindKey);
  61. int QueueDelete(const std::string &strQueueName, int iIfUnused);
  62. /*
  63. * 发布消息
  64. * @param [in] strMessage:需要发送的消息
  65. * @param [in] strExchange:交换器名称
  66. * @param [in] strBindKey:路由键
  67. * @returns 发布结果
  68. */
  69. int Publish(const std::string &strMessage, const std::string &strExchange, const std::string &strRoutekey);
  70. int Consumer(const std::string &strQueueName, RecvCallBack func);
  71. int Consumer(const std::string &strQueueName);
  72. bool GetMesssage(std::string&strMessage);
  73. //
  74. void SetConsum(const bool&bConsum);
  75. private:
  76. int ErrorMsg(amqp_rpc_reply_t x, char const *context);
  77. void StartRecvThread();
  78. void run();
  79. private:
  80. std::string m_strHostname{ "127.0.0.1" }; // amqp主机
  81. int m_iPort{ 5672 }; // amqp端口
  82. std::string m_strUser{ "guest" };
  83. std::string m_strPasswd{ "guest" };
  84. //int m_iChannel = 2;
  85. amqp_channel_t m_iChannel = 2;
  86. amqp_socket_t *m_pSock{ nullptr };
  87. amqp_connection_state_t m_pConn{ nullptr };
  88. std::unique_ptr<std::thread> m_pRecvThread{ nullptr };
  89. bool m_isRun{ false };
  90. std::atomic_bool m_isConsum{ true };
  91. RecvCallBack m_recvFunc{ nullptr };
  92. };