diff --git a/cmd/api/config/config.go b/cmd/api/config/config.go index 8690ad52..260b06d6 100644 --- a/cmd/api/config/config.go +++ b/cmd/api/config/config.go @@ -193,13 +193,27 @@ type CapacityConfig struct { // HypervisorConfig holds hypervisor settings. type HypervisorConfig struct { - Default string `koanf:"default"` - CloudHypervisorDefaultVersion string `koanf:"cloud_hypervisor_default_version"` - FirecrackerBinaryPath string `koanf:"firecracker_binary_path"` - FirecrackerSnapshotMemoryBackend string `koanf:"firecracker_snapshot_memory_backend"` - FirecrackerUFFDCacheMaxBytes string `koanf:"firecracker_uffd_cache_max_bytes"` - FirecrackerMaxConcurrentRestores int `koanf:"firecracker_max_concurrent_restores"` - Memory HypervisorMemoryConfig `koanf:"memory"` + Default string `koanf:"default"` + CloudHypervisorDefaultVersion string `koanf:"cloud_hypervisor_default_version"` + FirecrackerBinaryPath string `koanf:"firecracker_binary_path"` + FirecrackerSnapshotMemoryBackend string `koanf:"firecracker_snapshot_memory_backend"` + FirecrackerUFFDCacheMaxBytes string `koanf:"firecracker_uffd_cache_max_bytes"` + FirecrackerMaxConcurrentRestores int `koanf:"firecracker_max_concurrent_restores"` + FirecrackerUFFDGraduation FirecrackerUFFDGraduationConfig `koanf:"firecracker_uffd_graduation"` + Memory HypervisorMemoryConfig `koanf:"memory"` +} + +// FirecrackerUFFDGraduationConfig controls the background controller that +// detaches running UFFD-backed VMs from their snapshot memory pager once they +// have soaked, bounding active pager sessions and letting old pager versions +// retire. Disabled by default and only active on the uffd backend. +type FirecrackerUFFDGraduationConfig struct { + Enabled bool `koanf:"enabled"` + MinSessionAge string `koanf:"min_session_age"` + MaxConcurrent int `koanf:"max_concurrent"` + MaxActiveSessions int `koanf:"max_active_sessions"` + ScanInterval string `koanf:"scan_interval"` + CompletionTimeout string `koanf:"completion_timeout"` } // HypervisorMemoryConfig holds guest memory management settings. @@ -413,6 +427,14 @@ func defaultConfig() *Config { FirecrackerSnapshotMemoryBackend: "file", FirecrackerUFFDCacheMaxBytes: "4294967296", FirecrackerMaxConcurrentRestores: 32, + FirecrackerUFFDGraduation: FirecrackerUFFDGraduationConfig{ + Enabled: false, + MinSessionAge: "10m", + MaxConcurrent: 1, + MaxActiveSessions: 0, + ScanInterval: "1m", + CompletionTimeout: "10m", + }, Memory: HypervisorMemoryConfig{ Enabled: false, KernelPageInitMode: "hardened", @@ -640,6 +662,9 @@ func (c *Config) Validate() error { if err := validateByteSize("hypervisor.firecracker_uffd_cache_max_bytes", c.Hypervisor.FirecrackerUFFDCacheMaxBytes); err != nil { return err } + if err := c.validateFirecrackerUFFDGraduation(); err != nil { + return err + } if err := validateDuration("hypervisor.memory.active_ballooning.poll_interval", c.Hypervisor.Memory.ActiveBallooning.PollInterval); err != nil { return err } @@ -692,6 +717,29 @@ func validateDuration(field string, value string) error { return nil } +func (c *Config) validateFirecrackerUFFDGraduation() error { + g := c.Hypervisor.FirecrackerUFFDGraduation + if !g.Enabled { + return nil + } + for field, value := range map[string]string{ + "hypervisor.firecracker_uffd_graduation.min_session_age": g.MinSessionAge, + "hypervisor.firecracker_uffd_graduation.scan_interval": g.ScanInterval, + "hypervisor.firecracker_uffd_graduation.completion_timeout": g.CompletionTimeout, + } { + if err := validateDuration(field, value); err != nil { + return err + } + } + if g.MaxConcurrent < 0 { + return fmt.Errorf("hypervisor.firecracker_uffd_graduation.max_concurrent must not be negative") + } + if g.MaxActiveSessions < 0 { + return fmt.Errorf("hypervisor.firecracker_uffd_graduation.max_active_sessions must not be negative") + } + return nil +} + func intPtr(v int) *int { return &v } diff --git a/cmd/api/main.go b/cmd/api/main.go index da2a2326..3a92291b 100644 --- a/cmd/api/main.go +++ b/cmd/api/main.go @@ -35,8 +35,10 @@ import ( "github.com/kernel/hypeman/lib/ocicachegc" "github.com/kernel/hypeman/lib/otel" "github.com/kernel/hypeman/lib/paths" + "github.com/kernel/hypeman/lib/providers" "github.com/kernel/hypeman/lib/registry" "github.com/kernel/hypeman/lib/scopes" + "github.com/kernel/hypeman/lib/uffdgraduate" "github.com/kernel/hypeman/lib/vmm" nethttpmiddleware "github.com/oapi-codegen/nethttp-middleware" "github.com/riandyrn/otelchi" @@ -131,6 +133,33 @@ func startOCICacheGC(grp *errgroup.Group, ctx context.Context, runner ociCacheGC return true } +func configureUFFDGraduationController(cfg *config.Config, instanceManager instances.Manager, logger *slog.Logger) (*uffdgraduate.Controller, error) { + g := cfg.Hypervisor.FirecrackerUFFDGraduation + if !g.Enabled { + return nil, nil + } + minSessionAge, err := time.ParseDuration(g.MinSessionAge) + if err != nil { + return nil, fmt.Errorf("invalid hypervisor.firecracker_uffd_graduation.min_session_age %q: %w", g.MinSessionAge, err) + } + scanInterval, err := time.ParseDuration(g.ScanInterval) + if err != nil { + return nil, fmt.Errorf("invalid hypervisor.firecracker_uffd_graduation.scan_interval %q: %w", g.ScanInterval, err) + } + completionTimeout, err := time.ParseDuration(g.CompletionTimeout) + if err != nil { + return nil, fmt.Errorf("invalid hypervisor.firecracker_uffd_graduation.completion_timeout %q: %w", g.CompletionTimeout, err) + } + return providers.ProvideUFFDGraduationController(instanceManager, uffdgraduate.Config{ + Enabled: true, + MinSessionAge: minSessionAge, + MaxConcurrent: g.MaxConcurrent, + MaxActiveSessions: g.MaxActiveSessions, + ScanInterval: scanInterval, + CompletionTimeout: completionTimeout, + }, logger), nil +} + func run() error { // Load config early for OTel initialization // Config path can be specified via CONFIG_PATH env var or defaults to platform-specific locations @@ -565,6 +594,17 @@ func run() error { return app.AutoStandbyController.Run(gctx) }) } + + uffdGraduationController, err := configureUFFDGraduationController(app.Config, app.InstanceManager, logger) + if err != nil { + return err + } + if uffdGraduationController != nil { + grp.Go(func() error { + logger.Info("starting uffd graduation controller") + return uffdGraduationController.Run(gctx) + }) + } if app.HealthCheckController != nil { grp.Go(func() error { logger.Info("starting health check controller") diff --git a/lib/hypervisor/firecracker/firecracker.go b/lib/hypervisor/firecracker/firecracker.go index c2bb7db3..3040e2a7 100644 --- a/lib/hypervisor/firecracker/firecracker.go +++ b/lib/hypervisor/firecracker/firecracker.go @@ -57,16 +57,17 @@ func (f *Firecracker) Capabilities() hypervisor.Capabilities { func capabilities() hypervisor.Capabilities { return hypervisor.Capabilities{ - SupportsSnapshot: true, - SupportsHotplugMemory: false, - SupportsBalloonControl: true, - SupportsPause: true, - SupportsVsock: true, - SupportsGPUPassthrough: false, - SupportsDiskIOLimit: true, - SupportsGracefulVMMShutdown: false, - SupportsSnapshotBaseReuse: true, - SupportsConcurrentForkPrepare: true, + SupportsSnapshot: true, + SupportsHotplugMemory: false, + SupportsBalloonControl: true, + SupportsPause: true, + SupportsVsock: true, + SupportsGPUPassthrough: false, + SupportsDiskIOLimit: true, + SupportsGracefulVMMShutdown: false, + SupportsSnapshotBaseReuse: true, + SupportsConcurrentForkPrepare: true, + UsesDetachableSnapshotMemoryPager: true, } } diff --git a/lib/hypervisor/hypervisor.go b/lib/hypervisor/hypervisor.go index ce96fa85..03ef7cfd 100644 --- a/lib/hypervisor/hypervisor.go +++ b/lib/hypervisor/hypervisor.go @@ -263,6 +263,11 @@ type Capabilities struct { // SupportsDiskResize indicates if live disk resizing (/vm.resize-disk) is available. // Cloud Hypervisor v50.0+ only. SupportsDiskResize bool + + // UsesDetachableSnapshotMemoryPager indicates restores can be backed by an + // external snapshot-memory pager that a running VM can later be detached + // from (populate remaining pages, then release the session). + UsesDetachableSnapshotMemoryPager bool } // VsockDialer provides vsock connectivity to a guest VM. diff --git a/lib/instances/firecracker_test.go b/lib/instances/firecracker_test.go index 111c11b8..266c3eeb 100644 --- a/lib/instances/firecracker_test.go +++ b/lib/instances/firecracker_test.go @@ -861,6 +861,159 @@ func TestFCUFFDOneShotLifecycle(t *testing.T) { snapshotDeleted = true } +// TestFCUFFDGraduationLifecycle exercises detaching a running UFFD-backed VM +// from its pager: the pager populates the remaining pages and unregisters the +// session, and the VM must keep running on resident memory with its guest state +// intact. It is a sibling of TestFCUFFDOneShotLifecycle and leaves that test's +// coverage unchanged. +func TestFCUFFDGraduationLifecycle(t *testing.T) { + // Intentionally not parallel: graduation forces a full guest-memory populate, + // and overlapping that with the sibling UFFD lifecycle test's VMs saturated + // the CI runner and timed out guest-agent readiness. Running solo keeps peak + // concurrent UFFD VM load the same as before this test existed. + requireFirecrackerIntegrationPrereqs(t) + requireUserfaultfdIntegrationPrereqs(t) + if pagerBinary := strings.TrimSpace(os.Getenv("HYPEMAN_UFFD_PAGER_BINARY")); pagerBinary == "" { + t.Skip("HYPEMAN_UFFD_PAGER_BINARY must point at hypeman-uffd-pager for UFFD integration tests") + } else if st, err := os.Stat(pagerBinary); err != nil || !st.Mode().IsRegular() { + t.Skipf("HYPEMAN_UFFD_PAGER_BINARY is not a regular file: %s", pagerBinary) + } + + mgr, tmpDir := setupTestManagerForFirecrackerWithConfig(t, legacyParallelTestNetworkConfig(testNetworkSeq.Add(1)), ManagerConfig{ + FirecrackerSnapshotMemoryBackend: uffdpager.BackendUFFD, + FirecrackerUFFDCacheMaxBytes: 512 << 20, + }) + ctx := context.Background() + p := paths.New(tmpDir) + + imageManager, err := images.NewManager(p, 1, nil) + require.NoError(t, err) + imageName := integrationTestImageRef(t, "docker.io/library/alpine:latest") + snapshottest.EnsureImageReady(t, ctx, p, imageManager, imageName) + + systemManager := system.NewManager(p) + require.NoError(t, systemManager.EnsureSystemFiles(ctx)) + + source, err := mgr.CreateInstance(ctx, CreateInstanceRequest{ + Name: "fc-uffd-grad-src", + Image: imageName, + Size: 1024 * 1024 * 1024, + OverlaySize: 1024 * 1024 * 1024, + Vcpus: 1, + NetworkEnabled: false, + Hypervisor: hypervisor.TypeFirecracker, + Cmd: []string{"sleep", "infinity"}, + }) + require.NoError(t, err) + sourceID := source.Id + sourceDeleted := false + t.Cleanup(func() { + if !sourceDeleted { + _ = mgr.DeleteInstance(context.Background(), sourceID) + } + }) + + source = requireRunningSleepInstance(t, ctx, mgr, sourceID) + requireGuestTmpfs(t, ctx, source) + writeGuestFile(t, ctx, source, "/root/uffd-grad/source", "source-disk") + writeGuestFile(t, ctx, source, "/dev/shm/uffd-grad/source", "source-memory") + + // A VM with no pager session (the freshly created, file-backed source) is a + // no-op to graduate. + require.NoError(t, mgr.GraduateSnapshotMemoryPager(ctx, sourceID)) + + snapshot, err := mgr.CreateSnapshot(ctx, sourceID, CreateSnapshotRequest{ + Kind: SnapshotKindStandby, + Name: "fc-uffd-grad-snap", + }) + require.NoError(t, err) + snapshotDeleted := false + t.Cleanup(func() { + if !snapshotDeleted { + _ = mgr.DeleteSnapshot(context.Background(), snapshot.Id) + } + }) + + // Forking the standby snapshot to a running VM restores it UFFD-backed and + // pins a live pager session. + parent, err := mgr.ForkSnapshot(ctx, snapshot.Id, ForkSnapshotRequest{ + Name: "fc-uffd-grad-parent", + TargetState: StateRunning, + }) + require.NoError(t, err) + parentID := parent.Id + parentDeleted := false + t.Cleanup(func() { + if !parentDeleted { + _ = mgr.DeleteInstance(context.Background(), parentID) + } + }) + + parent = requireRunningSleepInstance(t, ctx, mgr, parentID) + assertGuestFile(t, ctx, parent, "/root/uffd-grad/source", "source-disk") + assertGuestFile(t, ctx, parent, "/dev/shm/uffd-grad/source", "source-memory") + writeGuestFile(t, ctx, parent, "/root/uffd-grad/parent", "parent-disk") + writeGuestFile(t, ctx, parent, "/dev/shm/uffd-grad/parent", "parent-memory") + + parentMeta, err := mgr.loadMetadata(parentID) + require.NoError(t, err) + require.NotEmpty(t, parentMeta.StoredMetadata.FirecrackerUFFDSessionID, "running UFFD fork should hold a pager session") + target := mgr.UFFDGraduationTargetVersion() + require.NotEmpty(t, target, "uffd backend should expose a target pager version") + require.Equal(t, target, parentMeta.StoredMetadata.FirecrackerUFFDPagerVersion) + + // Graduate: the pager fully populates memory from the backing file and + // unregisters the session. The VM keeps running with no pager dependency. + require.NoError(t, mgr.GraduateSnapshotMemoryPager(ctx, parentID)) + + parentMeta, err = mgr.loadMetadata(parentID) + require.NoError(t, err) + require.Empty(t, parentMeta.StoredMetadata.FirecrackerUFFDSessionID, "graduation should clear the pager session binding") + require.Empty(t, parentMeta.StoredMetadata.FirecrackerUFFDPagerVersion) + + // The VM is still running and all guest memory and disk content survived the + // populate + unregister. + parent = requireRunningSleepInstance(t, ctx, mgr, parentID) + assertGuestFile(t, ctx, parent, "/root/uffd-grad/source", "source-disk") + assertGuestFile(t, ctx, parent, "/dev/shm/uffd-grad/source", "source-memory") + assertGuestFile(t, ctx, parent, "/root/uffd-grad/parent", "parent-disk") + assertGuestFile(t, ctx, parent, "/dev/shm/uffd-grad/parent", "parent-memory") + + // New guest memory and disk writes still work, proving the guest did not hang + // on a previously untouched page after userfaultfd was unregistered. + writeGuestFile(t, ctx, parent, "/root/uffd-grad/post", "post-disk") + writeGuestFile(t, ctx, parent, "/dev/shm/uffd-grad/post", "post-memory") + assertGuestFile(t, ctx, parent, "/root/uffd-grad/post", "post-disk") + assertGuestFile(t, ctx, parent, "/dev/shm/uffd-grad/post", "post-memory") + + // Graduating again is a no-op now that the session is gone. + require.NoError(t, mgr.GraduateSnapshotMemoryPager(ctx, parentID)) + + // A graduated VM still standbys and restores via the file backend, and its + // memory survives the round trip. + parent, err = mgr.StandbyInstance(ctx, parentID, StandbyInstanceRequest{}) + require.NoError(t, err) + require.Equal(t, StateStandby, parent.State) + + parent, err = mgr.RestoreInstance(ctx, parentID) + require.NoError(t, err) + parent = requireRunningSleepInstance(t, ctx, mgr, parentID) + assertGuestFile(t, ctx, parent, "/dev/shm/uffd-grad/source", "source-memory") + assertGuestFile(t, ctx, parent, "/dev/shm/uffd-grad/parent", "parent-memory") + assertGuestFile(t, ctx, parent, "/dev/shm/uffd-grad/post", "post-memory") + + parentMeta, err = mgr.loadMetadata(parentID) + require.NoError(t, err) + require.Empty(t, parentMeta.StoredMetadata.FirecrackerUFFDSessionID, "file-backed restore after graduation should not create a pager session") + + require.NoError(t, mgr.DeleteInstance(ctx, parentID)) + parentDeleted = true + require.NoError(t, mgr.DeleteInstance(ctx, sourceID)) + sourceDeleted = true + require.NoError(t, mgr.DeleteSnapshot(ctx, snapshot.Id)) + snapshotDeleted = true +} + func requireRunningSleepInstance(t *testing.T, ctx context.Context, mgr Manager, instanceID string) *Instance { t.Helper() inst, err := waitForInstanceState(ctx, mgr, instanceID, StateRunning, integrationTestTimeout(20*time.Second)) diff --git a/lib/instances/firecracker_uffd_graduate.go b/lib/instances/firecracker_uffd_graduate.go new file mode 100644 index 00000000..3a3cd59b --- /dev/null +++ b/lib/instances/firecracker_uffd_graduate.go @@ -0,0 +1,90 @@ +package instances + +import ( + "context" + "errors" + "fmt" + "strings" + + "github.com/kernel/hypeman/lib/hypervisor" + "github.com/kernel/hypeman/lib/logger" + "github.com/kernel/hypeman/lib/uffdpager" +) + +// UFFDGraduationTargetVersion returns the pager version that new restores bind +// to, or "" when no UFFD pager is configured. The graduation controller treats +// sessions on a different version as the highest priority to retire. +func (m *manager) UFFDGraduationTargetVersion() string { + if m == nil || m.firecrackerUFFDPager == nil { + return "" + } + return m.firecrackerUFFDPager.VersionKey() +} + +// GraduateSnapshotMemoryPager detaches a running UFFD-backed VM from its pager. +// The pager populates the remaining snapshot pages and unregisters the session, +// leaving the VM running on resident memory with no pager dependency. The VM is +// never paused and its network is untouched, so this is safe to run on a live +// VM that is serving traffic. +func (m *manager) GraduateSnapshotMemoryPager(ctx context.Context, id string) error { + lock := m.getInstanceLock(id) + lock.Lock() + defer lock.Unlock() + + log := logger.FromContext(ctx) + + meta, err := m.loadMetadata(id) + if err != nil { + return err + } + stored := &meta.StoredMetadata + + if !m.snapshotMemoryPagerDetachable(stored.HypervisorType) { + return fmt.Errorf("hypervisor %s does not use a detachable snapshot memory pager", stored.HypervisorType) + } + sessionID := strings.TrimSpace(stored.FirecrackerUFFDSessionID) + if sessionID == "" { + // Nothing bound to a pager; the VM is already independent. + return nil + } + if m.firecrackerUFFDPager == nil { + return fmt.Errorf("firecracker uffd pager is not configured") + } + + inst := m.toInstanceWithoutHydration(ctx, meta) + if inst.State != StateRunning { + return fmt.Errorf("%w: cannot graduate snapshot memory pager from state %s", ErrInvalidState, inst.State) + } + + version := strings.TrimSpace(stored.FirecrackerUFFDPagerVersion) + if version == "" { + version = m.firecrackerUFFDPager.VersionKey() + } + + log.InfoContext(ctx, "graduating instance off uffd snapshot memory pager", + "instance_id", id, "session_id", sessionID, "pager_version", version) + + err = m.firecrackerUFFDPager.CompleteSessionVersion(ctx, version, sessionID) + if err != nil && !errors.Is(err, uffdpager.ErrSessionNotFound) { + return fmt.Errorf("complete uffd session: %w", err) + } + + // The session is gone whether we completed it or the pager had already lost + // it, so clear the binding. Standby and health paths key off the session ID + // and must not chase a session that no longer exists. + stored.FirecrackerUFFDSessionID = "" + stored.FirecrackerUFFDPagerVersion = "" + meta = &metadata{StoredMetadata: *stored} + if saveErr := m.saveMetadata(meta); saveErr != nil { + return fmt.Errorf("save metadata after graduation: %w", saveErr) + } + return nil +} + +func (m *manager) snapshotMemoryPagerDetachable(hvType hypervisor.Type) bool { + caps, ok := hypervisor.CapabilitiesForType(hvType) + if !ok { + return false + } + return caps.UsesDetachableSnapshotMemoryPager +} diff --git a/lib/providers/uffd_graduation.go b/lib/providers/uffd_graduation.go new file mode 100644 index 00000000..1f743c0d --- /dev/null +++ b/lib/providers/uffd_graduation.go @@ -0,0 +1,77 @@ +package providers + +import ( + "context" + "log/slog" + "strings" + + "github.com/kernel/hypeman/lib/instances" + "github.com/kernel/hypeman/lib/uffdgraduate" + "go.opentelemetry.io/otel" +) + +// uffdGraduationManager is the subset of the instance manager the graduation +// controller needs. ListInstances is on the public interface; the other two are +// concrete manager methods, matched here by type assertion. +type uffdGraduationManager interface { + ListInstances(ctx context.Context, filter *instances.ListInstancesFilter) ([]instances.Instance, error) + GraduateSnapshotMemoryPager(ctx context.Context, id string) error + UFFDGraduationTargetVersion() string +} + +type uffdGraduationStore struct { + manager uffdGraduationManager +} + +func (s uffdGraduationStore) ListPagerInstances(ctx context.Context) ([]uffdgraduate.Instance, error) { + insts, err := s.manager.ListInstances(ctx, nil) + if err != nil { + return nil, err + } + out := make([]uffdgraduate.Instance, 0, len(insts)) + for _, inst := range insts { + if inst.State != instances.StateRunning { + continue + } + if strings.TrimSpace(inst.FirecrackerUFFDSessionID) == "" { + continue + } + out = append(out, uffdgraduate.Instance{ + ID: inst.Id, + Name: inst.Name, + PagerVersion: inst.FirecrackerUFFDPagerVersion, + }) + } + return out, nil +} + +func (s uffdGraduationStore) GraduateInstance(ctx context.Context, id string) error { + return s.manager.GraduateSnapshotMemoryPager(ctx, id) +} + +func (s uffdGraduationStore) TargetVersion() string { + return s.manager.UFFDGraduationTargetVersion() +} + +// ProvideUFFDGraduationController builds the controller, or returns nil when the +// feature is disabled or no UFFD pager is configured (file backend / non-linux). +func ProvideUFFDGraduationController(instanceManager instances.Manager, cfg uffdgraduate.Config, log *slog.Logger) *uffdgraduate.Controller { + if instanceManager == nil || log == nil || !cfg.Enabled { + return nil + } + mgr, ok := instanceManager.(uffdGraduationManager) + if !ok { + return nil + } + if strings.TrimSpace(mgr.UFFDGraduationTargetVersion()) == "" { + return nil + } + return uffdgraduate.NewController( + uffdGraduationStore{manager: mgr}, + cfg, + uffdgraduate.ControllerOptions{ + Log: log.With("controller", "uffd_graduation"), + Meter: otel.GetMeterProvider().Meter("hypeman/uffdgraduate"), + }, + ) +} diff --git a/lib/uffdgraduate/README.md b/lib/uffdgraduate/README.md new file mode 100644 index 00000000..a9ec896b --- /dev/null +++ b/lib/uffdgraduate/README.md @@ -0,0 +1,44 @@ +# UFFD Graduation + +This controller detaches running VMs from the UFFD snapshot memory pager after +they have been up for a while, so the number of VMs depending on a pager stays +bounded and old pager versions can drain to zero and exit. + +## Why detach instead of migrate or fall back to file + +A UFFD-backed VM is pinned to its pager session for the life of the restore. The +memory backend (anonymous + userfaultfd vs. a private file mapping) is fixed when +the VM is restored, so there is no way to move a running VM onto the file backend +without restarting the VMM — which would drop its network connections. + +What can be done without touching the VM is to let the pager finish its job: it +populates every page that has not yet been faulted in from the backing file, then +unregisters userfaultfd and closes the session. The guest never pauses and its +network is untouched. The VM ends up running on resident memory with no pager +dependency. + +The cost is that the populated pages become resident anonymous memory (reclaimable +only to swap, unlike clean file-backed pages), and completion reads the whole +remaining image once. That is why graduation is paced and only applied to VMs that +have already had a soak period. + +## What it does + +On each scan the controller lists running VMs that still depend on a detachable +pager, then graduates eligible ones subject to the configured limits: + +- a session must be at least `min_session_age` old (tracked in memory; a control + plane restart restarts the soak, which is only more conservative) +- at most `max_concurrent` graduations run at once +- when `max_active_sessions` is zero, every soaked session is graduated + (time-based weaning); when positive, only enough oldest sessions are graduated + to bring the live count back to the ceiling +- sessions bound to an outdated pager version are always graduated after the soak, + so old pager versions retire quickly + +## Limits + +- The feature is disabled by default and only does anything when the host runs the + `uffd` snapshot memory backend. +- A failed graduation leaves the VM untouched (still on its pager); it is retried + on a later scan. diff --git a/lib/uffdgraduate/config.go b/lib/uffdgraduate/config.go new file mode 100644 index 00000000..b7d3f35c --- /dev/null +++ b/lib/uffdgraduate/config.go @@ -0,0 +1,83 @@ +package uffdgraduate + +import ( + "fmt" + "time" +) + +// Config controls how aggressively the controller graduates VMs off the pager. +// The zero value is disabled, so the feature is a no-op until explicitly turned +// on. +type Config struct { + Enabled bool + + // MinSessionAge is how long a session must have been observed before it is + // eligible to graduate. It gives the pager time to do its job as a restore + // accelerator and avoids churning a freshly restored VM. + MinSessionAge time.Duration + + // MaxConcurrent bounds simultaneous graduations. Each one reads the whole + // remaining memory image, so this is the IO/RAM blast-radius lever. + MaxConcurrent int + + // MaxActiveSessions is the hard ceiling on concurrent pager sessions. When + // zero, graduation is purely time based: every session past MinSessionAge is + // graduated. When positive, only enough oldest sessions are graduated to get + // back to the ceiling (sessions on an outdated pager version are always + // graduated regardless of the ceiling). + MaxActiveSessions int + + // ScanInterval is how often the controller evaluates sessions. + ScanInterval time.Duration + + // CompletionTimeout bounds a single graduation. Completion reads the whole + // remaining image, so this is generous. + CompletionTimeout time.Duration +} + +const ( + defaultMinSessionAge = 10 * time.Minute + defaultMaxConcurrent = 1 + defaultScanInterval = time.Minute + defaultCompletionTimeout = 10 * time.Minute +) + +// Normalize fills in defaults for unset fields. +func (c Config) Normalize() Config { + if c.MinSessionAge <= 0 { + c.MinSessionAge = defaultMinSessionAge + } + if c.MaxConcurrent <= 0 { + c.MaxConcurrent = defaultMaxConcurrent + } + if c.ScanInterval <= 0 { + c.ScanInterval = defaultScanInterval + } + if c.CompletionTimeout <= 0 { + c.CompletionTimeout = defaultCompletionTimeout + } + if c.MaxActiveSessions < 0 { + c.MaxActiveSessions = 0 + } + return c +} + +// Validate rejects nonsensical enabled configs. +func (c Config) Validate() error { + if !c.Enabled { + return nil + } + if c.MinSessionAge < 0 { + return fmt.Errorf("uffd graduation min_session_age must not be negative") + } + if c.MaxConcurrent < 0 { + return fmt.Errorf("uffd graduation max_concurrent must not be negative") + } + if c.MaxActiveSessions < 0 { + return fmt.Errorf("uffd graduation max_active_sessions must not be negative") + } + if c.ScanInterval < 0 { + return fmt.Errorf("uffd graduation scan_interval must not be negative") + } + return nil +} diff --git a/lib/uffdgraduate/config_test.go b/lib/uffdgraduate/config_test.go new file mode 100644 index 00000000..187386ca --- /dev/null +++ b/lib/uffdgraduate/config_test.go @@ -0,0 +1,52 @@ +package uffdgraduate + +import ( + "testing" + "time" +) + +func TestConfigNormalizeDefaults(t *testing.T) { + got := Config{Enabled: true}.Normalize() + if got.MinSessionAge != defaultMinSessionAge { + t.Fatalf("MinSessionAge = %s, want %s", got.MinSessionAge, defaultMinSessionAge) + } + if got.MaxConcurrent != defaultMaxConcurrent { + t.Fatalf("MaxConcurrent = %d, want %d", got.MaxConcurrent, defaultMaxConcurrent) + } + if got.ScanInterval != defaultScanInterval { + t.Fatalf("ScanInterval = %s, want %s", got.ScanInterval, defaultScanInterval) + } + if got.CompletionTimeout != defaultCompletionTimeout { + t.Fatalf("CompletionTimeout = %s, want %s", got.CompletionTimeout, defaultCompletionTimeout) + } + if got.MaxActiveSessions != 0 { + t.Fatalf("MaxActiveSessions = %d, want 0", got.MaxActiveSessions) + } +} + +func TestConfigNormalizeKeepsExplicit(t *testing.T) { + in := Config{ + Enabled: true, + MinSessionAge: 2 * time.Minute, + MaxConcurrent: 4, + MaxActiveSessions: 8, + ScanInterval: 30 * time.Second, + CompletionTimeout: time.Minute, + } + got := in.Normalize() + if got != in { + t.Fatalf("Normalize changed explicit config: got %+v want %+v", got, in) + } +} + +func TestConfigValidate(t *testing.T) { + if err := (Config{Enabled: false, MinSessionAge: -1}).Validate(); err != nil { + t.Fatalf("disabled config should always validate, got %v", err) + } + if err := (Config{Enabled: true, MaxConcurrent: -1}).Validate(); err == nil { + t.Fatal("expected error for negative max_concurrent") + } + if err := (Config{Enabled: true, MinSessionAge: time.Minute}).Validate(); err != nil { + t.Fatalf("valid enabled config should pass, got %v", err) + } +} diff --git a/lib/uffdgraduate/controller.go b/lib/uffdgraduate/controller.go new file mode 100644 index 00000000..2ceb215e --- /dev/null +++ b/lib/uffdgraduate/controller.go @@ -0,0 +1,223 @@ +package uffdgraduate + +import ( + "context" + "log/slog" + "sort" + "sync" + "time" + + "go.opentelemetry.io/otel/metric" +) + +// ControllerOptions configures logging, metrics, and time. +type ControllerOptions struct { + Log *slog.Logger + Meter metric.Meter + Now func() time.Time +} + +// Controller periodically detaches eligible running VMs from their snapshot +// memory pager so the pool of active pager sessions stays bounded and old pager +// versions can drain to zero and exit. +type Controller struct { + store InstanceStore + cfg Config + log *slog.Logger + now func() time.Time + + metrics *Metrics + wg sync.WaitGroup + + mu sync.Mutex + firstSeen map[string]time.Time + inFlight map[string]struct{} +} + +// NewController builds a controller. cfg is normalized here. +func NewController(store InstanceStore, cfg Config, opts ControllerOptions) *Controller { + log := opts.Log + if log == nil { + log = slog.Default() + } + now := opts.Now + if now == nil { + now = time.Now + } + c := &Controller{ + store: store, + cfg: cfg.Normalize(), + log: log, + now: now, + firstSeen: make(map[string]time.Time), + inFlight: make(map[string]struct{}), + } + c.metrics = newMetrics(opts.Meter, c) + return c +} + +// Run scans on an interval until ctx is cancelled. +func (c *Controller) Run(ctx context.Context) error { + if !c.cfg.Enabled { + <-ctx.Done() + return nil + } + c.log.Info("uffd graduation controller started", + "min_session_age", c.cfg.MinSessionAge, + "max_concurrent", c.cfg.MaxConcurrent, + "max_active_sessions", c.cfg.MaxActiveSessions, + "scan_interval", c.cfg.ScanInterval, + ) + + ticker := time.NewTicker(c.cfg.ScanInterval) + defer ticker.Stop() + + c.scan(ctx) + for { + select { + case <-ctx.Done(): + c.wg.Wait() + return nil + case <-ticker.C: + c.scan(ctx) + } + } +} + +func (c *Controller) scan(ctx context.Context) { + insts, err := c.store.ListPagerInstances(ctx) + if err != nil { + c.recordError("list") + c.log.Warn("uffd graduation scan failed to list instances", "error", err) + return + } + target := c.store.TargetVersion() + now := c.now() + + c.mu.Lock() + present := make(map[string]struct{}, len(insts)) + for _, inst := range insts { + present[inst.ID] = struct{}{} + if _, ok := c.firstSeen[inst.ID]; !ok { + c.firstSeen[inst.ID] = now + } + } + for id := range c.firstSeen { + if _, ok := present[id]; !ok { + delete(c.firstSeen, id) + } + } + + candidates := c.selectCandidatesLocked(insts, target, now) + launch := make([]Instance, 0, len(candidates)) + for _, inst := range candidates { + if len(c.inFlight) >= c.cfg.MaxConcurrent { + break + } + if _, busy := c.inFlight[inst.ID]; busy { + continue + } + c.inFlight[inst.ID] = struct{}{} + launch = append(launch, inst) + } + c.mu.Unlock() + + for _, inst := range launch { + c.wg.Add(1) + go c.graduate(ctx, inst) + } +} + +// selectCandidatesLocked returns the soaked instances that should graduate, +// ordered by priority: outdated pager versions first, then oldest first. +func (c *Controller) selectCandidatesLocked(insts []Instance, target string, now time.Time) []Instance { + type candidate struct { + inst Instance + outdated bool + age time.Duration + } + + soaked := make([]candidate, 0, len(insts)) + for _, inst := range insts { + seen, ok := c.firstSeen[inst.ID] + if !ok { + continue + } + if now.Sub(seen) < c.cfg.MinSessionAge { + continue + } + if _, busy := c.inFlight[inst.ID]; busy { + continue + } + outdated := target != "" && inst.PagerVersion != "" && inst.PagerVersion != target + soaked = append(soaked, candidate{inst: inst, outdated: outdated, age: now.Sub(seen)}) + } + + sort.SliceStable(soaked, func(i, j int) bool { + if soaked[i].outdated != soaked[j].outdated { + return soaked[i].outdated + } + return soaked[i].age > soaked[j].age + }) + + overCap := 0 + if c.cfg.MaxActiveSessions > 0 { + overCap = len(insts) - c.cfg.MaxActiveSessions + } + + out := make([]Instance, 0, len(soaked)) + for _, s := range soaked { + switch { + case c.cfg.MaxActiveSessions == 0: + out = append(out, s.inst) + case s.outdated: + out = append(out, s.inst) + case overCap > 0: + out = append(out, s.inst) + overCap-- + } + } + return out +} + +func (c *Controller) graduate(ctx context.Context, inst Instance) { + defer c.wg.Done() + defer func() { + c.mu.Lock() + delete(c.inFlight, inst.ID) + c.mu.Unlock() + }() + + gctx, cancel := context.WithTimeout(ctx, c.cfg.CompletionTimeout) + defer cancel() + + c.log.Info("graduating instance off uffd pager", + "instance_id", inst.ID, "instance_name", inst.Name, "pager_version", inst.PagerVersion) + + if err := c.store.GraduateInstance(gctx, inst.ID); err != nil { + c.recordAttempt("error") + c.recordError("graduate") + c.log.Warn("uffd graduation failed", "instance_id", inst.ID, "instance_name", inst.Name, "error", err) + return + } + + c.recordAttempt("success") + // Drop first-seen so a later rebind (e.g. after a future restore) restarts + // its soak rather than graduating immediately. + c.mu.Lock() + delete(c.firstSeen, inst.ID) + c.mu.Unlock() + c.log.Info("instance graduated off uffd pager", "instance_id", inst.ID, "instance_name", inst.Name) +} + +func (c *Controller) trackedSessions() int { + c.mu.Lock() + defer c.mu.Unlock() + return len(c.firstSeen) +} + +func (c *Controller) inFlightCount() int { + c.mu.Lock() + defer c.mu.Unlock() + return len(c.inFlight) +} diff --git a/lib/uffdgraduate/controller_test.go b/lib/uffdgraduate/controller_test.go new file mode 100644 index 00000000..7909ada3 --- /dev/null +++ b/lib/uffdgraduate/controller_test.go @@ -0,0 +1,170 @@ +package uffdgraduate + +import ( + "context" + "sync" + "testing" + "time" +) + +type fakeClock struct { + mu sync.Mutex + t time.Time +} + +func (c *fakeClock) Now() time.Time { + c.mu.Lock() + defer c.mu.Unlock() + return c.t +} + +func (c *fakeClock) advance(d time.Duration) { + c.mu.Lock() + defer c.mu.Unlock() + c.t = c.t.Add(d) +} + +type fakeStore struct { + mu sync.Mutex + insts []Instance + target string + graduated []string + gradCh chan string + err error +} + +func newFakeStore(target string, insts ...Instance) *fakeStore { + return &fakeStore{insts: insts, target: target, gradCh: make(chan string, 16)} +} + +func (f *fakeStore) ListPagerInstances(context.Context) ([]Instance, error) { + f.mu.Lock() + defer f.mu.Unlock() + return append([]Instance(nil), f.insts...), f.err +} + +func (f *fakeStore) GraduateInstance(_ context.Context, id string) error { + f.mu.Lock() + if f.err != nil { + err := f.err + f.mu.Unlock() + return err + } + f.graduated = append(f.graduated, id) + rm := f.insts[:0] + for _, inst := range f.insts { + if inst.ID != id { + rm = append(rm, inst) + } + } + f.insts = rm + f.mu.Unlock() + f.gradCh <- id + return nil +} + +func (f *fakeStore) TargetVersion() string { return f.target } + +func newTestController(store InstanceStore, cfg Config, clock *fakeClock) *Controller { + return NewController(store, cfg, ControllerOptions{Now: clock.Now}) +} + +func ids(insts []Instance) []string { + out := make([]string, len(insts)) + for i, inst := range insts { + out[i] = inst.ID + } + return out +} + +func TestSelectCandidatesTimeBasedWeaning(t *testing.T) { + base := time.Unix(1_700_000_000, 0) + clock := &fakeClock{t: base} + store := newFakeStore("new", + Instance{ID: "a", PagerVersion: "old"}, + Instance{ID: "b", PagerVersion: "new"}, + Instance{ID: "fresh", PagerVersion: "new"}, + ) + c := newTestController(store, Config{Enabled: true, MinSessionAge: 10 * time.Minute}, clock) + + c.firstSeen["a"] = base.Add(-20 * time.Minute) + c.firstSeen["b"] = base.Add(-15 * time.Minute) + c.firstSeen["fresh"] = base.Add(-time.Minute) + + got := ids(c.selectCandidatesLocked(store.insts, "new", clock.Now())) + // Both soaked instances graduate; the outdated one is ordered first; the + // fresh instance is excluded. + if len(got) != 2 || got[0] != "a" || got[1] != "b" { + t.Fatalf("candidates = %v, want [a b]", got) + } +} + +func TestSelectCandidatesCapKeepsNewest(t *testing.T) { + base := time.Unix(1_700_000_000, 0) + clock := &fakeClock{t: base} + store := newFakeStore("new", + Instance{ID: "old1", PagerVersion: "new"}, + Instance{ID: "old2", PagerVersion: "new"}, + ) + c := newTestController(store, Config{Enabled: true, MinSessionAge: 10 * time.Minute, MaxActiveSessions: 1}, clock) + c.firstSeen["old1"] = base.Add(-30 * time.Minute) + c.firstSeen["old2"] = base.Add(-20 * time.Minute) + + got := ids(c.selectCandidatesLocked(store.insts, "new", clock.Now())) + // Over the ceiling by one; only the oldest is graduated, newest stays warm. + if len(got) != 1 || got[0] != "old1" { + t.Fatalf("candidates = %v, want [old1]", got) + } +} + +func TestSelectCandidatesOutdatedAlwaysGraduates(t *testing.T) { + base := time.Unix(1_700_000_000, 0) + clock := &fakeClock{t: base} + store := newFakeStore("new", + Instance{ID: "outdated", PagerVersion: "old"}, + Instance{ID: "current", PagerVersion: "new"}, + ) + // Ceiling above the live count, so capacity alone would graduate nothing. + c := newTestController(store, Config{Enabled: true, MinSessionAge: 10 * time.Minute, MaxActiveSessions: 5}, clock) + c.firstSeen["outdated"] = base.Add(-20 * time.Minute) + c.firstSeen["current"] = base.Add(-20 * time.Minute) + + got := ids(c.selectCandidatesLocked(store.insts, "new", clock.Now())) + if len(got) != 1 || got[0] != "outdated" { + t.Fatalf("candidates = %v, want [outdated]", got) + } +} + +func TestScanRespectsSoak(t *testing.T) { + base := time.Unix(1_700_000_000, 0) + clock := &fakeClock{t: base} + store := newFakeStore("new", Instance{ID: "vm1", PagerVersion: "old"}) + c := newTestController(store, Config{Enabled: true, MinSessionAge: 10 * time.Minute, MaxConcurrent: 1}, clock) + + c.scan(context.Background()) // records first-seen, age 0 -> no graduation + select { + case id := <-store.gradCh: + t.Fatalf("unexpected graduation before soak: %s", id) + case <-time.After(50 * time.Millisecond): + } + + clock.advance(11 * time.Minute) + c.scan(context.Background()) + select { + case id := <-store.gradCh: + if id != "vm1" { + t.Fatalf("graduated %s, want vm1", id) + } + case <-time.After(2 * time.Second): + t.Fatal("expected graduation after soak") + } + + // First-seen is dropped on success so a rebind restarts the soak. + c.wg.Wait() + c.mu.Lock() + _, tracked := c.firstSeen["vm1"] + c.mu.Unlock() + if tracked { + t.Fatal("expected first-seen cleared after successful graduation") + } +} diff --git a/lib/uffdgraduate/metrics.go b/lib/uffdgraduate/metrics.go new file mode 100644 index 00000000..f93e4c35 --- /dev/null +++ b/lib/uffdgraduate/metrics.go @@ -0,0 +1,86 @@ +package uffdgraduate + +import ( + "context" + + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" +) + +type Metrics struct { + attemptsTotal metric.Int64Counter + errorsTotal metric.Int64Counter + inFlightGauge metric.Int64ObservableGauge + trackedGauge metric.Int64ObservableGauge +} + +func newMetrics(meter metric.Meter, controller *Controller) *Metrics { + if meter == nil { + return &Metrics{} + } + + attemptsTotal, err := meter.Int64Counter( + "hypeman_uffd_graduation_attempts_total", + metric.WithDescription("Total UFFD pager graduation attempts"), + ) + if err != nil { + return &Metrics{} + } + errorsTotal, err := meter.Int64Counter( + "hypeman_uffd_graduation_errors_total", + metric.WithDescription("Total UFFD graduation controller errors"), + ) + if err != nil { + return &Metrics{} + } + inFlightGauge, err := meter.Int64ObservableGauge( + "hypeman_uffd_graduation_in_flight", + metric.WithDescription("Graduations currently in flight"), + ) + if err != nil { + return &Metrics{} + } + trackedGauge, err := meter.Int64ObservableGauge( + "hypeman_uffd_graduation_tracked_sessions", + metric.WithDescription("Pager-backed sessions currently tracked by the graduation controller"), + ) + if err != nil { + return &Metrics{} + } + + m := &Metrics{ + attemptsTotal: attemptsTotal, + errorsTotal: errorsTotal, + inFlightGauge: inFlightGauge, + trackedGauge: trackedGauge, + } + + _, _ = meter.RegisterCallback(func(ctx context.Context, observer metric.Observer) error { + if controller == nil { + return nil + } + observer.ObserveInt64(m.inFlightGauge, int64(controller.inFlightCount())) + observer.ObserveInt64(m.trackedGauge, int64(controller.trackedSessions())) + return nil + }, inFlightGauge, trackedGauge) + + return m +} + +func (c *Controller) recordAttempt(status string) { + if c.metrics == nil || c.metrics.attemptsTotal == nil { + return + } + c.metrics.attemptsTotal.Add(context.Background(), 1, metric.WithAttributes( + attribute.String("status", status), + )) +} + +func (c *Controller) recordError(operation string) { + if c.metrics == nil || c.metrics.errorsTotal == nil { + return + } + c.metrics.errorsTotal.Add(context.Background(), 1, metric.WithAttributes( + attribute.String("operation", operation), + )) +} diff --git a/lib/uffdgraduate/types.go b/lib/uffdgraduate/types.go new file mode 100644 index 00000000..82735a9c --- /dev/null +++ b/lib/uffdgraduate/types.go @@ -0,0 +1,26 @@ +package uffdgraduate + +import "context" + +// Instance is a running VM that currently depends on a detachable snapshot +// memory pager. PagerVersion is the version it is bound to. +type Instance struct { + ID string + Name string + PagerVersion string +} + +// InstanceStore is the controller's view of the instance manager. It is kept +// narrow and free of hypervisor/UFFD types so the controller stays agnostic to +// how graduation is actually performed. +type InstanceStore interface { + // ListPagerInstances returns running instances that still depend on a + // detachable snapshot memory pager, each tagged with its bound version. + ListPagerInstances(ctx context.Context) ([]Instance, error) + // GraduateInstance detaches the instance from its pager. The call blocks + // until the pager has populated remaining pages and released the session. + GraduateInstance(ctx context.Context, id string) error + // TargetVersion is the pager version new restores bind to. Sessions on a + // different version are prioritised so old pager versions can retire. + TargetVersion() string +} diff --git a/lib/uffdpager/README.md b/lib/uffdpager/README.md index aec2decb..4724e22c 100644 --- a/lib/uffdpager/README.md +++ b/lib/uffdpager/README.md @@ -37,8 +37,17 @@ Hypeman talks to the pager over Unix HTTP at - `GET /stats` - `POST /sessions` - `POST /sessions/{id}/close` +- `POST /sessions/{id}/complete` - `POST /drain` +`POST /sessions/{id}/complete` populates every outstanding page of a session +from the backing file and then unregisters userfaultfd, so the restored VM stops +depending on this pager and keeps running on resident memory. The VM is never +paused and its network is untouched. Completion reads the whole remaining memory +image, so it is paced by the caller; the request is bounded by its context +rather than a fixed timeout. Unregister happens only after a full populate, +because once a range is unregistered the kernel zero-fills any still-absent page. + Firecracker does not use this control socket directly. Each restore session gets its own Unix socket that receives Firecracker's UFFD file descriptor and memory regions. diff --git a/lib/uffdpager/VERSION b/lib/uffdpager/VERSION index d917d3e2..b1e80bb2 100644 --- a/lib/uffdpager/VERSION +++ b/lib/uffdpager/VERSION @@ -1 +1 @@ -0.1.2 +0.1.3 diff --git a/lib/uffdpager/complete_linux.go b/lib/uffdpager/complete_linux.go new file mode 100644 index 00000000..f5d52f91 --- /dev/null +++ b/lib/uffdpager/complete_linux.go @@ -0,0 +1,240 @@ +//go:build linux + +package uffdpager + +import ( + "context" + "errors" + "fmt" + "io" + "net/http" + "strings" + "unsafe" + + "github.com/go-chi/chi/v5" + "golang.org/x/sys/unix" +) + +// _UFFDIO_UNREGISTER ioctl. Derived the same way as uffdioCopy in +// server_faults_linux.go: _IOR(UFFDIO=0xAA, _UFFDIO_UNREGISTER=1, struct +// uffdio_range{u64 start; u64 len}) on the asm-generic ioctl encoding. +const uffdioUnregister = 0x8010aa01 + +type uffdioRange struct { + start uint64 + len uint64 +} + +// completeRequest is a single /complete attempt handed to the fault loop. The +// reply channel is buffered so the fault loop never blocks delivering a result +// even if the HTTP caller has already given up. +type completeRequest struct { + resp chan error +} + +// handleComplete drives an existing session to fully populate its guest memory +// from the backing file and detach userfaultfd, so the restored VM stops +// depending on this pager. The VM keeps running throughout. +func (s *server) handleComplete(w http.ResponseWriter, r *http.Request) { + id := sanitizeSessionID(chi.URLParam(r, "id")) + s.mu.Lock() + sess := s.sessions[id] + s.mu.Unlock() + if sess == nil { + http.Error(w, "session not found", http.StatusNotFound) + return + } + + req := &completeRequest{resp: make(chan error, 1)} + select { + case sess.completeReqCh <- req: + case <-sess.done: + http.Error(w, "session closed", http.StatusConflict) + return + default: + http.Error(w, "completion already in progress", http.StatusConflict) + return + } + sess.wake() + + select { + case err := <-req.resp: + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + w.WriteHeader(http.StatusNoContent) + case <-sess.done: + http.Error(w, "session closed before completion", http.StatusInternalServerError) + case <-r.Context().Done(): + http.Error(w, r.Context().Err().Error(), http.StatusGatewayTimeout) + } +} + +// wake writes the wake pipe so the fault loop breaks out of an idle poll and +// notices a pending completion request. +func (s *session) wake() { + if s.wakeW >= 0 { + var b [1]byte + _, _ = unix.Write(s.wakeW, b[:]) + } +} + +// takeCompletion runs a queued completion request if one is pending. It returns +// true only when the session fully completed and the fault loop should exit so +// the session tears down; on failure the session keeps serving faults so the VM +// stays safely backed by the pager. +func (s *session) takeCompletion() bool { + select { + case req := <-s.completeReqCh: + err := s.completeAndUnregister() + req.resp <- err + return err == nil + default: + return false + } +} + +// completeAndUnregister populates every page from the backing file and only then +// unregisters the ranges. Unregister must happen strictly after a full populate: +// once a range is unregistered the kernel zero-fills any page that was still +// absent, which would be corruption. If the populate fails the ranges are left +// registered so the fault loop can keep serving them. +func (s *session) completeAndUnregister() error { + if err := s.populateAll(); err != nil { + return err + } + return s.unregisterAll() +} + +func (s *session) populateAll() error { + var firstErr error + noteErr := func(err error) { + if firstErr == nil { + firstErr = err + } + } + + eventBuf := make([]byte, uffdMsgSize) + for _, mapping := range s.mappings { + pageSize := int(mapping.PageSize) + if pageSize <= 0 { + continue + } + page := make([]byte, pageSize) + for off := uint64(0); off+uint64(pageSize) <= mapping.Size; off += uint64(pageSize) { + fileOffset := int64(mapping.Offset + off) + n, err := s.backingFile.ReadAt(page, fileOffset) + if n != pageSize { + noteErr(fmt.Errorf("read backing page at %d: read %d/%d bytes: %w", fileOffset, n, pageSize, err)) + continue + } + s.server.backingBytesRead.Add(int64(n)) + if err := s.populatePage(mapping.BaseHostVirtAddr+off, page, eventBuf); err != nil { + s.server.copyErrors.Add(1) + noteErr(fmt.Errorf("populate page at %#x: %w", mapping.BaseHostVirtAddr+off, err)) + } + } + } + return firstErr +} + +func (s *session) unregisterAll() error { + var firstErr error + for _, mapping := range s.mappings { + // EINVAL means the range is already unregistered, which makes a retried + // completion idempotent. + if err := uffdUnregister(s.uffdFD, mapping.BaseHostVirtAddr, mapping.Size); err != nil && !errors.Is(err, unix.EINVAL) { + if firstErr == nil { + firstErr = fmt.Errorf("unregister range %#x len %d: %w", mapping.BaseHostVirtAddr, mapping.Size, err) + } + } + } + return firstErr +} + +// populatePage copies one page into guest memory. UFFDIO_COPY returns EEXIST +// (treated as success) for pages the guest already faulted in, and can return +// EAGAIN when a pending remove/unmap event from ballooning blocks the copy; +// draining those events and retrying clears it, mirroring the fault loop. +func (s *session) populatePage(dst uint64, page, eventBuf []byte) error { + const maxAttempts = 5 + var lastErr error + for attempt := 0; attempt < maxAttempts; attempt++ { + err := uffdCopy(s.uffdFD, dst, page) + if err == nil { + s.server.copies.Add(1) + return nil + } + if !errors.Is(err, unix.EAGAIN) { + return err + } + drainUFFDEvents(s.uffdFD, eventBuf) + lastErr = err + } + return lastErr +} + +func drainUFFDEvents(fd int, buf []byte) { + for { + _, ok, err := readUFFDEvent(fd, buf) + if err != nil || !ok { + return + } + } +} + +func drainWake(fd int) { + var buf [16]byte + for { + n, err := unix.Read(fd, buf[:]) + if n <= 0 || err != nil { + return + } + } +} + +func newWakePipe() (int, int, error) { + var fds [2]int + if err := unix.Pipe2(fds[:], unix.O_NONBLOCK|unix.O_CLOEXEC); err != nil { + return -1, -1, err + } + return fds[0], fds[1], nil +} + +func uffdUnregister(fd int, start, length uint64) error { + rng := uffdioRange{start: start, len: length} + _, _, errno := unix.Syscall(unix.SYS_IOCTL, uintptr(fd), uintptr(uffdioUnregister), uintptr(unsafe.Pointer(&rng))) + if errno != 0 { + return errno + } + return nil +} + +// CompleteSessionVersion asks the pager for the given version to fully populate +// and detach the session. It uses a context-bounded client with no fixed +// timeout because completion reads the whole backing image. +func (s *Supervisor) CompleteSessionVersion(ctx context.Context, versionKey, sessionID string) error { + if s == nil || strings.TrimSpace(versionKey) == "" || strings.TrimSpace(sessionID) == "" { + return nil + } + client := newUnixHTTPClient(pagerControlSocket(s.dataDir, versionKey), 0) + path := "/sessions/" + urlPathEscape(sessionID) + "/complete" + req, err := http.NewRequestWithContext(ctx, http.MethodPost, "http://unix"+path, nil) + if err != nil { + return err + } + resp, err := client.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + data, _ := io.ReadAll(io.LimitReader(resp.Body, 64<<10)) + if resp.StatusCode == http.StatusNotFound { + return ErrSessionNotFound + } + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + return fmt.Errorf("uffd pager complete %s returned %s: %s", sessionID, resp.Status, strings.TrimSpace(string(data))) + } + return nil +} diff --git a/lib/uffdpager/complete_linux_test.go b/lib/uffdpager/complete_linux_test.go new file mode 100644 index 00000000..e602e17a --- /dev/null +++ b/lib/uffdpager/complete_linux_test.go @@ -0,0 +1,50 @@ +//go:build linux + +package uffdpager + +import ( + "testing" + + "golang.org/x/sys/unix" +) + +// TestUffdioUnregisterValue guards the hand-computed ioctl number against typos +// by recomputing it from the asm-generic _IOR encoding used on the amd64/arm64 +// hosts Hypeman runs on (the same encoding uffdioCopy relies on). +func TestUffdioUnregisterValue(t *testing.T) { + const ( + iocRead = 2 + dirShift = 30 + sizeShift = 16 + typeShift = 8 + uffdioMagic = 0xAA + unregisterNr = 1 + rangeSize = 16 // sizeof(struct uffdio_range) + ) + want := uintptr(iocRead< 0 { + t.Fatalf("expected wake pipe drained, read %d bytes", n) + } +} diff --git a/lib/uffdpager/server_faults_linux.go b/lib/uffdpager/server_faults_linux.go index 17acb6c2..fbdb56ac 100644 --- a/lib/uffdpager/server_faults_linux.go +++ b/lib/uffdpager/server_faults_linux.go @@ -50,7 +50,10 @@ type uffdEvent struct { func (s *session) handleFaults(mappings []guestRegionUffdMapping) { fd := s.uffdFD _ = unix.SetNonblock(fd, true) - pollFDs := []unix.PollFd{{Fd: int32(fd), Events: unix.POLLIN}} + pollFDs := []unix.PollFd{ + {Fd: int32(fd), Events: unix.POLLIN}, + {Fd: int32(s.wakeR), Events: unix.POLLIN}, + } buf := make([]byte, uffdMsgSize) var deferred []uffdEvent for { @@ -62,6 +65,14 @@ func (s *session) handleFaults(mappings []guestRegionUffdMapping) { } return } + if pollFDs[1].Revents&unix.POLLIN != 0 { + drainWake(s.wakeR) + // Populate and unregister in this goroutine; only exit (and tear + // down the session) when completion fully succeeds. + if s.takeCompletion() { + return + } + } if n == 0 || pollFDs[0].Revents&unix.POLLIN == 0 { if pollFDs[0].Revents&(unix.POLLHUP|unix.POLLERR|unix.POLLNVAL) != 0 { return diff --git a/lib/uffdpager/server_linux.go b/lib/uffdpager/server_linux.go index d007ee6a..83605781 100644 --- a/lib/uffdpager/server_linux.go +++ b/lib/uffdpager/server_linux.go @@ -59,6 +59,14 @@ type session struct { closeOnce sync.Once uffdFD int conn *net.UnixConn + + // Graduation: the fault loop watches wakeR alongside the uffd fd, so a + // /complete request handed to completeReqCh can interrupt an idle poll and + // run completion in the same goroutine that owns the uffd. + mappings []guestRegionUffdMapping + wakeR int + wakeW int + completeReqCh chan *completeRequest } func Main(args []string) error { @@ -110,6 +118,7 @@ func (s *server) run() error { router.Get("/stats", s.handleStats) router.Post("/sessions", s.handleCreateSession) router.Post("/sessions/{id}/close", s.handleCloseSession) + router.Post("/sessions/{id}/complete", s.handleComplete) router.Post("/drain", s.handleDrain) s.httpServer = &http.Server{Handler: router} diff --git a/lib/uffdpager/server_sessions_linux.go b/lib/uffdpager/server_sessions_linux.go index cfb14273..0d2f4337 100644 --- a/lib/uffdpager/server_sessions_linux.go +++ b/lib/uffdpager/server_sessions_linux.go @@ -31,6 +31,13 @@ func (s *server) createSession(req CreateSessionRequest) (*session, error) { _ = os.Remove(socketPath) return nil, fmt.Errorf("open backing memory for uffd session %s: %w", id, err) } + wakeR, wakeW, err := newWakePipe() + if err != nil { + _ = backingFile.Close() + _ = listener.Close() + _ = os.Remove(socketPath) + return nil, fmt.Errorf("create wake pipe for uffd session %s: %w", id, err) + } sess := &session{ id: id, @@ -43,6 +50,9 @@ func (s *server) createSession(req CreateSessionRequest) (*session, error) { done: make(chan struct{}), backingFile: backingFile, uffdFD: -1, + wakeR: wakeR, + wakeW: wakeW, + completeReqCh: make(chan *completeRequest, 1), } s.mu.Lock() @@ -135,11 +145,14 @@ func (s *session) run() { sort.Slice(mappings, func(i, j int) bool { return mappings[i].BaseHostVirtAddr < mappings[j].BaseHostVirtAddr }) + s.mappings = mappings s.handleFaults(mappings) } func (s *session) close() { s.closeOnce.Do(func() { + // Closing s.done releases any /complete waiter still blocked on this + // session; the wake pipe fds are closed below. if s.listener != nil { _ = s.listener.Close() } @@ -149,6 +162,12 @@ func (s *session) close() { if s.uffdFD >= 0 { _ = unix.Close(s.uffdFD) } + if s.wakeR >= 0 { + _ = unix.Close(s.wakeR) + } + if s.wakeW >= 0 { + _ = unix.Close(s.wakeW) + } if s.backingFile != nil { _ = s.backingFile.Close() } diff --git a/lib/uffdpager/supervisor_linux.go b/lib/uffdpager/supervisor_linux.go index c7fb4c47..201d01fc 100644 --- a/lib/uffdpager/supervisor_linux.go +++ b/lib/uffdpager/supervisor_linux.go @@ -252,18 +252,24 @@ func (s *Supervisor) clientForVersion(versionKey string) *http.Client { if client := s.clients[versionKey]; client != nil { return client } - socketPath := pagerControlSocket(s.dataDir, versionKey) - client := &http.Client{ + client := newUnixHTTPClient(pagerControlSocket(s.dataDir, versionKey), 10*time.Second) + s.clients[versionKey] = client + return client +} + +// newUnixHTTPClient builds an HTTP client bound to a single unix control socket. +// A timeout of zero leaves the request bounded only by its context, which the +// completion call relies on because it can run longer than a normal request. +func newUnixHTTPClient(socketPath string, timeout time.Duration) *http.Client { + return &http.Client{ Transport: &http.Transport{ DialContext: func(ctx context.Context, _, _ string) (net.Conn, error) { var d net.Dialer return d.DialContext(ctx, "unix", socketPath) }, }, - Timeout: 10 * time.Second, + Timeout: timeout, } - s.clients[versionKey] = client - return client } func pagerVersionDir(dataDir, versionKey string) string { diff --git a/lib/uffdpager/supervisor_unsupported.go b/lib/uffdpager/supervisor_unsupported.go index ece0704c..52ee5f09 100644 --- a/lib/uffdpager/supervisor_unsupported.go +++ b/lib/uffdpager/supervisor_unsupported.go @@ -29,6 +29,10 @@ func (s *Supervisor) CloseSessionVersion(context.Context, string, string) error return nil } +func (s *Supervisor) CompleteSessionVersion(context.Context, string, string) error { + return fmt.Errorf("uffd pager is only supported on linux") +} + func (s *Supervisor) Stats(context.Context) (*Stats, error) { return nil, fmt.Errorf("uffd pager is only supported on linux") } diff --git a/lib/uffdpager/types.go b/lib/uffdpager/types.go index 99736983..b521b435 100644 --- a/lib/uffdpager/types.go +++ b/lib/uffdpager/types.go @@ -1,5 +1,7 @@ package uffdpager +import "errors" + const ( BackendFile = "file" BackendUFFD = "uffd" @@ -7,6 +9,10 @@ const ( defaultCacheMaxBytes = int64(4 << 30) ) +// ErrSessionNotFound reports that a pager no longer has the requested session, +// so a completion request had nothing to act on. +var ErrSessionNotFound = errors.New("uffd pager session not found") + // CreateSessionRequest describes one Firecracker UFFD restore session. type CreateSessionRequest struct { SessionID string `json:"session_id,omitempty"`