Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 43 additions & 1 deletion ompi/mca/osc/osc.h
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,15 @@ typedef int (*ompi_osc_base_module_put_fn_t)(const void *origin_addr,
struct ompi_datatype_t *target_dt,
struct ompi_win_t *win);

typedef int (*ompi_osc_base_module_put_with_notify_fn_t)(const void *origin_addr,
size_t origin_count,
struct ompi_datatype_t *origin_dt,
int target,
ptrdiff_t target_disp,
size_t target_count,
struct ompi_datatype_t *target_dt,
int notify,
struct ompi_win_t *win);

typedef int (*ompi_osc_base_module_get_fn_t)(void *origin_addr,
size_t origin_count,
Expand All @@ -226,6 +235,15 @@ typedef int (*ompi_osc_base_module_get_fn_t)(void *origin_addr,
struct ompi_datatype_t *target_dt,
struct ompi_win_t *win);

typedef int (*ompi_osc_base_module_get_with_notify_fn_t)(void *origin_addr,
size_t origin_count,
struct ompi_datatype_t *origin_dt,
int target,
ptrdiff_t target_disp,
size_t target_count,
struct ompi_datatype_t *target_dt,
int notify,
struct ompi_win_t *win);

typedef int (*ompi_osc_base_module_accumulate_fn_t)(const void *origin_addr,
size_t origin_count,
Expand Down Expand Up @@ -276,6 +294,17 @@ typedef int (*ompi_osc_base_module_rput_fn_t)(const void *origin_addr,
struct ompi_win_t *win,
struct ompi_request_t **request);

typedef int (*ompi_osc_base_module_rput_with_notify_fn_t)(const void *origin_addr,
size_t origin_count,
struct ompi_datatype_t *origin_dt,
int target,
ptrdiff_t target_disp,
size_t target_count,
struct ompi_datatype_t *target_dt,
int notify,
struct ompi_win_t *win,
struct ompi_request_t **request);

typedef int (*ompi_osc_base_module_rget_fn_t)(void *origin_addr,
size_t origin_count,
struct ompi_datatype_t *origin_dt,
Expand All @@ -286,6 +315,16 @@ typedef int (*ompi_osc_base_module_rget_fn_t)(void *origin_addr,
struct ompi_win_t *win,
struct ompi_request_t **request);

typedef int (*ompi_osc_base_module_rget_with_notify_fn_t)(void *origin_addr,
size_t origin_count,
struct ompi_datatype_t *origin_dt,
int target,
ptrdiff_t target_disp,
size_t target_count,
struct ompi_datatype_t *target_dt,
int notify,
struct ompi_win_t *win,
struct ompi_request_t **request);

typedef int (*ompi_osc_base_module_raccumulate_fn_t)(const void *origin_addr,
size_t origin_count,
Expand Down Expand Up @@ -371,7 +410,6 @@ typedef int (*ompi_osc_base_module_flush_local_all_fn_t)(struct ompi_win_t *win)
* module structure.
*/

// TODO: extend the struct and add pointers to put/get_with_notify functions
struct ompi_osc_base_module_4_0_0_t {
ompi_osc_base_module_win_shared_query_fn_t osc_win_shared_query;

Expand All @@ -380,14 +418,18 @@ struct ompi_osc_base_module_4_0_0_t {
ompi_osc_base_module_free_fn_t osc_free;

ompi_osc_base_module_put_fn_t osc_put;
ompi_osc_base_module_put_with_notify_fn_t osc_put_with_notify;
ompi_osc_base_module_get_fn_t osc_get;
ompi_osc_base_module_get_with_notify_fn_t osc_get_with_notify;
ompi_osc_base_module_accumulate_fn_t osc_accumulate;
ompi_osc_base_module_compare_and_swap_fn_t osc_compare_and_swap;
ompi_osc_base_module_fetch_and_op_fn_t osc_fetch_and_op;
ompi_osc_base_module_get_accumulate_fn_t osc_get_accumulate;

ompi_osc_base_module_rput_fn_t osc_rput;
ompi_osc_base_module_rput_with_notify_fn_t osc_rput_with_notify;
ompi_osc_base_module_rget_fn_t osc_rget;
ompi_osc_base_module_rget_with_notify_fn_t osc_rget_with_notify;
ompi_osc_base_module_raccumulate_fn_t osc_raccumulate;
ompi_osc_base_module_rget_accumulate_fn_t osc_rget_accumulate;

Expand Down
42 changes: 42 additions & 0 deletions ompi/mca/osc/sm/osc_sm.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,16 @@ int ompi_osc_sm_put(const void *origin_addr,
struct ompi_datatype_t *target_dt,
struct ompi_win_t *win);

int ompi_osc_sm_put_with_notify(const void *origin_addr,
size_t origin_count,
struct ompi_datatype_t *origin_dt,
int target,
ptrdiff_t target_disp,
size_t target_count,
struct ompi_datatype_t *target_dt,
int notify,
struct ompi_win_t *win);

int ompi_osc_sm_get(void *origin_addr,
size_t origin_count,
struct ompi_datatype_t *origin_dt,
Expand All @@ -127,6 +137,16 @@ int ompi_osc_sm_get(void *origin_addr,
struct ompi_datatype_t *target_dt,
struct ompi_win_t *win);

int ompi_osc_sm_get_with_notify(void *origin_addr,
size_t origin_count,
struct ompi_datatype_t *origin_dt,
int target,
ptrdiff_t target_disp,
size_t target_count,
struct ompi_datatype_t *target_dt,
int notify,
struct ompi_win_t *win);

int ompi_osc_sm_accumulate(const void *origin_addr,
size_t origin_count,
struct ompi_datatype_t *origin_dt,
Expand Down Expand Up @@ -176,6 +196,17 @@ int ompi_osc_sm_rput(const void *origin_addr,
struct ompi_win_t *win,
struct ompi_request_t **request);

int ompi_osc_sm_rput_with_notify(const void *origin_addr,
size_t origin_count,
struct ompi_datatype_t *origin_dt,
int target,
ptrdiff_t target_disp,
size_t target_count,
struct ompi_datatype_t *target_dt,
int notify,
struct ompi_win_t *win,
struct ompi_request_t **request);

int ompi_osc_sm_rget(void *origin_addr,
size_t origin_count,
struct ompi_datatype_t *origin_dt,
Expand All @@ -186,6 +217,17 @@ int ompi_osc_sm_rget(void *origin_addr,
struct ompi_win_t *win,
struct ompi_request_t **request);

int ompi_osc_sm_rget_with_notify(void *origin_addr,
size_t origin_count,
struct ompi_datatype_t *origin_dt,
int target,
ptrdiff_t target_disp,
size_t target_count,
struct ompi_datatype_t *target_dt,
int notify,
struct ompi_win_t *win,
struct ompi_request_t **request);

int ompi_osc_sm_raccumulate(const void *origin_addr,
size_t origin_count,
struct ompi_datatype_t *origin_dt,
Expand Down
130 changes: 128 additions & 2 deletions ompi/mca/osc/sm/osc_sm_comm.c
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,49 @@ ompi_osc_sm_rput(const void *origin_addr,
return OMPI_SUCCESS;
}

int
ompi_osc_sm_rput_with_notify(const void *origin_addr,
size_t origin_count,
struct ompi_datatype_t *origin_dt,
int target,
ptrdiff_t target_disp,
size_t target_count,
struct ompi_datatype_t *target_dt,
int notify,
struct ompi_win_t *win,
struct ompi_request_t **ompi_req)
{
int ret;
ompi_osc_sm_module_t *module =
(ompi_osc_sm_module_t*) win->w_osc_module;
void *remote_address;

OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"rput_notify: 0x%lx, %zu, %s, %d, %d, %zu, %s, %d, 0x%lx",
(unsigned long) origin_addr, origin_count,
origin_dt->name, target, (int) target_disp,
target_count, target_dt->name,
notify,
(unsigned long) win));

remote_address = ((char*) (module->bases[target])) + module->disp_units[target] * target_disp;

ret = ompi_datatype_sndrcv((void *)origin_addr, origin_count, origin_dt,
remote_address, target_count, target_dt);
if (OMPI_SUCCESS != ret) {
return ret;
}

/* the only valid field of RMA request status is the MPI_ERROR field.
* ompi_request_empty has status MPI_SUCCESS and indicates the request is
* complete. */
*ompi_req = &ompi_request_empty;

opal_atomic_wmb();
opal_atomic_add(&module->notify_counters[target][notify], 1);

return OMPI_SUCCESS;
}

int
ompi_osc_sm_rget(void *origin_addr,
Expand Down Expand Up @@ -99,6 +142,49 @@ ompi_osc_sm_rget(void *origin_addr,
return OMPI_SUCCESS;
}

int
ompi_osc_sm_rget_with_notify(void *origin_addr,
size_t origin_count,
struct ompi_datatype_t *origin_dt,
int target,
ptrdiff_t target_disp,
size_t target_count,
struct ompi_datatype_t *target_dt,
int notify,
struct ompi_win_t *win,
struct ompi_request_t **ompi_req)
{
int ret;
ompi_osc_sm_module_t *module =
(ompi_osc_sm_module_t*) win->w_osc_module;
void *remote_address;

OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"rget_notify: 0x%lx, %zu, %s, %d, %d, %zu, %s, %d, 0x%lx",
(unsigned long) origin_addr, origin_count,
origin_dt->name, target, (int) target_disp,
target_count, target_dt->name,
notify,
(unsigned long) win));

remote_address = ((char*) (module->bases[target])) + module->disp_units[target] * target_disp;

ret = ompi_datatype_sndrcv(remote_address, target_count, target_dt,
origin_addr, origin_count, origin_dt);
if (OMPI_SUCCESS != ret) {
return ret;
}

/* the only valid field of RMA request status is the MPI_ERROR field.
* ompi_request_empty has status MPI_SUCCESS and indicates the request is
* complete. */
*ompi_req = &ompi_request_empty;

opal_atomic_rmb();
opal_atomic_add(&module->notify_counters[target][notify], 1);

return OMPI_SUCCESS;
}

int
ompi_osc_sm_raccumulate(const void *origin_addr,
Expand Down Expand Up @@ -236,6 +322,44 @@ ompi_osc_sm_put(const void *origin_addr,
}


int
ompi_osc_sm_put_with_notify(const void *origin_addr,
size_t origin_count,
struct ompi_datatype_t *origin_dt,
int target,
ptrdiff_t target_disp,
size_t target_count,
struct ompi_datatype_t *target_dt,
int notify,
struct ompi_win_t *win)
{
int ret;
ompi_osc_sm_module_t *module =
(ompi_osc_sm_module_t*) win->w_osc_module;
void *remote_address;

OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"put_notify: 0x%lx, %zu, %s, %d, %d, %zu, %s, %d, 0x%lx",
(unsigned long) origin_addr, origin_count,
origin_dt->name, target, (int) target_disp,
target_count, target_dt->name,
notify,
(unsigned long) win));

remote_address = ((char*) (module->bases[target])) + module->disp_units[target] * target_disp;

ret = ompi_datatype_sndrcv((void *)origin_addr, origin_count, origin_dt,
remote_address, target_count, target_dt);
if (OMPI_SUCCESS != ret) {
return ret;
}

opal_atomic_wmb();
opal_atomic_add(&module->notify_counters[target][notify], 1);

return ret;
}

int
ompi_osc_sm_get(void *origin_addr,
size_t origin_count,
Expand Down Expand Up @@ -294,7 +418,9 @@ ompi_osc_sm_get_with_notify(void *origin_addr,

ret = ompi_datatype_sndrcv(remote_address, target_count, target_dt,
origin_addr, origin_count, origin_dt);
// TODO: do the same for put_with_notify
if (OMPI_SUCCESS != ret) {
return ret;
}
opal_atomic_rmb();
opal_atomic_add(&module->notify_counters[target][notify], 1);

Expand Down Expand Up @@ -472,5 +598,5 @@ ompi_osc_sm_fetch_and_op(const void *origin_addr,
done:
opal_atomic_unlock(&module->node_states[target].accumulate_lock);

return OMPI_SUCCESS;;
return OMPI_SUCCESS;
}
6 changes: 4 additions & 2 deletions ompi/mca/osc/sm/osc_sm_component.c
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,6 @@ ompi_osc_sm_component_t mca_osc_sm_component = {
MCA_BASE_COMPONENT_INIT(ompi, osc, sm)


// TODO: extend the struct and add pointers to put/get_with_notify functions
// TODO: extend it to rput/rget_with_notify as well
ompi_osc_sm_module_t ompi_osc_sm_module_template = {
{
.osc_win_shared_query = ompi_osc_sm_shared_query,
Expand All @@ -81,14 +79,18 @@ ompi_osc_sm_module_t ompi_osc_sm_module_template = {
.osc_free = ompi_osc_sm_free,

.osc_put = ompi_osc_sm_put,
.osc_put_with_notify = ompi_osc_sm_put_with_notify,
.osc_get = ompi_osc_sm_get,
.osc_get_with_notify = ompi_osc_sm_get_with_notify,
.osc_accumulate = ompi_osc_sm_accumulate,
.osc_compare_and_swap = ompi_osc_sm_compare_and_swap,
.osc_fetch_and_op = ompi_osc_sm_fetch_and_op,
.osc_get_accumulate = ompi_osc_sm_get_accumulate,

.osc_rput = ompi_osc_sm_rput,
.osc_rput_with_notify = ompi_osc_sm_rput_with_notify,
.osc_rget = ompi_osc_sm_rget,
.osc_rget_with_notify = ompi_osc_sm_rget_with_notify,
.osc_raccumulate = ompi_osc_sm_raccumulate,
.osc_rget_accumulate = ompi_osc_sm_rget_accumulate,

Expand Down