Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions command/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -679,6 +679,14 @@ func convertServerConfig(agentConfig *Config) (*nomad.Config, error) {
}
}

for _, c := range agentConfig.Server.AdmissionControllers.External {
conf.AdmissionControllers.External = append(conf.AdmissionControllers.External, config.ExternalController{
Name: c.Name,
Endpoint: c.Endpoint,
NodePool: c.NodePool,
})
}

if err := conf.NodeIntroductionConfig.Validate(); err != nil {
return nil, fmt.Errorf("invalid server.client_introduction configuration: %w", err)
}
Expand Down
23 changes: 23 additions & 0 deletions command/agent/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -532,11 +532,31 @@ func (a *ACLConfig) Copy() *ACLConfig {
return &na
}

type BuiltinController struct {
Enabled bool `hcl:"enabled"`
}

type ExternalController struct {
Name string `hcl:",key"`
Endpoint string `hcl:"endpoint"`
NodePool string `hcl:"node_pool"`
}

type AdmissionControllers struct {
// In-tree controllers would be added here

// Out-of-tree controllers
External []ExternalController `hcl:"external,block"`
}

// ServerConfig is configuration specific to the server mode
type ServerConfig struct {
// Enabled controls if we are a server
Enabled bool `hcl:"enabled"`

// AdmissionControllers are used for custom job mutation
AdmissionControllers *AdmissionControllers `hcl:"admission_controllers"`

// AuthoritativeRegion is used to control which region is treated as
// the source of truth for global tokens and ACL policies.
AuthoritativeRegion string `hcl:"authoritative_region"`
Expand Down Expand Up @@ -2509,6 +2529,9 @@ func (a *ACLConfig) Merge(b *ACLConfig) *ACLConfig {
func (s *ServerConfig) Merge(b *ServerConfig) *ServerConfig {
result := *s

if b.AdmissionControllers != nil {
result.AdmissionControllers = b.AdmissionControllers
}
if b.Enabled {
result.Enabled = true
}
Expand Down
1 change: 1 addition & 0 deletions command/agent/config_parse.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ func ParseConfigFile(path string) (*Config, error) {
ClientIntroduction: &ClientIntroduction{},
PlanRejectionTracker: &PlanRejectionTracker{},
ServerJoin: &ServerJoin{},
AdmissionControllers: &AdmissionControllers{},
},
ACL: &ACLConfig{},
RPC: &RPCConfig{},
Expand Down
12 changes: 12 additions & 0 deletions nomad/admissioncontrollers/controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package admission

import (
"context"

"github.com/hashicorp/nomad/nomad/structs"
)

type AdmissionController interface {
Start(context.Context)
AdmitJob(*structs.Job) ([]error, error)
}
77 changes: 77 additions & 0 deletions nomad/admissioncontrollers/external_controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package admission

import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"

"github.com/hashicorp/nomad/nomad/structs"
)

type External struct {
name string
endpoint string
nodePool string
}

func NewExternalController(name, endpoint, nodePool string) *External {
return &External{
name: name,
endpoint: endpoint,
nodePool: nodePool,
}
}

// Start is a noop for external controllers
func (c *External) Start(ctx context.Context) {}

func (c *External) Name() string {
return c.name
}

// AdmitJob is used to send the job to an external process for processing.
//
// This is a basic implementation for proof of concept.
func (c *External) AdmitJob(job *structs.Job) (warnings []error, err error) {
if job.NodePool != c.nodePool {
return nil, nil
}

data, err := json.Marshal(job)
if err != nil {
return nil, err
}

req, err := http.NewRequest("POST", c.endpoint, bytes.NewReader(data))
if err != nil {
return nil, err
}

res, err := http.DefaultClient.Do(req)
if err != nil {
return nil, err
}
defer res.Body.Close()

if res.StatusCode != http.StatusOK {
return nil, fmt.Errorf("received bad response code, %d", res.StatusCode)
}

newJob := &structs.Job{}
b, err := io.ReadAll(res.Body)
if err != nil {
return nil, err
}

err = json.Unmarshal(b, newJob)
if err != nil {
return nil, err
}

job = newJob

return nil, nil
}
5 changes: 5 additions & 0 deletions nomad/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -457,6 +457,10 @@ type Config struct {
// requests and perform the appropriate enforcement actions.
NodeIntroductionConfig *structs.NodeIntroductionConfig

// AdmissionControllers are the set of configured controllers to
// be invoked during job registration.
AdmissionControllers *config.AdmissionControllers

// LogFile is used by MonitorExport to stream a server's log file
LogFile string `hcl:"log_file"`
}
Expand Down Expand Up @@ -586,6 +590,7 @@ func DefaultConfig() *Config {
}

c := &Config{
AdmissionControllers: &config.AdmissionControllers{},
Region: DefaultRegion,
AuthoritativeRegion: DefaultRegion,
Datacenter: DefaultDC,
Expand Down
17 changes: 15 additions & 2 deletions nomad/job_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,13 @@ import (
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-memdb"
metrics "github.com/hashicorp/go-metrics/compat"
"github.com/hashicorp/go-multierror"
multierror "github.com/hashicorp/go-multierror"
"github.com/hashicorp/go-set/v3"
"github.com/hashicorp/nomad/acl"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/helper/pointer"
"github.com/hashicorp/nomad/helper/uuid"
admission "github.com/hashicorp/nomad/nomad/admissioncontrollers"
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/state/paginator"
"github.com/hashicorp/nomad/nomad/structs"
Expand Down Expand Up @@ -60,6 +61,8 @@ type Job struct {
// builtin admission controllers
mutators []jobMutator
validators []jobValidator

ctrls []admission.AdmissionController
}

// NewJobEndpoints creates a new job endpoint with builtin admission controllers
Expand Down Expand Up @@ -91,6 +94,7 @@ func NewJobEndpoints(s *Server, ctx *RPCContext) *Job {
jobNumaHook{},
&jobSchedHook{},
},
ctrls: s.admissionControllers,
}
}

Expand Down Expand Up @@ -143,6 +147,15 @@ func (j *Job) doRegister(aclObj *acl.ACL, additionalAllowedPermissions []string,
// Run the submission controller
warnings = append(warnings, j.submissionController(args))

for _, ctrl := range j.ctrls {
warn, err := ctrl.AdmitJob(args.Job)
if err != nil {
return err
}

warnings = append(warnings, warn...)
}

// Attach the user token's accessor ID so that deploymentwatcher can
// reference the token later in multiregion deployments. We can't auth once
// and then use the leader ACL because the leader ACLs aren't shared across
Expand Down Expand Up @@ -311,7 +324,7 @@ func (j *Job) doRegister(aclObj *acl.ACL, additionalAllowedPermissions []string,
args.Job.SubmitTime = now

// If the job is periodic or parameterized, we don't create an eval.
if !(args.Job.IsPeriodic() || args.Job.IsParameterized()) {
if !(args.Job.IsPeriodic() || args.Job.IsParameterized() || args.Job.SkipEvalCreation) {

// Initially set the eval priority to that of the job priority. If the
// user supplied an eval priority override, we subsequently use this.
Expand Down
24 changes: 24 additions & 0 deletions nomad/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"github.com/hashicorp/nomad/helper/pool"
"github.com/hashicorp/nomad/helper/tlsutil"
"github.com/hashicorp/nomad/lib/auth/oidc"
admission "github.com/hashicorp/nomad/nomad/admissioncontrollers"
"github.com/hashicorp/nomad/nomad/auth"
"github.com/hashicorp/nomad/nomad/deploymentwatcher"
"github.com/hashicorp/nomad/nomad/drainer"
Expand Down Expand Up @@ -310,6 +311,8 @@ type Server struct {
// MAY BE nil! Issuer must be explicitly configured by the end user.
oidcDisco *structs.OIDCDiscoveryConfig

admissionControllers []admission.AdmissionController

// EnterpriseState is used to fill in state for Pro/Ent builds
EnterpriseState

Expand Down Expand Up @@ -450,6 +453,10 @@ func NewServer(config *Config, consulCatalog consul.CatalogAPI, consulConfigFunc
// 6 minutes is 1 minute longer than the JWT expiration time in the cap lib.
s.oidcRequestCache = oidc.NewRequestCache(6 * time.Minute)

// Admission controllers must be setup before the RPC handlers since
// they are copied to the JobEndpoint handler.
s.setupAdmissionControllers()

// Initialize the RPC layer
if err := s.setupRPC(tlsWrap); err != nil {
s.Shutdown()
Expand Down Expand Up @@ -570,6 +577,23 @@ func NewServer(config *Config, consulCatalog consul.CatalogAPI, consulConfigFunc
return s, nil
}

func (s *Server) setupAdmissionControllers() {
ctrl := s.config.AdmissionControllers
// Add enabled in-tree controllers

// Add enabled external controllers
for _, c := range ctrl.External {
// external controllers run as a separate process so do not need to be "started"
s.admissionControllers = append(s.admissionControllers, admission.NewExternalController(c.Name, c.Endpoint, c.NodePool))
}
}

func (s *Server) startAdmissionControllers() {
for _, ctrl := range s.admissionControllers {
go ctrl.Start(s.shutdownCtx)
}
}

// startRPCListener starts the server's the RPC listener
func (s *Server) startRPCListener() {
ctx, cancel := context.WithCancel(context.Background())
Expand Down
18 changes: 18 additions & 0 deletions nomad/structs/config/admission_controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package config

type BuiltinController struct {
Enabled bool
}

type ExternalController struct {
Name string
Endpoint string
NodePool string
}

type AdmissionControllers struct {
// In-tree controllers

// Out-of-tree controllers
External []ExternalController
}
7 changes: 7 additions & 0 deletions nomad/structs/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -4489,6 +4489,13 @@ type Job struct {
// used to register this version of the job. Used by deploymentwatcher.
NomadTokenID string

// SkipEvalCreation is used to signify that an evaluation should not be
// created for this job during registration. This field could be used for
// parameterized/periodic jobs, or for external processes like admission
// controllers to use Nomad to store jobs via Raft, but still control when
// they are run.
SkipEvalCreation bool

// Job status
Status string

Expand Down
Loading