Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@
.phpunit.result.cache
.phpunit.cache
.env
/coverage
/coverage
.DS_Store
143 changes: 143 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,149 @@ If you're using separate services for dispatching and handling tasks, and your a
'disable_task_handler' => env('CLOUD_TASKS_DISABLE_TASK_HANDLER', false),
```

### Cloud Run Jobs

If you want jobs to be processed by Cloud Run Jobs instead of HTTP endpoints, you can configure the queue to trigger Cloud Run Job executions.

#### Why Cloud Run Jobs?

Cloud Run Jobs are ideal for long-running batch processing that exceeds Cloud Tasks HTTP timeout limits.

Cloud Run Jobs can run for up to 7 days.

**Tip**: Use seperate queue connections with different targets, for low latency jobs, use HTTP targets, for longer running batch jobs use Cloud Run Jobs.

#### Setup

1. **Create a Cloud Run Job** with your Laravel application container, configured to run:

```bash
php artisan cloud-tasks:work-job
```

The command reads job data from environment variables passed to the Job by Cloud Run.

2. **Configure your queue connection**:

```php
'cloudtasks' => [
'driver' => 'cloudtasks',
'project' => env('CLOUD_TASKS_PROJECT'),
'location' => env('CLOUD_TASKS_LOCATION'),
'queue' => env('CLOUD_TASKS_QUEUE', 'default'),

// Cloud Run Job configuration
'cloud_run_job' => env('CLOUD_TASKS_USE_CLOUD_RUN_JOB', false),
'cloud_run_job_name' => env('CLOUD_RUN_JOB_NAME'),
'cloud_run_job_region' => env('CLOUD_RUN_JOB_REGION'), // defaults to location
'service_account_email' => env('CLOUD_TASKS_SERVICE_EMAIL'),

// Optional: Store large payloads (>10KB) in filesystem
'payload_disk' => env('CLOUD_TASKS_PAYLOAD_DISK'), // Laravel disk name
'payload_prefix' => env('CLOUD_TASKS_PAYLOAD_PREFIX', 'cloud-tasks-payloads'),
'payload_threshold' => env('CLOUD_TASKS_PAYLOAD_THRESHOLD', 10240), // bytes
],
```

> **Note**: The command reads `CLOUD_TASKS_PAYLOAD`, `CLOUD_TASKS_TASK_NAME`, and `CLOUD_TASKS_PAYLOAD_PATH` directly from environment variables at runtime using `getenv()`. These are set automatically by Cloud Tasks via container overrides.

3. **Set environment variables**:

```dotenv
CLOUD_TASKS_USE_CLOUD_RUN_JOB=true
CLOUD_RUN_JOB_NAME=my-queue-worker-job
CLOUD_RUN_JOB_REGION=europe-west1
```

#### Large Payload Storage

For jobs with payloads exceeding environment variable limits (32KB limit enforced by Cloud Run), configure a Laravel filesystem disk:

```dotenv
CLOUD_TASKS_PAYLOAD_DISK=gcs
CLOUD_TASKS_PAYLOAD_PREFIX=cloud-tasks-payloads
CLOUD_TASKS_PAYLOAD_THRESHOLD=30000
```

When the payload exceeds the threshold, it's stored in the disk and `CLOUD_TASKS_PAYLOAD_PATH` is used instead.

> **Note**: The payloads will not be cleared up automatically, you can define lifecycle rules for the GCS bucket to delete old payloads.

#### How It Works

When you dispatch a job with Cloud Run Job target enabled:

1. Package creates a Cloud Task with HTTP target pointing to Cloud Run Jobs API
2. Cloud Tasks calls `run.googleapis.com/v2/.../jobs/{job}:run`
3. Cloud Run Jobs starts a new execution with environment variables set via container overrides:
- `CLOUD_TASKS_PAYLOAD` - Base64-encoded job payload
- `CLOUD_TASKS_TASK_NAME` - The task name
4. The container runs `php artisan cloud-tasks:work-job` which reads the env vars and processes the job

All Laravel queue functionality is retained:
- Job retries and max attempts
- Failed job handling
- Job timeouts
- Encrypted jobs
- Queue events

#### Required IAM Permissions

Cloud Run Jobs requires specific IAM permissions. Set these variables first:

```bash
export PROJECT_ID="your-project-id"
export SA_EMAIL="your-service-account@your-project-id.iam.gserviceaccount.com"
export TASKS_AGENT="service-{PROJECT_NUMBER}@gcp-sa-cloudtasks.iam.gserviceaccount.com"
```

> **Note**: Find your Cloud Tasks service agent email in the IAM console under "Include Google-provided role grants".
> **Note**: Project ID and Project Number are different. Project ID is the name of your project, Project Number is the numeric ID of your project.

**Project-Level Permissions:**

```bash
# Allow enqueuing tasks (required by PHP app running as $SA_EMAIL)
gcloud projects add-iam-policy-binding $PROJECT_ID \
--member="serviceAccount:$SA_EMAIL" \
--role="roles/cloudtasks.enqueuer"

# Allow executing jobs with overrides (required for container overrides)
gcloud projects add-iam-policy-binding $PROJECT_ID \
--member="serviceAccount:$SA_EMAIL" \
--role="roles/run.jobsExecutorWithOverrides"

# Allow invoking Cloud Run Services (if also using Cloud Run Services as HTTP targets)
gcloud projects add-iam-policy-binding $PROJECT_ID \
--member="serviceAccount:$SA_EMAIL" \
--role="roles/run.invoker"
```

**Note**: To restrict access to specific Cloud Run instances, use IAM conditions to limit access to specific Cloud Run Jobs / services.

**Service Account Permissions:**

```bash
# Allow the SA to act as itself (required for task creation and execution)
gcloud iam service-accounts add-iam-policy-binding $SA_EMAIL \
--member="serviceAccount:$SA_EMAIL" \
--role="roles/iam.serviceAccountUser"

# Allow Cloud Tasks to act as the SA (required for OAuth token generation)
gcloud iam service-accounts add-iam-policy-binding $SA_EMAIL \
--member="serviceAccount:$TASKS_AGENT" \
--role="roles/iam.serviceAccountUser"
```

| Permission | Required By | Purpose |
|------------|-------------|---------|
| `cloudtasks.enqueuer` | PHP App | Add tasks to the queue |
| `cloudtasks.viewer` | Cloud Run Job | List queues/tasks (optional) |
| `run.jobsExecutorWithOverrides` | Cloud Task | Execute jobs with container overrides |
| `run.invoker` | Other Workloads | Invoke Cloud Run Services (if using HTTP targets) |
| `iam.serviceAccountUser` (on SA) | Both | Allow SA to create tasks as itself |
| `iam.serviceAccountUser` (Tasks Agent) | Google Infrastructure | Generate OAuth tokens for Cloud Run |

### How-To

#### Pass headers to a task
Expand Down
8 changes: 7 additions & 1 deletion src/CloudTasksConnector.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,13 @@
* service_account_email?: string,
* backoff?: int,
* dispatch_deadline?: int,
* after_commit?: bool
* after_commit?: bool,
* cloud_run_job?: bool,
* cloud_run_job_name?: string,
* cloud_run_job_region?: string,
* payload_disk?: string,
* payload_prefix?: string,
* payload_threshold?: int
* }
*/
class CloudTasksConnector implements ConnectorInterface
Expand Down
100 changes: 100 additions & 0 deletions src/CloudTasksQueue.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@
use Illuminate\Queue\WorkerOptions;
use Google\Cloud\Tasks\V2\OidcToken;
use Google\Cloud\Tasks\V2\HttpMethod;
use Google\Cloud\Tasks\V2\OAuthToken;
use Google\Cloud\Tasks\V2\HttpRequest;
use Illuminate\Support\Facades\Storage;
use Google\Cloud\Tasks\V2\AppEngineRouting;
use Illuminate\Queue\Queue as LaravelQueue;
use Google\Cloud\Tasks\V2\AppEngineHttpRequest;
Expand Down Expand Up @@ -278,6 +280,45 @@ public function addPayloadToTask(array $payload, Task $task, $job): Task
}

$task->setAppEngineHttpRequest($appEngineRequest);
} elseif (! empty($this->config['cloud_run_job'])) {
// Cloud Run Job target - call the Cloud Run Jobs execution API
$httpRequest = new HttpRequest;
$httpRequest->setUrl($this->getCloudRunJobExecutionUrl());
$httpRequest->setHttpMethod(HttpMethod::POST);
$httpRequest->setHeaders(array_merge($headers, [
'Content-Type' => 'application/json',
]));

// Build the execution request body with container overrides
// The job payload is passed as environment variables
$taskNameShort = str($task->getName())->afterLast('/')->toString();
$encodedPayload = base64_encode(json_encode($payload));

// Build env vars for the container using fixed env var names
// These map to config keys: cloud_run_job_payload, cloud_run_job_task_name, cloud_run_job_payload_path
$envVars = $this->getCloudRunJobEnvVars($encodedPayload, $taskNameShort);

$executionBody = [
'overrides' => [
'containerOverrides' => [
[
'env' => $envVars,
],
],
],
];

$httpRequest->setBody(json_encode($executionBody));

$token = new OAuthToken;
$token->setServiceAccountEmail($this->config['service_account_email'] ?? '');
$token->setScope('https://www.googleapis.com/auth/cloud-platform');
$httpRequest->setOAuthToken($token);
$task->setHttpRequest($httpRequest);

if (! empty($this->config['dispatch_deadline'])) {
$task->setDispatchDeadline((new Duration)->setSeconds($this->config['dispatch_deadline']));
}
} else {
$httpRequest = new HttpRequest;
$httpRequest->setUrl($this->getHandler($job));
Expand Down Expand Up @@ -367,4 +408,63 @@ private function getQueueForJob(mixed $job): string

return $this->config['queue'];
}

/**
* Get the Cloud Run Jobs execution API URL.
*/
private function getCloudRunJobExecutionUrl(): string
{
$project = $this->config['project'];
$region = $this->config['cloud_run_job_region'] ?? $this->config['location'];
$jobName = $this->config['cloud_run_job_name'] ?? throw new Exception('cloud_run_job_name is required when using Cloud Run Jobs.');

return sprintf(
'https://run.googleapis.com/v2/projects/%s/locations/%s/jobs/%s:run',
$project,
$region,
$jobName
);
}

/**
* Get the environment variables for Cloud Run Job dispatch.
*
* If the payload exceeds the configured threshold, it will be stored
* in the configured disk and the path will be returned instead.
*
* Env vars set map to config keys in the queue connection:
* - CLOUD_TASKS_TASK_NAME -> cloud_run_job_task_name
* - CLOUD_TASKS_PAYLOAD -> cloud_run_job_payload
* - CLOUD_TASKS_PAYLOAD_PATH -> cloud_run_job_payload_path
*
* @return array<int, array{name: string, value: string}>
*/
private function getCloudRunJobEnvVars(string $encodedPayload, string $taskName): array
{
$disk = $this->config['payload_disk'] ?? null;
$threshold = $this->config['payload_threshold'] ?? 10240; // 10KB default

$envVars = [
['name' => 'CLOUD_TASKS_TASK_NAME', 'value' => $taskName],
];

// If no disk configured or payload is below threshold, pass payload directly
if ($disk === null || strlen($encodedPayload) <= $threshold) {
$envVars[] = ['name' => 'CLOUD_TASKS_PAYLOAD', 'value' => $encodedPayload];

return $envVars;
}

// Store payload in configured disk and pass path instead
$prefix = $this->config['payload_prefix'] ?? 'cloud-tasks-payloads';
$timestamp = now()->format('Y-m-d_H:i:s.v');
$path = sprintf('%s/%s_%s.json', $prefix, $timestamp, $taskName);

Storage::disk($disk)->put($path, $encodedPayload);

// Set the path env var for large payloads
$envVars[] = ['name' => 'CLOUD_TASKS_PAYLOAD_PATH', 'value' => $disk.':'.$path];

return $envVars;
}
}
11 changes: 11 additions & 0 deletions src/CloudTasksServiceProvider.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
use Google\Cloud\Tasks\V2\Client\CloudTasksClient;
use Stackkit\LaravelGoogleCloudTasksQueue\Events\JobReleased;
use Illuminate\Support\ServiceProvider as LaravelServiceProvider;
use Stackkit\LaravelGoogleCloudTasksQueue\Commands\WorkCloudRunJob;

class CloudTasksServiceProvider extends LaravelServiceProvider
{
Expand All @@ -24,6 +25,7 @@ public function boot(): void
$this->registerConfig();
$this->registerRoutes();
$this->registerEvents();
$this->registerCommands();
}

private function registerClient(): void
Expand Down Expand Up @@ -112,4 +114,13 @@ private function registerEvents(): void
}
});
}

private function registerCommands(): void
{
if ($this->app->runningInConsole()) {
$this->commands([
WorkCloudRunJob::class,
]);
}
}
}
Loading
Loading