Merged
22 changes: 12 additions & 10 deletions README.md
@@ -88,16 +88,17 @@ GET /api/v1/job/<job_id>/stderr

Heimdall supports a growing set of pluggable command types:

| Plugin | Description | Execution Mode |
| ----------- | -------------------------------------- | -------------- |
| `ping` | [Basic plugin used for testing](https://github.com/patterninc/heimdall/blob/main/plugins/ping/README.md) | Sync or Async |
| `shell` | [Shell command execution](https://github.com/patterninc/heimdall/blob/main/plugins/shell/README.md) | Sync or Async |
| `glue` | [Pulling Iceberg table metadata](https://github.com/patterninc/heimdall/blob/main/plugins/glue/README.md) | Sync or Async |
| `dynamo` | [DynamoDB read operation](https://github.com/patterninc/heimdall/blob/main/plugins/dynamo/README.md) | Sync or Async |
| `snowflake` | [Query execution in Snowflake](https://github.com/patterninc/heimdall/blob/main/plugins/snowflake/README.md) | Async |
| `spark` | [SparkSQL query execution on EMR on EKS](https://github.com/patterninc/heimdall/blob/main/plugins/spark/README.md) | Async |
| `trino` | [Query execution in Trino](https://github.com/patterninc/heimdall/blob/main/plugins/trino/README.md) | Async |
| `clickhouse`| [Query execution in Clickhouse](https://github.com/patterninc/heimdall/blob/main/plugins/clickhouse/README.md) | Sync |
| Plugin | Description | Execution Mode |
| ----------- | -------------------------------------- | -------------- |
| `ping` | [Basic plugin used for testing](https://github.com/patterninc/heimdall/blob/main/plugins/ping/README.md) | Sync or Async |
| `shell` | [Shell command execution](https://github.com/patterninc/heimdall/blob/main/plugins/shell/README.md) | Sync or Async |
| `glue` | [Pulling Iceberg table metadata](https://github.com/patterninc/heimdall/blob/main/plugins/glue/README.md) | Sync or Async |
| `dynamo` | [DynamoDB read operation](https://github.com/patterninc/heimdall/blob/main/plugins/dynamo/README.md) | Sync or Async |
| `snowflake` | [Query execution in Snowflake](https://github.com/patterninc/heimdall/blob/main/plugins/snowflake/README.md) | Async |
| `spark` | [SparkSQL query execution on EMR on EKS](https://github.com/patterninc/heimdall/blob/main/plugins/spark/README.md) | Async |
| `trino` | [Query execution in Trino](https://github.com/patterninc/heimdall/blob/main/plugins/trino/README.md) | Async |
| `clickhouse` | [Query execution in Clickhouse](https://github.com/patterninc/heimdall/blob/main/plugins/clickhouse/README.md) | Sync |
| `ecs fargate` | [Task Deployment in ECS Fargate](https://github.com/patterninc/heimdall/blob/main/plugins/ecs/README.md) | Async |

---

@@ -163,6 +164,7 @@ It centralizes execution logic, logging, and auditing—all accessible via API o
| `POST /api/v1/job` | Submit a job |
| `GET /api/v1/job/<id>` | Get job details |
| `GET /api/v1/job/<id>/status` | Check job status |
| `POST /api/v1/job/<id>/cancel` | Cancel an async job |
| `GET /api/v1/job/<id>/stdout` | Get stdout for a completed job |
| `GET /api/v1/job/<id>/stderr` | Get stderr for a completed job |
| `GET /api/v1/job/<id>/result` | Get job's result |
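As a complement to the endpoint table above, here is a minimal sketch of the new cancellation flow from a client's point of view, written in Go. The base URL, job ID, and absence of authentication headers are illustrative assumptions, not part of this change:

```go
package main

import (
	"fmt"
	"io"
	"log"
	"net/http"
)

func main() {

	const base = `http://localhost:8080/api/v1`        // hypothetical Heimdall address
	jobID := `0f8fad5b-d9cb-469f-a165-70867728950e`     // hypothetical job ID

	// ask Heimdall to cancel the async job
	resp, err := http.Post(base+`/job/`+jobID+`/cancel`, `application/json`, nil)
	if err != nil {
		log.Fatal(err)
	}
	defer resp.Body.Close()
	body, _ := io.ReadAll(resp.Body)
	fmt.Println(resp.Status, string(body))

	// check the status endpoint; a cancelled job is expected to report
	// CANCELLING and, once the plugin has shut it down, CANCELLED
	statusResp, err := http.Get(base + `/job/` + jobID + `/status`)
	if err != nil {
		log.Fatal(err)
	}
	defer statusResp.Body.Close()
	status, _ := io.ReadAll(statusResp.Body)
	fmt.Println(string(status))
}
```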
4 changes: 3 additions & 1 deletion assets/databases/heimdall/data/job_statuses.sql
@@ -9,7 +9,9 @@ values
(3, 'RUNNING'),
(4, 'FAILED'),
(5, 'KILLED'),
(6, 'SUCCEEDED')
(6, 'SUCCEEDED'),
(7, 'CANCELLING'),
(8, 'CANCELLED')
on conflict (job_status_id) do update
set
job_status_name = excluded.job_status_name;
4 changes: 3 additions & 1 deletion assets/databases/heimdall/tables/jobs.sql
@@ -19,4 +19,6 @@ create table if not exists jobs
constraint _jobs_job_id unique (job_id)
);

alter table jobs add column if not exists store_result_sync boolean not null default false;
alter table jobs add column if not exists store_result_sync boolean not null default false;
alter table jobs add column if not exists cancelled_by varchar(64) null;
update jobs set cancelled_by = '' where cancelled_by is null;
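Taken together with the new CANCELLING and CANCELLED statuses above, the `cancelled_by` column suggests a two-phase cancel: mark the job and record the requester, then let the async worker finish the transition. Below is a hypothetical, self-contained sketch of that first phase; the `jobs.job_status_id` column, the function, and the package are assumptions for illustration only, not the PR's actual implementation:

```go
package jobstore

import (
	"context"
	"database/sql"
)

// Status IDs as seeded in job_statuses.sql; 7 and 8 are new in this change.
const (
	statusCancelling = 7
	statusCancelled  = 8
)

// requestCancel flips a job to CANCELLING and records who asked for it; an async
// worker is assumed to move it to CANCELLED once the underlying plugin stops.
func requestCancel(ctx context.Context, db *sql.DB, jobID, user string) error {
	_, err := db.ExecContext(ctx,
		`update jobs set job_status_id = $1, cancelled_by = $2 where job_id = $3`,
		statusCancelling, user, jobID,
	)
	return err
}
```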
3 changes: 2 additions & 1 deletion internal/pkg/aws/cloudwatch.go
@@ -1,6 +1,7 @@
package aws

import (
"context"
"fmt"
"os"
"time"
@@ -10,7 +11,7 @@ import (
"github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs"
)

func PullLogs(writer *os.File, logGroup, logStream string, chunkSize int, memoryLimit int64) error {
func PullLogs(ctx context.Context, writer *os.File, logGroup, logStream string, chunkSize int, memoryLimit int64) error {

// initialize AWS session
cfg, err := config.LoadDefaultConfig(ctx)
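With the context threaded through `PullLogs`, callers decide how long log pulling may run and can abort it early. A minimal caller sketch, assuming code that lives inside the module (the `aws` package is internal) and hypothetical log group and stream names:

```go
package main

import (
	"context"
	"log"
	"os"
	"time"

	"github.com/patterninc/heimdall/internal/pkg/aws"
)

func main() {

	// bound the CloudWatch pagination loop; cancelling ctx propagates to the SDK calls
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
	defer cancel()

	// log group/stream, chunk size, and memory limit are illustrative values
	if err := aws.PullLogs(ctx, os.Stdout, `/heimdall/jobs`, `job-123`, 1000, 64<<20); err != nil {
		log.Fatal(err)
	}
}
```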
9 changes: 5 additions & 4 deletions internal/pkg/aws/glue.go
@@ -1,6 +1,7 @@
package aws

import (
"context"
"fmt"
"strings"

@@ -17,7 +18,7 @@ var (
ErrMissingCatalogTableMetadata = fmt.Errorf(`missing table metadata in the glue catalog`)
)

func GetTableMetadata(catalogID, tableName string) ([]byte, error) {
func GetTableMetadata(ctx context.Context, catalogID, tableName string) ([]byte, error) {

// split tableName to namespace and table names
tableNameParts := strings.Split(tableName, `.`)
@@ -27,18 +28,18 @@ func GetTableMetadata(catalogID, tableName string) ([]byte, error) {
}

// let's get the latest metadata file location
location, err := getTableMetadataLocation(catalogID, tableNameParts[0], tableNameParts[1])
location, err := getTableMetadataLocation(ctx, catalogID, tableNameParts[0], tableNameParts[1])
if err != nil {
return nil, err
}

// let's pull the file content
return ReadFromS3(location)
return ReadFromS3(ctx, location)

}

// function that calls AWS glue catalog to get the snapshot ID for a given database, table and branch
func getTableMetadataLocation(catalogID, databaseName, tableName string) (string, error) {
func getTableMetadataLocation(ctx context.Context, catalogID, databaseName, tableName string) (string, error) {

// Return an error if databaseName or tableName is empty
if databaseName == `` || tableName == `` {
5 changes: 2 additions & 3 deletions internal/pkg/aws/s3.go
@@ -13,12 +13,11 @@ import (
)

var (
ctx = context.Background()
rxS3Path = regexp.MustCompile(`^s3://([^/]+)/(.*)$`)
)

// WriteToS3 writes a file to S3, providing the same interface as os.WriteFile function
func WriteToS3(name string, data []byte, _ os.FileMode) error {
func WriteToS3(ctx context.Context, name string, data []byte, _ os.FileMode) error {

bucket, key, err := parseS3Path(name)
if err != nil {
@@ -47,7 +46,7 @@ func WriteToS3(name string, data []byte, _ os.FileMode) error {

}

func ReadFromS3(name string) ([]byte, error) {
func ReadFromS3(ctx context.Context, name string) ([]byte, error) {

bucket, key, err := parseS3Path(name)
if err != nil {
17 changes: 9 additions & 8 deletions internal/pkg/heimdall/cluster_dal.go
@@ -1,6 +1,7 @@
package heimdall

import (
"context"
"database/sql"
_ "embed"
"encoding/json"
@@ -77,13 +78,13 @@ var (
getClustersMethod = telemetry.NewMethod("db_connection", "get_clusters")
)

func (h *Heimdall) submitCluster(c *cluster.Cluster) (any, error) {
func (h *Heimdall) submitCluster(ctx context.Context, c *cluster.Cluster) (any, error) {

if err := h.clusterUpsert(c); err != nil {
return nil, err
}

return h.getCluster(&cluster.Cluster{Object: object.Object{ID: c.ID}})
return h.getCluster(ctx, &cluster.Cluster{Object: object.Object{ID: c.ID}})

}

@@ -134,7 +135,7 @@ func (h *Heimdall) clusterUpsert(c *cluster.Cluster) error {

}

func (h *Heimdall) getCluster(c *cluster.Cluster) (any, error) {
func (h *Heimdall) getCluster(ctx context.Context, c *cluster.Cluster) (any, error) {

// Track DB connection for get cluster operation
defer getClusterMethod.RecordLatency(time.Now())
@@ -181,7 +182,7 @@ func (h *Heimdall) getCluster(c *cluster.Cluster) (any, error) {

}

func (h *Heimdall) getClusterStatus(c *cluster.Cluster) (any, error) {
func (h *Heimdall) getClusterStatus(ctx context.Context, c *cluster.Cluster) (any, error) {

// Track DB connection for cluster status operation
defer getClusterStatusMethod.RecordLatency(time.Now())
@@ -216,7 +217,7 @@ func (h *Heimdall) getClusterStatus(c *cluster.Cluster) (any, error) {

}

func (h *Heimdall) updateClusterStatus(c *cluster.Cluster) (any, error) {
func (h *Heimdall) updateClusterStatus(ctx context.Context, c *cluster.Cluster) (any, error) {

defer updateClusterStatusMethod.RecordLatency(time.Now())
updateClusterStatusMethod.CountRequest()
@@ -239,11 +240,11 @@ func (h *Heimdall) updateClusterStatus(c *cluster.Cluster) (any, error) {
}

updateClusterStatusMethod.CountSuccess()
return h.getClusterStatus(c)
return h.getClusterStatus(ctx, c)

}

func (h *Heimdall) getClusters(f *database.Filter) (any, error) {
func (h *Heimdall) getClusters(ctx context.Context, f *database.Filter) (any, error) {

// Track DB connection for clusters list operation
defer getClustersMethod.RecordLatency(time.Now())
@@ -295,7 +296,7 @@ func (h *Heimdall) getClusters(f *database.Filter) (any, error) {

}

func (h *Heimdall) getClusterStatuses(_ *database.Filter) (any, error) {
func (h *Heimdall) getClusterStatuses(ctx context.Context, _ *database.Filter) (any, error) {

return database.GetSlice(h.Database, queryClusterStatusesSelect)

17 changes: 9 additions & 8 deletions internal/pkg/heimdall/command_dal.go
@@ -1,6 +1,7 @@
package heimdall

import (
"context"
"database/sql"
_ "embed"
"encoding/json"
@@ -89,13 +90,13 @@ var (
getCommandsMethod = telemetry.NewMethod("db_connection", "get_commands")
)

func (h *Heimdall) submitCommand(c *command.Command) (any, error) {
func (h *Heimdall) submitCommand(ctx context.Context, c *command.Command) (any, error) {

if err := h.commandUpsert(c); err != nil {
return nil, err
}

return h.getCommand(&command.Command{Object: object.Object{ID: c.ID}})
return h.getCommand(ctx, &command.Command{Object: object.Object{ID: c.ID}})

}

@@ -163,7 +164,7 @@ func (h *Heimdall) commandUpsert(c *command.Command) error {

}

func (h *Heimdall) getCommand(c *command.Command) (any, error) {
func (h *Heimdall) getCommand(ctx context.Context, c *command.Command) (any, error) {

// Track DB connection for get command operation
defer getCommandMethod.RecordLatency(time.Now())
@@ -210,7 +211,7 @@ func (h *Heimdall) getCommand(c *command.Command) (any, error) {

}

func (h *Heimdall) getCommandStatus(c *command.Command) (any, error) {
func (h *Heimdall) getCommandStatus(ctx context.Context, c *command.Command) (any, error) {

// Track DB connection for command status operation
defer getCommandStatusMethod.RecordLatency(time.Now())
@@ -245,7 +246,7 @@ func (h *Heimdall) getCommandStatus(c *command.Command) (any, error) {

}

func (h *Heimdall) updateCommandStatus(c *command.Command) (any, error) {
func (h *Heimdall) updateCommandStatus(ctx context.Context, c *command.Command) (any, error) {

// Track DB connection for command status update operation
defer updateCommandStatusMethod.RecordLatency(time.Now())
@@ -269,11 +270,11 @@ func (h *Heimdall) updateCommandStatus(c *command.Command) (any, error) {
}

updateCommandStatusMethod.CountSuccess()
return h.getCommandStatus(c)
return h.getCommandStatus(ctx, c)

}

func (h *Heimdall) getCommands(f *database.Filter) (any, error) {
func (h *Heimdall) getCommands(ctx context.Context, f *database.Filter) (any, error) {

// Track DB connection for commands list operation
defer getCommandsMethod.RecordLatency(time.Now())
@@ -325,7 +326,7 @@ func (h *Heimdall) getCommands(f *database.Filter) (any, error) {

}

func (h *Heimdall) getCommandStatuses(_ *database.Filter) (any, error) {
func (h *Heimdall) getCommandStatuses(ctx context.Context, _ *database.Filter) (any, error) {

return database.GetSlice(h.Database, queryCommandStatusesSelect)

5 changes: 3 additions & 2 deletions internal/pkg/heimdall/handler.go
@@ -1,6 +1,7 @@
package heimdall

import (
"context"
"encoding/json"
"fmt"
"io"
@@ -48,7 +49,7 @@ func writeAPIError(w http.ResponseWriter, err error, obj any) {
w.Write(responseJSON)
}

func payloadHandler[T any](fn func(*T) (any, error)) http.HandlerFunc {
func payloadHandler[T any](fn func(context.Context, *T) (any, error)) http.HandlerFunc {
// start latency timer
defer payloadHandlerMethod.RecordLatency(time.Now())

@@ -81,7 +82,7 @@ func payloadHandler[T any](fn func(*T) (any, error)) http.HandlerFunc {
}

// execute request
result, err := fn(&payload)
result, err := fn(r.Context(), &payload)
Contributor comment on this line: i like it

if err != nil {
writeAPIError(w, err, result)
return
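Since payloadHandler now expects `func(context.Context, *T) (any, error)` and passes `r.Context()` through, every handler it wraps can push the request's cancellation down to its I/O. Here is a minimal sketch of a conforming function, using a plain `*sql.DB` and a placeholder payload type rather than the repository's own database wrapper and object types:

```go
package example

import (
	"context"
	"database/sql"
)

// examplePayload stands in for the real payload types (cluster.Cluster,
// command.Command, database.Filter, ...); it exists only for this sketch.
type examplePayload struct {
	ID string `json:"id"`
}

// getExample returns a function with the shape payloadHandler expects, so the
// request context reaches the driver and a client disconnect aborts the query.
func getExample(db *sql.DB) func(context.Context, *examplePayload) (any, error) {
	return func(ctx context.Context, p *examplePayload) (any, error) {
		var name string
		// QueryRowContext stops early if the caller's context is cancelled
		if err := db.QueryRowContext(ctx, `select name from examples where id = $1`, p.ID).Scan(&name); err != nil {
			return nil, err
		}
		return map[string]string{`id`: p.ID, `name`: name}, nil
	}
}
```

Wiring it up would then look like the existing routes, e.g. something along the lines of `apiRouter.Methods(methodGET).PathPrefix("/example").HandlerFunc(payloadHandler(getExample(db)))`.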
3 changes: 2 additions & 1 deletion internal/pkg/heimdall/heimdall.go
@@ -15,12 +15,12 @@ import (
"github.com/patterninc/heimdall/internal/pkg/database"
"github.com/patterninc/heimdall/internal/pkg/janitor"
"github.com/patterninc/heimdall/internal/pkg/pool"
"github.com/patterninc/heimdall/internal/pkg/rbac"
"github.com/patterninc/heimdall/internal/pkg/server"
"github.com/patterninc/heimdall/pkg/object/cluster"
"github.com/patterninc/heimdall/pkg/object/command"
"github.com/patterninc/heimdall/pkg/object/job"
"github.com/patterninc/heimdall/pkg/plugin"
"github.com/patterninc/heimdall/internal/pkg/rbac"
rbacI "github.com/patterninc/heimdall/pkg/rbac"
)

@@ -173,6 +173,7 @@ func (h *Heimdall) Start() error {
// job(s) endpoints...
apiRouter.Methods(methodGET).PathPrefix(`/job/statuses`).HandlerFunc(payloadHandler(h.getJobStatuses))
apiRouter.Methods(methodGET).PathPrefix(`/job/{id}/status`).HandlerFunc(payloadHandler(h.getJobStatus))
apiRouter.Methods(methodPOST).PathPrefix(`/job/{id}/cancel`).HandlerFunc(payloadHandler(h.cancelJob))
apiRouter.Methods(methodGET).PathPrefix(`/job/{id}/{file}`).HandlerFunc(h.getJobFile)
apiRouter.Methods(methodGET).PathPrefix(`/job/{id}`).HandlerFunc(payloadHandler(h.getJob))
apiRouter.Methods(methodGET).PathPrefix(`/jobs`).HandlerFunc(payloadHandler(h.getJobs))
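The chained Methods/PathPrefix/HandlerFunc calls look like gorilla/mux; that is an assumption, since the router is built in the internal server package, which is not shown in this diff. Under that assumption, the sketch below illustrates why the specific job routes are registered before the `/job/{id}/{file}` and `/job/{id}` catch-alls: gorilla/mux tries routes in registration order, so a more specific prefix must come first or its path segment would be captured by the catch-all.

```go
package main

import (
	"fmt"
	"log"
	"net/http"

	"github.com/gorilla/mux"
)

func main() {

	r := mux.NewRouter()
	api := r.PathPrefix(`/api/v1`).Subrouter()

	// specific prefixes first: if /job/{id}/{file} were registered before
	// /job/{id}/status, a GET for ".../status" would be captured as {file}
	api.Methods(http.MethodGet).PathPrefix(`/job/{id}/status`).HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprintf(w, "status of %s\n", mux.Vars(r)[`id`])
	})
	api.Methods(http.MethodPost).PathPrefix(`/job/{id}/cancel`).HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprintf(w, "cancel %s\n", mux.Vars(r)[`id`])
	})
	api.Methods(http.MethodGet).PathPrefix(`/job/{id}/{file}`).HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprintf(w, "file %s of %s\n", mux.Vars(r)[`file`], mux.Vars(r)[`id`])
	})
	api.Methods(http.MethodGet).PathPrefix(`/job/{id}`).HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprintf(w, "job %s\n", mux.Vars(r)[`id`])
	})

	log.Fatal(http.ListenAndServe(`:8080`, r))
}
```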