Skip to content

Commit e23de23

Browse files
committed
Add splitSQLStatements to db-commons
1 parent 26c3d7a commit e23de23

File tree

3 files changed

+281
-25
lines changed

3 files changed

+281
-25
lines changed

packages/datasources/duckdb/index.cjs

Lines changed: 56 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,14 @@ const {
33
TypeFidelity,
44
asyncIterableToBatchedAsyncGenerator,
55
cleanQuery,
6-
exhaustStream
6+
exhaustStream,
7+
splitSQLStatements
78
} = require('@evidence-dev/db-commons');
89
const { DuckDBInstance } = require('@duckdb/node-api');
910
const path = require('path');
1011
const fs = require('fs/promises');
1112

13+
1214
/**
1315
* Converts BigInt values to Numbers in an object.
1416
* @param {Record<string, unknown>} obj - The input object with potential BigInt values.
@@ -150,49 +152,78 @@ const runQuery = async (queryString, database, batchSize = 100000) => {
150152
}
151153
}
152154

153-
// Prepare a reader that can stream chunks
154-
const count_query = `WITH root as (${cleanQuery(queryString)}) SELECT COUNT(*) FROM root`;
155+
// Split the incoming SQL into statements and treat all but the last as prefix
156+
// statements which should be executed before metadata queries or the main
157+
// streaming query. This allows SET/USE/CREATE statements to affect the
158+
// session.
159+
const statements = splitSQLStatements(queryString);
160+
const prefixStatements = statements.slice(0, -1);
161+
const mainStatement = statements.length > 0 ? statements[statements.length - 1] : '';
162+
163+
const count_query = mainStatement
164+
? `WITH root as (${cleanQuery(mainStatement)}) SELECT COUNT(*) FROM root`
165+
: null;
155166
let expected_row_count = null;
156167
try {
157-
const countReader = await conn.runAndReadAll(count_query);
158-
await countReader.readAll();
159-
const countRow = countReader.getRowObjectsJS()?.[0];
160-
if (countRow) {
161-
// take the first value from the row (column name may vary)
162-
expected_row_count = Object.values(countRow)[0];
168+
// Execute prefix statements first so they apply to the session
169+
for (const ps of prefixStatements) {
170+
try {
171+
await conn.run(ps).catch(() => null);
172+
} catch (err) {
173+
// ignore errors from prefix statements to mimic existing behavior
174+
}
175+
}
176+
177+
if (count_query) {
178+
const countReader = await conn.runAndReadAll(count_query);
179+
await countReader.readAll();
180+
const countRow = countReader.getRowObjectsJS()?.[0];
181+
if (countRow) {
182+
// take the first value from the row (column name may vary)
183+
expected_row_count = Object.values(countRow)[0];
184+
}
163185
}
164186
} catch (e) {
165187
expected_row_count = null;
166188
}
167189

168-
const column_query = `DESCRIBE ${cleanQuery(queryString)}`;
190+
const column_query = mainStatement ? `DESCRIBE ${cleanQuery(mainStatement)}` : null;
169191
let column_types = null;
170192
try {
171-
const colReader = await conn.runAndReadAll(column_query);
172-
await colReader.readAll();
173-
const describeRows = colReader.getRowObjectsJS();
174-
column_types = duckdbDescribeToEvidenceType(describeRows);
193+
if (column_query) {
194+
const colReader = await conn.runAndReadAll(column_query);
195+
await colReader.readAll();
196+
const describeRows = colReader.getRowObjectsJS();
197+
column_types = duckdbDescribeToEvidenceType(describeRows);
198+
}
175199
} catch (e) {
176200
column_types = null;
177201
}
178202

179203
// Create an async generator that yields batches using the DuckDBResultReader
180-
const reader = await conn.streamAndRead(queryString);
204+
// Stream only the main statement (last statement). If there is no main
205+
// statement, create an empty generator.
206+
let reader = null;
207+
if (mainStatement) {
208+
reader = await conn.streamAndRead(mainStatement);
209+
}
181210

182211
const rowsAsyncIterable = (async function* () {
183212
try {
184213
let prev = 0;
185-
// keep reading until done
186-
while (!reader.done) {
187-
const target = prev + batchSize;
188-
await reader.readUntil(target);
189-
const allRows = reader.getRowObjectsJS();
190-
const newRows = allRows.slice(prev).map(standardizeRow);
191-
// yield rows one-by-one (asyncIterableToBatchedAsyncGenerator expects single-row yields)
192-
for (const r of newRows) {
193-
yield r;
214+
if (reader) {
215+
// keep reading until done
216+
while (!reader.done) {
217+
const target = prev + batchSize;
218+
await reader.readUntil(target);
219+
const allRows = reader.getRowObjectsJS();
220+
const newRows = allRows.slice(prev).map(standardizeRow);
221+
// yield rows one-by-one (asyncIterableToBatchedAsyncGenerator expects single-row yields)
222+
for (const r of newRows) {
223+
yield r;
224+
}
225+
prev = reader.currentRowCount;
194226
}
195-
prev = reader.currentRowCount;
196227
}
197228
} finally {
198229
// disconnect the connection and close the instance synchronously

packages/datasources/duckdb/test/test.js

Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import path from 'path';
44
import { fileURLToPath } from 'url';
55
import runQuery from '../index.cjs';
66
import { batchedAsyncGeneratorToArray, TypeFidelity } from '@evidence-dev/db-commons';
7+
import fs from 'fs/promises';
78
import 'dotenv/config';
89

910
test('basic select from needful_things.duckdb', async () => {
@@ -198,6 +199,20 @@ test('query runs', async () => {
198199
}
199200
});
200201

202+
test('expectedRowCount present for UNION with NULL-first-row', async () => {
	// Minimal reproduction: the NULL lands in the first row, the concrete
	// value only shows up in the second one.
	const { rows: readBatches, expectedRowCount: reportedCount } = await runQuery(
		`select NULL as a UNION ALL select 1 as a`,
		undefined,
		2
	);

	// Drain every batch so the streaming query is actually executed.
	const collected = [];
	for await (const batch of readBatches()) {
		for (const row of batch) {
			collected.push(row);
		}
	}

	assert.equal(collected.length, 2);
	assert.type(reportedCount, 'number');
	assert.equal(reportedCount, 2);
});
215+
201216
test('query batches results properly', async () => {
202217
try {
203218
const { rows, expectedRowCount } = await runQuery(
@@ -220,4 +235,131 @@ test('query batches results properly', async () => {
220235
}
221236
});
222237

238+
// Multi-statement scripts: runQuery executes every statement except the last
// one as session setup and streams only the final statement. Failures are left
// to propagate unchanged so the runner reports the original error and stack.

test('handles leading SET statements before main query', async () => {
	const query = `
		SET VARIABLE VAR1 = DATE '2023-10-04';
		SET VARIABLE VAR2 = '23';
		select 1 union all select 2 union all select 3;
	`;
	const { rows, expectedRowCount } = await runQuery(query, undefined, 2);
	const arr = [];
	for await (const batch of rows()) {
		arr.push(batch);
	}
	// three rows with a batch size of 2 should arrive as batches of [2, 1]
	assert.equal(arr[0].length, 2);
	assert.equal(arr[1].length, 1);
	assert.equal(expectedRowCount, 3);
});

test('semicolon inside single-quoted string should not split statements', async () => {
	const query = "SET V='this;is;a;string'; select 1 union all select 2;";
	const { rows, expectedRowCount } = await runQuery(query, undefined, 2);
	const arr = [];
	for await (const batch of rows()) {
		arr.push(batch);
	}
	assert.equal(expectedRowCount, 2);
	assert.equal(arr[0].length, 2);
});

test('semicolon inside block comment should not split statements', async () => {
	const query = '/* comment; still comment; */ select 1 union all select 2;';
	const { rows, expectedRowCount } = await runQuery(query, undefined, 2);
	const arr = [];
	for await (const batch of rows()) {
		arr.push(batch);
	}
	assert.equal(expectedRowCount, 2);
	assert.equal(arr[0].length, 2);
});

test('USE statement before query should apply to metadata queries', async () => {
	// Use an in-memory database so the prefix statements can create the schema
	// and table themselves. The final SELECT references the table without its
	// schema, so it only resolves if the preceding USE was applied to the same
	// session that runs the count and streaming queries.
	const dbFile = ':memory:';
	const query = `
		CREATE SCHEMA s;
		CREATE TABLE s.t AS SELECT 1 AS x UNION ALL SELECT 2 AS x;
		USE s;
		SELECT x FROM t;
	`;
	const { rows, expectedRowCount } = await runQuery(query, { filename: dbFile }, 2);
	const arr = [];
	for await (const batch of rows()) {
		arr.push(batch);
	}
	assert.equal(expectedRowCount, 2);
	assert.equal(arr[0].length, 2);
});
364+
223365
test.run();

packages/lib/db-commons/index.cjs

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,88 @@ const cleanQuery = (query) => {
197197
return cleanedString + '\n';
198198
};
199199

200+
/**
 * Split a SQL script into individual statements.
 *
 * A semicolon terminates a statement only when it appears outside of
 * single-quoted, double-quoted and backtick-quoted text and outside of line
 * (`-- ...`) and block comments. Comments are kept as part of the statement
 * they appear in. Fragments that hold nothing but whitespace and comments
 * (for example a trailing `-- note` after the final semicolon) are not
 * statements and are omitted, so the last element is always actual SQL.
 *
 * @param {string} sql - SQL script, possibly containing several statements.
 * @returns {string[]} Trimmed statements without their terminating semicolon.
 */
const splitSQLStatements = function (sql) {
	const statements = [];
	let cur = '';
	// True once `cur` holds a non-whitespace character that is not comment text.
	let hasCode = false;
	let inSingle = false;
	let inDouble = false;
	let inBacktick = false;
	let inLineComment = false;
	let inBlockComment = false;

	// Emit the accumulated fragment if it contains real SQL, then start afresh.
	const flush = () => {
		if (hasCode) statements.push(cur.trim());
		cur = '';
		hasCode = false;
	};

	for (let i = 0; i < sql.length; i++) {
		const ch = sql[i];
		const next = sql[i + 1];

		if (inLineComment) {
			cur += ch;
			if (ch === '\n') {
				inLineComment = false;
			}
			continue;
		}
		if (inBlockComment) {
			cur += ch;
			if (ch === '*' && next === '/') {
				cur += next;
				i++;
				inBlockComment = false;
			}
			continue;
		}

		const inQuote = inSingle || inDouble || inBacktick;

		// start of line comment
		if (!inQuote && ch === '-' && next === '-') {
			cur += ch + next;
			i++;
			inLineComment = true;
			continue;
		}
		// start of block comment: consume both opener characters so the `*`
		// of `/*` can never double as the `*` of a closing `*/` (as in `/*/`)
		if (!inQuote && ch === '/' && next === '*') {
			cur += ch + next;
			i++;
			inBlockComment = true;
			continue;
		}

		// semicolon splits statements only when not inside quotes
		// (comment text never reaches this point)
		if (!inQuote && ch === ';') {
			flush();
			continue;
		}

		// A quote character toggles its own mode unless another quote style is
		// open. A doubled quote ('' inside '...') toggles twice and so leaves
		// the mode unchanged.
		if (ch === "'" && !inDouble && !inBacktick) {
			inSingle = !inSingle;
		} else if (ch === '"' && !inSingle && !inBacktick) {
			inDouble = !inDouble;
		} else if (ch === '`' && !inSingle && !inDouble) {
			inBacktick = !inBacktick;
		}

		if (/\S/.test(ch)) {
			hasCode = true;
		}
		cur += ch;
	}

	// whatever follows the last semicolon (or the whole input if there is none)
	flush();
	return statements;
};
281+
200282
/**
201283
* @param {QueryResult} stream
202284
* @returns {Promise<void>}
@@ -217,5 +299,6 @@ exports.asyncIterableToBatchedAsyncGenerator = asyncIterableToBatchedAsyncGenera
217299
exports.batchedAsyncGeneratorToArray = batchedAsyncGeneratorToArray;
218300
exports.cleanQuery = cleanQuery;
219301
exports.exhaustStream = exhaustStream;
302+
exports.splitSQLStatements = splitSQLStatements;
220303

221304
exports.getEnv = require('./src/getEnv.cjs').getEnv;

0 commit comments

Comments
 (0)