Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
191 changes: 124 additions & 67 deletions cmd/private-org-sync/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,27 @@ func withRetryOnNonzero(f gitFunc, retries int) gitFunc {
}
}

func withRetryOnTransientError(f gitFunc, retries int) gitFunc {
return func(logger *logrus.Entry, dir string, command ...string) (string, int, error) {
var out string
var exitCode int
var commandErr error
for attempt := 1; attempt <= retries; attempt++ {
out, exitCode, commandErr = f(logger, dir, command...)
if commandErr == nil && exitCode == 0 {
return out, exitCode, nil
}
if attempt < retries && isTransientNetworkError(out) {
logger.Infof("Transient network error, retrying git command (%d/%d)", attempt, retries)
time.Sleep(5 * time.Second)
continue
}
break
}
return out, exitCode, commandErr
}
}

func gitExec(logger *logrus.Entry, dir string, command ...string) (string, int, error) {
cmdLogger := logger.WithField("command", fmt.Sprintf("git %s", strings.Join(command, " ")))
cmd := exec.Command("git", command...)
Expand Down Expand Up @@ -319,6 +340,23 @@ func maybeTooShallow(pushOutput string) bool {
return false
}

func isTransientNetworkError(output string) bool {
patterns := []string{
"Could not resolve host",
"Failed to connect to",
"Connection timed out",
"Connection refused",
"Connection reset by peer",
"The requested URL returned error: 5",
}
for _, pattern := range patterns {
if strings.Contains(output, pattern) {
return true
}
}
return false
}

// location specifies a GitHub repository branch used as a source or destination
type location struct {
org, repo, branch string
Expand Down Expand Up @@ -364,58 +402,107 @@ func (g gitSyncer) initRepo(repoDir, org, repo string) error {
return nil
}

// mirror syncs content from source location to destination one, using a local
// repository in the given path. The `repoDir` must have been previously
// initialized via initRepo(). The git content from the `src` location will
// be fetched to this local repository and then pushed to the `dst` location.
// Multiple `mirror` calls over the same `repoDir` will reuse the content
// fetched in previous calls, acting like a cache.
func (g gitSyncer) mirror(repoDir string, src, dst location) error {
mirrorFields := logrus.Fields{
"source": src.String(),
"destination": dst.String(),
"local-repo": repoDir,
// syncRepo initializes a local git repo, fetches branch heads from both source
// and destination via ls-remote, and mirrors each branch that needs syncing.
func (g gitSyncer) syncRepo(org, repo, targetOrg, dstRepo string, branches []location) []error {
var errs []error
repoLogger := g.logger

gitDir, err := g.makeGitDir(org, repo)
if err != nil {
for _, source := range branches {
errs = append(errs, fmt.Errorf("%s: %w", source.String(), err))
}
return errs
}

if err := g.initRepo(gitDir, org, repo); err != nil {
for _, source := range branches {
errs = append(errs, fmt.Errorf("%s: %w", source.String(), err))
}
return errs
}
logger := g.logger.WithFields(mirrorFields)
logger.Info("Syncing content between locations")

// We ls-remote destination first thing because when it does not exist
// we do not need to do any of the remaining operations.
logger.Debug("Determining HEAD of destination branch")
destUrlRaw := fmt.Sprintf("%s/%s/%s", g.prefix, dst.org, dst.repo)
// ls-remote destination once per repo
destUrlRaw := fmt.Sprintf("%s/%s/%s", g.prefix, targetOrg, dstRepo)
destUrl, err := url.Parse(destUrlRaw)
if err != nil {
logger.WithField("remote-url", destUrlRaw).WithError(err).Error("Failed to construct URL for the destination remote")
return fmt.Errorf("failed to construct URL for the destination remote")
repoLogger.WithField("remote-url", destUrlRaw).WithError(err).Error("Failed to construct URL for the destination remote")
for _, source := range branches {
errs = append(errs, fmt.Errorf("%s: failed to construct URL for the destination remote", source.String()))
}
return errs
}
if g.token != "" {
destUrl.User = url.User(g.token)
}

dstHeads, err := getRemoteBranchHeads(logger, g.git, repoDir, destUrl.String())
dstHeads, err := getRemoteBranchHeads(repoLogger, g.git, gitDir, destUrl.String())
if err != nil {
message := "destination repository does not exist or we cannot access it"
if g.failOnNonexistentDst {
logger.Errorf("%s", message)
return fmt.Errorf("%s", message)
repoLogger.Errorf("%s", message)
for _, source := range branches {
errs = append(errs, fmt.Errorf("%s: %s", source.String(), message))
}
} else {
repoLogger.Warn(message)
}
return errs
}

logger.Warn(message)
return nil
// ls-remote source once per repo
srcRemote := fmt.Sprintf("%s-%s", org, repo)
srcHeads, err := getRemoteBranchHeads(repoLogger, withRetryOnNonzero(g.git, 5), gitDir, srcRemote)
if err != nil {
repoLogger.WithError(err).Error("Failed to determine branch HEADs in source")
for _, source := range branches {
errs = append(errs, fmt.Errorf("%s: failed to determine branch HEADs in source", source.String()))
}
return errs
}

for _, source := range branches {
g.logger = config.LoggerForInfo(config.Info{
Metadata: api.Metadata{
Org: source.org,
Repo: source.repo,
Branch: source.branch,
},
})

destination := location{org: targetOrg, repo: dstRepo, branch: source.branch}

if err := g.mirror(gitDir, source, destination, srcHeads, dstHeads, destUrl); err != nil {
errs = append(errs, fmt.Errorf("%s->%s: %w", source.String(), destination.String(), err))
}
}

return errs
}

// mirror syncs a single branch from source to destination, using pre-fetched
// branch head information. The `repoDir` must have been previously initialized
// with git init and remote setup. The `srcHeads` and `dstHeads` must have been
// obtained from ls-remote calls against the source and destination repos.
// Multiple `mirror` calls over the same `repoDir` will reuse the content
// fetched in previous calls, acting like a cache.
func (g gitSyncer) mirror(repoDir string, src, dst location, srcHeads, dstHeads RemoteBranchHeads, destUrl *url.URL) error {
mirrorFields := logrus.Fields{
"source": src.String(),
"destination": dst.String(),
"local-repo": repoDir,
}
logger := g.logger.WithFields(mirrorFields)
logger.Info("Syncing content between locations")

dstCommitHash := dstHeads[dst.branch]

srcRemote := fmt.Sprintf("%s-%s", src.org, src.repo)

logger.Debug("Determining HEAD of source branch")
srcHeads, err := getRemoteBranchHeads(logger, withRetryOnNonzero(g.git, 5), repoDir, srcRemote)
if err != nil {
logger.WithError(err).Error("Failed to determine branch HEADs in source")
return fmt.Errorf("failed to determine branch HEADs in source")
}
srcCommitHash, ok := srcHeads[src.branch]
if !ok {
logger.WithError(err).Error("Branch does not exist in source remote")
logger.Error("Branch does not exist in source remote")
return fmt.Errorf("branch does not exist in source remote")
}

Expand Down Expand Up @@ -641,7 +728,7 @@ func main() {
token: token,
root: o.gitDir,
confirm: o.confirm,
git: gitExec,
git: withRetryOnTransientError(gitExec, 3),
failOnNonexistentDst: o.failOnNonexistentDst,
gitName: o.gitName,
gitEmail: o.gitEmail,
Expand Down Expand Up @@ -681,44 +768,14 @@ func main() {
}

for key, branches := range grouped {
gitDir, err := syncer.makeGitDir(key.org, key.repo)
if err != nil {
for _, source := range branches {
errs = append(errs, fmt.Errorf("%s: %w", source.String(), err))
}
continue
}
syncer.logger = logrus.WithFields(logrus.Fields{"org": key.org, "repo": key.repo})

syncer.logger = logrus.WithFields(logrus.Fields{
"org": key.org,
"repo": key.repo,
})
if err := syncer.initRepo(gitDir, key.org, key.repo); err != nil {
for _, source := range branches {
errs = append(errs, fmt.Errorf("%s: %w", source.String(), err))
}
continue
dstRepo := key.repo
if !flattenedOrgs.Has(key.org) {
dstRepo = fmt.Sprintf("%s-%s", key.org, key.repo)
}

for _, source := range branches {
syncer.logger = config.LoggerForInfo(config.Info{
Metadata: api.Metadata{
Org: source.org,
Repo: source.repo,
Branch: source.branch,
},
})

destination := source
destination.org = o.targetOrg
if !flattenedOrgs.Has(source.org) {
destination.repo = fmt.Sprintf("%s-%s", source.org, source.repo)
}

if err := syncer.mirror(gitDir, source, destination); err != nil {
errs = append(errs, fmt.Errorf("%s->%s: %w", source.String(), destination.String(), err))
}
}
errs = append(errs, syncer.syncRepo(key.org, key.repo, o.targetOrg, dstRepo, branches)...)
}

if len(errs) > 0 {
Expand Down
Loading