Skip to content
Draft
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 68 additions & 0 deletions channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,3 +325,71 @@ func FanOut[T any](count, channelsBufferCap int, upstream <-chan T) []<-chan T {

return channelsToReadOnly(downstreams)
}

// Distinct returns a channel with duplicate values removed.
// The first occurrence is preserved, and ordering is maintained.
// Memory usage grows with the number of distinct values and can be unbounded.
func Distinct[T comparable](upstream <-chan T) <-chan T {
Comment on lines 329 to 332
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should at least be mentioned that memory can run out.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a note in the doc comments

unique := make(chan T)

go func() {
defer close(unique)
seenValues := make(map[T]struct{})

for value := range upstream {
if _, ok := seenValues[value]; ok {
continue
}
seenValues[value] = struct{}{}
unique <- value
}
}()

return unique
}

// DistinctBy returns a channel with duplicate values removed based on the provided key.
// The first occurrence is preserved, and ordering is maintained.
// Memory usage grows with the number of distinct keys and can be unbounded.
func DistinctBy[T any, K comparable](upstream <-chan T, key func(item T) K) <-chan T {
unique := make(chan T)

go func() {
defer close(unique)
seenKeys := make(map[K]struct{})

for value := range upstream {
keyValue := key(value)
if _, ok := seenKeys[keyValue]; ok {
continue
}
seenKeys[keyValue] = struct{}{}
unique <- value
}
}()

return unique
}

// Tee duplicates the stream into multiple output channels without blocking.
// If an output channel is full or not ready, the value is dropped for that channel.
func Tee[T any](count, channelsBufferCap int, upstream <-chan T) []<-chan T {
Comment on lines +374 to +376
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In my opinion, this function is too specific.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you mean we should remove Tee because FanOut/FanIn already cover this, or keep it as a separate helper?

downStreams := createChannels[T](count, channelsBufferCap)

go func() {
for item := range upstream {
for i := range downStreams {
select {
case downStreams[i] <- item:
default:
}
}
}

for i := range downStreams {
close(downStreams[i])
}
}()

return channelsToReadOnly(downStreams)
}
74 changes: 74 additions & 0 deletions channel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -435,3 +435,77 @@ func TestFanOut(t *testing.T) { //nolint:paralleltest
is.Zero(msg)
}
}

func TestDistinct(t *testing.T) {
t.Parallel()
testWithTimeout(t, 100*time.Millisecond)
is := assert.New(t)

upstream := SliceToChannel(10, []int{1, 2, 1, 3, 2, 4, 4})
result := ChannelToSlice(Distinct(upstream))
is.Equal([]int{1, 2, 3, 4}, result)

empty := make(chan int)
close(empty)
is.Empty(ChannelToSlice(Distinct(empty)))
}

func TestDistinctBy(t *testing.T) {
t.Parallel()
testWithTimeout(t, 100*time.Millisecond)
is := assert.New(t)

type user struct {
ID int
Name string
}
upstream := SliceToChannel(10, []user{
{ID: 1, Name: "Alice"},
{ID: 2, Name: "Bob"},
{ID: 1, Name: "Alicia"},
{ID: 3, Name: "Eve"},
{ID: 2, Name: "Bobby"},
})
result := ChannelToSlice(DistinctBy(upstream, func(u user) int { return u.ID }))
is.Equal([]user{
{ID: 1, Name: "Alice"},
{ID: 2, Name: "Bob"},
{ID: 3, Name: "Eve"},
}, result)
}

func TestTee(t *testing.T) { //nolint:paralleltest
// t.Parallel()
testWithTimeout(t, 100*time.Millisecond)
is := assert.New(t)

upstream := SliceToChannel(10, []int{0, 1, 2, 3})
downstreams := Tee(2, 10, upstream)

time.Sleep(10 * time.Millisecond)
is.Len(downstreams, 2)
for i := range downstreams {
is.Equal([]int{0, 1, 2, 3}, ChannelToSlice(downstreams[i]))
}

time.Sleep(10 * time.Millisecond)
for i := range downstreams {
msg, ok := <-downstreams[i]
is.False(ok)
is.Zero(msg)
}
}

func TestTeeDropsWhenDownstreamNotReady(t *testing.T) { //nolint:paralleltest
// t.Parallel()
testWithTimeout(t, 100*time.Millisecond)
is := assert.New(t)

upstream := SliceToChannel(10, []int{0, 1, 2, 3})
downstreams := Tee(2, 0, upstream)

time.Sleep(10 * time.Millisecond)
for i := range downstreams {
is.Empty(ChannelToSlice(downstreams[i]))
}
}
25 changes: 25 additions & 0 deletions docs/data/core-distinct.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
---
name: Distinct
slug: distinct
sourceRef: channel.go#L329
category: core
subCategory: channel
signatures:
- "func Distinct[T comparable](upstream <-chan T) <-chan T"
similarHelpers:
- core#slice#uniq
- core#slice#uniqby
position: 258
---

Removes duplicate values from a channel while preserving order.

```go
ch := lo.SliceToChannel(10, []int{1, 2, 1, 3, 2, 4})
distinct := lo.Distinct(ch)
var result []int
for v := range distinct {
result = append(result, v)
}
// result contains [1, 2, 3, 4]
```
35 changes: 35 additions & 0 deletions docs/data/core-distinctby.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
---
name: DistinctBy
slug: distinctby
sourceRef: channel.go#L350
category: core
subCategory: channel
signatures:
- "func DistinctBy[T any, K comparable](upstream <-chan T, key func(item T) K) <-chan T"
similarHelpers:
- core#slice#uniq
- core#slice#uniqby
position: 259
---

Removes duplicate values from a channel based on a key function while preserving order.

```go
type user struct {
ID int
Name string
}

ch := lo.SliceToChannel(10, []user{
{ID: 1, Name: "Alice"},
{ID: 2, Name: "Bob"},
{ID: 1, Name: "Alicia"},
})

distinct := lo.DistinctBy(ch, func(u user) int { return u.ID })
var result []user
for v := range distinct {
result = append(result, v)
}
// result contains [{1 Alice} {2 Bob}]
```
1 change: 1 addition & 0 deletions docs/data/core-fanout.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ similarHelpers:
- core#channel#fanin
- core#channel#channeldispatcher
- it#channel#channelseq
- core#channel#tee
position: 256
---

Expand Down
1 change: 1 addition & 0 deletions docs/data/core-sliding.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ similarHelpers:
- core#slice#chunk
- core#slice#partitionby
- core#slice#flatten
- it#sequence#sliding
position: 150
signatures:
- "func Sliding[T any, Slice ~[]T](collection Slice, size, step int) []Slice"
Expand Down
1 change: 1 addition & 0 deletions docs/data/core-take.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ similarHelpers:
- core#slice#first
- core#slice#filtermap
- core#slice#takefilter
- it#sequence#take
position: 175
signatures:
- "func Take[T any, Slice ~[]T](collection Slice, n int) Slice"
Expand Down
1 change: 1 addition & 0 deletions docs/data/core-takefilter.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ similarHelpers:
- core#slice#takewhile
- core#slice#filtermap
- core#slice#filterreject
- it#sequence#takefilter
position: 125
signatures:
- "func TakeFilter[T any, Slice ~[]T](collection Slice, n int, predicate func(item T, index int) bool) Slice"
Expand Down
1 change: 1 addition & 0 deletions docs/data/core-takewhile.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ similarHelpers:
- core#slice#filter
- core#slice#takefilter
- core#slice#first
- it#sequence#takewhile
position: 195
signatures:
- "func TakeWhile[T any, Slice ~[]T](collection Slice, predicate func(item T) bool) Slice"
Expand Down
21 changes: 21 additions & 0 deletions docs/data/core-tee.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
---
name: Tee
slug: tee
sourceRef: channel.go#L369
category: core
subCategory: channel
signatures:
- "func Tee[T any](count, channelsBufferCap int, upstream <-chan T) []<-chan T"
similarHelpers:
- core#channel#fanout
- core#channel#channeldispatcher
position: 257
---

Tee duplicates values into multiple channels. If an output channel is not ready, the value is dropped for that channel.

```go
upstream := lo.SliceToChannel(10, []int{0, 1, 2, 3})
downstreams := lo.Tee(2, 2, upstream)
// Each downstream receives a best-effort copy of the stream.
```
1 change: 1 addition & 0 deletions docs/data/core-window.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ similarHelpers:
- core#slice#chunk
- core#slice#partitionby
- core#slice#flatten
- it#sequence#window
position: 145
signatures:
- "func Window[T any, Slice ~[]T](collection Slice, size int) []Slice"
Expand Down
35 changes: 35 additions & 0 deletions docs/data/it-sliding.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
---
name: Sliding
slug: sliding
sourceRef: it/seq.go#L329
category: it
subCategory: sequence
signatures:
- "func Sliding[T any](collection iter.Seq[T], size, step int) iter.Seq[[]T]"
variantHelpers:
- it#sequence#sliding
similarHelpers:
- core#slice#sliding
position: 80
---

Creates a sequence of sliding windows of a given size with a given step. If step equals size, windows don't overlap.

```go
seq := func(yield func(int) bool) {
yield(1)
yield(2)
yield(3)
yield(4)
yield(5)
yield(6)
yield(7)
yield(8)
}
windows := it.Sliding(seq, 2, 3)
var result [][]int
for w := range windows {
result = append(result, w)
}
// result contains [1 2], [4 5], [7 8]
```
31 changes: 31 additions & 0 deletions docs/data/it-take.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
---
name: Take
slug: take
sourceRef: it/seq.go#L682
category: it
subCategory: sequence
signatures:
- "func Take[T any, I ~func(func(T) bool)](collection I, n int) I"
variantHelpers:
- it#sequence#take
similarHelpers:
- core#slice#take
position: 110
---

Takes the first n elements from a sequence.

```go
seq := func(yield func(int) bool) {
yield(1)
yield(2)
yield(3)
yield(4)
}
result := it.Take(seq, 2)
var out []int
for v := range result {
out = append(out, v)
}
// out contains [1, 2]
```
35 changes: 35 additions & 0 deletions docs/data/it-takefilter.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
---
name: TakeFilter
slug: takefilter
sourceRef: it/seq.go#L729
category: it
subCategory: sequence
signatures:
- "func TakeFilter[T any, I ~func(func(T) bool)](collection I, n int, predicate func(item T, index int) bool) I"
variantHelpers:
- it#sequence#takefilter
similarHelpers:
- core#slice#takefilter
position: 130
---

Filters elements and takes the first n matches. Stops once n matches are found.

```go
seq := func(yield func(int) bool) {
yield(1)
yield(2)
yield(3)
yield(4)
yield(5)
yield(6)
}
result := it.TakeFilter(seq, 2, func(x, _ int) bool {
return x%2 == 0
})
var out []int
for v := range result {
out = append(out, v)
}
// out contains [2, 4]
```
Loading
Loading