-
-
Notifications
You must be signed in to change notification settings - Fork 936
Add iterator slice helpers and channel distinct/tee utilities #791
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from 2 commits
ed15ab0
5da04ca
d303386
baf3460
1c8b999
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -325,3 +325,71 @@ func FanOut[T any](count, channelsBufferCap int, upstream <-chan T) []<-chan T { | |
|
|
||
| return channelsToReadOnly(downstreams) | ||
| } | ||
|
|
||
| // Distinct returns a channel with duplicate values removed. | ||
| // The first occurrence is preserved, and ordering is maintained. | ||
| // Memory usage grows with the number of distinct values and can be unbounded. | ||
| func Distinct[T comparable](upstream <-chan T) <-chan T { | ||
| unique := make(chan T) | ||
|
|
||
| go func() { | ||
| defer close(unique) | ||
| seenValues := make(map[T]struct{}) | ||
|
|
||
| for value := range upstream { | ||
| if _, ok := seenValues[value]; ok { | ||
| continue | ||
| } | ||
| seenValues[value] = struct{}{} | ||
| unique <- value | ||
| } | ||
| }() | ||
|
|
||
| return unique | ||
| } | ||
|
|
||
| // DistinctBy returns a channel with duplicate values removed based on the provided key. | ||
| // The first occurrence is preserved, and ordering is maintained. | ||
| // Memory usage grows with the number of distinct keys and can be unbounded. | ||
| func DistinctBy[T any, K comparable](upstream <-chan T, key func(item T) K) <-chan T { | ||
| unique := make(chan T) | ||
|
|
||
| go func() { | ||
| defer close(unique) | ||
| seenKeys := make(map[K]struct{}) | ||
|
|
||
| for value := range upstream { | ||
| keyValue := key(value) | ||
| if _, ok := seenKeys[keyValue]; ok { | ||
| continue | ||
| } | ||
| seenKeys[keyValue] = struct{}{} | ||
| unique <- value | ||
| } | ||
| }() | ||
|
|
||
| return unique | ||
| } | ||
|
|
||
| // Tee duplicates the stream into multiple output channels without blocking. | ||
| // If an output channel is full or not ready, the value is dropped for that channel. | ||
| func Tee[T any](count, channelsBufferCap int, upstream <-chan T) []<-chan T { | ||
|
Comment on lines
+374
to
+376
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In my opinion, this function is too specific.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do you mean we should remove Tee because FanOut/FanIn already cover this, or keep it as a separate helper? |
||
| downStreams := createChannels[T](count, channelsBufferCap) | ||
|
|
||
| go func() { | ||
| for item := range upstream { | ||
| for i := range downStreams { | ||
| select { | ||
| case downStreams[i] <- item: | ||
| default: | ||
| } | ||
| } | ||
| } | ||
|
|
||
| for i := range downStreams { | ||
| close(downStreams[i]) | ||
| } | ||
| }() | ||
|
|
||
| return channelsToReadOnly(downStreams) | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,25 @@ | ||
| --- | ||
| name: Distinct | ||
| slug: distinct | ||
| sourceRef: channel.go#L329 | ||
| category: core | ||
| subCategory: channel | ||
| signatures: | ||
| - "func Distinct[T comparable](upstream <-chan T) <-chan T" | ||
| similarHelpers: | ||
| - core#slice#uniq | ||
| - core#slice#uniqby | ||
| position: 258 | ||
| --- | ||
|
|
||
| Removes duplicate values from a channel while preserving order. | ||
|
|
||
| ```go | ||
| ch := lo.SliceToChannel(10, []int{1, 2, 1, 3, 2, 4}) | ||
| distinct := lo.Distinct(ch) | ||
| var result []int | ||
| for v := range distinct { | ||
| result = append(result, v) | ||
| } | ||
| // result contains [1, 2, 3, 4] | ||
| ``` |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,35 @@ | ||
| --- | ||
| name: DistinctBy | ||
| slug: distinctby | ||
| sourceRef: channel.go#L350 | ||
| category: core | ||
| subCategory: channel | ||
| signatures: | ||
| - "func DistinctBy[T any, K comparable](upstream <-chan T, key func(item T) K) <-chan T" | ||
| similarHelpers: | ||
| - core#slice#uniq | ||
| - core#slice#uniqby | ||
| position: 259 | ||
| --- | ||
|
|
||
| Removes duplicate values from a channel based on a key function while preserving order. | ||
|
|
||
| ```go | ||
| type user struct { | ||
| ID int | ||
| Name string | ||
| } | ||
|
|
||
| ch := lo.SliceToChannel(10, []user{ | ||
| {ID: 1, Name: "Alice"}, | ||
| {ID: 2, Name: "Bob"}, | ||
| {ID: 1, Name: "Alicia"}, | ||
| }) | ||
|
|
||
| distinct := lo.DistinctBy(ch, func(u user) int { return u.ID }) | ||
| var result []user | ||
| for v := range distinct { | ||
| result = append(result, v) | ||
| } | ||
| // result contains [{1 Alice} {2 Bob}] | ||
| ``` |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,21 @@ | ||
| --- | ||
| name: Tee | ||
| slug: tee | ||
| sourceRef: channel.go#L369 | ||
| category: core | ||
| subCategory: channel | ||
| signatures: | ||
| - "func Tee[T any](count, channelsBufferCap int, upstream <-chan T) []<-chan T" | ||
| similarHelpers: | ||
| - core#channel#fanout | ||
| - core#channel#channeldispatcher | ||
| position: 257 | ||
| --- | ||
|
|
||
| Tee duplicates values into multiple channels. If an output channel is not ready, the value is dropped for that channel. | ||
|
|
||
| ```go | ||
| upstream := lo.SliceToChannel(10, []int{0, 1, 2, 3}) | ||
| downstreams := lo.Tee(2, 2, upstream) | ||
| // Each downstream receives a best-effort copy of the stream. | ||
| ``` |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,35 @@ | ||
| --- | ||
| name: Sliding | ||
| slug: sliding | ||
| sourceRef: it/seq.go#L329 | ||
| category: it | ||
| subCategory: sequence | ||
| signatures: | ||
| - "func Sliding[T any](collection iter.Seq[T], size, step int) iter.Seq[[]T]" | ||
| variantHelpers: | ||
| - it#sequence#sliding | ||
| similarHelpers: | ||
| - core#slice#sliding | ||
| position: 80 | ||
| --- | ||
|
|
||
| Creates a sequence of sliding windows of a given size with a given step. If step equals size, windows don't overlap. | ||
|
|
||
| ```go | ||
| seq := func(yield func(int) bool) { | ||
| yield(1) | ||
| yield(2) | ||
| yield(3) | ||
| yield(4) | ||
| yield(5) | ||
| yield(6) | ||
| yield(7) | ||
| yield(8) | ||
| } | ||
| windows := it.Sliding(seq, 2, 3) | ||
| var result [][]int | ||
| for w := range windows { | ||
| result = append(result, w) | ||
| } | ||
| // result contains [1 2], [4 5], [7 8] | ||
| ``` |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,31 @@ | ||
| --- | ||
| name: Take | ||
| slug: take | ||
| sourceRef: it/seq.go#L682 | ||
| category: it | ||
| subCategory: sequence | ||
| signatures: | ||
| - "func Take[T any, I ~func(func(T) bool)](collection I, n int) I" | ||
| variantHelpers: | ||
| - it#sequence#take | ||
| similarHelpers: | ||
| - core#slice#take | ||
| position: 110 | ||
| --- | ||
|
|
||
| Takes the first n elements from a sequence. | ||
|
|
||
| ```go | ||
| seq := func(yield func(int) bool) { | ||
| yield(1) | ||
| yield(2) | ||
| yield(3) | ||
| yield(4) | ||
| } | ||
| result := it.Take(seq, 2) | ||
| var out []int | ||
| for v := range result { | ||
| out = append(out, v) | ||
| } | ||
| // out contains [1, 2] | ||
| ``` |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,35 @@ | ||
| --- | ||
| name: TakeFilter | ||
| slug: takefilter | ||
| sourceRef: it/seq.go#L729 | ||
| category: it | ||
| subCategory: sequence | ||
| signatures: | ||
| - "func TakeFilter[T any, I ~func(func(T) bool)](collection I, n int, predicate func(item T, index int) bool) I" | ||
| variantHelpers: | ||
| - it#sequence#takefilter | ||
| similarHelpers: | ||
| - core#slice#takefilter | ||
| position: 130 | ||
| --- | ||
|
|
||
| Filters elements and takes the first n matches. Stops once n matches are found. | ||
|
|
||
| ```go | ||
| seq := func(yield func(int) bool) { | ||
| yield(1) | ||
| yield(2) | ||
| yield(3) | ||
| yield(4) | ||
| yield(5) | ||
| yield(6) | ||
| } | ||
| result := it.TakeFilter(seq, 2, func(x, _ int) bool { | ||
| return x%2 == 0 | ||
| }) | ||
| var out []int | ||
| for v := range result { | ||
| out = append(out, v) | ||
| } | ||
| // out contains [2, 4] | ||
| ``` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It should at least be mentioned that memory can run out.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added a note in the doc comments