Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions src/ferryctl/cmd/delta.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package cmd
import (
"fmt"
"os"
"strconv"

"github.com/spf13/cobra"

Expand All @@ -37,15 +38,21 @@ func init() {
}

func delta(cmd *cobra.Command, args []string) {
if len(args) != 1 {
fmt.Fprintf(os.Stderr, "Usage: delta [repoName]\n")
if len(args) != 2 {
fmt.Fprintf(os.Stderr, "Usage: delta [repoName] [maxGenerate]\n")
return
}

maxGen, err := strconv.ParseInt(args[1], 10, 32)
if err != nil {
fmt.Fprintf(os.Stderr, "Invalid integer: %v\n", err)
return
}

client := libferry.NewClient(socketPath)
defer client.Close()

if err := client.DeltaRepo(args[0]); err != nil {
if err := client.DeltaRepo(args[0], int(maxGen)); err != nil {
fmt.Fprintf(os.Stderr, "Error while creating deltas: %v\n", err)
return
}
Expand Down
14 changes: 12 additions & 2 deletions src/ferryd/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,10 +161,20 @@ func (s *Server) DeleteRepo(w http.ResponseWriter, r *http.Request, p httprouter
// DeltaRepo will handle remote requests for repository deltaing
func (s *Server) DeltaRepo(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
id := p.ByName("id")

req := libferry.DeltaPackagesRequest{}

if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}

log.WithFields(log.Fields{
"id": id,
"id": id,
"maxGenerate": req.MaxGenerate,
}).Info("Repository delta requested")
s.jproc.PushJob(jobs.NewDeltaRepoJob(id))

s.jproc.PushJob(jobs.NewDeltaRepoJob(id, req.MaxGenerate))
}

// IndexRepo will handle remote requests for repository indexing
Expand Down
34 changes: 26 additions & 8 deletions src/ferryd/jobs/delta.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"
"os"
"sort"
"strconv"

log "github.com/sirupsen/logrus"

Expand All @@ -35,43 +36,51 @@ type DeltaJobHandler struct {
packageName string
indexRepo bool
nDeltas int // Track how many deltas we actually produce
maxGenerate int // Limit how many deltas are generated per package
}

// NewDeltaJob will return a job suitable for adding to the job processor
func NewDeltaJob(repoID, packageID string) *JobEntry {
func NewDeltaJob(repoID, packageID string, maxGenerate int) *JobEntry {
return &JobEntry{
sequential: false,
Type: Delta,
Params: []string{repoID, packageID},
Params: []string{repoID, packageID, fmt.Sprintf("%d", maxGenerate)},
}
}

// NewDeltaIndexJob will return a new job for creating delta packages as well
// as scheduling an index operation when complete.
func NewDeltaIndexJob(repoID, packageID string) *JobEntry {
func NewDeltaIndexJob(repoID, packageID string, maxGenerate int) *JobEntry {
return &JobEntry{
sequential: false,
Type: DeltaIndex,
Params: []string{repoID, packageID},
Params: []string{repoID, packageID, fmt.Sprintf("%d", maxGenerate)},
}
}

// NewDeltaJobHandler will create a job handler for the input job and ensure it validates
func NewDeltaJobHandler(j *JobEntry, indexRepo bool) (*DeltaJobHandler, error) {
if len(j.Params) != 2 {
if len(j.Params) != 3 {
return nil, fmt.Errorf("job has invalid parameters")
}

maxGen, err := strconv.ParseInt(j.Params[2], 10, 32)
if err != nil {
return nil, err
}

return &DeltaJobHandler{
repoID: j.Params[0],
packageName: j.Params[1],
indexRepo: indexRepo,
nDeltas: 0,
maxGenerate: int(maxGen),
}, nil
}

// executeInternal is the common code shared in the delta jobs, and is
// split out to save duplication.
func (j *DeltaJobHandler) executeInternal(manager *core.Manager) error {
func (j *DeltaJobHandler) executeInternal(manager *core.Manager, maxGenerate int) error {
pkgs, err := manager.GetPackages(j.repoID, j.packageName)
if err != nil {
return err
Expand All @@ -89,8 +98,17 @@ func (j *DeltaJobHandler) executeInternal(manager *core.Manager) error {
sort.Sort(libeopkg.PackageSet(pkgs))
tip := pkgs[len(pkgs)-1]

// Determine the start index for deltas
startIndex := 0
if maxGenerate > 0 {
numHistorical := len(pkgs) - 1
if numHistorical > maxGenerate {
startIndex = numHistorical - maxGenerate
}
}

// Process all potential deltas
for i := 0; i < len(pkgs)-1; i++ {
for i := startIndex; i < len(pkgs)-1; i++ {
old := pkgs[i]
fields := log.Fields{
"old": old.GetID(),
Expand Down Expand Up @@ -188,7 +206,7 @@ func (j *DeltaJobHandler) includeDelta(manager *core.Manager, mapping *core.Delt

// Execute will delta the target package within the target repository.
func (j *DeltaJobHandler) Execute(_ *Processor, manager *core.Manager) error {
err := j.executeInternal(manager)
err := j.executeInternal(manager, j.maxGenerate)
if err != nil {
return err
}
Expand Down
21 changes: 15 additions & 6 deletions src/ferryd/jobs/delta_repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package jobs

import (
"fmt"
"strconv"

log "github.com/sirupsen/logrus"

Expand All @@ -27,25 +28,33 @@ import (
// DeltaRepoJobHandler is responsible for delta'ing repositories and should only
// ever be used in sequential queues.
type DeltaRepoJobHandler struct {
repoID string
repoID string
maxGenerate int
}

// NewDeltaRepoJob will return a job suitable for adding to the job processor
func NewDeltaRepoJob(id string) *JobEntry {
func NewDeltaRepoJob(id string, maxGenerate int) *JobEntry {
return &JobEntry{
sequential: true,
Type: DeltaRepo,
Params: []string{id},
Params: []string{id, fmt.Sprintf("%d", maxGenerate)},
}
}

// NewDeltaRepoJobHandler will create a job handler for the input job and ensure it validates
func NewDeltaRepoJobHandler(j *JobEntry) (*DeltaRepoJobHandler, error) {
if len(j.Params) != 1 {
if len(j.Params) != 2 {
return nil, fmt.Errorf("job has invalid parameters")
}

generate, err := strconv.ParseInt(j.Params[1], 10, 32)
if err != nil {
return nil, err
}

return &DeltaRepoJobHandler{
repoID: j.Params[0],
repoID: j.Params[0],
maxGenerate: int(generate),
}, nil
}

Expand All @@ -72,7 +81,7 @@ func (j *DeltaRepoJobHandler) Execute(jproc *Processor, manager *core.Manager) e

// Fire off parallel delta jobs for every package in this repository
for _, name := range packageNames {
jproc.PushJob(NewDeltaJob(j.repoID, name))
jproc.PushJob(NewDeltaJob(j.repoID, name, j.maxGenerate))
}

return nil
Expand Down
2 changes: 1 addition & 1 deletion src/ferryd/jobs/pull_repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func (j *PullRepoJobHandler) Execute(jproc *Processor, manager *core.Manager) er
// Create delta job in this repository on the changed names
// Don't cause indexing because it causes noise
for _, pkg := range changedNames {
jproc.PushJob(NewDeltaIndexJob(j.targetID, pkg))
jproc.PushJob(NewDeltaIndexJob(j.targetID, pkg, 0))
}

return nil
Expand Down
2 changes: 1 addition & 1 deletion src/ferryd/jobs/transit_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func (j *TransitJobHandler) Execute(jproc *Processor, manager *core.Manager) err
if ent != nil {
return err
}
jproc.PushJob(NewDeltaIndexJob(repo, p.Name))
jproc.PushJob(NewDeltaIndexJob(repo, p.Name, 0))
}

return nil
Expand Down
2 changes: 1 addition & 1 deletion src/ferryd/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func NewServer() (*Server, error) {
// Repo management
router.GET("/api/v1/create/repo/:id", s.CreateRepo)
router.GET("/api/v1/remove/repo/:id", s.DeleteRepo)
router.GET("/api/v1/delta/repo/:id", s.DeltaRepo)
router.POST("/api/v1/delta/repo/:id", s.DeltaRepo)
router.GET("/api/v1/index/repo/:id", s.IndexRepo)

// Client sends us data
Expand Down
8 changes: 5 additions & 3 deletions src/libferry/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,9 +154,11 @@ func (c *Client) DeleteRepo(id string) error {
}

// DeltaRepo will attempt to reproduce deltas in the given repo
func (c *Client) DeltaRepo(id string) error {
uri := c.formURI("/api/v1/delta/repo/" + id)
return c.getBasicResponse(uri, &Response{})
func (c *Client) DeltaRepo(id string, maxGenerate int) error {
tq := DeltaPackagesRequest{
MaxGenerate: maxGenerate,
}
return c.postBasicResponse(c.formURI("api/v1/delta/repo/"+id), &tq, &Response{})
}

// IndexRepo will attempt to index a repository in the daemon
Expand Down
6 changes: 6 additions & 0 deletions src/libferry/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,12 @@ type TrimPackagesRequest struct {
MaxKeep int `json:"maxPackages"`
}

// DeltaPackagesRequest is sent when generating deltas for a repository.
type DeltaPackagesRequest struct {
Response
MaxGenerate int `json:"maxPackages"`
}

// TimingInformation stores relevant timing stats on jobs so we can know what
// kind of latency we're dealing with, etc.
//
Expand Down