Skip to content

Commit a8ddd1f

Browse files
committed
feat: use postcard encoding
1 parent 2a4168d commit a8ddd1f

File tree

7 files changed

+44
-39
lines changed

7 files changed

+44
-39
lines changed

binaries/cli/src/command/build/distributed.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
1-
use communication_layer_request_reply::{
2-
Transport, encoding::JsonEncoding, transport::FramedTransport,
3-
};
1+
use communication_layer_request_reply::{Transport, transport::FramedTransport};
42
use dora_core::descriptor::Descriptor;
53
use dora_message::{
64
BuildId,
7-
cli_to_coordinator::{CliToCoordinatorClient, CliToCoordinatorRequest},
5+
cli_to_coordinator::{
6+
CliToCoordinatorClient, CliToCoordinatorEncoding, CliToCoordinatorRequest,
7+
},
88
common::{GitSource, LogMessage},
99
id::NodeId,
1010
};
@@ -45,7 +45,7 @@ pub fn wait_until_dataflow_built(
4545
let mut log_session = FramedTransport::new(
4646
TcpStream::connect(coordinator_socket).wrap_err("failed to connect to dora coordinator")?,
4747
)
48-
.with_encoding::<_, CliToCoordinatorRequest, LogMessage>(JsonEncoding);
48+
.with_encoding::<_, CliToCoordinatorRequest, LogMessage>(CliToCoordinatorEncoding);
4949
log_session
5050
.send(&CliToCoordinatorRequest::BuildLogSubscribe {
5151
build_id,

binaries/cli/src/command/logs.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,12 @@ use crate::{
99
output::print_log_message,
1010
};
1111
use clap::Args;
12-
use communication_layer_request_reply::{
13-
Transport, encoding::JsonEncoding, transport::FramedTransport,
14-
};
12+
use communication_layer_request_reply::{Transport, transport::FramedTransport};
1513
use dora_core::topics::{DORA_COORDINATOR_PORT_CONTROL_DEFAULT, LOCALHOST};
1614
use dora_message::{
17-
cli_to_coordinator::{CliToCoordinatorClient, CliToCoordinatorRequest},
15+
cli_to_coordinator::{
16+
CliToCoordinatorClient, CliToCoordinatorEncoding, CliToCoordinatorRequest,
17+
},
1818
common::LogMessage,
1919
id::NodeId,
2020
};
@@ -90,7 +90,7 @@ pub fn logs(
9090
let mut log_session = FramedTransport::new(
9191
TcpStream::connect(coordinator_addr).wrap_err("failed to connect to dora coordinator")?,
9292
)
93-
.with_encoding::<_, CliToCoordinatorRequest, LogMessage>(JsonEncoding);
93+
.with_encoding::<_, CliToCoordinatorRequest, LogMessage>(CliToCoordinatorEncoding);
9494
log_session.send(&CliToCoordinatorRequest::LogSubscribe {
9595
dataflow_id: uuid,
9696
level: log_level,

binaries/cli/src/command/start/attach.rs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,8 @@
1-
use communication_layer_request_reply::{
2-
Transport, encoding::JsonEncoding, transport::FramedTransport,
3-
};
1+
use communication_layer_request_reply::{Transport, transport::FramedTransport};
42
use dora_core::descriptor::{CoreNodeKind, Descriptor, DescriptorExt, resolve_path};
53
use dora_message::cli_to_coordinator::{
6-
CheckResp, CliToCoordinatorClient, CliToCoordinatorRequest, DataflowStopped,
4+
CheckResp, CliToCoordinatorClient, CliToCoordinatorEncoding, CliToCoordinatorRequest,
5+
DataflowStopped,
76
};
87
use dora_message::common::LogMessage;
98
use dora_message::id::{NodeId, OperatorId};
@@ -142,7 +141,7 @@ pub fn attach_dataflow(
142141
let mut log_session = FramedTransport::new(
143142
TcpStream::connect(coordinator_socket).wrap_err("failed to connect to dora coordinator")?,
144143
)
145-
.with_encoding::<_, CliToCoordinatorRequest, LogMessage>(JsonEncoding);
144+
.with_encoding::<_, CliToCoordinatorRequest, LogMessage>(CliToCoordinatorEncoding);
146145
log_session
147146
.send(&CliToCoordinatorRequest::LogSubscribe {
148147
dataflow_id,

binaries/cli/src/command/start/mod.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,15 +9,15 @@ use crate::{
99
output::print_log_message,
1010
session::DataflowSession,
1111
};
12-
use communication_layer_request_reply::{
13-
Transport, encoding::JsonEncoding, transport::FramedTransport,
14-
};
12+
use communication_layer_request_reply::{Transport, transport::FramedTransport};
1513
use dora_core::{
1614
descriptor::{Descriptor, DescriptorExt},
1715
topics::{DORA_COORDINATOR_PORT_CONTROL_DEFAULT, LOCALHOST},
1816
};
1917
use dora_message::{
20-
cli_to_coordinator::{CliToCoordinatorClient, CliToCoordinatorRequest, StartReq},
18+
cli_to_coordinator::{
19+
CliToCoordinatorClient, CliToCoordinatorEncoding, CliToCoordinatorRequest, StartReq,
20+
},
2121
common::LogMessage,
2222
};
2323
use eyre::Context;
@@ -148,7 +148,7 @@ fn wait_until_dataflow_started(
148148
let mut log_session = FramedTransport::new(
149149
TcpStream::connect(coordinator_addr).wrap_err("failed to connect to dora coordinator")?,
150150
)
151-
.with_encoding::<_, CliToCoordinatorRequest, LogMessage>(JsonEncoding);
151+
.with_encoding::<_, CliToCoordinatorRequest, LogMessage>(CliToCoordinatorEncoding);
152152
log_session
153153
.send(&CliToCoordinatorRequest::LogSubscribe {
154154
dataflow_id,

binaries/coordinator/src/control.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,9 @@
11
use crate::{
22
Coordinator, handler::CliRequestHandler, log_subscriber::LogSubscriber, send_log_message,
33
};
4-
use communication_layer_request_reply::{
5-
AsyncTransport, encoding::JsonEncoding, transport::FramedTransport,
6-
};
4+
use communication_layer_request_reply::{AsyncTransport, transport::FramedTransport};
75
use dora_message::cli_to_coordinator::{
8-
CliToCoordinator, CliToCoordinatorRequest, CliToCoordinatorResponse,
6+
CliToCoordinator, CliToCoordinatorEncoding, CliToCoordinatorRequest, CliToCoordinatorResponse,
97
};
108
use eyre::{Context, eyre};
119
use futures::{
@@ -84,7 +82,9 @@ pub(crate) async fn listen(
8482
async fn handle_requests(state: Arc<ListenState>, connection: TcpStream) {
8583
let peer_addr = connection.peer_addr().ok();
8684
let mut transport = FramedTransport::new(connection)
87-
.with_encoding::<_, CliToCoordinatorResponse, CliToCoordinatorRequest>(JsonEncoding);
85+
.with_encoding::<_, CliToCoordinatorResponse, CliToCoordinatorRequest>(
86+
CliToCoordinatorEncoding,
87+
);
8888
loop {
8989
let next_request = transport.receive().map(Either::Left);
9090
let coordinator_stopped = state.cancel_token.cancelled().map(Either::Right);
@@ -147,7 +147,7 @@ async fn handle_requests(state: Arc<ListenState>, connection: TcpStream) {
147147

148148
log_subscribers.push(LogSubscriber::new(
149149
level,
150-
transport.into_inner().into_inner(),
150+
transport.into_inner(),
151151
));
152152
let buffered = std::mem::take(buffered_log_messages);
153153
for message in buffered {
Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,22 @@
1-
use dora_message::coordinator_to_cli::LogMessage;
1+
use communication_layer_request_reply::{
2+
AsyncTransport, EncodedTransport, transport::FramedTransport,
3+
};
4+
use dora_message::{cli_to_coordinator::CliToCoordinatorEncoding, coordinator_to_cli::LogMessage};
25
use eyre::{Context, ContextCompat};
3-
4-
use crate::tcp_utils::tcp_send;
6+
use tokio::net::TcpStream;
57

68
pub struct LogSubscriber {
79
pub level: log::LevelFilter,
8-
connection: Option<tokio::net::TcpStream>,
10+
transport: Option<
11+
EncodedTransport<FramedTransport<TcpStream>, CliToCoordinatorEncoding, LogMessage, ()>,
12+
>,
913
}
1014

1115
impl LogSubscriber {
12-
pub fn new(level: log::LevelFilter, connection: tokio::net::TcpStream) -> Self {
16+
pub fn new(level: log::LevelFilter, transport: FramedTransport<TcpStream>) -> Self {
1317
Self {
1418
level,
15-
connection: Some(connection),
19+
transport: Some(transport.with_encoding(CliToCoordinatorEncoding)),
1620
}
1721
}
1822

@@ -26,19 +30,19 @@ impl LogSubscriber {
2630
dora_core::build::LogLevelOrStdout::Stdout => {}
2731
}
2832

29-
let message = serde_json::to_vec(&message)?;
30-
let connection = self.connection.as_mut().context("connection is closed")?;
31-
tcp_send(connection, &message)
33+
let connection = self.transport.as_mut().context("connection is closed")?;
34+
connection
35+
.send(message)
3236
.await
3337
.context("failed to send message")?;
3438
Ok(())
3539
}
3640

3741
pub fn is_closed(&self) -> bool {
38-
self.connection.is_none()
42+
self.transport.is_none()
3943
}
4044

4145
pub fn close(&mut self) {
42-
self.connection = None;
46+
self.transport = None;
4347
}
4448
}

libraries/message/src/cli_to_coordinator.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@ use crate::{
1515
id::{NodeId, OperatorId},
1616
};
1717

18+
pub use communication_layer_request_reply::encoding::JsonEncoding as CliToCoordinatorEncoding;
19+
1820
#[dora_schema_macro::dora_schema]
1921
pub trait CliToCoordinator {
2022
/// Builds a dataflow by distributing build tasks to appropriate daemons.
@@ -48,10 +50,10 @@ pub trait CliToCoordinator {
4850
/// spawning of nodes across connected daemons and returns a [`UUID`] for tracking
4951
/// the dataflow execution.
5052
///
51-
/// # Response
53+
/// # Response
5254
/// A [`UUID`] that identifies this specific dataflow execution
5355
///
54-
/// # Error Handling
56+
/// # Error Handling
5557
///
5658
/// Returns `ControlRequestReply::Error` if:
5759
/// - Dataflow name is already in use

0 commit comments

Comments
 (0)