Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,18 @@
*/
package org.apache.camel.dataformat.zipfile;

import java.io.*;
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.util.Iterator;
import java.util.Objects;

import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.converter.stream.CachedOutputStream;
import org.apache.camel.support.DefaultMessage;
import org.apache.camel.util.IOHelper;
import org.apache.commons.compress.archivers.ArchiveException;
Expand Down Expand Up @@ -57,7 +62,7 @@ public ZipIterator(Exchange exchange, InputStream inputStream) {
zipInputStream = zipArchiveInputStream;
} else {
try {
ArchiveInputStream input = new ArchiveStreamFactory().createArchiveInputStream(ArchiveStreamFactory.ZIP,
ArchiveInputStream<?> input = new ArchiveStreamFactory().createArchiveInputStream(ArchiveStreamFactory.ZIP,
new BufferedInputStream(inputStream));
zipInputStream = (ZipArchiveInputStream) input;
} catch (ArchiveException e) {
Expand Down Expand Up @@ -143,10 +148,9 @@ private Message getNextElement() {
return getNextElement(); // skip directory
}
} else {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
IOHelper.copy(zipInputStream, baos);
byte[] data = baos.toByteArray();
answer.setBody(new ByteArrayInputStream(data));
CachedOutputStream cos = new CachedOutputStream(exchange);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

who is supposed to close this stream?

@davsclaus

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the zip iterator should close it when its done, I would assume when itself is closed - the COS may spool to disk and therefore need to be closed so it can delete the temp file

IOHelper.copy(zipInputStream, cos);
answer.setBody(cos.getInputStream());
}

return answer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,12 @@
import org.apache.camel.Exchange;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.test.junit5.CamelTestSupport;
import org.junit.jupiter.api.Test;

public class ZipFileMultipleFilesSplitterTest extends ZipSplitterRouteTest {
public class ZipFileMultipleFilesSplitterTest extends CamelTestSupport {
static final String PROCESSED_FILES_HEADER_NAME = "processedFiles";

@Override
@Test
public void testSplitter() throws InterruptedException {
MockEndpoint processZipEntry = getMockEndpoint("mock:processZipEntry");
Expand Down Expand Up @@ -80,5 +80,4 @@ public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {

};
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,36 +16,174 @@
*/
package org.apache.camel.dataformat.zipfile;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;

import org.apache.camel.CamelContext;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.spi.StreamCachingStrategy;
import org.apache.camel.test.junit5.CamelTestSupport;
import org.apache.commons.io.FileUtils;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.lessThan;

public class ZipSplitterRouteTest extends CamelTestSupport {
private static final Logger LOG = LoggerFactory.getLogger(ZipIterator.class);

private static final File testDirectory = new File("test/in");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you use JUnit TempDir?


private final AtomicInteger memoryMbBefore = new AtomicInteger(0);
private final AtomicInteger memoryMbInside = new AtomicInteger(0);

@Test
public void testSplitter() throws InterruptedException {
public void testSplitter() throws InterruptedException, IOException {
File srcFile = new File("src/test/resources/org/apache/camel/dataformat/zipfile/data/resources.zip");
File testFile = new File(testDirectory, srcFile.getName());

FileUtils.copyFile(srcFile, testFile);

MockEndpoint processZipEntry = getMockEndpoint("mock:processZipEntry");
processZipEntry.expectedBodiesReceivedInAnyOrder("chau", "hi", "hola", "another_chiau", "another_hi");
MockEndpoint.assertIsSatisfied(context);
}

/**
* Test that ZipSplitter doesn't read the whole files in the zip file into memory when Spool is Enabled in the
* Stream Caching Strategy
*/
@Test
public void testSplitterLargeFileWithSpoolEnabled() throws InterruptedException, IOException {
File testFile = new File(testDirectory, "large1.zip");

int diff = testSplitterLargeFile(testFile);

assertThat("Memory spike detected! " + diff + "MB increased.", diff, lessThan(10));
}

/**
* Test that ZipSplitter read the whole files in the zip file into memory when Spool is Disabled in the Stream
* Caching Strategy
*/
@Test
public void testSplitterLargeFileWithoutSpoolEnabled() throws InterruptedException, IOException {
File testFile = new File(testDirectory, "large2.zip");

int diff = testSplitterLargeFile(testFile);

assertThat("Memory spike detected! " + diff + "MB increased.", diff, greaterThan(10));
}

private int testSplitterLargeFile(File testFile) throws IOException, FileNotFoundException, InterruptedException {
String expectedBody = null;

System.out.println("Generating 50MB test file...");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove or change to log.info

try (OutputStream os = new FileOutputStream(testFile);
ZipOutputStream zos = new ZipOutputStream(os)) {
zos.putNextEntry(new ZipEntry("test.txt"));
byte[] chunk = new byte[1024 * 1024];
Arrays.fill(chunk, (byte) 'A');

expectedBody = new String(chunk, 0, 20);
for (int i = 1; i <= 50; i++) {
zos.write(chunk);
}
zos.closeEntry();
zos.flush();
}

MockEndpoint processZipEntry = getMockEndpoint("mock:processZipEntry");
processZipEntry.expectedBodiesReceivedInAnyOrder(expectedBody);
MockEndpoint.assertIsSatisfied(context);

int before = memoryMbBefore.get();
int inside = memoryMbInside.get();
int diff = inside - before;

LOG.info("Memory before {}MB, inside {}MB & diff {}MB", before, inside, diff);

return diff;
}

@Override
protected void setupResources() {
if (testDirectory.exists()) {
try {
FileUtils.deleteDirectory(testDirectory);
} catch (IOException e) {
LOG.warn("Failed to delete test directory: " + testDirectory, e);
}
}

if (!testDirectory.mkdirs()) {
LOG.warn("Failed to create test directory: {}", testDirectory);
}
}

@Override
protected CamelContext createCamelContext() throws Exception {
CamelContext context = super.createCamelContext();

if ("testSplitterLargeFileWithSpoolEnabled()".equals(contextManagerExtension.getCurrentTestName())) {
StreamCachingStrategy streamCachingStrategy = context.getStreamCachingStrategy();
streamCachingStrategy.setSpoolEnabled(true);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the default spool threshold is 128kb

}

return context;
}

@Override
protected RouteBuilder createRouteBuilder() {
return new RouteBuilder() {
@Override
public void configure() {
// Unzip file and Split it according to FileEntry
from("file:src/test/resources/org/apache/camel/dataformat/zipfile/data?delay=1000&noop=true")
from("file:test/in")
.process(exchange -> captureMemory(memoryMbBefore, "BEFORE"))
.log("Start processing big file: ${header.CamelFileName}")
.split(new ZipSplitter()).streaming()
.setBody().message(message -> { // Convert up to 20 bytes of body to string
try {
InputStream is = message.getBody(InputStream.class);
byte buf[] = new byte[20];
int bytesRead = is.read(buf);
captureMemory(memoryMbInside, "INSIDE");
return new String(buf, 0, bytesRead);
} catch (IOException e) {
throw new RuntimeException("Failed to convert body to String", e);
}
}).to("mock:processZipEntry")
.to("log:entry")
.convertBodyTo(String.class).to("mock:processZipEntry")
.end()
.log("Done processing big file: ${header.CamelFileName}");
}
};

}

private void captureMemory(AtomicInteger storage, String logPrefix) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

System.gc() is not reliable, I'd remove this captureMemory()

System.gc();
try {
Thread.sleep(100);
} catch (InterruptedException ignored) {
}

Runtime runtime = Runtime.getRuntime();
long used = (runtime.totalMemory() - runtime.freeMemory()) / (1024 * 1024);

storage.set((int) used);
LOG.info("{}: {}MB", logPrefix, used);
}
}