Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 2 additions & 12 deletions docs/core-system/recurring-scheduler.md
Original file line number Diff line number Diff line change
Expand Up @@ -131,15 +131,5 @@ for `datamachine_recurring_<schedule_id>`, and its only job is to call
definition.

Hooks are schedule-scoped because a single task handler can have multiple
schedule bindings. Legacy task-scoped hooks (`datamachine_recurring_<task_type>`)
are unscheduled once the canonical schedule-scoped hook is reconciled.

## Upgrade migration

Existing installs may have pending AS actions under older task-scoped hook
names. `SystemAgentServiceProvider::manageRecurringTaskSchedules()` runs on
`action_scheduler_init`, schedules the canonical schedule-scoped hook, and
unschedules legacy task-scoped hooks when the names differ. No other contract
changes; `FlowScheduling::handle_scheduling_update()` signature, REST
endpoints, CLI commands, the `datamachine_scheduler_intervals` filter,
and `TaskScheduler::schedule()` are all unchanged.
schedule bindings. The runtime reconciles only canonical
`datamachine_recurring_<schedule_id>` hooks.
8 changes: 1 addition & 7 deletions docs/core-system/wp-cli.md
Original file line number Diff line number Diff line change
Expand Up @@ -85,15 +85,9 @@ wp datamachine flows list-handlers 10

# Memory file management
wp datamachine flows memory-files 10 --add=context.md

# Operator repair: audit and rewrite pre-1.0 scalar handler rows
wp datamachine flows migrate-legacy-handler-shape --dry-run --verbose
wp datamachine flows migrate-legacy-handler-shape --all-sites --yes
```

`migrate-legacy-handler-shape` is a contained operator repair command for older stored `flow_config` rows that still use scalar `handler`, `handler_slug`, or `handler_config` keys. Normal workflow specs, import/update paths, readers, and writers remain canonical-only and use `handler_slugs`, `handler_configs`, and `flow_step_settings`.

**Options**: `--per_page`, `--offset`, `--handler`, `--format`, `--fields`, `--dry-run`, `--all-sites`, `--yes`, `--verbose`
**Options**: `--per_page`, `--offset`, `--handler`, `--format`, `--fields`, `--dry-run`, `--yes`

Flows own runnable configuration. Put handler config, prompt queues, per-flow task settings, schedules, and run-time overrides on flows, not on the pipeline template.

Expand Down
184 changes: 2 additions & 182 deletions inc/Cli/Commands/Flows/FlowsCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -172,13 +172,7 @@ class FlowsCommand extends BaseCommand {
* : Destination agent slug or ID for bulk reassign. Must be a valid agent.
*
* [--yes]
* : Skip confirmation prompt (delete/reassign/migrate-legacy-handler-shape subcommands).
*
* [--all-sites]
* : Apply migrate-legacy-handler-shape to every site on the multisite network.
*
* [--verbose]
* : Log per-row detail during migrate-legacy-handler-shape.
* : Skip confirmation prompt (delete/reassign subcommands).
*
* ## EXAMPLES
*
Expand Down Expand Up @@ -257,12 +251,7 @@ class FlowsCommand extends BaseCommand {
* # Dry-run reassign
* wp datamachine flows reassign --where-null --to-agent=events-bot --dry-run
*
* # Audit legacy scalar handler shapes on the current site
* wp datamachine flows migrate-legacy-handler-shape --dry-run --verbose
*
* # Rewrite legacy scalar handler shapes across the whole multisite network
* wp datamachine flows migrate-legacy-handler-shape --all-sites --yes
*/
*/
public function __invoke( array $args, array $assoc_args ): void {
$flow_id = null;
$pipeline_id = null;
Expand Down Expand Up @@ -338,12 +327,6 @@ public function __invoke( array $args, array $assoc_args ): void {
return;
}

// Handle 'migrate-legacy-handler-shape' subcommand.
if ( ! empty( $args ) && 'migrate-legacy-handler-shape' === $args[0] ) {
$this->migrateLegacyHandlerShape( $assoc_args );
return;
}

// Handle 'update' subcommand: `flows update 42 --name="New Name"`.
if ( ! empty( $args ) && 'update' === $args[0] ) {
if ( ! isset( $args[1] ) ) {
Expand Down Expand Up @@ -2134,167 +2117,4 @@ static function ( array $flow ): array {
\WP_CLI\Utils\format_items( 'table', $items, array( 'id', 'name', 'pipeline_id', 'user_id', 'portable_slug', 'created' ) );
WP_CLI::log( 'Repair with: wp datamachine flows reassign --where-null --to-agent=<agent> --dry-run' );
}

/**
* Rewrite legacy scalar handler shapes in stored flow_config rows.
*
* Pre-1.0 Data Machine stored single-handler steps with scalar
* `handler_slug` + `handler_config` keys; #712 (Mar 2026) introduced the
* canonical `handler_slugs` / `handler_configs` shape and migrated every
* row in the same release. That persisted migration was retired in
* `e113857` ("Drop pre-1.0 data-shape migrations") on the assumption that
* every install booting current code had already been migrated. Sites
* that still carry scalar-shape rows — e.g. imported flows, restored
* backups, externally-authored flow_config — therefore have no automatic
* recovery path.
*
* #2102 ("Reject legacy handler flow fields", May 2026) made the readers
* canonical-only: handler-backed fetch steps in the scalar shape now
* resolve to a `null` primary slug and run as silent no-ops, and
* `WorkflowSpecValidator` rejects any spec containing the legacy keys.
* This subcommand is the operator-invoked path back to the canonical
* shape without re-introducing a long-tail persisted migration chain.
*
* Scope: current site only. Use `--all-sites` on multisite to iterate
* every site on the network, or run the command with `--url=<host>`
* once per site for finer control.
*
* @param array $assoc_args Associative arguments.
*/
private function migrateLegacyHandlerShape( array $assoc_args ): void {
$dry_run = isset( $assoc_args['dry-run'] );
$skip = isset( $assoc_args['yes'] );
$all_sites = isset( $assoc_args['all-sites'] );
$verbose = isset( $assoc_args['verbose'] );

if ( $all_sites && ! is_multisite() ) {
WP_CLI::warning( '--all-sites has no effect on a single-site install; continuing on current site.' );
$all_sites = false;
}

if ( ! $dry_run && ! $skip ) {
$scope = $all_sites ? 'every site on the network' : sprintf( 'site %d', (int) get_current_blog_id() );
WP_CLI::confirm( sprintf( 'Rewrite legacy scalar handler shapes in flow_config rows for %s?', $scope ) );
}

$totals = array(
'sites_scanned' => 0,
'flows_scanned' => 0,
'flows_with_legacy' => 0,
'flows_rewritten' => 0,
'steps_migrated' => 0,
'orphan_legacy_dropped' => 0,
'errors' => 0,
);

if ( $all_sites ) {
$site_ids = get_sites( array( 'fields' => 'ids' ) );
foreach ( $site_ids as $site_id ) {
switch_to_blog( (int) $site_id );
$totals = $this->migrateLegacyHandlerShapeForCurrentSite( $totals, $dry_run, $verbose );
restore_current_blog();
}
} else {
$totals = $this->migrateLegacyHandlerShapeForCurrentSite( $totals, $dry_run, $verbose );
}

WP_CLI::log( '' );
WP_CLI::log( sprintf( 'Sites scanned: %d', $totals['sites_scanned'] ) );
WP_CLI::log( sprintf( 'Flows scanned: %d', $totals['flows_scanned'] ) );
WP_CLI::log( sprintf( 'Flows with legacy shape: %d', $totals['flows_with_legacy'] ) );
WP_CLI::log( sprintf( 'Flows rewritten: %d%s', $totals['flows_rewritten'], $dry_run ? ' (dry-run)' : '' ) );
WP_CLI::log( sprintf( 'Steps migrated: %d', $totals['steps_migrated'] ) );
WP_CLI::log( sprintf( 'Orphan legacy dropped: %d', $totals['orphan_legacy_dropped'] ) );

if ( $totals['errors'] > 0 ) {
WP_CLI::warning( sprintf( '%d row(s) could not be processed (see preceding log lines).', $totals['errors'] ) );
}

if ( $dry_run ) {
WP_CLI::success( 'Dry run complete. Re-run without --dry-run to persist the rewrites.' );
return;
}

WP_CLI::success( 'Legacy handler shape migration complete.' );
}

/**
* Walk every flow on the current site, rewriting legacy handler shapes.
*
* @param array<string, int> $totals Accumulator across sites.
* @param bool $dry_run Skip persistence when true.
* @param bool $verbose Log per-flow detail when true.
* @return array<string, int> Updated accumulator.
*/
private function migrateLegacyHandlerShapeForCurrentSite( array $totals, bool $dry_run, bool $verbose ): array {
global $wpdb;
$table = $wpdb->prefix . 'datamachine_flows';

// phpcs:ignore WordPress.DB.PreparedSQL.InterpolatedNotPrepared, WordPress.DB.DirectDatabaseQuery.DirectQuery, WordPress.DB.DirectDatabaseQuery.NoCaching
$table_exists = $wpdb->get_var( $wpdb->prepare( 'SHOW TABLES LIKE %s', $table ) );
if ( $table_exists !== $table ) {
if ( $verbose ) {
WP_CLI::log( sprintf( '[site:%d] no flows table — skipping.', (int) get_current_blog_id() ) );
}
return $totals;
}

++$totals['sites_scanned'];

// phpcs:ignore WordPress.DB.PreparedSQL.InterpolatedNotPrepared, WordPress.DB.DirectDatabaseQuery.DirectQuery, WordPress.DB.DirectDatabaseQuery.NoCaching
$rows = $wpdb->get_results( "SELECT flow_id, flow_config FROM {$table}", ARRAY_A );
if ( empty( $rows ) ) {
return $totals;
}

$db_flows = new \DataMachine\Core\Database\Flows\Flows();

foreach ( $rows as $row ) {
++$totals['flows_scanned'];

$flow_id = (int) $row['flow_id'];
$flow_config = json_decode( (string) $row['flow_config'], true );

if ( ! is_array( $flow_config ) ) {
++$totals['errors'];
WP_CLI::warning( sprintf( '[site:%d flow:%d] flow_config is not valid JSON; skipping.', (int) get_current_blog_id(), $flow_id ) );
continue;
}

$report = LegacyHandlerShapeMigrator::migrate_flow_config( $flow_config );

if ( ! $report['changed'] ) {
continue;
}

++$totals['flows_with_legacy'];
$totals['steps_migrated'] += $report['steps_migrated'];
$totals['orphan_legacy_dropped'] += $report['dropped_orphan_legacy_config'];

if ( $verbose ) {
WP_CLI::log( sprintf(
'[site:%d flow:%d] migrated %d step(s): %s',
(int) get_current_blog_id(),
$flow_id,
$report['steps_migrated'],
implode( ', ', $report['migrated_step_ids'] )
) );
}

if ( $dry_run ) {
++$totals['flows_rewritten'];
continue;
}

$ok = $db_flows->update_flow( $flow_id, array( 'flow_config' => $report['config'] ) );
if ( $ok ) {
++$totals['flows_rewritten'];
} else {
++$totals['errors'];
WP_CLI::warning( sprintf( '[site:%d flow:%d] update_flow returned false; row not rewritten.', (int) get_current_blog_id(), $flow_id ) );
}
}

return $totals;
}
}
Loading
Loading