diff --git a/lib/guest/client.go b/lib/guest/client.go index bee9f3a0..cb772dbc 100644 --- a/lib/guest/client.go +++ b/lib/guest/client.go @@ -17,6 +17,10 @@ import ( securejoin "github.com/cyphar/filepath-securejoin" "github.com/kernel/hypeman/lib/hypervisor" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + otelcodes "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/trace" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials/insecure" @@ -26,6 +30,10 @@ import ( const ( // vsockGuestPort is the port the guest-agent listens on inside the guest vsockGuestPort = 2222 + + guestExecFastRetryInterval = 25 * time.Millisecond + guestExecSlowRetryInterval = 250 * time.Millisecond + guestExecFastRetryWindow = 2 * time.Second ) // AgentVSockDialError indicates the vsock dial to the guest agent failed. @@ -93,17 +101,25 @@ func GetOrCreateConn(ctx context.Context, dialer hypervisor.VsockDialer) (*grpc. return conn, nil } -// CloseConn removes a connection from the pool by key (call when VM is deleted). -// We only remove from pool, not explicitly close - the connection will fail -// naturally when the VM dies, and grpc will clean up. +// CloseConn removes and closes a connection from the pool by key. func CloseConn(dialerKey string) { connPool.Lock() - defer connPool.Unlock() - - if _, ok := connPool.conns[dialerKey]; ok { + conn, ok := connPool.conns[dialerKey] + if ok { delete(connPool.conns, dialerKey) - slog.Debug("removed gRPC connection from pool", "key", dialerKey) } + connPool.Unlock() + + if !ok { + return + } + go func() { + if err := conn.Close(); err != nil { + slog.Debug("failed to close gRPC connection", "key", dialerKey, "error", err) + return + } + slog.Debug("closed and removed gRPC connection from pool", "key", dialerKey) + }() } // ExitStatus represents command exit information @@ -133,42 +149,128 @@ type ExecOptions struct { // ExecIntoInstance executes command in instance via vsock using gRPC. // The dialer is a hypervisor-specific VsockDialer that knows how to connect to the guest. // If WaitForAgent is set, it will retry on connection errors until the timeout. -func ExecIntoInstance(ctx context.Context, dialer hypervisor.VsockDialer, opts ExecOptions) (*ExitStatus, error) { - // If no wait requested, execute immediately +func ExecIntoInstance(ctx context.Context, dialer hypervisor.VsockDialer, opts ExecOptions) (exit *ExitStatus, err error) { + // If no wait requested, execute immediately. API-level exec calls already have + // their own exec.session span; the detailed retry spans are only useful when + // we are waiting for the guest-agent to become reachable. if opts.WaitForAgent == 0 { return execIntoInstanceOnce(ctx, dialer, opts) } + ctx, span := startGuestExecSpan(ctx, opts) + defer func() { + finishGuestExecSpan(span, exit, err) + }() + deadline := time.Now().Add(opts.WaitForAgent) + start := time.Now() + attempts := 0 + retryableAttempts := 0 + firstRetryableErrorType := "" + lastRetryableErrorType := "" + lastRetryInterval := time.Duration(0) for { + attempts++ exit, err := execIntoInstanceOnce(ctx, dialer, opts) // Success - return immediately if err == nil { + recordGuestExecWait(span, start, attempts, retryableAttempts, firstRetryableErrorType, lastRetryableErrorType, lastRetryInterval) return exit, err } // Check if this is a retryable connection error if !isRetryableConnectionError(err) { + recordGuestExecWait(span, start, attempts, retryableAttempts, firstRetryableErrorType, lastRetryableErrorType, lastRetryInterval) return exit, err } + retryableAttempts++ + errType := retryableConnectionErrorType(err) + if firstRetryableErrorType == "" { + firstRetryableErrorType = errType + } + lastRetryableErrorType = errType + CloseConn(dialer.Key()) // Connection error - check if we should retry if time.Now().After(deadline) { + recordGuestExecWait(span, start, attempts, retryableAttempts, firstRetryableErrorType, lastRetryableErrorType, lastRetryInterval) return nil, err } + retryInterval := guestExecRetryInterval(time.Since(start)) + lastRetryInterval = retryInterval + // Wait before retrying, but respect context cancellation select { case <-ctx.Done(): + recordGuestExecWait(span, start, attempts, retryableAttempts, firstRetryableErrorType, lastRetryableErrorType, lastRetryInterval) return nil, ctx.Err() - case <-time.After(500 * time.Millisecond): + case <-time.After(retryInterval): // Continue to retry } } } +func guestExecRetryInterval(elapsed time.Duration) time.Duration { + if elapsed < guestExecFastRetryWindow { + return guestExecFastRetryInterval + } + return guestExecSlowRetryInterval +} + +func recordGuestExecWait(span trace.Span, start time.Time, attempts, retryableAttempts int, firstRetryableErrorType, lastRetryableErrorType string, lastRetryInterval time.Duration) { + attrs := []attribute.KeyValue{ + attribute.Int("attempts", attempts), + attribute.Int("retryable_attempts", retryableAttempts), + attribute.Int64("wait_elapsed_ms", time.Since(start).Milliseconds()), + } + if firstRetryableErrorType != "" { + attrs = append(attrs, attribute.String("first_retryable_error_type", firstRetryableErrorType)) + } + if lastRetryableErrorType != "" { + attrs = append(attrs, attribute.String("last_retryable_error_type", lastRetryableErrorType)) + } + if lastRetryInterval > 0 { + attrs = append(attrs, attribute.Int64("last_retry_interval_ms", lastRetryInterval.Milliseconds())) + } + span.SetAttributes(attrs...) +} + +func startGuestExecSpan(ctx context.Context, opts ExecOptions) (context.Context, trace.Span) { + return otel.Tracer("hypeman/guest").Start(ctx, "guest.exec", trace.WithAttributes( + attribute.String("command_name", execCommandName(opts.Command)), + attribute.Bool("tty", opts.TTY), + attribute.Bool("wait_for_agent", opts.WaitForAgent > 0), + attribute.Int64("wait_for_agent_ms", opts.WaitForAgent.Milliseconds()), + attribute.Int("timeout_seconds", int(opts.Timeout)), + attribute.Int64("retry_fast_interval_ms", guestExecFastRetryInterval.Milliseconds()), + attribute.Int64("retry_slow_interval_ms", guestExecSlowRetryInterval.Milliseconds()), + attribute.Int64("retry_fast_window_ms", guestExecFastRetryWindow.Milliseconds()), + )) +} + +func finishGuestExecSpan(span trace.Span, exit *ExitStatus, err error) { + if exit != nil { + span.SetAttributes(attribute.Int("exit_code", exit.Code)) + } + if err != nil { + span.RecordError(err) + span.SetStatus(otelcodes.Error, err.Error()) + } else { + span.SetStatus(otelcodes.Ok, "") + } + span.End() +} + +func execCommandName(command []string) string { + if len(command) == 0 || command[0] == "" { + return "/bin/sh" + } + return filepath.Base(command[0]) +} + // isRetryableConnectionError returns true if the error indicates the guest agent // is not yet ready and we should retry connecting. func isRetryableConnectionError(err error) bool { @@ -188,6 +290,17 @@ func isRetryableConnectionError(err error) bool { return false } +func retryableConnectionErrorType(err error) string { + var dialErr *AgentVSockDialError + if errors.As(err, &dialErr) { + return "vsock_dial" + } + if s, ok := status.FromError(err); ok && s.Code() == codes.Unavailable { + return "grpc_unavailable" + } + return fmt.Sprintf("%T", err) +} + // execIntoInstanceOnce executes command in instance via vsock using gRPC (single attempt). func execIntoInstanceOnce(ctx context.Context, dialer hypervisor.VsockDialer, opts ExecOptions) (*ExitStatus, error) { start := time.Now() diff --git a/lib/guest/client_test.go b/lib/guest/client_test.go new file mode 100644 index 00000000..08a122b0 --- /dev/null +++ b/lib/guest/client_test.go @@ -0,0 +1,249 @@ +package guest + +import ( + "context" + "errors" + "io" + "net" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/kernel/hypeman/lib/hypervisor" + "google.golang.org/grpc" +) + +func TestExecCommandName(t *testing.T) { + tests := []struct { + name string + command []string + want string + }{ + { + name: "empty command defaults to shell", + command: nil, + want: "/bin/sh", + }, + { + name: "empty binary defaults to shell", + command: []string{""}, + want: "/bin/sh", + }, + { + name: "uses basename", + command: []string{"/usr/bin/ip", "addr", "show"}, + want: "ip", + }, + { + name: "does not include arguments", + command: []string{"/bin/bash", "-lc", "secret-bearing command"}, + want: "bash", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := execCommandName(tt.command); got != tt.want { + t.Fatalf("execCommandName(%v) = %q, want %q", tt.command, got, tt.want) + } + }) + } +} + +func TestGuestExecRetryInterval(t *testing.T) { + tests := []struct { + name string + elapsed time.Duration + want time.Duration + }{ + { + name: "fast path", + elapsed: 500 * time.Millisecond, + want: guestExecFastRetryInterval, + }, + { + name: "boundary", + elapsed: guestExecFastRetryWindow, + want: guestExecSlowRetryInterval, + }, + { + name: "slow path", + elapsed: 3 * time.Second, + want: guestExecSlowRetryInterval, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := guestExecRetryInterval(tt.elapsed); got != tt.want { + t.Fatalf("guestExecRetryInterval(%s) = %s, want %s", tt.elapsed, got, tt.want) + } + }) + } +} + +func TestExecIntoInstanceRetriesWithFreshConnections(t *testing.T) { + dialer := &delayedDialer{ + key: "retry-fresh-connection-test", + readyAt: time.Now().Add(100 * time.Millisecond), + } + + start := time.Now() + exit, err := ExecIntoInstance(context.Background(), dialer, ExecOptions{ + Command: []string{"true"}, + WaitForAgent: 2 * time.Second, + }) + elapsed := time.Since(start) + if err != nil { + t.Fatalf("ExecIntoInstance failed: %v", err) + } + if exit.Code != 0 { + t.Fatalf("exit code = %d, want 0", exit.Code) + } + if attempts := dialer.attempts.Load(); attempts < 2 { + t.Fatalf("dial attempts = %d, want retry", attempts) + } + if elapsed > 500*time.Millisecond { + t.Fatalf("ExecIntoInstance took %s, want under 500ms", elapsed) + } +} + +func TestCloseConnClosesPooledConnection(t *testing.T) { + dialer := &trackingDialer{ + key: "close-conn-test", + conns: make(chan *closeTrackingConn, 1), + } + + conn, err := GetOrCreateConn(context.Background(), dialer) + if err != nil { + t.Fatalf("GetOrCreateConn failed: %v", err) + } + conn.Connect() + + tracked := waitForTrackedConn(t, dialer.conns) + CloseConn(dialer.Key()) + + select { + case <-tracked.closed: + case <-time.After(time.Second): + t.Fatal("CloseConn did not close the underlying connection") + } +} + +func waitForTrackedConn(t *testing.T, conns <-chan *closeTrackingConn) *closeTrackingConn { + t.Helper() + + select { + case conn := <-conns: + return conn + case <-time.After(time.Second): + t.Fatal("gRPC connection was not dialed") + return nil + } +} + +type delayedDialer struct { + key string + readyAt time.Time + attempts atomic.Int32 +} + +func (d *delayedDialer) Key() string { return d.key } + +func (d *delayedDialer) DialVsock(ctx context.Context, port int) (net.Conn, error) { + d.attempts.Add(1) + if time.Now().Before(d.readyAt) { + return nil, errors.New("not ready") + } + + client, server := net.Pipe() + go serveFakeGuest(server) + return client, nil +} + +var _ hypervisor.VsockDialer = (*delayedDialer)(nil) + +type trackingDialer struct { + key string + conns chan *closeTrackingConn +} + +func (d *trackingDialer) Key() string { return d.key } + +func (d *trackingDialer) DialVsock(ctx context.Context, port int) (net.Conn, error) { + client, server := net.Pipe() + tracked := &closeTrackingConn{ + Conn: client, + closed: make(chan struct{}), + } + select { + case d.conns <- tracked: + default: + } + go serveFakeGuest(server) + return tracked, nil +} + +var _ hypervisor.VsockDialer = (*trackingDialer)(nil) + +type closeTrackingConn struct { + net.Conn + closed chan struct{} + once sync.Once +} + +func (c *closeTrackingConn) Close() error { + c.once.Do(func() { + close(c.closed) + }) + return c.Conn.Close() +} + +type fakeGuestServer struct { + UnimplementedGuestServiceServer +} + +func (s *fakeGuestServer) Exec(stream GuestService_ExecServer) error { + if _, err := stream.Recv(); err != nil { + return err + } + if err := stream.Send(&ExecResponse{Response: &ExecResponse_ExitCode{ExitCode: 0}}); err != nil { + return err + } + for { + if _, err := stream.Recv(); err != nil { + if errors.Is(err, io.EOF) { + return nil + } + return err + } + } +} + +func serveFakeGuest(conn net.Conn) { + s := grpc.NewServer() + RegisterGuestServiceServer(s, &fakeGuestServer{}) + _ = s.Serve(&singleConnListener{conn: conn}) +} + +type singleConnListener struct { + conn net.Conn + done atomic.Bool +} + +func (l *singleConnListener) Accept() (net.Conn, error) { + if l.done.Swap(true) { + return nil, net.ErrClosed + } + return l.conn, nil +} + +func (l *singleConnListener) Close() error { return nil } + +func (l *singleConnListener) Addr() net.Addr { return dummyAddr{} } + +type dummyAddr struct{} + +func (dummyAddr) Network() string { return "pipe" } +func (dummyAddr) String() string { return "pipe" } diff --git a/lib/instances/exec_test.go b/lib/instances/exec_test.go index 4bf47928..064ddc28 100644 --- a/lib/instances/exec_test.go +++ b/lib/instances/exec_test.go @@ -58,10 +58,9 @@ func waitForExecAgent(ctx context.Context, mgr *manager, instanceID string, time var stdout, stderr bytes.Buffer exit, err := guest.ExecIntoInstance(ctx, dialer, guest.ExecOptions{ - Command: []string{"true"}, - Stdout: &stdout, - Stderr: &stderr, - WaitForAgent: 1 * time.Second, + Command: []string{"true"}, + Stdout: &stdout, + Stderr: &stderr, }) if err == nil && exit.Code == 0 { return nil diff --git a/lib/instances/manager_darwin_test.go b/lib/instances/manager_darwin_test.go index 7d966e02..2a3859e5 100644 --- a/lib/instances/manager_darwin_test.go +++ b/lib/instances/manager_darwin_test.go @@ -454,6 +454,8 @@ func TestVZStandbyAndRestore(t *testing.T) { // Wait for guest agent to be ready err = waitForExecAgent(ctx, mgr, inst.Id, 30*time.Second) require.NoError(t, err, "guest agent should be ready") + inst, err = waitForInstanceState(ctx, mgr, inst.Id, StateRunning, 30*time.Second) + require.NoError(t, err, "instance should be running before standby") t.Log("Guest agent ready") // Exec before standby