Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
20a5007
Improve DLPack support for external tensor consumption
JanuszL Mar 18, 2026
b207209
Address review feedback on DLPack fast path
JanuszL Apr 17, 2026
133753f
Broaden DLPack fast-path exception handling in Batch
JanuszL Apr 17, 2026
0e4cf4b
Fix test name and docstring for CPU torch tensor fast path
JanuszL Apr 17, 2026
61d1b4d
Add tests for fast-path edge cases and GPU CAI fallback
JanuszL Apr 17, 2026
d83d89c
Fix E501 line-too-long in test docstrings
JanuszL Apr 17, 2026
b4d77ab
Fix stream-handshake mismatch, read-only alias, and GPU DLPack fallback
JanuszL Apr 17, 2026
131e6f6
Fix set_order missing on bulk GPU DLPack path and tighten TypeError f…
JanuszL Apr 17, 2026
64ffbd3
Assert device_id in mixed-GPU tests to make fallback policy explicit
JanuszL Apr 17, 2026
1a57ea2
Preflight __dlpack_device__ before __dlpack__ to fix stream handshake…
JanuszL Apr 17, 2026
0c6e13a
Make UserStream::GetStream(size_t) public to fix build error
JanuszL Apr 18, 2026
42b75f5
Allow mixed pinned/non-pinned samples in non-contiguous GPU TensorList
JanuszL Apr 19, 2026
7117d4e
Fix stream=None for tensor 0 and DLPack capsule double-call risk
JanuszL Apr 19, 2026
65fa2ab
Adjust PartialSetupSetMultiGPU test for relaxed GPU pinned check
JanuszL Apr 20, 2026
ae8806b
Fix missing set_order on GPU TensorList in TensorListFromListOfTensors
JanuszL Apr 20, 2026
062f990
Remove no-op wait_order from TensorListFromListOfTensors
JanuszL Apr 20, 2026
dbf0e08
Support kDLCUDAHost (pinned CPU) in DLPack fast path
JanuszL Apr 20, 2026
ed48b6e
Fix DLPack single-use hazard and CPU bulk preflight
JanuszL Apr 20, 2026
13e1d45
Fix missing space in TensorListFromListOfTensors error message
JanuszL Apr 20, 2026
39a59b9
Fix line length in TensorListFromListOfTensors error message
JanuszL Apr 20, 2026
b9edd8e
Fix TensorListFromListOfTensors error message to match test expectations
JanuszL Apr 20, 2026
720086b
Fix
JanuszL Apr 20, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions dali/pipeline/data/tensor_list.cc
Original file line number Diff line number Diff line change
Expand Up @@ -257,9 +257,11 @@ void TensorList<Backend>::VerifySampleShareCompatibility(DALIDataType type, int
this->GetLayout(), ", new: ", layout, " or come with empty layout ",
error_suffix));

DALI_ENFORCE(this->is_pinned() == pinned,
make_string("Sample must have the same pinned status as target batch, current: ",
this->is_pinned(), ", new: ", pinned, error_suffix));
if constexpr (std::is_same_v<Backend, CPUBackend>) {
DALI_ENFORCE(this->is_pinned() == pinned,
make_string("Sample must have the same pinned status as target batch, current: ",
this->is_pinned(), ", new: ", pinned, error_suffix));
}

DALI_ENFORCE(this->device_id() == device_id,
make_string("Sample must have the same device id as target batch, current: ",
Expand Down
9 changes: 7 additions & 2 deletions dali/pipeline/data/tensor_list_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -983,13 +983,18 @@ TYPED_TEST(TensorListSuite, SetupLikeMultiGPU) {
template <typename Backend>
std::vector<std::pair<std::string, std::function<void(TensorList<Backend> &)>>> SetRequiredSetters(
int sample_dim, DALIDataType type, TensorLayout layout, bool pinned, int device_id) {
return {
std::vector<std::pair<std::string, std::function<void(TensorList<Backend> &)>>> result = {
{"sample dim", [sample_dim](TensorList<Backend> &t) { t.set_sample_dim(sample_dim); }},
{"type", [type](TensorList<Backend> &t) { t.set_type(type); }},
{"layout", [layout](TensorList<Backend> &t) { t.SetLayout(layout); }},
{"device id", [device_id](TensorList<Backend> &t) { t.set_device_id(device_id); }},
{"pinned", [pinned](TensorList<Backend> &t) { t.set_pinned(pinned); }},
};
// GPU TensorList allows mixed pinned/non-pinned samples in non-contiguous mode
// (pinned tensors are valid GPU-accessible staging buffers).
if constexpr (std::is_same_v<Backend, CPUBackend>) {
result.push_back({"pinned", [pinned](TensorList<Backend> &t) { t.set_pinned(pinned); }});
}
return result;
}

TYPED_TEST(TensorListSuite, PartialSetupSetMultiGPU) {
Expand Down
285 changes: 279 additions & 6 deletions dali/python/backend_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,8 @@ void FillTensorFromDlPack(
DALI_ENFORCE((std::is_same<SrcBackend, GPUBackend>::value &&
dl_tensor.device.device_type == kDLCUDA) ||
(std::is_same<SrcBackend, CPUBackend>::value &&
dl_tensor.device.device_type == kDLCPU),
(dl_tensor.device.device_type == kDLCPU ||
dl_tensor.device.device_type == kDLCUDAHost)),
"DLPack device type doesn't match Tensor type");

const TypeInfo &dali_type = TypeTable::GetTypeInfo(ToDALIType(dl_tensor.dtype));
Expand Down Expand Up @@ -1161,6 +1162,222 @@ std::unique_ptr<Tensor<Backend> > TensorListGetItemImpl(TensorList<Backend> &t,
return ptr;
}

// Builds a CPU TensorList from a Python list of objects implementing the DLPack
// protocol. All samples must live in host memory (plain or CUDA-pinned) and share
// one data type. When `contiguous` is true the samples are copied into a single
// contiguous host allocation; otherwise the batch aliases the producers' buffers.
std::shared_ptr<TensorList<CPUBackend>> TensorListFromListOfDLPackObjectsCPU(
    py::list &list_of_objects,
    const std::optional<std::string> &layout,
    bool contiguous) {
  DomainTimeRange range("TensorListFromListOfDLPackObjectsCPU", kCPUTensorColor);

  const size_t num_samples = list_of_objects.size();

  // Empty input: return an empty batch, carrying over the layout if one was given.
  if (num_samples == 0) {
    auto empty_batch = std::make_shared<TensorList<CPUBackend>>();
    if (layout) {
      empty_batch->set_sample_dim(layout->length());
      empty_batch->SetLayout(*layout);
    }
    return empty_batch;
  }

  // Preflight: validate that all elements are CPU tensors before consuming any capsule.
  // __dlpack__() is single-use, so device mismatches must be caught here — not after
  // the handshake has started. Throws py::value_error (caught by the Python fast-path
  // except clause) rather than letting DALI_ENFORCE produce a RuntimeError.
  for (size_t idx = 0; idx < num_samples; ++idx) {
    py::object sample = list_of_objects[idx];
    if (!py::hasattr(sample, "__dlpack__")) {
      throw py::type_error(make_string(
          "Object at position ", idx, " does not support the DLPack protocol."));
    }
    if (!py::hasattr(sample, "__dlpack_device__"))
      continue;  // best effort: no device info available, validated later on import
    py::tuple dev_info = sample.attr("__dlpack_device__")();
    int dev_type = dev_info[0].cast<int>();
    // kDLCUDAHost is CUDA-pinned host memory — still directly CPU-accessible.
    if (dev_type == kDLCPU || dev_type == kDLCUDAHost)
      continue;
    throw py::value_error(make_string(
        "All tensors must reside in CPU memory. "
        "Tensor at position ", idx, " has DLPack device type ", dev_type, "."));
  }

  // The staging batch lives either in a local (contiguous mode, copied out below)
  // or directly in the shared_ptr that will be returned (non-contiguous mode).
  std::optional<TensorList<CPUBackend>> staging_local;
  std::shared_ptr<TensorList<CPUBackend>> staging_shared;
  if (contiguous) {
    staging_local.emplace(num_samples);
  } else {
    staging_shared = std::make_shared<TensorList<CPUBackend>>(num_samples);
  }
  TensorList<CPUBackend> &staging = contiguous ? *staging_local : *staging_shared;

  {
    DomainTimeRange build_range("Build initial list", kCPUTensorColor);
    std::optional<DALIDataType> expected_type;
    for (size_t idx = 0; idx < num_samples; ++idx) {
      py::capsule capsule = list_of_objects[idx].attr("__dlpack__")();
      Tensor<CPUBackend> sample_tensor;
      // Only sample 0 carries the layout; SetupLike below propagates batch metadata.
      FillTensorFromDlPack(capsule, &sample_tensor,
                           idx == 0 ? layout : std::optional<std::string>{});

      if (idx == 0)
        staging.SetupLike(sample_tensor);

      DALIDataType cur_type = sample_tensor.type();
      if (!expected_type) {
        expected_type = cur_type;
      } else if (*expected_type != cur_type) {
        throw py::type_error(make_string(
            "Tensors cannot have different data types. Tensor at position ", idx,
            " has type '", cur_type, "' expected to have type '", *expected_type, "'."));
      }
      staging.SetSample(idx, sample_tensor);
    }
  }

  if (!contiguous) {
    SetLayout(staging, layout, false);
    return staging_shared;
  }

  DomainTimeRange copy_range("Copy to contiguous", kCPUTensorColor);
  auto contiguous_out = std::make_shared<TensorList<CPUBackend>>();
  contiguous_out->SetContiguity(BatchContiguity::Contiguous);
  contiguous_out->Copy(staging, AccessOrder::host());
  SetLayout(*contiguous_out, layout, false);
  return contiguous_out;
}

// Builds a GPU TensorList from a Python list of objects implementing the DLPack
// protocol (e.g. PyTorch CUDA tensors).
//
// All samples must be CUDA tensors on the same device and share one data type.
// The DLPack handshake is performed with a single consumer stream, so the producer
// synchronizes with exactly the stream DALI later uses for the copy.
//
// layout     - optional layout string applied to the resulting batch
// stream     - optional Python stream object; when None, the per-device UserStream
//              of the tensors' device (or the current device) is used
// contiguous - when true, samples are copied into one contiguous GPU allocation;
//              otherwise the batch aliases the producers' memory sample-by-sample
std::shared_ptr<TensorList<GPUBackend>> TensorListFromListOfDLPackObjects(
    py::list &list_of_objects,
    const std::optional<std::string> &layout,
    py::object stream,
    bool contiguous) {
  DomainTimeRange range("TensorListFromListOfDLPackObjects", kGPUTensorColor);

  // Empty input: return an empty batch, carrying over the layout if one was given.
  if (list_of_objects.empty()) {
    auto ptr = std::make_shared<TensorList<GPUBackend>>();
    if (layout.has_value()) {
      ptr->set_sample_dim(layout->length());
      ptr->SetLayout(*layout);
    }
    return ptr;
  }

  AccessOrder copy_order = AccessOrder::host();
  if (!stream.is_none())
    copy_order = AccessOrderFromPythonStreamObj(stream);

  // Preflight pass: validate device type and homogeneity and, when no stream was
  // provided, resolve the consumer stream via UserStream before any __dlpack__() call.
  // __dlpack__() is single-use (the capsule is consumed), so device/stream
  // mismatches must be caught here — not after the handshake has already happened.
  int expected_device_id = -1;
  for (size_t i = 0; i < list_of_objects.size(); ++i) {
    py::object obj = list_of_objects[i];
    if (!py::hasattr(obj, "__dlpack__"))
      throw py::type_error(make_string(
          "Object at position ", i, " does not support the DLPack protocol."));
    if (py::hasattr(obj, "__dlpack_device__")) {
      py::tuple dev_info = obj.attr("__dlpack_device__")();
      int dev_type = dev_info[0].cast<int>();
      // Reject non-CUDA tensors up front. Without this check a CPU tensor would pass
      // preflight and fail only inside FillTensorFromDlPack — after earlier capsules
      // were already consumed — and with a RuntimeError instead of the ValueError the
      // Python fast path expects.
      if (dev_type != kDLCUDA)
        throw py::value_error(make_string(
            "All tensors must reside in GPU memory. "
            "Tensor at position ", i, " has DLPack device type ", dev_type, "."));
      int dev_id = dev_info[1].cast<int>();
      if (expected_device_id == -1) {
        expected_device_id = dev_id;
        // When no explicit stream was given, look up the UserStream for this device
        // so every __dlpack__() call below uses the same concrete stream handle.
        if (copy_order == AccessOrder::host())
          copy_order = AccessOrder(UserStream::Get()->GetStream(
              static_cast<size_t>(expected_device_id)));
      } else if (dev_id != expected_device_id) {
        throw py::value_error(make_string(
            "All tensors must reside on the same GPU device. "
            "Tensor at position ", i, " is on GPU ", dev_id,
            " but expected GPU ", expected_device_id, "."));
      }
    }
  }

  // If no __dlpack_device__ was found and no explicit stream was provided, fall back to the
  // current CUDA device so every __dlpack__() call — including tensor 0 — receives a concrete
  // consumer stream rather than None.
  if (copy_order == AccessOrder::host()) {
    int current_dev = -1;
    CUDA_CALL(cudaGetDevice(&current_dev));
    copy_order = AccessOrder(UserStream::Get()->GetStream(static_cast<size_t>(current_dev)));
  }

  // Derive the DLPack consumer-stream handle from copy_order so the producer always
  // synchronizes with the exact same stream that DALI will use for the copy.
  // Per the array API standard, stream=0 is disallowed for CUDA due to its ambiguity;
  // the legacy default stream must be passed as 1 instead.
  py::object stream_handle = py::none();
  if (copy_order.is_device()) {
    int64_t handle = reinterpret_cast<int64_t>(copy_order.stream());
    stream_handle = py::int_(handle == 0 ? 1 : handle);
  }

  // The staging batch lives either in a local (contiguous mode, copied out below)
  // or directly in the shared_ptr that will be returned (non-contiguous mode).
  std::optional<TensorList<GPUBackend>> non_contiguous_tmp;
  std::shared_ptr<TensorList<GPUBackend>> non_contiguous_out;

  if (contiguous)
    non_contiguous_tmp = TensorList<GPUBackend>(list_of_objects.size());
  else
    non_contiguous_out = std::make_shared<TensorList<GPUBackend>>(list_of_objects.size());

  TensorList<GPUBackend> &non_contiguous = contiguous
        ? non_contiguous_tmp.value()
        : *non_contiguous_out;

  int expected_type = -2;  // sentinel: no sample imported yet (DALI_NO_TYPE is -1)

  {
    DomainTimeRange build_range("Build initial list", kGPUTensorColor);
    for (size_t i = 0; i < list_of_objects.size(); ++i) {
      py::object obj = list_of_objects[i];

      py::capsule capsule = obj.attr("__dlpack__")("stream"_a = stream_handle);
      Tensor<GPUBackend> tensor;
      // Only sample 0 carries the layout; SetupLike below propagates batch metadata.
      FillTensorFromDlPack(capsule, &tensor, i == 0 ? layout : std::optional<std::string>{});

      if (i == 0) {
        non_contiguous.SetupLike(tensor);
        // Re-anchor on the imported tensor's device — preflight may have seen no
        // __dlpack_device__ at all, in which case expected_device_id is still -1.
        expected_device_id = tensor.device_id();
      } else if (tensor.device_id() != expected_device_id) {
        throw py::value_error(make_string(
            "All tensors must reside on the same GPU device. "
            "Tensor at position ", i, " is on GPU ", tensor.device_id(),
            " but expected GPU ", expected_device_id, "."));
      }

      DALIDataType cur_type = tensor.type();
      if (expected_type == -2) {
        expected_type = cur_type;
      } else if (expected_type != static_cast<int>(cur_type)) {
        throw py::type_error(make_string(
            "Tensors cannot have different data types. Tensor at position ", i, " has type '",
            cur_type, "' expected to have type '", DALIDataType(expected_type), "'."));
      }
      non_contiguous.SetSample(i, tensor);
    }
  }

  if (!contiguous) {
    SetLayout(non_contiguous, layout, false);
    // Record which stream holds the data so downstream consumers can synchronize correctly.
    non_contiguous_out->set_order(copy_order);
    return non_contiguous_out;
  }

  {
    DomainTimeRange copy_range("Copy to contiguous", kGPUTensorColor);
    auto contiguous_out = std::make_shared<TensorList<GPUBackend>>();
    contiguous_out->SetContiguity(BatchContiguity::Contiguous);
    contiguous_out->set_pinned(non_contiguous.is_pinned());
    contiguous_out->Copy(non_contiguous, copy_order);
    SetLayout(*contiguous_out, layout, false);
    // Record which stream holds the data so downstream consumers can synchronize correctly.
    contiguous_out->set_order(copy_order);
    return contiguous_out;
  }
}

template <typename Backend>
std::shared_ptr<TensorList<Backend>> TensorListFromListOfTensors(
py::list &list_of_tensors,
Expand Down Expand Up @@ -1191,8 +1408,8 @@ std::shared_ptr<TensorList<Backend>> TensorListFromListOfTensors(
: *non_contiguous_out;

int expected_type = -2;
int expected_device_id = -1;

AccessOrder wait_order = AccessOrder::host();
AccessOrder copy_order = AccessOrder::host();

{
Expand All @@ -1205,6 +1422,7 @@ std::shared_ptr<TensorList<Backend>> TensorListFromListOfTensors(
non_contiguous.SetupLike(t);
if constexpr (std::is_same_v<Backend, GPUBackend>) {
copy_order = AccessOrder(UserStream::Get()->GetStream(t));
expected_device_id = t.device_id();
}
}
DALIDataType cur_type = t.type();
Expand All @@ -1216,19 +1434,29 @@ std::shared_ptr<TensorList<Backend>> TensorListFromListOfTensors(
"Tensors cannot have different data types. Tensor at position ", i, " has type '",
cur_type, "' expected to have type '", DALIDataType(expected_type), "'."));
}
if constexpr (std::is_same_v<Backend, GPUBackend>) {
if (t.device_id() != expected_device_id) {
throw py::value_error(make_string(
"All tensors must reside on the same GPU device. "
"Tensor at position ", i, " is on GPU ", t.device_id(),
" but expected GPU ", expected_device_id, "."));
}
}
non_contiguous.SetSample(i, t);
} catch (const py::type_error &) {
throw;
} catch (const std::runtime_error &) {
throw py::type_error(make_string("Object at position ", i, " cannot be converted to Tensor",
std::is_same_v<Backend, GPUBackend> ? "GPU." : "CPU."));
auto tensor_type = std::is_same_v<Backend, GPUBackend> ? "TensorGPU." : "TensorCPU.";
throw py::type_error(
make_string("Object at position ", i, " cannot be converted to ", tensor_type));
}
}
}

if (!contiguous) {
SetLayout(non_contiguous, layout, false);
copy_order.wait(wait_order);
if constexpr (std::is_same_v<Backend, GPUBackend>)
non_contiguous_out->set_order(copy_order);
return non_contiguous_out;
}

Expand All @@ -1239,7 +1467,8 @@ std::shared_ptr<TensorList<Backend>> TensorListFromListOfTensors(
contiguous_out->set_pinned(non_contiguous.is_pinned());
contiguous_out->Copy(non_contiguous, copy_order);
SetLayout(*contiguous_out, layout, false);
copy_order.wait(wait_order);
if constexpr (std::is_same_v<Backend, GPUBackend>)
contiguous_out->set_order(copy_order);
return contiguous_out;
}
}
Expand Down Expand Up @@ -1356,6 +1585,26 @@ void ExposeTensorListCPU(py::module &m) {
If True, the list of tensors is converted to a contiguous TensorListCPU, necessarily
creating a copy. Otherwise, the copy may be avoided.
)code")
.def_static("from_dlpack_list", [](
py::list &list_of_objects,
std::optional<std::string> layout = {},
bool contiguous = false) {
DomainTimeRange range("TensorListCPU::from_dlpack_list", kCPUTensorColor);
return TensorListFromListOfDLPackObjectsCPU(list_of_objects, layout, contiguous);
},
"list_of_objects"_a,
"layout"_a = py::none(),
"contiguous"_a = false,
R"code(
List of tensors residing in the CPU memory, constructed from a Python list of DLPack objects.

list_of_objects : list
Python list of objects supporting the DLPack protocol (e.g. CPU tensors)
layout : str
Layout of the data
contiguous : bool, default False
If True, samples are copied into a single contiguous CPU buffer
)code")
.def_static("broadcast", [](const Tensor<CPUBackend> &t, int num_samples) {
return std::make_shared<TensorList<CPUBackend>>(t, num_samples);
})
Expand Down Expand Up @@ -1623,6 +1872,30 @@ void ExposeTesorListGPU(py::module &m) {
If True, the list of tensors is converted to a contiguous TensorListGPU, necessarily
creating a copy. Otherwise, the copy may be avoided.
)code")
.def_static("from_dlpack_list", [](
py::list &list_of_objects,
std::optional<std::string> layout = {},
py::object stream = py::none(),
bool contiguous = false) {
DomainTimeRange range("TensorListGPU::from_dlpack_list", kGPUTensorColor);
return TensorListFromListOfDLPackObjects(list_of_objects, layout, stream, contiguous);
},
"list_of_objects"_a,
"layout"_a = py::none(),
"stream"_a = py::none(),
"contiguous"_a = false,
R"code(
List of tensors residing in the GPU memory, constructed from a Python list of DLPack objects.

list_of_objects : list
Python list of objects supporting the DLPack protocol (e.g. PyTorch GPU tensors)
layout : str
Layout of the data
stream : stream, optional
CUDA stream used for the DLPack export handshake
contiguous : bool, default False
If True, samples are copied into a single contiguous GPU buffer
)code")
.def(py::init([](const py::object &object,
const std::optional<std::string> &layout = {},
int device_id = -1) {
Expand Down
Loading
Loading