go-worker provides a simple way to manage and execute prioritized tasks concurrently, backed by a TaskManager with a worker pool and a priority queue.
- Stop() removed. Use StopGraceful(ctx) or StopNow().
- Local result streaming uses SubscribeResults(buffer); GetResults()/StreamResults() are removed.
- RegisterTasks now returns an error.
- Task.Execute replaces Fn in examples.
- NewGRPCServer requires a handler map.
- Rate limiting is deterministic: burst is min(maxWorkers, maxTasks) and ExecuteTask uses the shared limiter.
- Task prioritization: tasks are scheduled by priority.
- Concurrent execution: tasks run in a worker pool with strict rate limiting.
- Middleware: wrap the TaskManager for logging/metrics, etc.
- Results: fan-out subscriptions via SubscribeResults.
- Cancellation: cancel tasks before or during execution.
- Retries: exponential backoff with capped delays.
flowchart LR
Client[Client code] -->|register tasks| TaskManager
TaskManager --> Queue[Priority Queue]
Queue -->|dispatch| Worker1[Worker]
Queue -->|dispatch| WorkerN[Worker]
Worker1 --> Results[Result Broadcaster]
WorkerN --> Results
go-worker exposes its functionality over gRPC through the WorkerService.
The service allows clients to register tasks, stream results, cancel running
tasks and query their status.
The server registers handlers keyed by name. Each handler consists of a Make function that constructs the expected payload type, and a Fn function that executes the task logic using the unpacked payload.
Clients send a Task message containing a name and a serialized payload using google.protobuf.Any. The server automatically unpacks the Any payload into the correct type based on the registered handler and passes it to the corresponding function.
// handlers maps a task name to the spec the gRPC server uses to decode and run it.
handlers := map[string]worker.HandlerSpec{
	"create_user": {
		// Make returns an empty message for the server to unpack the Any payload into.
		Make: func() protoreflect.ProtoMessage { return &workerpb.CreateUserPayload{} },
		// Fn runs the task logic on the unpacked payload.
		Fn: func(ctx context.Context, payload protoreflect.ProtoMessage) (any, error) {
			// Checked assertion: a mismatched payload becomes an error instead of a panic.
			p, ok := payload.(*workerpb.CreateUserPayload)
			if !ok {
				return nil, fmt.Errorf("create_user: unexpected payload type %T", payload)
			}

			// Create the user from p here. The blank assignment keeps the example
			// compiling; Go rejects a local that is declared and not used.
			_ = p

			return &workerpb.CreateUserResponse{UserId: "1234"}, nil
		},
	},
}
srv := worker.NewGRPCServer(tm, handlers)

For production, configure TLS credentials and interceptors (logging/auth) on the gRPC server; see __examples/grpc for a complete setup.
You can enforce authentication/authorization at the gRPC boundary with WithGRPCAuth.
Return a gRPC status error to control the response code (e.g., Unauthenticated or PermissionDenied).
// auth checks the bearer token carried in the incoming "authorization" metadata
// and returns an Unauthenticated status error when it is absent or wrong.
auth := func(ctx context.Context, method string, _ any) error {
	md, _ := metadata.FromIncomingContext(ctx)

	headers := md.Get("authorization")
	if len(headers) == 0 {
		return status.Error(codes.Unauthenticated, "missing token")
	}

	// Only the first header value is considered; a leading "Bearer " is stripped if present.
	raw := strings.TrimPrefix(headers[0], "Bearer ")
	if strings.TrimSpace(raw) == "expected-token" {
		return nil
	}

	return status.Error(codes.Unauthenticated, "missing or invalid token")
}
srv := worker.NewGRPCServer(tm, handlers, worker.WithGRPCAuth(auth))

Note on deadlines: when the client uses a stream context with a deadline, exceeding the deadline terminates the stream but does not cancel the tasks running on the server. To properly handle cancellation, use separate contexts for task execution or cancel tasks explicitly.
tm := worker.NewTaskManagerWithDefaults(context.Background())

// handlers maps a task name to the spec the gRPC server uses to decode and run it.
handlers := map[string]worker.HandlerSpec{
	"create_user": {
		// Make returns an empty message for the server to unpack the Any payload into.
		Make: func() protoreflect.ProtoMessage { return &workerpb.CreateUserPayload{} },
		// Fn runs the task logic on the unpacked payload.
		Fn: func(ctx context.Context, payload protoreflect.ProtoMessage) (any, error) {
			// Checked assertion: a mismatched payload becomes an error instead of a panic.
			p, ok := payload.(*workerpb.CreateUserPayload)
			if !ok {
				return nil, fmt.Errorf("create_user: unexpected payload type %T", payload)
			}

			// Create the user from p here. The blank assignment keeps the example
			// compiling; Go rejects a local that is declared and not used.
			_ = p

			return &workerpb.CreateUserResponse{UserId: "1234"}, nil
		},
	},
}

srv := worker.NewGRPCServer(tm, handlers)

// Expose the worker service on a gRPC server.
gs := grpc.NewServer()
workerpb.RegisterWorkerServiceServer(gs, srv)
// listen and serve ...
client := workerpb.NewWorkerServiceClient(conn)

// register a task with payload
payload, err := anypb.New(&workerpb.CreateUserPayload{
	Username: "newuser",
	Email:    "newuser@example.com",
})
if err != nil {
	log.Fatal(err)
}

_, err = client.RegisterTasks(ctx, &workerpb.RegisterTasksRequest{
	Tasks: []*workerpb.Task{
		{
			Name:           "create_user",
			Payload:        payload,
			CorrelationId:  uuid.NewString(),
			IdempotencyKey: "create_user:newuser@example.com",
			Metadata:       map[string]string{"source": "api_example", "role": "admin"},
		},
	},
})
if err != nil {
	log.Fatal(err)
}

// cancel by id
if _, err := client.CancelTask(ctx, &workerpb.CancelTaskRequest{Id: "<task-id>"}); err != nil {
	log.Printf("cancel task: %v", err)
}

// get task information
res, err := client.GetTask(ctx, &workerpb.GetTaskRequest{Id: "<task-id>"})
if err != nil {
	// Stop here: res must not be dereferenced when the call failed.
	log.Fatal(err)
}

fmt.Println(res.Status)

tm := worker.NewTaskManager(context.Background(), 2, 10, 5, 30*time.Second, time.Second, 3)
task := &worker.Task{
	ID:       uuid.New(),
	Priority: 1,
	Ctx:      context.Background(),
	Execute:  func(ctx context.Context, _ ...any) (any, error) { return "hello", nil },
}

// Subscribe before registering, so the subscription already exists when the
// task's result is published.
results, cancel := tm.SubscribeResults(1)

if err := tm.RegisterTask(context.Background(), task); err != nil {
	cancel()
	log.Fatal(err)
}

// Block until the task's result arrives, then release the subscription.
res := <-results
cancel()
fmt.Println(res.Result)

By default, full subscriber buffers drop new results. You can change the policy:
tm.SetResultsDropPolicy(worker.DropOldest)

Create a new TaskManager by calling the NewTaskManager() function with the following parameters:
- ctx is the base context for the task manager (used for shutdown and derived task contexts)
- maxWorkers is the number of workers to start. If <= 0, it will default to the number of available CPUs
- maxTasks is the maximum number of queued tasks, defaults to 10
- tasksPerSecond is the rate limit of tasks that can be executed per second. If <= 0, rate limiting is disabled (the limiter uses a burst size of min(maxWorkers, maxTasks) for deterministic throttling)
- timeout is the default timeout for tasks, defaults to 5 minutes
- retryDelay is the default delay between retries, defaults to 1 second
- maxRetries is the default maximum number of retries, defaults to 3 (0 disables retries)
tm := worker.NewTaskManager(context.Background(), 4, 10, 5, 30*time.Second, 1*time.Second, 3)

Optional retention can be configured to prevent unbounded task registry growth:
tm.SetRetentionPolicy(worker.RetentionPolicy{
TTL: 24 * time.Hour,
MaxEntries: 100000,
})

Task lifecycle hooks can be configured for structured logging or tracing:
tm.SetHooks(worker.TaskHooks{
OnQueued: func(task *worker.Task) {
// log enqueue
},
OnStart: func(task *worker.Task) {
// log start
},
OnFinish: func(task *worker.Task, status worker.TaskStatus, _ any, err error) {
// log completion
_ = err
_ = status
},
})

Tracing hooks can be configured with a tracer implementation:
tm.SetTracer(myTracer)

Register new tasks by calling the RegisterTasks() method of the TaskManager struct and passing in a variadic number of tasks.
id := uuid.New()

// task simulates one second of work and is configured with its own retry settings.
task := &worker.Task{
	ID:          id,
	Name:        "Some task",
	Description: "Here goes the description of the task",
	Priority:    10,
	Ctx:         context.Background(),
	Execute: func(ctx context.Context, _ ...any) (any, error) {
		// Wait one second, but return early with the context error if ctx is
		// done first, instead of sleeping unconditionally.
		select {
		case <-ctx.Done():
			return nil, ctx.Err()
		case <-time.After(time.Second):
		}

		return fmt.Sprintf("task %s executed", id), nil
	},
	Retries:    3,
	RetryDelay: 2 * time.Second,
}

// task2 returns immediately with a constant result.
task2 := &worker.Task{
	ID:       uuid.New(),
	Priority: 10,
	Ctx:      context.Background(),
	Execute:  func(ctx context.Context, _ ...any) (any, error) { return "Hello, World!", nil },
}
if err := tm.RegisterTasks(context.Background(), task, task2); err != nil {
log.Fatal(err)
}

Use StopGraceful to stop accepting new tasks and wait for completion, or StopNow to cancel tasks immediately.
// ctx expires after five seconds; it is the context handed to StopGraceful below.
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
// The value returned by StopGraceful is discarded in this example.
// NOTE(review): presumably an error — confirm and handle it in real code.
_ = tm.StopGraceful(ctx)
// or
// tm.StopNow()

Subscribe to results with a dedicated channel per subscriber.
results, cancel := tm.SubscribeResults(10)
ctx, cancelWait := context.WithTimeout(context.Background(), 5*time.Second)
defer cancelWait()
_ = tm.Wait(ctx)
cancel()
for res := range results {
fmt.Println(res)
}

You can cancel a Task by calling the CancelTask() method of the TaskManager struct and passing in the task ID as a parameter.
_ = tm.CancelTask(task.ID)

You can cancel all tasks by calling the CancelAll() method of the TaskManager struct.
tm.CancelAll()

You can apply middleware to the TaskManager by calling the RegisterMiddleware() function and passing in the TaskManager and the middleware functions.
srv := worker.RegisterMiddleware[worker.Service](tm,
func(next worker.Service) worker.Service {
return middleware.NewLoggerMiddleware(next, logger)
},
)

package main
import (
"context"
"fmt"
"time"
"github.com/google/uuid"
worker "github.com/hyp3rd/go-worker"
"github.com/hyp3rd/go-worker/middleware"
)
func main() {
tm := worker.NewTaskManager(context.Background(), 4, 10, 5, 3*time.Second, 30*time.Second, 3)
var srv worker.Service = worker.RegisterMiddleware[worker.Service](tm,
func(next worker.Service) worker.Service {
return middleware.NewLoggerMiddleware(next, middleware.DefaultLogger())
},
)
task := &worker.Task{
ID: uuid.New(),
Priority: 1,
Ctx: context.Background(),
Execute: func(ctx context.Context, _ ...any) (any, error) {
return 2 + 5, nil
},
}
_ = srv.RegisterTasks(context.Background(), task)
results, cancel := srv.SubscribeResults(10)
defer cancel()
ctx, cancelWait := context.WithTimeout(context.Background(), 5*time.Second)
defer cancelWait()
_ = srv.Wait(ctx)
for res := range results {
fmt.Println(res)
}
}This project follows Semantic Versioning.
We welcome contributions! Fork the repository, create a feature branch, run the linters and tests, then open a pull request.
To propose new ideas, open an issue using the Feature request template.
Issues labeled good first issue or help wanted are ideal starting points for new contributors.
See CHANGELOG for the history of released versions.
This project is licensed under the MIT License - see the LICENSE file for details.