Skip to content

Commit 7a944fc

Browse files
committed
refactor: replace hardcoded column names with reserved fields
1 parent 6623fa4 commit 7a944fc

File tree

6 files changed

+33
-25
lines changed

6 files changed

+33
-25
lines changed

core/dbio/iop/duckdb.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1426,8 +1426,7 @@ func (duck *DuckDb) MakeScanQuery(format dbio.FileType, uri string, fsc FileStre
14261426
}
14271427

14281428
// reserved word to use for timestamp comparison (when listing)
1429-
const slingLoadedAtColumn = "_sling_loaded_at"
1430-
if fsc.IncrementalKey != "" && fsc.IncrementalKey != slingLoadedAtColumn &&
1429+
if fsc.IncrementalKey != "" && fsc.IncrementalKey != env.ReservedFields.LoadedAt &&
14311430
fsc.IncrementalValue != "" {
14321431
incrementalWhereCond = g.F("%s > %s", dbio.TypeDbDuckDb.Quote(fsc.IncrementalKey), fsc.IncrementalValue)
14331432
where = g.F("where %s", incrementalWhereCond)

core/env/env.go

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,9 +49,23 @@ var (
4949
setupOtel = func() {}
5050

5151
ReservedFields = struct {
52-
SyncedOp string
52+
SyncedOp string
53+
LoadedAt string
54+
SyncedAt string
55+
DeletedAt string
56+
StreamURL string
57+
RowNum string
58+
RowID string
59+
ExecID string
5360
}{
54-
SyncedOp: "_sling_synced_op",
61+
SyncedOp: "_sling_synced_op",
62+
LoadedAt: "_sling_loaded_at",
63+
SyncedAt: "_sling_synced_at",
64+
DeletedAt: "_sling_deleted_at",
65+
StreamURL: "_sling_stream_url",
66+
RowNum: "_sling_row_num",
67+
RowID: "_sling_row_id",
68+
ExecID: "_sling_exec_id",
5569
}
5670
)
5771

core/sling/config.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"github.com/slingdata-io/sling-cli/core/dbio/connection"
1616
"github.com/slingdata-io/sling-cli/core/dbio/database"
1717
"github.com/slingdata-io/sling-cli/core/dbio/filesys"
18+
"github.com/slingdata-io/sling-cli/core/env"
1819
"github.com/spf13/cast"
1920

2021
"github.com/flarco/g"
@@ -161,7 +162,7 @@ func (cfg *Config) SetDefault() {
161162
if val := os.Getenv("SLING_SYNCED_AT_COLUMN"); val != "" {
162163
if cast.ToBool(val) {
163164
cfg.MetadataSyncedAt = g.Bool(true)
164-
slingDeletedAtColumn = slingSyncedAtColumn // deleted_at becomes synched_at
165+
env.ReservedFields.DeletedAt = env.ReservedFields.SyncedAt // deleted_at becomes synched_at
165166
} else {
166167
cfg.MetadataSyncedAt = g.Bool(false)
167168
}
@@ -366,10 +367,10 @@ func (cfg *Config) DetermineType() (Type JobType, err error) {
366367
// OK, no need for update key
367368
} else if srcApiProvided {
368369
// OK, no need for update key/pk, API uses SLING_STATE for tracking
369-
} else if srcFileProvided && cfg.Source.UpdateKey == slingLoadedAtColumn {
370+
} else if srcFileProvided && cfg.Source.UpdateKey == env.ReservedFields.LoadedAt {
370371
// need to loaded_at column for file incremental
371372
cfg.MetadataLoadedAt = g.Bool(true)
372-
} else if srcFileProvided && cfg.Source.UpdateKey == slingSyncedAtColumn {
373+
} else if srcFileProvided && cfg.Source.UpdateKey == env.ReservedFields.SyncedAt {
373374
cfg.MetadataSyncedAt = g.Bool(true)
374375
} else if cfg.Source.UpdateKey == "" && len(cfg.Source.PrimaryKey()) == 0 {
375376
err = g.Error("must specify value for 'update_key' and/or 'primary_key' for incremental mode. See docs for more details: https://docs.slingdata.io/sling-cli/run/configuration")

core/sling/task.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -342,33 +342,33 @@ func (t *TaskExecution) GetRate(secWindow int) (rowRate, byteRate int64) {
342342

343343
func (t *TaskExecution) setGetMetadata() (metadata iop.Metadata) {
344344
if t.Config.MetadataSyncedAt != nil && *t.Config.MetadataSyncedAt {
345-
metadata.SyncedAt.Key = slingSyncedAtColumn
345+
metadata.SyncedAt.Key = env.ReservedFields.SyncedAt
346346
metadata.SyncedAt.Value = *t.StartTime // only timestamp
347347
metadata.SyncedOp.Key = env.ReservedFields.SyncedOp
348348
metadata.SyncedOp.Value = "I" // default to insert operation
349349
} else if t.Config.MetadataLoadedAt != nil && *t.Config.MetadataLoadedAt {
350-
metadata.SyncedAt.Key = slingLoadedAtColumn
350+
metadata.SyncedAt.Key = env.ReservedFields.LoadedAt
351351
if os.Getenv("SLING_LOADED_AT_COLUMN") == "timestamp" {
352352
metadata.SyncedAt.Value = *t.StartTime
353353
} else {
354354
metadata.SyncedAt.Value = t.StartTime.Unix()
355355
}
356356
}
357357
if t.Config.MetadataStreamURL {
358-
metadata.StreamURL.Key = slingStreamURLColumn
358+
metadata.StreamURL.Key = env.ReservedFields.StreamURL
359359
}
360360

361361
if t.Config.MetadataRowID {
362-
metadata.RowID.Key = slingRowIDColumn
362+
metadata.RowID.Key = env.ReservedFields.RowID
363363
}
364364

365365
if t.Config.MetadataExecID {
366-
metadata.ExecID.Key = slingExecIDColumn
366+
metadata.ExecID.Key = env.ReservedFields.ExecID
367367
metadata.ExecID.Value = t.ExecID
368368
}
369369

370370
if t.Config.MetadataRowNum {
371-
metadata.RowNum.Key = slingRowNumColumn
371+
metadata.RowNum.Key = env.ReservedFields.RowNum
372372
}
373373

374374
// StarRocks: add _sling_row_id column if there is no primary,
@@ -390,8 +390,8 @@ func (t *TaskExecution) setGetMetadata() (metadata iop.Metadata) {
390390
}
391391

392392
if addRowIDCol {
393-
metadata.RowID.Key = slingRowIDColumn
394-
t.Config.Target.Options.TableKeys[iop.HashKey] = []string{slingRowIDColumn}
393+
metadata.RowID.Key = env.ReservedFields.RowID
394+
t.Config.Target.Options.TableKeys[iop.HashKey] = []string{env.ReservedFields.RowID}
395395
}
396396
}
397397

core/sling/task_run.go

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -25,14 +25,7 @@ import (
2525
)
2626

2727
var (
28-
start time.Time
29-
slingLoadedAtColumn = "_sling_loaded_at"
30-
slingSyncedAtColumn = "_sling_synced_at"
31-
slingDeletedAtColumn = "_sling_deleted_at"
32-
slingStreamURLColumn = "_sling_stream_url"
33-
slingRowNumColumn = "_sling_row_num"
34-
slingRowIDColumn = "_sling_row_id"
35-
slingExecIDColumn = "_sling_exec_id"
28+
start time.Time
3629
)
3730

3831
var deleteMissing func(*TaskExecution, database.Connection, database.Connection) error = func(_ *TaskExecution, _, _ database.Connection) error {
@@ -484,7 +477,7 @@ func (t *TaskExecution) runFileToDB() (err error) {
484477
t.Context.Map.Set("incremental_value", t.Config.IncrementalValStr)
485478
} else if t.isIncrementalWithUpdateKey() && !t.Config.IsIncrementalWithRange() {
486479
if t.Config.Source.UpdateKey == "." {
487-
t.Config.Source.UpdateKey = slingLoadedAtColumn
480+
t.Config.Source.UpdateKey = env.ReservedFields.LoadedAt
488481
}
489482
t.SetProgress("getting checkpoint value (%s)", t.Config.Source.UpdateKey)
490483

core/sling/task_run_read.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"github.com/slingdata-io/sling-cli/core/dbio/database"
1313
"github.com/slingdata-io/sling-cli/core/dbio/filesys"
1414
"github.com/slingdata-io/sling-cli/core/dbio/iop"
15+
"github.com/slingdata-io/sling-cli/core/env"
1516
"github.com/spf13/cast"
1617
)
1718

@@ -273,7 +274,7 @@ func (t *TaskExecution) ReadFromFile(cfg *Config) (df *iop.Dataflow, err error)
273274

274275
if t.Config.HasIncrementalVal() && !t.Config.IsFileStreamWithStateAndParts() {
275276
// file stream incremental mode
276-
if g.In(t.Config.Source.UpdateKey, slingLoadedAtColumn, slingSyncedAtColumn) {
277+
if g.In(t.Config.Source.UpdateKey, env.ReservedFields.LoadedAt, env.ReservedFields.SyncedAt) {
277278
options["SLING_FS_TIMESTAMP"] = t.Config.IncrementalValStr
278279
g.Debug(`file stream using file_sys_timestamp=%#v and update_key=%s`, t.Config.IncrementalValStr, t.Config.Source.UpdateKey)
279280
} else {

0 commit comments

Comments
 (0)