6 changes: 6 additions & 0 deletions cmd/gcs/main.go
@@ -466,6 +466,12 @@ func main() {
break
}

// Drop every container stdio ConnSlot. Relay goroutines park inside
// ConnSlot.Write until the host re-attaches stdio with a fresh
// connection; producing processes pause naturally when their kernel
// pipe buffers fill, preserving in-flight bytes.
h.DisconnectAllStdio()

logrus.WithError(serveErr).Warn("bridge connection lost, will reconnect")
time.Sleep(reconnectInterval)
}
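For context on the parking behavior the comment above relies on: the relay goroutines that pump container output toward the bridge write through these slots, so once a slot is disconnected the copy loop blocks inside ConnSlot.Write. A minimal sketch, assuming *stdio.ConnSlot satisfies io.Writer via its Write method; the relay code itself is not part of this diff, and relayStdout is a hypothetical helper:

package stdiorelay

import (
	"io"

	"github.com/Microsoft/hcsshim/internal/guest/stdio"
	"github.com/sirupsen/logrus"
)

// relayStdout is a hypothetical helper illustrating the back-pressure path:
// io.Copy parks inside ConnSlot.Write after DisconnectAllStdio, so the
// container process stalls once its kernel pipe buffer fills and no
// in-flight bytes are dropped while the bridge is down.
func relayStdout(slot *stdio.ConnSlot, pipe io.Reader) {
	go func() {
		// Blocks inside slot.Write while the slot has no connection.
		if _, err := io.Copy(slot, pipe); err != nil {
			logrus.WithError(err).Debug("stdout relay exited")
		}
	}()
}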
21 changes: 18 additions & 3 deletions internal/guest/runtime/hcsv2/container.go
@@ -46,9 +46,10 @@ const (
type Container struct {
id string

vsock transport.Transport
logPath string // path to [logFile].
logFile *os.File // file to redirect container's stdio to.
vsock transport.Transport
slotRegistry slotRegistry
logPath string // path to [logFile].
logFile *os.File // file to redirect container's stdio to.

spec *oci.Spec
ociBundlePath string
@@ -81,6 +82,14 @@ type Container struct {
sandboxRoot string
}

// slotRegistry is the narrow seam Container uses to register the stdio
// ConnSlots produced by stdio.Connect with the parent Host so the bridge
// reconnect loop can disconnect them after live migration. Defined here so
// container.go does not depend on the concrete *Host type.
type slotRegistry interface {
RegisterStdioSlots(*stdio.ConnectionSet)
}

func (c *Container) Start(ctx context.Context, conSettings stdio.ConnectionSettings) (_ int, err error) {
entity := log.G(ctx).WithField(logfields.ContainerID, c.id)
entity.Info("opengcs::Container::Start")
Expand Down Expand Up @@ -116,6 +125,9 @@ func (c *Container) Start(ctx context.Context, conSettings stdio.ConnectionSetti
if err != nil {
return -1, err
}
if c.slotRegistry != nil {
c.slotRegistry.RegisterStdioSlots(stdioSet)
}

if c.initProcess.spec.Terminal {
ttyr := c.container.Tty()
@@ -140,6 +152,9 @@ func (c *Container) ExecProcess(ctx context.Context, process *oci.Process, conSe
if err != nil {
return -1, err
}
if c.slotRegistry != nil {
c.slotRegistry.RegisterStdioSlots(stdioSet)
}

// Add in the core rlimit specified on the container in case there was one set. This makes it so that execed processes can also generate
// core dumps.
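The slotRegistry seam also keeps Container unit-testable without constructing a full *Host. A hypothetical test double, not part of this PR (fakeRegistry is invented here), that satisfies the interface:

package hcsv2

import "github.com/Microsoft/hcsshim/internal/guest/stdio"

// fakeRegistry records every ConnectionSet handed to it, so a test could
// assert that Container.Start and Container.ExecProcess register their
// stdio slots without touching Host internals.
type fakeRegistry struct {
	sets []*stdio.ConnectionSet
}

func (f *fakeRegistry) RegisterStdioSlots(s *stdio.ConnectionSet) {
	f.sets = append(f.sets, s)
}

// Compile-time check that the double satisfies the seam.
var _ slotRegistry = (*fakeRegistry)(nil)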
135 changes: 135 additions & 0 deletions internal/guest/runtime/hcsv2/stdio_slots_test.go
@@ -0,0 +1,135 @@
//go:build linux

package hcsv2

import (
"errors"
"io"
"os"
"sync"
"testing"

"github.com/Microsoft/hcsshim/internal/guest/stdio"
"github.com/Microsoft/hcsshim/internal/guest/transport"
)

// stubConn is a minimal transport.Connection used to exercise
// Host.RegisterStdioSlots / DisconnectAllStdio without real sockets.
type stubConn struct {
mu sync.Mutex
closed bool
}

func (c *stubConn) Read(p []byte) (int, error) {
c.mu.Lock()
defer c.mu.Unlock()
if c.closed {
return 0, io.EOF
}
return 0, nil
}

func (c *stubConn) Write(p []byte) (int, error) {
c.mu.Lock()
defer c.mu.Unlock()
if c.closed {
return 0, io.ErrClosedPipe
}
return len(p), nil
}

func (c *stubConn) Close() error {
c.mu.Lock()
c.closed = true
c.mu.Unlock()
return nil
}

func (c *stubConn) CloseRead() error { return c.Close() }
func (c *stubConn) CloseWrite() error { return nil }
func (c *stubConn) File() (*os.File, error) { return nil, errors.New("no file") }
func (c *stubConn) isClosed() bool {
c.mu.Lock()
defer c.mu.Unlock()
return c.closed
}

var _ transport.Connection = (*stubConn)(nil)

// Host satisfies the slotRegistry contract used by Container.
var _ slotRegistry = (*Host)(nil)

func TestHost_RegisterStdioSlots_TracksSlots(t *testing.T) {
h := &Host{}
c1, c2, c3 := &stubConn{}, &stubConn{}, &stubConn{}
set := &stdio.ConnectionSet{
In: stdio.NewConnSlot(c1, nil),
Out: stdio.NewConnSlot(c2, nil),
Err: stdio.NewConnSlot(c3, nil),
}
h.RegisterStdioSlots(set)

if got, want := len(h.stdioSlots), 3; got != want {
t.Fatalf("stdioSlots len = %d, want %d", got, want)
}
}

func TestHost_RegisterStdioSlots_IgnoresNilAndNonSlot(t *testing.T) {
h := &Host{}
// Mixed set: only Out is a ConnSlot; In is nil; Err is a non-slot.
set := &stdio.ConnectionSet{
Out: stdio.NewConnSlot(&stubConn{}, nil),
Err: &stubConn{},
}
h.RegisterStdioSlots(set)

if got, want := len(h.stdioSlots), 1; got != want {
t.Fatalf("stdioSlots len = %d, want %d", got, want)
}
}

func TestHost_RegisterStdioSlots_NilSet_NoOp(t *testing.T) {
h := &Host{}
h.RegisterStdioSlots(nil) // must not panic
if len(h.stdioSlots) != 0 {
t.Fatalf("nil set must register nothing, got %d", len(h.stdioSlots))
}
}

func TestHost_DisconnectAllStdio_ClosesEveryUnderlyingConn(t *testing.T) {
h := &Host{}
conns := []*stubConn{{}, {}, {}}
for _, c := range conns {
h.RegisterStdioSlots(&stdio.ConnectionSet{Out: stdio.NewConnSlot(c, nil)})
}

h.DisconnectAllStdio()

for i, c := range conns {
if !c.isClosed() {
t.Fatalf("conns[%d] not closed by DisconnectAllStdio", i)
}
}
}

func TestHost_RegisterStdioSlots_CompactsClosedSlots(t *testing.T) {
h := &Host{}
live := stdio.NewConnSlot(&stubConn{}, nil)
dead := stdio.NewConnSlot(&stubConn{}, nil)
_ = dead.Close()

h.RegisterStdioSlots(&stdio.ConnectionSet{Out: live})
h.RegisterStdioSlots(&stdio.ConnectionSet{Out: dead})

// dead is registered but should compact away on the next register call.
h.RegisterStdioSlots(&stdio.ConnectionSet{Out: stdio.NewConnSlot(&stubConn{}, nil)})

for _, s := range h.stdioSlots {
if !s.IsAlive() {
t.Fatal("compaction did not drop closed slot")
}
}
if got := len(h.stdioSlots); got != 2 {
t.Fatalf("after compact want 2 live slots, got %d", got)
}
}
69 changes: 69 additions & 0 deletions internal/guest/runtime/hcsv2/uvm.go
@@ -89,6 +89,7 @@ type VirtualPod struct {
type Host struct {
containersMutex sync.Mutex
containers map[string]*Container
stdioSlots []*stdio.ConnSlot

externalProcessesMutex sync.Mutex
externalProcesses map[int]*externalProcess
@@ -205,6 +206,72 @@ func (h *Host) Transport() transport.Transport {
return h.vsock
}

// RegisterStdioSlots tracks per-process stdio slots so the bridge reconnect
// loop can disconnect them after live migration. Called from container Start,
// ExecProcess, and runExternalProcess after stdio.Connect. Any
// *stdio.ConnSlot in the set is added to the registry; nil entries and
// other transport.Connection types are ignored. Already-closed slots are
// compacted out on each call to bound the slice's growth.
func (h *Host) RegisterStdioSlots(set *stdio.ConnectionSet) {
if set == nil {
return
}
incoming := make([]*stdio.ConnSlot, 0, 3)
for _, c := range []transport.Connection{set.In, set.Out, set.Err} {
if slot, ok := c.(*stdio.ConnSlot); ok && slot != nil {
incoming = append(incoming, slot)
}
}
if len(incoming) == 0 {
return
}
h.containersMutex.Lock()
defer h.containersMutex.Unlock()
h.stdioSlots = compactStdioSlots(h.stdioSlots)
h.stdioSlots = append(h.stdioSlots, incoming...)
}

// DisconnectAllStdio drops the current connection on every tracked stdio
// slot. Called from the GCS reconnect loop after the bridge connection is
// lost. Relays park inside slot.Write until the host re-attaches stdio with
// a fresh connection; the producing process pauses naturally when its
// kernel pipe buffer fills.
//
// Each slot's Disconnect is wrapped in a recover so a panic in one bad slot
// cannot abort the loop and leave the remaining slots attached to the dead
// connection instead of parking their relays with back pressure.
func (h *Host) DisconnectAllStdio() {
h.containersMutex.Lock()
h.stdioSlots = compactStdioSlots(h.stdioSlots)
slots := append([]*stdio.ConnSlot(nil), h.stdioSlots...)
h.containersMutex.Unlock()
for _, s := range slots {
func() {
defer func() {
if r := recover(); r != nil {
logrus.WithField("panic", r).Error("ConnSlot: Disconnect panicked")
}
}()
s.Disconnect()
}()
}
}

// compactStdioSlots filters closed slots out in place, reusing the slice's
// backing array, so the registry does not grow unbounded over the UVM
// lifetime.
func compactStdioSlots(slots []*stdio.ConnSlot) []*stdio.ConnSlot {
if len(slots) == 0 {
return slots[:0]
}
out := slots[:0]
for _, s := range slots {
if s.IsAlive() {
out = append(out, s)
}
}
return out
}

func (h *Host) RemoveContainer(id string) {
h.containersMutex.Lock()
defer h.containersMutex.Unlock()
@@ -406,6 +473,7 @@ func (h *Host) CreateContainer(ctx context.Context, id string, settings *prot.VM
c := &Container{
id: id,
vsock: h.vsock,
slotRegistry: h,
spec: settings.OCISpecification,
ociBundlePath: settings.OCIBundlePath,
isSandbox: criType == "sandbox",
Expand Down Expand Up @@ -1077,6 +1145,7 @@ func (h *Host) runExternalProcess(
if err != nil {
return -1, err
}
h.RegisterStdioSlots(stdioSet)
defer func() {
if err != nil {
stdioSet.Close()
32 changes: 25 additions & 7 deletions internal/guest/stdio/connection.go
@@ -19,7 +19,8 @@ type ConnectionSettings struct {

// Connect returns new transport.Connection instances, one for each stdio pipe
// to be used. If CreateStd*Pipe for a given pipe is false, the given Connection
// is set to nil.
// is set to nil. Each connection is wrapped in a ConnSlot so the underlying
// vsock can be replaced when the bridge reconnects after live migration.
func Connect(tport transport.Transport, settings ConnectionSettings) (_ *ConnectionSet, err error) {
connSet := &ConnectionSet{}
defer func() {
@@ -28,25 +29,42 @@ func Connect(tport transport.Transport, settings ConnectionSettings) (_ *Connect
}
}()
if settings.StdIn != nil {
c, err := tport.Dial(*settings.StdIn)
port := *settings.StdIn
c, err := tport.Dial(port)
if err != nil {
return nil, errors.Wrap(err, "failed creating stdin Connection")
}
connSet.In = transport.NewLogConnection(c, *settings.StdIn)
connSet.In = NewConnSlot(transport.NewLogConnection(c, port), redialer(tport, port))
}
if settings.StdOut != nil {
c, err := tport.Dial(*settings.StdOut)
port := *settings.StdOut
c, err := tport.Dial(port)
if err != nil {
return nil, errors.Wrap(err, "failed creating stdout Connection")
}
connSet.Out = transport.NewLogConnection(c, *settings.StdOut)
connSet.Out = NewConnSlot(transport.NewLogConnection(c, port), redialer(tport, port))
}
if settings.StdErr != nil {
c, err := tport.Dial(*settings.StdErr)
port := *settings.StdErr
c, err := tport.Dial(port)
if err != nil {
return nil, errors.Wrap(err, "failed creating stderr Connection")
}
connSet.Err = transport.NewLogConnection(c, *settings.StdErr)
connSet.Err = NewConnSlot(transport.NewLogConnection(c, port), redialer(tport, port))
}
return connSet, nil
}

// redialer returns a callback that re-dials the given vsock port via the
// provided transport. Used by ConnSlot to recover from a bridge disconnect:
// after live migration the source-host listener is gone but the destination
// host has a fresh listener on the same port number.
func redialer(tport transport.Transport, port uint32) func() (transport.Connection, error) {
return func() (transport.Connection, error) {
nc, err := tport.Dial(port)
if err != nil {
return nil, err
}
return transport.NewLogConnection(nc, port), nil
}
}
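The diff uses ConnSlot's contract throughout — NewConnSlot, Write, Disconnect, IsAlive, Close, plus the redial callback — without showing the type itself. Below is a minimal sketch of a slot with those semantics, written against the documented behavior rather than the actual hcsshim implementation; connSlotSketch and its Reconnect method are assumptions:

package stdio

import (
	"io"
	"sync"

	"github.com/Microsoft/hcsshim/internal/guest/transport"
)

// connSlotSketch holds a replaceable transport.Connection. Writers park
// while the slot is empty; Reconnect restores a connection via the redial
// callback and wakes them.
type connSlotSketch struct {
	mu     sync.Mutex
	cond   *sync.Cond
	conn   transport.Connection // nil while disconnected
	redial func() (transport.Connection, error)
	closed bool
}

func newConnSlotSketch(c transport.Connection, redial func() (transport.Connection, error)) *connSlotSketch {
	s := &connSlotSketch{conn: c, redial: redial}
	s.cond = sync.NewCond(&s.mu)
	return s
}

// Write parks until a connection is attached or the slot is closed; this is
// the behavior the reconnect loop's comment depends on.
func (s *connSlotSketch) Write(p []byte) (int, error) {
	s.mu.Lock()
	for s.conn == nil && !s.closed {
		s.cond.Wait()
	}
	c, closed := s.conn, s.closed
	s.mu.Unlock()
	if closed {
		return 0, io.ErrClosedPipe
	}
	return c.Write(p)
}

// Disconnect drops the current connection without closing the slot, leaving
// writers parked until Reconnect or Close.
func (s *connSlotSketch) Disconnect() {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.conn != nil {
		_ = s.conn.Close()
		s.conn = nil
	}
}

// Reconnect re-dials through the callback and releases parked writers.
func (s *connSlotSketch) Reconnect() error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.closed || s.redial == nil {
		return io.ErrClosedPipe
	}
	nc, err := s.redial()
	if err != nil {
		return err
	}
	s.conn = nc
	s.cond.Broadcast()
	return nil
}

// IsAlive reports whether the slot can still carry data; compaction drops
// slots for which this is false.
func (s *connSlotSketch) IsAlive() bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	return !s.closed
}

// Close shuts the slot permanently and unparks any blocked writers.
func (s *connSlotSketch) Close() error {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.closed = true
	if s.conn != nil {
		_ = s.conn.Close()
		s.conn = nil
	}
	s.cond.Broadcast()
	return nil
}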