Skip to content

Commit 39d1149

Browse files
committed
Optimize memtable flush logic
add more flushing stats: partitions/rows, bytes rate, CPU and heap allocation for the flushing thread avoid columns filtering overheads for unfilteredIterator do not re-map colums in serializeRowBody if they haven't changed reduce allocations during serialization of NativeClustering add fast return for BTreeRow.hasComplexDeletion, avoid column.name.bytes.hashCode if not needed, avoid capturing lambda allocation in UnfilteredSerializer.serializeRowBody check if Guardrails enabled at the beginning of writing, avoid hidden auto-boxing for logging of primitive parameters split call sites for in Cell serialize logic, make isCounterCell cheaper (avoid megamorphic call + cache isCounterColumn) invoke metadataCollector.updateClusteringValues only for first and last clustering key in a partition enforce inlining for MinMaxIntTracker/MinMaxLongTracker Patch by Dmitry Konstantinov; reviewed by Branimir Lambov for CASSANDRA-21083
1 parent aa4da83 commit 39d1149

28 files changed

+548
-70
lines changed

CHANGES.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
5.1
2+
* Optimize memtable flush logic (CASSANDRA-21083)
23
* No need to evict already prepared statements, as it creates a race condition between multiple threads (CASSANDRA-17401)
34
* Include Level information for UnifiedCompactionStrategy in nodetool tablestats output (CASSANDRA-20820)
45
* Support low-overhead async profiling (CASSANDRA-20854)

src/java/org/apache/cassandra/db/ClusteringPrefix.java

Lines changed: 29 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -260,6 +260,33 @@ default boolean isEmpty()
260260
*/
261261
public V get(int i);
262262

263+
264+
/**
265+
* A dedicated method to write ith value of this prefix,
266+
* it is introduced for optimization reasons to avoid retrieval (and potential allocation) of ith value object.
267+
* For the same reason null and empty check are performed inside the method.
268+
*/
269+
default void writeValueSkippingNullAndEmpty(AbstractType<?> type, int i, DataOutputPlus out) throws IOException
270+
{
271+
V v = get(i);
272+
if (v != null && !isEmpty(i))
273+
type.writeValue(v, accessor(), out);
274+
}
275+
276+
/**
277+
* A dedicated method to get a written length of ith value of this prefix,
278+
* it is introduced for optimization reasons to avoid retrieval (and potential allocation) of ith value object.
279+
* For the same reason null and empty check are performed inside the method.
280+
*/
281+
default long writtenLengthSkippingNullAndEmpty(AbstractType<?> type, int i)
282+
{
283+
V v = get(i);
284+
if (v == null || isEmpty(i))
285+
return 0;
286+
287+
return type.writtenLength(v, accessor());
288+
}
289+
263290
/**
264291
* The method is introduced to allow to avoid a value object retrieval/allocation for simple checks
265292
*/
@@ -478,7 +505,6 @@ <V> void serializeValuesWithoutSize(ClusteringPrefix<V> clustering, DataOutputPl
478505
{
479506
int offset = 0;
480507
int clusteringSize = clustering.size();
481-
ValueAccessor<V> accessor = clustering.accessor();
482508
// serialize in batches of 32, to avoid garbage when deserializing headers
483509
while (offset < clusteringSize)
484510
{
@@ -490,9 +516,7 @@ <V> void serializeValuesWithoutSize(ClusteringPrefix<V> clustering, DataOutputPl
490516
out.writeUnsignedVInt(makeHeader(clustering, offset, limit));
491517
while (offset < limit)
492518
{
493-
V v = clustering.get(offset);
494-
if (v != null && !accessor.isEmpty(v))
495-
types.get(offset).writeValue(v, accessor, out);
519+
clustering.writeValueSkippingNullAndEmpty(types.get(offset), offset, out);
496520
offset++;
497521
}
498522
}
@@ -509,14 +533,9 @@ <V> long valuesWithoutSizeSerializedSize(ClusteringPrefix<V> clustering, int ver
509533
result += TypeSizes.sizeofUnsignedVInt(makeHeader(clustering, offset, limit));
510534
offset = limit;
511535
}
512-
ValueAccessor<V> accessor = clustering.accessor();
513536
for (int i = 0; i < clusteringSize; i++)
514537
{
515-
V v = clustering.get(i);
516-
if (v == null || accessor.isEmpty(v))
517-
continue; // handled in the header
518-
519-
result += types.get(i).writtenLength(v, accessor);
538+
result += clustering.writtenLengthSkippingNullAndEmpty(types.get(i), i);
520539
}
521540
return result;
522541
}

src/java/org/apache/cassandra/db/NativeClustering.java

Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,18 @@
1818
*/
1919
package org.apache.cassandra.db;
2020

21+
import java.io.IOException;
2122
import java.nio.ByteBuffer;
2223
import java.nio.ByteOrder;
2324

25+
import org.apache.cassandra.db.marshal.AbstractType;
2426
import org.apache.cassandra.db.marshal.AddressBasedNativeData;
2527
import org.apache.cassandra.db.marshal.ByteBufferAccessor;
28+
import org.apache.cassandra.db.marshal.IndexedValueHolder;
2629
import org.apache.cassandra.db.marshal.NativeAccessor;
2730
import org.apache.cassandra.db.marshal.NativeData;
2831
import org.apache.cassandra.db.marshal.ValueAccessor;
32+
import org.apache.cassandra.io.util.DataOutputPlus;
2933
import org.apache.cassandra.utils.FBUtilities;
3034
import org.apache.cassandra.utils.ObjectSizes;
3135
import org.apache.cassandra.utils.concurrent.OpOrder;
@@ -34,7 +38,7 @@
3438
import org.apache.cassandra.utils.memory.MemoryUtil;
3539
import org.apache.cassandra.utils.memory.NativeEndianMemoryUtil;
3640

37-
public class NativeClustering implements Clustering<NativeData>
41+
public class NativeClustering implements Clustering<NativeData>, IndexedValueHolder<NativeData>
3842
{
3943
private static final long EMPTY_SIZE = ObjectSizes.measure(new NativeClustering());
4044

@@ -118,11 +122,59 @@ public NativeData get(int i)
118122
return buildDataObject(i, AddressBasedNativeData::new);
119123
}
120124

125+
public void writeValueSkippingNullAndEmpty(AbstractType<?> type, int i, DataOutputPlus out) throws IOException
126+
{
127+
if (!isEmpty(i)) // is null is checked as a part of isEmpty
128+
type.writeValue(this, i, NativeAccessor.instance, out);
129+
}
130+
131+
public long writtenLengthSkippingNullAndEmpty(AbstractType<?> type, int i)
132+
{
133+
if (isEmpty(i)) // is null is checked as a part of isEmpty
134+
return 0;
135+
136+
return type.writtenLength(this, i, NativeAccessor.instance);
137+
}
138+
139+
@Override
140+
public int size(int i)
141+
{
142+
int size = size();
143+
if (isNull(peer, size, i))
144+
return 0;
145+
146+
int startOffset = NativeEndianMemoryUtil.getUnsignedShort(peer + 2 + i * 2);
147+
int endOffset = NativeEndianMemoryUtil.getUnsignedShort(peer + 4 + i * 2);
148+
return (endOffset - startOffset);
149+
}
150+
121151
public boolean isNull(int i)
122152
{
123153
return isNull(peer, size(), i);
124154
}
125155

156+
@Override
157+
public void write(int i, DataOutputPlus out) throws IOException
158+
{
159+
int size = size();
160+
if (i >= size)
161+
throw new IndexOutOfBoundsException();
162+
163+
int metadataSize = (size * 2) + 4;
164+
int bitmapSize = ((size + 7) >>> 3);
165+
long bitmapStart = peer + metadataSize;
166+
int b = NativeEndianMemoryUtil.getByte(bitmapStart + (i >>> 3));
167+
if ((b & (1 << (i & 7))) != 0)
168+
return;
169+
170+
int startOffset = NativeEndianMemoryUtil.getUnsignedShort(peer + 2 + i * 2);
171+
int endOffset = NativeEndianMemoryUtil.getUnsignedShort(peer + 4 + i * 2);
172+
173+
long address = bitmapStart + bitmapSize + startOffset;
174+
int length = endOffset - startOffset;
175+
out.writeMemory(address, length);
176+
}
177+
126178
private static boolean isNull(long peer, int size, int i)
127179
{
128180
if (i >= size)

src/java/org/apache/cassandra/db/SerializationHeader.java

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,19 +63,33 @@ public class SerializationHeader
6363

6464
private final Map<ByteBuffer, AbstractType<?>> typeMap;
6565

66+
private final boolean columnsMayChanged;
67+
6668
private SerializationHeader(boolean isForSSTable,
6769
AbstractType<?> keyType,
6870
List<AbstractType<?>> clusteringTypes,
6971
RegularAndStaticColumns columns,
7072
EncodingStats stats,
7173
Map<ByteBuffer, AbstractType<?>> typeMap)
74+
{
75+
this(isForSSTable, keyType, clusteringTypes, columns, stats, typeMap, true);
76+
}
77+
78+
private SerializationHeader(boolean isForSSTable,
79+
AbstractType<?> keyType,
80+
List<AbstractType<?>> clusteringTypes,
81+
RegularAndStaticColumns columns,
82+
EncodingStats stats,
83+
Map<ByteBuffer, AbstractType<?>> typeMap,
84+
boolean columnsMayChanged)
7285
{
7386
this.isForSSTable = isForSSTable;
7487
this.keyType = keyType;
7588
this.clusteringTypes = clusteringTypes;
7689
this.columns = columns;
7790
this.stats = stats;
7891
this.typeMap = typeMap;
92+
this.columnsMayChanged = columnsMayChanged;
7993
}
8094

8195
public static SerializationHeader makeWithoutStats(TableMetadata metadata)
@@ -118,6 +132,21 @@ private static Collection<SSTableReader> orderByDescendingGeneration(Collection<
118132
return readers;
119133
}
120134

135+
public SerializationHeader(boolean isForSSTable,
136+
TableMetadata metadata,
137+
RegularAndStaticColumns columns,
138+
EncodingStats stats,
139+
boolean columnsMayChanged)
140+
{
141+
this(isForSSTable,
142+
metadata.partitionKeyType,
143+
metadata.comparator.subtypes(),
144+
columns,
145+
stats,
146+
null,
147+
columnsMayChanged);
148+
}
149+
121150
public SerializationHeader(boolean isForSSTable,
122151
TableMetadata metadata,
123152
RegularAndStaticColumns columns,
@@ -128,7 +157,8 @@ public SerializationHeader(boolean isForSSTable,
128157
metadata.comparator.subtypes(),
129158
columns,
130159
stats,
131-
null);
160+
null,
161+
true);
132162
}
133163

134164
public RegularAndStaticColumns columns()
@@ -146,6 +176,11 @@ public boolean isForSSTable()
146176
return isForSSTable;
147177
}
148178

179+
public boolean columnsMayChanged()
180+
{
181+
return columnsMayChanged;
182+
}
183+
149184
public EncodingStats stats()
150185
{
151186
return stats;

src/java/org/apache/cassandra/db/compaction/unified/ShardedMultiWriter.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,14 @@ public long getOnDiskBytesWritten()
205205
return bytesWritten;
206206
}
207207

208+
public long getTotalRows()
209+
{
210+
long totalRows = 0;
211+
for (int i = 0; i <= currentWriter; ++i)
212+
totalRows += writers[i].getTotalRows();
213+
return totalRows;
214+
}
215+
208216
@Override
209217
public TableId getTableId()
210218
{

src/java/org/apache/cassandra/db/marshal/AbstractType.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -586,6 +586,25 @@ public <V> void writeValue(V value, ValueAccessor<V> accessor, DataOutputPlus o
586586
}
587587
}
588588

589+
public <V> void writeValue(IndexedValueHolder<V> valueHolder, int i, ValueAccessor<V> accessor, DataOutputPlus out) throws IOException
590+
{
591+
assert !valueHolder.isNull(i) : "bytes should not be null for type " + this;
592+
int expectedValueLength = valueLengthIfFixed();
593+
if (expectedValueLength >= 0)
594+
{
595+
int actualValueLength = valueHolder.size(i);
596+
if (actualValueLength == expectedValueLength)
597+
accessor.write(valueHolder, i, out);
598+
else
599+
throw new IOException(String.format("Expected exactly %d bytes, but was %d",
600+
expectedValueLength, actualValueLength));
601+
}
602+
else
603+
{
604+
accessor.writeWithVIntLength(valueHolder, i, out);
605+
}
606+
}
607+
589608
public long writtenLength(ByteBuffer value)
590609
{
591610
return writtenLength(value, ByteBufferAccessor.instance);
@@ -599,6 +618,14 @@ public <V> long writtenLength(V value, ValueAccessor<V> accessor)
599618
: accessor.sizeWithVIntLength(value);
600619
}
601620

621+
public <V> long writtenLength(IndexedValueHolder<V> valueHolder, int i, ValueAccessor<V> accessor)
622+
{
623+
assert !valueHolder.isNull(i) : "bytes should not be null for type " + this;
624+
return valueLengthIfFixed() >= 0
625+
? valueHolder.size(i) // if the size is wrong, this will be detected in writeValue
626+
: accessor.sizeWithVIntLength(valueHolder, i);
627+
}
628+
602629
public ByteBuffer readBuffer(DataInputPlus in) throws IOException
603630
{
604631
return readBuffer(in, Integer.MAX_VALUE);
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.cassandra.db.marshal;
20+
21+
import java.io.IOException;
22+
23+
import org.apache.cassandra.io.util.DataOutputPlus;
24+
25+
public interface IndexedValueHolder<V>
26+
{
27+
V get(int i);
28+
int size(int i);
29+
boolean isEmpty(int i);
30+
boolean isNull(int i);
31+
void write(int i, DataOutputPlus out) throws IOException;
32+
}

src/java/org/apache/cassandra/db/marshal/NativeAccessor.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,13 @@ public void write(NativeData sourceValue, DataOutputPlus out) throws IOException
7070
out.writeMemory(sourceValue.getAddress(), sourceValue.nativeDataSize());
7171
}
7272

73+
public void writeWithVIntLength(IndexedValueHolder<NativeData> valueHolder, int i, DataOutputPlus out) throws IOException
74+
{
75+
int size = valueHolder.size(i);
76+
out.writeUnsignedVInt32(size);
77+
valueHolder.write(i, out);
78+
}
79+
7380
@Override
7481
public ByteBuffer toBuffer(NativeData value)
7582
{

src/java/org/apache/cassandra/db/marshal/ValueAccessor.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,12 @@ default int sizeWithVIntLength(V value)
125125
return TypeSizes.sizeofUnsignedVInt(size) + size;
126126
}
127127

128+
default int sizeWithVIntLength(IndexedValueHolder<V> valueHolder, int i)
129+
{
130+
int size = valueHolder.size(i);
131+
return TypeSizes.sizeofUnsignedVInt(size) + size;
132+
}
133+
128134
/** serialized size including a short length prefix */
129135
default int sizeWithShortLength(V value)
130136
{
@@ -173,12 +179,24 @@ default boolean isEmptyFromOffset(V value, int offset)
173179
*/
174180
void write(V value, DataOutputPlus out) throws IOException;
175181

182+
default void write(IndexedValueHolder<V> valueHolder, int i, DataOutputPlus out) throws IOException
183+
{
184+
write(valueHolder.get(i), out);
185+
}
186+
187+
176188
default void writeWithVIntLength(V value, DataOutputPlus out) throws IOException
177189
{
178190
out.writeUnsignedVInt32(size(value));
179191
write(value, out);
180192
}
181193

194+
default void writeWithVIntLength(IndexedValueHolder<V> valueHolder, int i, DataOutputPlus out) throws IOException
195+
{
196+
out.writeUnsignedVInt32(valueHolder.size(i));
197+
write(valueHolder.get(i), out);
198+
}
199+
182200
/**
183201
* Write the contents of the given value into the ByteBuffer
184202
*/

0 commit comments

Comments
 (0)