Skip to content

feat(stream-helpers): add take, takeWhile, and takeUntil helpers#136

Open
taras wants to merge 2 commits intomainfrom
add-take-helpers
Open

feat(stream-helpers): add take, takeWhile, and takeUntil helpers#136
taras wants to merge 2 commits intomainfrom
add-take-helpers

Conversation

@taras
Copy link
Member

@taras taras commented Jan 30, 2026

Summary

Adds three new stream transformer helpers for limiting/filtering streams:

  • take(n): Yields the first n values, then closes with the nth value as the close value.

  • takeWhile(predicate): Yields values while the predicate returns true. Closes immediately when the predicate fails (does not include the failing value).

  • takeUntil(predicate): Yields values until the predicate returns true. Closes with the matching value (includes it as the close value).

All helpers use curried style for pipe compatibility.

Use Cases

take - Limit stream to first N values

import { take, streamOf } from "@effectionx/stream-helpers";
import { pipe } from "remeda";

const stream = streamOf([1, 2, 3, 4, 5]);
const limited = take(3)(stream);
// yields 1, 2, then closes with 3

// Works with pipe
const piped = pipe(source, take(3));

takeWhile - Take values while condition holds

import { takeWhile, streamOf } from "@effectionx/stream-helpers";
import { pipe } from "remeda";

const stream = streamOf([1, 2, 3, 4, 5]);
const limited = takeWhile((x) => x < 3)(stream);
// yields 1, 2, then closes (3 fails predicate, not included)

// Works with pipe
const piped = pipe(source, takeWhile((x) => x.isValid));

takeUntil - Iterate until terminal condition (wizard validation pattern)

import { takeUntil, forEach } from "@effectionx/stream-helpers";
import { pipe } from "remeda";

// Iterate validation progress until we get a terminal status
const result = yield* forEach(function*(progress) {
  showSpinner(progress.status);
}, takeUntil((p) => p.status === "valid" || p.status === "invalid")(channel));

// result is the validation object with terminal status
if (result.status === "valid") {
  // proceed
}

// Works with pipe
const piped = pipe(source, takeUntil((x) => x.done));

Key Difference: takeWhile vs takeUntil

Helper Yields Closes With Use Case
takeWhile While predicate is true undefined "Keep going while good"
takeUntil Until predicate is true Matching value "Stop when you find it"

The takeUntil pattern is particularly useful for validation flows where the matching value (e.g., {status: "valid", data: ...}) carries meaningful data you want to use.

Notes

  • All helpers use curried style for pipe compatibility: take(n)(stream)
  • Tests use streamOf() for simpler, synchronous test cases
  • takeWhile no longer drains the stream when predicate fails

@pkg-pr-new
Copy link

pkg-pr-new bot commented Jan 30, 2026

Open in StackBlitz

npm i https://pkg.pr.new/thefrontside/effectionx/@effectionx/stream-helpers@136

commit: cd20b59

@taras taras requested a review from cowboyd January 30, 2026 08:50
Adds three new stream transformer helpers for limiting/filtering streams:

- take(n): Yields first n values, closes with nth value. Curried for pipe compatibility.
- takeWhile(predicate): Yields while predicate is true, closes with source close value.
- takeUntil(predicate): Yields until predicate is true, closes with matching value.

These helpers enable composable stream limiting patterns, particularly
useful for validation flows where you iterate until a terminal status.

Also adds build directories to biome ignore list.
@taras
Copy link
Member Author

taras commented Feb 3, 2026

@claude can you review this PR for policy compliance

@claude
Copy link

claude bot commented Feb 3, 2026

Policy Compliance Assessment

Summary

Metric Count
Total items reviewed 9
Compliant 9
Violations 0
✅ All items compliant - no violations found

Items Reviewed

Test Files

  • stream-helpers/take-until.test.ts - No-Sleep Test Synchronization policy
  • stream-helpers/take-while.test.ts - No-Sleep Test Synchronization policy
  • stream-helpers/take.test.ts - No-Sleep Test Synchronization policy

Implementation Files

  • stream-helpers/take.ts - Stateless Stream Operations policy
  • stream-helpers/take-while.ts - Stateless Stream Operations policy
  • stream-helpers/take-until.ts - Stateless Stream Operations policy

Documentation & Configuration

  • stream-helpers/README.md - Documentation is clear and accurate
  • stream-helpers/mod.ts - Export additions follow existing patterns
  • biome.json - Configuration change has no policy implications

Policy Compliance Details

No-Sleep Test Synchronization Policy (Recommended)

All three test files properly use streamOf() for creating test data streams. No instances of sleep() used for test synchronization. All tests are deterministic.

Examples of compliant patterns found:

  • takeUntil.test.ts:15-24 - Uses streamOf() with predicate matching
  • takeWhile.test.ts:10-17 - Uses streamOf() for value filtering tests
  • take.test.ts:10-18 - Uses streamOf() for count-based limiting tests

Stateless Stream Operations Policy (Recommended)

All three implementation files correctly use the *[Symbol.iterator]() pattern for creating reusable, stateless stream operations.

Examples of compliant patterns found:

  • take.ts:38 - return <TClose>(stream: Stream<T, TClose>): Stream<T, T | TClose> => ({ *[Symbol.iterator]() { ... } })
  • take-while.ts:47 - Same pattern applied correctly
  • take-until.ts:49 - Same pattern applied correctly

Conclusion

This PR demonstrates excellent adherence to both applicable policies:

  • Tests use deterministic patterns with streamOf() instead of sleep()
  • Stream operations follow the stateless *[Symbol.iterator] pattern
  • Code is consistent with existing patterns in the codebase

No violations found. ✅


Policy review completed by Policy Compliance Agent
Policies: No-Sleep Test Synchronization, Stateless Stream Operations

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant