Skip to content

Commit db97659

Browse files
committed
feat: Separate legacy drivers into their own classpath, ref: NOISSUED
1 parent 57f42c6 commit db97659

File tree

14 files changed

+300
-58
lines changed

14 files changed

+300
-58
lines changed

benchmark-framework/pom.xml

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -74,11 +74,6 @@
7474
<artifactId>driver-nsq</artifactId>
7575
<version>${project.version}</version>
7676
</dependency>
77-
<dependency>
78-
<groupId>${project.groupId}</groupId>
79-
<artifactId>driver-pravega</artifactId>
80-
<version>${project.version}</version>
81-
</dependency>
8277
<dependency>
8378
<groupId>${project.groupId}</groupId>
8479
<artifactId>driver-pulsar</artifactId>
@@ -173,6 +168,12 @@
173168
<version>${project.version}</version>
174169
<scope>provided</scope>
175170
</dependency>
171+
<dependency>
172+
<groupId>${project.groupId}</groupId>
173+
<artifactId>driver-pravega</artifactId>
174+
<version>${project.version}</version>
175+
<scope>provided</scope>
176+
</dependency>
176177
<dependency>
177178
<groupId>com.github.stefanbirkner</groupId>
178179
<artifactId>system-lambda</artifactId>
@@ -201,7 +202,7 @@
201202
<plugin>
202203
<groupId>org.apache.maven.plugins</groupId>
203204
<artifactId>maven-dependency-plugin</artifactId>
204-
<version>2.10</version>
205+
<version>3.9.0</version>
205206
<executions>
206207
<execution>
207208
<id>build-classpath</id>

benchmark-framework/src/main/java/io/openmessaging/benchmark/Benchmark.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,11 @@ static class Arguments {
7272
description = "Allocate extra consumer workers when your backlog builds.")
7373
boolean extraConsumers;
7474

75+
@Parameter(
76+
names = {"--legacy-driver-name"},
77+
description = "Path to legacy driver with conflicting or not maintained dependencies")
78+
public String legacyDriverName;
79+
7580
@Parameter(description = "Workloads") // , required = true)
7681
public List<String> workloads;
7782

@@ -165,7 +170,16 @@ static void main(String[] args) throws Exception {
165170
// Stop any left over workload
166171
worker.stopAll();
167172

168-
worker.initializeDriver(new File(driverConfig));
173+
File legacyDriverFile =
174+
arguments.legacyDriverName != null
175+
? new File(arguments.legacyDriverName)
176+
: null;
177+
178+
if (worker instanceof LocalWorker) {
179+
worker.initializeDriver(new File(driverConfig), legacyDriverFile);
180+
} else {
181+
worker.initializeDriver(new File(driverConfig));
182+
}
169183

170184
WorkloadGenerator generator =
171185
new WorkloadGenerator(driverConfiguration.name, workload, worker);
Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package io.openmessaging.benchmark.utils;
15+
16+
import java.io.File;
17+
import java.lang.reflect.Constructor;
18+
import java.net.URL;
19+
import java.net.URLClassLoader;
20+
import java.util.ArrayList;
21+
import java.util.List;
22+
import org.slf4j.Logger;
23+
import org.slf4j.LoggerFactory;
24+
25+
/**
26+
* Child-first classloader that isolates driver dependencies from the main classpath. Use this to
27+
* load drivers with conflicting transitive dependencies.
28+
*/
29+
public final class IsolatedDriverLoader extends URLClassLoader {
30+
private static final Logger log = LoggerFactory.getLogger(IsolatedDriverLoader.class);
31+
32+
private IsolatedDriverLoader(URL[] urls, ClassLoader parent) {
33+
super(urls, parent);
34+
}
35+
36+
@Override
37+
protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
38+
// Always delegate java.*, javax.*, and our driver-api to parent
39+
if (name.startsWith("java.")
40+
|| name.startsWith("javax.")
41+
|| name.startsWith("io.openmessaging.benchmark.driver.")) {
42+
return super.loadClass(name, resolve);
43+
}
44+
45+
// Child-first for everything else
46+
synchronized (getClassLoadingLock(name)) {
47+
Class<?> c = findLoadedClass(name);
48+
if (c == null) {
49+
try {
50+
c = findClass(name);
51+
} catch (ClassNotFoundException e) {
52+
c = super.loadClass(name, resolve);
53+
}
54+
}
55+
if (resolve) {
56+
resolveClass(c);
57+
}
58+
return c;
59+
}
60+
}
61+
62+
/**
63+
* Create an isolated classloader for the given driver home.
64+
*
65+
* @param driverName The driver name that will be loaded by the isolated classloader.
66+
* @return The classloader for the driver.
67+
* @throws Exception If the driver name is invalid.
68+
*/
69+
public static ClassLoader forDriverFolder(File driverName) throws Exception {
70+
List<URL> urls = new ArrayList<>();
71+
72+
if (driverName.isFile() && driverName.getName().endsWith(".jar")) {
73+
// Single jar
74+
urls.add(driverName.toURI().toURL());
75+
} else if (driverName.isDirectory()) {
76+
// Maven module layout: target/classes or a distribution folder
77+
File classesDir = new File(driverName, "target/classes");
78+
if (classesDir.exists()) {
79+
urls.add(classesDir.toURI().toURL());
80+
}
81+
82+
// Add all jars in root
83+
File[] rootJars = driverName.listFiles((d, n) -> n.endsWith(".jar"));
84+
if (rootJars != null) {
85+
for (File f : rootJars) {
86+
urls.add(f.toURI().toURL());
87+
}
88+
}
89+
90+
// Add all jars in lib/
91+
File libDir = new File(driverName, "lib");
92+
if (libDir.exists()) {
93+
File[] libJars = libDir.listFiles((d, n) -> n.endsWith(".jar"));
94+
if (libJars != null) {
95+
for (File f : libJars) {
96+
urls.add(f.toURI().toURL());
97+
}
98+
}
99+
}
100+
}
101+
102+
if (urls.isEmpty()) {
103+
throw new IllegalArgumentException(
104+
"No jars found in driver home: " + driverName.getAbsolutePath());
105+
}
106+
107+
log.info("Creating isolated classloader with {} URLs for {}", urls.size(), driverName);
108+
return new IsolatedDriverLoader(
109+
urls.toArray(new URL[0]), IsolatedDriverLoader.class.getClassLoader());
110+
}
111+
112+
/**
113+
* Reflectively instantiate a class from an isolated classloader.
114+
*
115+
* @param cl The classloader to use.
116+
* @param className The class name to instantiate of the isolated benchmark driver.
117+
* @param <T> The type of the class to instantiate.
118+
* @return The instantiated class.
119+
*/
120+
@SuppressWarnings("unchecked")
121+
public static <T> T newInstance(ClassLoader cl, String className) throws Exception {
122+
Class<?> cls = Class.forName(className, true, cl);
123+
Constructor<?> ctor = cls.getDeclaredConstructor();
124+
ctor.setAccessible(true);
125+
return (T) ctor.newInstance();
126+
}
127+
}

benchmark-framework/src/main/java/io/openmessaging/benchmark/utils/RandomGenerator.java

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,17 +18,35 @@
1818
public final class RandomGenerator {
1919

2020
private static final Random random = new Random();
21-
private static final String KAFKA_SAFE_CHARS =
22-
"abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789_-";
21+
private static final char[] KAFKA_SAFE_CHARS = buildKafkaSafeChars();
2322

2423
private RandomGenerator() {}
2524

2625
public static String getRandomString() {
2726
// Generate a Kafka-safe random string (only alphanumerics, underscore, and hyphen)
2827
StringBuilder sb = new StringBuilder(7);
2928
for (int i = 0; i < 7; i++) {
30-
sb.append(KAFKA_SAFE_CHARS.charAt(random.nextInt(KAFKA_SAFE_CHARS.length())));
29+
sb.append(KAFKA_SAFE_CHARS[random.nextInt(KAFKA_SAFE_CHARS.length)]);
3130
}
3231
return sb.toString();
3332
}
33+
34+
private static char[] buildKafkaSafeChars() {
35+
char[] chars = new char[26 + 26 + 10 + 2]; // a-z + A-Z + 0-9 + _-
36+
int index = 0;
37+
38+
for (char c = 'a'; c <= 'z'; c++) {
39+
chars[index++] = c;
40+
}
41+
for (char c = 'A'; c <= 'Z'; c++) {
42+
chars[index++] = c;
43+
}
44+
for (char c = '0'; c <= '9'; c++) {
45+
chars[index++] = c;
46+
}
47+
chars[index++] = '_';
48+
chars[index] = '-';
49+
50+
return chars;
51+
}
3452
}

benchmark-framework/src/main/java/io/openmessaging/benchmark/worker/BenchmarkWorker.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,12 @@
2525
import org.slf4j.LoggerFactory;
2626

2727
/** A benchmark worker that listen for tasks to perform. */
28-
public class BenchmarkWorker {
28+
public final class BenchmarkWorker {
29+
30+
private BenchmarkWorker() {
31+
throw new UnsupportedOperationException(
32+
"BenchmarkWorker is a CLI utility and cannot be instantiated");
33+
}
2934

3035
@SuppressWarnings("unused")
3136
static class Arguments {
@@ -44,6 +49,7 @@ static class Arguments {
4449
@Parameter(
4550
names = {"-sp", "--stats-port"},
4651
description = "Stats port to listen on")
52+
@Deprecated(since = "0.0.3", forRemoval = true)
4753
private int statsPort = 8081;
4854

4955
public boolean isHelp() {
@@ -59,7 +65,7 @@ public int getStatsPort() {
5965
}
6066
}
6167

62-
public static void main(String[] args) throws Exception {
68+
static void main(String[] args) throws Exception {
6369
final Arguments arguments = new Arguments();
6470
JCommander jc = new JCommander(arguments);
6571
jc.setProgramName("benchmark-worker");

benchmark-framework/src/main/java/io/openmessaging/benchmark/worker/DistributedWorkersEnsemble.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -74,20 +74,19 @@ static int getNumberOfProducerWorkers(List<Worker> workers, boolean extraConsume
7474
}
7575

7676
@Override
77-
public void initializeDriver(File configurationFile) throws IOException {
77+
public void initializeDriver(File configurationFile) throws Exception {
7878
workers.parallelStream()
7979
.forEach(
8080
w -> {
8181
try {
8282
w.initializeDriver(configurationFile);
83-
} catch (IOException e) {
83+
} catch (Exception e) {
8484
throw new RuntimeException(e);
8585
}
8686
});
8787
}
8888

8989
@Override
90-
@SuppressWarnings("unchecked")
9190
public List<String> createTopics(TopicsInfo topicsInfo) throws IOException {
9291
return leader.createTopics(topicsInfo);
9392
}

benchmark-framework/src/main/java/io/openmessaging/benchmark/worker/LocalWorker.java

Lines changed: 31 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import com.fasterxml.jackson.databind.ObjectMapper;
2020
import com.fasterxml.jackson.databind.ObjectWriter;
2121
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
22-
import com.google.common.base.Preconditions;
2322
import io.micrometer.prometheusmetrics.PrometheusMeterRegistry;
2423
import io.netty.util.concurrent.DefaultThreadFactory;
2524
import io.openmessaging.benchmark.DriverConfiguration;
@@ -30,6 +29,7 @@
3029
import io.openmessaging.benchmark.driver.BenchmarkDriver.TopicInfo;
3130
import io.openmessaging.benchmark.driver.BenchmarkProducer;
3231
import io.openmessaging.benchmark.driver.ConsumerCallback;
32+
import io.openmessaging.benchmark.utils.IsolatedDriverLoader;
3333
import io.openmessaging.benchmark.utils.RandomGenerator;
3434
import io.openmessaging.benchmark.utils.Timer;
3535
import io.openmessaging.benchmark.utils.UniformRateLimiter;
@@ -79,19 +79,42 @@ public LocalWorker(PrometheusMeterRegistry statsLogger) {
7979
}
8080

8181
@Override
82-
public void initializeDriver(File driverConfigFile) throws IOException {
83-
Preconditions.checkArgument(benchmarkDriver == null);
84-
testCompleted = false;
82+
public void initializeDriver(File driverConfigFile) throws Exception {
83+
initializeDriver(driverConfigFile, null);
84+
}
8585

86+
@Override
87+
public void initializeDriver(File driverConfigFile, File isolatedDriverHome) throws Exception {
8688
DriverConfiguration driverConfiguration =
8789
mapper.readValue(driverConfigFile, DriverConfiguration.class);
88-
89-
log.info("Driver: {}", writer.writeValueAsString(driverConfiguration));
90+
log.info("Initializing driver: {}", writer.writeValueAsString(driverConfiguration));
9091

9192
try {
92-
benchmarkDriver =
93-
(BenchmarkDriver) Class.forName(driverConfiguration.driverClass).newInstance();
93+
// Validate driver class is present
94+
if (driverConfiguration.driverClass == null || driverConfiguration.driverClass.isEmpty()) {
95+
throw new IllegalArgumentException("Driver configuration must specify a driverClass");
96+
}
97+
98+
// Check if this driver should be loaded in isolation
99+
boolean shouldIsolate =
100+
isolatedDriverHome != null && driverConfiguration.driverClass.contains("pravega");
101+
102+
if (shouldIsolate) {
103+
log.info(
104+
"Loading driver {} with isolated classloader from {}",
105+
driverConfiguration.driverClass,
106+
isolatedDriverHome);
107+
ClassLoader isolatedCL = IsolatedDriverLoader.forDriverFolder(isolatedDriverHome);
108+
benchmarkDriver =
109+
IsolatedDriverLoader.newInstance(isolatedCL, driverConfiguration.driverClass);
110+
} else {
111+
// Standard loading for non-isolated drivers
112+
benchmarkDriver =
113+
(BenchmarkDriver) Class.forName(driverConfiguration.driverClass).newInstance();
114+
}
115+
94116
benchmarkDriver.initialize(driverConfigFile, stats.getMeterRegistry());
117+
95118
} catch (InstantiationException
96119
| IllegalAccessException
97120
| ClassNotFoundException

benchmark-framework/src/main/java/io/openmessaging/benchmark/worker/Worker.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,12 @@
2525

2626
public interface Worker extends AutoCloseable {
2727

28-
void initializeDriver(File configurationFile) throws IOException;
28+
void initializeDriver(File driverConfigFile) throws Exception;
29+
30+
// New method to support isolated driver loading
31+
default void initializeDriver(File driverConfigFile, File isolatedDriverHome) throws Exception {
32+
initializeDriver(driverConfigFile);
33+
}
2934

3035
List<String> createTopics(TopicsInfo topicsInfo) throws IOException;
3136

docker/Dockerfile

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,7 @@
1212
# limitations under the License.
1313
#
1414

15-
FROM eclipse-temurin:17
16-
15+
FROM amazoncorretto:25-alpine
1716
ARG BENCHMARK_TARBALL
1817

1918
# Using ADD instead of COPY because ${BENCHMARK_TARBALL} is an archive that needs to be extracted
@@ -24,8 +23,8 @@ RUN mv openmessaging-benchmark-* /benchmark
2423
WORKDIR /benchmark
2524

2625
# Install vim
27-
ENV DEBIAN_FRONTEND=noninteractive
28-
RUN apt-get update && apt-get install -y vim tzdata && rm -rf /var/lib/apt/lists/*
26+
#ENV DEBIAN_FRONTEND=noninteractive
27+
#RUN apt-get update && apt-get install -y vim tzdata && rm -rf /var/lib/apt/lists/*
2928

3029
# Start a shell by default
3130
CMD ["/bin/bash"]

0 commit comments

Comments
 (0)