Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 12 additions & 6 deletions pkg/log/task_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,13 +120,13 @@ func (r *Reader) readAvailableTaskLogs(tr *v1.TaskRun) (<-chan Log, <-chan error
return logC, errC, nil
}

func (r *Reader) readStepsLogs(logC chan<- Log, errC chan<- error, steps []*step, pod *pods.Pod, follow, timestamps bool) {
func (r *Reader) readStepsLogs(logC chan<- Log, errC chan<- error, steps []*step, podRef *pods.Pod, pod *corev1.Pod, follow, timestamps bool) {
for _, step := range steps {
if !follow && !step.hasStarted() {
continue
}

container := pod.Container(step.container)
container := podRef.Container(step.container)
containerLogC, containerLogErrC, err := container.LogReader(follow, timestamps).Read()
if err != nil {
errC <- fmt.Errorf("error in getting logs for step %s: %s", step.name, err)
Expand All @@ -153,10 +153,16 @@ func (r *Reader) readStepsLogs(logC chan<- Log, errC chan<- error, steps []*step
}
}

if err := container.Status(); err != nil {
errC <- err
return
err = pods.CheckFailedContainers(pod, []string{step.container})
if follow {
err = podRef.CheckFailedContainers([]string{step.container})
}
if err == nil {
continue
}

errC <- err
return
}
}

Expand Down Expand Up @@ -206,7 +212,7 @@ func (r *Reader) readPodLogs(podC <-chan string, podErrC <-chan error, follow, t
errC <- fmt.Errorf("no steps found for task %s", r.task)
continue
}
r.readStepsLogs(logC, errC, steps, p, follow, timestamps)
r.readStepsLogs(logC, errC, steps, p, pod, follow, timestamps)
}
}()

Expand Down
106 changes: 106 additions & 0 deletions pkg/log/task_reader_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package log

import (
"testing"

"github.com/tektoncd/cli/pkg/cli"
podsfake "github.com/tektoncd/cli/pkg/pods/fake"
"github.com/tektoncd/cli/pkg/test"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func TestReadPodLogs_stopsAfterFailedStep(t *testing.T) {
const (
ns = "ns"
podName = "pod"
)

pods := []*corev1.Pod{{
ObjectMeta: metav1.ObjectMeta{
Name: podName,
Namespace: ns,
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{Name: "step-first"},
{Name: "step-second"},
},
},
Status: corev1.PodStatus{
ContainerStatuses: []corev1.ContainerStatus{
{
Name: "step-first",
State: corev1.ContainerState{
Terminated: &corev1.ContainerStateTerminated{
ExitCode: 137,
Reason: "OOMKilled",
},
},
},
{
Name: "step-second",
State: corev1.ContainerState{
Terminated: &corev1.ContainerStateTerminated{
ExitCode: 0,
},
},
},
},
},
}}

cs, _ := test.SeedV1beta1TestData(t, test.Data{Pods: pods})
reader := &Reader{
ns: ns,
clients: &cli.Clients{Kube: cs.Kube},
streamer: podsfake.Streamer(podsfake.Logs(
podsfake.Task(podName,
podsfake.Step("step-first", "first-log"),
podsfake.Step("step-second", "second-log"),
),
)),
task: "task",
}

podC := make(chan string, 1)
podC <- podName
close(podC)

logC, errC := reader.readPodLogs(podC, nil, false, false)

var logs []Log
var errs []error
for logC != nil || errC != nil {
select {
case l, ok := <-logC:
if !ok {
logC = nil
continue
}
logs = append(logs, l)
case err, ok := <-errC:
if !ok {
errC = nil
continue
}
errs = append(errs, err)
}
}

if len(logs) != 2 {
t.Fatalf("expected first step log and EOF only, got %#v", logs)
}
if logs[0].Step != "first" || logs[0].Log != "first-log" {
t.Fatalf("unexpected first log: %#v", logs[0])
}
if logs[1].Step != "first" || logs[1].Log != "EOFLOG" {
t.Fatalf("unexpected EOF log: %#v", logs[1])
}
if len(errs) != 1 {
t.Fatalf("expected one error, got %#v", errs)
}
if errs[0] == nil || errs[0].Error() == "" {
t.Fatalf("expected non-empty error, got %#v", errs[0])
}
}
36 changes: 33 additions & 3 deletions pkg/pipelinerun/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package pipelinerun
import (
"context"
"errors"
"fmt"
"sync"
"time"

Expand All @@ -27,6 +28,7 @@ import (
"github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1"
informers "github.com/tektoncd/pipeline/pkg/client/informers/externalversions"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/client-go/tools/cache"
Expand Down Expand Up @@ -223,13 +225,21 @@ func GetTaskRunsWithStatus(pr *v1.PipelineRun, c *cli.Clients, ns string) (map[s
return map[string]*v1.PipelineRunTaskRunStatus{}, nil
}

taskRunsByName, err := getTaskRunsByPipelineRun(pr.Name, c, ns)
if err != nil && !canFallbackToTaskRunGet(err) {
return nil, err
}

trStatuses := make(map[string]*v1.PipelineRunTaskRunStatus)
for _, cr := range pr.Status.ChildReferences {
//TODO: Needs to handle Run, CustomRun later
if cr.Kind == "TaskRun" {
tr, err := taskrunpkg.GetTaskRun(taskrunGroupResource, c, cr.Name, ns)
if err != nil {
return nil, err
tr, ok := taskRunsByName[cr.Name]
if !ok {
tr, err = taskrunpkg.GetTaskRun(taskrunGroupResource, c, cr.Name, ns)
if err != nil {
return nil, err
}
}

trStatuses[cr.Name] = &v1.PipelineRunTaskRunStatus{
Expand All @@ -242,3 +252,23 @@ func GetTaskRunsWithStatus(pr *v1.PipelineRun, c *cli.Clients, ns string) (map[s

return trStatuses, nil
}

func getTaskRunsByPipelineRun(prName string, c *cli.Clients, ns string) (map[string]*v1.TaskRun, error) {
var taskRuns v1.TaskRunList
if err := actions.ListV1(taskrunGroupResource, c, metav1.ListOptions{
LabelSelector: fmt.Sprintf("tekton.dev/pipelineRun=%s", prName),
}, ns, &taskRuns); err != nil {
return nil, err
}

taskRunsByName := make(map[string]*v1.TaskRun, len(taskRuns.Items))
for i := range taskRuns.Items {
taskRunsByName[taskRuns.Items[i].Name] = &taskRuns.Items[i]
}

return taskRunsByName, nil
}

func canFallbackToTaskRunGet(err error) bool {
return apierrors.IsForbidden(err) || apierrors.IsUnauthorized(err)
}
73 changes: 73 additions & 0 deletions pkg/pipelinerun/tracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
pipelinetest "github.com/tektoncd/pipeline/test"
"github.com/tektoncd/pipeline/test/diff"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
Expand Down Expand Up @@ -266,3 +267,75 @@ func TestTracker_watchErrorHandler(t *testing.T) {
})
}
}

func TestGetTaskRunsWithStatus_fallsBackWhenListForbidden(t *testing.T) {
const (
ns = "namespace"
prName = "output-pipeline-1"
trName = "output-task-1"
task = "output-task-1"
pod = "output-task-1-pod"
)

pr := &v1.PipelineRun{
ObjectMeta: metav1.ObjectMeta{
Name: prName,
Namespace: ns,
},
Status: v1.PipelineRunStatus{
PipelineRunStatusFields: v1.PipelineRunStatusFields{
ChildReferences: []v1.ChildStatusReference{{
Name: trName,
PipelineTaskName: task,
TypeMeta: runtime.TypeMeta{
APIVersion: "tekton.dev/v1",
Kind: "TaskRun",
},
}},
},
},
}
tr := &v1.TaskRun{
ObjectMeta: metav1.ObjectMeta{
Name: trName,
Namespace: ns,
},
Status: v1.TaskRunStatus{
TaskRunStatusFields: v1.TaskRunStatusFields{
PodName: pod,
},
},
}

cs, _ := test.SeedTestData(t, pipelinetest.Data{TaskRuns: []*v1.TaskRun{tr}})
cs.Pipeline.Resources = cb.APIResourceList("v1", []string{"taskrun", "pipelinerun"})

tdc := testDynamic.Options{
PrependReactors: []testDynamic.PrependOpt{{
Verb: "list",
Resource: "taskruns",
Action: func(_ k8stest.Action) (bool, runtime.Object, error) {
return true, nil, apierrors.NewForbidden(taskrunGroupResource.GroupResource(), "", errors.New("forbidden"))
},
}},
}
dynamic, err := tdc.Client(cb.UnstructuredTR(tr, "v1"))
if err != nil {
t.Fatalf("unable to create dynamic client: %v", err)
}

clients := &cli.Clients{
Tekton: cs.Pipeline,
Kube: cs.Kube,
Dynamic: dynamic,
}

trStatuses, err := GetTaskRunsWithStatus(pr, clients, ns)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}

if trStatuses[trName] == nil || trStatuses[trName].Status == nil || trStatuses[trName].Status.PodName != pod {
t.Fatalf("unexpected taskrun statuses: %#v", trStatuses)
}
}
86 changes: 49 additions & 37 deletions pkg/pods/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,43 +30,7 @@ type Container struct {
}

func (c *Container) Status() error {
pod, err := c.pod.Get()
if err != nil {
return err
}

container := c.name
for _, cs := range pod.Status.ContainerStatuses {
if cs.Name != container {
continue
}

if cs.State.Terminated != nil && cs.State.Terminated.ExitCode == 1 {
msg := ""

if cs.State.Terminated.Reason != "" && cs.State.Terminated.Reason != "Error" {
msg = msg + " : " + cs.State.Terminated.Reason
}

if cs.State.Terminated.Message != "" && cs.State.Terminated.Message != "Error" {
msg = msg + " : " + cs.State.Terminated.Message
}

return fmt.Errorf("container %s has failed %s", container, msg)
}
}

for _, cs := range pod.Status.InitContainerStatuses {
if cs.Name != container {
continue
}

if cs.State.Terminated != nil && cs.State.Terminated.ExitCode == 1 {
return fmt.Errorf("container %s has failed: %s", container, cs.State.Terminated.Reason)
}
}

return nil
return c.pod.CheckFailedContainers([]string{c.name})
}

// Log represents one log message from a pod
Expand Down Expand Up @@ -128,3 +92,51 @@ func (lr *LogReader) Read() (<-chan Log, <-chan error, error) {

return logC, errC, nil
}

func (p *Pod) CheckFailedContainers(containerNames []string) error {
pod, err := p.Get()
if err != nil {
return err
}

return CheckFailedContainers(pod, containerNames)
}

func CheckFailedContainers(pod *corev1.Pod, containerNames []string) error {
containerSet := map[string]struct{}{}
for _, containerName := range containerNames {
containerSet[containerName] = struct{}{}
}

for _, cs := range pod.Status.ContainerStatuses {
if _, ok := containerSet[cs.Name]; !ok {
continue
}

if cs.State.Terminated != nil && cs.State.Terminated.ExitCode != 0 {
msg := ""

if cs.State.Terminated.Reason != "" && cs.State.Terminated.Reason != "Error" {
msg += " : " + cs.State.Terminated.Reason
}

if cs.State.Terminated.Message != "" && cs.State.Terminated.Message != "Error" {
msg += " : " + cs.State.Terminated.Message
}

return fmt.Errorf("container %s has failed %s", cs.Name, msg)
}
}

for _, cs := range pod.Status.InitContainerStatuses {
if _, ok := containerSet[cs.Name]; !ok {
continue
}

if cs.State.Terminated != nil && cs.State.Terminated.ExitCode != 0 {
return fmt.Errorf("container %s has failed: %s", cs.Name, cs.State.Terminated.Reason)
}
}

return nil
}
Loading
Loading