Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
4bf68fb
refactor: enhance backpressure handling with byte tracking in HTTP/2 …
EarthChen Jan 20, 2026
fc0718a
Update dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/…
EarthChen Jan 20, 2026
3272a1d
Update dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/…
EarthChen Jan 20, 2026
13ed0fe
fix
EarthChen Jan 20, 2026
6d7d1ba
refactor: expose methods for byte tracking in AbstractTripleClientStr…
EarthChen Jan 20, 2026
244ba4e
refactor: enhance error handling and byte tracking in HTTP/2 stream o…
EarthChen Jan 20, 2026
41a031b
refactor: improve closing logic and state management in LengthFieldSt…
EarthChen Jan 21, 2026
05deae1
Merge branch '3.3' of github.com:apache/dubbo into 3.3
EarthChen Jan 22, 2026
f7de211
fix: prevent race condition in stream initialization
EarthChen Jan 22, 2026
3c025c9
Merge branch 'apache:3.3' into 3.3
EarthChen Jan 22, 2026
abb678d
Merge branch '3.3' of github.com:EarthChen/dubbo into 3.3
EarthChen Jan 22, 2026
96714b8
Merge branch 'fix-init-stream-race' into 3.3
EarthChen Jan 22, 2026
05f0b17
refactor: unify client and server deframer for Triple protocol
EarthChen Jan 23, 2026
ae2600c
refactor: update tri serialization methods to use InputStream and Out…
EarthChen Jan 23, 2026
a9d1e39
Merge branch '3.3' of github.com:apache/dubbo into refactor-tri-drframer
EarthChen Jan 23, 2026
52cabcf
refactor: update tri serialization methods to use InputStream and Out…
EarthChen Jan 23, 2026
07caa6b
Merge branch '3.3' into refactor-packable-stream
zrlw Jan 23, 2026
dbe9d7d
Update dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/…
EarthChen Jan 24, 2026
598a2a7
Merge branch 'refactor-tri-drframer' into refactor-packable-stream
EarthChen Jan 24, 2026
3128bd2
refactor: enhance error handling in deframer by using try-with-resour…
EarthChen Jan 24, 2026
4ba46e6
fix
EarthChen Jan 24, 2026
39f647d
refactor: unify client and server deframer for Triple protocol
EarthChen Jan 23, 2026
aed26d0
refactor: enhance error handling in deframer by using try-with-resour…
EarthChen Jan 24, 2026
c97786e
refactor: update tri serialization methods to use InputStream and Out…
EarthChen Jan 23, 2026
ccf7d0d
fix
EarthChen Jan 24, 2026
c3e6699
Merge branch 'refactor-tri-drframer' into refactor-packable-stream
EarthChen Jan 24, 2026
9f2f062
refactor: update tri serialization methods to use InputStream and Out…
EarthChen Jan 23, 2026
9f88aac
Merge branch 'refactor-tri-drframer' into refactor-packable-stream
EarthChen Jan 24, 2026
7a35d21
fix
EarthChen Jan 24, 2026
3961b0c
Merge branch '3.3' of github.com:EarthChen/dubbo into 3.3
EarthChen Feb 4, 2026
a341043
Merge branch '3.3' into refactor-packable-stream
EarthChen Feb 4, 2026
a319765
Merge branch '3.3' of github.com:apache/dubbo into 3.3
EarthChen Feb 4, 2026
bb6d634
Merge branch '3.3' into refactor-packable-stream
EarthChen Feb 4, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions dubbo-common/src/main/java/org/apache/dubbo/rpc/model/Pack.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,17 @@
*/
package org.apache.dubbo.rpc.model;

import java.io.OutputStream;

public interface Pack {

/**
* @param obj instance
* @return byte array
* @throws Exception when error occurs
* @deprecated use {@link #pack(Object, OutputStream)} instead
*/
@Deprecated
byte[] pack(Object obj) throws Exception;

default void pack(Object obj, OutputStream out) throws Exception {
out.write(pack(obj));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,35 @@
*/
package org.apache.dubbo.rpc.model;

import java.io.InputStream;
import java.io.OutputStream;

/**
* A packable method is used to customize serialization for methods. It can provide a common wrapper
* for RESP / Protobuf.
*/
public interface PackableMethod {

/**
* @deprecated use {@link #parseRequest(InputStream)} instead
*/
@Deprecated
default Object parseRequest(byte[] data) throws Exception {
return getRequestUnpack().unpack(data);
}

/**
* @deprecated use {@link #parseResponse(InputStream)} instead
*/
@Deprecated
default Object parseResponse(byte[] data) throws Exception {
return parseResponse(data, false);
}

/**
* @deprecated use {@link #parseResponse(InputStream, boolean)} instead
*/
@Deprecated
default Object parseResponse(byte[] data, boolean isReturnTriException) throws Exception {
UnPack unPack = getResponseUnpack();
if (unPack instanceof WrapperUnPack) {
Expand All @@ -38,14 +53,46 @@ default Object parseResponse(byte[] data, boolean isReturnTriException) throws E
return unPack.unpack(data);
}

/**
* @deprecated use {@link #packRequest(Object, OutputStream)} instead
*/
@Deprecated
default byte[] packRequest(Object request) throws Exception {
return getRequestPack().pack(request);
}

/**
* @deprecated use {@link #packResponse(Object, OutputStream)} instead
*/
@Deprecated
default byte[] packResponse(Object response) throws Exception {
return getResponsePack().pack(response);
}

default Object parseRequest(InputStream inputStream) throws Exception {
return getRequestUnpack().unpack(inputStream);
}

default Object parseResponse(InputStream inputStream) throws Exception {
return parseResponse(inputStream, false);
}

default Object parseResponse(InputStream inputStream, boolean isReturnTriException) throws Exception {
UnPack unPack = getResponseUnpack();
if (unPack instanceof WrapperUnPack) {
return ((WrapperUnPack) unPack).unpack(inputStream, isReturnTriException);
}
return unPack.unpack(inputStream);
}

default void packRequest(Object request, OutputStream outputStream) throws Exception {
getRequestPack().pack(request, outputStream);
}

default void packResponse(Object response, OutputStream outputStream) throws Exception {
getResponsePack().pack(response, outputStream);
}

default boolean needWrapper() {
return false;
}
Expand Down
18 changes: 15 additions & 3 deletions dubbo-common/src/main/java/org/apache/dubbo/rpc/model/UnPack.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,24 @@
*/
package org.apache.dubbo.rpc.model;

import java.io.ByteArrayOutputStream;
import java.io.InputStream;

public interface UnPack {

/**
* @param data byte array
* @return object instance
* @throws Exception exception
* @deprecated use {@link #unpack(InputStream)} instead
*/
@Deprecated
Object unpack(byte[] data) throws Exception;

default Object unpack(InputStream inputStream) throws Exception {
ByteArrayOutputStream buffer = new ByteArrayOutputStream();
byte[] tmp = new byte[4096];
int len;
while ((len = inputStream.read(tmp)) != -1) {
buffer.write(tmp, 0, len);
}
return unpack(buffer.toByteArray());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,34 @@
*/
package org.apache.dubbo.rpc.model;

import java.io.ByteArrayOutputStream;
import java.io.InputStream;

public interface WrapperUnPack extends UnPack {

@Override
default Object unpack(byte[] data) throws Exception {
return unpack(data, false);
}

/**
* @deprecated use {@link #unpack(InputStream, boolean)} instead
*/
@Deprecated
Object unpack(byte[] data, boolean isReturnTriException) throws Exception;

@Override
default Object unpack(InputStream inputStream) throws Exception {
return unpack(inputStream, false);
}

default Object unpack(InputStream inputStream, boolean isReturnTriException) throws Exception {
ByteArrayOutputStream buffer = new ByteArrayOutputStream();
byte[] tmp = new byte[4096];
int len;
while ((len = inputStream.read(tmp)) != -1) {
buffer.write(tmp, 0, len);
}
return unpack(buffer.toByteArray(), isReturnTriException);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -109,9 +109,15 @@ public void close() throws IOException {
}
}

private void releaseHeadStream() throws IOException {
InputStream removeStream = inputStreams.remove();
removeStream.close();
private void releaseHeadStream() {
InputStream removeStream = inputStreams.poll();
if (removeStream != null) {
try {
removeStream.close();
} catch (IOException ignore) {
// ignore
}
}
}

private void releaseIfNecessary(InputStream inputStream) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public void close() {
return;
}
closed = true;
listener.onFragmentMessage(accumulate);
listener.onFragmentMessage(accumulate, accumulate.available());
accumulate.close();
listener.onClose();
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.model.ApplicationModel;

import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
Expand Down Expand Up @@ -119,7 +120,9 @@ private void doClose() {
} catch (IOException e) {
// ignore
}
listener.onClose();
if (listener != null) {
listener.onClose();
}
}

/**
Expand Down Expand Up @@ -233,27 +236,33 @@ private void processBody() throws IOException {
// Calculate total bytes read: header (offset + length field) + payload
int totalBytesRead = lengthFieldOffset + lengthFieldLength + requiredLength;

byte[] rawMessage;
MessageStream messageStream;
try {
rawMessage = readRawMessage(accumulate, requiredLength);
messageStream = readMessageStream(accumulate, requiredLength);
} finally {
// Notify listener about bytes read for flow control immediately after reading bytes
// This must be in finally block to ensure flow control works even if reading fails
// Following gRPC's pattern: bytesRead is called as soon as bytes are consumed from input
listener.bytesRead(totalBytesRead);
}

// Process the message after notifying about bytes read
InputStream inputStream = new ByteArrayInputStream(rawMessage);
invokeListener(inputStream);
invokeListener(messageStream.inputStream, messageStream.length);

// Done with this frame, begin processing the next header.
state = DecodeState.HEADER;
requiredLength = lengthFieldOffset + lengthFieldLength;
}

public void invokeListener(InputStream inputStream) {
this.listener.onFragmentMessage(inputStream);
public void invokeListener(InputStream inputStream, int messageLength) {
this.listener.onFragmentMessage(inputStream, messageLength);
}

/**
* Read message from the input stream and return it as a MessageStream.
*/
protected MessageStream readMessageStream(InputStream inputStream, int length) throws IOException {
InputStream boundedStream = new BoundedInputStream(inputStream, length);
return new MessageStream(boundedStream, length);
}

protected byte[] readRawMessage(InputStream inputStream, int length) throws IOException {
Expand All @@ -262,6 +271,82 @@ protected byte[] readRawMessage(InputStream inputStream, int length) throws IOEx
return data;
}

protected static class MessageStream {

public final InputStream inputStream;
public final int length;

public MessageStream(InputStream inputStream, int length) {
this.inputStream = inputStream;
this.length = length;
}
}

/**
* A bounded InputStream that reads at most 'limit' bytes from the source stream.
* Extends BufferedInputStream to support mark/reset, which is required by
* deserializers like Hessian2.
*/
private static class BoundedInputStream extends BufferedInputStream {

private final int limit;
private int remaining;
private int markedRemaining;

public BoundedInputStream(InputStream source, int limit) {
super(source, limit);
this.limit = limit;
this.remaining = limit;
this.markedRemaining = limit;
}

@Override
public int read() throws IOException {
if (remaining <= 0) {
return -1;
}
int result = super.read();
if (result != -1) {
remaining--;
}
return result;
}

@Override
public int read(byte[] b, int off, int len) throws IOException {
if (remaining <= 0) {
return -1;
}
int toRead = Math.min(len, remaining);
int result = super.read(b, off, toRead);
if (result > 0) {
remaining -= result;
}
return result;
}

@Override
public int available() throws IOException {
return Math.min(super.available(), remaining);
}

@Override
public synchronized void mark(int readlimit) {
// Force readlimit to be at least the remaining message length.
// This ensures mark is always valid within the bounded stream,
// regardless of what readlimit is passed by the deserializer (e.g., Hessian2).
super.mark(Math.max(readlimit, limit));
markedRemaining = remaining;
}

@Override
public synchronized void reset() throws IOException {
super.reset();
// Restore the remaining count to the value at mark time
remaining = markedRemaining;
}
}

private boolean hasEnoughBytes() {
return requiredLength - accumulate.available() <= 0;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,12 @@ interface FragmentListener {
void bytesRead(int numBytes);

/**
* @param rawMessage raw message
* Called when a complete message fragment is received.
*
* @param rawMessage raw message as InputStream
* @param messageLength the length of the message payload in bytes
*/
void onFragmentMessage(InputStream rawMessage);
void onFragmentMessage(InputStream rawMessage, int messageLength);

default void onClose() {}
}
Expand All @@ -59,6 +62,6 @@ private NoopFragmentListener() {}
public void bytesRead(int numBytes) {}

@Override
public void onFragmentMessage(InputStream rawMessage) {}
public void onFragmentMessage(InputStream rawMessage, int messageLength) {}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -107,12 +107,13 @@ private void deliver() {

private void processBody() throws IOException {
byte[] rawMessage = readRawMessage(accumulate, accumulate.available());
int messageLength = rawMessage.length;
InputStream inputStream = new ByteArrayInputStream(rawMessage);
invokeListener(inputStream);
invokeListener(inputStream, messageLength);
}

protected void invokeListener(InputStream inputStream) {
this.listener.onFragmentMessage(inputStream);
protected void invokeListener(InputStream inputStream, int messageLength) {
this.listener.onFragmentMessage(inputStream, messageLength);
}

protected byte[] readRawMessage(InputStream inputStream, int length) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.constants.CommonConstants;
import org.apache.dubbo.common.io.StreamUtils;
import org.apache.dubbo.common.utils.CollectionUtils;
import org.apache.dubbo.remoting.http12.exception.UnimplementedException;
import org.apache.dubbo.rpc.Invoker;
Expand Down Expand Up @@ -125,9 +124,9 @@ public static MethodDescriptor findTripleMethodDescriptor(
ServiceDescriptor serviceDescriptor, String methodName, InputStream rawMessage) throws IOException {
MethodDescriptor methodDescriptor = findReflectionMethodDescriptor(serviceDescriptor, methodName);
if (methodDescriptor == null) {
Copy link

Copilot AI Jan 24, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using mark with Integer.MAX_VALUE as the readlimit parameter may not work with all InputStream implementations. The BoundedInputStream created in LengthFieldStreamingDecoder overrides mark to ensure sufficient buffer size, but this assumption may not hold for all input streams passed to this method. Consider checking if mark is supported using markSupported() before calling mark, or document that the input stream must support mark/reset.

Suggested change
if (methodDescriptor == null) {
if (methodDescriptor == null) {
if (!rawMessage.markSupported()) {
throw new IOException("InputStream must support mark/reset to resolve overloaded triple methods");
}

Copilot uses AI. Check for mistakes.
byte[] data = StreamUtils.readBytes(rawMessage);
rawMessage.mark(Integer.MAX_VALUE);
List<MethodDescriptor> methodDescriptors = serviceDescriptor.getMethods(methodName);
TripleRequestWrapper request = TripleRequestWrapper.parseFrom(data);
TripleRequestWrapper request = TripleRequestWrapper.parseFrom(rawMessage);
String[] paramTypes = request.getArgTypes().toArray(new String[0]);
// wrapper mode the method can overload so maybe list
for (MethodDescriptor descriptor : methodDescriptors) {
Expand Down
Loading
Loading