中间件底层,websocket

CRedis.cpp 6.3KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251
  1. #include "CRedis.h"
  2. #include <string.h>
  3. #define LOG_WARN printf
  4. #define LOG_ERROR printf
  5. #define LOG_INFO printf
  6. #define LOG_DEBUG printf
  7. #ifdef WIN32
  8. #define NO_QFORKIMPL //这一行必须加才能正常使用
  9. #include "hiredis\msvs\win32_interop\win32_fixes.h"
  10. #pragma comment(lib,"hiredis.lib")
  11. #pragma comment(lib,"win32_interop.lib")
  12. #endif // WIN32
  13. CRedis::CRedis()
  14. {
  15. this->_connect = nullptr;
  16. this->_reply = nullptr;
  17. m_pHost = nullptr;
  18. m_port = 6379;
  19. m_pPwd = nullptr;
  20. m_isConnect = false;
  21. }
  22. CRedis::~CRedis()
  23. {
  24. if (this->_reply != nullptr)
  25. freeReplyObject(this->_reply);
  26. if (this->_connect != nullptr)
  27. redisFree(this->_connect);
  28. this->_connect = nullptr;
  29. this->_reply = nullptr;
  30. }
  31. bool CRedis::connect(const char * host, int port, const char * password)
  32. {
  33. LOG_INFO("Redis连接[%s][%d][%s]\n", host, port, password);
  34. if (this->_connect != nullptr)
  35. redisFree(this->_connect);
  36. this->_connect = nullptr;
  37. struct timeval timeout = { 1, 500000 }; // 1.5 seconds
  38. this->_connect = redisConnectWithTimeout(host, port, timeout);
  39. if (this->_connect == nullptr || this->_connect->err) {
  40. if (this->_connect) {
  41. LOG_ERROR("Connection error: %s\n", this->_connect->errstr);
  42. redisFree(this->_connect);
  43. }
  44. else {
  45. LOG_ERROR("Connection error: can't allocate redis context\n");
  46. }
  47. return false;
  48. }
  49. if (password == "")
  50. {
  51. m_isConnect = true;
  52. return true;
  53. }
  54. this->_reply = (redisReply *)redisCommand(this->_connect, "AUTH %s", password);
  55. if (this->_reply->type == REDIS_REPLY_ERROR) {
  56. LOG_ERROR("Redis认证失败!\n");
  57. return false;
  58. }
  59. else
  60. {
  61. LOG_INFO("Redis认证成功!\n");
  62. m_isConnect = true;
  63. if (this->m_pHost == nullptr)
  64. {
  65. std::uint32_t len = strlen(host);
  66. this->m_pHost = std::make_unique<char[]>(len + 1);
  67. memcpy(this->m_pHost.get(), host, len);
  68. }
  69. else
  70. {
  71. this->m_pHost.reset();
  72. std::uint32_t len = strlen(host);
  73. this->m_pHost = std::make_unique<char[]>(len + 1);
  74. memcpy(this->m_pHost.get(), host, len);
  75. }
  76. if (this->m_pPwd == nullptr)
  77. {
  78. std::uint32_t len = strlen(password);
  79. this->m_pPwd = std::make_unique<char[]>(len + 1);
  80. memcpy(this->m_pPwd.get(), password, len);
  81. }
  82. else
  83. {
  84. }
  85. this->m_port = port;
  86. /* PING server */
  87. static bool isFirst = true;
  88. if (isFirst)
  89. {
  90. isFirst = false;
  91. __testCon();
  92. }
  93. }
  94. return true;
  95. }
  96. bool CRedis::set(const char * key, const char * value)
  97. {
  98. if (!m_isConnect) return false;
  99. std::unique_lock<std::mutex>lock(mut);
  100. this->_reply = (redisReply*)redisCommand(this->_connect, "SET %s %s", key, value);
  101. int n = strcmp(this->_reply->str, "OK");
  102. freeReplyObject(this->_reply);
  103. LOG_DEBUG("SET %s %s\n", key, value);
  104. if (n == 0) return true;
  105. return false;
  106. }
  107. bool CRedis::set(const char * key, const char * value, int second)
  108. {
  109. if (!m_isConnect) return false;
  110. std::unique_lock<std::mutex>lock(mut);
  111. this->_reply = (redisReply*)redisCommand(this->_connect, "SET %s %s EX %d", key, value, second);
  112. int n = strcmp(this->_reply->str, "OK");
  113. freeReplyObject(this->_reply);
  114. LOG_DEBUG("SET %s %s EX %d\n", key, value, second);
  115. if (n == 0) return true;
  116. return false;
  117. }
  118. void CRedis::push(const char * key, const char * values)
  119. {
  120. if (!m_isConnect) return;
  121. std::unique_lock<std::mutex>lock(mut);
  122. this->_reply = (redisReply*)redisCommand(this->_connect, "PUBLISH %s %s", key, values);
  123. int n = 0;
  124. if (this->_reply->str != nullptr)
  125. n = strcmp(this->_reply->str, "OK");
  126. if (n == 0)
  127. {
  128. LOG_DEBUG("推送成功");
  129. }
  130. else
  131. {
  132. //LOG_WARN("推送失败[%s]", this->_reply->str);
  133. // "MOVED 6368 192.168.0.75:6381"
  134. std::string str = this->_reply->str;
  135. if (str.find("MOVED") == std::string::npos)
  136. {
  137. LOG_WARN("推送失败[%s]", this->_reply->str);
  138. return;
  139. }
  140. auto index = str.find_last_of(' ');
  141. auto spli = str.find(':');
  142. std::string ip = str.substr(index + 1, spli - index - 1);
  143. std::string port = str.substr(spli + 1, str.length() - spli);
  144. LOG_INFO("推送[%s][%s]", ip.c_str(), port.c_str());
  145. freeReplyObject(this->_reply);
  146. connect(ip.c_str(), atoi(port.c_str()), this->m_pPwd.get());
  147. this->_reply = (redisReply*)redisCommand(this->_connect, "PUBLISH %s %s", key, values);
  148. LOG_INFO("推送结果[%s]", this->_reply->str);
  149. }
  150. freeReplyObject(this->_reply);
  151. }
  152. void CRedis::push(const char * key, std::list<const char*> &values)
  153. {
  154. if (!m_isConnect) return;
  155. std::unique_lock<std::mutex>lock(mut);
  156. while (!values.empty())
  157. {
  158. this->_reply = (redisReply*)redisCommand(this->_connect, "LPUSH %s %s", key, values.front());
  159. freeReplyObject(this->_reply);
  160. values.pop_front();
  161. }
  162. }
  163. void CRedis::pull(const char * key, std::list<const char*>& values)
  164. {
  165. if (!m_isConnect) return;
  166. std::unique_lock<std::mutex>lock(mut);
  167. this->_reply = (redisReply*)redisCommand(this->_connect, "LRANGE %s 0 -1", key);
  168. if (this->_reply->type == REDIS_REPLY_ARRAY) {
  169. for (size_t j = 0; j < this->_reply->elements; j++) {
  170. values.emplace_back(this->_reply->element[j]->str);
  171. }
  172. }
  173. freeReplyObject(this->_reply);
  174. }
  175. const char * CRedis::get(const char * key)
  176. {
  177. if (!m_isConnect) return nullptr;
  178. std::unique_lock<std::mutex>lock(mut);
  179. this->_reply = (redisReply*)redisCommand(this->_connect, "GET %s", key);
  180. if (this->_reply->type == REDIS_REPLY_STRING)
  181. {
  182. char* str = this->_reply->str;
  183. freeReplyObject(this->_reply);
  184. return str;
  185. }
  186. else if (this->_reply->type == REDIS_REPLY_NIL)
  187. {
  188. LOG_DEBUG("GET %s: 空\n", key);
  189. freeReplyObject(this->_reply);
  190. return nullptr;
  191. }
  192. else
  193. {
  194. LOG_DEBUG("GET key:%s value:%s type:%d\n", key, this->_reply->str, this->_reply->type);
  195. freeReplyObject(this->_reply);
  196. return nullptr;
  197. }
  198. }
  199. void CRedis::__delKey(const char * key)
  200. {
  201. if (!m_isConnect) return;
  202. //std::unique_lock<std::mutex>lock(mut);
  203. this->_reply = (redisReply*)redisCommand(this->_connect, "DEL %s", key);
  204. freeReplyObject(this->_reply);
  205. }
  206. bool CRedis::__testCon()
  207. {
  208. std::unique_lock<std::mutex>lock(mut);
  209. this->_reply = (redisReply*)redisCommand(this->_connect, "SET %s %s", "test", "test");
  210. int n = strcmp(this->_reply->str, "OK");
  211. std::string str = this->_reply->str;
  212. freeReplyObject(this->_reply);
  213. __delKey("test");
  214. if (n == 0) return true;
  215. auto index = str.find_last_of(' ');
  216. auto spli = str.find(':');
  217. std::string ip = str.substr(index + 1, spli - index - 1);
  218. std::string port = str.substr(spli + 1, str.length() - spli);
  219. return connect(ip.c_str(), atoi(port.c_str()), this->m_pPwd.get());
  220. }
  221. std::shared_ptr<CRedis> CRedis::pInstance(new CRedis);