Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 24 additions & 1 deletion internal/pkg/database/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ package database
import (
"context"
"database/sql"
"time"

"github.com/babourine/x/pkg/set"
"github.com/hladush/go-telemetry/pkg/telemetry"
_ "github.com/lib/pq"
)

Expand All @@ -13,7 +15,8 @@ const (
)

var (
ctx = context.Background()
ctx = context.Background()
newSessionMethod = telemetry.NewMethod("db_connection", "new_session")
)

type Database struct {
Expand All @@ -28,22 +31,42 @@ type Session struct {

func (d *Database) NewSession(withTransaction bool) (*Session, error) {

// Track session creation metrics
transactionLabel := "false"
if withTransaction {
transactionLabel = "true"
}

defer newSessionMethod.RecordLatency(time.Now(), "with_transaction", transactionLabel)
newSessionMethod.CountRequest("with_transaction", transactionLabel)

var err error

s := &Session{}

// Track database connection creation
connectionStart := time.Now()
newSessionMethod.CountRequest("with_transaction", transactionLabel)

// open connection
if s.db, err = sql.Open(dbDriverName, d.ConnectionString); err != nil {
newSessionMethod.LogAndCountError(err, "with_transaction", transactionLabel)
return nil, err
}

newSessionMethod.RecordLatency(connectionStart, "with_transaction", transactionLabel)
newSessionMethod.CountSuccess("with_transaction", transactionLabel)

// start transaction
if withTransaction {
if s.trx, err = s.db.BeginTx(ctx, nil); err != nil {
s.db.Close() // Close the connection before returning error
newSessionMethod.LogAndCountError(err, "with_transaction", transactionLabel)
return nil, err
}
}

newSessionMethod.CountSuccess("with_transaction", transactionLabel)
return s, nil

}
Expand Down
18 changes: 18 additions & 0 deletions internal/pkg/database/slice.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,32 @@
package database

import (
"time"

"github.com/hladush/go-telemetry/pkg/telemetry"
)

var (
getSliceMethod = telemetry.NewMethod("db_connection", "get_slice")
)

func GetSlice(db *Database, query string) (any, error) {

// Track DB connection for slice query operation
defer getSliceMethod.RecordLatency(time.Now())
getSliceMethod.CountRequest()

// open connection
sess, err := db.NewSession(false)
if err != nil {
getSliceMethod.LogAndCountError(err, "new_session")
return nil, err
}
defer sess.Close()

rows, err := sess.Query(query)
if err != nil {
getSliceMethod.LogAndCountError(err, "query")
return nil, err
}
defer rows.Close()
Expand All @@ -21,13 +37,15 @@ func GetSlice(db *Database, query string) (any, error) {

var item any
if err := rows.Scan(&item); err != nil {
getSliceMethod.LogAndCountError(err, "scan")
return nil, err
}

result = append(result, item)

}

getSliceMethod.CountSuccess()
return result, nil

}
48 changes: 45 additions & 3 deletions internal/pkg/heimdall/cluster_dal.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ import (
_ "embed"
"encoding/json"
"fmt"
"time"

"github.com/hladush/go-telemetry/pkg/telemetry"
_ "github.com/lib/pq"

"github.com/patterninc/heimdall/internal/pkg/database"
Expand Down Expand Up @@ -67,7 +69,12 @@ var (
)

var (
ErrUnknownClusterID = fmt.Errorf(`unknown cluster_id`)
ErrUnknownClusterID = fmt.Errorf(`unknown cluster_id`)
upsertClusterMethod = telemetry.NewMethod("db_connection", "upsert_cluster")
getClusterMethod = telemetry.NewMethod("db_connection", "get_cluster")
getClusterStatusMethod = telemetry.NewMethod("db_connection", "get_cluster_status")
updateClusterStatusMethod = telemetry.NewMethod("db_connection", "update_cluster_status")
getClustersMethod = telemetry.NewMethod("db_connection", "get_clusters")
)

func (h *Heimdall) submitCluster(c *cluster.Cluster) (any, error) {
Expand All @@ -82,9 +89,14 @@ func (h *Heimdall) submitCluster(c *cluster.Cluster) (any, error) {

func (h *Heimdall) clusterUpsert(c *cluster.Cluster) error {

// Track DB connection for cluster upsert operation
defer upsertClusterMethod.RecordLatency(time.Now())
upsertClusterMethod.CountRequest()

// open connection
sess, err := h.Database.NewSession(true)
if err != nil {
upsertClusterMethod.LogAndCountError(err, "new_session")
return err
}
defer sess.Close()
Expand Down Expand Up @@ -112,15 +124,26 @@ func (h *Heimdall) clusterUpsert(c *cluster.Cluster) error {
}
}

return sess.Commit()
if err := sess.Commit(); err != nil {
upsertClusterMethod.LogAndCountError(err, "commit")
return err
}

upsertClusterMethod.CountSuccess()
return nil

}

func (h *Heimdall) getCluster(c *cluster.Cluster) (any, error) {

// Track DB connection for get cluster operation
defer getClusterMethod.RecordLatency(time.Now())
getClusterMethod.CountRequest()

// open connection
sess, err := h.Database.NewSession(false)
if err != nil {
getClusterMethod.LogAndCountError(err, "new_session")
return nil, err
}
defer sess.Close()
Expand All @@ -142,25 +165,32 @@ func (h *Heimdall) getCluster(c *cluster.Cluster) (any, error) {
if err := row.Scan(&r.SystemID, &r.Status, &r.Name, &r.Version, &r.Description, &clusterContext,
&r.User, &r.CreatedAt, &r.UpdatedAt); err != nil {
if err == sql.ErrNoRows {
return nil, ErrUnknownCommandID
return nil, ErrUnknownClusterID
} else {
return nil, err
}
}

if err := clusterParseContextAndTags(r, clusterContext, sess); err != nil {
getClusterMethod.LogAndCountError(err, "cluster_parse_context_and_tags")
return nil, err
}

getClusterMethod.CountSuccess()
return r, nil

}

func (h *Heimdall) getClusterStatus(c *cluster.Cluster) (any, error) {

// Track DB connection for cluster status operation
defer getClusterStatusMethod.RecordLatency(time.Now())
getClusterStatusMethod.CountRequest()

// open connection
sess, err := h.Database.NewSession(false)
if err != nil {
getClusterStatusMethod.LogAndCountError(err, "new_session")
return nil, err
}
defer sess.Close()
Expand All @@ -181,15 +211,20 @@ func (h *Heimdall) getClusterStatus(c *cluster.Cluster) (any, error) {
}
}

getClusterStatusMethod.CountSuccess()
return r, nil

}

func (h *Heimdall) updateClusterStatus(c *cluster.Cluster) (any, error) {

defer updateClusterStatusMethod.RecordLatency(time.Now())
updateClusterStatusMethod.CountRequest()

// open connection
sess, err := h.Database.NewSession(false)
if err != nil {
updateClusterStatusMethod.LogAndCountError(err, "new_session")
return nil, err
}
defer sess.Close()
Expand All @@ -203,15 +238,21 @@ func (h *Heimdall) updateClusterStatus(c *cluster.Cluster) (any, error) {
return nil, ErrUnknownClusterID
}

updateClusterStatusMethod.CountSuccess()
return h.getClusterStatus(c)

}

func (h *Heimdall) getClusters(f *database.Filter) (any, error) {

// Track DB connection for clusters list operation
defer getClustersMethod.RecordLatency(time.Now())
getClustersMethod.CountRequest()

// open connection
sess, err := h.Database.NewSession(false)
if err != nil {
getClustersMethod.LogAndCountError(err, "new_session")
return nil, err
}
defer sess.Close()
Expand Down Expand Up @@ -247,6 +288,7 @@ func (h *Heimdall) getClusters(f *database.Filter) (any, error) {

}

getClustersMethod.CountSuccess()
return &resultset{
Data: result,
}, nil
Expand Down
Loading
Loading