-
Notifications
You must be signed in to change notification settings - Fork 954
Description
目前使用 3.6.0 版本,通过 redis_client_pipeline 方式进行数据写入。当 redis 集群中某个节点发生主从切换之后,redis_client_pipeline 出现异常、无法继续写入,而其他采用 redis_client_cluster 方式的写入仍然正常。请问 redis_client_pipeline 在节点挂了或者节点发生主从切换之后,是否能够识别到节点变更并重新设置节点信息?麻烦帮忙分析一下,谢谢。
使用方式:
// Usage excerpt: create the global pipeline wrapper and initialise it from the
// ini-file agent. The enclosing function is not shown in this excerpt.
g_pRedisPipeline = new RedisPipeline();
nRet = g_pRedisPipeline->Init((UtilIniFileAgent*)pIniFileAgent);
// RedisPipeline::Init returns 0 on success and -1 when the config cannot be loaded;
// any non-zero value is reported to the caller as a generic failure.
if (0 != nRet)
{
return ERROR_CODE_COM_FAILED;
}
void StatusRedisWriter::DealSaveRedisDataList(bool pipeline)
{
XLock(m_SaveRedisDataLock);
unsigned int nSize = XQueSize(m_qSaveRedisDataList);
XUnLock(m_SaveRedisDataLock);
if (nSize < m_nPipelineMax)
{
Sleep(50);
}
std::vector<RedisPipelineCmd*> vPipelineCmd;
for (int i = 0; i < m_nPipelineMax; ++i)
{
// 加锁从队列取出一个数据
XLock(m_SaveRedisDataLock);
unsigned int nSize = XQueSize(m_qSaveRedisDataList);
if (nSize == 0)
{
XUnLock(m_SaveRedisDataLock);
break;
}
DCMDataSaveInfo stDataInfo;
XQueFront(m_qSaveRedisDataList, stDataInfo);
XQuePop(m_qSaveRedisDataList);
XUnLock(m_SaveRedisDataLock);
// 构造 redis key
string strKey = "OBD:";
strKey.append(stDataInfo.m_strDCMNo);
// 构造 HMSET 命令行
std::vector<std::string> vCmdline;
vCmdline.push_back("HMSET");
vCmdline.push_back(strKey);
vCmdline.push_back("UpLoadData");
vCmdline.push_back(stDataInfo.m_pData);
// 队列数据使用完毕,可释放
FreeSaveData(stDataInfo);
// 批量准备命令
RedisPipelineCmd* pCmd = g_pRedisPipeline->PushCommand(vCmdline);
vPipelineCmd.push_back(pCmd);
}
if (vPipelineCmd.empty())
{
LY_LOG_ERROR(g_LogHandle, "bug! m_qSaveRedisDataList size 0");
return;
}
// 批量等待命令
for (auto pCmd : vPipelineCmd)
{
std::string strResult;
auto acl_result = g_pRedisPipeline->WaitResult(pCmd, strResult);
if (strResult != "OK\r\n")
{
LY_LOG_ERROR(g_LogHandle, "fail to write redis for key %s", pCmd->GetKey().c_str());
}
delete pCmd;
}
}
封装的RedisPipeline.cpp代码如下:
#include "RedisPipeline.h"
#include "Util/UtilIniStruct.hpp"
#include "Util/UtilString.h"
#include "acl/acl_cpp/lib_acl.hpp"
// #include "acl/acl_cpp/redis/redis.hpp"
// #include "acl/acl_cpp/redis/redis_client_pipeline.hpp"
// Loads the Redis settings from an ini file and validates them.
//
// pIniFileAgent / pNodeName are not referenced directly in this body; they are
// presumably consumed by the LOAD_INI_KEY macro (defined elsewhere) as the ini
// source and the section name -- TODO confirm against the macro definition.
//
// Returns the result of CheckIni().
bool ConfigRedis::LoadIni(UtilIniFileAgent* pIniFileAgent, const char* pNodeName)
{
// One LOAD_INI_KEY per configuration member of the same name.
LOAD_INI_KEY(AddrAndPort);
LOAD_INI_KEY(Password);
LOAD_INI_KEY(PoolNum);
LOAD_INI_KEY(Cluster);
LOAD_INI_KEY(Pipeline);
return CheckIni();
}
bool onfigRedis::CheckIni()
{
return !AddrAndPort.empty() && !Password.empty();
}
/// Constructs an un-initialised wrapper; call Init() before use.
///
/// The destructor tests m_pRedisPipeline and Init() tests m_bIsInited, so both
/// members must hold a defined value even if Init() is never called. They are
/// assigned in the body (not in a member-initialiser list) so the statement
/// order does not depend on the declaration order in the header.
/// NOTE(review): redundant but harmless if the header already provides
/// default member initialisers -- confirm against RedisPipeline.h.
RedisPipeline::RedisPipeline()
{
    m_pRedisPipeline = nullptr;
    m_bIsInited = false;
}
/// Stops the pipeline thread and releases the underlying acl pipeline object,
/// if one was created.
RedisPipeline::~RedisPipeline()
{
    // No pipeline was created: nothing to stop or free.
    if (m_pRedisPipeline == nullptr)
    {
        return;
    }

    // Stop the pipeline thread before destroying the object it runs on.
    m_pRedisPipeline->stop_thread();
    delete m_pRedisPipeline;
    m_pRedisPipeline = nullptr;
}
/// Loads the "Redis" section from the ini agent into m_stConfig and starts the
/// pipeline.
///
/// @param pIniFileAgent  ini source handed to ConfigRedis::LoadIni.
/// @return 0 on success, -1 when the configuration cannot be loaded or fails
///         validation.
int RedisPipeline::Init(UtilIniFileAgent* pIniFileAgent)
{
    if (!m_stConfig.LoadIni(pIniFileAgent, "Redis"))
    {
        // Log text corrected from the misspelled "fial".
        LOG_ERROR("fail to load redis config");
        return -1;
    }
    return Init();
}
int RedisPipeline::Init(const LYConfigRedis& stConfig)
{
m_stConfig = stConfig;
return Init();
}
/// Creates the acl pipeline from m_stConfig and starts its thread.
///
/// Only AddrAndPort (a single "ip:port") and Password are used in pipeline
/// mode. Calling this again after a successful initialisation does nothing.
///
/// @return always 0.
int RedisPipeline::Init()
{
    if (!m_bIsInited)
    {
        const char* pszAddr = m_stConfig.AddrAndPort.c_str();
        m_pRedisPipeline = new acl::redis_client_pipeline(pszAddr);

        const char* pszPassword = m_stConfig.Password.c_str();
        m_pRedisPipeline->set_password(pszPassword);

        // The password is set before the pipeline thread is started.
        m_pRedisPipeline->start_thread();
        m_bIsInited = true;
    }
    return 0;
}
std::unique_ptracl::redis RedisPipeline::get_redis_cmd()
{
std::unique_ptracl::redis cmd(new acl::redis);
cmd->set_pipeline(m_pRedisPipeline);
return cmd;
}
/// Splits a command line into tokens with SplitBySpace and pushes it onto the
/// pipeline.
///
/// @param strCmdline  complete command as one string.
/// @return the command object created by the vector overload; see that
///         overload for ownership.
///
/// The template argument list of std::vector is restored here; without the
/// angle brackets the declaration does not compile.
RedisPipelineCmd* RedisPipeline::PushCommand(const std::string& strCmdline)
{
    std::vector<std::string> vCmdline;
    SplitBySpace(strCmdline, vCmdline);
    return PushCommand(vCmdline);
}
/// Creates a pipeline command for the given tokens and hands it to
/// RedisPipelineCmd::BuildRequest.
///
/// @param vCmdline  command tokens (command name, key, arguments...). It is
///                  passed by non-const reference on to BuildRequest, so the
///                  caller should treat its contents as consumed.
/// @return a heap-allocated command object; the caller releases it with
///         delete after collecting the result.
///
/// The template argument list of std::vector is restored here; without the
/// angle brackets the declaration does not compile.
RedisPipelineCmd* RedisPipeline::PushCommand(std::vector<std::string>& vCmdline)
{
    RedisPipelineCmd* pCmd = new RedisPipelineCmd(this);
    pCmd->BuildRequest(vCmdline);
    return pCmd;
}
/// Waits for the reply to a previously pushed command and renders it as text.
///
/// @param pCmd       command returned by PushCommand; a null pointer is
///                   treated as a failed command.
/// @param strResult  receives the textual form of the reply. It is always
///                   reset first, so on failure it is empty rather than
///                   holding whatever the caller passed in.
/// @return the raw reply object, or nullptr when no reply is available.
const acl::redis_result* RedisPipeline::WaitResult(RedisPipelineCmd* pCmd, std::string& strResult)
{
    // Clear first so a reused output string can never carry a stale "OK".
    strResult.clear();
    if (pCmd == nullptr)
    {
        return nullptr;
    }

    const acl::redis_result* pResult = pCmd->WaitResult();
    if (pResult != nullptr)
    {
        acl::string str;
        pResult->to_string(str);
        strResult.assign(str.c_str(), str.size());
    }
    return pResult;
}
/* ************************************************************ */
/// Releases the acl::redis command object owned by this instance, if any.
RedisPipelineCmd::~RedisPipelineCmd()
{
    // delete on a null pointer is a no-op, so no explicit check is needed.
    delete m_pRedisCommand;
    m_pRedisCommand = nullptr;
}
void RedisPipelineCmd::BuildRequest(const std::string& strCmdline)
{
std::vectorstd::string vCmdline;
SplitBySpace(strCmdline, vCmdline);
return BuildRequest(vCmdline);
}
void RedisPipelineCmd::BuildRequest(std::vectorstd::string& vCmdline)
{
ASSERT_RET(vCmdline.size() > 1);
if (m_pRedisCommand == nullptr)
{
m_pRedisCommand = new acl::redis;
}
acl::redis_client_pipeline* pipeline = m_pRedisPipeline->get_acl_pipeline();
ASSERT_RET(pipeline);
m_pRedisCommand->set_pipeline(pipeline);
// 预设 slot 避免 redirect 开销
m_pRedisCommand->hash_slot(vCmdline[1].c_str());
m_vCmdline.clear();
m_vCmdline.swap(vCmdline);
size_t m_argc = m_vCmdline.size();
m_argv.resize(m_argc);
m_lens.resize(m_argc);
for (int i = 0; i < m_argc; ++i)
{
m_argv[i] = m_vCmdline[i].c_str();
m_lens[i] = m_vCmdline[i].size();
}
// pipeline 模式下,传入的 argv 指针须较长的生命周期
m_pRedisCommand->build_request(m_argc, &m_argv[0], &m_lens[0]);
acl::redis_pipeline_message& msg = m_pRedisCommand->get_pipeline_message();
pipeline->push(&msg);
}
/// Waits for the reply to the request pushed by BuildRequest.
///
/// @return the reply, or nullptr when no request was built.
///
/// BuildRequest can return early before creating m_pRedisCommand or before
/// storing any tokens in m_vCmdline. In the first case the old code
/// dereferenced a null pointer. In the second it waited on a message that was
/// never pushed -- presumably blocking indefinitely; confirm against
/// acl::redis_pipeline_message::wait().
const acl::redis_result* RedisPipelineCmd::WaitResult()
{
    if (m_pRedisCommand == nullptr || m_vCmdline.empty())
    {
        return nullptr;
    }
    acl::redis_pipeline_message& msg = m_pRedisCommand->get_pipeline_message();
    return msg.wait();
}
/// Returns the key of the stored command (token index 1).
///
/// @return the key, or an empty string when fewer than two tokens are stored.
///
/// The guard is size() > 1: index 1 exists as soon as there are two tokens.
/// The previous size() > 2 check returned an empty key for two-token commands
/// such as "GET key".
std::string RedisPipelineCmd::GetKey() const
{
    if (m_vCmdline.size() > 1)
    {
        return m_vCmdline[1];
    }
    return "";
}