1818
1919package org .apache .cassandra .service .disk .usage ;
2020
21+ import java .util .Set ;
2122import java .util .concurrent .ConcurrentHashMap ;
2223import java .util .concurrent .ConcurrentMap ;
2324import java .util .concurrent .TimeUnit ;
2728import org .slf4j .Logger ;
2829import org .slf4j .LoggerFactory ;
2930
31+ import org .apache .cassandra .config .DatabaseDescriptor ;
3032import org .apache .cassandra .gms .ApplicationState ;
3133import org .apache .cassandra .gms .EndpointState ;
3234import org .apache .cassandra .gms .Gossiper ;
3335import org .apache .cassandra .gms .IEndpointStateChangeSubscriber ;
3436import org .apache .cassandra .gms .VersionedValue ;
3537import org .apache .cassandra .locator .InetAddressAndPort ;
38+ import org .apache .cassandra .locator .Locator ;
3639import org .apache .cassandra .service .StorageService ;
40+ import org .apache .cassandra .tcm .membership .Location ;
3741import org .apache .cassandra .utils .NoSpamLogger ;
3842
3943/**
@@ -50,6 +54,8 @@ public class DiskUsageBroadcaster implements IEndpointStateChangeSubscriber
5054 private final DiskUsageMonitor monitor ;
5155 private final ConcurrentMap <InetAddressAndPort , DiskUsageState > usageInfo = new ConcurrentHashMap <>();
5256 private volatile boolean hasStuffedOrFullNode = false ;
57+ private final ConcurrentMap <String , Set <InetAddressAndPort >> fullNodesByDatacenter = new ConcurrentHashMap <>();
58+ private final ConcurrentMap <String , Set <InetAddressAndPort >> stuffedNodesByDatacenter = new ConcurrentHashMap <>();
5359
5460 @ VisibleForTesting
5561 public DiskUsageBroadcaster (DiskUsageMonitor monitor )
@@ -83,6 +89,34 @@ public boolean isStuffed(InetAddressAndPort endpoint)
8389 return state (endpoint ).isStuffed ();
8490 }
8591
92+ /**
93+ * @return {@code true} if there exists any node in the datacenter of {@code endpoint} which has FULL disk usage.
94+ */
95+ @ VisibleForTesting
96+ public boolean isDatacenterFull (String datacenter )
97+ {
98+ if (!hasStuffedOrFullNode ())
99+ {
100+ return false ;
101+ }
102+ Set <InetAddressAndPort > fullNodes = fullNodesByDatacenter .get (datacenter );
103+ return fullNodes != null && !fullNodes .isEmpty ();
104+ }
105+
106+ /**
107+ * @return {@code true} if there exists any node in the datacenter of {@code endpoint} which has FULL disk usage
108+ */
109+ @ VisibleForTesting
110+ public boolean isDatacenterStuffed (String datacenter )
111+ {
112+ if (!hasStuffedOrFullNode ())
113+ {
114+ return false ;
115+ }
116+ Set <InetAddressAndPort > stuffedNodes = stuffedNodesByDatacenter .get (datacenter );
117+ return stuffedNodes != null && !stuffedNodes .isEmpty ();
118+ }
119+
86120 @ VisibleForTesting
87121 public DiskUsageState state (InetAddressAndPort endpoint )
88122 {
@@ -114,8 +148,9 @@ public void onChange(InetAddressAndPort endpoint, ApplicationState state, Versio
114148 noSpamLogger .warn (String .format ("Found unknown DiskUsageState: %s. Using default state %s instead." ,
115149 value .value , usageState ));
116150 }
117- usageInfo .put (endpoint , usageState );
118151
152+ computeUsageStateForEpDatacenter (endpoint , usageState );
153+ usageInfo .put (endpoint , usageState );
119154 hasStuffedOrFullNode = usageState .isStuffedOrFull () || computeHasStuffedOrFullNode ();
120155 }
121156
@@ -131,6 +166,85 @@ private boolean computeHasStuffedOrFullNode()
131166 return false ;
132167 }
133168
169+ /**
170+ * Update the set of full nodes by datacenter based on the disk usage state for the given endpoint.
171+ * If the node is FULL, add it to the set for its datacenter. Otherwise, remove it from the set.
172+ * This method is idempotent - adding an already-present node or removing an absent node has no effect.
173+ *
174+ * @param endpoint The endpoint whose state has changed.
175+ * @param usageState The new disk usage state value.
176+ */
177+ private void computeUsageStateForEpDatacenter (InetAddressAndPort endpoint , DiskUsageState usageState )
178+ {
179+ Location location = location (endpoint );
180+ if (location .equals (Location .UNKNOWN ))
181+ {
182+ noSpamLogger .warn ("Unable to track disk usage by datacenter for endpoint {} because we are unable to determine its location." ,
183+ endpoint );
184+ return ;
185+ }
186+
187+ String datacenter = location .datacenter ;
188+ if (usageState .isFull ())
189+ {
190+ // Add this node to the set of full nodes for its datacenter and remove it from the stuffed nodes
191+ // if it was there.
192+ fullNodesByDatacenter .computeIfAbsent (datacenter , dc -> ConcurrentHashMap .newKeySet ())
193+ .add (endpoint );
194+ noSpamLogger .debug ("Endpoint {} is FULL, added to full nodes set for datacenter {}" , endpoint , datacenter );
195+ Set <InetAddressAndPort > stuffedNodes = stuffedNodesByDatacenter .get (datacenter );
196+ if (stuffedNodes != null && stuffedNodes .remove (endpoint ))
197+ {
198+ noSpamLogger .debug ("Endpoint {} is now FULL. Removed it from the stuffed nodes set for datacenter {}" ,
199+ endpoint , datacenter );
200+ }
201+ }
202+ else if (usageState .isStuffed ())
203+ {
204+ // Add this node to the set of stuffed nodes for its datacenter and remove it from the full nodes
205+ // if it was there.
206+ stuffedNodesByDatacenter .computeIfAbsent (datacenter , dc -> ConcurrentHashMap .newKeySet ())
207+ .add (endpoint );
208+ noSpamLogger .debug ("Endpoint {} is now STUFFED. Added it to the stuffed nodes set for datacenter {}" ,
209+ endpoint , datacenter );
210+ Set <InetAddressAndPort > fullNodes = fullNodesByDatacenter .get (datacenter );
211+ if (fullNodes != null && fullNodes .remove (endpoint ))
212+ {
213+ noSpamLogger .debug ("Endpoint {} is now STUFFED. Removed it from full nodes set for datacenter {}" ,
214+ endpoint , datacenter );
215+ }
216+ }
217+ else
218+ {
219+ // Remove this node from the set of full nodes and set of stuffed nodes for its datacenter if it was there.
220+ Set <InetAddressAndPort > fullNodes = fullNodesByDatacenter .get (datacenter );
221+ if (fullNodes != null && fullNodes .remove (endpoint ))
222+ {
223+ noSpamLogger .debug ("Endpoint {} is no longer STUFFED or FULL, removed from stuffed for datacenter {}" ,
224+ endpoint , datacenter );
225+ }
226+ Set <InetAddressAndPort > stuffedNodes = stuffedNodesByDatacenter .get (datacenter );
227+ if (stuffedNodes != null && stuffedNodes .remove (endpoint ))
228+ {
229+ noSpamLogger .debug ("Endpoint {} is no longer STUFFED, removed from the stuffed set for datacenter {}" ,
230+ endpoint , datacenter );
231+ }
232+ }
233+ }
234+
235+ private Location location (InetAddressAndPort endpoint )
236+ {
237+ Locator locator = DatabaseDescriptor .getLocator ();
238+ if (locator == null )
239+ {
240+ noSpamLogger .warn ("Unable to track disk usage by datacenter for endpoint {} because locator is null" ,
241+ endpoint );
242+ return Location .UNKNOWN ;
243+ }
244+ Location location = locator .location (endpoint );
245+ return location != null ? location : Location .UNKNOWN ;
246+ }
247+
134248 @ Override
135249 public void onJoin (InetAddressAndPort endpoint , EndpointState epState )
136250 {
@@ -164,10 +278,34 @@ public void onRestart(InetAddressAndPort endpoint, EndpointState state)
164278 @ Override
165279 public void onRemove (InetAddressAndPort endpoint )
166280 {
281+ updateDiskUsageStateForDatacenterOnRemoval (endpoint );
167282 usageInfo .remove (endpoint );
168283 hasStuffedOrFullNode = usageInfo .values ().stream ().anyMatch (DiskUsageState ::isStuffedOrFull );
169284 }
170285
286+ private void updateDiskUsageStateForDatacenterOnRemoval (InetAddressAndPort endpoint )
287+ {
288+ Location nodeLocation = location (endpoint );
289+ if (nodeLocation .equals (Location .UNKNOWN ))
290+ {
291+ logger .debug ("Unable to determine location for removed endpoint {}. Will not update datacenter tracking." , endpoint );
292+ return ;
293+ }
294+
295+ String datacenter = nodeLocation .datacenter ;
296+ // Remove the endpoint from the full nodes and stuffed nodes set for its datacenter
297+ Set <InetAddressAndPort > fullNodes = fullNodesByDatacenter .get (datacenter );
298+ if (fullNodes != null && fullNodes .remove (endpoint ))
299+ {
300+ logger .debug ("Removed endpoint {} from full nodes set for datacenter {} on node removal" , endpoint , datacenter );
301+ }
302+ Set <InetAddressAndPort > stuffedNodes = stuffedNodesByDatacenter .get (datacenter );
303+ if (stuffedNodes != null && stuffedNodes .remove (endpoint ))
304+ {
305+ logger .debug ("Removed endpoint {} from stuffed nodes set for datacenter {} on node removal" , endpoint , datacenter );
306+ }
307+ }
308+
171309 private void updateDiskUsage (InetAddressAndPort endpoint , EndpointState state )
172310 {
173311 VersionedValue localValue = state .getApplicationState (ApplicationState .DISK_USAGE );
0 commit comments