Skip to content

Redis Streams Imperative creating massive connections that lead to connection problems #3288

@arne-kroeger

Description

@arne-kroeger

When creating a StreamMessageListenerContainer with Spring boot 4.0.1 written in Kotlin the schedulers seem to open massive connections until the connections stack is full and results in the following error:

java.net.BindException: Can't assign requested address

val create = StreamMessageListenerContainer.create(
            connectionFactory,
            StreamMessageListenerContainer.StreamMessageListenerContainerOptions
                .builder()
                .pollTimeout(Duration.ofMillis(blockMillis))
                .targetType(JobRunInfo::class.java)
                .errorHandler { e ->
                    log.error("Redis Stream error", e) }
                .build()
        )

        create.receive(
            Consumer.from(groupName, consumerName),
            StreamOffset.fromStart(streamKey), {
                handler.invoke(it)
            })

create.start()

Full stacktrace:

org.springframework.data.redis.RedisConnectionFailureException: Unable to connect to Redis
	at org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory$ExceptionTranslatingConnectionProvider.translateException(LettuceConnectionFactory.java:1868) ~[spring-data-redis-4.0.1.jar:4.0.1]
	at org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory$ExceptionTranslatingConnectionProvider.getConnection(LettuceConnectionFactory.java:1799) ~[spring-data-redis-4.0.1.jar:4.0.1]
	at org.springframework.data.redis.connection.lettuce.LettuceConnection.doGetAsyncDedicatedConnection(LettuceConnection.java:968) ~[spring-data-redis-4.0.1.jar:4.0.1]
	at org.springframework.data.redis.connection.lettuce.LettuceConnection.getOrCreateDedicatedConnection(LettuceConnection.java:1022) ~[spring-data-redis-4.0.1.jar:4.0.1]
	at org.springframework.data.redis.connection.lettuce.LettuceConnection.getAsyncDedicatedConnection(LettuceConnection.java:953) ~[spring-data-redis-4.0.1.jar:4.0.1]
	at org.springframework.data.redis.connection.lettuce.LettuceStreamCommands.getAsyncDedicatedConnection(LettuceStreamCommands.java:328) ~[spring-data-redis-4.0.1.jar:4.0.1]
	at org.springframework.data.redis.connection.lettuce.LettuceStreamCommands.xReadGroup(LettuceStreamCommands.java:292) ~[spring-data-redis-4.0.1.jar:4.0.1]
	at org.springframework.data.redis.stream.DefaultStreamMessageListenerContainer.lambda$getReadFunction$1(DefaultStreamMessageListenerContainer.java:245) ~[spring-data-redis-4.0.1.jar:4.0.1]
	at org.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:413) ~[spring-data-redis-4.0.1.jar:4.0.1]
	at org.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:380) ~[spring-data-redis-4.0.1.jar:4.0.1]
	at org.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:368) ~[spring-data-redis-4.0.1.jar:4.0.1]
	at org.springframework.data.redis.stream.DefaultStreamMessageListenerContainer.lambda$getReadFunction$0(DefaultStreamMessageListenerContainer.java:244) ~[spring-data-redis-4.0.1.jar:4.0.1]
	at org.springframework.data.redis.stream.StreamPollTask.readRecords(StreamPollTask.java:147) ~[spring-data-redis-4.0.1.jar:4.0.1]
	at org.springframework.data.redis.stream.StreamPollTask.doLoop(StreamPollTask.java:128) ~[spring-data-redis-4.0.1.jar:4.0.1]
	at org.springframework.data.redis.stream.StreamPollTask.run(StreamPollTask.java:113) ~[spring-data-redis-4.0.1.jar:4.0.1]
	at java.base/java.lang.Thread.run(Thread.java:1583) ~[na:na]
Caused by: io.lettuce.core.RedisConnectionException: Unable to connect to localhost/<unresolved>:49952
	at io.lettuce.core.RedisConnectionException.create(RedisConnectionException.java:63) ~[lettuce-core-6.8.1.RELEASE.jar:6.8.1.RELEASE/ec0535e]
	at io.lettuce.core.RedisConnectionException.create(RedisConnectionException.java:41) ~[lettuce-core-6.8.1.RELEASE.jar:6.8.1.RELEASE/ec0535e]
	at io.lettuce.core.AbstractRedisClient.getConnection(AbstractRedisClient.java:352) ~[lettuce-core-6.8.1.RELEASE.jar:6.8.1.RELEASE/ec0535e]
	at io.lettuce.core.RedisClient.connect(RedisClient.java:220) ~[lettuce-core-6.8.1.RELEASE.jar:6.8.1.RELEASE/ec0535e]
	at org.springframework.data.redis.connection.lettuce.StandaloneConnectionProvider.lambda$getConnection$1(StandaloneConnectionProvider.java:113) ~[spring-data-redis-4.0.1.jar:4.0.1]
	at java.base/java.util.Optional.orElseGet(Optional.java:364) ~[na:na]
	at org.springframework.data.redis.connection.lettuce.StandaloneConnectionProvider.getConnection(StandaloneConnectionProvider.java:113) ~[spring-data-redis-4.0.1.jar:4.0.1]
	at org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory$ExceptionTranslatingConnectionProvider.getConnection(LettuceConnectionFactory.java:1797) ~[spring-data-redis-4.0.1.jar:4.0.1]
	... 14 common frames omitted
Caused by: io.netty.channel.AbstractChannel$AnnotatedSocketException: Can't assign requested address: localhost/127.0.0.1:49952
Caused by: java.net.BindException: Can't assign requested address
	at java.base/sun.nio.ch.Net.connect0(Native Method) ~[na:na]
	at java.base/sun.nio.ch.Net.connect(Net.java:589) ~[na:na]
	at java.base/sun.nio.ch.Net.connect(Net.java:596) ~[na:na]
	at java.base/sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:880) ~[na:na]
	at io.netty.util.internal.SocketUtils$3.run(SocketUtils.java:91) ~[netty-common-4.2.9.Final.jar:4.2.9.Final]
	at io.netty.util.internal.SocketUtils$3.run(SocketUtils.java:88) ~[netty-common-4.2.9.Final.jar:4.2.9.Final]
	at java.base/java.security.AccessController.doPrivileged(AccessController.java:571) ~[na:na]
	at io.netty.util.internal.SocketUtils.connect(SocketUtils.java:88) ~[netty-common-4.2.9.Final.jar:4.2.9.Final]
	at io.netty.channel.socket.nio.NioSocketChannel.doConnect(NioSocketChannel.java:315) ~[netty-transport-4.2.9.Final.jar:4.2.9.Final]
	at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.connect(AbstractNioChannel.java:295) ~[netty-transport-4.2.9.Final.jar:4.2.9.Final]
	at io.netty.channel.DefaultChannelPipeline$HeadContext.connect(DefaultChannelPipeline.java:1361) ~[netty-transport-4.2.9.Final.jar:4.2.9.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeConnect(AbstractChannelHandlerContext.java:547) ~[netty-transport-4.2.9.Final.jar:4.2.9.Final]
	at io.netty.channel.AbstractChannelHandlerContext.connect(AbstractChannelHandlerContext.java:526) ~[netty-transport-4.2.9.Final.jar:4.2.9.Final]
	at io.netty.channel.AbstractChannelHandlerContext.connect(AbstractChannelHandlerContext.java:510) ~[netty-transport-4.2.9.Final.jar:4.2.9.Final]
	at io.netty.channel.DefaultChannelPipeline.connect(DefaultChannelPipeline.java:999) ~[netty-transport-4.2.9.Final.jar:4.2.9.Final]
	at io.netty.channel.Channel.connect(Channel.java:297) ~[netty-transport-4.2.9.Final.jar:4.2.9.Final]
	at io.netty.bootstrap.Bootstrap$3.run(Bootstrap.java:264) ~[netty-transport-4.2.9.Final.jar:4.2.9.Final]
	at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:148) ~[netty-common-4.2.9.Final.jar:4.2.9.Final]
	at io.netty.util.concurrent.AbstractEventExecutor.safeExecute$$$capture(AbstractEventExecutor.java:141) ~[netty-common-4.2.9.Final.jar:4.2.9.Final]
	at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java) ~[netty-common-4.2.9.Final.jar:4.2.9.Final]
	at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:535) ~[netty-common-4.2.9.Final.jar:4.2.9.Final]
	at io.netty.channel.SingleThreadIoEventLoop.run(SingleThreadIoEventLoop.java:201) ~[netty-transport-4.2.9.Final.jar:4.2.9.Final]
	at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:1195) ~[netty-common-4.2.9.Final.jar:4.2.9.Final]
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.2.9.Final.jar:4.2.9.Final]
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.2.9.Final.jar:4.2.9.Final]
	at java.base/java.lang.Thread.run(Thread.java:1583) ~[na:na]

Tested on Mac OS.

I created a repro project: https://github.com/arne-kroeger/demo-redis-repro (here the setup is done via TestController SmartLifecycle start.

When I remove the group configuration Consumer.from(groupName, consumerName) everything works perfect.

Metadata

Metadata

Assignees

Type

No type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions