ydb-platform/ydb-materializer

YDB materialized view processor

The YDB Materializer is a Java application that ensures data population for user-managed materialized views in YDB.

Each "materialized view" (MV) is technically a regular YDB table, which is updated by this application. The source information for populating the MV is retrieved from a set of other tables, linked together through SQL-style JOINs. To support online synchronization of changes from the source tables into the MV, YDB Change Data Capture (CDC) streams are used.

The destination tables for MVs, source tables, required indexes and CDC streams should be created prior to using the application in MV synchronization mode. The application may help to generate DDL parts for some objects — for example, it reports missing indexes and can generate the proposed structure of destination tables.

See the Releases page for downloads.

Minimal runnable example

Use these files for a single-table MV example:

  • scripts/example-tables.sql (create tables and changefeed)
  • scripts/example-job1.sql (MV + handler definition)
  • scripts/example-job1.xml (application config)

Requirements and building

  • Java 17 or higher
  • YDB cluster 24.4+ with appropriate permissions
  • Network access to YDB cluster
  • Required system tables created in the database
  • Maven for building from source code

Building:

export JAVA_HOME=/Library/Java/JavaVirtualMachines/openjdk-17.jdk/Contents/Home
mvn clean package -DskipTests=true

Usage

YDB Materializer can be embedded as a library in a user application, or used as a standalone application.

The description of materialized views and their processing jobs must be prepared using a special SQL-like language. The corresponding descriptions can be provided as a text file or as a database table. Connection settings and various technical parameters are provided as a set of properties (programmatically or as a Java Properties configuration file).

In standalone application mode, YDB Materializer implements:

  • validation of materialized view and job definitions, including their compliance with the structure of source database tables, and output of corresponding error and warning messages for user analysis;
  • generation and output of various SQL statements used by YDB Materializer, for further user analysis;
  • service mode, in which it synchronizes the changes from source tables into the materialized view tables.

In embedded library mode, YDB Materializer implements all the listed functions, providing the ability to call them programmatically through methods of the corresponding classes.

Maven dependency for embedding the YDB Materializer into the application:

        <dependency>
            <groupId>tech.ydb.apps</groupId>
            <artifactId>ydb-materializer</artifactId>
            <version>1.15</version>
        </dependency>

Materialized View Language Syntax

The YDB Materializer uses a custom SQL-like language to define materialized views and their processing handlers. This language is based on a subset of SQL with specific extensions for YDB materialized views.

Language Overview

The language supports two main statement types:

  1. Materialized View Definition - Defines the structure and logic of a materialized view
  2. Handler Definition - Defines how to process change streams to update the materialized view

Materialized View Definition

Basic materialized view:

CREATE ASYNC MATERIALIZED VIEW <view_name>
  [DESTINATION <connection_name>]
AS
  SELECT <column_definitions>
  FROM <main_table> AS <alias>
  [<join_clauses>]
  [WHERE <filter_expression>];
  • The SELECT clause defines the source tables for the MV, as well as the relations between them, in the form of a limited SQL query.
  • The optional DESTINATION clause allows the MV to be placed in a separate database (see the related section below).

Composite materialized view:

CREATE ASYNC MATERIALIZED VIEW <view_name>
  [DESTINATION <connection_name>]
AS
  (SELECT <column_definitions>
   FROM <main_table1> AS <alias>
   [<join_clauses>]
   [WHERE <filter_expression>]) AS <select_alias1>
UNION ALL
  (SELECT <column_definitions>
   FROM <main_table2> AS <alias>
   [<join_clauses>]
   [WHERE <filter_expression>]) AS <select_alias2>
UNION ALL
   ...
   ;

A composite materialized view definition consists of two or more SELECT subqueries, each with the same syntax as a basic materialized view query, combined using the UNION ALL operator. Each subquery must also contain an alias that is unique within the composite materialized view and is used to identify the subquery during processing.
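As an illustrative sketch (the table and column names below are hypothetical), a two-branch composite view might look like this:

```sql
-- Hypothetical composite MV combining rows from two source tables
CREATE ASYNC MATERIALIZED VIEW `demo/mv_union` AS
  (SELECT a.id AS id,
          a.name AS name
   FROM `demo/table_a` AS a) AS part_a
UNION ALL
  (SELECT b.id AS id,
          b.title AS name
   FROM `demo/table_b` AS b) AS part_b;
```

The aliases part_a and part_b identify each branch during processing and must be unique within the view.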

Destination databases

By default, materialized view tables are created and updated in the same YDB database from which the input data is read, and which is configured through the ydb.url connection setting.

Optionally, a materialized view can be associated with a separate destination database by using the DESTINATION clause:

CREATE ASYNC MATERIALIZED VIEW <view_name>
  DESTINATION <connection_name>
AS
  SELECT ...
  • <connection_name> is a logical name of an additional YDB connection.
  • Source tables for the view are still read from the main database.
  • All writes (UPSERT / DELETE) for the materialized view are executed against the destination database.

To use a non-default destination, define a separate connection in the configuration by prefixing standard YDB settings with <connection_name>., for example:

<!-- Main database (source tables) -->
<entry key="ydb.url">grpcs://ydb01.localdomain:2135/cluster1/testdb</entry>

<!-- Separate destination database for MV `pk_test/mv1` -->
<entry key="altdest1.ydb.url">grpcs://ydb01.localdomain:2135/cluster1/testdb_mv</entry>
<entry key="altdest1.ydb.cafile">/path/to/ca.crt</entry>
<entry key="altdest1.ydb.auth.mode">STATIC</entry>
<entry key="altdest1.ydb.auth.username">root</entry>
<entry key="altdest1.ydb.auth.password">your_password</entry>

In the materialized view definition, reference this connection name:

CREATE ASYNC MATERIALIZED VIEW `pk_test/mv1`
  DESTINATION `altdest1`
AS
  SELECT ...

If the DESTINATION clause is omitted, the materialized view is written to the main database connection.

Column Definitions

Each column in the SELECT clause can be:

  • Direct column reference: table_alias.column_name AS column_alias
  • Computed expression: #[<yql_expression>]# AS column_alias
  • Computed expression with column references: COMPUTE ON table_alias.column_name, ... #[<yql_expression>]# AS column_alias

Join Clauses

[INNER | LEFT] JOIN <table_name> AS <alias>
  ON <join_condition> [AND <join_condition>]*

Join conditions support:

  • Column equality: table1.col1 = table2.col2
  • Constant equality: table1.col1 = 'value' or table1.col1 = 123

Filter Expressions

The WHERE clause supports opaque (to the application) YQL expressions that are substituted unchanged directly into the generated queries:

WHERE COMPUTE ON table_alias.column_name, ... #[<yql_expression>]#

Referencing specific table and column names via COMPUTE ON allows the application to correctly generate derived SQL statements from opaque expressions that rely on specific columns of the source tables.

Handler Definition

CREATE ASYNC HANDLER <handler_name>
  [CONSUMER <consumer_name>]
  PROCESS <materialized_view_name>,
  [PROCESS <materialized_view_name>,]
  INPUT <table_name> CHANGEFEED <changefeed_name> AS [STREAM|BATCH],
  [INPUT <table_name> CHANGEFEED <changefeed_name> AS [STREAM|BATCH], ...];

Handler Components

  • PROCESS: Specifies which materialized views this handler updates
  • INPUT: Defines input tables and their changefeed streams
    • STREAM: Real-time processing of individual changes
    • BATCH: Batch processing of accumulated changes
  • CONSUMER: Optional consumer name for the changefeed

Opaque Expressions

The language supports opaque expressions wrapped in #[ and ]# delimiters. These contain YQL (Yandex Query Language) code that is passed through to the database without parsing:

-- In SELECT clause
SELECT #[Substring(main.c20, 3, 5)]# AS c11,
       #[CAST(NULL AS Int32?)]# AS c12

-- In WHERE clause
WHERE #[main.c6=7 AND (sub2.c7 IS NULL OR sub2.c7='val2'u)]#

-- With COMPUTE ON clause
COMPUTE ON main, sub2 #[main.c6=7 AND (sub2.c7 IS NULL OR sub2.c7='val2'u)]#

Complete Example

-- Define a materialized view
CREATE ASYNC MATERIALIZED VIEW `test1/mv1` AS
  SELECT main.id AS id,
         main.c1 AS c1,
         main.c2 AS c2,
         main.c3 AS c3,
         sub1.c8 AS c8,
         sub2.c9 AS c9,
         sub3.c10 AS c10,
         #[Substring(main.c20, 3, 5)]# AS c11,
         #[CAST(NULL AS Int32?)]# AS c12
  FROM `test1/main_table` AS main
  INNER JOIN `test1/sub_table1` AS sub1
    ON main.c1 = sub1.c1 AND main.c2 = sub1.c2
  LEFT JOIN `test1/sub_table2` AS sub2
    ON main.c3 = sub2.c3 AND 'val1'u = sub2.c4
  INNER JOIN `test1/sub_table3` AS sub3
    ON sub3.c5 = 58
  WHERE #[main.c6=7 AND (sub2.c7 IS NULL OR sub2.c7='val2'u)]#;

-- Define a handler to process changes
CREATE ASYNC HANDLER h1 CONSUMER h1_consumer
  PROCESS `test1/mv1`,
  INPUT `test1/main_table` CHANGEFEED cf1 AS STREAM,
  INPUT `test1/sub_table1` CHANGEFEED cf2 AS STREAM,
  INPUT `test1/sub_table2` CHANGEFEED cf3 AS STREAM,
  INPUT `test1/sub_table3` CHANGEFEED cf4 AS BATCH;

Language Features

  • Case-insensitive keywords: All SQL keywords are case-insensitive
  • Quoted identifiers: Use backticks for identifiers with special characters: `table/name`
  • String literals: Single-quoted strings with optional type suffixes ('value'u for Utf8)
  • Comments: Both -- line comments and /* */ block comments
  • Semicolon termination: Statements must be terminated with semicolons

Supported Data Types

The language works with standard YDB data types:

  • Text: String data (use 'value'u for Utf8 strings)
  • Numeric: Int32, Int64, Decimal, etc.
  • Temporal: Timestamp, Date, etc.
  • Complex: JsonDocument, etc.

Command Line Syntax

java -jar ydb-materializer-*.jar <config.xml> <MODE>

Parameters:

  • <config.xml> - Path to the XML configuration file
  • <MODE> - Operation mode, as listed below

The application supports the following operational modes:

  • CHECK: configuration validation;
  • SQL: generating SQL statements representing the materialization logic;
  • SQL_DEBUG: generating internal SQL statements used by the Materializer;
  • STREAMS: creating missing CDC streams and consumers required by handlers;
  • LOCAL: single-instance execution of actual MV synchronization;
  • JOB: actual MV synchronization under the control of the distributed job manager.
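A typical rollout might chain these modes in sequence (an illustrative sketch; all three commands are described individually below):

```
# Validate the definitions first, then create any missing CDC objects,
# then start a single-node synchronization service.
java -jar ydb-materializer-*.jar config.xml CHECK
java -jar ydb-materializer-*.jar config.xml STREAMS
java -jar ydb-materializer-*.jar config.xml LOCAL
```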

Operation Modes

CHECK Mode

Validates materialized view definitions and reports any issues:

java -jar ydb-materializer-*.jar config.xml CHECK

SQL Mode

Generates and outputs the SQL statements for materialized views:

java -jar ydb-materializer-*.jar config.xml SQL

SQL_DEBUG Mode

Generates and outputs additional internal SQL statements used by the Materializer:

java -jar ydb-materializer-*.jar config.xml SQL_DEBUG

STREAMS Mode

Creates missing CDC streams and consumers in the working database for all configured handler inputs:

java -jar ydb-materializer-*.jar config.xml STREAMS

LOCAL Mode

Starts a local, single-node materialized view processing service:

java -jar ydb-materializer-*.jar config.xml LOCAL

JOB Mode

Starts a distributed materialized view processing service:

java -jar ydb-materializer-*.jar config.xml JOB

Configuration File

The configuration file is an XML properties file that defines connection parameters and job settings. Here's an example configuration:

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE properties SYSTEM "http://java.sun.com/dtd/properties.dtd">
<properties>
<comment>YDB Materializer sample configuration</comment>

<!-- *** Connection parameters *** -->
<entry key="ydb.url">grpcs://ydb01.localdomain:2135/cluster1/testdb</entry>
<entry key="ydb.cafile">/path/to/ca.crt</entry>
<entry key="ydb.poolSize">1000</entry>
<entry key="ydb.preferLocalDc">false</entry>

<!-- Authentication mode: NONE, ENV, STATIC, METADATA, SAKEY -->
<entry key="ydb.auth.mode">STATIC</entry>
<entry key="ydb.auth.username">root</entry>
<entry key="ydb.auth.password">your_password</entry>

<!-- Input mode: FILE or TABLE -->
<entry key="job.input.mode">FILE</entry>
<entry key="job.input.file">example-job1.sql</entry>
<entry key="job.input.table">mv/statements</entry>

<!-- Handler configuration -->
<entry key="job.handlers">h1,h2,h3</entry>
<entry key="job.scan.rate">10000</entry>
<entry key="job.scan.table">mv/scans_state</entry>
<entry key="job.coordination.path">mv/coordination</entry>
<entry key="job.coordination.timeout">10</entry>

<!-- Dictionary scanner configuration -->
<entry key="job.dict.consumer">dictionary</entry>
<entry key="job.dict.hist.table">mv/dict_hist</entry>
<entry key="job.dict.scan.seconds">28800</entry>

<!-- Performance tuning -->
<entry key="job.apply.partitioning">HASH</entry>
<entry key="job.cdc.threads">4</entry>
<entry key="job.apply.threads">4</entry>
<entry key="job.apply.queue">10000</entry>
<entry key="job.batch.select">1000</entry>
<entry key="job.batch.upsert">500</entry>
<entry key="job.max.row.changes">100000</entry>
<entry key="job.query.seconds">30</entry>

<!-- Management settings -->
<entry key="mv.jobs.table">mv/jobs</entry>
<entry key="mv.scans.table">mv/job_scans</entry>
<entry key="mv.runners.table">mv/runners</entry>
<entry key="mv.runner.jobs.table">mv/runner_jobs</entry>
<entry key="mv.commands.table">mv/commands</entry>
<entry key="mv.scan.period.ms">5000</entry>
<entry key="mv.report.period.ms">10000</entry>
<entry key="mv.runner.timeout.ms">30000</entry>
<entry key="mv.coord.startup.ms">90000</entry>
<entry key="mv.coord.runners.count">1</entry>

</properties>

Configuration Parameters Reference

Database Connection

  • ydb.url - YDB connection string (required)
  • ydb.cafile - Path to TLS certificate file (optional)
  • ydb.poolSize - Connection pool size (default: 2 × CPU cores)
  • ydb.preferLocalDc - Prefer local data center (default: false)

Authentication

  • ydb.auth.mode - Authentication mode:
    • NONE - No authentication
    • ENV - Environment variables
    • STATIC - Username/password
    • METADATA - VM metadata
    • SAKEY - Service account key file
  • ydb.auth.username - Username (for STATIC mode)
  • ydb.auth.password - Password (for STATIC mode)
  • ydb.auth.sakey - Path to service account key file (for SAKEY mode)

Job Configuration

  • job.input.mode - Input source: FILE or TABLE
  • job.input.file - Path to SQL file (for FILE mode)
  • job.input.table - Table name for statements (for TABLE mode)
  • job.handlers - Comma-separated list of handler names to activate
  • job.scan.table - Scan position control table name
  • job.dict.hist.table - Dictionary history table name
  • job.coordination.path - Coordination service node path
  • job.coordination.timeout - Lock timeout for job coordination in seconds

Dictionary scanner configuration

  • job.dict.consumer - Consumer name used for dictionary table changefeeds
  • job.dict.hist.table - Alternative name for the mv/dict_hist table
  • job.dict.scan.seconds - Period between dictionary change checks, in seconds

Performance Tuning

  • job.apply.partitioning - HASH (default) or RANGE partitioning of apply tasks
  • job.cdc.threads - Number of CDC reader threads
  • job.apply.threads - Number of apply worker threads
  • job.apply.queue - Max elements in apply queue per thread
  • job.batch.select - Batch size for SELECT operations
  • job.batch.upsert - Batch size for UPSERT or DELETE operations
  • job.max.row.changes - Maximum number of changes per individual table processed in one iteration
  • job.query.seconds - Maximum query execution time for SELECT, UPSERT or DELETE operations, in seconds
  • job.scan.rate - Speed limit for scan operations, in rows per second

Management Settings

  • mv.jobs.table - Custom mv/jobs table name
  • mv.scans.table - Custom mv/job_scans table name
  • mv.runners.table - Custom mv/runners table name
  • mv.runner.jobs.table - Custom mv/runner_jobs table name
  • mv.commands.table - Custom mv/commands table name
  • mv.scan.period.ms - Runner and Coordinator re-scan period, in milliseconds
  • mv.report.period.ms - Runner status report period, in milliseconds
  • mv.runner.timeout.ms - Timeout after which a Runner or Coordinator is considered missing, in milliseconds
  • mv.coord.startup.ms - Delay between Coordinator startup and job distribution activation, in milliseconds
  • mv.coord.runners.count - Minimum number of Runners required for job distribution

Metrics

  • metrics.enabled - Enable Prometheus metrics endpoint (default: false)
  • metrics.port - Port for Prometheus metrics endpoint (default: 7311)
  • metrics.host - Host/interface to bind metrics endpoint (default: 0.0.0.0)

For a ready-to-use Prometheus + Grafana stack, see monitoring/README.md.
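For reference, a minimal Prometheus scrape job for the metrics endpoint might look like the sketch below (assuming the default metrics.port of 7311 and a locally running instance):

```yaml
scrape_configs:
  - job_name: ydb-materializer
    static_configs:
      - targets: ["localhost:7311"]   # metrics.host / metrics.port defaults
```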

Collected metrics

When metrics are enabled (metrics.enabled=true), the application exposes the following Prometheus metrics. All metric names use the ydbmv_ prefix.

Handler (job) metrics

| Metric | Type | Description |
| --- | --- | --- |
| ydbmv_handler_active | Gauge | 1 if the handler (job) is active, 0 otherwise, for each handler |
| ydbmv_handler_locked | Gauge | 1 if the active handler cannot progress due to some processing issue, 0 otherwise |
| ydbmv_handler_threads | Gauge | Number of worker threads for the handler |
| ydbmv_handler_queue_size | Gauge | Current size of the input queue for the handler |
| ydbmv_handler_queue_limit | Gauge | Maximum allowed size of the input queue |
| ydbmv_handler_queue_wait_millis | Histogram | Time spent waiting on a full queue during message submission |

Labels:

| Label | Description |
| --- | --- |
| handler | Handler name |

CDC metrics

| Metric | Type | Description |
| --- | --- | --- |
| ydbmv_cdc_records_read | Counter | Number of CDC records read from topics |
| ydbmv_cdc_records_submitted | Counter | Number of parsed CDC records submitted for processing |
| ydbmv_cdc_parse_errors | Counter | Number of CDC message parsing errors |
| ydbmv_cdc_parse_seconds | Histogram | Time spent parsing CDC messages |
| ydbmv_cdc_submit_seconds | Histogram | Time spent submitting CDC messages to the queue, including the time waiting for space in the queue to become available |

Labels:

| Label | Description |
| --- | --- |
| handler | Handler name |
| consumer | Name of the CDC consumer |
| topic | Full CDC topic path |

Scan metrics

| Metric | Type | Description |
| --- | --- | --- |
| ydbmv_scan_records_submitted | Counter | Records submitted by initial/backfill scan |
| ydbmv_scan_delay_millis | Counter | Total milliseconds of scan delays caused by the rate limiter |

Labels:

| Label | Description |
| --- | --- |
| handler | Handler name |
| target | Name of the MV being processed |
| alias | Name of the MV component for UNION ALL style MVs |

Processing metrics

| Metric | Type | Description |
| --- | --- | --- |
| ydbmv_processing_records | Counter | Records successfully processed per action |
| ydbmv_processing_errors | Counter | Processing errors per action |
| ydbmv_processing_seconds | Histogram | End-to-end processing time per action |
| ydbmv_sql_seconds | Histogram | SQL execution time per action |

Labels:

| Label | Description |
| --- | --- |
| type | Processing stage (e.g. filter, grabKeys, transform, sync) |
| handler | Handler name |
| target | Name of the MV being processed |
| alias | Name of the MV component for UNION ALL style MVs |
| source | Name of the input table for the processing stage |
| action | Action name (select, upsert, delete for SQL times, and all for other metrics) |

JVM metrics

When the default Prometheus server is used, standard JVM metrics (memory, GC, threads, etc.) are also registered automatically.

Distributed Job Management (JOB Mode)

The JOB mode provides distributed job management, allowing materialized view processing tasks to be managed across multiple instances. This mode is invoked using:

java -jar ydb-materializer.jar config.xml JOB

Architecture Overview

The distributed job management system consists of two main components:

  1. MvRunner - Executes jobs locally on each instance
  2. MvCoordinator - Manages job distribution and coordination across runners

Each job is a running instance of the "handler" defined in the configuration.

On job startup, the application re-reads and re-validates the configuration before using it for the particular job.

Control Tables

The system uses several YDB tables to manage distributed operations:

Configuration Tables

mv/jobs - Job definitions and desired state

CREATE TABLE `mv/jobs` (
    job_name Text NOT NULL,           -- Handler name
    job_settings JsonDocument,        -- Handler configuration
    should_run Bool,                  -- Whether job should be running
    PRIMARY KEY(job_name)
);

mv/job_scans - Scan requests for specific targets

CREATE TABLE `mv/job_scans` (
    job_name Text NOT NULL,           -- Handler name
    target_name Text NOT NULL,        -- Target table name
    scan_settings JsonDocument,       -- Scan configuration
    requested_at Timestamp,           -- When scan was requested
    accepted_at Timestamp,            -- When scan was accepted
    runner_id Text,                   -- Assigned runner ID
    command_no Uint64,                -- Command number
    PRIMARY KEY(job_name, target_name)
);

Working Tables

mv/runners - Active runner instances

CREATE TABLE `mv/runners` (
    runner_id Text NOT NULL,          -- Unique runner identifier
    runner_identity Text,             -- Host, PID, start time info
    updated_at Timestamp,             -- Last status update
    PRIMARY KEY(runner_id)
);

mv/runner_jobs - Currently running jobs per runner

CREATE TABLE `mv/runner_jobs` (
    runner_id Text NOT NULL,          -- Runner identifier
    job_name Text NOT NULL,           -- Job name
    job_settings JsonDocument,        -- Job configuration
    started_at Timestamp,             -- When job started
    INDEX ix_job_name GLOBAL SYNC ON (job_name),
    PRIMARY KEY(runner_id, job_name)
);

mv/commands - Command queue for runners

CREATE TABLE `mv/commands` (
    runner_id Text NOT NULL,          -- Target runner
    command_no Uint64 NOT NULL,       -- Command sequence number
    created_at Timestamp,             -- Command creation time
    command_type Text,                -- START/STOP/SCAN/NOSCAN
    job_name Text,                    -- Target job name
    target_name Text,                 -- Target table (for scans)
    job_settings JsonDocument,        -- Job configuration
    command_status Text,              -- CREATED/TAKEN/SUCCESS/ERROR
    command_diag Text,                -- Error diagnostics
    INDEX ix_command_no GLOBAL SYNC ON (command_no),
    INDEX ix_command_status GLOBAL SYNC ON (command_status, runner_id),
    PRIMARY KEY(runner_id, command_no)
);

Job Management Operations

Adding Jobs

To add a new job, insert a record into the mv/jobs table:

INSERT INTO `mv/jobs` (job_name, job_settings, should_run)
VALUES ('my_handler', NULL, true);

The coordinator will automatically detect the new job and assign it to an available runner.

The job_settings value can be omitted (in which case the default parameters, loaded from the global settings, are used) or specified as a JSON document in the following format:

{ # comment indicates the corresponding global setting
    "cdcReaderThreads": 4,                # job.cdc.threads
    "applyThreads": 4,                    # job.apply.threads
    "applyQueueSize": 10000,              # job.apply.queue
    "selectBatchSize": 1000,              # job.batch.select
    "upsertBatchSize": 500,               # job.batch.upsert
    "dictionaryScanSeconds": 28800,       # job.dict.scan.seconds
    "queryTimeoutSeconds": 30             # job.query.seconds
}

The example above shows the defaults for regular jobs. For the special dictionary scanner job, the following settings can be specified:

{
    "upsertBatchSize": 500,               # job.batch.upsert
    "cdcReaderThreads": 4,                # job.cdc.threads
    "rowsPerSecondLimit": 10000,          # job.scan.rate
    "maxChangeRowsScanned": 100000        # job.max.row.changes
}

Removing Jobs

To stop and remove a job:

UPDATE `mv/jobs` SET should_run = false WHERE job_name = 'my_handler';
-- or
DELETE FROM `mv/jobs` WHERE job_name = 'my_handler';

Requesting Scans

To request a scan of a specific target table:

INSERT INTO `mv/job_scans` (job_name, target_name, scan_settings, requested_at)
VALUES ('my_handler', 'target_table', '{"rowsPerSecondLimit": 5000}', CurrentUtcTimestamp());

Monitoring Operations

Check running jobs:

SELECT rj.runner_id, rj.job_name, rj.started_at, r.runner_identity
FROM `mv/runner_jobs` rj
JOIN `mv/runners` r ON rj.runner_id = r.runner_id;

Check job status:

SELECT j.job_name, j.should_run,
       CASE WHEN rj.job_name IS NOT NULL THEN 'RUNNING'u ELSE 'STOPPED'u END as status
FROM `mv/jobs` j
LEFT JOIN `mv/runner_jobs` rj ON j.job_name = rj.job_name;

Check command queue:

SELECT runner_id, command_no, command_type, job_name, command_status, created_at
FROM `mv/commands`
WHERE command_status IN ('CREATED'u, 'TAKEN'u)
ORDER BY created_at;

Check runner status:

SELECT runner_id, runner_identity, updated_at
FROM `mv/runners`
ORDER BY updated_at DESC;

Command Types

The system supports four types of commands:

  • START - Start a job on a runner
  • STOP - Stop a job on a runner
  • SCAN - Start scanning a specific target table
  • NOSCAN - Stop the already running scan for a specific target table

Job Names

Job names for regular jobs refer to handler names. There are two special job names:

  • ydbmv$dictionary - Dictionary scanner (manually managed)
  • ydbmv$coordinator - Coordinator job (automatically managed)

These special names cannot be used for regular handlers (in fact, a handler name cannot start with the prefix "ydbmv").

Fault Tolerance

The system provides automatic fault tolerance:

  1. Runner Failure Detection - Runners report status periodically; inactive runners are automatically cleaned up
  2. Job Rebalancing - When runners fail, their jobs are automatically reassigned to available runners
  3. Command Retry - Failed commands remain in the queue for retry
  4. Leader Election - Only one coordinator instance is active at a time

Deployment

  1. Create Control Tables - Use the provided scripts/example-tables.sql script. Table names can be customized as needed
  2. Deploy Runners - Start multiple instances with JOB mode
  3. Configure Jobs - Insert job definitions into mv/jobs table. Add the scan definitions to the mv/job_scans table
  4. Monitor Operations - Use the monitoring queries listed above

The system will automatically distribute jobs across available runners and maintain the desired state.

Performance tuning

In practice, overall performance is primarily determined by the complexity of the queries that define a materialized view, rather than only by the number of worker threads. Expensive joins, large intermediate result sets, complex WHERE conditions (including opaque #[ ... ]# YQL expressions), and multiple UNION ALL branches directly increase the cost of each SELECT / UPSERT / DELETE statement issued by the YDB Materializer.

Handler job parameters below control how aggressively the YDB Materializer consumes changefeeds and applies updates for both STREAM and BATCH style handlers (including scan‑style batch operations and dictionary jobs):

  • job.cdc.threads / cdcReaderThreads

    • Sets the number of parallel threads reading from CDC changefeeds.
    • STREAM mode: more threads increase the rate at which new events are read from YDB, which can improve end‑to‑end latency but also increases pressure on downstream apply workers and the database.
    • BATCH mode: more threads speed up reading and saving the accumulated changes into the intermediate table, but do not directly affect end-to-end latency (which is mostly determined by the job.dict.scan.seconds setting). Note that for this phase the setting defined for the dictionary handler job is used (or the global setting, if no specific setting for the dictionary handler is defined), not the setting configured for the regular handler involved.
    • Scan processing: this setting does not affect scans, including the BATCH-induced scans inside the dictionary handler.
  • job.apply.threads / applyThreads

    • Sets the number of worker threads that execute generated SELECT / UPSERT / DELETE statements to update the MV tables. The keys being processed are routed to a particular worker based on the key value, so contention on a single key never occurs.
    • STREAM mode: more threads allow processing more change batches in parallel, improving throughput; too many threads may cause resource contention and overload the YDB cluster.
    • BATCH mode: same as for the STREAM mode. Using a significant number of threads combined with a large number of rows affected by the dictionary changes may lead to a large spike in the resource usage when the dictionary scan is activated.
    • Scan processing: same as for the STREAM mode. The total time required for scan execution depends on the amount of work (MV size) and the scan speed; the scan speed is limited both by the job.scan.rate setting and by the performance of the apply process, which in turn depends on the number of threads involved.
  • job.apply.queue / applyQueueSize

    • Maximum number of changes queued for processing (the size of the buffer between CDC readers and apply workers, as well as between the apply workers performing the key fetch operation and those performing the MV refresh). Each handler maintains a single counter of the total number of queued elements and pauses reading further input from the CDC topics when the queues become too large. This effectively limits the memory used for intermediate data within the Materializer instance running the particular handler.
    • STREAM mode: a larger queue smooths short‑term spikes in incoming CDC traffic; if it is too small, CDC readers are throttled more often and end‑to‑end latency increases. If it is too large, the job may accumulate many pending changes in memory, increasing the memory usage and the time needed to drain the backlog.
    • BATCH mode and scans: defines how many prepared batches can wait for execution. Larger queues help keep apply workers busy but also increase memory footprint.
  • job.batch.select / selectBatchSize

    • Limits how many source rows are read in a single SELECT statement when collecting data for a batch of changes.
    • STREAM mode: moderate values help group small reads efficiently while keeping read latency acceptable; increasing this value can reduce SQL overhead per row but may raise latency for individual changes, because larger batches wait longer to be processed.
    • BATCH mode and scans: directly affects the size of read operations during full or partial resynchronization. Larger batches improve throughput (fewer queries, better amortization of network round‑trips) but produce heavier queries for YDB.
  • job.batch.upsert / upsertBatchSize

    • Controls how many rows are written in a single UPSERT or DELETE operation to destination MV tables.
    • STREAM mode: moderate values help group small updates efficiently while keeping write latency acceptable; very small values increase per‑row overhead, very large values may cause long‑running write statements that are more likely to hit timeouts or increase the overall latency of MV updates.
    • BATCH mode and scans: larger batches significantly increase write throughput during bulk refresh operations, but also amplify the impact of each individual statement (locks, resource usage, and potential retries). It is recommended to increase this value gradually and monitor YDB latency, error rates, and resource utilization.
  • job.query.seconds / queryTimeoutSeconds

    • The maximum allowed time for a query to execute. A CLIENT_DEADLINE_EXCEEDED error is generated when the timeout is reached, and the query is restarted. This parameter is a safeguard against hanging queries, which could slow down overall processing if allowed to run for an arbitrary amount of time. It should be large enough for potentially complex queries to complete, while preventing excessive execution time in the worst case, such as a query running on an overloaded YDB node.
  • job.dict.scan.seconds / dictionaryScanSeconds

    • The interval between the checks for dictionary changes potentially affecting the MVs within the particular handler job.
    • STREAM mode and scans: no effect.
    • BATCH mode: a larger interval means less frequent checks, which allows changes to accumulate; accumulated changes can then be processed in a single scan instead of multiple scans over smaller portions. The related setting job.max.row.changes limits the total number of changes processed in a single batch, which helps ensure that a flood of dictionary updates does not exhaust the memory of the current YDB Materializer instance.
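The key-based routing of apply work described for job.apply.threads can be sketched as follows (a minimal illustration, not the Materializer's actual code; the function name and hashing scheme are assumptions):

```python
import hashlib

def worker_for_key(key: bytes, apply_threads: int) -> int:
    """Pick a deterministic worker index for a change key.

    A stable hash (not Python's randomized hash()) guarantees that all
    changes for the same key always land on the same apply worker, so
    two workers never contend on a single key.
    """
    digest = hashlib.sha256(key).digest()
    return int.from_bytes(digest[:8], "big") % apply_threads

# All changes touching one key go to the same worker out of 4.
w = worker_for_key(b"order:42", 4)
assert w == worker_for_key(b"order:42", 4)
assert 0 <= w < 4
```

Because the routing is a pure function of the key, adding apply threads increases parallelism across distinct keys without introducing per-key races.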

When tuning these settings, start from the defaults, observe YDB metrics (latency, throughput, CPU, memory, query timeouts), then adjust one parameter at a time. For most workloads, it is safer to keep batch sizes and thread counts moderate for STREAM handlers (favoring predictable latency), and to use more aggressive values only for planned BATCH or scan operations where higher short‑term load is acceptable.
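The effect of the batch-size settings can be illustrated with a small sketch (a hypothetical helper, not part of the Materializer): pending row changes are split into chunks of at most job.batch.upsert rows, each of which would become one UPSERT statement.

```python
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")

def batches(rows: Iterable[T], batch_size: int) -> Iterator[List[T]]:
    """Group pending row changes into chunks of at most batch_size rows."""
    batch: List[T] = []
    for row in rows:
        batch.append(row)
        if len(batch) == batch_size:
            yield batch
            batch = []
    if batch:
        yield batch  # final partial batch

# 1200 changed rows with job.batch.upsert=500 produce 3 statements.
sizes = [len(b) for b in batches(range(1200), 500)]
assert sizes == [500, 500, 200]
```

Larger chunks mean fewer, heavier statements (better throughput, higher per-statement impact); smaller chunks mean more, lighter statements, which matches the trade-offs described above.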
