Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
159 changes: 91 additions & 68 deletions src/object/srv_obj.c
Original file line number Diff line number Diff line change
Expand Up @@ -2519,16 +2519,16 @@ obj_ioc_begin(daos_obj_id_t oid, uint32_t rpc_map_ver, uuid_t pool_uuid,
void
ds_obj_ec_rep_handler(crt_rpc_t *rpc)
{
struct obj_ec_rep_in *oer = crt_req_get(rpc);
struct obj_ec_rep_out *oero = crt_reply_get(rpc);
daos_key_t *dkey;
daos_iod_t *iod;
struct dcs_iod_csums *iod_csums;
struct bio_desc *biod;
daos_recx_t recx = { 0 };
struct obj_io_context ioc;
daos_handle_t ioh = DAOS_HDL_INVAL;
int rc;
struct obj_ec_rep_in *oer = crt_req_get(rpc);
struct obj_ec_rep_out *oero = crt_reply_get(rpc);
daos_key_t *dkey;
daos_iod_t *iod;
struct dcs_iod_csums *iod_csums;
struct bio_desc *biod;
daos_recx_t recx = {0};
struct obj_io_context ioc = {0};
daos_handle_t ioh = DAOS_HDL_INVAL;
int rc;

D_ASSERT(oer != NULL);
D_ASSERT(oero != NULL);
Expand All @@ -2551,6 +2551,8 @@ ds_obj_ec_rep_handler(crt_rpc_t *rpc)

dkey = (daos_key_t *)&oer->er_dkey;
iod = (daos_iod_t *)&oer->er_iod;

again:
if (iod->iod_nr == 0) /* nothing to replicate, directly remove parity */
goto remove_parity;
iod_csums = oer->er_iod_csums.ca_arrays;
Expand Down Expand Up @@ -2581,6 +2583,11 @@ ds_obj_ec_rep_handler(crt_rpc_t *rpc)
end:
rc = vos_update_end(ioh, ioc.ioc_map_ver, dkey, rc, &ioc.ioc_io_size, NULL);
if (rc) {
if (rc == -DER_AGAIN) {
ABT_thread_yield();
goto again;
}

D_ERROR(DF_UOID " vos_update_end failed: " DF_RC "\n", DP_UOID(oer->er_oid),
DP_RC(rc));
goto out_agg;
Expand All @@ -2590,6 +2597,11 @@ ds_obj_ec_rep_handler(crt_rpc_t *rpc)
recx.rx_idx = (oer->er_stripenum * recx.rx_nr) | PARITY_INDICATOR;
rc = vos_obj_array_remove(ioc.ioc_coc->sc_hdl, oer->er_oid, &oer->er_epoch_range, dkey,
&iod->iod_name, &recx);
if (rc == -DER_AGAIN) {
ABT_thread_yield();
goto again;
}

out_agg:
ioc.ioc_coc->sc_ec_agg_updates--;
out:
Expand All @@ -2600,19 +2612,18 @@ ds_obj_ec_rep_handler(crt_rpc_t *rpc)
void
ds_obj_ec_agg_handler(crt_rpc_t *rpc)
{
struct obj_ec_agg_in *oea = crt_req_get(rpc);
struct obj_ec_agg_out *oeao = crt_reply_get(rpc);
daos_key_t *dkey;
struct bio_desc *biod;
daos_iod_t *iod = &oea->ea_iod;
struct dcs_iod_csums *iod_csums = oea->ea_iod_csums.ca_arrays;

crt_bulk_t parity_bulk = oea->ea_bulk;
daos_recx_t recx = { 0 };
struct obj_io_context ioc;
daos_handle_t ioh = DAOS_HDL_INVAL;
int rc;
int rc1;
struct obj_ec_agg_in *oea = crt_req_get(rpc);
struct obj_ec_agg_out *oeao = crt_reply_get(rpc);
daos_key_t *dkey;
struct bio_desc *biod;
daos_iod_t *iod = &oea->ea_iod;
struct dcs_iod_csums *iod_csums = oea->ea_iod_csums.ca_arrays;
crt_bulk_t parity_bulk = oea->ea_bulk;
daos_recx_t recx = {0};
struct obj_io_context ioc = {0};
daos_handle_t ioh = DAOS_HDL_INVAL;
int rc;
int rc1;

D_ASSERT(oea != NULL);
D_ASSERT(oeao != NULL);
Expand All @@ -2634,6 +2645,8 @@ ds_obj_ec_agg_handler(crt_rpc_t *rpc)
ioc.ioc_coc->sc_ec_agg_updates++;

dkey = (daos_key_t *)&oea->ea_dkey;

again:
if (parity_bulk != CRT_BULK_NULL) {
rc = vos_update_begin(ioc.ioc_coc->sc_hdl, oea->ea_oid, oea->ea_epoch_range.epr_hi,
VOS_OF_REBUILD | VOS_OF_CRIT, dkey, 1, iod, iod_csums, 0,
Expand Down Expand Up @@ -2664,6 +2677,11 @@ ds_obj_ec_agg_handler(crt_rpc_t *rpc)
end:
rc = vos_update_end(ioh, ioc.ioc_map_ver, dkey, rc, &ioc.ioc_io_size, NULL);
if (rc) {
if (rc == -DER_AGAIN) {
ABT_thread_yield();
goto again;
}

if (rc == -DER_NO_PERM) {
/* Parity already exists, May need a
* different error code.
Expand All @@ -2688,6 +2706,11 @@ ds_obj_ec_agg_handler(crt_rpc_t *rpc)
rc1 = vos_obj_array_remove(ioc.ioc_coc->sc_hdl, oea->ea_oid,
&oea->ea_epoch_range, dkey,
&iod->iod_name, &recx);
if (rc1 == -DER_AGAIN) {
ABT_thread_yield();
goto again;
}

if (rc1)
D_ERROR(DF_UOID ": array_remove failed: " DF_RC "\n", DP_UOID(oea->ea_oid),
DP_RC(rc1));
Expand Down Expand Up @@ -2782,18 +2805,18 @@ obj_handle_resend(daos_handle_t coh, struct dtx_id *dti, daos_epoch_t *epoch, ui
void
ds_obj_tgt_update_handler(crt_rpc_t *rpc)
{
struct obj_rw_in *orw = crt_req_get(rpc);
struct obj_rw_out *orwo = crt_reply_get(rpc);
daos_key_t *dkey = &orw->orw_dkey;
struct obj_io_context ioc;
struct dtx_handle *dth = NULL;
struct dtx_memberships *mbs = NULL;
struct daos_shard_tgt *tgts = NULL;
uint32_t tgt_cnt;
uint32_t opc = opc_get(rpc->cr_opc);
uint32_t dtx_flags = 0;
struct dtx_epoch epoch;
int rc;
struct obj_rw_in *orw = crt_req_get(rpc);
struct obj_rw_out *orwo = crt_reply_get(rpc);
daos_key_t *dkey = &orw->orw_dkey;
struct obj_io_context ioc = {0};
struct dtx_handle *dth = NULL;
struct dtx_memberships *mbs = NULL;
struct daos_shard_tgt *tgts = NULL;
uint32_t tgt_cnt;
uint32_t opc = opc_get(rpc->cr_opc);
uint32_t dtx_flags = 0;
struct dtx_epoch epoch;
int rc;

D_ASSERT(orw != NULL);
D_ASSERT(orwo != NULL);
Expand Down Expand Up @@ -3231,11 +3254,11 @@ ds_obj_rw_handler(crt_rpc_t *rpc)
d_tm_inc_counter(opm->opm_update_restart, 1);
goto again;
case -DER_AGAIN:
ABT_thread_yield();
need_abort = true;
exec_arg.flags |= ORF_RESEND;
flags = ORF_RESEND;
d_tm_inc_counter(opm->opm_update_retry, 1);
ABT_thread_yield();
goto again;
default:
break;
Expand Down Expand Up @@ -3553,14 +3576,14 @@ obj_enum_reply_bulk(crt_rpc_t *rpc)
void
ds_obj_enum_handler(crt_rpc_t *rpc)
{
struct ds_obj_enum_arg enum_arg = { 0 };
struct vos_iter_anchors *anchors = NULL;
struct obj_key_enum_in *oei;
struct obj_key_enum_out *oeo;
struct obj_io_context ioc;
daos_epoch_t epoch = 0;
int opc = opc_get(rpc->cr_opc);
int rc = 0;
struct ds_obj_enum_arg enum_arg = {0};
struct vos_iter_anchors *anchors = NULL;
struct obj_key_enum_in *oei;
struct obj_key_enum_out *oeo;
struct obj_io_context ioc = {0};
daos_epoch_t epoch = 0;
int opc = opc_get(rpc->cr_opc);
int rc = 0;

oei = crt_req_get(rpc);
D_ASSERT(oei != NULL);
Expand Down Expand Up @@ -4131,10 +4154,10 @@ ds_obj_punch_handler(crt_rpc_t *rpc)
flags = ORF_RESEND;
goto again;
case -DER_AGAIN:
ABT_thread_yield();
need_abort = true;
exec_arg.flags |= ORF_RESEND;
flags = ORF_RESEND;
ABT_thread_yield();
goto again;
default:
break;
Expand Down Expand Up @@ -4425,11 +4448,11 @@ ds_obj_query_key_handler(crt_rpc_t *rpc)
void
ds_obj_sync_handler(crt_rpc_t *rpc)
{
struct obj_sync_in *osi;
struct obj_sync_out *oso;
struct obj_io_context ioc;
daos_epoch_t epoch = d_hlc_get();
int rc;
struct obj_sync_in *osi;
struct obj_sync_out *oso;
struct obj_io_context ioc = {0};
daos_epoch_t epoch = d_hlc_get();
int rc;

osi = crt_req_get(rpc);
D_ASSERT(osi != NULL);
Expand Down Expand Up @@ -5557,18 +5580,18 @@ ds_obj_cpd_body_bulk(crt_rpc_t *rpc, struct obj_io_context *ioc, bool leader,
void
ds_obj_cpd_handler(crt_rpc_t *rpc)
{
struct obj_cpd_in *oci = crt_req_get(rpc);
struct obj_cpd_out *oco = crt_reply_get(rpc);
struct daos_cpd_args *dcas = NULL;
struct obj_io_context ioc;
ABT_future future = ABT_FUTURE_NULL;
struct daos_cpd_bulk **dcbs = NULL;
uint32_t dcb_nr = 0;
int tx_count = oci->oci_sub_heads.ca_count;
int rc = 0;
int i;
int j;
bool leader;
struct obj_cpd_in *oci = crt_req_get(rpc);
struct obj_cpd_out *oco = crt_reply_get(rpc);
struct daos_cpd_args *dcas = NULL;
struct obj_io_context ioc = {0};
ABT_future future = ABT_FUTURE_NULL;
struct daos_cpd_bulk **dcbs = NULL;
uint32_t dcb_nr = 0;
int tx_count = oci->oci_sub_heads.ca_count;
int rc = 0;
int i;
int j;
bool leader;

D_ASSERT(oci != NULL);

Expand Down Expand Up @@ -5721,11 +5744,11 @@ ds_obj_cpd_handler(crt_rpc_t *rpc)
void
ds_obj_key2anchor_handler(crt_rpc_t *rpc)
{
struct obj_key2anchor_in *oki;
struct obj_key2anchor_out *oko;
struct obj_io_context ioc;
daos_key_t *akey = NULL;
int rc = 0;
struct obj_key2anchor_in *oki;
struct obj_key2anchor_out *oko;
struct obj_io_context ioc = {0};
daos_key_t *akey = NULL;
int rc = 0;

oki = crt_req_get(rpc);
D_ASSERT(oki != NULL);
Expand Down Expand Up @@ -5903,10 +5926,10 @@ ds_obj_coll_punch_handler(crt_rpc_t *rpc)
flags = ORF_RESEND;
goto again;
case -DER_AGAIN:
ABT_thread_yield();
need_abort = true;
exec_arg.flags |= ORF_RESEND;
flags = ORF_RESEND;
ABT_thread_yield();
goto again;
default:
break;
Expand Down
9 changes: 9 additions & 0 deletions src/object/srv_obj_migrate.c
Original file line number Diff line number Diff line change
Expand Up @@ -1529,6 +1529,8 @@ __migrate_fetch_update_bulk(struct migrate_one *mrone, daos_handle_t oh,
mrone_recx_daos2_vos(mrone, iods, iod_num);

D_ASSERT(iod_num <= OBJ_ENUM_UNPACK_MAX_IODS);

again:
rc = vos_update_begin(ds_cont->sc_hdl, mrone->mo_oid, update_eph, VOS_OF_REBUILD,
&mrone->mo_dkey, iod_num, iods, mrone->mo_iods_csums,
0, &ioh, NULL);
Expand Down Expand Up @@ -1646,6 +1648,13 @@ __migrate_fetch_update_bulk(struct migrate_one *mrone, daos_handle_t oh,
if (rc == 0)
rc = rc1;

if (rc == -DER_AGAIN) {
D_WARN(DF_RB ": migrating dkey " DF_KEY " need to retry\n", DP_RB_MRO(mrone),
DP_KEY(&mrone->mo_dkey));
ABT_thread_yield();
goto again;
}

if (rc)
DL_ERROR(rc, DF_RB ": " DF_UOID " migrate error", DP_RB_MRO(mrone),
DP_UOID(mrone->mo_oid));
Expand Down
8 changes: 4 additions & 4 deletions src/vos/vos_common.c
Original file line number Diff line number Diff line change
Expand Up @@ -224,9 +224,9 @@ vos_tx_begin(struct dtx_handle *dth, struct umem_instance *umm, bool is_sysdb,
/* The CPU may yield inside umem_tx_begin(); the related object may be evicted in the meantime. */
rc = umem_tx_begin(umm, vos_txd_get(is_sysdb));
if (rc == 0 && obj != NULL && unlikely(vos_obj_is_evicted(obj))) {
D_DEBUG(DB_IO, "Obj " DF_UOID " is evicted(1), need to restart TX.\n",
D_DEBUG(DB_IO, "Obj " DF_UOID " is evicted(1), need to retry.\n",
DP_UOID(obj->obj_id));
rc = umem_tx_end(umm, -DER_TX_RESTART);
rc = umem_tx_end(umm, -DER_AGAIN);
}

return rc;
Expand All @@ -247,10 +247,10 @@ vos_tx_begin(struct dtx_handle *dth, struct umem_instance *umm, bool is_sysdb,
if (rc == 0) {
/* The CPU may yield inside umem_tx_begin(); the related object may be evicted in the meantime. */
if (obj != NULL && unlikely(vos_obj_is_evicted(obj))) {
D_DEBUG(DB_IO, "Obj " DF_UOID " is evicted(2), need to restart TX.\n",
D_DEBUG(DB_IO, "Obj " DF_UOID " is evicted(2), need to retry.\n",
DP_UOID(obj->obj_id));

return umem_tx_end(umm, -DER_TX_RESTART);
return umem_tx_end(umm, -DER_AGAIN);
}

dth->dth_local_tx_started = 1;
Expand Down
4 changes: 2 additions & 2 deletions src/vos/vos_io.c
Original file line number Diff line number Diff line change
Expand Up @@ -2569,10 +2569,10 @@ vos_update_end(daos_handle_t ioh, uint32_t pm_ver, daos_key_t *dkey, int err,

D_ASSERT(ioc->ic_obj != NULL);
if (unlikely(vos_obj_is_evicted(ioc->ic_obj))) {
D_DEBUG(DB_IO, "Obj " DF_UOID " is evicted during update, need to restart TX.\n",
D_DEBUG(DB_IO, "Obj " DF_UOID " is evicted during update, need to retry.\n",
DP_UOID(ioc->ic_oid));

D_GOTO(abort, err = -DER_TX_RESTART);
D_GOTO(abort, err = -DER_AGAIN);
}

err = vos_ts_set_add(ioc->ic_ts_set, ioc->ic_cont->vc_ts_idx, NULL, 0);
Expand Down
Loading