Skip to content

Commit 70ee841

Browse files
committed
<fix>[storage]: Serialize and modify multiple mds nodes
When API requests for modifying multiple MDS nodes are initiated, place the API requests into a queue for serialized modification. Resolves: ZSTAC-79225 Change-Id: I6b7963687668796878786568746d797165756b69
1 parent 88b85d2 commit 70ee841

File tree

2 files changed

+80
-2
lines changed

2 files changed

+80
-2
lines changed

storage/src/main/java/org/zstack/storage/addon/primary/ExternalPrimaryStorage.java

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import org.zstack.core.workflow.FlowChainBuilder;
1919
import org.zstack.core.workflow.ShareFlow;
2020
import org.zstack.header.core.Completion;
21+
import org.zstack.header.core.NoErrorCompletion;
2122
import org.zstack.header.core.NopeCompletion;
2223
import org.zstack.header.core.ReturnValueCompletion;
2324
import org.zstack.header.core.WhileDoneCompletion;
@@ -172,7 +173,31 @@ protected void handleApiMessage(APIMessage msg) {
172173
}
173174
}
174175

175-
private void handle(APIUpdateExternalPrimaryStorageMsg msg) {
176+
private void handle(final APIUpdateExternalPrimaryStorageMsg msg) {
177+
thdf.chainSubmit(new ChainTask(msg) {
178+
@Override
179+
public String getSyncSignature() {
180+
return String.format("update-external-primary-storage-%s", msg.getUuid());
181+
}
182+
183+
@Override
184+
public void run(SyncTaskChain chain) {
185+
doUpdateExternalPrimaryStorageInQueue(msg, new NoErrorCompletion(chain) {
186+
@Override
187+
public void done() {
188+
chain.next();
189+
}
190+
});
191+
}
192+
193+
@Override
194+
public String getName() {
195+
return getSyncSignature();
196+
}
197+
});
198+
}
199+
200+
private void doUpdateExternalPrimaryStorageInQueue(APIUpdateExternalPrimaryStorageMsg msg, NoErrorCompletion completion) {
176201
APIUpdateExternalPrimaryStorageEvent evt = new APIUpdateExternalPrimaryStorageEvent(msg.getId());
177202
if (msg.getName() != null) {
178203
externalVO.setName(msg.getName());
@@ -188,7 +213,7 @@ private void handle(APIUpdateExternalPrimaryStorageMsg msg) {
188213
}
189214
boolean needReconnect = false;
190215
String oldConfig = externalVO.getConfig();
191-
if (msg.getConfig() != null) {
216+
if (msg.getConfig() != null && !msg.getConfig().equals(oldConfig)) {
192217
String config = controller.validateConfig(msg.getConfig());
193218
externalVO.setConfig(config);
194219
needReconnect = true;
@@ -216,13 +241,15 @@ public void run(MessageReply reply) {
216241
}
217242

218243
bus.publish(evt);
244+
completion.done();
219245
}
220246
});
221247
return;
222248
}
223249

224250
evt.setInventory(externalVO.toInventory());
225251
bus.publish(evt);
252+
completion.done();
226253
}
227254

228255
@Override

test/src/test/groovy/org/zstack/test/integration/storage/primary/addon/zbs/ZbsPrimaryStorageCase.groovy

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
package org.zstack.test.integration.storage.primary.addon.zbs
22

33
import org.springframework.http.HttpEntity
4+
import org.zstack.core.cloudbus.CloudBus
45
import org.zstack.core.cloudbus.EventCallback
56
import org.zstack.core.cloudbus.EventFacade
67
import org.zstack.core.db.Q
8+
import org.zstack.core.thread.ThreadFacadeImpl
79
import org.zstack.header.storage.addon.primary.ExternalPrimaryStorageVO
810
import org.zstack.header.storage.addon.primary.ExternalPrimaryStorageSpaceVO
911
import org.zstack.header.storage.addon.primary.ExternalPrimaryStorageVO_
@@ -13,6 +15,8 @@ import org.zstack.header.storage.primary.PrimaryStorageCapacityVO_
1315
import org.zstack.header.storage.primary.PrimaryStorageHostRefVO
1416
import org.zstack.header.storage.primary.PrimaryStorageHostRefVO_
1517
import org.zstack.header.storage.primary.PrimaryStorageStatus
18+
import org.zstack.header.storage.primary.ReconnectPrimaryStorageMsg
19+
import org.zstack.header.storage.primary.ReconnectPrimaryStorageReply
1620
import org.zstack.storage.zbs.MdsUri
1721
import org.zstack.sdk.*
1822
import org.zstack.storage.addon.primary.ExternalPrimaryStorageSystemTags
@@ -31,6 +35,7 @@ import org.zstack.testlib.HttpError
3135
import org.zstack.testlib.SubCase
3236
import org.zstack.utils.data.SizeUnit
3337
import org.zstack.utils.gson.JSONObjectUtil
38+
import java.util.concurrent.atomic.AtomicInteger
3439

3540
import java.util.concurrent.atomic.AtomicInteger
3641

@@ -47,6 +52,8 @@ class ZbsPrimaryStorageCase extends SubCase {
4752
VolumeInventory vol, vol2
4853
KVMHostInventory kvm
4954
EventFacade evtf
55+
ThreadFacadeImpl thdf
56+
AtomicInteger reconnectMsgCount = new AtomicInteger(0)
5057

5158
@Override
5259
void clean() {
@@ -165,6 +172,7 @@ class ZbsPrimaryStorageCase extends SubCase {
165172
diskOffering = env.inventoryByName("diskOffering") as DiskOfferingInventory
166173
kvm = env.inventoryByName("kvm-1") as KVMHostInventory
167174
evtf = bean(EventFacade.class)
175+
thdf = bean(ThreadFacadeImpl.class)
168176

169177
testSyncPrimaryStorageCapacityConcurrently()
170178
testDefaultConfig()
@@ -294,6 +302,49 @@ class ZbsPrimaryStorageCase extends SubCase {
294302
assert Q.New(ExternalPrimaryStorageSpaceVO.class)
295303
.eq(ExternalPrimaryStorageSpaceVO_.primaryStorageUuid, ps.uuid)
296304
.count() == 1
305+
306+
def signature = String.format("update-external-primary-storage-%s", ps.uuid)
307+
def run = thdf.getChainTaskInfo(signature).getRunningTask().size()
308+
env.message(ReconnectPrimaryStorageMsg.class) { ReconnectPrimaryStorageMsg msg, CloudBus bus ->
309+
if (ps != null && msg.getPrimaryStorageUuid() == ps.uuid) {
310+
reconnectMsgCount.incrementAndGet()
311+
run = thdf.getChainTaskInfo(signature).getRunningTask().size()
312+
assert run == 1
313+
}
314+
def reply = new ReconnectPrimaryStorageReply()
315+
bus.reply(msg, reply)
316+
}
317+
Thread.start {
318+
updateExternalPrimaryStorage {
319+
uuid = ps.uuid
320+
config = "{\"mdsUrls\":[\"root:password@127.0.1.4\",\"root:password@127.0.1.2\",\"root:password@127.0.1.3\"],\"logicalPoolName\":\"lpool1\"}"
321+
}
322+
}
323+
Thread.start {
324+
updateExternalPrimaryStorage {
325+
uuid = ps.uuid
326+
config = "{\"mdsUrls\":[\"root:password@127.0.1.5\",\"root:password@127.0.1.2\",\"root:password@127.0.1.3\"],\"logicalPoolName\":\"lpool1\"}"
327+
}
328+
}
329+
String oldConfig = Q.New(ExternalPrimaryStorageVO.class)
330+
.select(ExternalPrimaryStorageVO_.config)
331+
.eq(ExternalPrimaryStorageVO_.uuid, ps.uuid)
332+
.findValue()
333+
Thread.start {
334+
updateExternalPrimaryStorage {
335+
uuid = ps.uuid
336+
config = oldConfig
337+
}
338+
}
339+
retryInSecs {
340+
assert reconnectMsgCount.get() >=1
341+
}
342+
retryInSecs {
343+
assert thdf.getChainTaskInfo(signature).getRunningTask().size() == 0
344+
assert thdf.getChainTaskInfo(signature).getPendingTask().size() == 0
345+
}
346+
assert reconnectMsgCount.get() == 2
347+
env.revokeMessage(ReconnectPrimaryStorageMsg.class, null)
297348
// update multi pools
298349
// Config.Pool
299350
updateExternalPrimaryStorage {

0 commit comments

Comments
 (0)