Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,11 @@ func (cli *CLI) ParseFlags(args []string) (*Config, []string, bool, bool, error)
return nil
}), "wait", "")

flags.Var((funcBoolVar)(func(b bool) error {
c.DeleteKey = config.Bool(b)
return nil
}), "delete", "")

flags.BoolVar(&isVersion, "v", false, "")
flags.BoolVar(&isVersion, "version", false, "")

Expand Down Expand Up @@ -618,6 +623,10 @@ Options:
Sets the 'min(:max)' amount of time to wait before writing a template (and
triggering a command)

-delete=<boolean>
Enables deletion of keys in the destination datacenter that do not exist
in the source datacenter. Defaults to true

-v, -version
Print the version of this daemon
`
18 changes: 18 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ const (

// DefaultStatusDir is the default directory to post status information.
DefaultStatusDir = "service/consul-replicate/statuses"
// DefaultDeleteKey is the default value for deleting destination keys if source keys are empty.
DefaultDeleteKey = true
)

// Config is used to configure Consul ENV
Expand Down Expand Up @@ -74,6 +76,8 @@ type Config struct {

// Wait is the quiescence timers.
Wait *config.WaitConfig `mapstructure:"wait"`

DeleteKey *bool `mapstructure:"delete_key"`
}

// Copy returns a deep copy of the current configuration. This is useful because
Expand Down Expand Up @@ -113,6 +117,8 @@ func (c *Config) Copy() *Config {
o.Wait = c.Wait.Copy()
}

o.DeleteKey = c.DeleteKey

return &o
}

Expand Down Expand Up @@ -174,6 +180,10 @@ func (c *Config) Merge(o *Config) *Config {
r.Wait = r.Wait.Merge(o.Wait)
}

if o.DeleteKey != nil {
r.DeleteKey = o.DeleteKey
}

return r
}

Expand All @@ -195,6 +205,7 @@ func (c *Config) GoString() string {
"StatusDir:%s, "+
"Syslog:%s, "+
"Wait:%s"+
"DeleteKey:%s"+
"}",
c.Consul.GoString(),
c.Excludes.GoString(),
Expand All @@ -207,19 +218,22 @@ func (c *Config) GoString() string {
config.StringGoString(c.StatusDir),
c.Syslog.GoString(),
c.Wait.GoString(),
config.BoolGoString(c.DeleteKey),
)
}

// DefaultConfig returns the default configuration struct. Certain environment
// variables may be set which control the values for the default configuration.
func DefaultConfig() *Config {

return &Config{
Consul: config.DefaultConsulConfig(),
Excludes: DefaultExcludeConfigs(),
Prefixes: DefaultPrefixConfigs(),
StatusDir: config.String(DefaultStatusDir),
Syslog: config.DefaultSyslogConfig(),
Wait: config.DefaultWaitConfig(),
DeleteKey: config.Bool(DefaultDeleteKey),
}
}

Expand Down Expand Up @@ -284,6 +298,10 @@ func (c *Config) Finalize() {
c.Wait = config.DefaultWaitConfig()
}
c.Wait.Finalize()

if c.DeleteKey == nil {
c.DeleteKey = config.Bool(DefaultDeleteKey)
}
}

// Parse parses the given string contents as a config
Expand Down
9 changes: 5 additions & 4 deletions runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ func (r *Runner) Run() error {

// Replicate each prefix in a goroutine
for _, prefix := range prefixes {
go r.replicate(prefix, r.config.Excludes, doneCh, errCh)
go r.replicate(prefix, r.config.Excludes, r.config.DeleteKey, doneCh, errCh)
}

var errs *multierror.Error
Expand Down Expand Up @@ -268,7 +268,7 @@ func (r *Runner) get(prefix *PrefixConfig) (*watch.View, bool) {
// replicate performs replication into the current datacenter from the given
// prefix. This function is designed to be called via a goroutine since it is
// expensive and needs to be parallelized.
func (r *Runner) replicate(prefix *PrefixConfig, excludes *ExcludeConfigs, doneCh chan struct{}, errCh chan error) {
func (r *Runner) replicate(prefix *PrefixConfig, excludes *ExcludeConfigs, deleteKey *bool, doneCh chan struct{}, errCh chan error) {
// Ensure we are not self-replicating
info, err := r.clients.Consul().Agent().Self()
if err != nil {
Expand Down Expand Up @@ -376,7 +376,6 @@ func (r *Runner) replicate(prefix *PrefixConfig, excludes *ExcludeConfigs, doneC
}
for _, key := range localKeys {
excluded := false

// Ignore if the key falls under an excluded prefix
if len(*excludes) > 0 {
sourceKey := strings.Replace(key, config.StringVal(prefix.Destination), config.StringVal(prefix.Source), -1)
Expand All @@ -389,13 +388,15 @@ func (r *Runner) replicate(prefix *PrefixConfig, excludes *ExcludeConfigs, doneC
}
}

if _, ok := usedKeys[key]; !ok && !excluded {
if _, ok := usedKeys[key]; !ok && !excluded && *deleteKey {
if _, err := kv.Delete(key, nil); err != nil {
errCh <- fmt.Errorf("failed to delete %q: %s", key, err)
return
}
log.Printf("[DEBUG] (runner) deleted %q", key)
deletes++
} else if _, ok := usedKeys[key]; !ok && !excluded && !*deleteKey {
log.Printf("DEBUG (runner) %q key exists in destination and not source", key)
}
}

Expand Down