Skip to content

Commit 9bfbce0

Browse files
committed
Revert "OAK-12070 - Reduce memory consumption of azure segment stores (#2699)"
This reverts commit 082c3a3.
1 parent: 0861c05 · commit: 9bfbce0

File tree

15 files changed

+142
-475
lines changed

15 files changed

+142
-475
lines changed

oak-segment-aws/src/main/java/org/apache/jackrabbit/oak/segment/aws/AwsSegmentArchiveReader.java

Lines changed: 37 additions & 26 deletions
Original file line number | Diff line number | Diff line change
@@ -20,8 +20,7 @@
2020

2121
import java.io.File;
2222
import java.io.IOException;
23-
import java.util.Iterator;
24-
import java.util.NoSuchElementException;
23+
import java.util.UUID;
2524

2625
import org.apache.jackrabbit.oak.commons.Buffer;
2726
import org.apache.jackrabbit.oak.segment.remote.AbstractRemoteSegmentArchiveReader;
@@ -32,35 +31,47 @@ public class AwsSegmentArchiveReader extends AbstractRemoteSegmentArchiveReader
3231

3332
private final S3Directory directory;
3433

34+
private final String archiveName;
35+
36+
private final long length;
37+
3538
AwsSegmentArchiveReader(S3Directory directory, String archiveName, IOMonitor ioMonitor) throws IOException {
36-
super(ioMonitor, archiveName, createEntryIterable(directory, archiveName));
39+
super(ioMonitor);
3740
this.directory = directory;
41+
this.archiveName = archiveName;
42+
this.length = computeArchiveIndexAndLength();
43+
}
44+
45+
@Override
46+
public long length() {
47+
return length;
48+
}
49+
50+
@Override
51+
public String getName() {
52+
return archiveName;
3853
}
3954

40-
private static Iterable<ArchiveEntry> createEntryIterable(S3Directory directory, String archiveName) throws IOException{
55+
@Override
56+
protected long computeArchiveIndexAndLength() throws IOException {
57+
long length = 0;
4158
Buffer buffer = directory.readObjectToBuffer(archiveName + ".idx", OFF_HEAP);
42-
return () -> new Iterator<>() {
43-
@Override
44-
public boolean hasNext() {
45-
return buffer.hasRemaining();
46-
}
47-
48-
@Override
49-
public ArchiveEntry next() {
50-
if (!hasNext()) {
51-
throw new NoSuchElementException();
52-
}
53-
54-
long msb = buffer.getLong();
55-
long lsb = buffer.getLong();
56-
int position = buffer.getInt();
57-
int contentLength = buffer.getInt();
58-
int generation = buffer.getInt();
59-
int fullGeneration = buffer.getInt();
60-
boolean compacted = buffer.get() != 0;
61-
return new ArchiveEntry(new RemoteSegmentArchiveEntry(msb, lsb, position, contentLength, generation, fullGeneration, compacted));
62-
}
63-
};
59+
while (buffer.hasRemaining()) {
60+
long msb = buffer.getLong();
61+
long lsb = buffer.getLong();
62+
int position = buffer.getInt();
63+
int contentLength = buffer.getInt();
64+
int generation = buffer.getInt();
65+
int fullGeneration = buffer.getInt();
66+
boolean compacted = buffer.get() != 0;
67+
68+
RemoteSegmentArchiveEntry indexEntry = new RemoteSegmentArchiveEntry(msb, lsb, position, contentLength,
69+
generation, fullGeneration, compacted);
70+
index.put(new UUID(indexEntry.getMsb(), indexEntry.getLsb()), indexEntry);
71+
length += contentLength;
72+
}
73+
74+
return length;
6475
}
6576

6677
@Override

oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureSegmentArchiveReader.java

Lines changed: 34 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -17,47 +17,65 @@
1717
package org.apache.jackrabbit.oak.segment.azure;
1818

1919
import com.azure.storage.blob.BlobContainerClient;
20+
import com.azure.storage.blob.models.BlobItem;
2021
import com.azure.storage.blob.models.BlobStorageException;
2122
import com.azure.storage.blob.models.ListBlobsOptions;
2223
import com.azure.storage.blob.specialized.BlockBlobClient;
2324
import org.apache.jackrabbit.oak.commons.Buffer;
2425
import org.apache.jackrabbit.oak.segment.remote.AbstractRemoteSegmentArchiveReader;
26+
import org.apache.jackrabbit.oak.segment.remote.RemoteSegmentArchiveEntry;
2527
import org.apache.jackrabbit.oak.segment.spi.monitor.IOMonitor;
26-
import org.jetbrains.annotations.NotNull;
2728

2829
import java.io.File;
2930
import java.io.IOException;
3031
import java.util.Map;
32+
import java.util.UUID;
3133

3234
import static org.apache.jackrabbit.oak.segment.azure.AzureUtilities.readBufferFully;
3335

3436
public class AzureSegmentArchiveReader extends AbstractRemoteSegmentArchiveReader {
3537

3638
private final BlobContainerClient blobContainerClient;
3739

40+
private final long length;
41+
42+
private final String archiveName;
43+
3844
private final String archivePathPrefix;
3945

40-
AzureSegmentArchiveReader(BlobContainerClient blobContainerClient, String rootPrefix, String archiveName, IOMonitor ioMonitor) {
41-
super(ioMonitor, AzureUtilities.ensureNoTrailingSlash(archiveName),
42-
createEntryIterable(blobContainerClient, AzureUtilities.asAzurePrefix(rootPrefix, archiveName)));
46+
AzureSegmentArchiveReader(BlobContainerClient blobContainerClient, String rootPrefix, String archiveName, IOMonitor ioMonitor) throws IOException {
47+
super(ioMonitor);
4348
this.blobContainerClient = blobContainerClient;
49+
this.archiveName = AzureUtilities.ensureNoTrailingSlash(archiveName);
4450
this.archivePathPrefix = AzureUtilities.asAzurePrefix(rootPrefix, archiveName);
51+
this.length = computeArchiveIndexAndLength();
52+
}
53+
54+
@Override
55+
public long length() {
56+
return length;
57+
}
58+
59+
@Override
60+
public String getName() {
61+
return archiveName;
4562
}
4663

47-
private static Iterable<ArchiveEntry> createEntryIterable(BlobContainerClient blobContainerClient, @NotNull String archivePathPrefix) {
64+
@Override
65+
protected long computeArchiveIndexAndLength() throws IOException {
66+
long length = 0;
4867
ListBlobsOptions listBlobsOptions = new ListBlobsOptions();
4968
listBlobsOptions.setPrefix(archivePathPrefix);
50-
return AzureUtilities.getBlobs(blobContainerClient, listBlobsOptions).stream()
51-
.map(blobItem -> {
52-
Map<String, String> metadata = blobItem.getMetadata();
53-
int length = blobItem.getProperties().getContentLength().intValue();
54-
if (AzureBlobMetadata.isSegment(metadata)) {
55-
return new ArchiveEntry(AzureBlobMetadata.toIndexEntry(metadata, length));
56-
} else {
57-
return new ArchiveEntry(length);
58-
}
59-
})
60-
::iterator;
69+
for (BlobItem blob : AzureUtilities.getBlobs(blobContainerClient, listBlobsOptions)) {
70+
Map<String, String> metadata = blob.getMetadata();
71+
if (AzureBlobMetadata.isSegment(metadata)) {
72+
RemoteSegmentArchiveEntry indexEntry = AzureBlobMetadata.toIndexEntry(metadata, blob.getProperties().getContentLength().intValue());
73+
index.put(new UUID(indexEntry.getMsb(), indexEntry.getLsb()), indexEntry);
74+
}
75+
length += blob.getProperties().getContentLength();
76+
}
77+
78+
return length;
6179
}
6280

6381
@Override

oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/v8/AzureSegmentArchiveReaderV8.java

Lines changed: 30 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -22,37 +22,54 @@
2222
import java.io.IOException;
2323
import java.net.URISyntaxException;
2424
import java.util.Map;
25+
import java.util.UUID;
2526

2627
import com.microsoft.azure.storage.StorageException;
28+
import com.microsoft.azure.storage.blob.CloudBlob;
2729
import com.microsoft.azure.storage.blob.CloudBlobDirectory;
2830
import com.microsoft.azure.storage.blob.CloudBlockBlob;
2931

3032
import org.apache.jackrabbit.oak.commons.Buffer;
3133
import org.apache.jackrabbit.oak.segment.azure.AzureBlobMetadata;
3234
import org.apache.jackrabbit.oak.segment.remote.AbstractRemoteSegmentArchiveReader;
35+
import org.apache.jackrabbit.oak.segment.remote.RemoteSegmentArchiveEntry;
3336
import org.apache.jackrabbit.oak.segment.spi.monitor.IOMonitor;
3437

3538
public class AzureSegmentArchiveReaderV8 extends AbstractRemoteSegmentArchiveReader {
3639

3740
private final CloudBlobDirectory archiveDirectory;
3841

42+
private final long length;
43+
3944
protected AzureSegmentArchiveReaderV8(CloudBlobDirectory archiveDirectory, IOMonitor ioMonitor) throws IOException {
40-
super(ioMonitor, AzureUtilitiesV8.getName(archiveDirectory), createEntryIterable(archiveDirectory));
45+
super(ioMonitor);
4146
this.archiveDirectory = archiveDirectory;
47+
this.length = computeArchiveIndexAndLength();
48+
}
49+
50+
@Override
51+
public long length() {
52+
return length;
4253
}
4354

44-
private static Iterable<ArchiveEntry> createEntryIterable(CloudBlobDirectory archiveDirectory) throws IOException {
45-
return AzureUtilitiesV8.getBlobs(archiveDirectory).stream()
46-
.map(blob -> {
47-
Map<String, String> metadata = blob.getMetadata();
48-
int length = (int) blob.getProperties().getLength();
49-
if (AzureBlobMetadata.isSegment(metadata)) {
50-
return new ArchiveEntry(AzureBlobMetadata.toIndexEntry(metadata, length));
51-
} else {
52-
return new ArchiveEntry(length);
53-
}
54-
})
55-
::iterator;
55+
@Override
56+
public String getName() {
57+
return AzureUtilitiesV8.getName(archiveDirectory);
58+
}
59+
60+
@Override
61+
protected long computeArchiveIndexAndLength() throws IOException {
62+
long length = 0;
63+
for (CloudBlob blob : AzureUtilitiesV8.getBlobs(archiveDirectory)) {
64+
Map<String, String> metadata = blob.getMetadata();
65+
if (AzureBlobMetadata.isSegment(metadata)) {
66+
RemoteSegmentArchiveEntry indexEntry = AzureBlobMetadata.toIndexEntry(metadata, (int) blob.getProperties().getLength());
67+
index.put(new UUID(indexEntry.getMsb(), indexEntry.getLsb()), indexEntry);
68+
}
69+
length += blob.getProperties().getLength();
70+
}
71+
72+
return length;
5673
}
5774

5875
@Override

oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/AbstractRemoteSegmentArchiveReader.java

Lines changed: 11 additions & 94 deletions
Original file line numberDiff line numberDiff line change
@@ -30,53 +30,20 @@
3030

3131
import java.io.File;
3232
import java.io.IOException;
33-
import java.util.Collections;
34-
import java.util.Comparator;
35-
import java.util.LinkedList;
33+
import java.util.ArrayList;
34+
import java.util.LinkedHashMap;
3635
import java.util.List;
3736
import java.util.Map;
38-
import java.util.Set;
3937
import java.util.UUID;
4038
import java.util.concurrent.TimeUnit;
41-
import java.util.stream.Collectors;
4239

4340
public abstract class AbstractRemoteSegmentArchiveReader implements SegmentArchiveReader {
44-
4541
protected final IOMonitor ioMonitor;
4642

47-
/**
48-
* Unordered immutable map of segment UUIDs to their corresponding archive entries.
49-
*/
50-
private final Map<UUID, RemoteSegmentArchiveEntry> index;
51-
52-
/**
53-
* The name of the archive.
54-
*/
55-
private final String archiveName;
43+
protected final Map<UUID, RemoteSegmentArchiveEntry> index = new LinkedHashMap<>();
5644

57-
/**
58-
* The total size of the archive in bytes.
59-
*/
60-
private final long length;
61-
62-
protected AbstractRemoteSegmentArchiveReader(IOMonitor ioMonitor, String archiveName, Iterable<ArchiveEntry> entries) {
45+
public AbstractRemoteSegmentArchiveReader(IOMonitor ioMonitor) throws IOException {
6346
this.ioMonitor = ioMonitor;
64-
this.archiveName = archiveName;
65-
66-
IndexBuilder indexBuilder = new IndexBuilder();
67-
entries.forEach(indexBuilder::addEntry);
68-
this.index = indexBuilder.createIndex();
69-
this.length = indexBuilder.getLength();
70-
}
71-
72-
@Override
73-
public @NotNull String getName() {
74-
return archiveName;
75-
}
76-
77-
@Override
78-
public long length() {
79-
return length;
8047
}
8148

8249
@Override
@@ -106,16 +73,9 @@ public boolean containsSegment(long msb, long lsb) {
10673
return index.containsKey(new UUID(msb, lsb));
10774
}
10875

109-
@Override
110-
public Set<UUID> getSegmentUUIDs() {
111-
return Collections.unmodifiableSet(index.keySet());
112-
}
113-
11476
@Override
11577
public List<SegmentArchiveEntry> listSegments() {
116-
return index.values().stream()
117-
.sorted(Comparator.comparing(RemoteSegmentArchiveEntry::getPosition))
118-
.collect(Collectors.toList());
78+
return new ArrayList<>(index.values());
11979
}
12080

12181
@Override
@@ -142,6 +102,12 @@ public int getEntrySize(int size) {
142102
return size;
143103
}
144104

105+
/**
106+
* Populates the archive index, summing up each entry's length.
107+
* @return length, the total length of the archive
108+
*/
109+
protected abstract long computeArchiveIndexAndLength() throws IOException;
110+
145111
/**
146112
* Reads the segment from the remote storage.
147113
* @param segmentFileName, the name of the segment (msb + lsb) prefixed by its position in the archive
@@ -166,53 +132,4 @@ public int getEntrySize(int size) {
166132
public boolean isRemote() {
167133
return true;
168134
}
169-
170-
protected static final class ArchiveEntry {
171-
172-
private final RemoteSegmentArchiveEntry entry;
173-
174-
private final int length;
175-
176-
public ArchiveEntry(RemoteSegmentArchiveEntry entry) {
177-
this.entry = entry;
178-
this.length = entry.getLength();
179-
}
180-
181-
public ArchiveEntry(int length) {
182-
this.entry = null;
183-
this.length = length;
184-
}
185-
186-
int getLength() {
187-
return length;
188-
}
189-
190-
RemoteSegmentArchiveEntry getRemoteSegmentArchiveEntry() {
191-
return entry;
192-
}
193-
}
194-
195-
private static final class IndexBuilder {
196-
197-
private final List<Map.Entry<UUID, RemoteSegmentArchiveEntry>> entries = new LinkedList<>();
198-
199-
private long length = 0;
200-
201-
private void addEntry(ArchiveEntry entry) {
202-
RemoteSegmentArchiveEntry archiveEntry = entry.getRemoteSegmentArchiveEntry();
203-
if (archiveEntry != null) {
204-
this.entries.add(Map.entry(archiveEntry.getUuid(), archiveEntry));
205-
}
206-
this.length += entry.getLength();
207-
}
208-
209-
@SuppressWarnings("unchecked")
210-
private Map<UUID, RemoteSegmentArchiveEntry> createIndex() {
211-
return Map.ofEntries(entries.toArray(Map.Entry[]::new));
212-
}
213-
214-
private long getLength() {
215-
return length;
216-
}
217-
}
218135
}

0 commit comments

Comments
 (0)