Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions src/core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@ add_library(dfly_core allocation_tracker.cc bloom.cc compact_object.cc dense_set
dragonfly_core.cc extent_tree.cc huff_coder.cc
interpreter.cc glob_matcher.cc mi_memory_resource.cc qlist.cc sds_utils.cc
segment_allocator.cc score_map.cc small_string.cc sorted_map.cc task_queue.cc
tx_queue.cc string_set.cc string_map.cc top_keys.cc detail/bitpacking.cc detail/listpack_wrap.cc
detail/listpack.cc count_min_sketch.cc oah_entry.cc)
tx_queue.cc string_set.cc string_map.cc top_keys.cc
detail/bitpacking.cc detail/listpack_wrap.cc detail/listpack.cc
count_min_sketch.cc oah_entry.cc)

cxx_link(dfly_core base dfly_search_core dfly_page_usage fibers2 jsonpath
absl::flat_hash_map absl::str_format absl::random_random redis_lib
Expand Down
40 changes: 40 additions & 0 deletions src/core/detail/gen_utils.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright 2026, DragonflyDB authors. All rights reserved.
// See LICENSE for licensing terms.
//

#pragma once

#include <absl/random/random.h>
#include <absl/strings/str_cat.h>

#include <string>

namespace dfly {

inline std::string GetRandomHex(absl::InsecureBitGen& gen, size_t len, size_t len_deviation = 0) {
static_assert(std::is_same<uint64_t, decltype(gen())>::value);
if (len_deviation) {
len += (gen() % len_deviation);
}

std::string res(len, '\0');
size_t indx = 0;

for (size_t i = 0; i < len / 16; ++i) { // 2 chars per byte
absl::numbers_internal::FastHexToBufferZeroPad16(gen(), res.data() + indx);
indx += 16;
}

if (indx < res.size()) {
char buf[32];
absl::numbers_internal::FastHexToBufferZeroPad16(gen(), buf);

for (unsigned j = 0; indx < res.size(); indx++, j++) {
res[indx] = buf[j];
}
}

return res;
}

} // namespace dfly
1 change: 1 addition & 0 deletions src/facade/dragonfly_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

#include "common/backed_args.h"
#include "facade/connection_ref.h"
#include "facade/facade_stats.h"
#include "facade/facade_types.h"
#include "facade/parsed_command.h"
#include "io/io_buf.h"
Expand Down
1 change: 1 addition & 0 deletions src/facade/facade.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include "base/logging.h"
#include "facade/command_id.h"
#include "facade/error.h"
#include "facade/facade_stats.h"
#include "facade/parsed_command.h"
#include "facade/reply_builder.h"
#include "facade/resp_expr.h"
Expand Down
111 changes: 111 additions & 0 deletions src/facade/facade_stats.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
// Copyright 2026, DragonflyDB authors. All rights reserved.
// See LICENSE for licensing terms.
//

#pragma once
Copy link

Copilot AI Jan 26, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing copyright header. New files should include the standard DragonflyDB copyright header at the top of the file, similar to other files in the codebase.

Copilot uses AI. Check for mistakes.

#include <absl/container/flat_hash_map.h>

#include <atomic>
#include <cstdint>

namespace facade {

struct ConnectionStats {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

all of this is moved from facade_types.h

size_t read_buf_capacity = 0; // total capacity of input buffers
// Count of pending messages in dispatch queue
uint64_t dispatch_queue_entries = 0;
// Memory used by pending messages in dispatch queue
size_t dispatch_queue_bytes = 0;
// Count of pending parsed commands in the pipeline queue (Data Path)
uint64_t pipeline_queue_entries = 0;
// Memory used by pending parsed commands in the pipeline queue (Data Path)
size_t pipeline_queue_bytes = 0;
// total size of all publish messages (subset of dispatch_queue_bytes)
size_t dispatch_queue_subscriber_bytes = 0;

size_t pipeline_cmd_cache_bytes = 0;

uint64_t io_read_cnt = 0;
size_t io_read_bytes = 0;

uint64_t command_cnt_main = 0;
uint64_t command_cnt_other = 0;
uint64_t pipelined_cmd_cnt = 0;
uint64_t pipelined_cmd_latency = 0; // in microseconds

// in microseconds, time spent waiting for the pipelined commands to start executing
uint64_t pipelined_wait_latency = 0;
uint64_t conn_received_cnt = 0;

uint32_t num_conns_main = 0;
uint32_t num_conns_other = 0;
uint32_t num_blocked_clients = 0;

// number of times the connection yielded due to max_busy_read_usec limit
uint32_t num_read_yields = 0;
uint64_t num_migrations = 0;
uint64_t num_recv_provided_calls = 0;

// Number of times the tls connection was closed by the time we started reading from it.
uint64_t tls_accept_disconnects = 0; // number of TLS socket disconnects during the handshake
//
Comment on lines +51 to +52
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
uint64_t tls_accept_disconnects = 0; // number of TLS socket disconnects during the handshake
//
// number of TLS socket disconnects during the handshake
uint64_t tls_accept_disconnects = 0;

uint64_t handshakes_started = 0;
uint64_t handshakes_completed = 0;

// Number of events when the pipeline queue was over the limit and was throttled.
uint64_t pipeline_throttle_count = 0;
uint64_t pipeline_dispatch_calls = 0;
uint64_t pipeline_dispatch_commands = 0;
uint64_t pipeline_dispatch_flush_usec = 0;

uint64_t skip_pipeline_flushing = 0; // number of times we skipped flushing the pipeline

ConnectionStats& operator+=(const ConnectionStats& o);
};

struct ReplyStats {
struct SendStats {
int64_t count = 0;
int64_t total_duration = 0;

SendStats& operator+=(const SendStats& other) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not applicable here but n general, we also have the spaceship operator now :)

static_assert(sizeof(SendStats) == 16u);

count += other.count;
total_duration += other.total_duration;
return *this;
}
};

// Send() operations that are written to sockets
SendStats send_stats;

size_t io_write_cnt = 0;
size_t io_write_bytes = 0;
absl::flat_hash_map<std::string, uint64_t> err_count;
size_t script_error_count = 0;

// This variable can be updated directly from shard threads when they allocate memory for replies.
std::atomic<size_t> squashing_current_reply_size{0};

ReplyStats() = default;
ReplyStats(ReplyStats&& other) noexcept;
ReplyStats& operator+=(const ReplyStats& other);
ReplyStats& operator=(const ReplyStats& other);
};

struct FacadeStats {
ConnectionStats conn_stats;
ReplyStats reply_stats;

FacadeStats& operator+=(const FacadeStats& other) {
conn_stats += other.conn_stats;
reply_stats += other.reply_stats;
return *this;
}
};

inline thread_local FacadeStats* tl_facade_stats = nullptr;

} // namespace facade
101 changes: 0 additions & 101 deletions src/facade/facade_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,11 @@

#pragma once

#include <absl/container/flat_hash_map.h>
#include <absl/types/span.h>

#include <optional>
#include <string>
#include <string_view>
#include <variant>

#include "base/iterator.h"
#include "common/arg_range.h"
#include "common/backed_args.h"
#include "facade/op_status.h"
Expand Down Expand Up @@ -152,101 +148,6 @@ inline std::string_view ArgS(ArgSlice args, size_t i) {
return args[i];
}

struct ConnectionStats {
size_t read_buf_capacity = 0; // total capacity of input buffers
// Count of pending messages in dispatch queue
uint64_t dispatch_queue_entries = 0;
// Memory used by pending messages in dispatch queue
size_t dispatch_queue_bytes = 0;
// Count of pending parsed commands in the pipeline queue (Data Path)
uint64_t pipeline_queue_entries = 0;
// Memory used by pending parsed commands in the pipeline queue (Data Path)
size_t pipeline_queue_bytes = 0;
// total size of all publish messages (subset of dispatch_queue_bytes)
size_t dispatch_queue_subscriber_bytes = 0;

size_t pipeline_cmd_cache_bytes = 0;

uint64_t io_read_cnt = 0;
size_t io_read_bytes = 0;

uint64_t command_cnt_main = 0;
uint64_t command_cnt_other = 0;
uint64_t pipelined_cmd_cnt = 0;
uint64_t pipelined_cmd_latency = 0; // in microseconds

// in microseconds, time spent waiting for the pipelined commands to start executing
uint64_t pipelined_wait_latency = 0;
uint64_t conn_received_cnt = 0;

uint32_t num_conns_main = 0;
uint32_t num_conns_other = 0;
uint32_t num_blocked_clients = 0;

// number of times the connection yielded due to max_busy_read_usec limit
uint32_t num_read_yields = 0;
uint64_t num_migrations = 0;
uint64_t num_recv_provided_calls = 0;

// Number of times the tls connection was closed by the time we started reading from it.
uint64_t tls_accept_disconnects = 0; // number of TLS socket disconnects during the handshake
//
uint64_t handshakes_started = 0;
uint64_t handshakes_completed = 0;

// Number of events when the pipeline queue was over the limit and was throttled.
uint64_t pipeline_throttle_count = 0;
uint64_t pipeline_dispatch_calls = 0;
uint64_t pipeline_dispatch_commands = 0;
uint64_t pipeline_dispatch_flush_usec = 0;

uint64_t skip_pipeline_flushing = 0; // number of times we skipped flushing the pipeline

ConnectionStats& operator+=(const ConnectionStats& o);
};

struct ReplyStats {
struct SendStats {
int64_t count = 0;
int64_t total_duration = 0;

SendStats& operator+=(const SendStats& other) {
static_assert(sizeof(SendStats) == 16u);

count += other.count;
total_duration += other.total_duration;
return *this;
}
};

// Send() operations that are written to sockets
SendStats send_stats;

size_t io_write_cnt = 0;
size_t io_write_bytes = 0;
absl::flat_hash_map<std::string, uint64_t> err_count;
size_t script_error_count = 0;

// This variable can be updated directly from shard threads when they allocate memory for replies.
std::atomic<size_t> squashing_current_reply_size{0};

ReplyStats() = default;
ReplyStats(ReplyStats&& other) noexcept;
ReplyStats& operator+=(const ReplyStats& other);
ReplyStats& operator=(const ReplyStats& other);
};

struct FacadeStats {
ConnectionStats conn_stats;
ReplyStats reply_stats;

FacadeStats& operator+=(const FacadeStats& other) {
conn_stats += other.conn_stats;
reply_stats += other.reply_stats;
return *this;
}
};

struct ErrorReply {
explicit ErrorReply(std::string&& msg, std::string_view kind = {})
: message{std::move(msg)}, kind{kind} {
Expand Down Expand Up @@ -303,8 +204,6 @@ constexpr unsigned long long operator""_KB(unsigned long long x) {
return 1024L * x;
}

inline thread_local FacadeStats* tl_facade_stats = nullptr;

void ResetStats();

using MemoryBytesFlag = strings::MemoryBytesFlag;
Expand Down
1 change: 1 addition & 0 deletions src/facade/reply_builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include <optional>
#include <string_view>

#include "facade/facade_stats.h"
#include "facade/facade_types.h"
#include "facade/op_status.h"
#include "io/io.h"
Expand Down
1 change: 1 addition & 0 deletions src/facade/resp_validator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

#include <cstdint>
#include <fstream>
#include <iostream>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤔

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It uses cout, but cout doesn't include it. It failed to build after I cleaned up common.h


#include "base/flags.h"
#include "base/init.h"
Expand Down
4 changes: 3 additions & 1 deletion src/facade/tls_helpers.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,13 @@
#include <openssl/ssl.h>
#endif

#include <absl/functional/bind_front.h>

#include <string>

#include "absl/functional/bind_front.h"
#include "base/flags.h"
#include "base/logging.h"
#include "facade/facade_stats.h"
#include "facade/facade_types.h"

ABSL_FLAG(std::string, tls_cert_file, "", "cert file for tls connections");
Expand Down
1 change: 1 addition & 0 deletions src/server/bitops_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// See LICENSE for licensing terms.
//

#include <absl/strings/ascii.h>
#include <absl/strings/match.h>

#include <bitset>
Expand Down
1 change: 1 addition & 0 deletions src/server/blocking_controller_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include <gmock/gmock.h>

#include "base/logging.h"
#include "facade/facade_stats.h"
#include "server/acl/acl_commands_def.h"
#include "server/command_registry.h"
#include "server/engine_shard_set.h"
Expand Down
4 changes: 3 additions & 1 deletion src/server/cluster/cluster_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@

#include "server/cluster/cluster_family.h"

#include <absl/cleanup/cleanup.h>
#include <absl/strings/ascii.h>

#include <memory>
#include <mutex>
#include <string>

#include "absl/cleanup/cleanup.h"
#include "base/flags.h"
#include "base/logging.h"
#include "facade/cmd_arg_parser.h"
Expand Down
1 change: 1 addition & 0 deletions src/server/cluster/cluster_family_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "absl/time/time.h"
#include "base/gtest.h"
#include "base/logging.h"
#include "core/detail/gen_utils.h"
#include "facade/facade_test.h"
#include "server/test_utils.h"

Expand Down
Loading
Loading