Skip to content

Commit c25b702

Browse files
committed
refactor: stub coordinator CLI handlers
1 parent 37fc7af commit c25b702

File tree

8 files changed

+178
-56
lines changed

8 files changed

+178
-56
lines changed

Cargo.lock

Lines changed: 3 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -80,17 +80,20 @@ arrow-data = { version = "54.2.1" }
8080
arrow-array = { version = "54.2.1" }
8181
arrow-json = { version = "54.2.1" }
8282
async-trait = "0.1.89"
83+
eyre = "0.6.12"
84+
git2 = { version = "0.18.0", features = ["vendored-openssl"] }
8385
parquet = { version = "54.2.1" }
8486
pyo3 = { version = "0.23", features = [
8587
"eyre",
8688
"abi3-py37",
8789
"multiple-pymethods",
8890
] }
8991
pythonize = "0.23"
90-
git2 = { version = "0.18.0", features = ["vendored-openssl"] }
92+
serde = "1.0.228"
93+
serde_json = "1.0.145"
9194
serde_yaml = "0.9.33"
95+
tokio = { version = "1.49.0", features = ["full"] }
9296
zenoh = "1.1.1"
93-
serde_json = "1.0.145"
9497

9598
[package]
9699
name = "dora-examples"
Lines changed: 115 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -1,49 +1,128 @@
1-
use std::sync::Arc;
1+
use std::{collections::BTreeSet, time::Duration};
22

33
use async_trait::async_trait;
4-
use dora_core::uhlc::HLC;
54
use dora_message::{
65
BuildId,
7-
cli_to_coordinator::{BuildReq, CliToCoordinator},
6+
cli_to_coordinator::{
7+
BuildReq, CliAndDefaultDaemonIps, CliToCoordinator, StartReq, WaitForBuildResp,
8+
},
9+
common::DaemonId,
10+
coordinator_to_cli::{DataflowListEntry, NodeInfo},
11+
id::{NodeId, OperatorId},
812
};
13+
use eyre::Result;
14+
use uuid::Uuid;
915

10-
use crate::build_dataflow;
16+
pub struct CliRequestHandler;
1117

1218
#[async_trait]
1319
impl CliToCoordinator for CliRequestHandler {
14-
async fn build(self, req: BuildReq) -> eyre::Result<BuildId> {
15-
let BuildReq {
16-
session_id,
17-
dataflow,
18-
git_sources,
19-
prev_git_sources,
20-
local_working_dir,
21-
uv,
22-
} = req;
20+
async fn build(self, req: BuildReq) -> Result<BuildId> {
21+
// let BuildReq {
22+
// session_id,
23+
// dataflow,
24+
// git_sources,
25+
// prev_git_sources,
26+
// local_working_dir,
27+
// uv,
28+
// } = req;
2329

24-
// assign a random build id
25-
let build_id = BuildId::generate();
30+
// // assign a random build id
31+
// let build_id = BuildId::generate();
2632

27-
let result = build_dataflow(
28-
build_id,
29-
session_id,
30-
dataflow,
31-
git_sources,
32-
prev_git_sources,
33-
local_working_dir,
34-
&self.clock,
35-
uv,
36-
&mut daemon_connections,
37-
)
38-
.await;
39-
match result {
40-
Ok(build) => {
41-
running_builds.insert(build_id, build);
42-
Ok(build_id)
43-
}
44-
Err(err) => {
45-
let _ = reply_sender.send(Err(err));
46-
}
47-
}
33+
// let result = build_dataflow(
34+
// build_id,
35+
// session_id,
36+
// dataflow,
37+
// git_sources,
38+
// prev_git_sources,
39+
// local_working_dir,
40+
// &self.clock,
41+
// uv,
42+
// &mut daemon_connections,
43+
// )
44+
// .await;
45+
// match result {
46+
// Ok(build) => {
47+
// running_builds.insert(build_id, build);
48+
// Ok(build_id)
49+
// }
50+
// Err(err) => {
51+
// let _ = reply_sender.send(Err(err));
52+
// }
53+
// }
54+
todo!()
55+
}
56+
async fn wait_for_build(self, build_id: BuildId) -> Result<WaitForBuildResp> {
57+
todo!()
58+
}
59+
async fn start(self, req: StartReq) -> Result<Uuid> {
60+
todo!()
61+
}
62+
async fn wait_for_spawn(self, dataflow_id: Uuid) -> Result<Uuid> {
63+
todo!()
64+
}
65+
async fn reload(
66+
self,
67+
dataflow_id: Uuid,
68+
node_id: NodeId,
69+
operator_id: Option<OperatorId>,
70+
) -> Result<Uuid> {
71+
todo!()
72+
}
73+
async fn check(self, dataflow_uuid: Uuid) -> Result<Uuid> {
74+
todo!()
75+
}
76+
async fn stop(
77+
self,
78+
dataflow_uuid: Uuid,
79+
grace_duration: Option<Duration>,
80+
force: bool,
81+
) -> Result<()> {
82+
todo!()
83+
}
84+
async fn stop_by_name(
85+
self,
86+
name: String,
87+
grace_duration: Option<Duration>,
88+
force: bool,
89+
) -> Result<()> {
90+
todo!()
91+
}
92+
async fn logs(
93+
self,
94+
uuid: Option<Uuid>,
95+
name: Option<String>,
96+
node: String,
97+
tail: Option<usize>,
98+
) -> Result<Vec<u8>> {
99+
todo!()
100+
}
101+
async fn destroy(self) -> Result<()> {
102+
todo!()
103+
}
104+
async fn list(self) -> Result<Vec<DataflowListEntry>> {
105+
todo!()
106+
}
107+
async fn info(self, dataflow_uuid: Uuid) -> Result<()> {
108+
todo!()
109+
}
110+
async fn daemon_connected(self) -> Result<bool> {
111+
todo!()
112+
}
113+
async fn connected_machines(self) -> Result<BTreeSet<DaemonId>> {
114+
todo!()
115+
}
116+
async fn log_subscribe(self, dataflow_id: Uuid, level: log::LevelFilter) -> Result<()> {
117+
todo!()
118+
}
119+
async fn build_log_subscribe(self, build_id: BuildId, level: log::LevelFilter) -> Result<()> {
120+
todo!()
121+
}
122+
async fn cli_and_default_daemon_on_same_machine(self) -> Result<CliAndDefaultDaemonIps> {
123+
todo!()
124+
}
125+
async fn get_node_info(self) -> Result<Vec<NodeInfo>> {
126+
todo!()
48127
}
49128
}

libraries/dora-schema-macro/Cargo.toml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,3 +15,8 @@ quote = "1"
1515
serde = { version = "1.0", features = ["derive"] }
1616
serde_json.workspace = true
1717
syn = { version = "2", features = ["full", "parsing"] }
18+
19+
[dev-dependencies]
20+
async-trait = { workspace = true }
21+
eyre = { workspace = true }
22+
tokio = { workspace = true }

libraries/dora-schema-macro/src/lib.rs

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,22 @@ use crate::syntax::SchemaInput;
2424
/// This will generate:
2525
/// - Protocol enums `MyProtocolRequest` and `MyProtocolResponse`
2626
/// - A client struct `MyProtocolClient` with methods `foo` and `bar`
27-
/// - A server trait `MyProtocol` with handler methods for `foo` and `bar
27+
/// - A server trait `MyProtocol` with handler methods for `foo` and `bar`
28+
///
29+
/// For example, the above trait will generate the following protocol enums:
30+
///
31+
/// ```ignore
32+
/// pub enum MyProtocolRequest {
33+
/// Foo{ foo: FooRequest },
34+
/// Bar{ bar: BarRequest },
35+
/// }
36+
///
37+
/// pub enum MyProtocolResponse {
38+
/// Foo(FooResponse),
39+
/// Bar(BarResponse),
40+
/// }
41+
/// ```
42+
///
2843
#[proc_macro_attribute]
2944
pub fn dora_schema(_attr: TokenStream, input: TokenStream) -> TokenStream {
3045
let schema = syn::parse_macro_input!(input as SchemaInput);

libraries/dora-schema-macro/src/protocol.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,12 +35,12 @@ pub fn generate_protocol(schema: &SchemaInput) -> proc_macro2::TokenStream {
3535

3636
// TODO: better eyre conversion
3737
quote! {
38-
#[derive(Debug, Clone, ::serde::Serialize, ::serde::Deserialize)]
38+
#[derive(::std::fmt::Debug, ::std::clone::Clone, ::serde::Serialize, ::serde::Deserialize)]
3939
pub enum #request_enum {
4040
#(#request_variants,)*
4141
}
4242

43-
#[derive(Debug, Clone, ::serde::Serialize, ::serde::Deserialize)]
43+
#[derive(::std::fmt::Debug, ::std::clone::Clone, ::serde::Serialize, ::serde::Deserialize)]
4444
pub enum #response_enum {
4545
#(#response_variants,)*
4646
Error(::std::string::String),

libraries/dora-schema-macro/src/server.rs

Lines changed: 10 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
1-
use proc_macro2::Ident;
2-
use quote::{format_ident, quote};
1+
use quote::quote;
32

43
use crate::{
54
SchemaInput,
@@ -46,26 +45,22 @@ pub fn generate_server_trait(schema: &SchemaInput) -> proc_macro2::TokenStream {
4645
.collect::<Vec<_>>();
4746

4847
let handle_function = quote! {
49-
async fn handle<T>(self, request: #request_enum) -> ::eyre::Result<()>
48+
/// Handle a single request, returning a response.
49+
///
50+
/// Do the dispatching by default, override this method to implement custom dispatching logic.
51+
///
52+
async fn handle(self, request: #request_enum) -> ::eyre::Result<#response_enum>
5053
{
51-
use ::communication_layer_request_reply::AsyncTransport;
52-
// TODO: handle transport errors (e.g. serde errors) and return error responses instead of exiting
53-
if let Some(req) = transport.receive().await? {
54-
let resp = match req {
55-
#(#dispatch_arms)*
56-
};
57-
transport.send(&resp).await?;
58-
} else {
59-
::eyre::bail!("Transport closed while waiting for request");
60-
}
61-
Ok(())
54+
Ok(match request {
55+
#(#dispatch_arms)*
56+
})
6257
}
6358
};
6459

6560
quote! {
6661
#[::async_trait::async_trait]
6762
/// Note: the `serve` function will clone the handler for each request, use `Arc<Mutex<State>>` to share state between requests.
68-
pub trait #trait_name: ::std::marker::Sized {
63+
pub trait #trait_name: ::std::marker::Sized + ::std::marker::Send {
6964
#(#trait_methods)*
7065

7166
#handle_function
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
#[derive(std::fmt::Debug, std::clone::Clone, serde::Serialize, serde::Deserialize)]
2+
pub struct FooRequest;
3+
#[derive(std::fmt::Debug, std::clone::Clone, serde::Serialize, serde::Deserialize)]
4+
pub struct FooResponse;
5+
6+
#[derive(std::fmt::Debug, std::clone::Clone, serde::Serialize, serde::Deserialize)]
7+
pub struct BarRequest;
8+
#[derive(std::fmt::Debug, std::clone::Clone, serde::Serialize, serde::Deserialize)]
9+
pub struct BarResponse;
10+
11+
#[dora_schema_macro::dora_schema]
12+
pub trait MyProtocol {
13+
#[allow(clippy::disallowed_names)]
14+
fn foo(foo: FooRequest) -> FooResponse;
15+
fn bar(bar: BarRequest) -> BarResponse;
16+
}
17+
18+
#[allow(unused)]
19+
async fn compile_check<H: MyProtocol>(handler: H) {
20+
let request: MyProtocolRequest = MyProtocolRequest::Foo { foo: FooRequest };
21+
let _: MyProtocolResponse = handler.handle(request).await.unwrap();
22+
}

0 commit comments

Comments
 (0)