Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 65 additions & 16 deletions apis/workflows/v1/core.proto
Original file line number Diff line number Diff line change
Expand Up @@ -182,14 +182,6 @@ message Tasks {

// TaskSubmission is a message of a task that is just about to be submitted, either by submitting a job or as a subtask.
message TaskSubmission {
option (buf.validate.message).oneof = {
fields: [
"input",
"inputs"
]
required: true
};

// The cluster that this task should be run on
string cluster_slug = 1;
// The task identifier
Expand All @@ -206,15 +198,72 @@ message TaskSubmission {

// The serialized task instance, if there is only a single instance.
bytes input = 3 [(buf.validate.field).bytes.max_len = 2048];
}

// A list of serialized task instances, all sharing the same task properties. This is useful for cases where we have
// a larger number of very similar subtasks, but only the input parameters vary.
repeated bytes inputs = 7 [(buf.validate.field).repeated = {
items: {
bytes: {max_len: 2048}
}
max_items: 100000 // maximum of 100k subtasks in a single subtask tree
}];
// TaskSubmissions is a structure for representing a set of tasks about to be submitted, either as a job or as subtasks.
// It is optimized for efficient serialization for cases where a large number of very similar tasks are submitted,
// with potentially only the individual input parameters varying.
// To reduce the serialization size, we keep a separate list/lookup table of unique task properties, that can then be
// referenced by their index.
message TaskSubmissions {
// Concrete instantiations of tasks, grouped by their dependencies and dependants. Each group is uniquely defined by
// the set of groups that it depends on (dependencies_on_other_groups) and the set of groups that depend on it,
// (which is implicitly given by the inverse of the dependencies on other groups).
repeated TaskSubmissionGroup task_groups = 1;
// Unique values of cluster slugs, referenced by index in the task instantiations.
repeated string cluster_slug_lookup = 2;
// Unique values of task identifiers, referenced by index in the task instantiations.
repeated TaskIdentifier identifier_lookup = 3;
// Unique values of display names, referenced by index in the task instantiations.
repeated string display_lookup = 4 [(buf.validate.field).repeated.items.string.min_len = 1];
}

// TaskSubmissionGroup is a structure for representing a list of submitted tasks, that all share the exact same
// dependencies and dependants. Grouping tasks by their dependency edges, and then converting task dependencies to
// group dependencies can help to drastically reduce the number of edges we need to serialize and transmit.
// Dependants are not explicitly specified, since they can be inferred from the dependencies of the other groups
// in the containing TaskSubmissions message. This means that the `dependencies_on_other_groups` field is not unique,
// across groups, since there may be two groups sharing the same dependencies but having different dependants.
message TaskSubmissionGroup {
option (buf.validate.message).cel = {
id: "task_submission_group.identifiers_size_match"
message: "The number of inputs must match the number of task identifiers."
expression: "this.inputs.size() == this.identifier_pointers.size()"
};
option (buf.validate.message).cel = {
id: "task_submission_group.cluster_slugs_size_match"
message: "The number of cluster slugs must match the number of inputs."
expression: "this.inputs.size() == this.cluster_slug_pointers.size()"
};
option (buf.validate.message).cel = {
id: "task_submission_group.displays_size_match"
message: "The number of display pointers must match the number of inputs."
expression: "this.inputs.size() == this.display_pointers.size()"
};
option (buf.validate.message).cel = {
id: "task_submission_group.max_retries_size_match"
message: "The number of max_retries_values must match the number of inputs."
expression: "this.inputs.size() == this.max_retries_values.size()"
};

// The indices of the groups that this submission group depends on. Indices refer to the groups field of the
// containing TaskSubmissions message.
repeated uint32 dependencies_on_other_groups = 1;
// The input parameters for each task.
// We explicitly don't group the fields into a submessage and then have a single repeated field for that submessage,
// to enable packed encoding of the repeated fields.
repeated bytes inputs = 2 [(buf.validate.field).repeated.items.bytes.max_len = 2048];
// Index of the task identifier in the identifier_lookup field of the containing TaskSubmissions message
// for each task.
repeated uint64 identifier_pointers = 3;
// Index of the cluster slug in the cluster_slug_lookup field of the containing TaskSubmissions message for each task,
// indicating the cluster that the task should be run on.
repeated uint64 cluster_slug_pointers = 4;
// Index of the display name in the display_lookup field of the containing TaskSubmissions message for each task,
// specifying a human-readable description of the task.
repeated uint64 display_pointers = 5;
// The maximum number of retries for each task. Not a pointer to a lookup table, since we just inline the values.
repeated int64 max_retries_values = 6;
}

// A lease for a task.
Expand Down
7 changes: 6 additions & 1 deletion apis/workflows/v1/task.proto
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,12 @@ message ComputedTask {
// If not set, the display message specified upon task submission will be kept.
string display = 2;
// A list of sub-tasks that the just computed task spawned.
repeated TaskSubmission sub_tasks = 3 [(buf.validate.field).repeated.max_items = 64];
repeated TaskSubmission legacy_sub_tasks = 3 [
deprecated = true,
(buf.validate.field).repeated.max_items = 64
];
// A list of sub-tasks that the just computed task spawned.
TaskSubmissions sub_tasks = 5;
// A list of progress updates that the computed task wants to report.
repeated Progress progress_updates = 4;
}
Expand Down