From 85e5c9e54a7118a5bd40a1431b643299fad7aa10 Mon Sep 17 00:00:00 2001 From: Michael Smithhisler Date: Wed, 11 Feb 2026 16:21:23 -0500 Subject: [PATCH] initial admission controller logic --- command/agent/agent.go | 8 ++ command/agent/config.go | 23 ++++++ command/agent/config_parse.go | 1 + nomad/admissioncontrollers/controller.go | 12 +++ .../external_controller.go | 77 +++++++++++++++++++ nomad/config.go | 5 ++ nomad/job_endpoint.go | 17 +++- nomad/server.go | 24 ++++++ nomad/structs/config/admission_controller.go | 18 +++++ nomad/structs/structs.go | 7 ++ 10 files changed, 190 insertions(+), 2 deletions(-) create mode 100644 nomad/admissioncontrollers/controller.go create mode 100644 nomad/admissioncontrollers/external_controller.go create mode 100644 nomad/structs/config/admission_controller.go diff --git a/command/agent/agent.go b/command/agent/agent.go index 7eaa11020c8..9ca011bf058 100644 --- a/command/agent/agent.go +++ b/command/agent/agent.go @@ -679,6 +679,14 @@ func convertServerConfig(agentConfig *Config) (*nomad.Config, error) { } } + for _, c := range agentConfig.Server.AdmissionControllers.External { + conf.AdmissionControllers.External = append(conf.AdmissionControllers.External, config.ExternalController{ + Name: c.Name, + Endpoint: c.Endpoint, + NodePool: c.NodePool, + }) + } + if err := conf.NodeIntroductionConfig.Validate(); err != nil { return nil, fmt.Errorf("invalid server.client_introduction configuration: %w", err) } diff --git a/command/agent/config.go b/command/agent/config.go index b559ab873ff..0980eaae5b2 100644 --- a/command/agent/config.go +++ b/command/agent/config.go @@ -532,11 +532,31 @@ func (a *ACLConfig) Copy() *ACLConfig { return &na } +type BuiltinController struct { + Enabled bool `hcl:"enabled"` +} + +type ExternalController struct { + Name string `hcl:",key"` + Endpoint string `hcl:"endpoint"` + NodePool string `hcl:"node_pool"` +} + +type AdmissionControllers struct { + // In-tree controllers would be added here + + // Out-of-tree controllers + External []ExternalController `hcl:"external,block"` +} + // ServerConfig is configuration specific to the server mode type ServerConfig struct { // Enabled controls if we are a server Enabled bool `hcl:"enabled"` + // AdmissionControllers are used for custom job mutation + AdmissionControllers *AdmissionControllers `hcl:"admission_controllers"` + // AuthoritativeRegion is used to control which region is treated as // the source of truth for global tokens and ACL policies. AuthoritativeRegion string `hcl:"authoritative_region"` @@ -2509,6 +2529,9 @@ func (a *ACLConfig) Merge(b *ACLConfig) *ACLConfig { func (s *ServerConfig) Merge(b *ServerConfig) *ServerConfig { result := *s + if b.AdmissionControllers != nil { + result.AdmissionControllers = b.AdmissionControllers + } if b.Enabled { result.Enabled = true } diff --git a/command/agent/config_parse.go b/command/agent/config_parse.go index 754e4c531df..3a82aeaabad 100644 --- a/command/agent/config_parse.go +++ b/command/agent/config_parse.go @@ -57,6 +57,7 @@ func ParseConfigFile(path string) (*Config, error) { ClientIntroduction: &ClientIntroduction{}, PlanRejectionTracker: &PlanRejectionTracker{}, ServerJoin: &ServerJoin{}, + AdmissionControllers: &AdmissionControllers{}, }, ACL: &ACLConfig{}, RPC: &RPCConfig{}, diff --git a/nomad/admissioncontrollers/controller.go b/nomad/admissioncontrollers/controller.go new file mode 100644 index 00000000000..6a004e58dd2 --- /dev/null +++ b/nomad/admissioncontrollers/controller.go @@ -0,0 +1,12 @@ +package admission + +import ( + "context" + + "github.com/hashicorp/nomad/nomad/structs" +) + +type AdmissionController interface { + Start(context.Context) + AdmitJob(*structs.Job) ([]error, error) +} diff --git a/nomad/admissioncontrollers/external_controller.go b/nomad/admissioncontrollers/external_controller.go new file mode 100644 index 00000000000..4f3ed739727 --- /dev/null +++ b/nomad/admissioncontrollers/external_controller.go @@ -0,0 +1,77 @@ +package admission + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "net/http" + + "github.com/hashicorp/nomad/nomad/structs" +) + +type External struct { + name string + endpoint string + nodePool string +} + +func NewExternalController(name, endpoint, nodePool string) *External { + return &External{ + name: name, + endpoint: endpoint, + nodePool: nodePool, + } +} + +// Start is a noop for external controllers +func (c *External) Start(ctx context.Context) {} + +func (c *External) Name() string { + return c.name +} + +// AdmitJob is used to send the job to an external process for processing. +// +// This is a basic implementation for proof of concept. +func (c *External) AdmitJob(job *structs.Job) (warnings []error, err error) { + if job.NodePool != c.nodePool { + return nil, nil + } + + data, err := json.Marshal(job) + if err != nil { + return nil, err + } + + req, err := http.NewRequest("POST", c.endpoint, bytes.NewReader(data)) + if err != nil { + return nil, err + } + + res, err := http.DefaultClient.Do(req) + if err != nil { + return nil, err + } + defer res.Body.Close() + + if res.StatusCode != http.StatusOK { + return nil, fmt.Errorf("received bad response code, %d", res.StatusCode) + } + + newJob := &structs.Job{} + b, err := io.ReadAll(res.Body) + if err != nil { + return nil, err + } + + err = json.Unmarshal(b, newJob) + if err != nil { + return nil, err + } + + job = newJob + + return nil, nil +} diff --git a/nomad/config.go b/nomad/config.go index 41f149402b9..cb5281e512d 100644 --- a/nomad/config.go +++ b/nomad/config.go @@ -457,6 +457,10 @@ type Config struct { // requests and perform the appropriate enforcement actions. NodeIntroductionConfig *structs.NodeIntroductionConfig + // AdmissionControllers are the set of configured controllers to + // be invoked during job registration. + AdmissionControllers *config.AdmissionControllers + // LogFile is used by MonitorExport to stream a server's log file LogFile string `hcl:"log_file"` } @@ -586,6 +590,7 @@ func DefaultConfig() *Config { } c := &Config{ + AdmissionControllers: &config.AdmissionControllers{}, Region: DefaultRegion, AuthoritativeRegion: DefaultRegion, Datacenter: DefaultDC, diff --git a/nomad/job_endpoint.go b/nomad/job_endpoint.go index 94f2c523fa1..6846f9e5618 100644 --- a/nomad/job_endpoint.go +++ b/nomad/job_endpoint.go @@ -16,12 +16,13 @@ import ( "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-memdb" metrics "github.com/hashicorp/go-metrics/compat" - "github.com/hashicorp/go-multierror" + multierror "github.com/hashicorp/go-multierror" "github.com/hashicorp/go-set/v3" "github.com/hashicorp/nomad/acl" "github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/helper/pointer" "github.com/hashicorp/nomad/helper/uuid" + admission "github.com/hashicorp/nomad/nomad/admissioncontrollers" "github.com/hashicorp/nomad/nomad/state" "github.com/hashicorp/nomad/nomad/state/paginator" "github.com/hashicorp/nomad/nomad/structs" @@ -60,6 +61,8 @@ type Job struct { // builtin admission controllers mutators []jobMutator validators []jobValidator + + ctrls []admission.AdmissionController } // NewJobEndpoints creates a new job endpoint with builtin admission controllers @@ -91,6 +94,7 @@ func NewJobEndpoints(s *Server, ctx *RPCContext) *Job { jobNumaHook{}, &jobSchedHook{}, }, + ctrls: s.admissionControllers, } } @@ -143,6 +147,15 @@ func (j *Job) doRegister(aclObj *acl.ACL, additionalAllowedPermissions []string, // Run the submission controller warnings = append(warnings, j.submissionController(args)) + for _, ctrl := range j.ctrls { + warn, err := ctrl.AdmitJob(args.Job) + if err != nil { + return err + } + + warnings = append(warnings, warn...) + } + // Attach the user token's accessor ID so that deploymentwatcher can // reference the token later in multiregion deployments. We can't auth once // and then use the leader ACL because the leader ACLs aren't shared across @@ -311,7 +324,7 @@ func (j *Job) doRegister(aclObj *acl.ACL, additionalAllowedPermissions []string, args.Job.SubmitTime = now // If the job is periodic or parameterized, we don't create an eval. - if !(args.Job.IsPeriodic() || args.Job.IsParameterized()) { + if !(args.Job.IsPeriodic() || args.Job.IsParameterized() || args.Job.SkipEvalCreation) { // Initially set the eval priority to that of the job priority. If the // user supplied an eval priority override, we subsequently use this. diff --git a/nomad/server.go b/nomad/server.go index a5cf7a9a792..e2cfbcc7637 100644 --- a/nomad/server.go +++ b/nomad/server.go @@ -40,6 +40,7 @@ import ( "github.com/hashicorp/nomad/helper/pool" "github.com/hashicorp/nomad/helper/tlsutil" "github.com/hashicorp/nomad/lib/auth/oidc" + admission "github.com/hashicorp/nomad/nomad/admissioncontrollers" "github.com/hashicorp/nomad/nomad/auth" "github.com/hashicorp/nomad/nomad/deploymentwatcher" "github.com/hashicorp/nomad/nomad/drainer" @@ -310,6 +311,8 @@ type Server struct { // MAY BE nil! Issuer must be explicitly configured by the end user. oidcDisco *structs.OIDCDiscoveryConfig + admissionControllers []admission.AdmissionController + // EnterpriseState is used to fill in state for Pro/Ent builds EnterpriseState @@ -450,6 +453,10 @@ func NewServer(config *Config, consulCatalog consul.CatalogAPI, consulConfigFunc // 6 minutes is 1 minute longer than the JWT expiration time in the cap lib. s.oidcRequestCache = oidc.NewRequestCache(6 * time.Minute) + // Admission controllers must be setup before the RPC handlers since + // they are copied to the JobEndpoint handler. + s.setupAdmissionControllers() + // Initialize the RPC layer if err := s.setupRPC(tlsWrap); err != nil { s.Shutdown() @@ -570,6 +577,23 @@ func NewServer(config *Config, consulCatalog consul.CatalogAPI, consulConfigFunc return s, nil } +func (s *Server) setupAdmissionControllers() { + ctrl := s.config.AdmissionControllers + // Add enabled in-tree controllers + + // Add enabled external controllers + for _, c := range ctrl.External { + // external controllers run as a separate process so do not need to be "started" + s.admissionControllers = append(s.admissionControllers, admission.NewExternalController(c.Name, c.Endpoint, c.NodePool)) + } +} + +func (s *Server) startAdmissionControllers() { + for _, ctrl := range s.admissionControllers { + go ctrl.Start(s.shutdownCtx) + } +} + // startRPCListener starts the server's the RPC listener func (s *Server) startRPCListener() { ctx, cancel := context.WithCancel(context.Background()) diff --git a/nomad/structs/config/admission_controller.go b/nomad/structs/config/admission_controller.go new file mode 100644 index 00000000000..1165389affd --- /dev/null +++ b/nomad/structs/config/admission_controller.go @@ -0,0 +1,18 @@ +package config + +type BuiltinController struct { + Enabled bool +} + +type ExternalController struct { + Name string + Endpoint string + NodePool string +} + +type AdmissionControllers struct { + // In-tree controllers + + // Out-of-tree controllers + External []ExternalController +} diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index df142bab0dc..4f37fd03fe3 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -4489,6 +4489,13 @@ type Job struct { // used to register this version of the job. Used by deploymentwatcher. NomadTokenID string + // SkipEvalCreation is used to signify that an evaluation should not be + // created for this job during registration. This field could be used for + // parameterized/periodic jobs, or for external processes like admission + // controllers to use Nomad to store jobs via Raft, but still control when + // they are run. + SkipEvalCreation bool + // Job status Status string