Skip to content

Commit d9f8d4e

Browse files
authored
Merge branch 'main' into fix/allow-foreign-taps
2 parents 5705e26 + 3f17a13 commit d9f8d4e

File tree

4 files changed

+16
-6
lines changed

4 files changed

+16
-6
lines changed

sdk/src/events/fetching.rs

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -62,10 +62,20 @@ impl EventFetcher {
6262

6363
/// Start polling for events and sending them over the given channel. Return
6464
/// the JoinHandle of the polling task as well as the channel receiver.
65+
///
66+
/// The optional `inner_type` parameter allows filtering by a specific event
67+
/// type wrapped in `EventWrapper<T>`. When `None`, all `EventWrapper<*>`
68+
/// events are fetched. When `Some(type_tag)`, only events matching
69+
/// `EventWrapper<type_tag>` are returned by the GraphQL query.
70+
///
71+
/// This is useful for scenarios where multiple deployments share the same
72+
/// `primitives_pkg_id` but have different `workflow_pkg_id` values, as it
73+
/// allows precise filtering at the GraphQL level rather than client-side.
6574
pub fn poll_nexus_events(
6675
&self,
6776
from_cursor: Option<String>,
6877
from_checkpoint: Option<u64>,
78+
inner_type: Option<sui::types::TypeTag>,
6979
) -> (
7080
tokio::task::JoinHandle<()>,
7181
tokio::sync::mpsc::Receiver<anyhow::Result<EventPage>>,
@@ -90,7 +100,7 @@ impl EventFetcher {
90100
nexus_objects.primitives_pkg_id,
91101
primitives::Event::EVENT_WRAPPER.module,
92102
primitives::Event::EVENT_WRAPPER.name,
93-
vec![],
103+
inner_type.map(|t| vec![t]).unwrap_or_default(),
94104
);
95105

96106
loop {
@@ -266,7 +276,7 @@ mod tests {
266276

267277
let fetcher = EventFetcher::new(&format!("{}/graphql", &server.url()), Arc::new(objects));
268278

269-
let (_poller, mut receiver) = fetcher.poll_nexus_events(None, None);
279+
let (_poller, mut receiver) = fetcher.poll_nexus_events(None, None, None);
270280

271281
if let Some(Ok(page)) = receiver.recv().await {
272282
assert_eq!(page.next_cursor, "12345".to_string());
@@ -304,7 +314,7 @@ mod tests {
304314
let objects = sui_mocks::mock_nexus_objects();
305315
let fetcher = EventFetcher::new("http://invalid.url", Arc::new(objects));
306316

307-
let (_poller, mut receiver) = fetcher.poll_nexus_events(None, None);
317+
let (_poller, mut receiver) = fetcher.poll_nexus_events(None, None, None);
308318

309319
if let Some(result) = receiver.recv().await {
310320
assert!(result.is_err());

sdk/src/events/parsing.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -925,7 +925,7 @@ mod tests {
925925
objects.clone(),
926926
);
927927

928-
let (_poller, mut receiver) = fetcher.poll_nexus_events(None, None);
928+
let (_poller, mut receiver) = fetcher.poll_nexus_events(None, None, None);
929929
let page = receiver
930930
.recv()
931931
.await

sdk/src/nexus/crypto.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ impl CryptoActions {
138138
let fetcher = self.client.event_fetcher();
139139
let timeout = tokio::time::sleep(Duration::from_secs(20));
140140

141-
let (_poller, mut next_page) = fetcher.poll_nexus_events(None, Some(checkpoint));
141+
let (_poller, mut next_page) = fetcher.poll_nexus_events(None, Some(checkpoint), None);
142142

143143
tokio::pin!(timeout);
144144

sdk/src/nexus/workflow.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -270,7 +270,7 @@ impl WorkflowActions {
270270

271271
tokio::spawn(async move {
272272
let (_poller, mut next_page) =
273-
fetcher.poll_nexus_events(None, Some(execution_checkpoint));
273+
fetcher.poll_nexus_events(None, Some(execution_checkpoint), None);
274274

275275
let timeout = tokio::time::sleep(timeout);
276276

0 commit comments

Comments
 (0)