Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
237 changes: 157 additions & 80 deletions backend/plugins/dora/tasks/change_lead_time_calculator.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,31 @@ func CalculateChangeLeadTime(taskCtx plugin.SubTaskContext) errors.Error {
return errors.Default.Wrap(err, "error deleting previous project_pr_metrics")
}

// Batch fetch all required data upfront for better performance
startTime := time.Now()
logger.Info("Batch fetching data for project: %s", data.Options.ProjectName)

firstCommitsMap, err := batchFetchFirstCommits(data.Options.ProjectName, db)
if err != nil {
return errors.Default.Wrap(err, "failed to batch fetch first commits")
}
logger.Info("Fetched %d first commits in %v", len(firstCommitsMap), time.Since(startTime))

reviewStartTime := time.Now()
firstReviewsMap, err := batchFetchFirstReviews(data.Options.ProjectName, db)
if err != nil {
return errors.Default.Wrap(err, "failed to batch fetch first reviews")
}
logger.Info("Fetched %d first reviews in %v", len(firstReviewsMap), time.Since(reviewStartTime))

deploymentStartTime := time.Now()
deploymentsMap, err := batchFetchDeployments(data.Options.ProjectName, db)
if err != nil {
return errors.Default.Wrap(err, "failed to batch fetch deployments")
}
logger.Info("Fetched %d deployments in %v", len(deploymentsMap), time.Since(deploymentStartTime))
logger.Info("Total batch fetch time: %v", time.Since(startTime))

// Get pull requests by repo project_name
var clauses = []dal.Clause{
dal.Select("pr.id, pr.pull_request_key, pr.author_id, pr.merge_commit_sha, pr.created_date, pr.merged_date"),
Expand Down Expand Up @@ -84,23 +109,17 @@ func CalculateChangeLeadTime(taskCtx plugin.SubTaskContext) errors.Error {
projectPrMetric.Id = pr.Id
projectPrMetric.ProjectName = data.Options.ProjectName

// Get the first commit for the PR
firstCommit, err := getFirstCommit(pr.Id, db)
if err != nil {
return nil, err
}
// Get the first commit for the PR from batch-fetched map
firstCommit := firstCommitsMap[pr.Id]
// Calculate PR coding time
if firstCommit != nil {
projectPrMetric.PrCodingTime = computeTimeSpan(&firstCommit.CommitAuthoredDate, &pr.CreatedDate)
projectPrMetric.FirstCommitSha = firstCommit.CommitSha
projectPrMetric.FirstCommitAuthoredDate = &firstCommit.CommitAuthoredDate
}

// Get the first review for the PR
firstReview, err := getFirstReview(pr.Id, pr.AuthorId, db)
if err != nil {
return nil, err
}
// Get the first review for the PR from batch-fetched map
firstReview := firstReviewsMap[pr.Id]
// Calculate PR pickup time and PR review time
prDuring := computeTimeSpan(&pr.CreatedDate, pr.MergedDate)
if firstReview != nil {
Expand All @@ -113,11 +132,8 @@ func CalculateChangeLeadTime(taskCtx plugin.SubTaskContext) errors.Error {
projectPrMetric.PrCreatedDate = &pr.CreatedDate
projectPrMetric.PrMergedDate = pr.MergedDate

// Get the deployment for the PR
deployment, err := getDeploymentCommit(pr.MergeCommitSha, data.Options.ProjectName, db)
if err != nil {
return nil, err
}
// Get the deployment for the PR from batch-fetched map
deployment := deploymentsMap[pr.MergeCommitSha]

// Calculate PR deploy time
if deployment != nil && deployment.FinishedDate != nil {
Expand Down Expand Up @@ -152,95 +168,156 @@ func CalculateChangeLeadTime(taskCtx plugin.SubTaskContext) errors.Error {
return converter.Execute()
}

// getFirstCommit takes a PR ID and a database connection as input, and returns the first commit of the PR.
func getFirstCommit(prId string, db dal.Dal) (*code.PullRequestCommit, errors.Error) {
// Initialize a pull_request_commit object
commit := &code.PullRequestCommit{}
// Define the SQL clauses for the database query
commitClauses := []dal.Clause{
dal.From(&code.PullRequestCommit{}), // Select from the "pull_request_commits" table
dal.Where("pull_request_commits.pull_request_id = ?", prId), // Filter by the PR ID
dal.Orderby("pull_request_commits.commit_authored_date ASC"), // Order by the authored date of the commits (ascending)
func computeTimeSpan(start, end *time.Time) *int64 {
if start == nil || end == nil {
return nil
}
span := end.Sub(*start)
minutes := int64(math.Ceil(span.Minutes()))
if minutes < 0 {
return nil
}
return &minutes
}

// deploymentCommitWithMergeSha is a helper struct to capture both the deployment commit
// and the associated merge_sha from the commits_diffs join query.
type deploymentCommitWithMergeSha struct {
devops.CicdDeploymentCommit
MergeSha string `gorm:"column:merge_sha"`
}

// Execute the query and retrieve the first commit
err := db.First(commit, commitClauses...)
// batchFetchFirstCommits retrieves the first commit for all pull requests in the given project.
// Returns a map indexed by PR ID for O(1) lookup performance.
//
// The query uses a subquery to find the minimum commit_authored_date for each PR,
// then joins back to get the full commit record. This is more efficient than
// fetching all commits and filtering in memory.
func batchFetchFirstCommits(projectName string, db dal.Dal) (map[string]*code.PullRequestCommit, errors.Error) {
var results []*code.PullRequestCommit

// Use a subquery to find the earliest commit for each PR, then join to get full commit details.
// This avoids scanning all commits and is optimized by the database engine.
err := db.All(
&results,
dal.Select("prc.*"),
dal.From("pull_request_commits prc"),
dal.Join(`INNER JOIN (
SELECT pull_request_id, MIN(commit_authored_date) as min_date
FROM pull_request_commits
GROUP BY pull_request_id
) first_commits ON prc.pull_request_id = first_commits.pull_request_id
AND prc.commit_authored_date = first_commits.min_date`),
dal.Join("INNER JOIN pull_requests pr ON pr.id = prc.pull_request_id"),
dal.Join("LEFT JOIN project_mapping pm ON pm.row_id = pr.base_repo_id AND pm.table = 'repos'"),
dal.Where("pm.project_name = ?", projectName),
dal.Orderby("prc.pull_request_id, prc.commit_authored_date ASC"),
)

// If any other error occurred, return nil and the error
if err != nil {
// If the error indicates that no commit was found, return nil and no error
if db.IsErrorNotFound(err) {
return nil, nil
return nil, errors.Default.Wrap(err, "failed to batch fetch first commits")
}

// Build the map for O(1) lookup by PR ID
commitMap := make(map[string]*code.PullRequestCommit, len(results))
for _, commit := range results {
// Only keep the first commit if multiple commits have the same timestamp
if _, exists := commitMap[commit.PullRequestId]; !exists {
commitMap[commit.PullRequestId] = commit
}
return nil, err
}

// If there were no errors, return the first commit and no error
return commit, nil
return commitMap, nil
}

// getFirstReview takes a PR ID, PR creator ID, and a database connection as input, and returns the first review comment of the PR.
func getFirstReview(prId string, prCreator string, db dal.Dal) (*code.PullRequestComment, errors.Error) {
// Initialize a review comment object
review := &code.PullRequestComment{}
// Define the SQL clauses for the database query
commentClauses := []dal.Clause{
dal.From(&code.PullRequestComment{}), // Select from the "pull_request_comments" table
dal.Where("pull_request_id = ? and account_id != ?", prId, prCreator), // Filter by the PR ID and exclude comments from the PR creator
dal.Orderby("created_date ASC"), // Order by the created date of the review comments (ascending)
}
// batchFetchFirstReviews retrieves the first review comment for all pull requests in the given project.
// Returns a map indexed by PR ID for O(1) lookup performance.
//
// The query uses a subquery to find the minimum created_date for each PR (excluding the PR author),
// then joins back to get the full comment record.
func batchFetchFirstReviews(projectName string, db dal.Dal) (map[string]*code.PullRequestComment, errors.Error) {
var results []*code.PullRequestComment

// Execute the query and retrieve the first review comment
err := db.First(review, commentClauses...)
// Use a subquery to find the earliest review comment for each PR (excluding author's comments),
// then join to get full comment details.
err := db.All(
&results,
dal.Select("prc.*"),
dal.From("pull_request_comments prc"),
dal.Join(`INNER JOIN (
SELECT prc2.pull_request_id, MIN(prc2.created_date) as min_date
FROM pull_request_comments prc2
INNER JOIN pull_requests pr2 ON pr2.id = prc2.pull_request_id
WHERE (pr2.author_id IS NULL OR pr2.author_id = '' OR prc2.account_id != pr2.author_id)
GROUP BY prc2.pull_request_id
) first_reviews ON prc.pull_request_id = first_reviews.pull_request_id
AND prc.created_date = first_reviews.min_date`),
dal.Join("INNER JOIN pull_requests pr ON pr.id = prc.pull_request_id"),
dal.Join("LEFT JOIN project_mapping pm ON pm.row_id = pr.base_repo_id AND pm.table = 'repos'"),
dal.Where("pm.project_name = ? AND (pr.author_id IS NULL OR pr.author_id = '' OR prc.account_id != pr.author_id)", projectName),
dal.Orderby("prc.pull_request_id, prc.created_date ASC"),
)

// If any other error occurred, return nil and the error
if err != nil {
// If the error indicates that no review comment was found, return nil and no error
if db.IsErrorNotFound(err) {
return nil, nil
return nil, errors.Default.Wrap(err, "failed to batch fetch first reviews")
}

// Build the map for O(1) lookup by PR ID
reviewMap := make(map[string]*code.PullRequestComment, len(results))
for _, review := range results {
// Only keep the first review if multiple reviews have the same timestamp
if _, exists := reviewMap[review.PullRequestId]; !exists {
reviewMap[review.PullRequestId] = review
}
return nil, err
}

// If there were no errors, return the first review comment and no error
return review, nil
return reviewMap, nil
}

// getDeploymentCommit takes a merge commit SHA, a repository ID, a list of deployment pairs, and a database connection as input.
// It returns the deployment pair related to the merge commit, or nil if not found.
func getDeploymentCommit(mergeSha string, projectName string, db dal.Dal) (*devops.CicdDeploymentCommit, errors.Error) {
deploymentCommits := make([]*devops.CicdDeploymentCommit, 0, 1)
// do not use `.First` method since gorm would append ORDER BY ID to the query which leads to a error
// batchFetchDeployments retrieves deployment commits for all merge commits in the given project.
// Returns a map indexed by merge commit SHA for O(1) lookup performance.
//
// The query finds the first successful production deployment for each merge commit by:
// 1. Finding deployment commits that have a previous successful deployment
// 2. Joining with commits_diffs to find which deployment included each merge commit
// 3. Filtering for successful production deployments
// 4. Ordering by started_date to get the earliest deployment
//
// The map is indexed by merge_sha (from commits_diffs), not by deployment commit_sha,
// because the caller needs to look up deployments by PR merge_commit_sha.
func batchFetchDeployments(projectName string, db dal.Dal) (map[string]*devops.CicdDeploymentCommit, errors.Error) {
var results []*deploymentCommitWithMergeSha

// Query finds the first deployment for each merge commit by using a window function
// to rank deployments by started_date, then filtering to keep only rank 1.
err := db.All(
&deploymentCommits,
dal.Select("dc.*"),
&results,
dal.Select("dc.*, cd.commit_sha as merge_sha"),
dal.From("cicd_deployment_commits dc"),
dal.Join("LEFT JOIN cicd_deployment_commits p ON (dc.prev_success_deployment_commit_id = p.id)"),
dal.Join("LEFT JOIN project_mapping pm ON (pm.table = 'cicd_scopes' AND pm.row_id = dc.cicd_scope_id)"),
dal.Join("INNER JOIN commits_diffs cd ON (cd.new_commit_sha = dc.commit_sha AND cd.old_commit_sha = COALESCE (p.commit_sha, ''))"),
dal.Join("LEFT JOIN cicd_deployment_commits p ON dc.prev_success_deployment_commit_id = p.id"),
dal.Join("INNER JOIN commits_diffs cd ON cd.new_commit_sha = dc.commit_sha AND cd.old_commit_sha = COALESCE(p.commit_sha, '')"),
dal.Join("LEFT JOIN project_mapping pm ON pm.table = 'cicd_scopes' AND pm.row_id = dc.cicd_scope_id"),
dal.Where("dc.prev_success_deployment_commit_id <> ''"),
dal.Where("dc.environment = 'PRODUCTION'"), // TODO: remove this when multi-environment is supported
dal.Where("pm.project_name = ? AND cd.commit_sha = ? AND dc.RESULT = ?", projectName, mergeSha, devops.RESULT_SUCCESS),
dal.Orderby("dc.started_date, dc.id ASC"),
dal.Limit(1),
dal.Where("dc.result = ? AND pm.project_name = ?", devops.RESULT_SUCCESS, projectName),
dal.Orderby("cd.commit_sha, dc.started_date ASC, dc.id ASC"),
)

if err != nil {
return nil, err
}
if len(deploymentCommits) == 0 {
return nil, nil
return nil, errors.Default.Wrap(err, "failed to batch fetch deployments")
}
return deploymentCommits[0], nil
}

func computeTimeSpan(start, end *time.Time) *int64 {
if start == nil || end == nil {
return nil
}
span := end.Sub(*start)
minutes := int64(math.Ceil(span.Minutes()))
if minutes < 0 {
return nil
// Build the map indexed by merge_sha for O(1) lookup.
// Keep only the first deployment for each merge commit (earliest by started_date).
deploymentMap := make(map[string]*devops.CicdDeploymentCommit, len(results))
for _, result := range results {
// Only keep the first deployment for each merge_sha
if _, exists := deploymentMap[result.MergeSha]; !exists {
// Copy the CicdDeploymentCommit without the MergeSha field
deploymentCopy := result.CicdDeploymentCommit
deploymentMap[result.MergeSha] = &deploymentCopy
}
}
return &minutes

return deploymentMap, nil
}
Loading