Refactor tri pack & unpack use stream#16045
Conversation
…dubbo/remoting/http12/h2/Http2ServerChannelObserver.java Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
…dubbo/remoting/http12/h2/Http2ServerChannelObserver.java Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
…eam and Http2ServerChannelObserver
This commit unifies the deframer implementation between client and server sides for the Triple protocol, eliminating code duplication and improving maintainability. ## Key Changes ### Deleted Files - TriDecoder.java: Removed client-specific deframer implementation - Deframer.java: Removed client deframer interface - RecordListener.java: Removed test helper - TriDecoderTest.java: Removed corresponding test ### Core Modifications 1. **LengthFieldStreamingDecoder**: Added BoundedInputStream to support: - Message boundary isolation (prevents reading beyond current message) - mark/reset support required by Hessian2 and other deserializers - Zero-copy optimization by reading directly from accumulate stream 2. **StreamingDecoder.FragmentListener**: Added messageLength parameter to onFragmentMessage() method for better flow control 3. **AbstractTripleClientStream**: Migrated from TriDecoder to GrpcStreamingDecoder, using ByteBufInputStream to adapt Netty's ByteBuf to InputStream 4. **Stream.Listener.onMessage**: Changed parameter from byte[] to InputStream for unified handling and memory optimization 5. **PackableMethod**: Added default parseResponse(InputStream) method for backward compatibility 6. **CompositeInputStream**: Improved stream release logic to prevent exceptions 7. **DescriptorUtils**: Added mark() call before reading stream to support reset ## Architecture Change Before: - Client: ByteBuf -> TriDecoder (Netty-specific) -> byte[] -> deserializer - Server: InputStream -> LengthFieldStreamingDecoder -> byte[] -> deserializer After: - Client: ByteBuf -> ByteBufInputStream -> GrpcStreamingDecoder -> BoundedInputStream -> deserializer - Server: InputStream -> GrpcStreamingDecoder -> BoundedInputStream -> deserializer ## Bug Fix Fixed BoundedInputStream.reset() not restoring the 'remaining' counter, which caused streams to return EOF after mark/reset. This was the root cause of POJO method invocation failures with "Unexpected serialization type:null" error.
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## 3.3 #16045 +/- ##
============================================
- Coverage 60.75% 60.74% -0.02%
- Complexity 11751 11755 +4
============================================
Files 1951 1953 +2
Lines 88992 89120 +128
Branches 13418 13443 +25
============================================
+ Hits 54070 54133 +63
- Misses 29362 29423 +61
- Partials 5560 5564 +4
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
There was a problem hiding this comment.
Pull request overview
This PR refactors the Triple protocol's pack and unpack operations to use streams (InputStream/OutputStream) instead of byte arrays. This change improves memory efficiency by avoiding unnecessary buffering of large messages into memory, and enables more efficient streaming processing.
Changes:
- Replaced byte array-based serialization/deserialization APIs with stream-based APIs in Pack/UnPack/PackableMethod interfaces
- Refactored TriDecoder to use GrpcStreamingDecoder (unified with server-side implementation)
- Updated all Pack/UnPack implementations to support the new stream-based APIs with backward compatibility
- Introduced BoundedInputStream for safe stream reading with mark/reset support
Reviewed changes
Copilot reviewed 26 out of 26 changed files in this pull request and generated 11 comments.
Show a summary per file
| File | Description |
|---|---|
| Stream.java | Updated Listener.onMessage to accept InputStream with messageLength parameter |
| AbstractTripleClientStream.java | Replaced TriDecoder with GrpcStreamingDecoder and updated to use ByteBufInputStream |
| GrpcStreamingDecoder.java | Implements stream-based message decompression |
| GrpcCompositeCodec.java | Updated encode/decode to use streams instead of byte arrays |
| ReflectionPackableMethod.java | Implemented stream-based pack/unpack methods with backward compatibility |
| TripleCustomerProtocolWrapper.java | Added parseFrom(InputStream) and writeTo(OutputStream) methods with proper varint and field reading |
| PbUnpack.java & PbArrayPacker.java | Implemented stream-based unpack/pack methods |
| LengthFieldStreamingDecoder.java | Added BoundedInputStream for controlled stream reading with mark/reset support |
| Pack.java & UnPack.java | Added new stream-based methods with default implementations and deprecated byte array methods |
| PackableMethod.java | Added stream-based parseRequest/Response and packRequest/Response methods |
| MockClientStreamListener.java | Updated test mock to handle InputStream parameters |
| TriDecoderTest.java & RecordListener.java | Removed obsolete tests for deleted TriDecoder |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| int tag = b; | ||
| if ((b & 0x80) != 0) { | ||
| int shift = 7; | ||
| tag = b & 0x7F; | ||
| while (shift < 35) { | ||
| b = inputStream.read(); | ||
| if (b == -1) { | ||
| throw new IOException("Unexpected end of stream while reading tag"); | ||
| } | ||
| tag |= (b & 0x7F) << shift; | ||
| if ((b & 0x80) == 0) { | ||
| break; | ||
| } | ||
| shift += 7; | ||
| } | ||
| } | ||
|
|
There was a problem hiding this comment.
The varint parsing logic for tags (lines 208-223) is duplicated and slightly different from the readRawVarint32 method. Consider using readRawVarint32 for both tag and length parsing to reduce duplication and ensure consistency. The current implementation reads the first byte outside the loop and then continues if needed, which could be simplified.
| int tag = b; | |
| if ((b & 0x80) != 0) { | |
| int shift = 7; | |
| tag = b & 0x7F; | |
| while (shift < 35) { | |
| b = inputStream.read(); | |
| if (b == -1) { | |
| throw new IOException("Unexpected end of stream while reading tag"); | |
| } | |
| tag |= (b & 0x7F) << shift; | |
| if ((b & 0x80) == 0) { | |
| break; | |
| } | |
| shift += 7; | |
| } | |
| } | |
| final int firstByte = b; | |
| InputStream tagInputStream = new InputStream() { | |
| private boolean first = true; | |
| @Override | |
| public int read() throws IOException { | |
| if (first) { | |
| first = false; | |
| return firstByte; | |
| } | |
| return inputStream.read(); | |
| } | |
| @Override | |
| public int read(byte[] buffer, int off, int len) throws IOException { | |
| if (buffer == null) { | |
| throw new NullPointerException(); | |
| } | |
| if (off < 0 || len < 0 || len > buffer.length - off) { | |
| throw new IndexOutOfBoundsException(); | |
| } | |
| if (len == 0) { | |
| return 0; | |
| } | |
| if (first) { | |
| first = false; | |
| buffer[off] = (byte) firstByte; | |
| return 1; | |
| } | |
| return inputStream.read(buffer, off, len); | |
| } | |
| }; | |
| int tag = readRawVarint32(tagInputStream); |
| tripleResponseWrapper.serializeType = new String(fieldData); | ||
| } else if (fieldNum == 2) { | ||
| tripleResponseWrapper.data = fieldData; | ||
| } else if (fieldNum == 3) { | ||
| tripleResponseWrapper.type = new String(fieldData); |
There was a problem hiding this comment.
String construction should specify charset explicitly. The current code uses the default platform charset (via new String(byte[])) which may differ across systems. Consider using new String(fieldData, StandardCharsets.UTF_8) to match the encoding used in toByteArray().
| tripleResponseWrapper.serializeType = new String(fieldData); | |
| } else if (fieldNum == 2) { | |
| tripleResponseWrapper.data = fieldData; | |
| } else if (fieldNum == 3) { | |
| tripleResponseWrapper.type = new String(fieldData); | |
| tripleResponseWrapper.serializeType = new String(fieldData, StandardCharsets.UTF_8); | |
| } else if (fieldNum == 2) { | |
| tripleResponseWrapper.data = fieldData; | |
| } else if (fieldNum == 3) { | |
| tripleResponseWrapper.type = new String(fieldData, StandardCharsets.UTF_8); |
| tripleRequestWrapper.serializeType = new String(fieldData); | ||
| } else if (fieldNum == 2) { | ||
| tripleRequestWrapper.args.add(fieldData); | ||
| } else if (fieldNum == 3) { | ||
| tripleRequestWrapper.argTypes.add(new String(fieldData)); |
There was a problem hiding this comment.
String construction should specify charset explicitly. The current code uses the default platform charset (via new String(byte[])) which may differ across systems. Consider using new String(fieldData, StandardCharsets.UTF_8) to match the encoding used in toByteArray().
| tripleRequestWrapper.serializeType = new String(fieldData); | |
| } else if (fieldNum == 2) { | |
| tripleRequestWrapper.args.add(fieldData); | |
| } else if (fieldNum == 3) { | |
| tripleRequestWrapper.argTypes.add(new String(fieldData)); | |
| tripleRequestWrapper.serializeType = new String(fieldData, StandardCharsets.UTF_8); | |
| } else if (fieldNum == 2) { | |
| tripleRequestWrapper.args.add(fieldData); | |
| } else if (fieldNum == 3) { | |
| tripleRequestWrapper.argTypes.add(new String(fieldData, StandardCharsets.UTF_8)); |
...-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/stream/MockClientStreamListener.java
Show resolved
Hide resolved
...tp12/src/main/java/org/apache/dubbo/remoting/http12/message/LengthFieldStreamingDecoder.java
Show resolved
Hide resolved
| @@ -125,9 +124,9 @@ public static MethodDescriptor findTripleMethodDescriptor( | |||
| ServiceDescriptor serviceDescriptor, String methodName, InputStream rawMessage) throws IOException { | |||
| MethodDescriptor methodDescriptor = findReflectionMethodDescriptor(serviceDescriptor, methodName); | |||
| if (methodDescriptor == null) { | |||
There was a problem hiding this comment.
Using mark with Integer.MAX_VALUE as the readlimit parameter may not work with all InputStream implementations. The BoundedInputStream created in LengthFieldStreamingDecoder overrides mark to ensure sufficient buffer size, but this assumption may not hold for all input streams passed to this method. Consider checking if mark is supported using markSupported() before calling mark, or document that the input stream must support mark/reset.
| if (methodDescriptor == null) { | |
| if (methodDescriptor == null) { | |
| if (!rawMessage.markSupported()) { | |
| throw new IOException("InputStream must support mark/reset to resolve overloaded triple methods"); | |
| } |
...tp12/src/main/java/org/apache/dubbo/remoting/http12/message/LengthFieldStreamingDecoder.java
Outdated
Show resolved
Hide resolved
...tp12/src/main/java/org/apache/dubbo/remoting/http12/message/LengthFieldStreamingDecoder.java
Outdated
Show resolved
Hide resolved
...tp12/src/main/java/org/apache/dubbo/remoting/http12/message/LengthFieldStreamingDecoder.java
Outdated
Show resolved
Hide resolved
...riple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractTripleClientStream.java
Show resolved
Hide resolved
…protocol/tri/stream/AbstractTripleClientStream.java Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
…ces for InputStream
This commit unifies the deframer implementation between client and server sides for the Triple protocol, eliminating code duplication and improving maintainability. ## Key Changes ### Deleted Files - TriDecoder.java: Removed client-specific deframer implementation - Deframer.java: Removed client deframer interface - RecordListener.java: Removed test helper - TriDecoderTest.java: Removed corresponding test ### Core Modifications 1. **LengthFieldStreamingDecoder**: Added BoundedInputStream to support: - Message boundary isolation (prevents reading beyond current message) - mark/reset support required by Hessian2 and other deserializers - Zero-copy optimization by reading directly from accumulate stream 2. **StreamingDecoder.FragmentListener**: Added messageLength parameter to onFragmentMessage() method for better flow control 3. **AbstractTripleClientStream**: Migrated from TriDecoder to GrpcStreamingDecoder, using ByteBufInputStream to adapt Netty's ByteBuf to InputStream 4. **Stream.Listener.onMessage**: Changed parameter from byte[] to InputStream for unified handling and memory optimization 5. **PackableMethod**: Added default parseResponse(InputStream) method for backward compatibility 6. **CompositeInputStream**: Improved stream release logic to prevent exceptions 7. **DescriptorUtils**: Added mark() call before reading stream to support reset ## Architecture Change Before: - Client: ByteBuf -> TriDecoder (Netty-specific) -> byte[] -> deserializer - Server: InputStream -> LengthFieldStreamingDecoder -> byte[] -> deserializer After: - Client: ByteBuf -> ByteBufInputStream -> GrpcStreamingDecoder -> BoundedInputStream -> deserializer - Server: InputStream -> GrpcStreamingDecoder -> BoundedInputStream -> deserializer ## Bug Fix Fixed BoundedInputStream.reset() not restoring the 'remaining' counter, which caused streams to return EOF after mark/reset. This was the root cause of POJO method invocation failures with "Unexpected serialization type:null" error.
…ces for InputStream
# Conflicts: # dubbo-common/src/main/java/org/apache/dubbo/rpc/model/PackableMethod.java # dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/DescriptorUtils.java # dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractTripleClientStream.java
# Conflicts: # dubbo-common/src/main/java/org/apache/dubbo/rpc/model/PackableMethod.java # dubbo-metrics/dubbo-metrics-otlp/src/main/java/org/apache/dubbo/metrics/otlp/OtlpMetricsReporterFactory.java # dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/DescriptorUtils.java
| if ((b & 0x80) != 0) { | ||
| int shift = 7; | ||
| tag = b & 0x7F; | ||
| while (shift < 35) { |
There was a problem hiding this comment.
There is duplicate code here—please extract it into a separate method.
There was a problem hiding this comment.
We will remove the byte-based methods in subsequent versions.
| } | ||
| tag |= (b & 0x7F) << shift; | ||
| if ((b & 0x80) == 0) { | ||
| break; |
There was a problem hiding this comment.
Have we encoded it using this method?
There was a problem hiding this comment.
To avoid relying on the Protobuf (PB) dependency, we implemented the PB encoding structure directly ourselves.
What is the purpose of the change?
The current PR is based on #16041, and PR #16041 needs to be merged first.
Checklist