Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 12 additions & 4 deletions backend/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ func (be *postgresBackend) AbandonOrchestrationWorkItem(ctx context.Context, wi
}
defer tx.Rollback(ctx) //nolint:errcheck // rollback after commit is a no-op

var visibleTime*time.Time = nil
var visibleTime *time.Time = nil
Comment thread
jeanmartins marked this conversation as resolved.
if delay := wi.GetAbandonDelay(); delay > 0 {
t := time.Now().UTC().Add(delay)
visibleTime = &t
Expand Down Expand Up @@ -497,6 +497,12 @@ func (be *postgresBackend) createOrchestrationInstanceInternal(ctx context.Conte
}

func insertOrIgnoreInstanceTableInternal(ctx context.Context, tx pgx.Tx, e *backend.HistoryEvent, startEvent *protos.ExecutionStartedEvent) (int64, error) {
var parentInstanceID *string
if pi := startEvent.GetParentInstance(); pi != nil {
if instanceID := pi.GetOrchestrationInstance().GetInstanceId(); instanceID != "" {
parentInstanceID = &instanceID
}
}
Comment thread
jeanmartins marked this conversation as resolved.
res, err := tx.Exec(
ctx,
`INSERT INTO Instances (
Expand All @@ -506,15 +512,17 @@ func insertOrIgnoreInstanceTableInternal(ctx context.Context, tx pgx.Tx, e *back
ExecutionID,
Input,
RuntimeStatus,
CreatedTime
) VALUES ($1, $2, $3, $4, $5, $6, $7) ON CONFLICT DO NOTHING`,
CreatedTime,
ParentInstanceID
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8) ON CONFLICT DO NOTHING`,
Comment thread
jeanmartins marked this conversation as resolved.
startEvent.Name,
startEvent.Version.GetValue(),
startEvent.OrchestrationInstance.InstanceId,
startEvent.OrchestrationInstance.ExecutionId.GetValue(),
startEvent.Input.GetValue(),
"PENDING",
e.Timestamp.AsTime(),
parentInstanceID,
)
if err != nil {
return -1, fmt.Errorf("failed to insert into Instances table: %w", err)
Expand Down Expand Up @@ -769,7 +777,7 @@ func (be *postgresBackend) GetOrchestrationWorkItem(ctx context.Context) (*backe
defer tx.Rollback(ctx) //nolint:errcheck // rollback after commit is a no-op

now := time.Now().UTC()
newLockExpiration:= now.Add(be.options.OrchestrationLockTimeout)
newLockExpiration := now.Add(be.options.OrchestrationLockTimeout)

// Place a lock on an orchestration instance that has new events that are ready to be executed.
row := tx.QueryRow(
Expand Down
12 changes: 10 additions & 2 deletions backend/sqlite/sqlite.go
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,12 @@ func (be *sqliteBackend) createOrchestrationInstanceInternal(ctx context.Context
}

func insertOrIgnoreInstanceTableInternal(ctx context.Context, tx *sql.Tx, e *backend.HistoryEvent, startEvent *protos.ExecutionStartedEvent) (int64, error) {
var parentInstanceID *string
if pi := startEvent.GetParentInstance(); pi != nil {
if instanceID := pi.GetOrchestrationInstance().GetInstanceId(); instanceID != "" {
parentInstanceID = &instanceID
}
}
Comment thread
jeanmartins marked this conversation as resolved.
res, err := tx.ExecContext(
ctx,
`INSERT OR IGNORE INTO [Instances] (
Expand All @@ -479,15 +485,17 @@ func insertOrIgnoreInstanceTableInternal(ctx context.Context, tx *sql.Tx, e *bac
[ExecutionID],
[Input],
[RuntimeStatus],
[CreatedTime]
) VALUES (?, ?, ?, ?, ?, ?, ?)`,
[CreatedTime],
[ParentInstanceID]
) VALUES (?, ?, ?, ?, ?, ?, ?, ?)`,
Comment thread
jeanmartins marked this conversation as resolved.
startEvent.Name,
startEvent.Version.GetValue(),
startEvent.OrchestrationInstance.InstanceId,
startEvent.OrchestrationInstance.ExecutionId.GetValue(),
startEvent.Input.GetValue(),
"PENDING",
e.Timestamp.AsTime(),
parentInstanceID,
)
if err != nil {
return -1, fmt.Errorf("failed to insert into [Instances] table: %w", err)
Expand Down
Loading