From 9a5522880c26c2a6dbf3b2a21cb8980b8b037bc7 Mon Sep 17 00:00:00 2001 From: Riku Uehara Date: Fri, 7 Mar 2025 08:12:53 +0000 Subject: [PATCH 1/7] Update last_path encoding to use varint length field --- .../org/embulk/input/gcs/GcsFileInput.java | 34 +++++++++++++++---- .../embulk/input/gcs/GcsFileInputPlugin.java | 7 ++-- .../input/gcs/TestGcsFileInputPlugin.java | 4 +-- 3 files changed, 34 insertions(+), 11 deletions(-) diff --git a/src/main/java/org/embulk/input/gcs/GcsFileInput.java b/src/main/java/org/embulk/input/gcs/GcsFileInput.java index 2ae132e..72801b0 100644 --- a/src/main/java/org/embulk/input/gcs/GcsFileInput.java +++ b/src/main/java/org/embulk/input/gcs/GcsFileInput.java @@ -91,28 +91,50 @@ static FileList listFiles(final PluginTask task) { // String nextToken = base64Encode(0x0a + ASCII character according to utf8EncodeLength position+ filePath); static String base64Encode(final String path) { + byte[] lengthVarint; byte[] encoding; byte[] utf8 = path.getBytes(StandardCharsets.UTF_8); LOG.debug("path string: {} ,path length:{} \" + ", path, utf8.length); int utf8EncodeLength = utf8.length; - if (utf8EncodeLength >= 128) { + if (utf8EncodeLength >= 65_535) { throw new ConfigException(String.format("last_path '%s' is too long to encode. Please try to reduce its length", path)); } - encoding = new byte[utf8.length + 2]; + lengthVarint = encodeVarint(utf8EncodeLength); + encoding = new byte[1 + lengthVarint.length + utf8.length]; encoding[0] = 0x0a; - // for example: 60 -> '<' - char temp = (char) utf8EncodeLength; - encoding[1] = (byte) temp; - System.arraycopy(utf8, 0, encoding, 2, utf8.length); + System.arraycopy(lengthVarint, 0, encoding, 1, lengthVarint.length); + System.arraycopy(utf8, 0, encoding, 1 + lengthVarint.length, utf8.length); final String s = Base64.getEncoder().encodeToString(encoding); LOG.debug("last_path(base64 encoded): {}", s); return s; } + // see: https://protobuf.dev/programming-guides/encoding/#varints + private static byte[] encodeVarint(int value) + { + // utf8EncodeLength.length is up to 65535, so 2 bytes are enough for buffer + byte[] buffer = new byte[2]; + int pos = 0; + while (true) { + int bits = value & 0x7F; + value >>>= 7; + if (value != 0) { + buffer[pos++] = (byte) (bits | 0x80); + } + else { + buffer[pos++] = (byte) bits; + break; + } + } + byte[] result = new byte[pos]; + System.arraycopy(buffer, 0, result, 0, pos); + return result; + } + private static void printBucketInfo(final Storage client, final String bucket) { // get Bucket Storage.BucketGetOption fields = Storage.BucketGetOption.fields( diff --git a/src/main/java/org/embulk/input/gcs/GcsFileInputPlugin.java b/src/main/java/org/embulk/input/gcs/GcsFileInputPlugin.java index 46c497c..82884a9 100644 --- a/src/main/java/org/embulk/input/gcs/GcsFileInputPlugin.java +++ b/src/main/java/org/embulk/input/gcs/GcsFileInputPlugin.java @@ -17,6 +17,7 @@ package org.embulk.input.gcs; import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.util.List; import java.util.Optional; import org.embulk.config.ConfigDiff; @@ -64,10 +65,10 @@ public ConfigDiff transaction(final ConfigSource config, final FileInputPlugin.C } } - // @see https://cloud.google.com/storage/docs/bucket-naming + // @see https://cloud.google.com/storage/docs/objects#naming if (task.getLastPath().isPresent()) { - if (task.getLastPath().get().length() >= 128) { - throw new ConfigException("last_path length is allowed up to 127 characters"); + if (task.getLastPath().get().getBytes(StandardCharsets.UTF_8).length >= 1025) { + throw new ConfigException("last_path length is allowed up to 1024 bytes."); } } diff --git a/src/test/java/org/embulk/input/gcs/TestGcsFileInputPlugin.java b/src/test/java/org/embulk/input/gcs/TestGcsFileInputPlugin.java index 2242e68..607da70 100644 --- a/src/test/java/org/embulk/input/gcs/TestGcsFileInputPlugin.java +++ b/src/test/java/org/embulk/input/gcs/TestGcsFileInputPlugin.java @@ -394,8 +394,8 @@ public void testBase64() { assertEquals("CgJjMg==", GcsFileInput.base64Encode("c2")); assertEquals("Cgh0ZXN0LmNzdg==", GcsFileInput.base64Encode("test.csv")); assertEquals("ChZnY3MtdGVzdC9zYW1wbGVfMDEuY3N2", GcsFileInput.base64Encode("gcs-test/sample_01.csv")); - String params = "cccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc127"; - String expected = "Cn9jY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjMTI3"; + String params = "ccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc200"; + String expected = "CsgBY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2MyMDA="; assertEquals(expected, GcsFileInput.base64Encode(params)); params = "テストダミー/テス123/テストダミー/テストダミ.csv"; From db7e7b97aa050328abcf7e8f785be2f5b52a6cdc Mon Sep 17 00:00:00 2001 From: r-uehara0219 Date: Thu, 13 Mar 2025 19:56:22 +0900 Subject: [PATCH 2/7] Update last_path length validation to align with GCP object name limits --- src/main/java/org/embulk/input/gcs/GcsFileInput.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/main/java/org/embulk/input/gcs/GcsFileInput.java b/src/main/java/org/embulk/input/gcs/GcsFileInput.java index 72801b0..6de82d8 100644 --- a/src/main/java/org/embulk/input/gcs/GcsFileInput.java +++ b/src/main/java/org/embulk/input/gcs/GcsFileInput.java @@ -97,8 +97,10 @@ static String base64Encode(final String path) { LOG.debug("path string: {} ,path length:{} \" + ", path, utf8.length); int utf8EncodeLength = utf8.length; - if (utf8EncodeLength >= 65_535) { - throw new ConfigException(String.format("last_path '%s' is too long to encode. Please try to reduce its length", path)); + // GCP object names can be up to 1024 bytes in length. + // This limit aligns with task.getLastPath() expectations. + if (utf8EncodeLength >= 1025) { + throw new ConfigException(String.format("last_path '%s' is too long to encode. Maximum allowed is 1024 bytes", path)); } lengthVarint = encodeVarint(utf8EncodeLength); From 9f691e6b491af347542c3e097feb521cc9fd3eb7 Mon Sep 17 00:00:00 2001 From: r-uehara0219 Date: Thu, 13 Mar 2025 19:57:25 +0900 Subject: [PATCH 3/7] Add unit test for encodeVarint method in GcsFileInput --- .../input/gcs/TestGcsFileInputPlugin.java | 25 +++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/src/test/java/org/embulk/input/gcs/TestGcsFileInputPlugin.java b/src/test/java/org/embulk/input/gcs/TestGcsFileInputPlugin.java index 607da70..17ba3fc 100644 --- a/src/test/java/org/embulk/input/gcs/TestGcsFileInputPlugin.java +++ b/src/test/java/org/embulk/input/gcs/TestGcsFileInputPlugin.java @@ -18,6 +18,7 @@ import static org.embulk.input.gcs.GcsFileInputPlugin.CONFIG_MAPPER; import static org.embulk.input.gcs.GcsFileInputPlugin.CONFIG_MAPPER_FACTORY; +import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assume.assumeNotNull; @@ -30,6 +31,7 @@ import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; +import java.lang.reflect.Method; import java.nio.file.Path; import java.util.ArrayList; import java.util.Arrays; @@ -403,6 +405,29 @@ public void testBase64() { assertEquals(expected, GcsFileInput.base64Encode(params)); } + @Test + public void testEncodeVarint() throws Exception + { + Method encodeVarintMethod = GcsFileInput.class.getDeclaredMethod("encodeVarint", int.class); + encodeVarintMethod.setAccessible(true); + + byte[] expected1 = new byte[]{0x01}; + byte[] result1 = (byte[]) encodeVarintMethod.invoke(null, 1); + assertArrayEquals("encodeVarint(1) should return {0x01}", expected1, result1); + + byte[] expected127 = new byte[]{0x7F}; + byte[] result127 = (byte[]) encodeVarintMethod.invoke(null, 127); + assertArrayEquals("encodeVarint(127) should return {0x7F}", expected127, result127); + + byte[] expected128 = new byte[]{(byte) 0x80, 0x01}; + byte[] result128 = (byte[]) encodeVarintMethod.invoke(null, 128); + assertArrayEquals("encodeVarint(128) should return {0x80, 0x01}", expected128, result128); + + byte[] expected1024 = new byte[]{(byte) 0x80, 0x08}; + byte[] result1024 = (byte[]) encodeVarintMethod.invoke(null, 1024); + assertArrayEquals("encodeVarint(1024) should return {0x80, 0x08}", expected1024, result1024); + } + private ConfigSource config() { ConfigSource config = CONFIG_MAPPER_FACTORY.newConfigSource() .set("bucket", GCP_BUCKET) From 33b526c05ad6ed17a0e5585a84760965913dbe13 Mon Sep 17 00:00:00 2001 From: r-uehara0219 Date: Thu, 13 Mar 2025 20:05:14 +0900 Subject: [PATCH 4/7] stylecheck --- src/main/java/org/embulk/input/gcs/GcsFileInput.java | 10 ++++------ .../org/embulk/input/gcs/TestGcsFileInputPlugin.java | 3 +-- 2 files changed, 5 insertions(+), 8 deletions(-) diff --git a/src/main/java/org/embulk/input/gcs/GcsFileInput.java b/src/main/java/org/embulk/input/gcs/GcsFileInput.java index 6de82d8..ceaf782 100644 --- a/src/main/java/org/embulk/input/gcs/GcsFileInput.java +++ b/src/main/java/org/embulk/input/gcs/GcsFileInput.java @@ -91,8 +91,6 @@ static FileList listFiles(final PluginTask task) { // String nextToken = base64Encode(0x0a + ASCII character according to utf8EncodeLength position+ filePath); static String base64Encode(final String path) { - byte[] lengthVarint; - byte[] encoding; byte[] utf8 = path.getBytes(StandardCharsets.UTF_8); LOG.debug("path string: {} ,path length:{} \" + ", path, utf8.length); @@ -103,6 +101,8 @@ static String base64Encode(final String path) { throw new ConfigException(String.format("last_path '%s' is too long to encode. Maximum allowed is 1024 bytes", path)); } + byte[] lengthVarint; + byte[] encoding; lengthVarint = encodeVarint(utf8EncodeLength); encoding = new byte[1 + lengthVarint.length + utf8.length]; encoding[0] = 0x0a; @@ -116,8 +116,7 @@ static String base64Encode(final String path) { } // see: https://protobuf.dev/programming-guides/encoding/#varints - private static byte[] encodeVarint(int value) - { + private static byte[] encodeVarint(int value) { // utf8EncodeLength.length is up to 65535, so 2 bytes are enough for buffer byte[] buffer = new byte[2]; int pos = 0; @@ -126,8 +125,7 @@ private static byte[] encodeVarint(int value) value >>>= 7; if (value != 0) { buffer[pos++] = (byte) (bits | 0x80); - } - else { + } else { buffer[pos++] = (byte) bits; break; } diff --git a/src/test/java/org/embulk/input/gcs/TestGcsFileInputPlugin.java b/src/test/java/org/embulk/input/gcs/TestGcsFileInputPlugin.java index 17ba3fc..29b4346 100644 --- a/src/test/java/org/embulk/input/gcs/TestGcsFileInputPlugin.java +++ b/src/test/java/org/embulk/input/gcs/TestGcsFileInputPlugin.java @@ -406,8 +406,7 @@ public void testBase64() { } @Test - public void testEncodeVarint() throws Exception - { + public void testEncodeVarint() throws Exception { Method encodeVarintMethod = GcsFileInput.class.getDeclaredMethod("encodeVarint", int.class); encodeVarintMethod.setAccessible(true); From d398e280db27ce4481523e72606628927c545c3c Mon Sep 17 00:00:00 2001 From: Riku Uehara <58354448+r-uehara0219@users.noreply.github.com> Date: Thu, 13 Mar 2025 20:32:56 +0900 Subject: [PATCH 5/7] Update src/main/java/org/embulk/input/gcs/GcsFileInputPlugin.java Co-authored-by: Dai MIKURUBE --- src/main/java/org/embulk/input/gcs/GcsFileInputPlugin.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/org/embulk/input/gcs/GcsFileInputPlugin.java b/src/main/java/org/embulk/input/gcs/GcsFileInputPlugin.java index 82884a9..50b7846 100644 --- a/src/main/java/org/embulk/input/gcs/GcsFileInputPlugin.java +++ b/src/main/java/org/embulk/input/gcs/GcsFileInputPlugin.java @@ -68,7 +68,7 @@ public ConfigDiff transaction(final ConfigSource config, final FileInputPlugin.C // @see https://cloud.google.com/storage/docs/objects#naming if (task.getLastPath().isPresent()) { if (task.getLastPath().get().getBytes(StandardCharsets.UTF_8).length >= 1025) { - throw new ConfigException("last_path length is allowed up to 1024 bytes."); + throw new ConfigException("last_path is too long, which can contain a maximum of 1024 bytes encoded in UTF-8."); } } From dea553bc2a2b87cdbe782c17391419ab713b9b60 Mon Sep 17 00:00:00 2001 From: r-uehara0219 Date: Fri, 14 Mar 2025 11:10:07 +0900 Subject: [PATCH 6/7] Make encodeVarint method package-private and modify unit test --- .../java/org/embulk/input/gcs/GcsFileInput.java | 2 +- .../embulk/input/gcs/TestGcsFileInputPlugin.java | 14 +++++--------- 2 files changed, 6 insertions(+), 10 deletions(-) diff --git a/src/main/java/org/embulk/input/gcs/GcsFileInput.java b/src/main/java/org/embulk/input/gcs/GcsFileInput.java index ceaf782..0508f8e 100644 --- a/src/main/java/org/embulk/input/gcs/GcsFileInput.java +++ b/src/main/java/org/embulk/input/gcs/GcsFileInput.java @@ -116,7 +116,7 @@ static String base64Encode(final String path) { } // see: https://protobuf.dev/programming-guides/encoding/#varints - private static byte[] encodeVarint(int value) { + static byte[] encodeVarint(int value) { // utf8EncodeLength.length is up to 65535, so 2 bytes are enough for buffer byte[] buffer = new byte[2]; int pos = 0; diff --git a/src/test/java/org/embulk/input/gcs/TestGcsFileInputPlugin.java b/src/test/java/org/embulk/input/gcs/TestGcsFileInputPlugin.java index 29b4346..332fb64 100644 --- a/src/test/java/org/embulk/input/gcs/TestGcsFileInputPlugin.java +++ b/src/test/java/org/embulk/input/gcs/TestGcsFileInputPlugin.java @@ -31,7 +31,6 @@ import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; -import java.lang.reflect.Method; import java.nio.file.Path; import java.util.ArrayList; import java.util.Arrays; @@ -406,24 +405,21 @@ public void testBase64() { } @Test - public void testEncodeVarint() throws Exception { - Method encodeVarintMethod = GcsFileInput.class.getDeclaredMethod("encodeVarint", int.class); - encodeVarintMethod.setAccessible(true); - + public void testEncodeVarint() { byte[] expected1 = new byte[]{0x01}; - byte[] result1 = (byte[]) encodeVarintMethod.invoke(null, 1); + byte[] result1 = GcsFileInput.encodeVarint(1); assertArrayEquals("encodeVarint(1) should return {0x01}", expected1, result1); byte[] expected127 = new byte[]{0x7F}; - byte[] result127 = (byte[]) encodeVarintMethod.invoke(null, 127); + byte[] result127 = GcsFileInput.encodeVarint(127); assertArrayEquals("encodeVarint(127) should return {0x7F}", expected127, result127); byte[] expected128 = new byte[]{(byte) 0x80, 0x01}; - byte[] result128 = (byte[]) encodeVarintMethod.invoke(null, 128); + byte[] result128 = GcsFileInput.encodeVarint(128); assertArrayEquals("encodeVarint(128) should return {0x80, 0x01}", expected128, result128); byte[] expected1024 = new byte[]{(byte) 0x80, 0x08}; - byte[] result1024 = (byte[]) encodeVarintMethod.invoke(null, 1024); + byte[] result1024 = GcsFileInput.encodeVarint(1024); assertArrayEquals("encodeVarint(1024) should return {0x80, 0x08}", expected1024, result1024); } From df160ffa7eb68d7346efd2531e2df56160fa26a9 Mon Sep 17 00:00:00 2001 From: r-uehara0219 Date: Fri, 14 Mar 2025 12:51:12 +0900 Subject: [PATCH 7/7] Fix expected value in base64Encode test for GcsFileInput --- src/test/java/org/embulk/input/gcs/TestGcsFileInputPlugin.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/test/java/org/embulk/input/gcs/TestGcsFileInputPlugin.java b/src/test/java/org/embulk/input/gcs/TestGcsFileInputPlugin.java index 332fb64..5ee29fc 100644 --- a/src/test/java/org/embulk/input/gcs/TestGcsFileInputPlugin.java +++ b/src/test/java/org/embulk/input/gcs/TestGcsFileInputPlugin.java @@ -396,7 +396,7 @@ public void testBase64() { assertEquals("Cgh0ZXN0LmNzdg==", GcsFileInput.base64Encode("test.csv")); assertEquals("ChZnY3MtdGVzdC9zYW1wbGVfMDEuY3N2", GcsFileInput.base64Encode("gcs-test/sample_01.csv")); String params = "ccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc200"; - String expected = "CsgBY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2MyMDA="; + String expected = "CsgBY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2NjY2MyMDA="; assertEquals(expected, GcsFileInput.base64Encode(params)); params = "テストダミー/テス123/テストダミー/テストダミ.csv";