Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions fe/fe-common/src/main/java/org/apache/doris/common/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -744,6 +744,28 @@ public class Config extends ConfigBase {
"Txn manager will reject coming txns."})
public static int max_running_txn_num_per_db = 10000;

@ConfField(mutable = true, masterOnly = true, description = {
"是否将事务的 edit log 写入移到写锁之外以减少锁竞争。"
+ "开启后,edit log 条目在写锁内入队(FIFO 保证顺序),"
+ "在写锁外等待持久化完成,从而降低写锁持有时间,提高并发事务吞吐量。"
+ "默认开启。关闭后使用传统的锁内同步写入模式。",
"Whether to move transaction edit log writes outside the write lock to reduce lock contention. "
+ "When enabled, edit log entries are enqueued inside the write lock (FIFO preserves ordering) "
+ "and awaited outside the lock, reducing write lock hold time "
+ "and improving concurrent transaction throughput. "
+ "Default is true. Set to false to use the traditional in-lock synchronous write mode."})
public static boolean enable_txn_log_outside_lock = true;

@ConfField(mutable = true, description = {
"是否启用按事务级别并行发布。开启后,同一数据库内的不同事务可以在不同的执行器线程上并行完成发布,"
+ "而不是按数据库顺序执行。关闭后回退到按数据库路由(旧行为),同一数据库内的事务顺序发布。",
"Whether to enable per-transaction parallel publish. When enabled, different transactions "
+ "in the same database can finish publishing in parallel across executor threads, "
+ "instead of being serialized per database. "
+ "When disabled, falls back to per-database routing (old behavior) "
+ "where transactions within a DB are published sequentially."})
public static boolean enable_per_txn_publish = true;

@ConfField(masterOnly = true, description = {"pending load task 执行线程数。这个配置可以限制当前等待的导入作业数。"
+ "并且应小于 `max_running_txn_num_per_db`。",
"The pending load task executor pool size. "
Expand Down
6 changes: 6 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/DorisFE.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import java.nio.file.StandardOpenOption;
import java.time.LocalDate;
import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

Expand Down Expand Up @@ -593,6 +594,11 @@ private static void fuzzyConfigs() {
LOG.info("fuzzy set random_add_cluster_keys_for_mow={}", Config.random_add_cluster_keys_for_mow);
}

Config.enable_txn_log_outside_lock = new Random().nextBoolean();
LOG.info("fuzzy set enable_txn_log_outside_lock={}", Config.enable_txn_log_outside_lock);
Config.enable_batch_editlog = new Random().nextBoolean();
LOG.info("fuzzy set enable_batch_editlog={}", Config.enable_batch_editlog);

setFuzzyForCatalog();
}

Expand Down
68 changes: 65 additions & 3 deletions fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
Original file line number Diff line number Diff line change
Expand Up @@ -124,13 +124,14 @@
public class EditLog {
public static final Logger LOG = LogManager.getLogger(EditLog.class);

// Helper class to hold log edit requests
private static class EditLogItem {
// Helper class to hold log edit requests.
// Public so that callers can enqueue inside a lock and await outside it.
public static class EditLogItem {
static AtomicLong nextUid = new AtomicLong(0);
final short op;
final Writable writable;
final Object lock = new Object();
boolean finished = false;
volatile boolean finished = false;
long logId = -1;
long uid = -1;

Expand All @@ -139,6 +140,24 @@ private static class EditLogItem {
this.writable = writable;
uid = nextUid.getAndIncrement();
}

/**
* Wait for this edit log entry to be flushed to persistent storage.
* Returns the assigned log ID.
*/
public long await() {
synchronized (lock) {
while (!finished) {
try {
lock.wait();
} catch (InterruptedException e) {
LOG.error("Fatal Error : write stream Exception");
System.exit(-1);
}
}
}
return logId;
}
}

private final BlockingQueue<EditLogItem> logEditQueue = new LinkedBlockingQueue<>();
Expand Down Expand Up @@ -1534,6 +1553,49 @@ public long logEditWithQueue(short op, Writable writable) {
return req.logId;
}

/**
* Submit an edit log entry to the batch queue without waiting for it to be flushed.
* The entry is enqueued in FIFO order, so calling this inside a write lock guarantees
* that edit log entries are ordered by lock acquisition order.
*
* <p>The caller MUST call {@link EditLogItem#await()} after releasing the lock to ensure
* the entry is persisted before proceeding.
*
* <p>If batch edit log is disabled, this falls back to a synchronous direct write
* and the returned item is already completed.
*
* @return an {@link EditLogItem} handle to await completion
*/
public EditLogItem submitEdit(short op, Writable writable) {
if (this.getNumEditStreams() == 0) {
LOG.error("Fatal Error : no editLog stream", new Exception());
throw new Error("Fatal Error : no editLog stream");
}

EditLogItem req = new EditLogItem(op, writable);
if (Config.enable_batch_editlog && op != OperationType.OP_TIMESTAMP) {
while (true) {
try {
logEditQueue.put(req);
break;
} catch (InterruptedException e) {
LOG.warn("Interrupted during put, will sleep and retry.");
try {
Thread.sleep(100);
} catch (InterruptedException ex) {
LOG.warn("interrupted during sleep, will retry.", ex);
}
}
}
} else {
// Non-batch mode: write directly (synchronous)
long logId = logEditDirectly(op, writable);
req.logId = logId;
req.finished = true;
}
return req;
}

private synchronized long logEditDirectly(short op, Writable writable) {
long logId = -1;
try {
Expand Down
Loading