Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion assets/databases/heimdall/tables/jobs.sql
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,5 @@ create table if not exists jobs

alter table jobs add column if not exists store_result_sync boolean not null default false;
alter table jobs add column if not exists cancelled_by varchar(64) null;
update jobs set cancelled_by = '' where cancelled_by is null;
update jobs set cancelled_by = '' where cancelled_by is null;
alter table jobs add column if not exists cancellation_ctx jsonb null;
11 changes: 6 additions & 5 deletions internal/pkg/heimdall/heimdall.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ type Heimdall struct {
Janitor *janitor.Janitor `yaml:"janitor,omitempty" json:"janitor,omitempty"`
Version string `yaml:"-" json:"-"`
agentName string
commandHandlers map[string]plugin.Handler
commandHandlers map[string]*plugin.Handlers
}

func (h *Heimdall) Init() error {
Expand Down Expand Up @@ -96,7 +96,7 @@ func (h *Heimdall) Init() error {
rbacsByName[rbacName] = r
}

h.commandHandlers = make(map[string]plugin.Handler)
h.commandHandlers = make(map[string]*plugin.Handlers)

// process commands / add default values if missing, write commands to db
for _, c := range h.Commands {
Expand All @@ -112,11 +112,12 @@ func (h *Heimdall) Init() error {
return fmt.Errorf(formatErrUnknownPlugin, c.Plugin)
}

handler, err := pluginNew(c.Context)
handlers, err := pluginNew(c.Context)
if err != nil {
return err
}
h.commandHandlers[c.ID] = handler

h.commandHandlers[c.ID] = handlers

// let's record command in the database
if err := h.commandUpsert(c); err != nil {
Expand Down Expand Up @@ -151,7 +152,7 @@ func (h *Heimdall) Init() error {
}

// start janitor
if err := h.Janitor.Start(h.Database); err != nil {
if err := h.Janitor.Start(h.Database, h.commandHandlers, h.Clusters); err != nil {
return err
}

Expand Down
3 changes: 2 additions & 1 deletion internal/pkg/heimdall/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,8 @@ func (h *Heimdall) runJob(ctx context.Context, j *job.Job, command *command.Comm
// Start plugin execution in goroutine
go func() {
defer close(cancelMonitorDone) // signal monitoring to stop
err := h.commandHandlers[command.ID](pluginCtx, runtime, j, cluster)
handlers := h.commandHandlers[command.ID]
err := handlers.Handler(pluginCtx, runtime, j, cluster)
jobDone <- err
}()

Expand Down
14 changes: 8 additions & 6 deletions internal/pkg/heimdall/plugins.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package heimdall

import (
"fmt"
"os"
"path"
"plugin"
Expand All @@ -16,9 +17,9 @@ const (
pluginExtensionLength = len(pluginExtension)
)

func (h *Heimdall) loadPlugins() (map[string]func(*context.Context) (hp.Handler, error), error) {
func (h *Heimdall) loadPlugins() (map[string]func(*context.Context) (*hp.Handlers, error), error) {

plugins := make(map[string]func(*context.Context) (hp.Handler, error))
plugins := make(map[string]func(*context.Context) (*hp.Handlers, error))

files, err := os.ReadDir(h.PluginsDirectory)
if err != nil {
Expand All @@ -35,11 +36,12 @@ func (h *Heimdall) loadPlugins() (map[string]func(*context.Context) (hp.Handler,
if err != nil {
return nil, err
}
// is it our plugin?
newPluginFunc, ok := newFunc.(func(*context.Context) (hp.Handler, error))
if ok {
plugins[stripExtension(file.Name())] = newPluginFunc
// plugins must return *Handlers
newPluginFunc, ok := newFunc.(func(*context.Context) (*hp.Handlers, error))
if !ok {
return nil, fmt.Errorf("plugin %s must return *plugin.Handlers", stripExtension(file.Name()))
}
plugins[stripExtension(file.Name())] = newPluginFunc
}
}

Expand Down
34 changes: 21 additions & 13 deletions internal/pkg/janitor/janitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,38 +5,46 @@ import (
"time"

"github.com/patterninc/heimdall/internal/pkg/database"
"github.com/patterninc/heimdall/pkg/object/cluster"
"github.com/patterninc/heimdall/pkg/plugin"
)

const (
defaultJobLimit = 25
)

type Janitor struct {
Keepalive int `yaml:"keepalive,omitempty" json:"keepalive,omitempty"`
StaleJob int `yaml:"stale_job,omitempty" json:"stale_job,omitempty"`
db *database.Database
Keepalive int `yaml:"keepalive,omitempty" json:"keepalive,omitempty"`
StaleJob int `yaml:"stale_job,omitempty" json:"stale_job,omitempty"`
db *database.Database
commandHandlers map[string]*plugin.Handlers
clusters cluster.Clusters
}

func (j *Janitor) Start(d *database.Database) error {
func (j *Janitor) Start(d *database.Database, commandHandlers map[string]*plugin.Handlers, clusters cluster.Clusters) error {

// record database context
j.db = d
j.commandHandlers = commandHandlers
j.clusters = clusters

// let's run jobs cleanup once before we start it as a go routine
if err := j.cleanupStaleJobs(); err != nil {
return err
}

// start cleanup loop
go func() {

// start cleanup loops
runCleanupLoop := func(cleanupFn func() error) {
for {

if err := j.cleanupStaleJobs(); err != nil {
fmt.Println(`Janitor error:`, err)
if err := cleanupFn(); err != nil {
fmt.Printf("Janitor error: %v\n", err)
}

time.Sleep(60 * time.Second)

}
}

}()
go runCleanupLoop(j.cleanupStaleJobs)
go runCleanupLoop(j.cleanupCancellingJobs)

return nil

Expand Down
Loading