Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
e0d49a1
Defined a CriticalSingleThreadExecutorMetricSource.
adityamukho Jan 28, 2026
5f728a8
Merge branch 'main' into ignite-27381
adityamukho Jan 28, 2026
04e952c
Using already available thread pool metric sources.
adityamukho Jan 28, 2026
51ddd76
Unique inbound metric name.
adityamukho Jan 28, 2026
bdf9a97
Fixed broken tests.
adityamukho Jan 28, 2026
dd02706
Fixed broken tests.
adityamukho Jan 28, 2026
2e4ba6e
Unregister metrics sources on shutdown.
adityamukho Jan 28, 2026
971b305
Check if source is registered before unregistering.
adityamukho Jan 28, 2026
45b012a
Unregister on forced shutdown.
adityamukho Jan 28, 2026
1d7a41e
Wrap metric source unregister operation in try-catch.
adityamukho Jan 29, 2026
eaca885
Added metrics for DefaultMessagingService.
adityamukho Jan 29, 2026
a1e7856
Semantic refinement of metrics.
adityamukho Jan 29, 2026
148d3c3
Fixed broken tests.
adityamukho Jan 30, 2026
081976a
Reverted comment reformatting.
adityamukho Jan 30, 2026
d01b243
Removed implementation-specific naming.
adityamukho Feb 2, 2026
cad0848
Update modules/workers/src/main/java/org/apache/ignite/internal/worke…
adityamukho Feb 3, 2026
3f21c8e
Apply suggestions from code review
adityamukho Feb 3, 2026
0af060d
Applied review suggestion to improve readability.
adityamukho Feb 3, 2026
bd28a13
Enable and register metric source in the start method.
adityamukho Feb 3, 2026
46d0000
Increment the correct metric.
adityamukho Feb 3, 2026
74780d5
Removed mention of default implementation.
adityamukho Feb 3, 2026
a8b5d2c
Initialization moved to declaration.
adityamukho Feb 3, 2026
cbfdc52
Simplified unregister code.
adityamukho Feb 3, 2026
e3188df
Refined metrics.
adityamukho Feb 3, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,12 @@ public abstract class CliIntegrationTest extends ClusterPerClassIntegrationTest
new MetricSource().name("resource.vacuum").enabled(true),
new MetricSource().name("clock.service").enabled(true),
new MetricSource().name("index.builder").enabled(true),
new MetricSource().name("raft.snapshots").enabled(true)
new MetricSource().name("raft.snapshots").enabled(true),
new MetricSource().name("network.messaging").enabled(true),
new MetricSource().name("network.messaging.executor.inbound.Default").enabled(true),
new MetricSource().name("network.messaging.executor.inbound.DeploymentUnits").enabled(true),
new MetricSource().name("network.messaging.executor.inbound.ScaleCube").enabled(true),
new MetricSource().name("network.messaging.executor.outbound").enabled(true),
};

/** Correct ignite jdbc url. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,7 @@ private PartialNode startPartialNode(int idx, Consumer<MetaStorageManager> metaS
clusterIdService,
new NoOpCriticalWorkerRegistry(),
failureProcessor,
new NoOpMetricManager(),
defaultChannelTypeRegistry(),
new DefaultIgniteProductVersionSource()
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toUnmodifiableMap;
import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import static org.apache.ignite.internal.util.ExceptionUtils.hasCause;
import static org.apache.ignite.internal.util.IgniteUtils.closeAllManually;
import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
import static org.apache.ignite.internal.util.IgniteUtils.inBusyLockAsync;
Expand All @@ -44,6 +45,7 @@
import org.apache.ignite.configuration.notifications.ConfigurationNotificationEvent;
import org.apache.ignite.internal.close.ManuallyCloseable;
import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.lang.NodeStoppingException;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.manager.ComponentContext;
Expand Down Expand Up @@ -186,18 +188,30 @@ public void registerSource(MetricSource src) {

@Override
public void unregisterSource(MetricSource src) {
inBusyLockSafe(busyLock, () -> {
disable(src);
registry.unregisterSource(src);
});
try {
inBusyLockSafe(busyLock, () -> {
disable(src);
registry.unregisterSource(src);
});
} catch (Exception e) {
if (!hasCause(e, NodeStoppingException.class)) {
log.error("Failed to unregister metrics source {}", e, src.name());
}
}
}

@Override
public void unregisterSource(String srcName) {
inBusyLockSafe(busyLock, () -> {
disable(srcName);
registry.unregisterSource(srcName);
});
try {
inBusyLockSafe(busyLock, () -> {
disable(srcName);
registry.unregisterSource(srcName);
});
} catch (Exception e) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This pattern doesn't look very beautiful, and there is also duplication on top. Let's discuss it higher first and come back here after we are done there

if (!hasCause(e, NodeStoppingException.class)) {
log.warn("Failed to unregister metrics source {}", e, srcName);
}
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,21 @@

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import org.apache.ignite.internal.metrics.AbstractMetricSource;
import org.apache.ignite.internal.metrics.IntGauge;
import org.apache.ignite.internal.metrics.LongGauge;
import org.apache.ignite.internal.metrics.Metric;
import org.apache.ignite.internal.thread.AbstractStripedThreadPoolExecutor;
import org.apache.ignite.internal.thread.StripedThreadPoolExecutor;
import org.jetbrains.annotations.Nullable;

/** Metric source for monitoring of {@link StripedThreadPoolExecutor}. */
public class StripedThreadPoolMetricSource extends AbstractMetricSource<StripedThreadPoolMetricSource.Holder> {
public class StripedThreadPoolMetricSource<T extends AbstractStripedThreadPoolExecutor<? extends ExecutorService>> extends
AbstractMetricSource<StripedThreadPoolMetricSource<T>.Holder> {
/** Striped thread pool to be monitored. */
private final StripedThreadPoolExecutor exec;
private final T exec;

/**
* Creates a new thread pool metric source with the given {@code name} to monitor the provided striped executor {@code exec},
Expand All @@ -44,8 +47,8 @@ public class StripedThreadPoolMetricSource extends AbstractMetricSource<StripedT
* @param exec Striped thread pool executor to monitor.
* @see StripedThreadPoolExecutor
*/
public StripedThreadPoolMetricSource(String name, @Nullable String description, StripedThreadPoolExecutor exec) {
this(name, null, THREAD_POOLS_GROUP_NAME, exec);
public StripedThreadPoolMetricSource(String name, @Nullable String description, T exec) {
this(name, description, THREAD_POOLS_GROUP_NAME, exec);
}

/**
Expand All @@ -60,7 +63,7 @@ public StripedThreadPoolMetricSource(
String name,
@Nullable String description,
@Nullable String group,
StripedThreadPoolExecutor exec
T exec
) {
super(name, description, group);

Expand Down
3 changes: 3 additions & 0 deletions modules/network/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ dependencies {
implementation project(':ignite-vault')
implementation project(':ignite-workers')
implementation project(':ignite-failure-handler')
implementation project(':ignite-metrics')
implementation libs.jetbrains.annotations
implementation libs.scalecube.cluster
implementation libs.fastutil.core
Expand All @@ -56,6 +57,7 @@ dependencies {
testImplementation testFixtures(project(':ignite-configuration'))
testImplementation testFixtures(project(':ignite-core'))
testImplementation testFixtures(project(':ignite-failure-handler'))
testImplementation testFixtures(project(':ignite-metrics'))
testImplementation libs.jmh.core
testImplementation(libs.kryo) {
//IDEA test runner don't apply Gradle dependency resolve strategy, this is just not implemented
Expand All @@ -72,6 +74,7 @@ dependencies {
testFixturesImplementation project(':ignite-configuration')
testFixturesImplementation project(':ignite-configuration-root')
testFixturesImplementation project(':ignite-failure-handler')
testFixturesImplementation testFixtures(project(':ignite-metrics'))
testFixturesImplementation testFixtures(project(':ignite-core'))
testFixturesImplementation testFixtures(project(':ignite-configuration'))
testFixturesImplementation testFixtures(project(':ignite-workers'))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,11 @@
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.ignite.internal.close.ManuallyCloseable;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.metrics.MetricManager;
import org.apache.ignite.internal.thread.StripedExecutor;
import org.apache.ignite.internal.worker.CriticalWorker;
import org.apache.ignite.internal.worker.CriticalWorkerRegistry;
import org.jetbrains.annotations.Nullable;

/**
* Collection of {@link StripedExecutor executors} for the network based on {@link ChannelType#id()}.
Expand All @@ -51,11 +53,15 @@ class CriticalStripedExecutors implements ManuallyCloseable {
String poolNamePrefix,
CriticalWorkerRegistry workerRegistry,
ChannelTypeRegistry channelTypeRegistry,
IgniteLogger log
IgniteLogger log,
@Nullable MetricManager metricManager,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are they nullable?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CriticalStripedExecutors may be used outside of the messaging service in the future, and metrics may not be so important elsewhere as to be made mandatory.

@Nullable String metricNamePrefix,
@Nullable String metricDescription
) {
this.workerRegistry = workerRegistry;

var factory = new CriticalStripedThreadPoolExecutorFactory(nodeName, poolNamePrefix, log, workerRegistry, registeredWorkers);
var factory = new CriticalStripedThreadPoolExecutorFactory(nodeName, poolNamePrefix, log, workerRegistry, registeredWorkers,
metricManager, metricNamePrefix, metricDescription);

executorByChannelTypeId = StripedExecutorByChannelTypeId.of(channelTypeRegistry, factory);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@

import java.util.List;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.metrics.MetricManager;
import org.apache.ignite.internal.worker.CriticalStripedThreadPoolExecutor;
import org.apache.ignite.internal.worker.CriticalWorker;
import org.apache.ignite.internal.worker.CriticalWorkerRegistry;
import org.jetbrains.annotations.Nullable;

/** Factory for creating {@link CriticalStripedThreadPoolExecutor}. */
class CriticalStripedThreadPoolExecutorFactory {
Expand All @@ -43,18 +45,45 @@ class CriticalStripedThreadPoolExecutorFactory {

private final List<CriticalWorker> registeredWorkers;

@Nullable
private final MetricManager metricManager;

@Nullable
private final String metricNamePrefix;

@Nullable
private final String metricDescription;
Comment on lines +48 to +55
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are they nullable?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For similar reasons as decsribed here.


@SuppressWarnings("unused")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this suppression needed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added a new constructor to pass metrics-related params, but wanted to preserve the old constructor for potential use in places where metrics are deemed unnecessary. Since the old constructor is not used anywhere after the changes in this PR (but I still considered it worthwhile to preserve it), I added the suppression.

CriticalStripedThreadPoolExecutorFactory(
String nodeName,
String poolNamePrefix,
IgniteLogger log,
CriticalWorkerRegistry workerRegistry,
List<CriticalWorker> registeredWorkers
) {
this(nodeName, poolNamePrefix, log, workerRegistry, registeredWorkers, null, null, null);
}

CriticalStripedThreadPoolExecutorFactory(
String nodeName,
String poolNamePrefix,
IgniteLogger log,
CriticalWorkerRegistry workerRegistry,
List<CriticalWorker> registeredWorkers,
@Nullable MetricManager metricManager,
@Nullable String metricNamePrefix,
@Nullable String metricDescription
) {
this.nodeName = nodeName;
this.poolNamePrefix = poolNamePrefix;
this.log = log;
this.workerRegistry = workerRegistry;
this.registeredWorkers = registeredWorkers;

this.metricManager = metricManager;
this.metricNamePrefix = metricNamePrefix;
this.metricDescription = metricDescription;
}

CriticalStripedThreadPoolExecutor create(ChannelType channelType) {
Expand All @@ -64,6 +93,12 @@ CriticalStripedThreadPoolExecutor create(ChannelType channelType) {
var threadFactory = IgniteMessageServiceThreadFactory.create(nodeName, poolName, log, NOTHING_ALLOWED);
var executor = new CriticalStripedThreadPoolExecutor(stripeCountForIndex(channelTypeId), threadFactory, false, 0);

if (metricManager != null && metricNamePrefix != null) {
String metricName = String.format("%s.%s", metricNamePrefix, channelType.name());

executor.initMetricSource(metricManager, metricName, metricDescription);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we init metric source on executor creation (in its constructor)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can, but that would require either passing nullable metrics params to the constructor or have two different constructors. I can make the change using the former approach, since it is already used above in CriticalStripedThreadPoolExecutorFactory.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

However, if we stick with the nullable metric-related parameters, I think it is better to use the current approach of null-checks followed by init.

}

for (CriticalWorker worker : executor.workers()) {
workerRegistry.register(worker);
registeredWorkers.add(worker);
Expand Down
Loading