Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions configs/local.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ pool:
# plugins location
plugin_directory: ./plugins

janitor:
finished_job_retention_days: 14

# auth plugin
auth:
plugin: ./plugins/auth_header.so
Expand Down
18 changes: 13 additions & 5 deletions internal/pkg/janitor/janitor.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,21 @@
package janitor

import (
"fmt"
"time"

"github.com/hladush/go-telemetry/pkg/telemetry"
"github.com/patterninc/heimdall/internal/pkg/database"
)

var (
startMethod = telemetry.NewMethod("Start", "Janitor")
)

type Janitor struct {
Keepalive int `yaml:"keepalive,omitempty" json:"keepalive,omitempty"`
StaleJob int `yaml:"stale_job,omitempty" json:"stale_job,omitempty"`
db *database.Database
Keepalive int `yaml:"keepalive,omitempty" json:"keepalive,omitempty"`
StaleJob int `yaml:"stale_job,omitempty" json:"stale_job,omitempty"`
FinishedJobRetentionDays int `yaml:"finished_job_retention_days,omitempty" json:"finished_job_retention_days,omitempty"`
db *database.Database
}

func (j *Janitor) Start(d *database.Database) error {
Expand All @@ -29,9 +34,12 @@ func (j *Janitor) Start(d *database.Database) error {
for {

if err := j.cleanupStaleJobs(); err != nil {
fmt.Println(`Janitor error:`, err)
startMethod.LogAndCountError(err, "cleanup_stale_jobs")
}

if err := j.cleanupFinishedJobs(); err != nil {
startMethod.LogAndCountError(err, "cleanup_finished_jobs")
}
time.Sleep(60 * time.Second)

}
Expand Down
74 changes: 74 additions & 0 deletions internal/pkg/janitor/job.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package janitor

import (
"database/sql"
_ "embed"
"fmt"
"time"

"github.com/patterninc/heimdall/internal/pkg/database"
)
Expand All @@ -15,6 +18,30 @@ var queryFailStaleJobs string
//go:embed queries/stale_jobs_delete.sql
var queryStaleJobsDelete string

//go:embed queries/old_jobs_cluster_tags_delete.sql
var queryOldJobsClusterTagsDelete string

//go:embed queries/old_jobs_command_tags_delete.sql
var queryOldJobsCommandTagsDelete string

//go:embed queries/old_jobs_tags_delete.sql
var queryOldJobsTagsDelete string

//go:embed queries/old_jobs_delete.sql
var queryOldJobsDelete string

//go:embed queries/old_job_biggest_id.sql
var queryOldJobsBiggestID string

var (
queriesForOldJobsCleanup = []string{
queryOldJobsClusterTagsDelete,
queryOldJobsCommandTagsDelete,
queryOldJobsTagsDelete,
queryOldJobsDelete,
}
)

func (j *Janitor) cleanupStaleJobs() error {

// let's find the jobs we'll be cleaning up...
Expand Down Expand Up @@ -74,3 +101,50 @@ func (j *Janitor) cleanupStaleJobs() error {
return nil

}

func (j *Janitor) cleanupFinishedJobs() error {
if j.FinishedJobRetentionDays == 0 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if j.FinishedJobRetentionDays <= 0

return nil
}
// open session
sess, err := j.db.NewSession(false)
if err != nil {
return err
}
defer sess.Close()

retentionTimestamp := time.Now().AddDate(0, 0, -j.FinishedJobRetentionDays).Unix()

// get biggest ID of old jobs
row, err := sess.QueryRow(queryOldJobsBiggestID, retentionTimestamp)
if err != nil {
return fmt.Errorf("failed to get biggest ID of old jobs: %w", err)
}

var biggestID sql.NullInt64
if err := row.Scan(&biggestID); err != nil {
if err == sql.ErrNoRows {
return nil
}
return fmt.Errorf("failed to get biggest ID of old jobs: %w", err)
}

if !biggestID.Valid || biggestID.Int64 == 0 {
return nil
}

// remove old jobs data
for _, q := range queriesForOldJobsCleanup {
for {
affectedRows, err := sess.Exec(q, biggestID.Int64)
if err != nil {
return err
}
if affectedRows == 0 {
break
}
}
}

return nil
}
5 changes: 5 additions & 0 deletions internal/pkg/janitor/queries/old_job_biggest_id.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
SELECT system_job_id
FROM jobs
WHERE updated_at < $1
ORDER BY updated_at desc
LIMIT 1
7 changes: 7 additions & 0 deletions internal/pkg/janitor/queries/old_jobs_cluster_tags_delete.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
DELETE FROM job_cluster_tags
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

once we have the cutoff system_job_id... all these queries will become much simpler: delete from job_cluster_tags where system_job_id <= $1 limit 1000 and we keep running this query until changed rownum != 0.

PS the same comment on all other queries.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately we are not in the MySQL world :( Postgre doesn't support that
While there is no LIMIT clause for DELETE, it is possible to get a similar effect using the same method described in the documentation of UPDATE:

WITH delete_batch AS (
SELECT l.ctid FROM user_logs AS l
WHERE l.status = 'archived'
ORDER BY l.creation_date
FOR UPDATE
LIMIT 10000
)
DELETE FROM user_logs AS dl
USING delete_batch AS del
WHERE dl.ctid = del.ctid;

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh... this is sad :) CTE way looks a bit more readable than the subquery though :)

WHERE system_job_id IN (
SELECT system_job_id
FROM job_cluster_tags
WHERE system_job_id <= $1
LIMIT 100
);
8 changes: 8 additions & 0 deletions internal/pkg/janitor/queries/old_jobs_command_tags_delete.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@

DELETE FROM job_command_tags
WHERE system_job_id IN (
SELECT system_job_id
FROM job_command_tags
WHERE system_job_id <= $1
LIMIT 100
);
8 changes: 8 additions & 0 deletions internal/pkg/janitor/queries/old_jobs_delete.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@

DELETE FROM jobs
WHERE system_job_id IN (
SELECT system_job_id
FROM jobs
WHERE system_job_id <= $1
LIMIT 100
);
7 changes: 7 additions & 0 deletions internal/pkg/janitor/queries/old_jobs_tags_delete.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
DELETE FROM job_tags
WHERE system_job_id IN (
SELECT system_job_id
FROM job_tags
WHERE system_job_id <= $1
LIMIT 100
);