| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107 |
- #pragma once
- #include <cstdlib>
- #include <cstring>
- #include <string>
- #include <vector>
- #include <atomic>
- #include <functional>
- #include <amqp.h>
- #include <amqp_tcp_socket.h>
- #include <thread>
- using RecvCallBack = std::function<void(const std::string&)>;
- class CRabbitmqClient
- {
- public:
- CRabbitmqClient();
- ~CRabbitmqClient();
- /*
- * 连接RabbitMQ Server
- * @param [in] strExchange:交换器名称
- * @param [in] strType:交换器类型 ,常见的如 fanout(广播) direct(点对点) topic(订阅)
- * @param [in] isPassive:检测exchange是否存在。false:不存在会创建,true:不存在不会创建
- * @param [in] isDurable:是否永久化。永久化:将队列信息写入磁盘,RabbitMQ Server重启后,队列不会丢失
- * @param [in] isAutoDelete:当没有队列和交换器绑定时,交换器是否自动删除
- * @param [in] internal: 设置是否内置的, true表示是内置的交换器, 客户端程序无法直接发送消息到这个交换器中, 只能通过交换器路由到交换器这个方式
- * @returns 设置结果
- */
- int Connect(const std::string &strHostname, int iPort, const std::string &strUser, const std::string &strPasswd);
- int Disconnect();
- /*
- * 声明交换器
- * @param [in] strExchange:交换器名称
- * @param [in] strType:交换器类型 ,常见的如 fanout(广播) direct(点对点) topic(订阅)
- * @param [in] isPassive:检测exchange是否存在。false:不存在会创建,true:不存在不会创建
- * @param [in] isDurable:是否永久化。永久化:将队列信息写入磁盘,RabbitMQ Server重启后,队列不会丢失
- * @param [in] isAutoDelete:当没有队列和交换器绑定时,交换器是否自动删除
- * @param [in] internal: 设置是否内置的, true表示是内置的交换器, 客户端程序无法直接发送消息到这个交换器中, 只能通过交换器路由到交换器这个方式
- * @returns 声明结果
- */
- int ExchangeDeclare(const std::string &strExchange, const std::string& strType, bool isPassive = false,
- bool isDurable = false, bool isAutoDelete = false, int internal = 0);
- /*
- * 声明队列
- * @param [in] strQueueName:队列名称
- * @param [in] isPassive:检测exchange是否存在。false:不存在会创建,true:不存在不会创建
- * @param [in] isDurable:是否永久化。永久化:将队列信息写入磁盘,RabbitMQ Server重启后,队列不会丢失
- * @param [in] isExclusive:只有自己的用户对该队列可见
- * @param [in] isAutoDelete:当没有队列和交换器绑定时,交换器是否自动删除
- * @returns 声明结果
- */
- int QueueDelare(const std::string& strQueueName, bool isPassive = false,
- bool isDurable = false, bool isExclusive = false, bool isAutoDelete = false);
- /*
- * 将队列绑定到交换器
- * @param [in] strQueueName:队列名称
- * @param [in] strExchange:交换器名称
- * @param [in] strBindKey:路由键
- * @returns 绑定结果
- */
- int QueueBind(const std::string &strQueueName, const std::string &strExchange, const std::string &strBindKey);
- int QueueUnbind(const std::string &strQueueName, const std::string &strExchange, const std::string &strBindKey);
- int QueueDelete(const std::string &strQueueName, int iIfUnused);
- /*
- * 发布消息
- * @param [in] strMessage:需要发送的消息
- * @param [in] strExchange:交换器名称
- * @param [in] strBindKey:路由键
- * @returns 发布结果
- */
- int Publish(const std::string &strMessage, const std::string &strExchange, const std::string &strRoutekey);
- int Consumer(const std::string &strQueueName, RecvCallBack func);
- int Consumer(const std::string &strQueueName);
-
- bool GetMesssage(std::string&strMessage);
- //
- void SetConsum(const bool&bConsum);
- private:
- int ErrorMsg(amqp_rpc_reply_t x, char const *context);
- void StartRecvThread();
- void run();
- private:
- std::string m_strHostname{ "127.0.0.1" }; // amqp主机
- int m_iPort{ 5672 }; // amqp端口
- std::string m_strUser{ "guest" };
- std::string m_strPasswd{ "guest" };
- //int m_iChannel = 2;
- amqp_channel_t m_iChannel = 2;
- amqp_socket_t *m_pSock{ nullptr };
- amqp_connection_state_t m_pConn{ nullptr };
- std::unique_ptr<std::thread> m_pRecvThread{ nullptr };
- bool m_isRun{ false };
- std::atomic_bool m_isConsum{ true };
- RecvCallBack m_recvFunc{ nullptr };
- };
|