Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 123 additions & 10 deletions lib/guest/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ import (

securejoin "github.com/cyphar/filepath-securejoin"
"github.com/kernel/hypeman/lib/hypervisor"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
otelcodes "go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"
Expand All @@ -26,6 +30,10 @@ import (
const (
// vsockGuestPort is the port the guest-agent listens on inside the guest
vsockGuestPort = 2222

guestExecFastRetryInterval = 25 * time.Millisecond
guestExecSlowRetryInterval = 250 * time.Millisecond
guestExecFastRetryWindow = 2 * time.Second
Comment thread
sjmiller609 marked this conversation as resolved.
)

// AgentVSockDialError indicates the vsock dial to the guest agent failed.
Expand Down Expand Up @@ -93,17 +101,25 @@ func GetOrCreateConn(ctx context.Context, dialer hypervisor.VsockDialer) (*grpc.
return conn, nil
}

// CloseConn removes a connection from the pool by key (call when VM is deleted).
// We only remove from pool, not explicitly close - the connection will fail
// naturally when the VM dies, and grpc will clean up.
// CloseConn removes and closes a connection from the pool by key.
func CloseConn(dialerKey string) {
connPool.Lock()
defer connPool.Unlock()

if _, ok := connPool.conns[dialerKey]; ok {
conn, ok := connPool.conns[dialerKey]
if ok {
delete(connPool.conns, dialerKey)
slog.Debug("removed gRPC connection from pool", "key", dialerKey)
}
connPool.Unlock()

if !ok {
return
}
go func() {
if err := conn.Close(); err != nil {
slog.Debug("failed to close gRPC connection", "key", dialerKey, "error", err)
return
}
slog.Debug("closed and removed gRPC connection from pool", "key", dialerKey)
}()
}

// ExitStatus represents command exit information
Expand Down Expand Up @@ -133,42 +149,128 @@ type ExecOptions struct {
// ExecIntoInstance executes command in instance via vsock using gRPC.
// The dialer is a hypervisor-specific VsockDialer that knows how to connect to the guest.
// If WaitForAgent is set, it will retry on connection errors until the timeout.
func ExecIntoInstance(ctx context.Context, dialer hypervisor.VsockDialer, opts ExecOptions) (*ExitStatus, error) {
// If no wait requested, execute immediately
func ExecIntoInstance(ctx context.Context, dialer hypervisor.VsockDialer, opts ExecOptions) (exit *ExitStatus, err error) {
// If no wait requested, execute immediately. API-level exec calls already have
// their own exec.session span; the detailed retry spans are only useful when
// we are waiting for the guest-agent to become reachable.
if opts.WaitForAgent == 0 {
return execIntoInstanceOnce(ctx, dialer, opts)
}

ctx, span := startGuestExecSpan(ctx, opts)
defer func() {
finishGuestExecSpan(span, exit, err)
}()

deadline := time.Now().Add(opts.WaitForAgent)
start := time.Now()
attempts := 0
retryableAttempts := 0
firstRetryableErrorType := ""
lastRetryableErrorType := ""
lastRetryInterval := time.Duration(0)

for {
attempts++
Comment thread
sjmiller609 marked this conversation as resolved.
exit, err := execIntoInstanceOnce(ctx, dialer, opts)

// Success - return immediately
if err == nil {
recordGuestExecWait(span, start, attempts, retryableAttempts, firstRetryableErrorType, lastRetryableErrorType, lastRetryInterval)
return exit, err
}

// Check if this is a retryable connection error
if !isRetryableConnectionError(err) {
recordGuestExecWait(span, start, attempts, retryableAttempts, firstRetryableErrorType, lastRetryableErrorType, lastRetryInterval)
return exit, err
}
retryableAttempts++
errType := retryableConnectionErrorType(err)
if firstRetryableErrorType == "" {
firstRetryableErrorType = errType
}
lastRetryableErrorType = errType
CloseConn(dialer.Key())
Comment thread
cursor[bot] marked this conversation as resolved.

// Connection error - check if we should retry
if time.Now().After(deadline) {
recordGuestExecWait(span, start, attempts, retryableAttempts, firstRetryableErrorType, lastRetryableErrorType, lastRetryInterval)
return nil, err
}

retryInterval := guestExecRetryInterval(time.Since(start))
lastRetryInterval = retryInterval

// Wait before retrying, but respect context cancellation
select {
case <-ctx.Done():
recordGuestExecWait(span, start, attempts, retryableAttempts, firstRetryableErrorType, lastRetryableErrorType, lastRetryInterval)
return nil, ctx.Err()
case <-time.After(500 * time.Millisecond):
case <-time.After(retryInterval):
// Continue to retry
}
}
}

func guestExecRetryInterval(elapsed time.Duration) time.Duration {
if elapsed < guestExecFastRetryWindow {
return guestExecFastRetryInterval
}
return guestExecSlowRetryInterval
}

func recordGuestExecWait(span trace.Span, start time.Time, attempts, retryableAttempts int, firstRetryableErrorType, lastRetryableErrorType string, lastRetryInterval time.Duration) {
attrs := []attribute.KeyValue{
attribute.Int("attempts", attempts),
attribute.Int("retryable_attempts", retryableAttempts),
attribute.Int64("wait_elapsed_ms", time.Since(start).Milliseconds()),
}
if firstRetryableErrorType != "" {
attrs = append(attrs, attribute.String("first_retryable_error_type", firstRetryableErrorType))
}
if lastRetryableErrorType != "" {
attrs = append(attrs, attribute.String("last_retryable_error_type", lastRetryableErrorType))
}
if lastRetryInterval > 0 {
attrs = append(attrs, attribute.Int64("last_retry_interval_ms", lastRetryInterval.Milliseconds()))
}
span.SetAttributes(attrs...)
}

func startGuestExecSpan(ctx context.Context, opts ExecOptions) (context.Context, trace.Span) {
return otel.Tracer("hypeman/guest").Start(ctx, "guest.exec", trace.WithAttributes(
attribute.String("command_name", execCommandName(opts.Command)),
attribute.Bool("tty", opts.TTY),
attribute.Bool("wait_for_agent", opts.WaitForAgent > 0),
attribute.Int64("wait_for_agent_ms", opts.WaitForAgent.Milliseconds()),
attribute.Int("timeout_seconds", int(opts.Timeout)),
attribute.Int64("retry_fast_interval_ms", guestExecFastRetryInterval.Milliseconds()),
attribute.Int64("retry_slow_interval_ms", guestExecSlowRetryInterval.Milliseconds()),
attribute.Int64("retry_fast_window_ms", guestExecFastRetryWindow.Milliseconds()),
))
}

func finishGuestExecSpan(span trace.Span, exit *ExitStatus, err error) {
if exit != nil {
span.SetAttributes(attribute.Int("exit_code", exit.Code))
}
if err != nil {
span.RecordError(err)
span.SetStatus(otelcodes.Error, err.Error())
} else {
span.SetStatus(otelcodes.Ok, "")
}
span.End()
}

func execCommandName(command []string) string {
if len(command) == 0 || command[0] == "" {
return "/bin/sh"
}
return filepath.Base(command[0])
}

// isRetryableConnectionError returns true if the error indicates the guest agent
// is not yet ready and we should retry connecting.
func isRetryableConnectionError(err error) bool {
Expand All @@ -188,6 +290,17 @@ func isRetryableConnectionError(err error) bool {
return false
}

func retryableConnectionErrorType(err error) string {
var dialErr *AgentVSockDialError
if errors.As(err, &dialErr) {
return "vsock_dial"
}
if s, ok := status.FromError(err); ok && s.Code() == codes.Unavailable {
return "grpc_unavailable"
}
return fmt.Sprintf("%T", err)
}

// execIntoInstanceOnce executes command in instance via vsock using gRPC (single attempt).
func execIntoInstanceOnce(ctx context.Context, dialer hypervisor.VsockDialer, opts ExecOptions) (*ExitStatus, error) {
start := time.Now()
Expand Down
Loading
Loading