Skip to content

redis_client_pipeline 节点主从切换问题 #367

@41405465

Description

@41405465

目前使用3.6.0版本,使用redis_client_pipeline方式进行数据写入,目前出现redis集群一个节点发生主从切换之后,出现异常无法写入,其他采用redis_client_cluster方式的写入正常,redis_client_pipeline在节点挂了或者节点主从切换之后,是否能够识别节点到节点变更并重新设置,帮忙分析下,谢谢

使用方式:
g_pRedisPipeline = new RedisPipeline();
nRet = g_pRedisPipeline->Init((UtilIniFileAgent*)pIniFileAgent);
if (0 != nRet)
{
return ERROR_CODE_COM_FAILED;
}

void StatusRedisWriter::DealSaveRedisDataList(bool pipeline)
{
XLock(m_SaveRedisDataLock);
unsigned int nSize = XQueSize(m_qSaveRedisDataList);
XUnLock(m_SaveRedisDataLock);
if (nSize < m_nPipelineMax)
{
Sleep(50);
}

std::vector<RedisPipelineCmd*> vPipelineCmd;
for (int i = 0; i < m_nPipelineMax; ++i)
{
    // 加锁从队列取出一个数据
    XLock(m_SaveRedisDataLock);
    unsigned int nSize = XQueSize(m_qSaveRedisDataList);
    if (nSize == 0)
    {
        XUnLock(m_SaveRedisDataLock);
        break;
    }

    DCMDataSaveInfo stDataInfo;
    XQueFront(m_qSaveRedisDataList, stDataInfo);
    XQuePop(m_qSaveRedisDataList);
    XUnLock(m_SaveRedisDataLock);

    // 构造 redis key
    string strKey = "OBD:";
    strKey.append(stDataInfo.m_strDCMNo);

    // 构造 HMSET 命令行
    std::vector<std::string> vCmdline;
    vCmdline.push_back("HMSET");
    vCmdline.push_back(strKey);
    vCmdline.push_back("UpLoadData");
    vCmdline.push_back(stDataInfo.m_pData);

    // 队列数据使用完毕,可释放
    FreeSaveData(stDataInfo);

    // 批量准备命令
    RedisPipelineCmd* pCmd = g_pRedisPipeline->PushCommand(vCmdline);
    vPipelineCmd.push_back(pCmd);
}

if (vPipelineCmd.empty())
{
    LY_LOG_ERROR(g_LogHandle, "bug! m_qSaveRedisDataList size 0");
    return;
}

// 批量等待命令
for (auto pCmd : vPipelineCmd)
{
    std::string strResult;
    auto acl_result = g_pRedisPipeline->WaitResult(pCmd, strResult);
    if (strResult != "OK\r\n")
    {
        LY_LOG_ERROR(g_LogHandle, "fail to write redis for key %s", pCmd->GetKey().c_str());
    }
    delete pCmd;
}

}

封装的RedisPipeline.cpp代码如下:

#include "RedisPipeline.h"

#include "Util/UtilIniStruct.hpp"
#include "Util/UtilString.h"

#include "acl/acl_cpp/lib_acl.hpp"
// #include "acl/acl_cpp/redis/redis.hpp"
// #include "acl/acl_cpp/redis/redis_client_pipeline.hpp"

bool ConfigRedis::LoadIni(UtilIniFileAgent* pIniFileAgent, const char* pNodeName)
{
LOAD_INI_KEY(AddrAndPort);
LOAD_INI_KEY(Password);
LOAD_INI_KEY(PoolNum);
LOAD_INI_KEY(Cluster);
LOAD_INI_KEY(Pipeline);

return CheckIni();

}

bool onfigRedis::CheckIni()
{
return !AddrAndPort.empty() && !Password.empty();
}

RedisPipeline::RedisPipeline()
{
}

RedisPipeline::~RedisPipeline()
{
if (m_pRedisPipeline)
{
m_pRedisPipeline->stop_thread();
delete m_pRedisPipeline;
m_pRedisPipeline = nullptr;
}
}

int RedisPipeline::Init(UtilIniFileAgent* pIniFileAgent)
{
if (!m_stConfig.LoadIni(pIniFileAgent, "Redis"))
{
LOG_ERROR("fial to load redis config");
return -1;
}
return Init();
}

int RedisPipeline::Init(const LYConfigRedis& stConfig)
{
m_stConfig = stConfig;
return Init();
}

int RedisPipeline::Init()
{
if (m_bIsInited)
{
return 0;
}

// 对于 pipeline ,只用到前两个配置项 addr 与 password
// addr 是单个 ip:port
m_pRedisPipeline = new acl::redis_client_pipeline(m_stConfig.AddrAndPort.c_str());
m_pRedisPipeline->set_password(m_stConfig.Password.c_str());
m_pRedisPipeline->start_thread();

m_bIsInited = true;
return 0;

}

std::unique_ptracl::redis RedisPipeline::get_redis_cmd()
{
std::unique_ptracl::redis cmd(new acl::redis);

cmd->set_pipeline(m_pRedisPipeline);
return cmd;

}

RedisPipelineCmd* RedisPipeline::PushCommand(const std::string& strCmdline)
{
std::vectorstd::string vCmdline;
SplitBySpace(strCmdline, vCmdline);
return PushCommand(vCmdline);
}

RedisPipelineCmd* RedisPipeline::PushCommand(std::vectorstd::string& vCmdline)
{
RedisPipelineCmd* pCmd = new RedisPipelineCmd(this);
pCmd->BuildRequest(vCmdline);
return pCmd;
}

const acl::redis_result* RedisPipeline::WaitResult(RedisPipelineCmd* pCmd, std::string& strResult)
{
const acl::redis_result* pResult = pCmd->WaitResult();
if (pResult)
{
acl::string str;
pResult->to_string(str);
strResult.assign(str.c_str(), str.size());
}
return pResult;
}

/* ************************************************************ */

RedisPipelineCmd::~RedisPipelineCmd()
{
if (m_pRedisCommand != nullptr)
{
delete m_pRedisCommand;
m_pRedisCommand = nullptr;
}
}

void RedisPipelineCmd::BuildRequest(const std::string& strCmdline)
{
std::vectorstd::string vCmdline;
SplitBySpace(strCmdline, vCmdline);
return BuildRequest(vCmdline);
}

void RedisPipelineCmd::BuildRequest(std::vectorstd::string& vCmdline)
{
ASSERT_RET(vCmdline.size() > 1);

if (m_pRedisCommand == nullptr)
{
    m_pRedisCommand = new acl::redis;
}

acl::redis_client_pipeline* pipeline = m_pRedisPipeline->get_acl_pipeline();
ASSERT_RET(pipeline);

m_pRedisCommand->set_pipeline(pipeline);
// 预设 slot 避免 redirect 开销
m_pRedisCommand->hash_slot(vCmdline[1].c_str());

m_vCmdline.clear();
m_vCmdline.swap(vCmdline);

size_t m_argc = m_vCmdline.size();
m_argv.resize(m_argc);
m_lens.resize(m_argc);
for (int i = 0; i < m_argc; ++i)
{
    m_argv[i] = m_vCmdline[i].c_str();
    m_lens[i] = m_vCmdline[i].size();
}

// pipeline 模式下,传入的 argv 指针须较长的生命周期
m_pRedisCommand->build_request(m_argc, &m_argv[0], &m_lens[0]);
acl::redis_pipeline_message& msg = m_pRedisCommand->get_pipeline_message();
pipeline->push(&msg);

}

const acl::redis_result* RedisPipelineCmd::WaitResult()
{
acl::redis_pipeline_message& msg = m_pRedisCommand->get_pipeline_message();
return msg.wait();
}

std::string RedisPipelineCmd::GetKey() const
{
if (m_vCmdline.size() > 2)
{
return m_vCmdline[1];
}
return "";
}

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions