17 changes: 17 additions & 0 deletions .changeset/witty-bobcats-care.md
@@ -0,0 +1,17 @@
+---
+'@evidence-dev/duckdb': major
+'@evidence-dev/universal-sql': major
+'@evidence-dev/db-commons': minor
+'@evidence-dev/core-components': minor
+---
+
+Update DuckDB to the latest packages:
+
+- Switch to @duckdb/node-api from duckdb-async
+- Update duckdb-wasm to the latest release
+
+This release also has small data fixes across several packages:
+
+- Better handling of NULL values when discovering column types
+- Fix batch processing of parquet files
+- Fix an error with temporary parquet files when reloading data in the dev environment
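
For context, here is a minimal sketch of the migration from a caller's point of view. It uses only the @duckdb/node-api surface that appears in this diff (DuckDBInstance.create, connect, runAndReadAll, getRowObjectsJS, disconnectSync, closeSync); the duckdb-async lines are shown as comments for contrast, and none of this block is part of the diff itself.

```js
const { DuckDBInstance } = require('@duckdb/node-api');

async function main() {
	// Before (duckdb-async): one Database object owned both the file handle
	// and query execution.
	//   const { Database } = require('duckdb-async');
	//   const db = await Database.create(':memory:');
	//   const rows = await db.all('SELECT 42 AS answer');

	// After (@duckdb/node-api): instances and connections are separate objects.
	const db = await DuckDBInstance.create(':memory:');
	const conn = await db.connect();
	const reader = await conn.runAndReadAll('SELECT 42 AS answer');
	console.log(reader.getRowObjectsJS()); // [ { answer: 42 } ]
	conn.disconnectSync();
	db.closeSync();
}

main();
```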
127 changes: 111 additions & 16 deletions packages/datasources/duckdb/index.cjs
@@ -3,9 +3,10 @@ const {
 	TypeFidelity,
 	asyncIterableToBatchedAsyncGenerator,
 	cleanQuery,
-	exhaustStream
+	exhaustStream,
+	splitSQLStatement
 } = require('@evidence-dev/db-commons');
-const { Database } = require('duckdb-async');
+const { DuckDBInstance } = require('@duckdb/node-api');
 const path = require('path');
 const fs = require('fs/promises');
 
@@ -29,6 +30,11 @@ function standardizeRow(obj) {
  * @returns {EvidenceType | undefined}
  */
 function nativeTypeToEvidenceType(data) {
+	// Handle null/undefined explicitly: let callers decide (they will mark the column as INFERRED)
+	if (data === null || data === undefined) return undefined;
+	// Some runtimes may yield bigint rather than number; treat it as a
+	// number type here.
+	if (typeof data === 'bigint') return EvidenceType.NUMBER;
 	switch (typeof data) {
 		case 'number':
 			return EvidenceType.NUMBER;
@@ -40,7 +46,9 @@
 			if (data instanceof Date) {
 				return EvidenceType.DATE;
 			}
-			throw new Error(`Unsupported object type: ${data}`);
+			// For other object types (arrays, plain objects, buffers, etc.) return
+			// undefined so the caller will mark the column as inferred (STRING).
+			return undefined;
 		default:
 			return EvidenceType.STRING;
 	}
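
Taken together, the two hunks above switch type discovery from throw-on-unknown to fall-back-on-unknown. A few illustrative calls (hypothetical; nativeTypeToEvidenceType is module-private and these lines are not in the diff):

```js
nativeTypeToEvidenceType(42);         // EvidenceType.NUMBER
nativeTypeToEvidenceType(42n);        // EvidenceType.NUMBER (bigint is now numeric)
nativeTypeToEvidenceType(new Date()); // EvidenceType.DATE
nativeTypeToEvidenceType(null);       // undefined -> caller marks the column INFERRED
nativeTypeToEvidenceType([1, 2]);     // undefined (previously threw "Unsupported object type")
```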
@@ -51,6 +59,9 @@ function nativeTypeToEvidenceType(data) {
  * @returns {import('@evidence-dev/db-commons').ColumnDefinition[]}
  */
 const mapResultsToEvidenceColumnTypes = function (rows) {
+	// If there are no rows, return an empty column list. Caller may provide DESCRIBE-based types.
+	if (!rows || rows.length === 0) return [];
+
 	return Object.entries(rows[0]).map(([name, value]) => {
 		/** @type {TypeFidelity} */
 		let typeFidelity = TypeFidelity.PRECISE;
@@ -125,11 +136,15 @@ const runQuery = async (queryString, database, batchSize = 100000) => {
 		} else if (database.directory) {
 			// Local database stored in source directory
 			filename = path.join(database.directory, database.filename);
+		} else {
+			// filename provided without a directory; use as given (may be absolute or relative)
+			filename = database.filename;
 		}
 	}
 
 	const mode = filename !== ':memory:' ? 'READ_ONLY' : 'READ_WRITE';
-	const db = await Database.create(filename, {
+	// Create a DuckDB instance and connection using the new node-api
+	const db = await DuckDBInstance.create(filename, {
 		access_mode: mode,
 		custom_user_agent: 'evidence-dev'
 	});
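
Note that file-backed sources are still opened read-only; only ':memory:' databases get READ_WRITE. A hypothetical write against a file-backed source should therefore fail, assuming access_mode behaves like DuckDB's usual read-only mode (the file name below is made up):

```js
const { DuckDBInstance } = require('@duckdb/node-api');

(async () => {
	// 'my-source.duckdb' is a placeholder for a file-backed source database.
	const db = await DuckDBInstance.create('my-source.duckdb', { access_mode: 'READ_ONLY' });
	const conn = await db.connect();
	await conn.run('SELECT 1'); // reads work
	await conn.run('CREATE TABLE t (i INTEGER)'); // rejected: database is read-only
})();
```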
@@ -141,27 +156,107 @@ const runQuery = async (queryString, database, batchSize = 100000) => {
 			const initScript = await fs.readFile(path.resolve(database.directory, 'initialize.sql'), {
 				encoding: 'utf-8'
 			});
-			await conn.exec(initScript);
+			// run the initialization script
+			await conn.run(initScript).catch(() => null);
 		}
 	}
 
-	const stream = conn.stream(queryString);
+	// Split the incoming SQL into statements and treat all but the last as prefix
+	// statements which should be executed before metadata queries or the main
+	// streaming query. This allows SET/USE/CREATE statements to affect the
+	// session.
+	const statements = splitSQLStatement(queryString);
+	const prefixStatements = statements.slice(0, -1);
+	const mainStatement = statements.length > 0 ? statements[statements.length - 1] : '';
 
-	const count_query = `WITH root as (${cleanQuery(queryString)}) SELECT COUNT(*) FROM root`;
-	const expected_count = await db.all(count_query).catch(() => null);
-	const expected_row_count = expected_count?.[0]['count_star()'];
+	const count_query = mainStatement
+		? `WITH root as (${cleanQuery(mainStatement)}) SELECT COUNT(*) FROM root`
+		: null;
+	let expected_row_count = null;
+	try {
+		// Execute prefix statements first so they apply to the session
+		for (const ps of prefixStatements) {
+			try {
+				await conn.run(ps).catch(() => null);
+			} catch (err) {
+				// ignore errors from prefix statements to mimic existing behavior
+			}
+		}
 
-	const column_query = `DESCRIBE ${cleanQuery(queryString)}`;
-	const column_types = await db
-		.all(column_query)
-		.then(duckdbDescribeToEvidenceType)
-		.catch(() => null);
+		if (count_query) {
+			const countReader = await conn.runAndReadAll(count_query);
+			await countReader.readAll();
+			const countRow = countReader.getRowObjectsJS()?.[0];
+			if (countRow) {
+				// take the first value from the row (column name may vary)
+				expected_row_count = Object.values(countRow)[0];
+			}
+		}
+	} catch (e) {
+		expected_row_count = null;
+	}
 
-	const results = await asyncIterableToBatchedAsyncGenerator(stream, batchSize, {
+	const column_query = mainStatement ? `DESCRIBE ${cleanQuery(mainStatement)}` : null;
+	let column_types = null;
+	try {
+		if (column_query) {
+			const colReader = await conn.runAndReadAll(column_query);
+			await colReader.readAll();
+			const describeRows = colReader.getRowObjectsJS();
+			column_types = duckdbDescribeToEvidenceType(describeRows);
+		}
+	} catch (e) {
+		column_types = null;
+	}
+
+	// Create an async generator that yields rows using the DuckDBResultReader.
+	// Stream only the main statement (the last one). If there is no main
+	// statement, create an empty generator.
+	let reader = null;
+	if (mainStatement) {
+		reader = await conn.streamAndRead(mainStatement);
+	}
+
+	const rowsAsyncIterable = (async function* () {
+		try {
+			let prev = 0;
+			if (reader) {
+				// keep reading until done
+				while (!reader.done) {
+					const target = prev + batchSize;
+					await reader.readUntil(target);
+					const allRows = reader.getRowObjectsJS();
+					const newRows = allRows.slice(prev).map(standardizeRow);
+					// yield rows one-by-one (asyncIterableToBatchedAsyncGenerator expects single-row yields)
+					for (const r of newRows) {
+						yield r;
+					}
+					prev = reader.currentRowCount;
+				}
+			}
+		} finally {
+			// disconnect the connection and close the instance synchronously
+			try {
+				conn.disconnectSync();
+			} catch (err) {}
+			try {
+				db.closeSync();
+			} catch (err) {}
+		}
+	})();
+
+	const results = await asyncIterableToBatchedAsyncGenerator(rowsAsyncIterable, batchSize, {
 		mapResultsToEvidenceColumnTypes:
 			column_types == null ? mapResultsToEvidenceColumnTypes : undefined,
 		standardizeRow,
-		closeConnection: () => db.close()
+		closeConnection: () => {
+			try {
+				conn.disconnectSync();
+			} catch (err) {}
+			try {
+				db.closeSync();
+			} catch (err) {}
+		}
 	});
 	if (column_types != null) {
 		results.columnTypes = column_types;
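
The streaming path is the densest part of this diff, so here is a condensed, self-contained sketch of the same readUntil loop. The reader API (streamAndRead, readUntil, done, currentRowCount, getRowObjectsJS) is taken from the diff above; everything else (the streamRows name, the range() query) is illustrative only.

```js
const { DuckDBInstance } = require('@duckdb/node-api');

// Condensed sketch of the batched streaming loop introduced above.
async function* streamRows(sql, batchSize = 100000) {
	const db = await DuckDBInstance.create(':memory:');
	const conn = await db.connect();
	try {
		const reader = await conn.streamAndRead(sql);
		let prev = 0;
		while (!reader.done) {
			// Materialize at least `prev + batchSize` rows, then yield only
			// the rows that are new since the previous pass.
			await reader.readUntil(prev + batchSize);
			for (const row of reader.getRowObjectsJS().slice(prev)) {
				yield row;
			}
			prev = reader.currentRowCount;
		}
	} finally {
		conn.disconnectSync();
		db.closeSync();
	}
}

// Usage: rows arrive incrementally instead of being buffered all at once.
(async () => {
	for await (const row of streamRows('SELECT * FROM range(250000)', 100000)) {
		// process row
	}
})();
```

Yielding row-by-row keeps asyncIterableToBatchedAsyncGenerator in charge of batch boundaries, and the finally block releases the connection even if the consumer stops iterating early.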
2 changes: 1 addition & 1 deletion packages/datasources/duckdb/package.json
@@ -10,7 +10,7 @@
 	},
 	"dependencies": {
 		"@evidence-dev/db-commons": "workspace:*",
-		"duckdb-async": "1.1.3"
+		"@duckdb/node-api": "^1.4.2-r.1"
 	},
 	"devDependencies": {
 		"dotenv": "^16.0.1"