Skip to content

Conversation

@SwekeR-463
Copy link

@SwekeR-463 SwekeR-463 commented Feb 7, 2026

Description

Fixes #1464

  • Preserve _stage_perf when stages return new task instances.
  • Define explicit name fields for audio stages to populate StagePerfStats stage names.

Snippet

After re running python benchmarking/run.py --config benchmarking/nightly-benchmark.yaml --entries audio_fleurs got this output.

image

Checklist

  • I am familiar with the Contributing Guide.
  • New or Existing tests cover these changes.
  • The documentation is up to date with these changes.

@copy-pr-bot
Copy link

copy-pr-bot bot commented Feb 7, 2026

This pull request requires additional validation before any workflows can run on NVIDIA's runners.

Pull request vetters can view their responsibilities here.

Contributors can view more details about this message here.

@greptile-apps
Copy link
Contributor

greptile-apps bot commented Feb 7, 2026

Greptile Overview

Greptile Summary

This PR focuses on making stage performance stats retain meaningful stage names after benchmarking. It does this by (1) propagating _stage_perf onto newly created task instances in several audio stages and (2) assigning explicit name values to legacy audio stages so StagePerfStats can display them.

The change also updates ProcessingStage.process_batch() to correctly handle None results (task filtering) by skipping them, aligning the default batch path with the documented process() contract.

Confidence Score: 3/5

  • This PR is close to mergeable but has a clear runtime crash in LegacySpeechStage perf propagation.
  • Most changes are straightforward propagation of _stage_perf and adding explicit stage names, and process_batch() now correctly skips None. However, LegacySpeechStage.process() now dereferences r._stage_perf without ensuring the attribute exists, which will raise AttributeError for stages that return new AudioBatch instances without _stage_perf.
  • nemo_curator/stages/audio/common.py

Important Files Changed

Filename Overview
nemo_curator/stages/audio/common.py Updates LegacySpeechStage perf propagation and adds explicit name fields to audio stages; contains a new crash when returned entries lack _stage_perf.
nemo_curator/stages/audio/inference/asr_nemo.py Propagates _stage_perf when returning a new AudioBatch from ASR inference; change is localized and consistent with intent.
nemo_curator/stages/audio/io/convert.py Adds explicit stage name and propagates _stage_perf to the produced DocumentBatch; no issues spotted.
nemo_curator/stages/audio/metrics/get_wer.py Adds explicit stage name for WER stage; still returns new AudioBatch without _stage_perf, relying on base propagation logic.
nemo_curator/stages/base.py Fixes process_batch() to correctly handle None results by skipping them; aligns behavior with process() contract.

Sequence Diagram

sequenceDiagram
    participant Exec as Executor
    participant Stage as ProcessingStage
    participant Legacy as LegacySpeechStage
    participant Task as Task/AudioBatch
    participant Out as Output Task

    Exec->>Stage: process_batch([task])
    Stage->>Stage: validate_input(task)
    Stage->>Stage: process(task)
    alt process() returns None (filtered)
        Stage-->>Exec: skip (continue)
    else process() returns list
        Stage-->>Exec: extend results
    else process() returns single task
        Stage-->>Exec: append result
    end

    Exec->>Legacy: process(AudioBatch)
    loop each entry in task.data
        Legacy->>Legacy: process_dataset_entry(entry)
        Legacy->>Out: propagate _stage_perf if missing/empty
    end
    Legacy-->>Exec: list[Task]
Loading

Copy link
Contributor

@greptile-apps greptile-apps bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

4 files reviewed, 1 comment

Edit Code Review Agent Settings | Greptile

Comment on lines 182 to 188
result = self.process(task)
if isinstance(result, list):
for r in result:
if r is not task and hasattr(r, "_stage_perf") and not r._stage_perf:
r._stage_perf = list(task._stage_perf)
results.extend(result)
else:
if result is not task and hasattr(result, "_stage_perf") and not result._stage_perf:
result._stage_perf = list(task._stage_perf)
results.append(result)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

None result can crash

ProcessingStage.process() is documented to allow None for filtering, but process_batch() treats any non-list result as a task-like object and then does hasattr(result, "_stage_perf") / results.append(result). With this change, if process() returns None, the else branch will raise (at result is not task / hasattr(...)) and/or append None into results. This is a functional regression in the default batch path for any stage that filters tasks by returning None.

Fix by explicitly handling result is None before the list/non-list logic (skip or continue).

@SwekeR-463 SwekeR-463 force-pushed the fix/stage-name-propagate branch from 2a05acd to f6a5132 Compare February 7, 2026 05:51
Copy link
Contributor

@greptile-apps greptile-apps bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

4 files reviewed, no comments

Edit Code Review Agent Settings | Greptile

@praateekmahajan
Copy link
Contributor

Hey @SwekeR-463 thanks for the PR, since few of the classes where you added name extend from LegacySpeechStage I wonder if you can just make the change in the base class.

Can you try making the same changes as https://github.com/NVIDIA-NeMo/Curator/pull/1433/changes and seeing if it works

Also I'm not sure if the changes in stages/base.py are needed, can you make the changes as shown in #1433 and try running the pipeline without the changes in stages/base.py I'm hoping it works

@SwekeR-463
Copy link
Author

SwekeR-463 commented Feb 8, 2026

Can you try making the same changes as https://github.com/NVIDIA-NeMo/Curator/pull/1433/changes and seeing if it works

Also I'm not sure if the changes in stages/base.py are needed, can you make the changes as shown in #1433 and try running the pipeline without the changes in stages/base.py I'm hoping it works

No it didnt worked after making the changes you said for stages/base.py.

image

Copy link
Contributor

@greptile-apps greptile-apps bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

3 files reviewed, 1 comment

Edit Code Review Agent Settings | Greptile

Comment on lines 32 to 34
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.name = self.__class__.__name__

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Dataclass init breakage
LegacySpeechStage is a base for @dataclass stages in this file (e.g., GetAudioDurationStage). With the new LegacySpeechStage.__init__(...), the dataclass-generated __init__ will pass its field names into super().__init__(...), which then forwards them to ProcessingStage. Since ProcessingStage doesn’t accept those extra keywords, instantiation will raise TypeError for unexpected keyword arguments.

@SwekeR-463
Copy link
Author

Can you try making the same changes as https://github.com/NVIDIA-NeMo/Curator/pull/1433/changes and seeing if it works

Worked with the having the same changes from #1433 and keeping my changes for stages/base.py.

Screenshot from 2026-02-08 08-42-23

Copy link
Contributor

@greptile-apps greptile-apps bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

3 files reviewed, 1 comment

Edit Code Review Agent Settings | Greptile

Comment on lines 32 to 34
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.name = self.__class__.__name__
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Dataclass init breaks

LegacySpeechStage is a base for @dataclass stages (e.g., GetAudioDurationStage). With the new LegacySpeechStage.__init__(self, **kwargs) calling super().__init__(**kwargs), the dataclass-generated __init__ will pass its field args into super().__init__, and ProcessingStage doesn’t accept those extra keywords. This will raise TypeError during instantiation of those dataclass stages.

Comment on lines 32 to 34
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.name = self.__class__.__name__
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems to only work as intended for PreserveByValueStage since that is the only child that does a super().__init__.

Comment on lines 183 to 192
if result is None:
continue
if isinstance(result, list):
for r in result:
if r is not task and hasattr(r, "_stage_perf") and not r._stage_perf:
r._stage_perf = list(task._stage_perf)
results.extend(result)
else:
if result is not task and hasattr(result, "_stage_perf") and not result._stage_perf:
result._stage_perf = list(task._stage_perf)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am able to reproduce what @SwekeR-463 has seen with vs without this change. Although I would like to investigate if there is something that can be changed on the audio side instead of here...

"""

name = "AudioToDocumentStage"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah I think I found the bug! The process function below does not explicitly forward the _stage_perf list when returning the DocumentBatch.

We may need to update this elsewhere too... then we should be able to remove the changes from nemo_curator/stages/base.py.

@SwekeR-463 let me know what you think?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fix: make sure _stage_perf=task._stage_perf is being propagated everywhere when an AudioBatch/DocumentBatch is being returned.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks a ton @sarahyurick for debugging this 🙏

@SwekeR-463 here is an example of how other stages propogate it

_metadata=batch._metadata,
_stage_perf=batch._stage_perf,

Copy link
Author

@SwekeR-463 SwekeR-463 Feb 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fix: make sure _stage_perf=task._stage_perf is being propagated everywhere when an AudioBatch/DocumentBatch is being returned.

made changes as per mentioned. if any other changes required please tell. 😄

SwekeR-463 and others added 7 commits February 10, 2026 06:50
Signed-off-by: SwekeR-463 <swekerswasti@gmail.com>
Signed-off-by: SwekeR-463 <swekerswasti@gmail.com>
…e hardcoded names

Signed-off-by: SwekeR-463 <swekerswasti@gmail.com>
* Fuse document iterate and extract stages

Signed-off-by: Sarah Yurick <sarahyurick@gmail.com>

* ruff

Signed-off-by: Sarah Yurick <sarahyurick@gmail.com>

* fix bug

Signed-off-by: Sarah Yurick <sarahyurick@gmail.com>

* update docs and tutorial

Signed-off-by: Sarah Yurick <sarahyurick@gmail.com>

* save progress

Signed-off-by: Sarah Yurick <sarahyurick@gmail.com>

* update more tests

Signed-off-by: Sarah Yurick <sarahyurick@gmail.com>

* ruff

Signed-off-by: Sarah Yurick <sarahyurick@gmail.com>

* fix tests

Signed-off-by: Sarah Yurick <sarahyurick@gmail.com>

* ruff

Signed-off-by: Sarah Yurick <sarahyurick@gmail.com>

* update benchmark

Signed-off-by: Sarah Yurick <sarahyurick@gmail.com>

* move class

Signed-off-by: Sarah Yurick <sarahyurick@gmail.com>

* add missing import

Signed-off-by: Sarah Yurick <sarahyurick@gmail.com>

* update comment

Signed-off-by: Sarah Yurick <sarahyurick@gmail.com>

---------

Signed-off-by: Sarah Yurick <sarahyurick@gmail.com>
Co-authored-by: Ayush Dattagupta <ayushdg95@gmail.com>
Signed-off-by: SwekeR-463 <swekerswasti@gmail.com>
* sdg ray docs init

Signed-off-by: Lawrence Lane <llane@nvidia.com>

* header, tab fixes

Signed-off-by: Lawrence Lane <llane@nvidia.com>

* style guide

Signed-off-by: Lawrence Lane <llane@nvidia.com>

* release notes change, bump version

Signed-off-by: Lawrence Lane <llane@nvidia.com>

* feedback

Signed-off-by: Lawrence Lane <llane@nvidia.com>

* readme

Signed-off-by: Lawrence Lane <llane@nvidia.com>

* updates

Signed-off-by: Lawrence Lane <llane@nvidia.com>

* updates

Signed-off-by: Lawrence Lane <llane@nvidia.com>

* updates

Signed-off-by: Lawrence Lane <llane@nvidia.com>

* updates

Signed-off-by: Lawrence Lane <llane@nvidia.com>

* Update tutorials/synthetic/README.md

Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com>
Signed-off-by: Lawrence Lane <llane@nvidia.com>

---------

Signed-off-by: Lawrence Lane <llane@nvidia.com>
Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com>
Signed-off-by: SwekeR-463 <swekerswasti@gmail.com>
Signed-off-by: Dong Hyuk Chang <donghyukc@nvidia.com>
Signed-off-by: SwekeR-463 <swekerswasti@gmail.com>
…set stage names

Signed-off-by: SwekeR-463 <swekerswasti@gmail.com>
@SwekeR-463 SwekeR-463 force-pushed the fix/stage-name-propagate branch from 67db1ad to 3a413f5 Compare February 10, 2026 01:21
Copy link
Contributor

@greptile-apps greptile-apps bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

5 files reviewed, 3 comments

Edit Code Review Agent Settings | Greptile

Comment on lines 33 to 35
super().__init__()

def process(self, task: AudioBatch) -> list[Task]:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_stage_perf not propagated
LegacySpeechStage.process() copies _stage_perf only when the returned item already has that attribute (hasattr(r, "_stage_perf")). Many audio stages return new tasks like AudioBatch(data=...) without passing _stage_perf, so they won’t receive the prior perf chain and the stage name still won’t show up in later aggregation. Consider unconditionally propagating when creating new task objects (e.g., construct AudioBatch(..., _stage_perf=task._stage_perf) in process_dataset_entry callers, or in this loop detect known Task types and set _stage_perf even if currently empty).

Comment on lines 66 to 68
text_key: str = "text"
pred_text_key: str = "pred_text"
wer_key: str = "wer"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

New tasks drop perf
process_dataset_entry() returns a freshly-constructed AudioBatch(data=...) without _stage_perf, and LegacySpeechStage.process() only copies perf when hasattr(r, "_stage_perf"). This means the perf chain will be lost for this stage’s outputs. Pass _stage_perf=task._stage_perf when constructing the AudioBatch (and similarly for other stages returning new Task instances).

@greptile-apps
Copy link
Contributor

greptile-apps bot commented Feb 10, 2026

Additional Comments (1)

nemo_curator/stages/audio/common.py
Dataclass init inconsistency
GetAudioDurationStage now adds __post_init__ to call super().__init__(), but other @dataclass subclasses of LegacySpeechStage (e.g., GetPairwiseWerStage) don’t, so LegacySpeechStage.__init__ may never run for them. If LegacySpeechStage.__init__ is required (even if empty today), consider making it unnecessary (remove it) or enforce a consistent pattern across all dataclass subclasses.

Signed-off-by: SwekeR-463 <swekerswasti@gmail.com>
@SwekeR-463 SwekeR-463 force-pushed the fix/stage-name-propagate branch from 3b0170e to 38cf9bc Compare February 10, 2026 01:47
Copy link
Contributor

@greptile-apps greptile-apps bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

31 files reviewed, 1 comment

Edit Code Review Agent Settings | Greptile

Comment on lines 32 to 39
def process(self, task: AudioBatch) -> list[Task]:
result = []
for entry in task.data:
result.extend(self.process_dataset_entry(entry))
entries = self.process_dataset_entry(entry)
for r in entries:
if r is not task:
r._stage_perf = list(task._stage_perf)
result.extend(entries)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perf chain overwritten

LegacySpeechStage.process() unconditionally overwrites r._stage_perf for every returned task (r._stage_perf = list(task._stage_perf)). If a downstream stage’s process_dataset_entry() returns a task that already has its own perf entries (e.g., it appends to _stage_perf before returning, or returns an object that already accumulated perf), this assignment will drop those entries and lose timing/name data. Consider extending instead of overwriting (e.g., only copy when missing/empty, or merge with r._stage_perf).

Signed-off-by: SwekeR-463 <swekerswasti@gmail.com>
Copy link
Contributor

@greptile-apps greptile-apps bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

5 files reviewed, 1 comment

Edit Code Review Agent Settings | Greptile

Comment on lines +35 to +38
entries = self.process_dataset_entry(entry)
for r in entries:
if r is not task and not r._stage_perf:
r._stage_perf = list(task._stage_perf)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unsafe _stage_perf access

LegacySpeechStage.process() assumes every returned entry has a _stage_perf attribute (not r._stage_perf), but several audio stages return freshly-constructed AudioBatch(data=...) without _stage_perf (e.g., GetAudioDurationStage.process_dataset_entry() at common.py:73). In that case this will raise AttributeError at runtime. Consider guarding with hasattr(r, "_stage_perf") (or using getattr(r, "_stage_perf", None)), and then propagating from task._stage_perf when missing/empty.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Audio stage names aren't being propogated as noticed in benchmark script

5 participants