Commit 5a8b425

fix: enhance CastToStringE to handle time.Time and *time.Time types
1 parent d8e3bf4 · commit 5a8b425

4 files changed: +205 -0 lines changed

cmd/sling/tests/replications/r.86.record_key_casing.yaml

Lines changed: 74 additions & 0 deletions

@@ -0,0 +1,74 @@
# Test for mixed-case record key references in transform expressions

source: mysql
target: local

hooks:
  start:
    # Create test table with mixed-case column names (MySQL uses backticks for identifiers)
    - type: query
      connection: '{source.name}'
      query: |
        DROP TABLE IF EXISTS mysql.test_record_key_casing;
        CREATE TABLE mysql.test_record_key_casing (
          id INTEGER,
          `DateAdded` TIMESTAMP NULL,
          `LastChanged` TIMESTAMP NULL,
          `_sling_deleted_at` TIMESTAMP NULL,
          value TEXT
        );
        INSERT INTO mysql.test_record_key_casing (id, `DateAdded`, `LastChanged`, `_sling_deleted_at`, value)
        VALUES
          (1, '2024-01-15 10:00:00', '2024-01-20 15:30:00', NULL, 'row1'),
          (2, '2024-02-10 08:00:00', '2024-02-25 12:00:00', '2024-03-01 09:00:00', NULL),
          (3, '2024-03-05 09:00:00', '2024-03-10 18:00:00', NULL, 'row3');

    - type: command
      command: mkdir -p '{env.output_dir}'

  end:
    # Check if errored, do not proceed
    - type: check
      check: execution.status.error == 0
      on_failure: break

    # Verify output parquet file exists and has data
    - type: query
      connection: duckdb
      query: "SELECT * FROM read_parquet('{env.output_dir}/output.parquet')"
      into: result

    - type: log
      message: |
        Parquet output: {store.result}

    # Verify true_changed_at column was computed correctly
    - type: check
      check: length(store.result) == 3
      success_message: "SUCCESS: All 3 rows exported successfully with mixed-case column transform"

    # Verify the computed column has correct values (should be the greatest of the three timestamps)
    - type: check
      check: store.result[0].true_changed_at != nil
      success_message: "SUCCESS: true_changed_at column was computed correctly"

    # Cleanup
    - type: query
      connection: '{source.name}'
      query: DROP TABLE IF EXISTS mysql.test_record_key_casing;

    - type: command
      command: rm -rf '{env.output_dir}'

streams:
  mysql.test_record_key_casing:
    object: "file://{output_dir}/output.parquet"
    mode: full-refresh
    target_options:
      format: parquet
    transforms:
      # IMPORTANT: Record keys are normalized to lowercase internally
      # Even though the column is named "DateAdded", use record.dateadded (lowercase)
      # This reproduces the customer's exact expression pattern with _sling_deleted_at
      - true_changed_at: >
          greatest(record.dateadded, record.lastchanged, record._sling_deleted_at)
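
The transform above addresses the columns as record.dateadded, record.lastchanged, and record._sling_deleted_at even though MySQL defines them as `DateAdded` and `LastChanged`, because, as the comment in the stream config notes, record keys are normalized to lowercase before transform expressions see them. A minimal Go sketch of that normalization idea, assuming a plain map-backed record; NormalizeRecordKeys is a hypothetical helper for illustration, not sling's API:

```go
// Hypothetical illustration of lowercase record-key normalization; not sling's
// actual implementation. It only shows why a column named "DateAdded" is
// addressed as record.dateadded inside a transform expression.
package main

import (
	"fmt"
	"strings"
)

// NormalizeRecordKeys returns a copy of the record with all keys lowercased,
// so expression lookups can be case-insensitive.
func NormalizeRecordKeys(rec map[string]any) map[string]any {
	out := make(map[string]any, len(rec))
	for k, v := range rec {
		out[strings.ToLower(k)] = v
	}
	return out
}

func main() {
	row := map[string]any{
		"DateAdded":         "2024-01-15T10:00:00Z",
		"LastChanged":       "2024-01-20T15:30:00Z",
		"_sling_deleted_at": nil,
	}
	rec := NormalizeRecordKeys(row)
	fmt.Println(rec["dateadded"], rec["lastchanged"]) // keys resolve in lowercase
}
```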

cmd/sling/tests/replications/r.87.record_key_casing_bigquery.yaml

Lines changed: 106 additions & 0 deletions

@@ -0,0 +1,106 @@
# Test for mixed-case record key references in transforms (MySQL to BigQuery)
# Uses the same source table created by r.86 test

source: mysql
target: bigquery

hooks:
  start:
    # Create test table with mixed-case column names (MySQL uses backticks for identifiers)
    - type: query
      connection: '{source.name}'
      query: |
        DROP TABLE IF EXISTS mysql.test_record_key_casing;
        CREATE TABLE mysql.test_record_key_casing (
          id INTEGER,
          `DateAdded` TIMESTAMP NULL,
          `LastChanged` TIMESTAMP NULL,
          `_sling_deleted_at` TIMESTAMP NULL,
          value TEXT
        );
        INSERT INTO mysql.test_record_key_casing (id, `DateAdded`, `LastChanged`, `_sling_deleted_at`, value)
        VALUES
          (1, '2024-01-15 10:00:00', '2024-01-20 15:30:00', NULL, 'row1'),
          (2, '2024-02-10 08:00:00', '2024-02-25 12:00:00', '2024-03-01 09:00:00', NULL),
          (3, '2024-03-05 09:00:00', '2024-03-10 18:00:00', NULL, 'row3');

  end:
    # Check if errored, do not proceed
    - type: check
      check: execution.status.error == 0
      on_failure: break

    # Verify BigQuery table has data
    - type: query
      connection: '{target.name}'
      query: "SELECT * FROM public.test_record_key_casing ORDER BY id"
      into: result

    - type: log
      message: |
        BigQuery output: {store.result}

    # Verify row count
    - type: check
      check: length(store.result) == 3
      success_message: "SUCCESS: All 3 rows exported successfully with mixed-case column transform (BigQuery)"

    # Verify the computed column has correct values
    - type: check
      check: store.result[0].true_changed_at != nil
      success_message: "SUCCESS: true_changed_at column was computed correctly (BigQuery)"

    # Cleanup BigQuery table
    - type: query
      connection: '{target.name}'
      query: DROP TABLE IF EXISTS public.test_record_key_casing

    # Cleanup MySQL table
    - type: query
      connection: '{source.name}'
      query: DROP TABLE IF EXISTS mysql.test_record_key_casing

streams:
  mysql.test_record_key_casing:
    object: public.test_record_key_casing
    mode: full-refresh
    target_options:
      direct_insert: true
    # transforms:
    #   - true_changed_at: >
    #       greatest(
    #         cast(record.dateadded, "timestamp"),
    #         cast(record.lastchanged, "timestamp"),
    #         cast(record._sling_deleted_at, "timestamp")
    #       )

    # transforms:
    #   - true_changed_at: >
    #       greatest(
    #         cast(replace(record.dateadded, "\"", ""), "timestamp"),
    #         cast(replace(record.lastchanged, "\"", ""), "timestamp"),
    #         cast(replace(record._sling_deleted_at, "\"", ""), "timestamp")
    #       )

    transforms:
      # - dateadded: log(greatest(record.dateadded, record.lastchanged))

      - true_changed_at: >
          greatest(
            date_parse(record.dateadded),
            date_parse(record.lastchanged),
            date_parse(record._sling_deleted_at)
          )

      # - true_changed_at: >
      #     greatest(
      #       date_parse(replace(record.dateadded, "\"", "")),
      #       date_parse(replace(record.lastchanged, "\"", "")),
      #       date_parse(replace(record._sling_deleted_at, "\"", ""))
      #     )

    # transforms:
    #   - true_changed_at: >
    #       date_parse(
    #         greatest(record.dateadded, record.lastchanged, record._sling_deleted_at)
    #       )
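
The active transform parses each value with date_parse before taking the greatest one; the commented-out blocks preserve earlier attempts (plain greatest, cast, and replace-then-cast). A rough Go sketch of the computation the working expression describes, assuming the record timestamps arrive as RFC3339-style strings and NULLs surface as empty values; parseMaybe and greatestTime are illustrative helpers, not part of sling:

```go
// Illustrative only: mimics greatest(date_parse(a), date_parse(b), date_parse(c))
// over nullable timestamp strings. This is not sling's expression engine.
package main

import (
	"fmt"
	"time"
)

// parseMaybe parses an RFC3339/RFC3339Nano string; ok is false for empty (NULL) input.
func parseMaybe(s string) (time.Time, bool) {
	if s == "" {
		return time.Time{}, false
	}
	t, err := time.Parse(time.RFC3339Nano, s)
	return t, err == nil
}

// greatestTime returns the latest of the non-null timestamps.
func greatestTime(vals ...string) (time.Time, bool) {
	var best time.Time
	found := false
	for _, s := range vals {
		if t, ok := parseMaybe(s); ok && (!found || t.After(best)) {
			best, found = t, true
		}
	}
	return best, found
}

func main() {
	// Row 2 of the test data: _sling_deleted_at carries the latest timestamp.
	latest, ok := greatestTime(
		"2024-02-10T08:00:00Z",
		"2024-02-25T12:00:00Z",
		"2024-03-01T09:00:00Z",
	)
	fmt.Println(latest, ok) // 2024-03-01 09:00:00 +0000 UTC true
}
```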

cmd/sling/tests/suite.cli.yaml

Lines changed: 20 additions & 0 deletions

@@ -1395,6 +1395,26 @@
     - "SUCCESS: IDs 9,10 have _sling_synced_op='D' (soft deleted)"
     - "SUCCESS: IDs 1-8 have _sling_synced_op='U' (updated)"

+- id: 156
+  name: Test mixed-case record key references in transforms (MySQL to local parquet)
+  run: 'sling run -d -r cmd/sling/tests/replications/r.86.record_key_casing.yaml'
+  streams: 1
+  rows: 3
+  output_contains:
+    - 'SUCCESS: All 3 rows exported successfully with mixed-case column transform'
+    - 'SUCCESS: true_changed_at column was computed correctly'
+    - 'execution succeeded'
+
+- id: 157
+  name: Test mixed-case record key references in transforms (MySQL to BigQuery)
+  run: 'sling run -d -r cmd/sling/tests/replications/r.87.record_key_casing_bigquery.yaml'
+  streams: 1
+  rows: 3
+  output_contains:
+    - 'SUCCESS: All 3 rows exported successfully with mixed-case column transform (BigQuery)'
+    - 'SUCCESS: true_changed_at column was computed correctly (BigQuery)'
+    - 'execution succeeded'
+
 # # Test PostGIS (PostgreSQL) to GeoJSON as target with geometry column name "geom"
 # - id: 150
 #   name: Test PostGIS (PostgreSQL) to GeoJSON as target

core/dbio/iop/stream_processor.go

Lines changed: 5 additions & 0 deletions

@@ -1111,6 +1111,10 @@ func (sp *StreamProcessor) CastToStringE(val any) (valString string, err error)
 		if err != nil {
 			return "", g.Error(err, "could not cast to string: %#v", v)
 		}
+	case time.Time:
+		valString = v.Format(time.RFC3339Nano)
+	case *time.Time:
+		valString = v.Format(time.RFC3339Nano)
 	case chJSON: // Clickhouse JSON / Variant or any with MarshalJSON()
 		var sBytes []byte
 		sBytes, err = v.MarshalJSON()

@@ -1582,6 +1586,7 @@ func (sp *StreamProcessor) CastRow(row []any, columns Columns) []any {
 	for i, val := range row {
 		col := &columns[i]
 		row[i] = sp.CastVal(i, val, col)
+		// g.Warn("%d | col %s | nVal => %#v", sp.N, col.Name, row[i])
 		if row[i] != nil && row[i] != "" {
 			sp.colStats[i].LastVal = row[i]
 		}
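
With the two new branches, a time.Time value (or a pointer to one) reaching CastToStringE is rendered with time.RFC3339Nano instead of falling through to the generic handling below it. A standalone sketch of just that formatting behavior, outside sling's code path:

```go
// Standalone demonstration of the formatting used by the new cases:
// time.Time and *time.Time values are rendered with time.RFC3339Nano.
package main

import (
	"fmt"
	"time"
)

func main() {
	t := time.Date(2024, 3, 1, 9, 0, 0, 123456789, time.UTC)
	pt := &t

	// Mirrors the added `case time.Time` / `case *time.Time` branches.
	fmt.Println(t.Format(time.RFC3339Nano))  // 2024-03-01T09:00:00.123456789Z
	fmt.Println(pt.Format(time.RFC3339Nano)) // same output via the pointer
}
```

Presumably this RFC3339Nano string form is also what a transform such as date_parse(record.dateadded) in the tests above ends up receiving, although the commit message itself only states the type-handling fix.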
