Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 18 additions & 20 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -104,25 +104,25 @@ func main() {

The options that can be passed to `pg.NewPool` are:

* `WithURL(string)` - Set connection parameters from a PostgreSQL URL in the format
* `pg.WithURL(string)` - Set connection parameters from a PostgreSQL URL in the format
`postgres://user:password@host:port/database?sslmode=disable`. Query parameters are
passed as additional connection options.
* `WithCredentials(string,string)` - Set connection pool username and password.
* `pg.WithCredentials(string, string)` - Set connection pool username and password.
If the database name is not set, then the username will be used as the default database name.
* `WithDatabase(string)` - Set the database name for the connection. If the user name is not set,
* `pg.WithDatabase(string)` - Set the database name for the connection. If the user name is not set,
then the database name will be used as the user name.
* `WithAddr(string)` - Set the address (host) or (host:port) for the connection
* `WithHostPort(string, string)` - Set the hostname and port for the
connection. If the port is not set, then the default port 5432 will be used.
* `WithSSLMode( string)` - Set the SSL connection mode. Valid values are
"disable", "allow", "prefer", "require", "verify-ca", "verify-full". See
* `pg.WithAddr(string)` - Set the address (host) or (host:port) for the connection.
* `pg.WithHostPort(string, string)` - Set the hostname and port for the connection. If the port
is not set, then the default port 5432 will be used.
* `pg.WithSSLMode(string)` - Set the SSL connection mode. Valid values are "disable", "allow",
"prefer", "require", "verify-ca", "verify-full". See
<https://www.postgresql.org/docs/current/libpq-ssl.html> for more information.
* `pg.WithTrace(pg.TraceFn)` - Set the trace function for the connection pool.
The signature of the trace unction is
`func(ctx context.Context, sql string, args any, err error)`
and is called for every query executed by the connection pool.
* `pg.WithBind(string,any)` - Set the bind variable to a value the
the lifetime of the connection.
* `pg.WithApplicationName(string)` - Set the application name for the connection.
This appears in `pg_stat_activity` and helps identify connections.
* `pg.WithSchemaSearchPath(string...)` - Set the schema search path for the connection.
* `pg.WithTrace(pg.TraceFn)` - Set the trace function for the connection pool.
The signature is `func(ctx context.Context, sql string, args any, err error)`.
* `pg.WithBind(string, any)` - Set a bind variable for the lifetime of the connection.

## Executing Statements

Expand Down Expand Up @@ -236,13 +236,13 @@ func (obj *MyList) Scan(row pg.Row) error {
if err := row.Scan(&name); err != nil {
return err
}
obj = append(obj, row.String())
obj.Names = append(obj.Names, name)
return nil
}

// ListReader - optional interface to scan count of all rows
func (obj MyList) Scan(row pg.Row) error {
return row.Scan(&obj.Count)
func (obj *MyList) ScanCount(row pg.Row) error {
return row.Scan(&obj.Count)
}

// Selector - select rows from database. Use bind variables
Expand All @@ -253,7 +253,7 @@ func (obj MyListRequest) Select(bind *pg.Bind, op pg.Op) (string, error) {
case pg.List:
return `SELECT name FROM mytable`, nil
default:
return "", fmt.Errorf("Unsupported operation: ",op)
return "", fmt.Errorf("unsupported operation: %v", op)
}
}
```
Expand Down Expand Up @@ -491,6 +491,4 @@ See [pkg/test/README.md](pkg/test/README.md) for documentation.

## PostgreSQL Manager

The `pkg/manager` package provides a comprehensive API for managing PostgreSQL server resources including roles, databases, schemas, tables, connections, replication slots, and more. It includes a REST API with Prometheus metrics.

See [pkg/manager/README.md](pkg/manager/README.md) for documentation.
105 changes: 105 additions & 0 deletions cmd/pgqueue/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package main

import (
"context"
"fmt"
"net"
"os"
"os/signal"
"strconv"

// Packages
kong "github.com/alecthomas/kong"
client "github.com/mutablelogic/go-client"
httpclient "github.com/mutablelogic/go-pg/pkg/queue/httpclient"
)

///////////////////////////////////////////////////////////////////////////////
// TYPES

type Globals struct {
// Debug option
Debug bool `name:"debug" help:"Enable debug logging"`
Version kong.VersionFlag `name:"version" help:"Print version and exit"`

// HTTP server options
HTTP struct {
Prefix string `name:"prefix" help:"HTTP path prefix" default:"/api/v1"`
Addr string `name:"addr" env:"PGQUEUE_ADDR" help:"HTTP Listen address" default:":8080"`
} `embed:"" prefix:"http."`

// Private fields
ctx context.Context
cancel context.CancelFunc
}

type CLI struct {
Globals
NamespaceCommands
QueueCommands
TaskCommands
TickerCommands
ServerCommands
}

///////////////////////////////////////////////////////////////////////////////
// LIFECYCLE

func main() {
cli := new(CLI)
ctx := kong.Parse(cli,
kong.Name("pgqueue"),
kong.Description("pgqueue command line interface"),
kong.Vars{
"version": VersionJSON(),
},
kong.UsageOnError(),
kong.ConfigureHelp(kong.HelpOptions{
Compact: true,
}),
)

// Create the context and cancel function
cli.Globals.ctx, cli.Globals.cancel = signal.NotifyContext(context.Background(), os.Interrupt)
defer cli.Globals.cancel()

// Call the Run() method of the selected parsed command.
if err := ctx.Run(&cli.Globals); err != nil {
fmt.Fprintln(os.Stderr, "Error:", err)
os.Exit(1)
}
}

///////////////////////////////////////////////////////////////////////////////
// PRIVATE METHODS

func (g *Globals) Client() (*httpclient.Client, error) {
scheme := "http"
host, port, err := net.SplitHostPort(g.HTTP.Addr)
if err != nil {
return nil, err
}

// Default host to localhost if empty (e.g., ":8080")
if host == "" {
host = "localhost"
}

// Parse port
portn, err := strconv.ParseUint(port, 10, 16)
if err != nil {
return nil, err
}
if portn == 443 {
scheme = "https"
}

// Client options
opts := []client.ClientOpt{}
if g.Debug {
opts = append(opts, client.OptTrace(os.Stderr, true))
}

// Create a client with the calculated endpoint
return httpclient.New(fmt.Sprintf("%s://%s:%v%s", scheme, host, portn, g.HTTP.Prefix), opts...)
}
34 changes: 34 additions & 0 deletions cmd/pgqueue/namespace.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package main

import (
"fmt"
)

///////////////////////////////////////////////////////////////////////////////
// TYPES

type NamespaceCommands struct {
ListNamespace ListNamespaceCommand `cmd:"" name:"namespaces" help:"List namespaces." group:"QUEUE"`
}

type ListNamespaceCommand struct{}

///////////////////////////////////////////////////////////////////////////////
// COMMANDS

func (cmd *ListNamespaceCommand) Run(ctx *Globals) error {
client, err := ctx.Client()
if err != nil {
return err
}

// List namespaces
namespaces, err := client.ListNamespaces(ctx.ctx)
if err != nil {
return err
}

// Print
fmt.Println(namespaces)
return nil
}
145 changes: 145 additions & 0 deletions cmd/pgqueue/queue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
package main

import (
"fmt"
"time"

// Packages
httpclient "github.com/mutablelogic/go-pg/pkg/queue/httpclient"
schema "github.com/mutablelogic/go-pg/pkg/queue/schema"
)

///////////////////////////////////////////////////////////////////////////////
// TYPES

type QueueCommands struct {
ListQueue ListQueueCommand `cmd:"" name:"queues" help:"List queues." group:"QUEUE"`
GetQueue GetQueueCommand `cmd:"" name:"queue" help:"Get queue." group:"QUEUE"`
CreateQueue CreateQueueCommand `cmd:"" name:"create-queue" help:"Create queue." group:"QUEUE"`
DeleteQueue DeleteQueueCommand `cmd:"" name:"delete-queue" help:"Delete queue." group:"QUEUE"`
UpdateQueue UpdateQueueCommand `cmd:"" name:"update-queue" help:"Update queue." group:"QUEUE"`
}

type ListQueueCommand struct {
Offset uint64 `name:"offset" help:"Offset for pagination"`
Limit *uint64 `name:"limit" help:"Limit for pagination"`
}

type GetQueueCommand struct {
Name string `arg:"" name:"name" help:"Queue name"`
}

type CreateQueueCommand struct {
Name string `arg:"" name:"name" help:"Queue name"`
TTL *time.Duration `name:"ttl" help:"Time-to-live for queue messages"`
Retries *uint64 `name:"retries" help:"Number of retries before failing"`
RetryDelay *time.Duration `name:"retry-delay" help:"Backoff delay between retries"`
}

type DeleteQueueCommand struct {
Name string `arg:"" name:"name" help:"Queue name"`
}

type UpdateQueueCommand struct {
Name string `arg:"" name:"name" help:"Queue name"`
TTL *time.Duration `name:"ttl" help:"Time-to-live for queue messages"`
Retries *uint64 `name:"retries" help:"Number of retries before failing"`
RetryDelay *time.Duration `name:"retry-delay" help:"Backoff delay between retries"`
}

///////////////////////////////////////////////////////////////////////////////
// COMMANDS

func (cmd *ListQueueCommand) Run(ctx *Globals) error {
client, err := ctx.Client()
if err != nil {
return err
}

// List queues
queues, err := client.ListQueues(ctx.ctx, httpclient.WithOffsetLimit(cmd.Offset, cmd.Limit))
if err != nil {
return err
}

// Print
fmt.Println(queues)
return nil
}

func (cmd *GetQueueCommand) Run(ctx *Globals) error {
client, err := ctx.Client()
if err != nil {
return err
}

// Get one queue
queue, err := client.GetQueue(ctx.ctx, cmd.Name)
if err != nil {
return err
}

// Print
fmt.Println(queue)
return nil
}

func (cmd *CreateQueueCommand) Run(ctx *Globals) error {
client, err := ctx.Client()
if err != nil {
return err
}

// Create queue
queue, err := client.CreateQueue(ctx.ctx, schema.QueueMeta{
Queue: cmd.Name,
TTL: cmd.TTL,
Retries: cmd.Retries,
RetryDelay: cmd.RetryDelay,
})
if err != nil {
return err
}

// Print
fmt.Println(queue)
return nil
}

func (cmd *DeleteQueueCommand) Run(ctx *Globals) error {
client, err := ctx.Client()
if err != nil {
return err
}

// Delete queue
queue, err := client.DeleteQueue(ctx.ctx, cmd.Name)
if err != nil {
return err
}

// Print
fmt.Println(queue)
return nil
}

func (cmd *UpdateQueueCommand) Run(ctx *Globals) error {
client, err := ctx.Client()
if err != nil {
return err
}

// Update queue
queue, err := client.UpdateQueue(ctx.ctx, cmd.Name, schema.QueueMeta{
TTL: cmd.TTL,
Retries: cmd.Retries,
RetryDelay: cmd.RetryDelay,
})
if err != nil {
return err
}

// Print
fmt.Println(queue)
return nil
}
Loading