Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 102 additions & 17 deletions packages/datasources/duckdb/index.cjs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@ const {
TypeFidelity,
asyncIterableToBatchedAsyncGenerator,
cleanQuery,
exhaustStream
exhaustStream,
splitSQLStatement
} = require('@evidence-dev/db-commons');
const { Database } = require('duckdb-async');
const { DuckDBInstance } = require('@duckdb/node-api');
const path = require('path');
const fs = require('fs/promises');


/**
* Converts BigInt values to Numbers in an object.
* @param {Record<string, unknown>} obj - The input object with potential BigInt values.
Expand Down Expand Up @@ -125,11 +127,15 @@ const runQuery = async (queryString, database, batchSize = 100000) => {
} else if (database.directory) {
// Local database stored in source directory
filename = path.join(database.directory, database.filename);
} else {
// filename provided without a directory; use as given (may be absolute or relative)
filename = database.filename;
}
}

const mode = filename !== ':memory:' ? 'READ_ONLY' : 'READ_WRITE';
const db = await Database.create(filename, {
// Create a DuckDB instance and connection using the new node-api
const db = await DuckDBInstance.create(filename, {
access_mode: mode,
custom_user_agent: 'evidence-dev'
});
Expand All @@ -141,27 +147,106 @@ const runQuery = async (queryString, database, batchSize = 100000) => {
const initScript = await fs.readFile(path.resolve(database.directory, 'initialize.sql'), {
encoding: 'utf-8'
});
await conn.exec(initScript);
// run the initialization script
await conn.run(initScript).catch(() => null);
}
}

const stream = conn.stream(queryString);
// Split the incoming SQL into statements and treat all but the last as prefix
// statements which should be executed before metadata queries or the main
// streaming query. This allows SET/USE/CREATE statements to affect the
// session.
const statements = splitSQLStatement(queryString);
const prefixStatements = statements.slice(0, -1);
const mainStatement = statements.length > 0 ? statements[statements.length - 1] : '';

const count_query = `WITH root as (${cleanQuery(queryString)}) SELECT COUNT(*) FROM root`;
const expected_count = await db.all(count_query).catch(() => null);
const expected_row_count = expected_count?.[0]['count_star()'];
const count_query = mainStatement
? `WITH root as (${cleanQuery(mainStatement)}) SELECT COUNT(*) FROM root`
: null;
let expected_row_count = null;
try {
// Execute prefix statements first so they apply to the session
for (const ps of prefixStatements) {
try {
await conn.run(ps).catch(() => null);
} catch (err) {
// ignore errors from prefix statements to mimic existing behavior
}
}

const column_query = `DESCRIBE ${cleanQuery(queryString)}`;
const column_types = await db
.all(column_query)
.then(duckdbDescribeToEvidenceType)
.catch(() => null);
if (count_query) {
const countReader = await conn.runAndReadAll(count_query);
await countReader.readAll();
const countRow = countReader.getRowObjectsJS()?.[0];
if (countRow) {
// take the first value from the row (column name may vary)
expected_row_count = Object.values(countRow)[0];
}
}
} catch (e) {
expected_row_count = null;
}

const results = await asyncIterableToBatchedAsyncGenerator(stream, batchSize, {
mapResultsToEvidenceColumnTypes:
column_types == null ? mapResultsToEvidenceColumnTypes : undefined,
const column_query = mainStatement ? `DESCRIBE ${cleanQuery(mainStatement)}` : null;
let column_types = null;
try {
if (column_query) {
const colReader = await conn.runAndReadAll(column_query);
await colReader.readAll();
const describeRows = colReader.getRowObjectsJS();
column_types = duckdbDescribeToEvidenceType(describeRows);
}
} catch (e) {
column_types = null;
}

// Create an async generator that yields batches using the DuckDBResultReader
// Stream only the main statement (last statement). If there is no main
// statement, create an empty generator.
let reader = null;
if (mainStatement) {
reader = await conn.streamAndRead(mainStatement);
}

const rowsAsyncIterable = (async function* () {
try {
let prev = 0;
if (reader) {
// keep reading until done
while (!reader.done) {
const target = prev + batchSize;
await reader.readUntil(target);
const allRows = reader.getRowObjectsJS();
const newRows = allRows.slice(prev).map(standardizeRow);
// yield rows one-by-one (asyncIterableToBatchedAsyncGenerator expects single-row yields)
for (const r of newRows) {
yield r;
}
prev = reader.currentRowCount;
}
}
} finally {
// disconnect the connection and close the instance synchronously
try {
conn.disconnectSync();
} catch (err) {}
try {
db.closeSync();
} catch (err) {}
}
})();

const results = await asyncIterableToBatchedAsyncGenerator(rowsAsyncIterable, batchSize, {
mapResultsToEvidenceColumnTypes: column_types == null ? mapResultsToEvidenceColumnTypes : undefined,
standardizeRow,
closeConnection: () => db.close()
closeConnection: () => {
try {
conn.disconnectSync();
} catch (err) {}
try {
db.closeSync();
} catch (err) {}
}
});
if (column_types != null) {
results.columnTypes = column_types;
Expand Down
2 changes: 1 addition & 1 deletion packages/datasources/duckdb/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
},
"dependencies": {
"@evidence-dev/db-commons": "workspace:*",
"duckdb-async": "1.1.3"
"@duckdb/node-api": "^1.4.1-r.5"
},
"devDependencies": {
"dotenv": "^16.0.1"
Expand Down
159 changes: 159 additions & 0 deletions packages/datasources/duckdb/test/test.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,27 @@
import { test } from 'uvu';
import * as assert from 'uvu/assert';
import path from 'path';
import { fileURLToPath } from 'url';
import runQuery from '../index.cjs';
import { batchedAsyncGeneratorToArray, TypeFidelity } from '@evidence-dev/db-commons';
import fs from 'fs/promises';
import 'dotenv/config';

test('basic select from needful_things.duckdb', async () => {
	// Resolve the test database relative to this file (repository root).
	const __filename = fileURLToPath(import.meta.url);
	const __dirname = path.dirname(__filename);
	const dbPath = path.join(__dirname, '..', '..', '..', '..', 'needful_things.duckdb');
	// sqlite_master in this fixture database contains exactly 7 entries.
	const query = 'SELECT name FROM sqlite_master';
	const { rows: rowGen, expectedRowCount } = await runQuery(query, { filename: dbPath });
	const rows = await batchedAsyncGeneratorToArray(rowGen);
	assert.instance(rows, Array);
	assert.type(rows[0], 'object');
	// Expect exactly 7 rows, and the pre-computed count must agree.
	assert.equal(rows.length, 7);
	assert.equal(expectedRowCount, 7);
});

// Types to test
// BOOLEAN
// TINYINT
Expand Down Expand Up @@ -181,6 +199,20 @@ test('query runs', async () => {
}
});

test('expectedRowCount present for UNION with NULL-first-row', async () => {
	// Regression check: a NULL in the very first row must not break row counting.
	const query = `select NULL as a UNION ALL select 1 as a`;
	const { rows, expectedRowCount } = await runQuery(query, undefined, 2);
	// Drain the stream so the query executes end-to-end.
	const collected = [];
	for await (const batch of rows()) {
		collected.push(...batch);
	}
	assert.equal(collected.length, 2);
	assert.type(expectedRowCount, 'number');
	assert.equal(expectedRowCount, 2);
});

test('query batches results properly', async () => {
try {
const { rows, expectedRowCount } = await runQuery(
Expand All @@ -203,4 +235,131 @@ test('query batches results properly', async () => {
}
});

test('handles leading SET statements before main query', async () => {
	// SET statements ahead of the main SELECT are executed as session prefix
	// statements; batching and row counting must still work on the final query.
	// No try/catch: uvu fails the test on any throw, and re-wrapping with
	// `throw Error(e)` would stringify the error and lose the stack trace.
	const query = `
	SET VARIABLE VAR1 = DATE '2023-10-04';
	SET VARIABLE VAR2 = '23';
	select 1 union all select 2 union all select 3;
	`;
	const { rows, expectedRowCount } = await runQuery(query, undefined, 2);
	const batches = [];
	for await (const batch of rows()) {
		batches.push(batch);
	}
	// 3 rows at batchSize 2 => batches of lengths [2, 1]
	assert.equal(batches[0].length, 2);
	assert.equal(batches[1].length, 1);
	assert.equal(expectedRowCount, 3);
});

test('semicolon inside single-quoted string should not split statements', async () => {
	// A ';' embedded in a string literal is data, not a statement separator;
	// the main SELECT must survive statement splitting intact.
	// No try/catch: rethrowing via `throw Error(e)` would stringify the error
	// and destroy the original stack trace; letting it propagate is correct.
	const query = "SET V='this;is;a;string'; select 1 union all select 2;";
	const { rows, expectedRowCount } = await runQuery(query, undefined, 2);
	const batches = [];
	for await (const batch of rows()) {
		batches.push(batch);
	}
	assert.equal(expectedRowCount, 2);
	assert.equal(batches[0].length, 2);
});

test('semicolon inside block comment should not split statements', async () => {
	// Semicolons inside /* ... */ comments must not be treated as statement
	// boundaries by the SQL splitter.
	// No try/catch: `throw Error(e)` would stringify the error and lose the
	// stack; uvu already fails the test on an uncaught throw.
	const query = '/* comment; still comment; */ select 1 union all select 2;';
	const { rows, expectedRowCount } = await runQuery(query, undefined, 2);
	const batches = [];
	for await (const batch of rows()) {
		batches.push(batch);
	}
	assert.equal(expectedRowCount, 2);
	assert.equal(batches[0].length, 2);
});

// NOTE(review): this test duplicates an identical earlier test of the same
// name — consider removing one copy.
test('handles leading SET statements before main query', async () => {
	// SET statements ahead of the main SELECT apply to the session; the final
	// SELECT must still batch and count correctly.
	// No try/catch: `throw Error(e)` stringifies the error and loses the stack.
	const query = `
	SET VARIABLE VAR1 = DATE '2023-10-04';
	SET VARIABLE VAR2 = '23';
	select 1 union all select 2 union all select 3;
	`;
	const { rows, expectedRowCount } = await runQuery(query, undefined, 2);
	const batches = [];
	for await (const batch of rows()) {
		batches.push(batch);
	}
	// 3 rows at batchSize 2 => batches of lengths [2, 1]
	assert.equal(batches[0].length, 2);
	assert.equal(batches[1].length, 1);
	assert.equal(expectedRowCount, 3);
});

// NOTE(review): this test duplicates an identical earlier test of the same
// name — consider removing one copy.
test('semicolon inside single-quoted string should not split statements', async () => {
	// A ';' inside a string literal must not split the statement.
	// No try/catch: `throw Error(e)` stringifies the error and loses the stack.
	const query = "SET V='this;is;a;string'; select 1 union all select 2;";
	const { rows, expectedRowCount } = await runQuery(query, undefined, 2);
	const batches = [];
	for await (const batch of rows()) {
		batches.push(batch);
	}
	assert.equal(expectedRowCount, 2);
	assert.equal(batches[0].length, 2);
});

// NOTE(review): this test duplicates an identical earlier test of the same
// name — consider removing one copy.
test('semicolon inside block comment should not split statements', async () => {
	// Semicolons inside /* ... */ comments must not split statements.
	// No try/catch: `throw Error(e)` stringifies the error and loses the stack.
	const query = '/* comment; still comment; */ select 1 union all select 2;';
	const { rows, expectedRowCount } = await runQuery(query, undefined, 2);
	const batches = [];
	for await (const batch of rows()) {
		batches.push(batch);
	}
	assert.equal(expectedRowCount, 2);
	assert.equal(batches[0].length, 2);
});

test('USE statement before query should apply to metadata queries', async () => {
	// Create a schema and table in prefix statements, then USE the schema so
	// the final SELECT (and the count/describe metadata queries that run in the
	// same session) resolve `t` against schema `s`. An in-memory database is
	// used so the test can freely CREATE objects without touching disk.
	// No try/catch: `throw Error(e)` would stringify the error and lose the
	// original stack trace; uvu already fails the test on an uncaught throw.
	const query = `
	CREATE SCHEMA s;
	CREATE TABLE s.t AS SELECT 1 AS x UNION ALL SELECT 2 AS x;
	USE s;
	SELECT x FROM t;
	`;
	const { rows, expectedRowCount } = await runQuery(query, { filename: ':memory:' }, 2);
	const batches = [];
	for await (const batch of rows()) {
		batches.push(batch);
	}
	assert.equal(expectedRowCount, 2);
	assert.equal(batches[0].length, 2);
});

// Execute all registered uvu tests.
test.run();
Loading
Loading