Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 32 additions & 23 deletions internal/strategy/git/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"os"
"os/exec"
"path/filepath"
"strconv"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -262,20 +261,30 @@ func (s *Strategy) handleBundleRequest(w http.ResponseWriter, r *http.Request, h
return
}

bundleData, err := s.createBundle(ctx, repo, base)
bundleFile, err := s.createBundle(ctx, repo, base)
if err != nil {
logger.WarnContext(ctx, "Failed to create bundle", "upstream", upstreamURL, "base", base, "error", err)
http.Error(w, "Bundle not available", http.StatusNotFound)
return
}

// Cache for future requests from any pod.
s.cacheBundleAsync(ctx, bKey, bundleData)
defer bundleFile.Close()

w.Header().Set("Content-Type", "application/x-git-bundle")
w.Header().Set("Content-Length", strconv.Itoa(len(bundleData)))
if _, err := w.Write(bundleData); err != nil { //nolint:gosec // bundleData is a git bundle generated from a trusted local mirror
logger.WarnContext(ctx, "Failed to write bundle response", "upstream", upstreamURL, "error", err)

// Stream to client and cache simultaneously.
cacheHeaders := http.Header{"Content-Type": {"application/x-git-bundle"}}
wc, cacheErr := s.cache.Create(ctx, bKey, cacheHeaders, bundleCacheTTL)
if cacheErr != nil {
if _, err := io.Copy(w, bundleFile); err != nil {
logger.WarnContext(ctx, "Failed to stream bundle", "upstream", upstreamURL, "error", err)
}
return
}
if _, err := io.Copy(io.MultiWriter(w, wc), bundleFile); err != nil {
logger.WarnContext(ctx, "Failed to stream bundle", "upstream", upstreamURL, "error", err)
}
if err := wc.Close(); err != nil {
logger.WarnContext(ctx, "Failed to close bundle cache writer", "upstream", upstreamURL, "error", err)
}
}

Expand All @@ -294,12 +303,13 @@ func (s *Strategy) serveSnapshotWithBundle(ctx context.Context, w http.ResponseW
go func() {
bgCtx := context.WithoutCancel(ctx)
logger := logging.FromContext(bgCtx)
bundleData, err := s.createBundle(bgCtx, repo, snapshotCommit)
bundleFile, err := s.createBundle(bgCtx, repo, snapshotCommit)
if err != nil {
logger.WarnContext(bgCtx, "Failed to pre-generate bundle", "upstream", upstreamURL, "error", err)
return
}
s.cacheBundleSync(bgCtx, bundleCacheKey(upstreamURL, snapshotCommit), bundleData)
defer bundleFile.Close()
s.cacheBundle(bgCtx, bundleCacheKey(upstreamURL, snapshotCommit), bundleFile)
}()
}

Expand All @@ -310,21 +320,15 @@ func (s *Strategy) serveSnapshotWithBundle(ctx context.Context, w http.ResponseW

const bundleCacheTTL = 2 * time.Hour

func (s *Strategy) cacheBundleAsync(ctx context.Context, key cache.Key, data []byte) {
go func() {
s.cacheBundleSync(context.WithoutCancel(ctx), key, data)
}()
}

func (s *Strategy) cacheBundleSync(ctx context.Context, key cache.Key, data []byte) {
func (s *Strategy) cacheBundle(ctx context.Context, key cache.Key, r io.Reader) {
logger := logging.FromContext(ctx)
headers := http.Header{"Content-Type": {"application/x-git-bundle"}}
wc, err := s.cache.Create(ctx, key, headers, bundleCacheTTL)
if err != nil {
logger.WarnContext(ctx, "Failed to cache bundle", "error", err)
return
}
if _, err := wc.Write(data); err != nil {
if _, err := io.Copy(wc, r); err != nil {
logger.WarnContext(ctx, "Failed to write bundle to cache", "error", err)
_ = wc.Close()
return
Expand All @@ -348,7 +352,9 @@ func (s *Strategy) getMirrorHead(ctx context.Context, repo *gitclone.Repository)
return head
}

func (s *Strategy) createBundle(ctx context.Context, repo *gitclone.Repository, baseCommit string) ([]byte, error) {
// createBundle returns an open file containing the bundle. The file is already
// removed from the filesystem; the caller must close it when done.
func (s *Strategy) createBundle(ctx context.Context, repo *gitclone.Repository, baseCommit string) (*os.File, error) {
// No read lock needed: git bundle create reads objects through git's own
// file-level locking, safe to run concurrently with fetches.
headRef := "HEAD"
Expand All @@ -361,22 +367,25 @@ func (s *Strategy) createBundle(ctx context.Context, repo *gitclone.Repository,
return nil, errors.Wrap(err, "create bundle temp file")
}
bundlePath := tmpFile.Name()
defer os.Remove(bundlePath) //nolint:errcheck
if err := tmpFile.Close(); err != nil {
_ = os.Remove(bundlePath) // #nosec G703 -- bundlePath is from os.CreateTemp
return nil, errors.Wrap(err, "close bundle temp file")
}

cmd := exec.CommandContext(ctx, "git", "-C", repo.Path(), "bundle", "create", //nolint:gosec // baseCommit is a SHA string from rev-parse
bundlePath, headRef, "^"+baseCommit)
if output, err := cmd.CombinedOutput(); err != nil {
_ = os.Remove(bundlePath) // #nosec G703 -- bundlePath is from os.CreateTemp
return nil, errors.Wrapf(err, "git bundle create: %s", string(output))
}

data, err := os.ReadFile(bundlePath) //nolint:gosec // bundlePath is a temp file we created
f, err := os.Open(bundlePath) // #nosec G304 G703 -- bundlePath is from os.CreateTemp
if err != nil {
return nil, errors.Wrap(err, "read bundle file")
_ = os.Remove(bundlePath) // #nosec G703 -- bundlePath is from os.CreateTemp
return nil, errors.Wrap(err, "open bundle file")
}
return data, nil
_ = os.Remove(bundlePath) // #nosec G703 -- fd keeps data alive until closed
return f, nil
}

// serveSnapshotWithSpool handles snapshot cache misses using the spool pattern.
Expand Down