Conversation

@sjfhsjfh (Contributor) commented Jan 7, 2026

Motivation & Background

  • Unify CLI / coordinator / daemon / node communication with a schema-driven RPC layer instead of hand-written matches and ad hoc message handling.

  • Support both shared memory and stream-based (Read/Write, AsyncRead/AsyncWrite, e.g., TcpStream) transports.

  • Improve extensibility (adding new messages), error handling, and testability.

  • Address the long-standing action item from #162 (Redesign: Create a dora-daemon as a communication broker): "Use RPC framework instead of custom TCP socket for coordinator <-> daemon <-> node communication," which has remained undone.

  • Client call-sites shrink from hand-written request/parse/match code to a single typed method call. Example:

    • Before:
      let build_id = {
          let reply_raw = session
              .request(&serde_json::to_vec(&ControlRequest::Build { ... })?)?;
          let result: ControlRequestReply = serde_json::from_slice(&reply_raw)?;
          match result {
              ControlRequestReply::DataflowBuildTriggered { build_id } => build_id,
              ControlRequestReply::Error(err) => bail!("{err}"),
              other => bail!("unexpected start dataflow reply: {other:?}"),
          }
      };
    • After:
      let build_id = coordinator_client.build(dora_message::cli_to_coordinator::BuildReq { ... })?;
      let build_id = build_id.build_id;

    The schema-driven client hides serialization, transport, and reply dispatch, reducing boilerplate and error surface.

  • Heads-up on conflicts: This refactor touches RPC schemas/transport and coordinator/daemon event-loop plumbing, so it may conflict with other branches in those areas. I will rebase regularly; if your work overlaps, please flag so we can coordinate.

Design decisions (settled)

The transport layer is intentionally minimal and message-agnostic. We expose two traits, one sync and one async, each defined only by send and receive; hitting EOF returns Ok(None) so callers can stop cleanly. Both traits offer with_encoding to lift a byte transport into a typed one, and a framed variant sits above any shared-memory channel or Read/Write / AsyncRead/AsyncWrite stream (e.g., TcpStream) to provide length-delimited frames without coupling to an encoding.

  • Sync: Transport<Req, Resp> with send(&Req) and receive() -> Option<Resp>
  • Async: AsyncTransport<Req, Resp> with async counterparts of the same methods

Encoding is composed via EncodedTransport<T, Encoding, Req, Resp>, which owns a scratch buffer, performs encode/decode, and delegates I/O to the inner transport. Four encodings are feature-gated today: json and yaml for readability (json is the current default; yaml is heavier and expected to be rare), and two binary codecs—postcard and bincode. Postcard is the preferred lean, allocation-light option going forward; bincode remains for compatibility, but its maintenance status is uncertain.

On top of transport and encoding sits a schema-first macro layer. A simple request–response DSL generates both sync and async typed clients. Dispatch glue is already implemented; the open question is the exact server-side handler trait shape. In async contexts, a self-consuming handler looks attractive to avoid borrowing pitfalls, and we will finalize ergonomics before wiring services to it. The design is influenced by google/tarpc, but tarpc lacks stream responses (see google/tarpc#397); for now we surface raw message enums so clients can parse subscribed events manually. Subscription APIs are few, so this trade-off is acceptable until we design a higher-level streaming abstraction.

Finally, we keep parallel clients for runtime safety: the node stays on the sync client because it is embedded via pyo3 and should not spin an internal Tokio runtime (risking nested-runtime panics in rust nodes), while CLI and other Rust async callers can use the async client against the same schemas and encodings.

Current progress

  • Sync & async transport definitions
  • Framed transport
  • Shared memory + Read/Write / AsyncRead/AsyncWrite support
  • Typed transport with encoding
  • Macros: request–response DSL
  • Macros: typed client generation (sync + async)
  • Macros: server-side handler trait generation + dispatch glue
  • Coordinator refactor (see dedicated section)
  • Daemon refactor (see dedicated section)
  • CLI refactor (async client)
  • Node refactor (sync client)
  • Tests for transport, encoding, client/server interactions, backward compatibility of message schemas, and benchmarking.
  • Documentation for schema authoring and usage examples.

Coordinator & Daemon refactor

Both coordinator and daemon suffer from giant, single-function match dispatch that makes control flow and state hard to reason about. The coordinator threads state through local variables and borrow-checked lifetimes; the daemon does have a state struct, but it is still driven by one large match, so the cognitive load is similar.

Plan for both:

  1. Split responsibilities into multiple files/modules (patterned after the CLI argument split in CLI Rework #979) to make state and ownership boundaries explicit.
  2. Move the handlers onto the generated server-side handler trait once its shape is finalized, eliminating the monolithic matches.

@haixuanTao (Collaborator)

Just want to say that this looks like an extensive PR. Although I agree that some of the current architecture is not great, it would be good not to build too big a refactoring that also does not fix a use-case feature.

I think we should do small refactorings when possible; otherwise the risk of introducing bugs, as we can currently see with the CI, is pretty big.

@haixuanTao (Collaborator)

I think tarpc could indeed help the dora daemon, coordinator, and CLI.

@sjfhsjfh (Contributor, Author) commented Jan 8, 2026

I think tarpc could indeed help the dora daemon, coordinator, and CLI.

I agree that tarpc could be beneficial for the dora daemon, coordinator, and CLI. However, as noted in the PR description, tarpc doesn't support streaming responses.

The challenge here is that LogSubscribe (which requires a streaming response) and other request-response messages are defined in the same message enum. If we adopted tarpc, we'd need to either:

  • Split the protocol and maintain two communication channels (tarpc plus something else for streaming), or
  • Redesign the message structure to separate streaming and non-streaming operations.

Both approaches would likely break backward compatibility and add complexity. I'm open to suggestions if anyone has ideas on how to handle this gracefully.

@haixuanTao (Collaborator)

Oh sorry, I misread; I somehow understood that tarpc could do streaming.

But I think streaming also has its own issues: if the internet connection breaks for whatever reason, it could be very difficult to recover.

Mivik and others added 2 commits January 12, 2026 16:49
@sjfhsjfh sjfhsjfh marked this pull request as ready for review January 21, 2026 07:22
@sjfhsjfh (Contributor, Author)

This PR focuses on the CLI <=> coordinator RPC/transport refactor (schema-driven RPC, slimmer call sites). The rest (daemon, node, detailed tests & docs) will come in follow-up PRs to keep this one reviewable.

@phil-opp Mind taking a look at this part first? Would like to get the foundational part (macro & Transport) merged before moving on.

@haixuanTao (Collaborator)

When I used dora start in the past I used to get stdout, but now I don't anymore:

(base) ~/D/w/d/e/python-log ❯❯❯ dora run dataflow.yaml --uv
2026-01-28T11:11:06.262638Z  INFO dora_core::descriptor::validate: skipping path check for node with build command
2026-01-28T11:11:06.262661Z  INFO dora_core::descriptor::validate: skipping path check for node with build command
2026-01-28T11:11:06.262844Z  INFO zenoh::net::runtime: Using ZID: 22cf93d3284594e7a59d9b701e7169c6
2026-01-28T11:11:06.263514Z  INFO zenoh::net::runtime::orchestrator: Zenoh can be reached at: tcp/[fe80::1]:56917
2026-01-28T11:11:06.263520Z  INFO zenoh::net::runtime::orchestrator: Zenoh can be reached at: tcp/[fe80::e188:8c59:3f32:54c1]:56917
2026-01-28T11:11:06.263521Z  INFO zenoh::net::runtime::orchestrator: Zenoh can be reached at: tcp/[fe80::1481:d67a:78e1:f6c6]:56917
2026-01-28T11:11:06.263523Z  INFO zenoh::net::runtime::orchestrator: Zenoh can be reached at: tcp/[fe80::4249:938:51c3:a048]:56917
2026-01-28T11:11:06.263524Z  INFO zenoh::net::runtime::orchestrator: Zenoh can be reached at: tcp/[fe80::4469:15ff:feff:849c]:56917
2026-01-28T11:11:06.263525Z  INFO zenoh::net::runtime::orchestrator: Zenoh can be reached at: tcp/[fe80::4469:15ff:feff:849c]:56917
2026-01-28T11:11:06.263527Z  INFO zenoh::net::runtime::orchestrator: Zenoh can be reached at: tcp/[fe80::be5c:7671:e24:7759]:56917
2026-01-28T11:11:06.263536Z  INFO zenoh::net::runtime::orchestrator: Zenoh can be reached at: tcp/[fe80::ce81:b1c:bd2c:69e]:56917
2026-01-28T11:11:06.263538Z  INFO zenoh::net::runtime::orchestrator: Zenoh can be reached at: tcp/[fe80::de31:8fae:f93b:473b]:56917
2026-01-28T11:11:06.263539Z  INFO zenoh::net::runtime::orchestrator: Zenoh can be reached at: tcp/[fe80::ba95:7db6:a651:2c95]:56917
2026-01-28T11:11:06.263540Z  INFO zenoh::net::runtime::orchestrator: Zenoh can be reached at: tcp/172.18.130.122:56917
2026-01-28T11:11:06.263578Z  INFO zenoh::net::runtime::orchestrator: zenohd listening scout messages on 224.0.0.224:7446
12:11:06 DEBUG   receive_data_with_sleep: daemon::spawner  spawning node
12:11:06 DEBUG   send_data: daemon::spawner  spawning node
12:11:06 INFO    receive_data_with_sleep: spawner  spawning: uv run python -u /Users/xaviertao/Documents/work/dora/examples/python-log/receive_data.py
12:11:06 INFO    send_data: spawner  spawning: uv run python -u /Users/xaviertao/Documents/work/dora/examples/python-log/send_data.py
12:11:06 INFO    dora daemon  finished building nodes, spawning...
12:11:06 INFO    receive_data_with_sleep: spawner  spawning `uv` in `/Users/xaviertao/Documents/work/dora/examples/python-log`
12:11:06 DEBUG   receive_data_with_sleep: spawner  spawned node with pid 96574
12:11:06 INFO    send_data: spawner  spawning `uv` in `/Users/xaviertao/Documents/work/dora/examples/python-log`
12:11:06 DEBUG   send_data: spawner  spawned node with pid 96577
12:11:07 INFO    receive_data_with_sleep: daemon  node is ready
12:11:07 INFO    send_data: daemon  node is ready
12:11:07 INFO    daemon  all nodes are ready, starting dataflow
12:11:07 INFO    send_data: opentelemetry  Global meter provider is set. Meters can now be created using global::meter() or global::meter_with_scope().
12:11:07 INFO    receive_data_with_sleep: opentelemetry  Global meter provider is set. Meters can now be created using global::meter() or global::meter_with_scope().
12:11:07 WARN    receive_data_with_sleep: dora  THIS IS A WARNING
12:11:07 INFO    receive_data_with_sleep: dora  info [169996304159250]
12:11:07 INFO    receive_data_with_sleep: dora  info [169996267353916]
12:11:07 WARN    receive_data_with_sleep: dora  THIS IS A WARNING
12:11:07 INFO    receive_data_with_sleep: dora  info [169996315017083]
[... repeated `THIS IS A WARNING` / `info [...]` lines elided ...]
12:11:08 INFO    receive_data_with_sleep: dora  done!
12:11:08 stdout  send_data:  
12:11:08 stdout  send_data:  
12:11:08 DEBUG   send_data: daemon  handling node stop with exit status Success (restart: false)
12:11:08 INFO    send_data: daemon  send_data finished successfully
12:11:08 stdout  receive_data_with_sleep:  
12:11:08 stdout  receive_data_with_sleep:  
12:11:08 DEBUG   receive_data_with_sleep: daemon  handling node stop with exit status Success (restart: false)
12:11:08 INFO    receive_data_with_sleep: daemon  receive_data_with_sleep finished successfully
12:11:08 INFO    daemon  dataflow finished on machine `7ed56d0e-9dc3-43ab-b9fa-0dd845fc22d5`
2026-01-28T11:11:08.636319Z  INFO run_inner: dora_daemon: exiting daemon because all required dataflows are finished self.daemon_id=DaemonId { machine_id: None, uuid: 7ed56d0e-9dc3-43ab-b9fa-0dd845fc22d5 }
2026-01-28T11:11:08.636357Z  INFO run_inner: zenoh::api::session: close session zid=22cf93d3284594e7a59d9b701e7169c6 self.daemon_id=DaemonId { machine_id: None, uuid: 7ed56d0e-9dc3-43ab-b9fa-0dd845fc22d5 }
(base) ~/D/w/d/e/python-log ❯❯❯ dora start dataflow.yaml    # <---- @sjfhsjfh
attaching to dataflow (use `--detach` to run in background)
(base) ~/D/w/d/e/python-log ❯❯❯

      result: Ok(node_config),
-  } => Self::init(node_config),
+  } => Self::init(
+      serde_json::from_str(&node_config).context("failed to deserialize node config")?,
Collaborator:

Why is DaemonReply::NodeConfig returning a string?

   NextEvents(Vec<Timestamped<NodeEvent>>),
   NextDropEvents(Vec<Timestamped<NodeDropEvent>>),
-  NodeConfig { result: Result<NodeConfig, String> },
+  NodeConfig { result: Result<String, String> },
Collaborator:

Why is this necessary?

   mod template;

   pub use command::build;
+  #[allow(deprecated)]
Collaborator:

This PR is already a bit big; could we move everything related to clippy into a different PR?

       .to_string()
       .bold()
-      .color(word_to_color(&node_id.to_string()));
+      .color(word_to_color(node_id.as_ref()));
Collaborator:

I'd expect this to be related to clippy?

Comment on lines +22 to +23
#[cfg(feature = "postcard")]
mod postcard;
Collaborator:

Do we really need this?

#[cfg(feature = "postcard")]
mod postcard;
#[cfg(feature = "yaml")]
mod yaml;
Collaborator:

Do we really need to serialize and deserialize yaml?

-  impl<T, U> ShmemServer<T, U> {
-      pub unsafe fn new(memory: Shmem) -> eyre::Result<Self> {
+  #[allow(clippy::missing_safety_doc)]
+  impl ShmemChannel {
Collaborator:

Does ShmemChannel impl some specific trait?

@phil-opp (Collaborator)

Thanks a lot for working on this! I agree that the current implementation is messy and error-prone.

Given the size of this PR and the number of changes, I have only taken a quick look so far. One concern I have is the big dora-schema-macro: it seems to implement a custom RPC framework with complex parsing and code-generation logic. I fear that this will be difficult to maintain and evolve.

Could you clarify why it was necessary to create a custom RPC framework instead of using one of the many existing ones? You mentioned tarpc, but what about e.g. tonic?

@sjfhsjfh (Contributor, Author) commented Feb 1, 2026

@phil-opp thanks! The main reason for a custom (thin) RPC layer is our constraints:

  1. Custom transports: We need shared memory and Read/Write/AsyncRead/AsyncWrite with explicit framing. tonic is HTTP/2-bound.
  2. Sync + async clients: The node embeds via pyo3 without a Tokio runtime; tonic is async-only.
  3. Streaming: tarpc fits many needs but lacks streaming; dual channels would add complexity.
  4. Minimal macro: Just schema + request/response glue to remove handwritten matches; behavior stays close to today.

@phil-opp (Collaborator) commented Feb 4, 2026

Could you clarify how the custom RPC macro supports streaming? It looks like it also supports only the request/reply pattern. Or am I missing something?


Unfortunately, this PR is not reviewable in its current state. It does too many things at once and is way too big.

You claim that this is just about the CLI<->Coordinator refactor, but this PR also does completely unrelated things, such as refactoring the shared memory communication (which is not used by the CLI) or adding support for new serialization formats.

@sjfhsjfh (Contributor, Author) commented Feb 4, 2026

Could you clarify how the custom RPC macro supports streaming? It looks like it also supports only the request/reply pattern. Or am I missing something?

You're right: the current implementation only supports request-reply. To keep the scope of the PR manageable, and because streaming is only used in one scenario (log subscription), we made the RPC macro support only request-reply for now. Streaming is handled separately, just as it was before the refactor.

Unfortunately, this PR is not reviewable in the current state. It does too many things at once and is way too big.

...refactoring the shared memory communication... adding support for new serialization formats

I understand the concern. We did consider splitting out the CLI-coordinator protocol as a standalone PR, but the current protocol doesn't follow a clean request-reply pattern — some requests expect no response, some share a response type with other requests, and some can return multiple different response types. This ad-hoc design makes it hard to swap in a structured RPC layer without also refactoring how the coordinator handles these messages.

Nevertheless, we will try to break it down into smaller PRs. We'll start by isolating CLI-coordinator protocol changes.

@phil-opp (Collaborator) commented Feb 4, 2026

You're right: the current implementation only supports request-reply. To keep the scope of the PR manageable, and because streaming is only used in one scenario (log subscription), we made the RPC macro support only request-reply for now. Streaming is handled separately, just as it was before the refactor.

Above you said that streaming support was the main reason for not choosing tarpc:

tarpc fits many needs but lacks streaming; dual channels would add complexity.

Is there any other reason why you still chose to implement a custom macro?

@Mivik (Contributor) commented Feb 5, 2026

streaming support was the main reason for not choosing tarpc:

Is there any other reason why you still chose to implement a custom macro?

Let me rephrase. There are several major concerns regarding using existing RPC frameworks:

  1. Streaming support. tarpc does not provide native streaming support, but we later found it possible to use the underlying Transport trait directly, so this is no longer a blocking point.
  2. Async + sync support. Both tonic and tarpc rely on an async framework and would require a large refactoring of the node API.
  3. Backward compatibility. tonic enforces gRPC, which would sacrifice backward compatibility.
  4. Overhead. By designing the underlying logic close to the existing dora approach, the overhead stays under control. tonic uses gRPC, which introduces an HTTP layer, which might be a concern.

Given these concerns we leaned toward writing a custom proc macro for RPC. However, we are fully aware that this makes reviewing more difficult. Please let us know your opinion on this.

@phil-opp (Collaborator)

I took a closer look at tarpc and it looked quite promising, so I went ahead and redesigned the CLI<->coordinator connection to use tarpc in #1345. There is quite some similarity with your PR; for example, the service trait looks quite similar. So thanks a lot for the work and inspiration here!

Sorry for creating an alternative implementation, but I think #1345 is the better approach because it:

  • does not need any custom proc macro
  • uses fine-grained locking for coordinator state instead of a big RwLock
  • doesn't touch the node API or the node<->daemon message format at all (apart from initializing a tokio runtime)
  • uses tokio tasks to handle requests concurrently
  • avoids the complexity of making the encoding configurable (we control both sides, so we can just decide for one format at compile time)

Limitation:


I would appreciate your review on #1345 if you have time!

For future changes, I would recommend to discuss the general design with us first before doing the implementation. Your work here is definitely valuable and appreciated, even if it might not be merged.
