Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 30 additions & 1 deletion internal/cmd/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,29 @@ func CommandContext(ctx context.Context, host cow.ProcessHost, name string, arg
return cmd
}

// Attach builds a [Cmd] around a process the caller has already resolved
// and starts relaying its IO. It is the counterpart of [Command] /
// [CommandContext] for the destination side of live migration: the caller
// obtains p through the host's restore path (e.g.
// gcs.Container.OpenProcessWithIO) and Attach binds the process's stdio to
// the supplied destination streams.
//
// The returned Cmd uses ctx as its Context, logs with the process's pid
// attached, and has CopyAfterExitTimeout set to one second. If the relay
// cannot be started, Attach returns a nil Cmd together with the error from
// startRelay.
func Attach(ctx context.Context, p cow.Process, stdin io.Reader, stdout, stderr io.Writer) (*Cmd, error) {
	// Tag every log line emitted through this Cmd with the process id.
	logger := log.G(ctx).WithField("pid", p.Pid())

	attached := &Cmd{
		Process:              p,
		Stdin:                stdin,
		Stdout:               stdout,
		Stderr:               stderr,
		Log:                  logger,
		Context:              ctx,
		ExitState:            &ExitState{},
		allDoneCh:            make(chan struct{}),
		CopyAfterExitTimeout: time.Second,
	}

	err := attached.startRelay()
	if err != nil {
		return nil, err
	}
	return attached, nil
}

// Start starts a command. The caller must ensure that if Start succeeds,
// Wait is eventually called to clean up resources.
func (c *Cmd) Start() error {
Expand Down Expand Up @@ -209,7 +232,13 @@ func (c *Cmd) Start() error {
c.Log = c.Log.WithField("pid", p.Pid())
}

// Start relaying process IO.
return c.startRelay()
}

// startRelay wires the IO relay goroutines and the context-cancel
// killer to [Cmd.Process].
func (c *Cmd) startRelay() error {
p := c.Process
stdin, stdout, stderr := p.Stdio()
if c.Stdin != nil {
// Do not make stdin part of the error group because there is no way for
Expand Down
56 changes: 56 additions & 0 deletions internal/cmd/cmd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,12 @@ func (p *localProcess) Pid() int {
return p.p.Pid
}

// MigrationState reports an empty [cow.MigrationState]: this test fake
// doesn't use vsock or a GCS bridge, so every field stays zero.
func (p *localProcess) MigrationState() cow.MigrationState {
	var none cow.MigrationState
	return none
}

// ResizeConsole always fails with a "not supported" error; ctx, x and y
// are ignored.
func (p *localProcess) ResizeConsole(ctx context.Context, x, y uint16) error {
	unsupported := errors.New("not supported")
	return unsupported
}
Expand Down Expand Up @@ -280,3 +286,53 @@ func TestCmdStuckIo(t *testing.T) {
t.Fatalf("expected: %v; got: %v", errIOTimeOut, err)
}
}

// TestCmdAttach verifies that Attach binds a Cmd to a caller-supplied
// process and the resulting Cmd can be Wait'd to completion. Mirrors
// the migration restore path: caller obtains the process via the
// host's restore API (e.g. gcs.OpenProcessWithIO) and Attach wires IO.
func TestCmdAttach(t *testing.T) {
host := &localProcessHost{}
p, err := host.CreateProcess(context.Background(), &hcsschema.ProcessParameters{
CommandLine: "cmd /c exit /b 0",
})
if err != nil {
t.Fatal(err)
}

cmd, err := Attach(context.Background(), p, nil, nil, nil)
if err != nil {
t.Fatalf("Attach: %v", err)
}
if cmd.Process != p {
t.Fatal("Cmd.Process does not match the supplied process")
}
if err := cmd.Wait(); err != nil {
t.Fatalf("Wait: %v", err)
}
}

// TestCmdAttachIO verifies that Attach's IO relays flow process output
// to caller-supplied destination streams.
func TestCmdAttachIO(t *testing.T) {
host := &localProcessHost{}
p, err := host.CreateProcess(context.Background(), &hcsschema.ProcessParameters{
CommandLine: "cmd /c echo hello",
CreateStdOutPipe: true,
})
if err != nil {
t.Fatal(err)
}

var stdout bytes.Buffer
cmd, err := Attach(context.Background(), p, nil, &stdout, nil)
if err != nil {
t.Fatalf("Attach: %v", err)
}
if err := cmd.Wait(); err != nil {
t.Fatalf("Wait: %v", err)
}
if got := stdout.String(); got != "hello\r\n" {
t.Fatalf("stdout=%q, want %q", got, "hello\r\n")
}
}
14 changes: 14 additions & 0 deletions internal/controller/process/mocks/mock_cow.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 12 additions & 0 deletions internal/cow/cow.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,14 @@ import (
hcsschema "github.com/Microsoft/hcsshim/internal/hcs/schema2"
)

// MigrationState holds the host-side identifiers needed to rebind a process
// during live migration. A zero field means the host doesn't use the
// corresponding facility (e.g. vsock or a GCS bridge).
type MigrationState struct {
	// StdinPort, StdoutPort and StderrPort are the vsock ports used for
	// the process's stdio streams.
	StdinPort  uint32
	StdoutPort uint32
	StderrPort uint32
	// WaitCallID is the GCS bridge wait-call id.
	WaitCallID int64
}

// Process is the interface for an OS process running in a container or utility VM.
type Process interface {
// Close releases resources associated with the process and closes the
Expand All @@ -27,6 +35,10 @@ type Process interface {
CloseStderr(ctx context.Context) error
// Pid returns the process ID.
Pid() int
// MigrationState returns the host-side identifiers (vsock stdio ports
// and GCS bridge wait-call id) needed by the live-migration save path.
// Zero fields indicate the host doesn't use those facilities.
MigrationState() MigrationState
// Stdio returns the stdio streams for a process. These may be nil if a stream
// was not requested during CreateProcess.
Stdio() (_ io.Writer, _ io.Reader, _ io.Reader)
Expand Down
41 changes: 41 additions & 0 deletions internal/gcs/bridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -451,3 +451,44 @@ func (brdg *bridge) sendRPC(buf *bytes.Buffer, enc *json.Encoder, call *rpc) err
}
return nil
}

// NextID reports the bridge's next request id, read while holding the
// bridge mutex. The migration save path uses it so the destination can
// seed its counter above all source ids.
func (brdg *bridge) NextID() int64 {
	brdg.mu.Lock()
	id := brdg.nextID
	brdg.mu.Unlock()
	return id
}

// SeedNextID moves the request id allocator up to next when it is
// currently below that value. The allocator is never lowered: if it is
// already at or above next the call changes nothing.
func (brdg *bridge) SeedNextID(next int64) {
	brdg.mu.Lock()
	defer brdg.mu.Unlock()

	if brdg.nextID >= next {
		return
	}
	brdg.nextID = next
}

// PreregisterRPC records a stub rpc for id in the response table without
// sending anything. The migration destination uses it to adopt requests
// (today: WaitForProcess) that the source left outstanding in the guest, so
// that their eventual responses route normally.
//
// It returns ErrBridgeClosed when the response table is nil, and an error
// naming the id when an rpc is already registered under it. On success the
// newly registered stub is returned.
func (brdg *bridge) PreregisterRPC(id int64, proc prot.RPCProc, resp responseMessage) (*rpc, error) {
	stub := &rpc{
		id:   id,
		proc: proc,
		resp: resp,
		ch:   make(chan struct{}),
	}

	brdg.mu.Lock()
	defer brdg.mu.Unlock()

	// Looking up a key in a nil map is safe, so the duplicate probe can run
	// before the nil-table case is evaluated; the nil case still wins.
	switch _, taken := brdg.rpcs[id]; {
	case brdg.rpcs == nil:
		return nil, ErrBridgeClosed
	case taken:
		return nil, fmt.Errorf("preregister rpc: id %d already in use", id)
	}

	brdg.rpcs[id] = stub
	return stub, nil
}
68 changes: 65 additions & 3 deletions internal/gcs/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package gcs
import (
"context"
"errors"
"fmt"
"sync"
"time"

Expand Down Expand Up @@ -67,9 +68,9 @@ func (gc *GuestConnection) CreateContainer(ctx context.Context, cid string, conf
return c, nil
}

// CloneContainer just creates the wrappers and sets up notification requests for a
// container that is already running inside the UVM (after cloning).
func (gc *GuestConnection) CloneContainer(ctx context.Context, cid string) (_ *Container, err error) {
// OpenContainer attaches a host-side wrapper to a container already
// running inside the UVM.
func (gc *GuestConnection) OpenContainer(_ context.Context, cid string) (_ *Container, err error) {
c := &Container{
gc: gc,
id: cid,
Expand Down Expand Up @@ -118,6 +119,67 @@ func (c *Container) CreateProcess(ctx context.Context, config interface{}) (_ co
return c.gc.exec(ctx, c.id, config)
}

// OpenProcessWithIO is the live-migration restore counterpart of
// [Container.CreateProcess]. It wraps a process that is already running in
// this container, listens again on each non-zero vsock port supplied for
// its stdio, and pre-registers the source bridge's WaitForProcess id so the
// guest's still-outstanding response is routed without arming a duplicate
// wait.
//
// waitCallID must be non-zero. A zero stdio port leaves the corresponding
// stream nil. If any step fails after the wrapper has been built, the
// wrapper is closed before the error is returned.
func (c *Container) OpenProcessWithIO(ctx context.Context, pid uint32, stdinPort, stdoutPort, stderrPort uint32, waitCallID int64) (_ *Process, err error) {
	ctx, span := oc.StartSpan(ctx, "gcs::Container::OpenProcessWithIO", oc.WithClientSpanKind)
	defer span.End()
	defer func() { oc.SetSpanStatus(span, err) }()
	span.AddAttributes(
		trace.StringAttribute("cid", c.id),
		trace.Int64Attribute("pid", int64(pid)),
		trace.Int64Attribute("waitCallID", waitCallID))

	if waitCallID == 0 {
		return nil, fmt.Errorf("open process pid %d in container %s: waitCallID is required", pid, c.id)
	}

	proc := &Process{
		gc:         c.gc,
		cid:        c.id,
		id:         pid,
		stdinPort:  stdinPort,
		stdoutPort: stdoutPort,
		stderrPort: stderrPort,
	}
	// Runs on every return; only tears the wrapper down when the named
	// result err ends up non-nil.
	defer func() {
		if err == nil {
			return
		}
		proc.Close()
	}()

	// relisten opens a listener on port and wraps it in an ioChannel. A
	// zero port is skipped and yields a nil channel with no error.
	relisten := func(port uint32) (*ioChannel, error) {
		if port == 0 {
			return nil, nil
		}
		listener, listenErr := c.gc.ioListenFn(port)
		if listenErr != nil {
			return nil, fmt.Errorf("listen vsock port %d: %w", port, listenErr)
		}
		return newIoChannel(listener), nil
	}

	proc.stdin, err = relisten(stdinPort)
	if err != nil {
		return nil, err
	}
	proc.stdout, err = relisten(stdoutPort)
	if err != nil {
		return nil, err
	}
	proc.stderr, err = relisten(stderrPort)
	if err != nil {
		return nil, err
	}

	// Adopt the wait the source already issued instead of sending a new one.
	proc.waitCall, err = c.gc.brdg.PreregisterRPC(waitCallID, prot.RPCWaitForProcess, &proc.waitResp)
	if err != nil {
		return nil, fmt.Errorf("preregister wait for pid %d in container %s (id %d): %w", pid, c.id, waitCallID, err)
	}
	go proc.waitBackground()

	log.G(ctx).WithField("pid", proc.id).Debug("opened existing process with IO")
	return proc, nil
}

// ID returns the container's ID.
func (c *Container) ID() string {
return c.id
Expand Down
39 changes: 39 additions & 0 deletions internal/gcs/guestconnection.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,45 @@ func (gc *GuestConnection) newIoChannel() (*ioChannel, uint32, error) {
return newIoChannel(l), port, nil
}

// SetNextPort raises the floor of the new-process IO port allocator to p.
// The live-migration restore path calls it after [Connect] to skip past
// vsock ports already in use by restored processes. The floor never moves
// backwards: a p at or below the current value is ignored.
func (gc *GuestConnection) SetNextPort(p uint32) {
	gc.mu.Lock()
	defer gc.mu.Unlock()

	if gc.nextPort >= p {
		return
	}
	gc.nextPort = p
}

// NextPort reports the current allocator floor, read while holding the
// connection mutex. The live-migration save path records it as the value
// [SetNextPort] should be seeded with on the destination.
func (gc *GuestConnection) NextPort() uint32 {
	gc.mu.Lock()
	port := gc.nextPort
	gc.mu.Unlock()
	return port
}

// BridgeNextID reports the current value of the bridge's request id
// allocator by delegating to the bridge's NextID.
func (gc *GuestConnection) BridgeNextID() int64 {
	next := gc.brdg.NextID()
	return next
}

// SeedBridgeNextID raises the floor of the bridge's request id allocator
// to next by delegating to the bridge's SeedNextID.
func (gc *GuestConnection) SeedBridgeNextID(next int64) {
	brdg := gc.brdg
	brdg.SeedNextID(next)
}

// PreregisterWaitForProcess registers a stub WaitForProcess rpc on the
// bridge under the source-issued id, with resp as the response target, so
// the guest's response is routed without issuing a duplicate wait. The
// registered rpc handle is discarded; only the registration error, if any,
// is returned.
//
// NOTE(review): [Container.OpenProcessWithIO] pre-registers its own
// waitCallID through the same bridge call, which rejects an id that is
// already registered — confirm callers never use both for the same id.
func (gc *GuestConnection) PreregisterWaitForProcess(id int64, resp *prot.ContainerWaitForProcessResponse) error {
	if _, err := gc.brdg.PreregisterRPC(id, prot.RPCWaitForProcess, resp); err != nil {
		return err
	}
	return nil
}

func (gc *GuestConnection) requestNotify(cid string, ch chan struct{}) error {
gc.mu.Lock()
defer gc.mu.Unlock()
Expand Down
Loading
Loading