Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion biome.json
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,6 @@
"indentWidth": 2
},
"files": {
"ignore": ["**/dist", "**/node_modules"]
"ignore": ["**/dist", "**/node_modules", "**/build"]
}
}
83 changes: 83 additions & 0 deletions stream-helpers/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,89 @@ function* example() {
}
```

### Take

The `take` helper creates a stream transformer that yields the first `n` values
from the source stream, then closes with the last taken value.

```typescript
import { take, forEach, streamOf } from "@effectionx/stream-helpers";
import { pipe } from "remeda";

function* example() {
const stream = streamOf([1, 2, 3, 4, 5]);
// yields 1, 2, then closes with 3
const closeValue = yield* forEach(function* (value) {
console.log(value); // 1, then 2
}, take(3)(stream));
console.log(closeValue); // 3
}

// Works with pipe
const limited = pipe(source, take(3));
```

### TakeWhile

The `takeWhile` helper creates a stream transformer that yields values while
the predicate returns true. When the predicate returns false, the stream closes
immediately (the failing value is not included).

```typescript
import { takeWhile, forEach, streamOf } from "@effectionx/stream-helpers";
import { pipe } from "remeda";

function* example() {
const stream = streamOf([1, 2, 3, 4, 5]);
// yields 1, 2 (stops when value >= 3)
yield* forEach(function* (value) {
console.log(value); // 1, then 2
}, takeWhile((x) => x < 3)(stream));
}

// Works with pipe
const filtered = pipe(source, takeWhile((x) => x.isValid));
```

### TakeUntil

The `takeUntil` helper creates a stream transformer that yields values until
the predicate returns true. When the predicate matches, the stream closes with
the matching value. This is useful for "iterate until a condition is met"
patterns.

```typescript
import { takeUntil, forEach } from "@effectionx/stream-helpers";
import { pipe } from "remeda";

function* example() {
// Iterate validation progress until we get a terminal status
const result = yield* forEach(
function* (progress) {
showSpinner(progress.status);
},
takeUntil((p) => p.status === "valid" || p.status === "invalid")(
validationStream,
),
);

// result is the validation object with terminal status
if (result.status === "valid") {
console.log("Validation passed!");
}
}

// Works with pipe
const untilDone = pipe(source, takeUntil((x) => x.done));
```

**Key difference between `takeWhile` and `takeUntil`:**

| Helper | Yields | Closes With | Use Case |
| ----------- | ----------------------- | -------------- | ----------------------- |
| `takeWhile` | While predicate is true | `undefined` | "Keep going while good" |
| `takeUntil` | Until predicate is true | Matching value | "Stop when you find it" |

### ForEach

The `forEach` helper invokes a function for each item passing through a stream.
Expand Down
3 changes: 3 additions & 0 deletions stream-helpers/mod.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,6 @@ export * from "./reduce.ts";
export * from "./drain.ts";
export * from "./first.ts";
export * from "./last.ts";
export * from "./take.ts";
export * from "./take-while.ts";
export * from "./take-until.ts";
113 changes: 113 additions & 0 deletions stream-helpers/take-until.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
import { describe, it } from "@effectionx/bdd";
import { expect } from "expect";
import { pipe } from "remeda";

import { forEach } from "./for-each.ts";
import { streamOf } from "./stream-of.ts";
import { takeUntil } from "./take-until.ts";

describe("takeUntil", () => {
it("should yield values until predicate is true, then close with matching value", function* () {
const values: { status: string }[] = [];

const closeValue = yield* forEach(
function* (value) {
values.push(value);
},
takeUntil((x: { status: string }) => x.status === "valid")(
streamOf([
{ status: "pending" },
{ status: "checking" },
{ status: "valid" },
{ status: "extra" },
]),
),
);

expect(values).toEqual([{ status: "pending" }, { status: "checking" }]);
expect(closeValue).toEqual({ status: "valid" });
});

it("should return source close value if stream ends before predicate matches", function* () {
const values: { status: string }[] = [];

const stream = streamOf(
(function* () {
yield { status: "pending" };
yield { status: "checking" };
return "no-match";
})(),
);

const closeValue = yield* forEach(
function* (value) {
values.push(value);
},
takeUntil((x: { status: string }) => x.status === "valid")(stream),
);

expect(values).toEqual([{ status: "pending" }, { status: "checking" }]);
expect(closeValue).toBe("no-match");
});

it("should close immediately with first value if it matches predicate", function* () {
const values: { status: string }[] = [];

const closeValue = yield* forEach(
function* (value) {
values.push(value);
},
takeUntil((x: { status: string }) => x.status === "valid")(
streamOf([{ status: "valid" }, { status: "extra" }]),
),
);

expect(values).toEqual([]);
expect(closeValue).toEqual({ status: "valid" });
});

it("should work with validation progress pattern", function* () {
type ValidationProgress =
| { status: "validating" }
| { status: "checking-inventory" }
| { status: "valid"; data: string }
| { status: "invalid"; errors: string[] };

const progressStatuses: string[] = [];

const result = yield* forEach(
function* (progress) {
progressStatuses.push(progress.status);
},
takeUntil(
(p: ValidationProgress) =>
p.status === "valid" || p.status === "invalid",
)(
streamOf<ValidationProgress, void>([
{ status: "validating" },
{ status: "checking-inventory" },
{ status: "valid", data: "ok" },
]),
),
);

expect(progressStatuses).toEqual(["validating", "checking-inventory"]);
expect(result).toEqual({ status: "valid", data: "ok" });
});

it("should work with pipe", function* () {
const values: number[] = [];

const stream = pipe(
streamOf([1, 2, 3, 4, 5]),
takeUntil((x) => x === 4),
);

const closeValue = yield* forEach(function* (value) {
values.push(value);
}, stream);

expect(values).toEqual([1, 2, 3]);
expect(closeValue).toBe(4);
});
});
76 changes: 76 additions & 0 deletions stream-helpers/take-until.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
import type { Stream } from "effection";

/**
* Creates a stream transformer that yields values from the source stream
* until the predicate returns true. Closes with the matching value when
* the predicate returns true.
*
* This is useful for "iterate until a condition is met" patterns, where
* the matching value is meaningful (e.g., a terminal status).
*
* If the source stream closes before the predicate returns true, the
* resulting stream closes with the source's close value.
*
* @template T - The type of items in the stream
* @template TClose - The type of the close value
* @param predicate - A function that returns true to stop taking values
* @returns A stream transformer that yields values until predicate is true,
* closing with the matching value
*
* @example
* ```typescript
* import { takeUntil, forEach } from "@effectionx/stream-helpers";
*
* // Iterate validation progress until we get a terminal status
* const result = yield* forEach(function*(progress) {
* showSpinner(progress.status);
* }, takeUntil((p) => p.status === "valid" || p.status === "invalid")(channel));
*
* // result is the validation object with terminal status
* if (result.status === "valid") {
* // proceed
* }
* ```
*
* @example
* ```typescript
* import { takeUntil, map } from "@effectionx/stream-helpers";
* import { pipe } from "remeda";
*
* const limited = pipe(
* source,
* takeUntil((x) => x.done),
* );
* ```
*/
export function takeUntil<T>(
predicate: (item: T) => boolean,
): <TClose>(stream: Stream<T, TClose>) => Stream<T, T | TClose> {
return <TClose>(stream: Stream<T, TClose>): Stream<T, T | TClose> => ({
*[Symbol.iterator]() {
const subscription = yield* stream;
let done = false;

return {
*next() {
if (done) {
return { done: true, value: undefined as unknown as T | TClose };
}

const result = yield* subscription.next();
if (result.done) {
return result;
}

if (predicate(result.value)) {
done = true;
// Close with the matching value
return { done: true, value: result.value };
}

return result;
},
};
},
});
}
89 changes: 89 additions & 0 deletions stream-helpers/take-while.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
import { describe, it } from "@effectionx/bdd";
import { expect } from "expect";
import { pipe } from "remeda";

import { forEach } from "./for-each.ts";
import { streamOf } from "./stream-of.ts";
import { takeWhile } from "./take-while.ts";

describe("takeWhile", () => {
it("should yield values while predicate is true", function* () {
const values: number[] = [];

const closeValue = yield* forEach(
function* (value) {
values.push(value);
},
takeWhile((x: number) => x < 3)(streamOf([1, 2, 3, 4, 5])),
);

expect(values).toEqual([1, 2]);
expect(closeValue).toBe(undefined);
});

it("should return source close value if stream ends before predicate fails", function* () {
const values: number[] = [];

const stream = streamOf(
(function* () {
yield 1;
yield 2;
return "early-close";
})(),
);

const closeValue = yield* forEach(
function* (value) {
values.push(value);
},
takeWhile((x: number) => x < 10)(stream),
);

expect(values).toEqual([1, 2]);
expect(closeValue).toBe("early-close");
});

it("should not include the failing value", function* () {
const values: number[] = [];

const closeValue = yield* forEach(
function* (value) {
values.push(value);
},
takeWhile((x: number) => x < 50)(streamOf([1, 2, 100, 3])),
);

expect(values).toEqual([1, 2]);
expect(closeValue).toBe(undefined);
});

it("should stop immediately if first value fails predicate", function* () {
const values: number[] = [];

const closeValue = yield* forEach(
function* (value) {
values.push(value);
},
takeWhile((x: number) => x < 50)(streamOf([100, 1, 2])),
);

expect(values).toEqual([]);
expect(closeValue).toBe(undefined);
});

it("should work with pipe", function* () {
const values: number[] = [];

const stream = pipe(
streamOf([1, 2, 3, 4, 5]),
takeWhile((x) => x < 4),
);

const closeValue = yield* forEach(function* (value) {
values.push(value);
}, stream);

expect(values).toEqual([1, 2, 3]);
expect(closeValue).toBe(undefined);
});
});
Loading
Loading