Skip to content

Commit 2f21ab5

Browse files
authored
feat: new version and async appender (#183)
Signed-off-by: tison <wander4096@gmail.com>
1 parent a7823ed commit 2f21ab5

File tree

42 files changed

+928
-54
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

42 files changed

+928
-54
lines changed

.github/workflows/ci.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@ jobs:
8989
cargo run --features="starter-log" --example json_stdout
9090
cargo run --features="starter-log" --example rolling_file
9191
cargo run --features="starter-log" --example single_file
92+
cargo run --features="starter-log,append-async" --example asynchronous
9293
cargo run --features="starter-log,diagnostic-fastrace,layout-google-cloud-logging" --example google_cloud_logging
9394
cargo run --features="starter-log,append-fastrace,diagnostic-fastrace" --example fastrace
9495

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,3 @@
11
/Cargo.lock
22
/target
3+
/logs

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,8 @@ All notable changes to this project will be documented in this file.
6868
### New features
6969

7070
* `PlainTextLayout` is added to support plain text format without any extra dependency.
71+
* `Async` appender is added to support async logging with configurable buffer size and worker threads.
72+
* `Trap` trait and a default `DefaultTrap` is added to support handling internal errors.
7173

7274
## [0.27.0] 2025-08-18
7375

Cargo.toml

Lines changed: 15 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
members = [
1717
"core",
1818
"logforth",
19-
2019
"appenders/*",
2120
"bridges/*",
2221
"diagnostics/*",
@@ -35,22 +34,25 @@ rust-version = "1.85.0"
3534

3635
[workspace.dependencies]
3736
# Workspace dependencies
38-
logforth-append-fastrace = { version = "0.1.0", path = "appenders/fastrace" }
39-
logforth-append-file = { version = "0.1.0", path = "appenders/file" }
40-
logforth-append-journald = { version = "0.1.0", path = "appenders/journald" }
41-
logforth-append-opentelemetry = { version = "0.1.0", path = "appenders/opentelemetry" }
42-
logforth-append-syslog = { version = "0.1.0", path = "appenders/syslog" }
43-
logforth-bridge-log = { version = "0.1.0", path = "bridges/log" }
44-
logforth-core = { version = "0.1.0", path = "core" }
45-
logforth-diagnostic-fastrace = { version = "0.1.0", path = "diagnostics/fastrace" }
46-
logforth-layout-google-cloud-logging = { version = "0.1.0", path = "layouts/google-cloud-logging" }
47-
logforth-layout-json = { version = "0.1.0", path = "layouts/json" }
48-
logforth-layout-logfmt = { version = "0.1.0", path = "layouts/logfmt" }
49-
logforth-layout-text = { version = "0.1.0", path = "layouts/text" }
37+
logforth-append-async = { version = "0.2.0", path = "appenders/async" }
38+
logforth-append-fastrace = { version = "0.2.0", path = "appenders/fastrace" }
39+
logforth-append-file = { version = "0.2.0", path = "appenders/file" }
40+
logforth-append-journald = { version = "0.2.0", path = "appenders/journald" }
41+
logforth-append-opentelemetry = { version = "0.2.0", path = "appenders/opentelemetry" }
42+
logforth-append-syslog = { version = "0.2.0", path = "appenders/syslog" }
43+
logforth-bridge-log = { version = "0.2.0", path = "bridges/log" }
44+
logforth-core = { version = "0.2.0", path = "core" }
45+
logforth-diagnostic-fastrace = { version = "0.2.0", path = "diagnostics/fastrace" }
46+
logforth-layout-google-cloud-logging = { version = "0.2.0", path = "layouts/google-cloud-logging" }
47+
logforth-layout-json = { version = "0.2.0", path = "layouts/json" }
48+
logforth-layout-logfmt = { version = "0.2.0", path = "layouts/logfmt" }
49+
logforth-layout-text = { version = "0.2.0", path = "layouts/text" }
5050

5151
# Crates.io dependencies
5252
anyhow = { version = "1.0" }
53+
arc-swap = { version = "1.7.1" }
5354
colored = { version = "3.0" }
55+
crossbeam-channel = { version = "0.5.15" }
5456
fastrace = { version = "0.7" }
5557
fasyslog = { version = "1.0.0" }
5658
insta = { version = "1.43.2" }

appenders/async/Cargo.toml

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
# Copyright 2024 FastLabs Developers
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
[package]
16+
name = "logforth-append-async"
17+
version = "0.2.0"
18+
19+
description = "Asynchronous appender for Logforth."
20+
keywords = ["logging", "log", "async"]
21+
22+
categories.workspace = true
23+
edition.workspace = true
24+
homepage.workspace = true
25+
license.workspace = true
26+
readme.workspace = true
27+
repository.workspace = true
28+
rust-version.workspace = true
29+
30+
[package.metadata.docs.rs]
31+
all-features = true
32+
rustdoc-args = ["--cfg", "docsrs"]
33+
34+
[dependencies]
35+
arc-swap = { workspace = true }
36+
crossbeam-channel = { workspace = true }
37+
logforth-core = { workspace = true }
38+
39+
[lints]
40+
workspace = true

appenders/async/README.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
# Async Appender
2+
3+
This appender is a remix of [spdlog-rs's AsyncPoolSink](https://docs.rs/spdlog-rs/*/spdlog/sink/struct.AsyncPoolSink.html), with several modifications to fit this crate's need:
4+
5+
* Instead of a thread pool, it uses a single background thread to drain the log queue.

appenders/async/src/append.rs

Lines changed: 176 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,176 @@
1+
// Copyright 2024 FastLabs Developers
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::sync::Arc;
16+
17+
use logforth_core::Append;
18+
use logforth_core::Diagnostic;
19+
use logforth_core::Error;
20+
use logforth_core::Trap;
21+
use logforth_core::kv;
22+
use logforth_core::kv::Visitor;
23+
use logforth_core::record::Record;
24+
use logforth_core::trap::DefaultTrap;
25+
26+
use crate::Overflow;
27+
use crate::Task;
28+
use crate::state::AsyncState;
29+
use crate::worker::Worker;
30+
31+
/// A composable appender, logging and flushing asynchronously.
32+
#[derive(Debug)]
33+
pub struct Async {
34+
appends: Arc<[Box<dyn Append>]>,
35+
overflow: Overflow,
36+
state: AsyncState,
37+
trap: Arc<dyn Trap>,
38+
}
39+
40+
impl Append for Async {
41+
fn append(&self, record: &Record, diags: &[Box<dyn Diagnostic>]) -> Result<(), Error> {
42+
let mut diagnostics = vec![];
43+
44+
let mut collector = DiagnosticCollector(&mut diagnostics);
45+
for d in diags {
46+
d.visit(&mut collector)?;
47+
}
48+
49+
let overflow = self.overflow;
50+
let task = Task::Log {
51+
appends: self.appends.clone(),
52+
record: Box::new(record.to_owned()),
53+
diags: diagnostics,
54+
};
55+
self.state.send_task(task, overflow)
56+
}
57+
58+
fn flush(&self) -> Result<(), Error> {
59+
let overflow = self.overflow;
60+
let task = Task::Flush {
61+
appends: self.appends.clone(),
62+
};
63+
self.state.send_task(task, overflow)
64+
}
65+
66+
fn exit(&self) -> Result<(), Error> {
67+
// https://github.com/SpriteOvO/spdlog-rs/issues/64
68+
//
69+
// If the program is tearing down, this will be the final flush. `crossbeam`
70+
// uses thread-local internally, which is not supported in `atexit` callback.
71+
// This can be bypassed by flushing sinks directly on the current thread, but
72+
// before we do that we have to destroy the thread pool to ensure that any
73+
// pending log tasks are completed.
74+
self.state.destroy();
75+
for append in self.appends.iter() {
76+
if let Err(err) = append.exit() {
77+
self.trap.trap(&err);
78+
}
79+
}
80+
Ok(())
81+
}
82+
}
83+
84+
/// A builder for configuring an async appender.
85+
pub struct AsyncBuilder {
86+
thread_name: String,
87+
appends: Vec<Box<dyn Append>>,
88+
buffered_lines_limit: Option<usize>,
89+
trap: Arc<dyn Trap>,
90+
overflow: Overflow,
91+
}
92+
93+
impl AsyncBuilder {
94+
/// Create a new async appender builder.
95+
pub fn new(thread_name: impl Into<String>) -> AsyncBuilder {
96+
AsyncBuilder {
97+
thread_name: thread_name.into(),
98+
appends: vec![],
99+
buffered_lines_limit: None,
100+
trap: Arc::new(DefaultTrap::default()),
101+
overflow: Overflow::Block,
102+
}
103+
}
104+
105+
/// Set the buffer size of pending messages.
106+
pub fn buffered_lines_limit(mut self, buffered_lines_limit: Option<usize>) -> Self {
107+
self.buffered_lines_limit = buffered_lines_limit;
108+
self
109+
}
110+
111+
/// Set the overflow policy to block when the buffer is full.
112+
pub fn overflow_block(mut self) -> Self {
113+
self.overflow = Overflow::Block;
114+
self
115+
}
116+
117+
/// Set the overflow policy to drop incoming messages when the buffer is full.
118+
pub fn overflow_drop_incoming(mut self) -> Self {
119+
self.overflow = Overflow::DropIncoming;
120+
self
121+
}
122+
123+
/// Set the trap for this async appender.
124+
pub fn trap(mut self, trap: impl Into<Box<dyn Trap>>) -> Self {
125+
let trap = trap.into();
126+
self.trap = trap.into();
127+
self
128+
}
129+
130+
/// Add an appender to this async appender.
131+
pub fn append(mut self, append: impl Into<Box<dyn Append>>) -> Self {
132+
self.appends.push(append.into());
133+
self
134+
}
135+
136+
/// Build the async appender.
137+
pub fn build(self) -> Async {
138+
let Self {
139+
thread_name,
140+
appends,
141+
buffered_lines_limit,
142+
trap,
143+
overflow,
144+
} = self;
145+
146+
let appends = appends.into_boxed_slice().into();
147+
148+
let (sender, receiver) = match buffered_lines_limit {
149+
Some(limit) => crossbeam_channel::bounded(limit),
150+
None => crossbeam_channel::unbounded(),
151+
};
152+
153+
let worker = Worker::new(receiver, trap.clone());
154+
let thread_handle = std::thread::Builder::new()
155+
.name(thread_name)
156+
.spawn(move || worker.run())
157+
.expect("failed to spawn async appender thread");
158+
let state = AsyncState::new(sender, thread_handle);
159+
160+
Async {
161+
appends,
162+
overflow,
163+
state,
164+
trap,
165+
}
166+
}
167+
}
168+
169+
struct DiagnosticCollector<'a>(&'a mut Vec<(kv::KeyOwned, kv::ValueOwned)>);
170+
171+
impl<'a> Visitor for DiagnosticCollector<'a> {
172+
fn visit(&mut self, key: kv::Key, value: kv::Value) -> Result<(), Error> {
173+
self.0.push((key.to_owned(), value.to_owned()));
174+
Ok(())
175+
}
176+
}

appenders/async/src/lib.rs

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
// Copyright 2024 FastLabs Developers
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
//! A composable appender, logging and flushing asynchronously.
16+
17+
#![cfg_attr(docsrs, feature(doc_auto_cfg))]
18+
19+
use std::sync::Arc;
20+
21+
use logforth_core::Append;
22+
use logforth_core::kv;
23+
use logforth_core::record::RecordOwned;
24+
25+
mod append;
26+
mod state;
27+
mod worker;
28+
29+
pub use self::append::Async;
30+
pub use self::append::AsyncBuilder;
31+
32+
enum Task {
33+
Log {
34+
appends: Arc<[Box<dyn Append>]>,
35+
record: Box<RecordOwned>,
36+
diags: Vec<(kv::KeyOwned, kv::ValueOwned)>,
37+
},
38+
Flush {
39+
appends: Arc<[Box<dyn Append>]>,
40+
},
41+
}
42+
43+
#[derive(Copy, Clone, Eq, PartialEq, Hash, Debug)]
44+
enum Overflow {
45+
/// Blocks until the channel is not full.
46+
Block,
47+
/// Drops the incoming operation.
48+
DropIncoming,
49+
}

0 commit comments

Comments
 (0)