Skip to content

Commit bd14a13

Browse files
author
gitlab
committed
Merge branch 'fix-ZSTAC-81028' into '5.5.0'
<fix>[migration]: fix batch migration error See merge request zstackio/zstack!8986
2 parents 70aba3c + a65c3b7 commit bd14a13

File tree

3 files changed

+52
-22
lines changed

3 files changed

+52
-22
lines changed

plugin/zbs/src/main/java/org/zstack/storage/zbs/ZbsStorageController.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
import org.springframework.beans.factory.annotation.Autowire;
55
import org.springframework.beans.factory.annotation.Autowired;
66
import org.springframework.beans.factory.annotation.Configurable;
7-
import org.zstack.cbd.*;
87
import org.zstack.cbd.kvm.CbdHeartbeatVolumeTO;
98
import org.zstack.cbd.kvm.CbdVolumeTo;
109
import org.zstack.compute.host.HostGlobalConfig;
@@ -32,7 +31,6 @@
3231
import org.zstack.header.host.HostVO;
3332
import org.zstack.header.image.ImageConstant;
3433
import org.zstack.header.message.MessageReply;
35-
import org.zstack.header.rest.RESTFacade;
3634
import org.zstack.header.storage.addon.*;
3735
import org.zstack.header.storage.addon.primary.*;
3836
import org.zstack.header.storage.primary.*;
@@ -711,9 +709,8 @@ private LogicalPoolInfo allocateFreePool(long size, Predicate<LogicalPoolInfo> f
711709
}
712710

713711
private List<LogicalPoolInfo> getSelfPools() {
714-
List<LogicalPoolInfo> logicalPoolInfos = addonInfo.getLogicalPoolInfos();
715-
logicalPoolInfos.removeIf(it -> !config.getPoolNames().contains(it.getLogicalPoolName()));
716-
return logicalPoolInfos;
712+
return addonInfo.getLogicalPoolInfos().stream().filter(pool ->
713+
config.getPoolNames().contains(pool.getLogicalPoolName())).collect(Collectors.toList());
717714
}
718715

719716
@Override

storage/src/main/java/org/zstack/storage/primary/PrimaryStorageBase.java

Lines changed: 31 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import com.google.common.collect.Interner;
44
import com.google.common.collect.Interners;
5+
import java.util.function.Consumer;
56
import org.springframework.beans.factory.annotation.Autowire;
67
import org.springframework.beans.factory.annotation.Autowired;
78
import org.springframework.beans.factory.annotation.Configurable;
@@ -19,6 +20,9 @@
1920
import org.zstack.core.job.JobQueueFacade;
2021
import org.zstack.core.thread.ChainTask;
2122
import org.zstack.core.thread.MergeQueue;
23+
import org.zstack.core.thread.SingleFlightTask;
24+
import org.zstack.core.thread.SingleFlightTask.SingleFlightDone;
25+
import org.zstack.core.thread.SingleFlightTaskResult;
2226
import org.zstack.core.thread.SyncTaskChain;
2327
import org.zstack.core.thread.ThreadFacade;
2428
import org.zstack.core.trash.StorageTrash;
@@ -1244,23 +1248,33 @@ public void setup() {
12441248

12451249
@Override
12461250
public void run(final FlowTrigger trigger, Map data) {
1247-
syncPhysicalCapacity(new ReturnValueCompletion<PhysicalCapacityUsage>(trigger) {
1248-
@Override
1249-
public void success(PhysicalCapacityUsage returnValue) {
1250-
PrimaryStorageCapacityUpdater updater = new PrimaryStorageCapacityUpdater(self.getUuid());
1251-
updater.run(cap -> {
1252-
cap.setAvailablePhysicalCapacity(returnValue.availablePhysicalSize < 0 ? 0 : returnValue.availablePhysicalSize);
1253-
cap.setTotalPhysicalCapacity(returnValue.totalPhysicalSize);
1254-
return cap;
1255-
});
1256-
trigger.next();
1257-
}
1258-
1259-
@Override
1260-
public void fail(ErrorCode errorCode) {
1261-
trigger.fail(errorCode);
1262-
}
1263-
});
1251+
thdf.singleFlightSubmit(new SingleFlightTask(trigger)
1252+
.setSyncSignature("sync-physical-capacity-on-ps-" + self.getUuid())
1253+
.run(completion -> {
1254+
syncPhysicalCapacity(new ReturnValueCompletion<PhysicalCapacityUsage>(completion) {
1255+
@Override
1256+
public void success(PhysicalCapacityUsage returnValue) {
1257+
PrimaryStorageCapacityUpdater updater = new PrimaryStorageCapacityUpdater(self.getUuid());
1258+
updater.run(cap -> {
1259+
cap.setAvailablePhysicalCapacity(returnValue.availablePhysicalSize < 0 ? 0 : returnValue.availablePhysicalSize);
1260+
cap.setTotalPhysicalCapacity(returnValue.totalPhysicalSize);
1261+
return cap;
1262+
});
1263+
completion.success(null);
1264+
}
1265+
1266+
@Override
1267+
public void fail(ErrorCode errorCode) {
1268+
completion.fail(errorCode);
1269+
}
1270+
});
1271+
}).done(result -> {
1272+
if (!result.isSuccess()) {
1273+
trigger.fail(result.getErrorCode());
1274+
} else {
1275+
trigger.next();
1276+
}
1277+
}));
12641278
}
12651279
});
12661280

test/src/test/groovy/org/zstack/test/integration/storage/primary/addon/zbs/ZbsPrimaryStorageCase.groovy

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ import org.zstack.testlib.SubCase
3232
import org.zstack.utils.data.SizeUnit
3333
import org.zstack.utils.gson.JSONObjectUtil
3434

35+
import java.util.concurrent.atomic.AtomicInteger
36+
3537
/**
3638
* @author Xingwei Yu
3739
* @date 2024/4/19 10:09
@@ -164,6 +166,7 @@ class ZbsPrimaryStorageCase extends SubCase {
164166
kvm = env.inventoryByName("kvm-1") as KVMHostInventory
165167
evtf = bean(EventFacade.class)
166168

169+
testSyncPrimaryStorageCapacityConcurrently()
167170
testDefaultConfig()
168171
testUpdateExternalPrimaryStorage()
169172
testMdsConnectFailed()
@@ -177,6 +180,22 @@ class ZbsPrimaryStorageCase extends SubCase {
177180
}
178181
}
179182

183+
void testSyncPrimaryStorageCapacityConcurrently() {
184+
def threads = new ArrayList<>()
185+
def success_cnt = new AtomicInteger(0)
186+
(1..20).forEach {
187+
threads.add(Thread.start {
188+
syncPrimaryStorageCapacity {
189+
primaryStorageUuid = ps.uuid
190+
}
191+
success_cnt.incrementAndGet()
192+
})
193+
}
194+
195+
threads.each { it.join() }
196+
assert success_cnt.get() == 20
197+
}
198+
180199
void testCheckHostStorageConnection() {
181200
attachPrimaryStorageToCluster {
182201
primaryStorageUuid = ps.uuid

0 commit comments

Comments
 (0)