22 changes: 10 additions & 12 deletions README.md
@@ -88,17 +88,16 @@ GET /api/v1/job/<job_id>/stderr

Heimdall supports a growing set of pluggable command types:

-| Plugin | Description | Execution Mode |
-| ----------- | -------------------------------------- | -------------- |
-| `ping` | [Basic plugin used for testing](https://github.com/patterninc/heimdall/blob/main/plugins/ping/README.md) | Sync or Async |
-| `shell` | [Shell command execution](https://github.com/patterninc/heimdall/blob/main/plugins/shell/README.md) | Sync or Async |
-| `glue` | [Pulling Iceberg table metadata](https://github.com/patterninc/heimdall/blob/main/plugins/glue/README.md) | Sync or Async |
-| `dynamo` | [DynamoDB read operation](https://github.com/patterninc/heimdall/blob/main/plugins/dynamo/README.md) | Sync or Async |
-| `snowflake` | [Query execution in Snowflake](https://github.com/patterninc/heimdall/blob/main/plugins/snowflake/README.md) | Async |
-| `spark` | [SparkSQL query execution on EMR on EKS](https://github.com/patterninc/heimdall/blob/main/plugins/spark/README.md) | Async |
-| `trino` | [Query execution in Trino](https://github.com/patterninc/heimdall/blob/main/plugins/trino/README.md) | Async |
-| `clickhouse` | [Query execution in Clickhouse](https://github.com/patterninc/heimdall/blob/main/plugins/clickhouse/README.md) | Sync |
-| `ecs fargate` | [Task Deployment in ECS Fargate](https://github.com/patterninc/heimdall/blob/main/plugins/ecs/README.md) | Async |
+| Plugin | Description | Execution Mode |
+| ----------- | -------------------------------------- | -------------- |
+| `ping` | [Basic plugin used for testing](https://github.com/patterninc/heimdall/blob/main/plugins/ping/README.md) | Sync or Async |
+| `shell` | [Shell command execution](https://github.com/patterninc/heimdall/blob/main/plugins/shell/README.md) | Sync or Async |
+| `glue` | [Pulling Iceberg table metadata](https://github.com/patterninc/heimdall/blob/main/plugins/glue/README.md) | Sync or Async |
+| `dynamo` | [DynamoDB read operation](https://github.com/patterninc/heimdall/blob/main/plugins/dynamo/README.md) | Sync or Async |
+| `snowflake` | [Query execution in Snowflake](https://github.com/patterninc/heimdall/blob/main/plugins/snowflake/README.md) | Async |
+| `spark` | [SparkSQL query execution on EMR on EKS](https://github.com/patterninc/heimdall/blob/main/plugins/spark/README.md) | Async |
+| `trino` | [Query execution in Trino](https://github.com/patterninc/heimdall/blob/main/plugins/trino/README.md) | Async |
+| `clickhouse`| [Query execution in Clickhouse](https://github.com/patterninc/heimdall/blob/main/plugins/clickhouse/README.md) | Sync |

---

@@ -164,7 +163,6 @@ It centralizes execution logic, logging, and auditing—all accessible via API o
| `POST /api/v1/job` | Submit a job |
| `GET /api/v1/job/<id>` | Get job details |
| `GET /api/v1/job/<id>/status` | Check job status |
-| `POST /api/v1/job/<id>/cancel` | Cancel an async job |
| `GET /api/v1/job/<id>/stdout` | Get stdout for a completed job |
| `GET /api/v1/job/<id>/stderr` | Get stderr for a completed job |
| `GET /api/v1/job/<id>/result` | Get job's result |
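
As a quick orientation for the endpoints above, here is a minimal client sketch in Go. The base URL and the job payload fields are assumptions for illustration only; the real payload schema lives in the plugin READMEs linked earlier.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"net/http"
)

const baseURL = "http://localhost:8080" // assumed local deployment

func main() {
	// Submit a job. The payload fields here are hypothetical; see each
	// plugin's README for its actual schema.
	payload := bytes.NewReader([]byte(`{"name": "ping-test", "command": "ping"}`))
	resp, err := http.Post(baseURL+"/api/v1/job", "application/json", payload)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	submitted, _ := io.ReadAll(resp.Body)
	fmt.Println(string(submitted)) // assumed to include the new job's ID

	// Poll an async job's status with the ID from the submit response.
	jobID := "replace-with-real-id"
	statusResp, err := http.Get(baseURL + "/api/v1/job/" + jobID + "/status")
	if err != nil {
		panic(err)
	}
	defer statusResp.Body.Close()
	status, _ := io.ReadAll(statusResp.Body)
	fmt.Println(string(status))
}
```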
4 changes: 1 addition & 3 deletions assets/databases/heimdall/data/job_statuses.sql
@@ -9,9 +9,7 @@ values
(3, 'RUNNING'),
(4, 'FAILED'),
(5, 'KILLED'),
-(6, 'SUCCEEDED'),
-(7, 'CANCELLING'),
-(8, 'CANCELLED')
+(6, 'SUCCEEDED')
on conflict (job_status_id) do update
set
job_status_name = excluded.job_status_name;
4 changes: 1 addition & 3 deletions assets/databases/heimdall/tables/jobs.sql
@@ -19,6 +19,4 @@ create table if not exists jobs
constraint _jobs_job_id unique (job_id)
);

-alter table jobs add column if not exists store_result_sync boolean not null default false;
-alter table jobs add column if not exists cancelled_by varchar(64) null;
-update jobs set cancelled_by = '' where cancelled_by is null;
+alter table jobs add column if not exists store_result_sync boolean not null default false;
3 changes: 1 addition & 2 deletions internal/pkg/aws/cloudwatch.go
@@ -1,7 +1,6 @@
package aws

import (
"context"
"fmt"
"os"
"time"
@@ -11,7 +10,7 @@ import (
"github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs"
)

-func PullLogs(ctx context.Context, writer *os.File, logGroup, logStream string, chunkSize int, memoryLimit int64) error {
+func PullLogs(writer *os.File, logGroup, logStream string, chunkSize int, memoryLimit int64) error {
Copilot AI (Dec 29, 2025):

The `context` import was removed but the function still uses `ctx` on lines 16 and 41. The `ctx` variable is not defined anywhere in this file, which will cause a compilation error. A global context variable should be defined similar to other files in this PR, or the context parameter should be kept in the function signature.

// initialize AWS session
cfg, err := config.LoadDefaultConfig(ctx)
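
A note on the review comment above: cloudwatch.go and s3.go both live in package aws, and Go resolves identifiers at package scope rather than file scope, so the package-level `ctx` this PR introduces in s3.go should also satisfy the references in this file, provided nothing shadows the name. A minimal sketch of that pattern, excerpting the declaration the PR adds (everything else elided):

```go
// Excerpt of the pattern adopted in internal/pkg/aws/s3.go in this PR.
// Because cloudwatch.go shares package aws, this package-level ctx is
// visible there too, so PullLogs compiles without its own declaration.
package aws

import (
	"context"
	"regexp"
)

var (
	ctx      = context.Background()
	rxS3Path = regexp.MustCompile(`^s3://([^/]+)/(.*)$`)
)
```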
9 changes: 4 additions & 5 deletions internal/pkg/aws/glue.go
@@ -1,7 +1,6 @@
package aws

import (
"context"
"fmt"
"strings"

@@ -18,7 +17,7 @@ var (
ErrMissingCatalogTableMetadata = fmt.Errorf(`missing table metadata in the glue catalog`)
)

-func GetTableMetadata(ctx context.Context, catalogID, tableName string) ([]byte, error) {
+func GetTableMetadata(catalogID, tableName string) ([]byte, error) {

// split tableName to namespace and table names
tableNameParts := strings.Split(tableName, `.`)
@@ -28,18 +27,18 @@ func GetTableMetadata(ctx context.Context, catalogID, tableName string) ([]byte,
}

// let's get the latest metadata file location
-location, err := getTableMetadataLocation(ctx, catalogID, tableNameParts[0], tableNameParts[1])
+location, err := getTableMetadataLocation(catalogID, tableNameParts[0], tableNameParts[1])
if err != nil {
return nil, err
}

// let's pull the file content
-return ReadFromS3(ctx, location)
+return ReadFromS3(location)

}

// function that calls AWS glue catalog to get the snapshot ID for a given database, table and branch
-func getTableMetadataLocation(ctx context.Context, catalogID, databaseName, tableName string) (string, error) {
+func getTableMetadataLocation(catalogID, databaseName, tableName string) (string, error) {

// Return an error if databaseName or tableName is empty
if databaseName == `` || tableName == `` {
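
With the context parameter gone, a caller inside the module would look roughly like this; the catalog ID and namespace-qualified table name below are placeholders, not values from the PR:

```go
package main

import (
	"fmt"
	"log"

	"github.com/patterninc/heimdall/internal/pkg/aws"
)

func main() {
	// GetTableMetadata splits "analytics.orders" on "." into namespace
	// and table; both arguments here are placeholder values.
	metadata, err := aws.GetTableMetadata("123456789012", "analytics.orders")
	if err != nil {
		log.Fatal(err) // e.g. ErrMissingCatalogTableMetadata
	}
	fmt.Println(string(metadata)) // latest Iceberg metadata file, read from S3
}
```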
5 changes: 3 additions & 2 deletions internal/pkg/aws/s3.go
@@ -13,11 +13,12 @@ import (
)

var (
+ctx = context.Background()
rxS3Path = regexp.MustCompile(`^s3://([^/]+)/(.*)$`)
)

// WriteToS3 writes a file to S3, providing the same interface as os.WriteFile function
-func WriteToS3(ctx context.Context, name string, data []byte, _ os.FileMode) error {
+func WriteToS3(name string, data []byte, _ os.FileMode) error {

bucket, key, err := parseS3Path(name)
if err != nil {
@@ -46,7 +47,7 @@ func WriteToS3(ctx context.Context, name string, data []byte, _ os.FileMode) err

}

-func ReadFromS3(ctx context.Context, name string) ([]byte, error) {
+func ReadFromS3(name string) ([]byte, error) {

bucket, key, err := parseS3Path(name)
if err != nil {
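
The S3 helpers keep their os.WriteFile/os.ReadFile-shaped interfaces while dropping the explicit context; a usage sketch with placeholder paths:

```go
package main

import (
	"fmt"
	"log"

	"github.com/patterninc/heimdall/internal/pkg/aws"
)

func main() {
	// Placeholder bucket and key; the path must match rxS3Path (s3://bucket/key).
	if err := aws.WriteToS3("s3://example-bucket/jobs/1/stdout", []byte("hello"), 0644); err != nil {
		log.Fatal(err)
	}
	data, err := aws.ReadFromS3("s3://example-bucket/jobs/1/stdout")
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(string(data))
}
```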
17 changes: 8 additions & 9 deletions internal/pkg/heimdall/cluster_dal.go
@@ -1,7 +1,6 @@
package heimdall

import (
"context"
"database/sql"
_ "embed"
"encoding/json"
@@ -78,13 +77,13 @@ var (
getClustersMethod = telemetry.NewMethod("db_connection", "get_clusters")
)

-func (h *Heimdall) submitCluster(ctx context.Context, c *cluster.Cluster) (any, error) {
+func (h *Heimdall) submitCluster(c *cluster.Cluster) (any, error) {

if err := h.clusterUpsert(c); err != nil {
return nil, err
}

-return h.getCluster(ctx, &cluster.Cluster{Object: object.Object{ID: c.ID}})
+return h.getCluster(&cluster.Cluster{Object: object.Object{ID: c.ID}})

}

@@ -135,7 +134,7 @@ func (h *Heimdall) clusterUpsert(c *cluster.Cluster) error {

}

-func (h *Heimdall) getCluster(ctx context.Context, c *cluster.Cluster) (any, error) {
+func (h *Heimdall) getCluster(c *cluster.Cluster) (any, error) {

// Track DB connection for get cluster operation
defer getClusterMethod.RecordLatency(time.Now())
@@ -182,7 +181,7 @@ func (h *Heimdall) getCluster(ctx context.Context, c *cluster.Cluster) (any, err

}

-func (h *Heimdall) getClusterStatus(ctx context.Context, c *cluster.Cluster) (any, error) {
+func (h *Heimdall) getClusterStatus(c *cluster.Cluster) (any, error) {

// Track DB connection for cluster status operation
defer getClusterStatusMethod.RecordLatency(time.Now())
@@ -217,7 +216,7 @@ func (h *Heimdall) getClusterStatus(ctx context.Context, c *cluster.Cluster) (an

}

-func (h *Heimdall) updateClusterStatus(ctx context.Context, c *cluster.Cluster) (any, error) {
+func (h *Heimdall) updateClusterStatus(c *cluster.Cluster) (any, error) {

defer updateClusterStatusMethod.RecordLatency(time.Now())
updateClusterStatusMethod.CountRequest()
@@ -240,11 +239,11 @@ func (h *Heimdall) updateClusterStatus(ctx context.Context, c *cluster.Cluster)
}

updateClusterStatusMethod.CountSuccess()
-return h.getClusterStatus(ctx, c)
+return h.getClusterStatus(c)

}

-func (h *Heimdall) getClusters(ctx context.Context, f *database.Filter) (any, error) {
+func (h *Heimdall) getClusters(f *database.Filter) (any, error) {

// Track DB connection for clusters list operation
defer getClustersMethod.RecordLatency(time.Now())
@@ -296,7 +295,7 @@ func (h *Heimdall) getClusters(ctx context.Context, f *database.Filter) (any, er

}

-func (h *Heimdall) getClusterStatuses(ctx context.Context, _ *database.Filter) (any, error) {
+func (h *Heimdall) getClusterStatuses(_ *database.Filter) (any, error) {

return database.GetSlice(h.Database, queryClusterStatusesSelect)

17 changes: 8 additions & 9 deletions internal/pkg/heimdall/command_dal.go
@@ -1,7 +1,6 @@
package heimdall

import (
"context"
"database/sql"
_ "embed"
"encoding/json"
@@ -90,13 +89,13 @@ var (
getCommandsMethod = telemetry.NewMethod("db_connection", "get_commands")
)

-func (h *Heimdall) submitCommand(ctx context.Context, c *command.Command) (any, error) {
+func (h *Heimdall) submitCommand(c *command.Command) (any, error) {

if err := h.commandUpsert(c); err != nil {
return nil, err
}

-return h.getCommand(ctx, &command.Command{Object: object.Object{ID: c.ID}})
+return h.getCommand(&command.Command{Object: object.Object{ID: c.ID}})

}

@@ -164,7 +163,7 @@ func (h *Heimdall) commandUpsert(c *command.Command) error {

}

-func (h *Heimdall) getCommand(ctx context.Context, c *command.Command) (any, error) {
+func (h *Heimdall) getCommand(c *command.Command) (any, error) {

// Track DB connection for get command operation
defer getCommandMethod.RecordLatency(time.Now())
@@ -211,7 +210,7 @@ func (h *Heimdall) getCommand(ctx context.Context, c *command.Command) (any, err

}

-func (h *Heimdall) getCommandStatus(ctx context.Context, c *command.Command) (any, error) {
+func (h *Heimdall) getCommandStatus(c *command.Command) (any, error) {

// Track DB connection for command status operation
defer getCommandStatusMethod.RecordLatency(time.Now())
@@ -246,7 +245,7 @@ func (h *Heimdall) getCommandStatus(ctx context.Context, c *command.Command) (an

}

-func (h *Heimdall) updateCommandStatus(ctx context.Context, c *command.Command) (any, error) {
+func (h *Heimdall) updateCommandStatus(c *command.Command) (any, error) {

// Track DB connection for command status update operation
defer updateCommandStatusMethod.RecordLatency(time.Now())
@@ -270,11 +269,11 @@ func (h *Heimdall) updateCommandStatus(ctx context.Context, c *command.Command)
}

updateCommandStatusMethod.CountSuccess()
-return h.getCommandStatus(ctx, c)
+return h.getCommandStatus(c)

}

-func (h *Heimdall) getCommands(ctx context.Context, f *database.Filter) (any, error) {
+func (h *Heimdall) getCommands(f *database.Filter) (any, error) {

// Track DB connection for commands list operation
defer getCommandsMethod.RecordLatency(time.Now())
@@ -326,7 +325,7 @@ func (h *Heimdall) getCommands(ctx context.Context, f *database.Filter) (any, er

}

-func (h *Heimdall) getCommandStatuses(ctx context.Context, _ *database.Filter) (any, error) {
+func (h *Heimdall) getCommandStatuses(_ *database.Filter) (any, error) {

return database.GetSlice(h.Database, queryCommandStatusesSelect)

5 changes: 2 additions & 3 deletions internal/pkg/heimdall/handler.go
@@ -1,7 +1,6 @@
package heimdall

import (
"context"
"encoding/json"
"fmt"
"io"
@@ -49,7 +48,7 @@ func writeAPIError(w http.ResponseWriter, err error, obj any) {
w.Write(responseJSON)
}

-func payloadHandler[T any](fn func(context.Context, *T) (any, error)) http.HandlerFunc {
+func payloadHandler[T any](fn func(*T) (any, error)) http.HandlerFunc {
// start latency timer
defer payloadHandlerMethod.RecordLatency(time.Now())

@@ -82,7 +81,7 @@ func payloadHandler[T any](fn func(context.Context, *T) (any, error)) http.Handl
}

// execute request
-result, err := fn(r.Context(), &payload)
+result, err := fn(&payload)
if err != nil {
writeAPIError(w, err, result)
return
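
For orientation, a condensed sketch of the adapter's new shape (telemetry and error-shaping elided; the decode step is assumed from the surrounding diff). One behavioral consequence of dropping r.Context(): client disconnects no longer propagate cancellation into handlers, which is consistent with this PR removing the cancel feature.

```go
// Condensed sketch, not the full implementation in handler.go.
func payloadHandler[T any](fn func(*T) (any, error)) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		// decode the request payload into T (details assumed)
		var payload T
		if err := json.NewDecoder(r.Body).Decode(&payload); err != nil {
			http.Error(w, err.Error(), http.StatusBadRequest)
			return
		}
		// execute request; handlers no longer receive a context
		result, err := fn(&payload)
		if err != nil {
			writeAPIError(w, err, result) // helper defined earlier in this file
			return
		}
		json.NewEncoder(w).Encode(result)
	}
}
```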
3 changes: 1 addition & 2 deletions internal/pkg/heimdall/heimdall.go
@@ -15,12 +15,12 @@ import (
"github.com/patterninc/heimdall/internal/pkg/database"
"github.com/patterninc/heimdall/internal/pkg/janitor"
"github.com/patterninc/heimdall/internal/pkg/pool"
"github.com/patterninc/heimdall/internal/pkg/rbac"
"github.com/patterninc/heimdall/internal/pkg/server"
"github.com/patterninc/heimdall/pkg/object/cluster"
"github.com/patterninc/heimdall/pkg/object/command"
"github.com/patterninc/heimdall/pkg/object/job"
"github.com/patterninc/heimdall/pkg/plugin"
"github.com/patterninc/heimdall/internal/pkg/rbac"
Comment on lines 18 to +23

Copilot AI (Dec 29, 2025):

The import order has been changed inconsistently. The rbac import was moved from line 17 (before server) to line 23 (after plugin). This breaks the conventional Go import grouping where internal packages should be grouped together. The import should remain in its original position before the server import.

Suggested change:
-	"github.com/patterninc/heimdall/internal/pkg/server"
-	"github.com/patterninc/heimdall/pkg/object/cluster"
-	"github.com/patterninc/heimdall/pkg/object/command"
-	"github.com/patterninc/heimdall/pkg/object/job"
-	"github.com/patterninc/heimdall/pkg/plugin"
-	"github.com/patterninc/heimdall/internal/pkg/rbac"
+	"github.com/patterninc/heimdall/internal/pkg/rbac"
+	"github.com/patterninc/heimdall/internal/pkg/server"
+	"github.com/patterninc/heimdall/pkg/object/cluster"
+	"github.com/patterninc/heimdall/pkg/object/command"
+	"github.com/patterninc/heimdall/pkg/object/job"
+	"github.com/patterninc/heimdall/pkg/plugin"
rbacI "github.com/patterninc/heimdall/pkg/rbac"
)

@@ -173,7 +173,6 @@ func (h *Heimdall) Start() error {
// job(s) endpoints...
apiRouter.Methods(methodGET).PathPrefix(`/job/statuses`).HandlerFunc(payloadHandler(h.getJobStatuses))
apiRouter.Methods(methodGET).PathPrefix(`/job/{id}/status`).HandlerFunc(payloadHandler(h.getJobStatus))
-apiRouter.Methods(methodPOST).PathPrefix(`/job/{id}/cancel`).HandlerFunc(payloadHandler(h.cancelJob))
apiRouter.Methods(methodGET).PathPrefix(`/job/{id}/{file}`).HandlerFunc(h.getJobFile)
apiRouter.Methods(methodGET).PathPrefix(`/job/{id}`).HandlerFunc(payloadHandler(h.getJob))
apiRouter.Methods(methodGET).PathPrefix(`/jobs`).HandlerFunc(payloadHandler(h.getJobs))
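
Registration order matters here: the router chains Methods and PathPrefix in gorilla/mux style (an assumption from the chaining; the router setup is not shown in this diff), where the first matching route wins, so the literal /job/statuses prefix has to be registered before the /job/{id} patterns that would otherwise capture it. A minimal illustration, reusing the routes above:

```go
// If /job/{id} were registered first, GET /api/v1/job/statuses would
// match it with id="statuses" and never reach getJobStatuses.
apiRouter.Methods(methodGET).PathPrefix(`/job/statuses`).HandlerFunc(payloadHandler(h.getJobStatuses))
apiRouter.Methods(methodGET).PathPrefix(`/job/{id}/status`).HandlerFunc(payloadHandler(h.getJobStatus))
apiRouter.Methods(methodGET).PathPrefix(`/job/{id}`).HandlerFunc(payloadHandler(h.getJob))
```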