
Commit 0989b9d

Add zstd compression and decompression for pekko streams
1 parent 00b44f3 commit 0989b9d


18 files changed (+835 -12 lines)


.scala-steward.conf

Lines changed: 2 additions & 0 deletions
@@ -8,6 +8,8 @@ updates.pin = [
   # agrona major+minor version should match the one brought
   # in by aeron
   { groupId = "org.agrona", artifactId = "agrona", version = "2.2." }
+  # zstd-jni upgrades are fine as long as they are backwards compatible
+  { groupId = "com.github.luben", artifactId = "zstd-jni", version = "1."}
 ]

 updates.ignore = [

actor/src/main/java/org/apache/pekko/io/ByteBufferCleaner.java

Lines changed: 5 additions & 3 deletions
@@ -23,6 +23,7 @@
 import java.lang.invoke.MethodHandles;
 import java.lang.reflect.Field;
 import java.nio.ByteBuffer;
+import org.apache.pekko.annotation.InternalStableApi;

 /**
  * Cleans a direct {@link ByteBuffer}. Without manual intervention, direct ByteBuffers will be
@@ -36,7 +37,8 @@
  *
  * <p>See <a href=https://bugs.openjdk.java.net/browse/JDK-4724038>JDK-4724038</a>
  */
-final class ByteBufferCleaner {
+@InternalStableApi
+public final class ByteBufferCleaner {

   // adapted from
   // https://github.com/apache/commons-io/blob/441115a4b5cd63ae808dd4c40fc238cb52c8048f/src/main/java/org/apache/commons/io/input/ByteBufferCleaner.java
@@ -75,7 +77,7 @@ public void clean(final ByteBuffer buffer) throws Throwable {
   * @param buffer to release.
   * @throws IllegalStateException on internal failure.
   */
-  static void clean(final ByteBuffer buffer) {
+  public static void clean(final ByteBuffer buffer) {
     try {
       INSTANCE.clean(buffer);
     } catch (final Throwable t) {
@@ -116,7 +118,7 @@ private static Cleaner getCleaner() {
   *
   * @return {@code true} if cleaning is supported, {@code false} otherwise.
   */
-  static boolean isSupported() {
+  public static boolean isSupported() {
     return INSTANCE != null;
   }
 }
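
The widened visibility above lets code outside `org.apache.pekko.io`, such as the new zstd stages, release direct buffers eagerly instead of waiting for garbage collection. A minimal sketch of what this enables; the buffer size and the compressor hand-off are illustrative assumptions, not taken from this commit, and `ByteBufferCleaner` remains an internal API:

```scala
import java.nio.ByteBuffer

import org.apache.pekko.io.ByteBufferCleaner

object DirectBufferCleanupSketch {
  // Free a direct buffer as soon as native compression is done with it,
  // rather than waiting for GC/finalization to reclaim the off-heap memory.
  def release(buffer: ByteBuffer): Unit =
    if (ByteBufferCleaner.isSupported())
      ByteBufferCleaner.clean(buffer)

  def main(args: Array[String]): Unit = {
    val buffer = ByteBuffer.allocateDirect(64 * 1024)
    // ... hand `buffer` to a native zstd context and finish the frame ...
    release(buffer)
  }
}
```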
Lines changed: 30 additions & 0 deletions (new file)
@@ -0,0 +1,30 @@
+# Compression.zstd
+
+Creates a flow that zstd-compresses a stream of ByteStrings.
+
+@ref[Compression operators](../index.md#compression-operators)
+
+## Signature
+
+@apidoc[Compression.zstd](stream.*.Compression$) { scala="#zstd:org.apache.pekko.stream.scaladsl.Flow[org.apache.pekko.util.ByteString,org.apache.pekko.util.ByteString,org.apache.pekko.NotUsed]" java="#zstd()" }
+
+## Description
+
+Creates a flow that zstd-compresses a stream of ByteStrings. Note that the compressor
+will SYNC_FLUSH after every @apidoc[util.ByteString] so that it is guaranteed that every @apidoc[util.ByteString]
+coming out of the flow can be fully decompressed without waiting for additional data. This may
+come at a compression performance cost for very small chunks.
+
+Use the overloaded method to control the compression level.
+
+## Reactive Streams semantics
+
+@@@div { .callout }
+
+**emits** when the compression algorithm produces output for the received `ByteString`
+
+**backpressures** when downstream backpressures
+
+**completes** when upstream completes
+
+@@@
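
A usage sketch of the operator documented in this new page; the `ActorSystem` wiring and output path are illustrative assumptions, and the commented-out overload mirrors the call used by the test specs added later in this commit:

```scala
import java.nio.file.Paths

import org.apache.pekko
import pekko.actor.ActorSystem
import pekko.stream.scaladsl.{ Compression, FileIO, Source }
import pekko.util.ByteString

object ZstdCompressExample extends App {
  implicit val system: ActorSystem = ActorSystem("zstd-compress")

  // Compress a stream of chunks and write the resulting zstd frames to a file.
  Source(List("first chunk", "second chunk").map(ByteString(_)))
    .via(Compression.zstd) // default level, flushes after every ByteString
    // explicit level, as in the test specs:
    // .via(Compression.zstd(Compression.ZstdDefaultCompressionLevel, dictionary = None, autoFlush = false))
    .runWith(FileIO.toPath(Paths.get("payload.zst")))
    .onComplete(_ => system.terminate())(system.dispatcher)
}
```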
Lines changed: 26 additions & 0 deletions (new file)
@@ -0,0 +1,26 @@
+# Compression.zstdDecompress
+
+Creates a flow that zstd-decompresses a stream of ByteStrings.
+
+@ref[Compression operators](../index.md#compression-operators)
+
+## Signature
+
+@apidoc[Compression.zstdDecompress](stream.*.Compression$) { scala="#zstdDecompress(maxBytesPerChunk:Int):org.apache.pekko.stream.scaladsl.Flow[org.apache.pekko.util.ByteString,org.apache.pekko.util.ByteString,org.apache.pekko.NotUsed]" java="#zstdDecompress(int)" }
+
+## Description
+
+Creates a flow that zstd-decompresses a stream of ByteStrings. If the input is truncated, uses an invalid
+compression method, or is otherwise invalid (e.g. fails CRC checks), this operator fails with a `com.github.luben.zstd.ZstdIOException`.
+
+## Reactive Streams semantics
+
+@@@div { .callout }
+
+**emits** when the decompression algorithm produces output for the received `ByteString` (each emitted `ByteString` is at most `maxBytesPerChunk` long)
+
+**backpressures** when downstream backpressures
+
+**completes** when upstream completes
+
+@@@
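
A round-trip sketch combining the two operators documented above; this is not part of the commit, and the 1024-byte chunk limit is an arbitrary example value:

```scala
import org.apache.pekko
import pekko.actor.ActorSystem
import pekko.stream.scaladsl.{ Compression, Sink, Source }
import pekko.util.ByteString

object ZstdRoundTripExample extends App {
  implicit val system: ActorSystem = ActorSystem("zstd-round-trip")
  import system.dispatcher

  Source.single(ByteString("Hello, zstd!"))
    .via(Compression.zstd)                                    // compress
    .via(Compression.zstdDecompress(maxBytesPerChunk = 1024)) // decompress, emitted chunks capped at 1 KiB
    .runWith(Sink.fold(ByteString.empty)(_ ++ _))
    .foreach { decoded =>
      println(decoded.utf8String) // prints "Hello, zstd!"
      system.terminate()
    }
}
```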

project/Dependencies.scala

Lines changed: 2 additions & 1 deletion
@@ -352,7 +352,8 @@ object Dependencies {

   // pekko stream

-  lazy val stream = l ++= Seq[sbt.ModuleID](reactiveStreams, TestDependencies.scalatest)
+  lazy val stream =
+    l ++= Seq[sbt.ModuleID](reactiveStreams, "com.github.luben" % "zstd-jni" % "1.5.7-6", TestDependencies.scalatest)

   lazy val streamTestkit = l ++= Seq(
     TestDependencies.scalatest,
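
For downstream projects this makes zstd-jni a transitive dependency of pekko-stream. A hypothetical build.sbt fragment pinning it explicitly might look like the following; only the zstd-jni coordinates come from the diff above, the pekko-stream version variable is a placeholder:

```scala
// Hypothetical downstream build.sbt fragment.
libraryDependencies ++= Seq(
  "org.apache.pekko" %% "pekko-stream" % pekkoVersion, // placeholder version
  "com.github.luben" % "zstd-jni" % "1.5.7-6" // same coordinates as added in Dependencies.scala
)
```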

stream-tests/src/test/scala/org/apache/pekko/stream/io/compression/CoderSpec.scala

Lines changed: 4 additions & 3 deletions
@@ -15,11 +15,11 @@ package org.apache.pekko.stream.io.compression

 import java.io.{ ByteArrayOutputStream, InputStream, OutputStream }
 import java.util.concurrent.ThreadLocalRandom
-import java.util.zip.DataFormatException

 import scala.annotation.tailrec
 import scala.concurrent.Await
 import scala.concurrent.duration._
+import scala.reflect.ClassTag
 import scala.util.control.NoStackTrace

 import org.apache.pekko
@@ -31,7 +31,8 @@ import pekko.util.ByteString
 import org.scalatest.Inspectors
 import org.scalatest.wordspec.AnyWordSpec

-abstract class CoderSpec(codecName: String) extends AnyWordSpec with CodecSpecSupport with Inspectors {
+abstract class CoderSpec[CorruptInputException: ClassTag](codecName: String) extends AnyWordSpec with CodecSpecSupport
+    with Inspectors {
   import CompressionTestingTools._

   protected def newCompressor(): Compressor
@@ -85,7 +86,7 @@ abstract class CoderSpec(codecName: String) extends AnyWordSpec with CodecSpecSu
     "throw an error on corrupt input" in {
       (the[RuntimeException] thrownBy {
         ourDecode(corruptContent)
-      }).ultimateCause should be(a[DataFormatException])
+      }).ultimateCause should be(a[CorruptInputException])
     }
   }


stream-tests/src/test/scala/org/apache/pekko/stream/io/compression/DeflateSpec.scala

Lines changed: 1 addition & 1 deletion
@@ -21,7 +21,7 @@ import pekko.stream.impl.io.compression.{ Compressor, DeflateCompressor }
 import pekko.stream.scaladsl.{ Compression, Flow }
 import pekko.util.ByteString

-class DeflateSpec extends CoderSpec("deflate") {
+class DeflateSpec extends CoderSpec[DataFormatException]("deflate") {
   import CompressionTestingTools._

   protected def newCompressor(): Compressor = new DeflateCompressor

stream-tests/src/test/scala/org/apache/pekko/stream/io/compression/GzipSpec.scala

Lines changed: 2 additions & 2 deletions
@@ -15,14 +15,14 @@ package org.apache.pekko.stream.io.compression

 import java.io.{ InputStream, OutputStream }
 import java.nio.charset.StandardCharsets
-import java.util.zip.{ GZIPInputStream, GZIPOutputStream, ZipException }
+import java.util.zip.{ DataFormatException, GZIPInputStream, GZIPOutputStream, ZipException }

 import org.apache.pekko
 import pekko.stream.impl.io.compression.{ Compressor, GzipCompressor }
 import pekko.stream.scaladsl.{ Compression, Flow }
 import pekko.util.ByteString

-class GzipSpec extends CoderSpec("gzip") {
+class GzipSpec extends CoderSpec[DataFormatException]("gzip") {
   import CompressionTestingTools._

   protected def newCompressor(): Compressor = new GzipCompressor
Lines changed: 30 additions & 0 deletions (new file)
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream.io.compression
+
+import org.apache.pekko
+import pekko.stream.scaladsl.{ Compression, Flow }
+import pekko.util.ByteString
+
+class ZstdAutoFlushSpec extends ZstdSpec {
+  override protected val encoderFlow: Flow[ByteString, ByteString, Any] =
+    Compression.zstd(Compression.ZstdDefaultCompressionLevel, dictionary = None, autoFlush = false)
+
+  override protected val autoFlush: Boolean = false
+
+}
Lines changed: 50 additions & 0 deletions (new file)
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream.io.compression
+
+import com.github.luben.zstd.{ ZstdIOException, ZstdInputStream, ZstdOutputStream }
+
+import org.apache.pekko
+import pekko.stream.impl.io.compression.{ Compressor, ZstdCompressor }
+import pekko.stream.scaladsl.{ Compression, Flow }
+import pekko.util.ByteString
+
+import java.io.{ InputStream, OutputStream }
+
+class ZstdSpec extends CoderSpec[ZstdIOException]("zstd") {
+  import CompressionTestingTools._
+
+  override protected def newCompressor(): Compressor = new ZstdCompressor
+
+  override protected def encoderFlow: Flow[ByteString, ByteString, Any] = Compression.zstd
+
+  override protected def decoderFlow(maxBytesPerChunk: Int): Flow[ByteString, ByteString, Any] =
+    Compression.zstdDecompress(maxBytesPerChunk)
+
+  override protected def newDecodedInputStream(underlying: InputStream): InputStream =
+    new ZstdInputStream(underlying)
+
+  override protected def newEncodedOutputStream(underlying: OutputStream): OutputStream =
+    new ZstdOutputStream(underlying)
+
+  override def extraTests(): Unit = {
+    "decode concatenated compressions" in {
+      ourDecode(Seq(encode("Hello, "), encode("dear "), encode("User!")).join) should readAs("Hello, dear User!")
+    }
+  }
+}
