Skip to content

Commit 099f376

Browse files
Merge branch 'cassandra-5.0' into trunk
2 parents 7fe688b + 45afd18 commit 099f376

File tree

3 files changed

+217
-2
lines changed

3 files changed

+217
-2
lines changed

CHANGES.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -285,6 +285,7 @@
285285
* Add the ability to disable bulk loading of SSTables (CASSANDRA-18781)
286286
* Clean up obsolete functions and simplify cql_version handling in cqlsh (CASSANDRA-18787)
287287
Merged from 5.0:
288+
* Automatically disable zero-copy streaming for legacy sstables with old bloom filter format (CASSANDRA-21092)
288289
* Fix CQLSSTableWriter serialization of vector of date and time (CASSANDRA-20979)
289290
* Correctly calculate default for FailureDetector max interval (CASSANDRA-21025)
290291
* Adding missing configs in system_views.settings to be backward compatible (CASSANDRA-20863)

src/java/org/apache/cassandra/db/streaming/CassandraOutgoingFile.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -188,8 +188,11 @@ public void write(StreamSession session, StreamingDataOutputPlus out, int versio
188188
@VisibleForTesting
189189
public boolean computeShouldStreamEntireSSTables()
190190
{
191-
// don't stream if full sstable transfers are disabled or legacy counter shards are present
192-
if (!DatabaseDescriptor.streamEntireSSTables() || ref.get().getSSTableMetadata().hasLegacyCounterShards)
191+
// don't stream if full sstable transfers are disabled, legacy counter shards are present,
192+
// or sstable uses old bloom filter format (pre-4.0) which is incompatible with zero-copy streaming
193+
if (!DatabaseDescriptor.streamEntireSSTables() ||
194+
ref.get().getSSTableMetadata().hasLegacyCounterShards ||
195+
ref.get().descriptor.version.hasOldBfFormat())
193196
return false;
194197

195198
return contained(sections, ref.get());
Lines changed: 211 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,211 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.cassandra.io.sstable;
19+
20+
import java.io.IOException;
21+
import java.nio.file.Files;
22+
import java.nio.file.StandardCopyOption;
23+
import java.util.Collections;
24+
import java.util.concurrent.CountDownLatch;
25+
26+
import org.junit.After;
27+
import org.junit.Before;
28+
import org.junit.BeforeClass;
29+
import org.junit.Test;
30+
31+
import org.apache.cassandra.SchemaLoader;
32+
import org.apache.cassandra.ServerTestUtils;
33+
import org.apache.cassandra.Util;
34+
import org.apache.cassandra.config.CassandraRelevantProperties;
35+
import org.apache.cassandra.config.DatabaseDescriptor;
36+
import org.apache.cassandra.cql3.QueryProcessor;
37+
import org.apache.cassandra.db.ColumnFamilyStore;
38+
import org.apache.cassandra.db.Keyspace;
39+
import org.apache.cassandra.io.util.File;
40+
import org.apache.cassandra.io.util.FileUtils;
41+
import org.apache.cassandra.locator.Replica;
42+
import org.apache.cassandra.net.MessagingService;
43+
import org.apache.cassandra.schema.KeyspaceParams;
44+
import org.apache.cassandra.schema.Schema;
45+
import org.apache.cassandra.schema.TableMetadataRef;
46+
import org.apache.cassandra.service.StorageService;
47+
import org.apache.cassandra.streaming.StreamEvent;
48+
import org.apache.cassandra.streaming.StreamEventHandler;
49+
import org.apache.cassandra.streaming.StreamState;
50+
import org.apache.cassandra.utils.FBUtilities;
51+
import org.apache.cassandra.utils.OutputHandler;
52+
53+
import static org.junit.Assert.assertTrue;
54+
55+
/**
56+
* Tests SSTableLoader with legacy sstables from Cassandra 3.x
57+
*/
58+
public class SSTableLoaderLegacyTest
59+
{
60+
public static final String KEYSPACE1 = "sstableloaderlegacytest";
61+
public static final String LEGACY_VERSION = "me"; // Cassandra 3.11
62+
public static final String LEGACY_TABLE = "legacy_me_simple";
63+
64+
private static File LEGACY_SSTABLE_ROOT;
65+
private File tmpdir;
66+
67+
@BeforeClass
68+
public static void defineSchema()
69+
{
70+
String scp = CassandraRelevantProperties.TEST_LEGACY_SSTABLE_ROOT.getString();
71+
if (scp == null || scp.isEmpty())
72+
{
73+
throw new RuntimeException("System property for legacy sstable root is not set.");
74+
}
75+
LEGACY_SSTABLE_ROOT = new File(scp).toAbsolute();
76+
77+
ServerTestUtils.prepareServerNoRegister();
78+
SchemaLoader.createKeyspace(KEYSPACE1, KeyspaceParams.simple(1));
79+
80+
// Create table matching the legacy sstable schema
81+
// legacy_me_simple has schema: pk text PRIMARY KEY, val text
82+
QueryProcessor.executeInternal(String.format(
83+
"CREATE TABLE %s.%s (pk text PRIMARY KEY, val text)",
84+
KEYSPACE1, LEGACY_TABLE));
85+
86+
MessagingService.instance().waitUntilListeningUnchecked();
87+
StorageService.instance.initServer();
88+
}
89+
90+
@Before
91+
public void setup() throws IOException
92+
{
93+
tmpdir = new File(Files.createTempDirectory("sstableloaderlegacytest").toFile());
94+
}
95+
96+
@After
97+
public void cleanup()
98+
{
99+
FileUtils.deleteRecursive(tmpdir);
100+
}
101+
102+
/**
103+
* Test that loading legacy 3.11 sstables works automatically.
104+
* Zero-copy streaming is automatically disabled for legacy sstables that use the old bloom filter format.
105+
*/
106+
@Test
107+
public void testLoadLegacy311SSTable() throws Exception
108+
{
109+
assertTrue("Zero-copy streaming should be enabled by default",
110+
DatabaseDescriptor.streamEntireSSTables());
111+
112+
File dataDir = setupLegacySSTableDirectory();
113+
ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(LEGACY_TABLE);
114+
115+
final CountDownLatch latch = new CountDownLatch(1);
116+
SSTableLoader loader = new SSTableLoader(dataDir, new TestClient(),
117+
new OutputHandler.SystemOutput(false, false));
118+
119+
loader.stream(Collections.emptySet(), completionStreamListener(latch)).get();
120+
latch.await();
121+
122+
assertTrue("Data should be loaded from legacy sstable",
123+
!Util.getAll(Util.cmd(cfs).build()).isEmpty());
124+
}
125+
126+
private static final class TestClient extends SSTableLoader.Client
127+
{
128+
private String keyspace;
129+
130+
public void init(String keyspace)
131+
{
132+
this.keyspace = keyspace;
133+
for (Replica replica : StorageService.instance.getLocalReplicas(KEYSPACE1))
134+
addRangeForEndpoint(replica.range(), FBUtilities.getBroadcastAddressAndPort());
135+
}
136+
137+
public TableMetadataRef getTableMetadata(String tableName)
138+
{
139+
return Schema.instance.getTableMetadataRef(keyspace, tableName);
140+
}
141+
}
142+
143+
/**
144+
* Sets up a directory with legacy 3.11 sstables copied from test data.
145+
*/
146+
private File setupLegacySSTableDirectory() throws IOException
147+
{
148+
File dataDir = new File(tmpdir, KEYSPACE1 + "/" + LEGACY_TABLE);
149+
if (!dataDir.exists())
150+
dataDir.createDirectoriesIfNotExists();
151+
152+
File legacyTableDir = new File(LEGACY_SSTABLE_ROOT,
153+
String.format("%s/legacy_tables/%s", LEGACY_VERSION, LEGACY_TABLE));
154+
155+
if (!legacyTableDir.isDirectory())
156+
{
157+
throw new RuntimeException("Legacy sstable directory not found: " + legacyTableDir);
158+
}
159+
160+
// Copy all sstable components to the test directory
161+
File[] sourceFiles = legacyTableDir.tryList();
162+
if (sourceFiles != null)
163+
{
164+
for (File sourceFile : sourceFiles)
165+
{
166+
copyFile(sourceFile, new File(dataDir, sourceFile.name()));
167+
}
168+
}
169+
170+
System.out.println("Copied legacy sstables from: " + legacyTableDir);
171+
System.out.println("To: " + dataDir);
172+
File[] copiedFiles = dataDir.tryList();
173+
System.out.println("File count: " + (copiedFiles != null ? copiedFiles.length : 0));
174+
175+
return dataDir;
176+
}
177+
178+
/**
179+
* Copies a file from source to target.
180+
*/
181+
private static void copyFile(File sourceFile, File targetFile) throws IOException
182+
{
183+
if (sourceFile.isFile())
184+
{
185+
Files.copy(sourceFile.toPath(), targetFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
186+
}
187+
}
188+
189+
/**
190+
* Creates a stream completion listener.
191+
*/
192+
private StreamEventHandler completionStreamListener(final CountDownLatch latch)
193+
{
194+
return new StreamEventHandler()
195+
{
196+
public void onFailure(Throwable arg0)
197+
{
198+
latch.countDown();
199+
}
200+
201+
public void onSuccess(StreamState arg0)
202+
{
203+
latch.countDown();
204+
}
205+
206+
public void handleStreamEvent(StreamEvent event)
207+
{
208+
}
209+
};
210+
}
211+
}

0 commit comments

Comments
 (0)