From e878ad1117354913a9147400bf3f042c4c700da0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=90=95=E5=87=AF=E5=8D=8E?= Date: Thu, 25 Dec 2025 11:33:11 +0800 Subject: [PATCH 01/38] enable ha --- .../component/ComponentCustomJobFactory.java | 49 +++ .../job/component/ComponentCustomJob.java | 76 ++++ .../command/stage/ComponentCustomStage.java | 49 +++ .../command/task/ComponentCustomTask.java | 53 +++ .../validator/ComponentCustomValidator.java | 55 +++ .../server/controller/HdfsHaController.java | 59 +++ .../server/controller/YarnHaController.java | 63 ++++ .../server/enums/ApiExceptionEnum.java | 1 + .../server/model/req/EnableHdfsHaReq.java | 55 +++ .../server/model/req/EnableYarnRmHaReq.java | 52 +++ .../manager/server/service/HdfsHaService.java | 40 ++ .../manager/server/service/YarnHaService.java | 31 ++ .../service/impl/HdfsHaServiceImpl.java | 357 ++++++++++++++++++ .../service/impl/YarnHaServiceImpl.java | 276 ++++++++++++++ .../bigtop/v3_3_0/hadoop/HadoopParams.java | 232 ++++++++++-- .../bigtop/v3_3_0/hadoop/HadoopSetup.java | 7 + .../bigtop/v3_3_0/hadoop/NameNodeScript.java | 41 +- .../bigtop/v3_3_0/hadoop/ZkfcScript.java | 8 + .../stack/bigtop/v3_3_0/kafka/KafkaSetup.java | 28 +- bigtop-manager-ui/src/api/service/index.ts | 30 ++ 20 files changed, 1520 insertions(+), 42 deletions(-) create mode 100644 bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/factory/component/ComponentCustomJobFactory.java create mode 100644 bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/job/component/ComponentCustomJob.java create mode 100644 bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/stage/ComponentCustomStage.java create mode 100644 bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/task/ComponentCustomTask.java create mode 100644 bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/validator/ComponentCustomValidator.java create mode 
100644 bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/controller/HdfsHaController.java create mode 100644 bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/controller/YarnHaController.java create mode 100644 bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/model/req/EnableHdfsHaReq.java create mode 100644 bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/model/req/EnableYarnRmHaReq.java create mode 100644 bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/service/HdfsHaService.java create mode 100644 bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/service/YarnHaService.java create mode 100644 bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/service/impl/HdfsHaServiceImpl.java create mode 100644 bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/service/impl/YarnHaServiceImpl.java diff --git a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/factory/component/ComponentCustomJobFactory.java b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/factory/component/ComponentCustomJobFactory.java new file mode 100644 index 000000000..2d04eed77 --- /dev/null +++ b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/factory/component/ComponentCustomJobFactory.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bigtop.manager.server.command.factory.component; + +import org.apache.bigtop.manager.common.enums.Command; +import org.apache.bigtop.manager.server.command.CommandIdentifier; +import org.apache.bigtop.manager.server.command.job.Job; +import org.apache.bigtop.manager.server.command.job.JobContext; +import org.apache.bigtop.manager.server.command.job.component.ComponentCustomJob; +import org.apache.bigtop.manager.server.enums.CommandLevel; + +import org.springframework.beans.factory.config.ConfigurableBeanFactory; +import org.springframework.context.annotation.Scope; +import org.springframework.stereotype.Component; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +@Component +@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) +public class ComponentCustomJobFactory extends AbstractComponentJobFactory { + + @Override + public CommandIdentifier getCommandIdentifier() { + return new CommandIdentifier(CommandLevel.COMPONENT, Command.CUSTOM); + } + + @Override + public Job createJob(JobContext jobContext) { + return new ComponentCustomJob(jobContext); + } +} + diff --git a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/job/component/ComponentCustomJob.java b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/job/component/ComponentCustomJob.java new file mode 100644 index 000000000..941b20a1f --- /dev/null +++ b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/job/component/ComponentCustomJob.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache 
Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bigtop.manager.server.command.job.component; + +import org.apache.bigtop.manager.server.command.stage.ComponentCustomStage; +import org.apache.bigtop.manager.server.command.stage.StageContext; +import org.apache.bigtop.manager.server.command.job.JobContext; +import org.apache.bigtop.manager.server.exception.ServerException; +import org.apache.bigtop.manager.server.model.dto.ServiceDTO; +import org.apache.bigtop.manager.server.utils.StackUtils; + +import java.util.List; +import java.util.Map; + +/** + * Job for component custom command. 
+ */ +public class ComponentCustomJob extends AbstractComponentJob { + + public ComponentCustomJob(JobContext jobContext) { + super(jobContext); + } + + @Override + protected void createStages() { + String customCommand = jobContext.getCommandDTO().getCustomCommand(); + if (customCommand == null || customCommand.isBlank()) { + throw new ServerException("customCommand must not be blank for CUSTOM command"); + } + + Map> componentHostsMap = getComponentHostsMap(); + for (Map.Entry> e : componentHostsMap.entrySet()) { + String componentName = e.getKey(); + if (componentName != null) { + componentName = componentName.toLowerCase(); + } + List hostnames = e.getValue(); + if (hostnames == null || hostnames.isEmpty()) { + continue; + } + + StageContext stageContext = StageContext.fromCommandDTO(jobContext.getCommandDTO()); + stageContext.setHostnames(hostnames); + stageContext.setComponentName(componentName); + + // AbstractComponentStage#createTaskContext() will call StackUtils.getServiceDTO(stageContext.getServiceName()). + // So serviceName must be set here. + ServiceDTO serviceDTO = StackUtils.getServiceDTOByComponentName(componentName); + stageContext.setServiceName(serviceDTO.getName()); + + stages.add(new ComponentCustomStage(stageContext, customCommand)); + } + } + + @Override + public String getName() { + return "Custom component command"; + } +} + diff --git a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/stage/ComponentCustomStage.java b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/stage/ComponentCustomStage.java new file mode 100644 index 000000000..be43ef681 --- /dev/null +++ b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/stage/ComponentCustomStage.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bigtop.manager.server.command.stage; + +import org.apache.bigtop.manager.server.command.task.ComponentCustomTask; +import org.apache.bigtop.manager.server.command.task.Task; +import org.apache.bigtop.manager.server.utils.StackUtils; + +/** + * Stage for component custom command. 
+ */ +public class ComponentCustomStage extends AbstractComponentStage { + + private final String customCommand; + + public ComponentCustomStage(StageContext stageContext, String customCommand) { + super(stageContext); + this.customCommand = customCommand; + } + + @Override + protected Task createTask(String hostname) { + return new ComponentCustomTask(createTaskContext(hostname), customCommand); + } + + @Override + public String getName() { + return "Custom " + + StackUtils.getComponentDTO(stageContext.getComponentName()).getDisplayName() + + " (" + customCommand + ")"; + } +} + diff --git a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/task/ComponentCustomTask.java b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/task/ComponentCustomTask.java new file mode 100644 index 000000000..fc1c5d045 --- /dev/null +++ b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/task/ComponentCustomTask.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bigtop.manager.server.command.task; + +import org.apache.bigtop.manager.common.enums.Command; + +/** + * Component custom command task. 
+ * + * It will send Command=CUSTOM and customCommand to agent. + */ +public class ComponentCustomTask extends AbstractComponentTask { + + private final String customCommand; + + public ComponentCustomTask(TaskContext taskContext, String customCommand) { + super(taskContext); + this.customCommand = customCommand; + } + + @Override + protected Command getCommand() { + return Command.CUSTOM; + } + + @Override + protected String getCustomCommand() { + return customCommand; + } + + @Override + public String getName() { + return "Custom " + taskContext.getComponentDisplayName() + + " (" + customCommand + ") on " + taskContext.getHostname(); + } +} + diff --git a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/validator/ComponentCustomValidator.java b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/validator/ComponentCustomValidator.java new file mode 100644 index 000000000..9c4d3b9df --- /dev/null +++ b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/validator/ComponentCustomValidator.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.bigtop.manager.server.command.validator; + +import org.apache.bigtop.manager.common.enums.Command; +import org.apache.bigtop.manager.server.command.CommandIdentifier; +import org.apache.bigtop.manager.server.enums.ApiExceptionEnum; +import org.apache.bigtop.manager.server.enums.CommandLevel; +import org.apache.bigtop.manager.server.exception.ApiException; + +import org.apache.commons.lang3.StringUtils; + +import org.springframework.stereotype.Component; + +import java.util.List; + +/** + * Validate component CUSTOM command. + */ +@Component +public class ComponentCustomValidator implements CommandValidator { + + @Override + public List getCommandIdentifiers() { + return List.of(new CommandIdentifier(CommandLevel.COMPONENT, Command.CUSTOM)); + } + + @Override + public void validate(ValidatorContext context) { + String customCommand = context.getCommandDTO().getCustomCommand(); + if (StringUtils.isBlank(customCommand)) { + throw new ApiException(ApiExceptionEnum.OPERATION_FAILED, "customCommand must not be blank"); + } + if (context.getCommandDTO().getComponentCommands() == null || context.getCommandDTO().getComponentCommands().isEmpty()) { + throw new ApiException(ApiExceptionEnum.OPERATION_FAILED, "componentCommands must not be empty"); + } + } +} + diff --git a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/controller/HdfsHaController.java b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/controller/HdfsHaController.java new file mode 100644 index 000000000..828bca350 --- /dev/null +++ b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/controller/HdfsHaController.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bigtop.manager.server.controller; + +import org.apache.bigtop.manager.server.annotations.Audit; +import org.apache.bigtop.manager.server.model.req.EnableHdfsHaReq; +import org.apache.bigtop.manager.server.model.vo.CommandVO; +import org.apache.bigtop.manager.server.service.CommandService; +import org.apache.bigtop.manager.server.service.HdfsHaService; +import org.apache.bigtop.manager.server.utils.ResponseEntity; + +import io.swagger.v3.oas.annotations.Operation; +import io.swagger.v3.oas.annotations.tags.Tag; +import jakarta.annotation.Resource; +import jakarta.validation.Valid; + +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +@Tag(name = "HDFS HA Controller") +@RestController +@RequestMapping("/clusters/{clusterId}/services/{serviceId}/actions") +public class HdfsHaController { + + @Resource + private HdfsHaService hdfsHaService; + + @Resource + private CommandService commandService; + + @Audit + @Operation(summary = "enableHdfsHa", description = "Enable HDFS HA for an existing single NameNode cluster") + @PostMapping("/enable-hdfs-ha") + public ResponseEntity enableHdfsHa( + @PathVariable Long 
clusterId, + @PathVariable Long serviceId, + @RequestBody @Valid EnableHdfsHaReq req) { + return ResponseEntity.success(hdfsHaService.buildEnableHdfsHaCommand(clusterId, serviceId, req)); + } +} diff --git a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/controller/YarnHaController.java b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/controller/YarnHaController.java new file mode 100644 index 000000000..e16a5e065 --- /dev/null +++ b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/controller/YarnHaController.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.bigtop.manager.server.controller; + +import org.apache.bigtop.manager.server.annotations.Audit; +import org.apache.bigtop.manager.server.model.dto.CommandDTO; +import org.apache.bigtop.manager.server.model.req.EnableYarnRmHaReq; +import org.apache.bigtop.manager.server.model.vo.CommandVO; +import org.apache.bigtop.manager.server.service.CommandService; +import org.apache.bigtop.manager.server.service.YarnHaService; +import org.apache.bigtop.manager.server.utils.ResponseEntity; + +import io.swagger.v3.oas.annotations.Operation; +import io.swagger.v3.oas.annotations.tags.Tag; +import jakarta.annotation.Resource; +import jakarta.validation.Valid; + +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +@Tag(name = "YARN HA Controller") +@RestController +@RequestMapping("/clusters/{clusterId}/services/{serviceId}/actions") +public class YarnHaController { + + @Resource + private YarnHaService yarnHaService; + + @Resource + private CommandService commandService; + + @Audit + @Operation(summary = "enableYarnRmHa", description = "Enable YARN ResourceManager HA") + @PostMapping("/enable-yarn-rm-ha") + public ResponseEntity enableYarnRmHa( + @PathVariable Long clusterId, + @PathVariable Long serviceId, + @RequestBody @Valid EnableYarnRmHaReq req) { + + CommandDTO commandDTO = yarnHaService.buildEnableYarnRmHaCommand(clusterId, serviceId, req); + return ResponseEntity.success(commandService.command(commandDTO)); + } +} + diff --git a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/enums/ApiExceptionEnum.java b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/enums/ApiExceptionEnum.java index b93fcced3..c60e1be01 100644 --- 
a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/enums/ApiExceptionEnum.java +++ b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/enums/ApiExceptionEnum.java @@ -82,6 +82,7 @@ public enum ApiExceptionEnum { // Command Exceptions -- 18000 ~ 18999 COMMAND_NOT_FOUND(18000, LocaleKeys.COMMAND_NOT_FOUND), COMMAND_NOT_SUPPORTED(18001, LocaleKeys.COMMAND_NOT_SUPPORTED), + INVALID_PARAMS(18002, LocaleKeys.OPERATION_FAILED), // LLM Exceptions -- 19000 ~ 19999 PLATFORM_NOT_FOUND(19000, LocaleKeys.PLATFORM_NOT_FOUND), diff --git a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/model/req/EnableHdfsHaReq.java b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/model/req/EnableHdfsHaReq.java new file mode 100644 index 000000000..f11ac586c --- /dev/null +++ b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/model/req/EnableHdfsHaReq.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.bigtop.manager.server.model.req; + +import io.swagger.v3.oas.annotations.media.Schema; +import jakarta.validation.constraints.NotBlank; +import jakarta.validation.constraints.NotEmpty; +import jakarta.validation.constraints.NotNull; +import lombok.Data; + +import java.util.List; + +@Data +public class EnableHdfsHaReq { + + @NotBlank + @Schema(description = "Active NameNode hostname", example = "nn-a") + private String activeNameNodeHost; + + @NotBlank + @Schema(description = "Standby NameNode hostname", example = "nn-b") + private String standbyNameNodeHost; + + @NotEmpty + @Schema(description = "JournalNode hostnames, must be >=3", example = "[\"jn-1\",\"jn-2\",\"jn-3\"]") + private List journalNodeHosts; + + @NotNull + @Schema(description = "Zookeeper service id", example = "12") + private Long zookeeperServiceId; + + @NotEmpty + @Schema(description = "ZKFC hostnames (usually 2 namenodes)", example = "[\"nn-a\",\"nn-b\"]") + private List zkfcHosts; + + @NotBlank + @Schema(description = "HDFS nameservice, allow rename", example = "nameservice1") + private String nameservice; +} diff --git a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/model/req/EnableYarnRmHaReq.java b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/model/req/EnableYarnRmHaReq.java new file mode 100644 index 000000000..a39eff4ce --- /dev/null +++ b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/model/req/EnableYarnRmHaReq.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bigtop.manager.server.model.req; + +import io.swagger.v3.oas.annotations.media.Schema; +import jakarta.validation.constraints.NotBlank; +import jakarta.validation.constraints.NotEmpty; +import jakarta.validation.constraints.NotNull; +import lombok.Data; + +import java.util.List; + +@Data +public class EnableYarnRmHaReq { + + @NotBlank + @Schema(description = "Active ResourceManager hostname (rm1)", example = "rm-a") + private String activeResourceManagerHost; + + @NotBlank + @Schema(description = "Standby ResourceManager hostname (rm2)", example = "rm-b") + private String standbyResourceManagerHost; + + @NotEmpty + @Schema(description = "RM IDs used by yarn.resourcemanager.ha.rm-ids", example = "[\"rm1\",\"rm2\"]") + private List rmIds; + + @NotBlank + @Schema(description = "YARN cluster id (yarn.resourcemanager.cluster-id)", example = "yarn-cluster") + private String yarnClusterId; + + @NotNull + @Schema(description = "Zookeeper service id", example = "12") + private Long zookeeperServiceId; +} + diff --git a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/service/HdfsHaService.java b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/service/HdfsHaService.java new file mode 100644 index 000000000..3bc7c6fd9 --- /dev/null +++ b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/service/HdfsHaService.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bigtop.manager.server.service; + +import org.apache.bigtop.manager.server.model.req.EnableHdfsHaReq; + +/** + * HDFS HA service. + * + * 说明:当前仓库中 HdfsHaController 依赖该 Service,但历史版本中该文件缺失,导致编译失败。 + * 本接口用于修复该缺失问题,并为后续实现“单 NN -> 启用 HA”流程提供扩展点。 + */ +public interface HdfsHaService { + + /** + * Build a command to enable HDFS HA. + * + * 当前先提供最小实现所需的方法签名,用于通过编译。 + * 具体启用 HA 的编排逻辑(写入 hdfs-site/core-site、初始化 JN shared edits、bootstrap standby、formatZK、重启组件等) + * 建议在实现类中完成。 + */ + org.apache.bigtop.manager.server.model.vo.CommandVO buildEnableHdfsHaCommand(Long clusterId, Long serviceId, EnableHdfsHaReq req); +} + diff --git a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/service/YarnHaService.java b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/service/YarnHaService.java new file mode 100644 index 000000000..d93696d61 --- /dev/null +++ b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/service/YarnHaService.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bigtop.manager.server.service; + +import org.apache.bigtop.manager.server.model.dto.CommandDTO; +import org.apache.bigtop.manager.server.model.req.EnableYarnRmHaReq; + +public interface YarnHaService { + + /** + * 启用 YARN ResourceManager HA:写入 yarn-site 推荐 key,并触发 service configure/restart。 + */ + CommandDTO buildEnableYarnRmHaCommand(Long clusterId, Long serviceId, EnableYarnRmHaReq req); +} + diff --git a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/service/impl/HdfsHaServiceImpl.java b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/service/impl/HdfsHaServiceImpl.java new file mode 100644 index 000000000..2db28ff99 --- /dev/null +++ b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/service/impl/HdfsHaServiceImpl.java @@ -0,0 +1,357 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
 *
 * https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.bigtop.manager.server.service.impl;

import org.apache.bigtop.manager.common.enums.Command;
import org.apache.bigtop.manager.common.utils.JsonUtils;
import org.apache.bigtop.manager.dao.po.ComponentPO;
import org.apache.bigtop.manager.dao.po.ServiceConfigPO;
import org.apache.bigtop.manager.dao.po.ServicePO;
import org.apache.bigtop.manager.dao.query.ComponentQuery;
import org.apache.bigtop.manager.dao.repository.ComponentDao;
import org.apache.bigtop.manager.dao.repository.ServiceConfigDao;
import org.apache.bigtop.manager.dao.repository.ServiceDao;
import org.apache.bigtop.manager.server.enums.CommandLevel;
import org.apache.bigtop.manager.server.exception.ServerException;
import org.apache.bigtop.manager.server.model.dto.CommandDTO;
import org.apache.bigtop.manager.server.model.dto.PropertyDTO;
import org.apache.bigtop.manager.server.model.dto.ServiceConfigDTO;
import org.apache.bigtop.manager.server.model.dto.command.ComponentCommandDTO;
import org.apache.bigtop.manager.server.model.dto.command.ServiceCommandDTO;
import org.apache.bigtop.manager.server.model.req.EnableHdfsHaReq;
import org.apache.bigtop.manager.server.model.vo.CommandVO;
import org.apache.bigtop.manager.server.service.CommandService;
import org.apache.bigtop.manager.server.service.HdfsHaService;
import org.apache.bigtop.manager.server.utils.StackConfigUtils;
import org.apache.bigtop.manager.server.utils.StackUtils;

import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;

import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import jakarta.annotation.Resource;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
 * Enables HDFS NameNode HA (QJM based) on an existing deployment.
 *
 * <p>Flow: validate prerequisites, persist the HA keys into core-site/hdfs-site,
 * then submit an ordered sequence of component commands (JournalNodes, active NN,
 * initializeSharedEdits, formatZk, standby NN, ZKFCs) followed by a service-level
 * CONFIGURE that re-renders configs on every remaining component (e.g. DataNodes).
 */
@Service
public class HdfsHaServiceImpl implements HdfsHaService {

    private static final String ZOOKEEPER_SERVICE_NAME = "zookeeper";

    /** Keys prefixed with this marker are removed from the stored config instead of upserted. */
    private static final String DELETE_PREFIX = "__delete__.";

    // QJM requires a quorum of at least 3 JournalNodes.
    private static final int MIN_JOURNAL_NODES = 3;
    private static final int JOURNAL_NODE_RPC_PORT = 8485;
    private static final int NAMENODE_RPC_PORT = 8020;
    private static final int NAMENODE_HTTP_PORT = 9870;
    private static final String DEFAULT_ZK_CLIENT_PORT = "2181";

    @Resource
    private ServiceDao serviceDao;

    @Resource
    private ServiceConfigDao serviceConfigDao;

    @Resource
    private ComponentDao componentDao;

    @Resource
    private CommandService commandService;

    /**
     * Validates prerequisites, writes the HA configuration, then submits the
     * ordered command sequence that enables NameNode HA.
     *
     * @param clusterId id of the target cluster
     * @param serviceId id of the hadoop service
     * @param req       HA parameters (NN hosts, nameservice, JournalNode/ZKFC hosts, ZK service id)
     * @return the VO of the final service-level CONFIGURE command, for progress tracking
     * @throws ServerException when a prerequisite is missing or invalid
     */
    @Override
    @Transactional
    public CommandVO buildEnableHdfsHaCommand(Long clusterId, Long serviceId, EnableHdfsHaReq req) {
        // 0. Validate prerequisites (components exist, ZK quorum resolvable, etc.)
        validatePrerequisites(clusterId, serviceId, req);

        // 1. Write HA configurations to core-site.xml and hdfs-site.xml
        writeHaConfiguration(clusterId, serviceId, req);

        // 2. Orchestrate the enablement sequence. commandService.command() is asynchronous;
        // the JobScheduler executes the submitted jobs sequentially. The last command's
        // VO is returned to the frontend for tracking.

        // Stage 1: start JournalNodes first so the shared edits storage is available
        submitStartComponent(clusterId, "journalnode", req.getJournalNodeHosts());

        // Stage 2: start the active NameNode
        submitStartComponent(clusterId, "namenode", req.getActiveNameNodeHost());

        // Stage 3: initialize the shared edits directory from the active NameNode
        submitCustomCommand(clusterId, "namenode", req.getActiveNameNodeHost(), "initializeSharedEdits");

        // Stage 4: format the ZK failover state on the active NameNode host
        submitCustomCommand(clusterId, "zkfc", req.getActiveNameNodeHost(), "formatZk");

        // Stage 5: start the standby NameNode (NameNodeScript.start() may bootstrap standby automatically)
        submitStartComponent(clusterId, "namenode", req.getStandbyNameNodeHost());

        // Stage 6: start all ZKFCs
        submitStartComponent(clusterId, "zkfc", req.getZkfcHosts());

        // Final stage: service-level CONFIGURE to apply settings to remaining components (e.g. DataNodes)
        return submitFinalConfigure(clusterId, serviceId);
    }

    /**
     * Fails fast when the request is structurally invalid or required
     * components are missing from the cluster.
     */
    private void validatePrerequisites(Long clusterId, Long serviceId, EnableHdfsHaReq req) {
        if (clusterId == null) {
            throw new ServerException("clusterId is required");
        }
        if (serviceId == null) {
            throw new ServerException("serviceId is required");
        }
        if (req == null) {
            throw new ServerException("request body is required");
        }
        if (StringUtils.isBlank(req.getActiveNameNodeHost()) || StringUtils.isBlank(req.getStandbyNameNodeHost())) {
            throw new ServerException("activeNameNodeHost/standbyNameNodeHost must not be blank");
        }
        if (req.getActiveNameNodeHost().equalsIgnoreCase(req.getStandbyNameNodeHost())) {
            throw new ServerException("activeNameNodeHost and standbyNameNodeHost must be different");
        }
        if (StringUtils.isBlank(req.getNameservice())) {
            throw new ServerException("nameservice must not be blank");
        }
        if (CollectionUtils.isEmpty(req.getJournalNodeHosts())
                || req.getJournalNodeHosts().size() < MIN_JOURNAL_NODES) {
            throw new ServerException("journalNodeHosts must be provided and have at least 3 nodes");
        }
        if (CollectionUtils.isEmpty(req.getZkfcHosts())) {
            throw new ServerException("zkfcHosts must not be empty");
        }

        // Verify the required components are registered on the specified hosts
        assertComponentExists(clusterId, "namenode", req.getActiveNameNodeHost());
        assertComponentExists(clusterId, "namenode", req.getStandbyNameNodeHost());
        for (String jnHost : req.getJournalNodeHosts()) {
            assertComponentExists(clusterId, "journalnode", jnHost);
        }

        // ZKFC must exist on the active host at least, since formatZk runs there
        assertComponentExists(clusterId, "zkfc", req.getActiveNameNodeHost());

        // The ZK quorum is mandatory for automatic failover
        String zk = buildZkAddress(req.getZookeeperServiceId());
        if (StringUtils.isBlank(zk)) {
            throw new ServerException(
                    "Failed to build ha.zookeeper.quorum, please check zookeeper service and components");
        }
    }

    /** Asserts that the given component is registered on the given host in this cluster. */
    private void assertComponentExists(Long clusterId, String componentName, String hostname) {
        ComponentQuery query = ComponentQuery.builder()
                .clusterId(clusterId)
                .name(componentName)
                .hostname(hostname)
                .build();
        List<ComponentPO> components = componentDao.findByQuery(query);
        if (CollectionUtils.isEmpty(components)) {
            throw new ServerException("Component not found in DB: component=" + componentName + ", host=" + hostname
                    + ", clusterId=" + clusterId);
        }
    }

    /** Persists the HA-related keys into core-site and hdfs-site for the service. */
    private void writeHaConfiguration(Long clusterId, Long serviceId, EnableHdfsHaReq req) {
        upsertServiceConfigProperties(clusterId, serviceId, "core-site", buildCoreSiteUpdates(req));
        upsertServiceConfigProperties(clusterId, serviceId, "hdfs-site", buildHdfsSiteUpdates(req));
    }

    /** Builds the core-site updates: HA defaultFS and, when resolvable, the ZK quorum. */
    private Map<String, Object> buildCoreSiteUpdates(EnableHdfsHaReq req) {
        Map<String, Object> updates = new HashMap<>();
        updates.put("fs.defaultFS", "hdfs://" + req.getNameservice());

        String zkAddress = buildZkAddress(req.getZookeeperServiceId());
        if (StringUtils.isNotBlank(zkAddress)) {
            updates.put("ha.zookeeper.quorum", zkAddress);
        }
        return updates;
    }

    /** Builds the hdfs-site updates (and single-NN key deletions) required for NameNode HA. */
    private Map<String, Object> buildHdfsSiteUpdates(EnableHdfsHaReq req) {
        String nameservice = req.getNameservice();
        String nn1Host = req.getActiveNameNodeHost();
        String nn2Host = req.getStandbyNameNodeHost();

        // Defensive re-check in case this method is ever called without validatePrerequisites()
        if (CollectionUtils.isEmpty(req.getJournalNodeHosts())
                || req.getJournalNodeHosts().size() < MIN_JOURNAL_NODES) {
            throw new ServerException("JournalNode hosts must be provided and have at least 3 nodes.");
        }
        String journalQuorum = req.getJournalNodeHosts().stream()
                .map(host -> host + ":" + JOURNAL_NODE_RPC_PORT)
                .collect(Collectors.joining(";"));

        Map<String, Object> updates = new HashMap<>();
        updates.put("dfs.nameservices", nameservice);
        updates.put("dfs.ha.namenodes." + nameservice, "nn1,nn2");
        updates.put("dfs.namenode.rpc-address." + nameservice + ".nn1", nn1Host + ":" + NAMENODE_RPC_PORT);
        updates.put("dfs.namenode.rpc-address." + nameservice + ".nn2", nn2Host + ":" + NAMENODE_RPC_PORT);
        updates.put("dfs.namenode.http-address." + nameservice + ".nn1", nn1Host + ":" + NAMENODE_HTTP_PORT);
        updates.put("dfs.namenode.http-address." + nameservice + ".nn2", nn2Host + ":" + NAMENODE_HTTP_PORT);
        updates.put("dfs.namenode.shared.edits.dir", "qjournal://" + journalQuorum + "/" + nameservice);
        updates.put(
                "dfs.client.failover.proxy.provider." + nameservice,
                "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");
        updates.put("dfs.ha.automatic-failover.enabled", "true");
        // No-op shell fencing keeps failover unblocked when QJM already prevents split-brain on edits
        updates.put("dfs.ha.fencing.methods", "shell(/bin/true)");

        // Remove single-NameNode keys so they don't conflict with the HA addresses
        updates.put(DELETE_PREFIX + "dfs.namenode.rpc-address", "");
        updates.put(DELETE_PREFIX + "dfs.namenode.http-address", "");
        updates.put(DELETE_PREFIX + "dfs.namenode.https-address", "");

        return updates;
    }

    /** Submits an asynchronous START command for one component on a single host. */
    private void submitStartComponent(Long clusterId, String componentName, String host) {
        submitStartComponent(clusterId, componentName, List.of(host));
    }

    /** Submits an asynchronous START command for one component on the given hosts. */
    private void submitStartComponent(Long clusterId, String componentName, List<String> hosts) {
        CommandDTO commandDTO = new CommandDTO();
        commandDTO.setClusterId(clusterId);
        commandDTO.setCommand(Command.START);
        commandDTO.setCommandLevel(CommandLevel.COMPONENT);

        ComponentCommandDTO componentCommand = new ComponentCommandDTO();
        componentCommand.setComponentName(componentName);
        componentCommand.setHostnames(hosts);
        commandDTO.setComponentCommands(List.of(componentCommand));

        commandService.command(commandDTO);
    }

    /** Submits an asynchronous CUSTOM command (e.g. initializeSharedEdits, formatZk) on a single host. */
    private void submitCustomCommand(Long clusterId, String componentName, String host, String customCommand) {
        CommandDTO commandDTO = new CommandDTO();
        commandDTO.setClusterId(clusterId);
        commandDTO.setCommand(Command.CUSTOM);
        commandDTO.setCustomCommand(customCommand);
        commandDTO.setCommandLevel(CommandLevel.COMPONENT);

        ComponentCommandDTO componentCommand = new ComponentCommandDTO();
        componentCommand.setComponentName(componentName);
        componentCommand.setHostnames(List.of(host));
        commandDTO.setComponentCommands(List.of(componentCommand));

        commandService.command(commandDTO);
    }

    /** Submits the final service-level CONFIGURE command and returns its VO for tracking. */
    private CommandVO submitFinalConfigure(Long clusterId, Long serviceId) {
        ServicePO servicePO = serviceDao.findById(serviceId);
        // Guard against a missing service row (would otherwise NPE on getName())
        if (servicePO == null) {
            throw new ServerException("Service not found: " + serviceId);
        }

        CommandDTO commandDTO = new CommandDTO();
        commandDTO.setClusterId(clusterId);
        commandDTO.setCommand(Command.CONFIGURE);
        commandDTO.setCommandLevel(CommandLevel.SERVICE);

        ServiceCommandDTO serviceCommand = new ServiceCommandDTO();
        serviceCommand.setServiceName(servicePO.getName());
        serviceCommand.setConfigs(mergeStackAndDbConfigs(serviceId, servicePO.getName()));
        commandDTO.setServiceCommands(List.of(serviceCommand));

        return commandService.command(commandDTO);
    }

    /**
     * Builds the "host:port,host:port" ZooKeeper quorum string from the referenced
     * ZooKeeper service's zoo.cfg and its registered zookeeper_server hosts.
     * Returns an empty string when no server host is registered (the stack side
     * may still generate it).
     */
    private String buildZkAddress(Long zookeeperServiceId) {
        ServicePO zkService = serviceDao.findById(zookeeperServiceId);
        if (zkService == null || !ZOOKEEPER_SERVICE_NAME.equalsIgnoreCase(zkService.getName())) {
            throw new ServerException("zookeeperServiceId must point to a valid Zookeeper service.");
        }

        ServiceConfigPO zooCfg = serviceConfigDao.findByServiceIdAndName(zookeeperServiceId, "zoo.cfg");
        if (zooCfg == null || StringUtils.isBlank(zooCfg.getPropertiesJson())) {
            throw new ServerException("zoo.cfg not found or empty for zookeeper service: " + zookeeperServiceId);
        }

        Map<String, Object> props = JsonUtils.readFromString(zooCfg.getPropertiesJson());
        String clientPort =
                props.getOrDefault("clientPort", DEFAULT_ZK_CLIENT_PORT).toString().trim();

        ComponentQuery query = ComponentQuery.builder()
                .serviceId(zookeeperServiceId)
                .name("zookeeper_server")
                .build();
        List<String> zkHosts = componentDao.findByQuery(query).stream()
                .map(ComponentPO::getHostname)
                .filter(StringUtils::isNotBlank)
                .distinct()
                .toList();

        if (CollectionUtils.isEmpty(zkHosts)) {
            return ""; // let the stack side handle it
        }

        return zkHosts.stream().map(host -> host.trim() + ":" + clientPort).collect(Collectors.joining(","));
    }

    /**
     * Inserts or updates the given properties in a stored service config,
     * creating the config row when absent. Keys prefixed with
     * {@link #DELETE_PREFIX} are removed instead of upserted.
     */
    private void upsertServiceConfigProperties(
            Long clusterId, Long serviceId, String configName, Map<String, Object> updates) {
        ServiceConfigPO po = serviceConfigDao.findByServiceIdAndName(serviceId, configName);
        if (po == null) {
            po = new ServiceConfigPO();
            po.setClusterId(clusterId);
            po.setServiceId(serviceId);
            po.setName(configName);
            po.setPropertiesJson("{}");
            serviceConfigDao.save(po);
            // Re-read so the generated id is populated for the partial update below
            po = serviceConfigDao.findByServiceIdAndName(serviceId, configName);
        }

        Map<String, Object> props = new HashMap<>();
        if (StringUtils.isNotBlank(po.getPropertiesJson())) {
            props.putAll(JsonUtils.readFromString(po.getPropertiesJson()));
        }

        for (Map.Entry<String, Object> entry : updates.entrySet()) {
            String key = entry.getKey();
            if (key != null && key.startsWith(DELETE_PREFIX)) {
                props.remove(key.substring(DELETE_PREFIX.length()));
            } else {
                props.put(key, entry.getValue());
            }
        }
        po.setPropertiesJson(JsonUtils.writeAsString(props));
        serviceConfigDao.partialUpdateByIds(List.of(po));
    }

    /** Loads the DB configs, converts them to DTOs and merges them over the stack defaults. */
    private List<ServiceConfigDTO> mergeStackAndDbConfigs(Long serviceId, String serviceName) {
        List<ServiceConfigPO> dbConfigs = serviceConfigDao.findByServiceId(serviceId);
        List<ServiceConfigDTO> stackConfigs = StackUtils.SERVICE_CONFIG_MAP.get(serviceName);
        if (stackConfigs == null) {
            stackConfigs = List.of();
        }

        List<ServiceConfigDTO> dbConfigDTOs = new ArrayList<>();
        for (ServiceConfigPO po : dbConfigs) {
            ServiceConfigDTO dto = new ServiceConfigDTO();
            dto.setId(po.getId());
            dto.setName(po.getName());

            Map<String, Object> props = StringUtils.isBlank(po.getPropertiesJson())
                    ? Map.of()
                    : JsonUtils.readFromString(po.getPropertiesJson());

            List<PropertyDTO> properties = new ArrayList<>();
            for (Map.Entry<String, Object> entry : props.entrySet()) {
                PropertyDTO property = new PropertyDTO();
                property.setName(entry.getKey());
                property.setValue(entry.getValue() == null ? null : entry.getValue().toString());
                properties.add(property);
            }
            dto.setProperties(properties);
            dbConfigDTOs.add(dto);
        }

        return StackConfigUtils.mergeServiceConfigs(stackConfigs, dbConfigDTOs);
    }
}
+ */ +package org.apache.bigtop.manager.server.service.impl; + +import org.apache.bigtop.manager.common.enums.Command; +import org.apache.bigtop.manager.common.utils.JsonUtils; +import org.apache.bigtop.manager.dao.po.ServiceConfigPO; +import org.apache.bigtop.manager.dao.po.ServicePO; +import org.apache.bigtop.manager.dao.query.ComponentQuery; +import org.apache.bigtop.manager.dao.repository.ComponentDao; +import org.apache.bigtop.manager.dao.repository.ServiceConfigDao; +import org.apache.bigtop.manager.dao.repository.ServiceDao; +import org.apache.bigtop.manager.server.enums.CommandLevel; +import org.apache.bigtop.manager.server.exception.ServerException; +import org.apache.bigtop.manager.server.model.dto.CommandDTO; +import org.apache.bigtop.manager.server.model.dto.PropertyDTO; +import org.apache.bigtop.manager.server.model.dto.ServiceConfigDTO; +import org.apache.bigtop.manager.server.model.dto.command.ServiceCommandDTO; +import org.apache.bigtop.manager.server.model.req.EnableYarnRmHaReq; +import org.apache.bigtop.manager.server.service.YarnHaService; +import org.apache.bigtop.manager.server.utils.StackConfigUtils; +import org.apache.bigtop.manager.server.utils.StackUtils; + +import org.apache.commons.collections4.CollectionUtils; +import org.apache.commons.lang3.StringUtils; + +import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; + +import jakarta.annotation.Resource; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +@Service +public class YarnHaServiceImpl implements YarnHaService { + + private static final String HADOOP_SERVICE_NAME = "hadoop"; + private static final String ZOOKEEPER_SERVICE_NAME = "zookeeper"; + + @Resource + private ServiceDao serviceDao; + + @Resource + private ServiceConfigDao serviceConfigDao; + + @Resource + private ComponentDao componentDao; + + @Override + @Transactional + public CommandDTO 
buildEnableYarnRmHaCommand(Long clusterId, Long serviceId, EnableYarnRmHaReq req) { + ServicePO servicePO = serviceDao.findById(serviceId); + if (servicePO == null) { + throw new ServerException("Service not found: " + serviceId); + } + if (!HADOOP_SERVICE_NAME.equalsIgnoreCase(servicePO.getName())) { + throw new ServerException("enable-yarn-rm-ha only supports service 'hadoop', but got: " + servicePO.getName()); + } + + // 1) 写入 yarn-site 推荐 key + Map yarnSiteUpdates = buildYarnSiteUpdates(clusterId, serviceId, req); + upsertServiceConfigProperties(clusterId, serviceId, "yarn-site", yarnSiteUpdates); + + // 2) 触发一次 service configure(会走 ServiceConfigureJob: configure + stop + start) + // 注意:这里是最小可用实现;如果需要更细粒度(只滚动 RM),可改为 component 级别 restart。 + CommandDTO commandDTO = new CommandDTO(); + commandDTO.setClusterId(clusterId); + commandDTO.setCommandLevel(CommandLevel.SERVICE); + commandDTO.setCommand(Command.CONFIGURE); + + ServiceCommandDTO serviceCommandDTO = new ServiceCommandDTO(); + serviceCommandDTO.setServiceName(servicePO.getName()); + + // 从数据库取出最新配置,并与 stack 配置合并后塞入 command + List mergedConfigs = mergeStackAndDbConfigs(serviceId, servicePO.getName()); + serviceCommandDTO.setConfigs(mergedConfigs); + + commandDTO.setServiceCommands(List.of(serviceCommandDTO)); + return commandDTO; + } + + private Map buildYarnSiteUpdates(Long clusterId, Long serviceId, EnableYarnRmHaReq req) { + if (StringUtils.isBlank(req.getActiveResourceManagerHost()) || StringUtils.isBlank(req.getStandbyResourceManagerHost())) { + throw new ServerException("active/standby resourcemanager host must not be blank"); + } + if (CollectionUtils.isEmpty(req.getRmIds()) || req.getRmIds().size() < 2) { + throw new ServerException("rmIds must be provided and contain at least 2 ids"); + } + + String rm1Id = req.getRmIds().get(0); + String rm2Id = req.getRmIds().get(1); + + Map m = new HashMap<>(); + m.put("yarn.resourcemanager.ha.enabled", "true"); + m.put("yarn.resourcemanager.ha.rm-ids", 
String.join(",", req.getRmIds())); + m.put("yarn.resourcemanager.cluster-id", req.getYarnClusterId()); + + // hostname.rmX + m.put("yarn.resourcemanager.hostname." + rm1Id, req.getActiveResourceManagerHost()); + m.put("yarn.resourcemanager.hostname." + rm2Id, req.getStandbyResourceManagerHost()); + + // webapp.address.rmX:优先复用现有 yarn.resourcemanager.webapp.address 的端口,否则默认 8088 + int webappPort = resolveWebappPort(clusterId, serviceId); + m.put("yarn.resourcemanager.webapp.address." + rm1Id, req.getActiveResourceManagerHost() + ":" + webappPort); + m.put("yarn.resourcemanager.webapp.address." + rm2Id, req.getStandbyResourceManagerHost() + ":" + webappPort); + + // zk-address:通过 zookeeperServiceId 查询 zookeeper 的 zoo.cfg,并拼接 host:clientPort + String zkAddress = buildZkAddress(clusterId, req.getZookeeperServiceId()); + if (StringUtils.isNotBlank(zkAddress)) { + m.put("yarn.resourcemanager.zk-address", zkAddress); + } + + // 其它 address.rmX:由 stack 侧 HadoopParams.yarnSite() 自动生成。 + // 如果你希望由 server 强制写入,可以在此处补齐: + // - yarn.resourcemanager.address.rmX + // - yarn.resourcemanager.admin.address.rmX + // - yarn.resourcemanager.resource-tracker.address.rmX + // - yarn.resourcemanager.scheduler.address.rmX + + // 避免混杂:服务端侧也清理单 RM key(DB 侧清理,避免 UI/渲染混杂) + m.put("__delete__.yarn.resourcemanager.hostname", ""); + m.put("__delete__.yarn.resourcemanager.address", ""); + m.put("__delete__.yarn.resourcemanager.admin.address", ""); + m.put("__delete__.yarn.resourcemanager.resource-tracker.address", ""); + m.put("__delete__.yarn.resourcemanager.scheduler.address", ""); + m.put("__delete__.yarn.resourcemanager.webapp.address", ""); + m.put("__delete__.yarn.resourcemanager.webapp.https.address", ""); + return m; + } + + private int resolveWebappPort(Long clusterId, Long serviceId) { + // Try reading existing yarn.resourcemanager.webapp.address from current hadoop service yarn-site + // Default: 8088 + int defaultPort = 8088; + try { + ServiceConfigPO yarnSite = 
serviceConfigDao.findByServiceIdAndName(serviceId, "yarn-site"); + if (yarnSite == null || StringUtils.isBlank(yarnSite.getPropertiesJson())) { + return defaultPort; + } + Map props = JsonUtils.readFromString(yarnSite.getPropertiesJson()); + Object addr = props.get("yarn.resourcemanager.webapp.address"); + if (addr != null && addr.toString().contains(":")) { + String portStr = addr.toString().split(":")[1].trim(); + return Integer.parseInt(portStr); + } + } catch (Exception ignored) { + } + return defaultPort; + } + + private String buildZkAddress(Long clusterId, Long zookeeperServiceId) { + ServicePO zkService = serviceDao.findById(zookeeperServiceId); + if (zkService == null) { + throw new ServerException("zookeeper service not found: " + zookeeperServiceId); + } + if (!ZOOKEEPER_SERVICE_NAME.equalsIgnoreCase(zkService.getName())) { + throw new ServerException( + "zookeeperServiceId must point to service 'zookeeper', but got: " + zkService.getName()); + } + + List zkConfigs = serviceConfigDao.findByServiceId(zookeeperServiceId); + ServiceConfigPO zooCfg = null; + for (ServiceConfigPO po : zkConfigs) { + if ("zoo.cfg".equals(po.getName())) { + zooCfg = po; + break; + } + } + if (zooCfg == null || StringUtils.isBlank(zooCfg.getPropertiesJson())) { + // 允许为空:stack 侧会尝试自动生成;但 server 侧最好能写入 + return ""; + } + + Map props = JsonUtils.readFromString(zooCfg.getPropertiesJson()); + Object clientPortObj = props.get("clientPort"); + String clientPort = clientPortObj == null ? 
"2181" : clientPortObj.toString().trim(); + + // ZooKeeper hosts: query component table by serviceId + component name + ComponentQuery query = ComponentQuery.builder() + .serviceId(zookeeperServiceId) + .name("zookeeper_server") + .build(); + List zkHosts = componentDao.findByQuery(query).stream() + .map(x -> x.getHostname()) + .filter(StringUtils::isNotBlank) + .distinct() + .toList(); + + if (CollectionUtils.isEmpty(zkHosts)) { + // 允许为空:stack 侧会尝试自动生成;但 server 侧最好能写入 + return ""; + } + + return String.join(",", zkHosts.stream().map(h -> h.trim() + ":" + clientPort).toList()); + } + + private void upsertServiceConfigProperties(Long clusterId, Long serviceId, String configName, Map updates) { + ServiceConfigPO po = serviceConfigDao.findByServiceIdAndName(serviceId, configName); + if (po == null) { + po = new ServiceConfigPO(); + po.setClusterId(clusterId); + po.setServiceId(serviceId); + po.setName(configName); + po.setPropertiesJson("{}"); + serviceConfigDao.save(po); + po = serviceConfigDao.findByServiceIdAndName(serviceId, configName); + } + + Map props = new HashMap<>(); + if (StringUtils.isNotBlank(po.getPropertiesJson())) { + props.putAll(JsonUtils.readFromString(po.getPropertiesJson())); + } + + // Support in-method delete semantics: keys prefixed with "__delete__." will be removed. 
+ for (Map.Entry e : updates.entrySet()) { + String k = e.getKey(); + if (k != null && k.startsWith("__delete__.")) { + props.remove(k.substring("__delete__.".length())); + } else { + props.put(k, e.getValue()); + } + } + po.setPropertiesJson(JsonUtils.writeAsString(props)); + serviceConfigDao.partialUpdateByIds(List.of(po)); + } + + private List mergeStackAndDbConfigs(Long serviceId, String serviceName) { + List dbConfigs = serviceConfigDao.findByServiceId(serviceId); + List oriConfigs = StackUtils.SERVICE_CONFIG_MAP.get(serviceName); + if (oriConfigs == null) { + oriConfigs = List.of(); + } + + List newConfigs = new ArrayList<>(); + for (ServiceConfigPO po : dbConfigs) { + ServiceConfigDTO dto = new ServiceConfigDTO(); + dto.setId(po.getId()); + dto.setName(po.getName()); + + Map props = StringUtils.isBlank(po.getPropertiesJson()) + ? Map.of() + : JsonUtils.readFromString(po.getPropertiesJson()); + + List propertyDTOS = new ArrayList<>(); + for (Map.Entry e : props.entrySet()) { + PropertyDTO p = new PropertyDTO(); + p.setName(e.getKey()); + p.setValue(e.getValue() == null ? 
null : e.getValue().toString()); + propertyDTOS.add(p); + } + dto.setProperties(propertyDTOS); + newConfigs.add(dto); + } + + return StackConfigUtils.mergeServiceConfigs(oriConfigs, newConfigs); + } +} + diff --git a/bigtop-manager-stack/bigtop-manager-stack-bigtop/src/main/java/org/apache/bigtop/manager/stack/bigtop/v3_3_0/hadoop/HadoopParams.java b/bigtop-manager-stack/bigtop-manager-stack-bigtop/src/main/java/org/apache/bigtop/manager/stack/bigtop/v3_3_0/hadoop/HadoopParams.java index 6e262d4e8..7978baa79 100644 --- a/bigtop-manager-stack/bigtop-manager-stack-bigtop/src/main/java/org/apache/bigtop/manager/stack/bigtop/v3_3_0/hadoop/HadoopParams.java +++ b/bigtop-manager-stack/bigtop-manager-stack-bigtop/src/main/java/org/apache/bigtop/manager/stack/bigtop/v3_3_0/hadoop/HadoopParams.java @@ -41,6 +41,7 @@ import java.util.Map; import java.util.regex.Matcher; import java.util.regex.Pattern; +import java.util.stream.Collectors; @Getter @Slf4j @@ -48,6 +49,24 @@ @NoArgsConstructor public class HadoopParams extends BigtopParams { + /** + * nameservice 默认值为 nameservice1;当配置中存在 dfs.nameservices 时优先使用。 + * 这样在“单 NN -> 启用 HA”流程中前端修改 nameservice 后,只要写入配置即可生效。 + */ + private String resolveNameService() { + try { + Map hdfsSite = LocalSettings.configurations(getServiceName(), "hdfs-site"); + Object ns = hdfsSite.get("dfs.nameservices"); + if (ns != null && StringUtils.isNotBlank(ns.toString())) { + // 若包含逗号(多 nameservice),当前仅取第一个 + return ns.toString().split("\\s*,\\s*")[0].trim(); + } + } catch (Exception e) { + // ignore and fallback + } + return "nameservice1"; + } + private final String hadoopLogDir = "/var/log/hadoop"; private final String hadoopPidDir = "/var/run/hadoop"; @@ -127,8 +146,10 @@ public Map coreSite() { coreSite.put( "fs.defaultFS", ((String) coreSite.get("fs.defaultFS")).replace("localhost", namenodeList.get(0))); } else if (!namenodeList.isEmpty() && namenodeList.size() == 2) { + String nameservice = resolveNameService(); coreSite.put( - "fs.defaultFS", 
((String) coreSite.get("fs.defaultFS")).replace("localhost:8020", "nameservice1")); + "fs.defaultFS", + ((String) coreSite.get("fs.defaultFS")).replace("localhost:8020", nameservice)); coreSite.put("ha.zookeeper.quorum", zkString); } return coreSite; @@ -155,23 +176,31 @@ public Map hdfsSite() { "dfs.namenode.https-address", ((String) hdfsSite.get("dfs.namenode.https-address")).replace("0.0.0.0", namenodeList.get(0))); } else if (!namenodeList.isEmpty() && namenodeList.size() == 2) { + if (journalNodeList == null || journalNodeList.size() < 3) { + throw new IllegalArgumentException("JournalNode host list must be at least 3 for HDFS HA"); + } + + String nameservice = resolveNameService(); + String journalQuorum = journalNodeList.stream().map(x -> x + ":8485").collect(Collectors.joining(";")); + + // 清理单机模式可能存在的 key,避免与 HA 配置混杂 + hdfsSite.remove("dfs.namenode.rpc-address"); + hdfsSite.remove("dfs.namenode.https-address"); hdfsSite.remove("dfs.namenode.http-address"); + hdfsSite.put("dfs.ha.automatic-failover.enabled", "true"); - hdfsSite.put("dfs.nameservices", "nameservice1"); - hdfsSite.put("dfs.ha.namenodes.nameservice1", "nn1,nn2"); - hdfsSite.put("dfs.namenode.rpc-address.nameservice1.nn1", namenodeList.get(0) + ":8020"); - hdfsSite.put("dfs.namenode.rpc-address.nameservice1.nn2", namenodeList.get(1) + ":8020"); - hdfsSite.put("dfs.namenode.http-address.nameservice1.nn1", namenodeList.get(0) + ":9870"); - hdfsSite.put("dfs.namenode.http-address.nameservice1.nn2", namenodeList.get(1) + ":9870"); - hdfsSite.put( - "dfs.namenode.shared.edits.dir", - "qjournal://" + journalNodeList.get(0) + ":8485;" + journalNodeList.get(1) + ":8485;" - + journalNodeList.get(2) + ":8485" + "/nameservice1"); + hdfsSite.put("dfs.nameservices", nameservice); + hdfsSite.put("dfs.ha.namenodes." + nameservice, "nn1,nn2"); + hdfsSite.put("dfs.namenode.rpc-address." + nameservice + ".nn1", namenodeList.get(0) + ":8020"); + hdfsSite.put("dfs.namenode.rpc-address." 
+ nameservice + ".nn2", namenodeList.get(1) + ":8020"); + hdfsSite.put("dfs.namenode.http-address." + nameservice + ".nn1", namenodeList.get(0) + ":9870"); + hdfsSite.put("dfs.namenode.http-address." + nameservice + ".nn2", namenodeList.get(1) + ":9870"); + hdfsSite.put("dfs.namenode.shared.edits.dir", "qjournal://" + journalQuorum + "/" + nameservice); + hdfsSite.put("dfs.journalnode.edits.dir", "/hadoop/dfs/journal"); hdfsSite.put( - "dfs.client.failover.proxy.provider.nameservice1", + "dfs.client.failover.proxy.provider." + nameservice, "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"); - hdfsSite.put("dfs.journalnode.edits.dir", "/hadoop/dfs/journal"); hdfsSite.put("dfs.ha.fencing.methods", "shell(/bin/true)"); hdfsSite.put("dfs.replication", "3"); } @@ -220,32 +249,136 @@ public Map yarnLog4j() { public Map yarnSite() { Map yarnSite = LocalSettings.configurations(getServiceName(), "yarn-site"); List resourcemanagerList = LocalSettings.componentHosts("resourcemanager"); - if (!resourcemanagerList.isEmpty()) { - yarnSite.put("yarn.resourcemanager.hostname", MessageFormat.format("{0}", resourcemanagerList.get(0))); - yarnSite.put( + + // YARN ResourceManager HA + // When there are >= 2 RMs, or `yarn.resourcemanager.ha.enabled=true` is explicitly set, + // enter HA mode. In HA mode, do not set single-RM keys like `yarn.resourcemanager.hostname` + // to avoid conflicts. 
+ boolean haEnabledByConfig = false; + Object haEnabledValue = yarnSite.get("yarn.resourcemanager.ha.enabled"); + if (haEnabledValue != null) { + haEnabledByConfig = "true".equalsIgnoreCase(haEnabledValue.toString().trim()); + } + boolean haMode = (resourcemanagerList != null && resourcemanagerList.size() >= 2) || haEnabledByConfig; + + if (haMode && resourcemanagerList != null && resourcemanagerList.size() >= 2) { + String rm1Host = resourcemanagerList.get(0); + String rm2Host = resourcemanagerList.get(1); + + // rm-ids: Use existing config if present, otherwise default to rm1,rm2 + String rmIds = "rm1,rm2"; + Object rmIdsObj = yarnSite.get("yarn.resourcemanager.ha.rm-ids"); + if (rmIdsObj != null && StringUtils.isNotBlank(rmIdsObj.toString())) { + rmIds = rmIdsObj.toString().trim(); + } + String[] rmIdArr = rmIds.split("\\s*,\\s*"); + String rm1Id = rmIdArr.length > 0 && StringUtils.isNotBlank(rmIdArr[0]) ? rmIdArr[0] : "rm1"; + String rm2Id = rmIdArr.length > 1 && StringUtils.isNotBlank(rmIdArr[1]) ? 
rmIdArr[1] : "rm2"; + + yarnSite.put("yarn.resourcemanager.ha.enabled", "true"); + yarnSite.put("yarn.resourcemanager.ha.rm-ids", rm1Id + "," + rm2Id); + + // cluster-id: Respect if set by server, otherwise provide a stable default + if (yarnSite.get("yarn.resourcemanager.cluster-id") == null + || StringUtils.isBlank(yarnSite.get("yarn.resourcemanager.cluster-id").toString())) { + yarnSite.put("yarn.resourcemanager.cluster-id", "yarn-cluster"); + } + + // zk-address: Respect if set by server, otherwise auto-generate like in coreSite() + Object zkAddr = yarnSite.get("yarn.resourcemanager.zk-address"); + if (zkAddr == null || StringUtils.isBlank(zkAddr.toString())) { + try { + List zookeeperServerHosts = LocalSettings.componentHosts("zookeeper_server"); + Map ZKPort = LocalSettings.configurations("zookeeper", "zoo.cfg"); + String clientPort = (String) ZKPort.get("clientPort"); + StringBuilder zkString = new StringBuilder(); + for (int i = 0; i < zookeeperServerHosts.size(); i++) { + String host = zookeeperServerHosts.get(i); + if (host == null || host.trim().isEmpty()) { + continue; + } + zkString.append(host.trim()).append(":").append(clientPort); + if (i != zookeeperServerHosts.size() - 1) { + zkString.append(","); + } + } + if (zkString.length() > 0) { + yarnSite.put("yarn.resourcemanager.zk-address", zkString.toString()); + } + } catch (Exception e) { + log.warn("Failed to auto-generate yarn.resourcemanager.zk-address", e); + } + } + + // Set hostname.rmX + yarnSite.put("yarn.resourcemanager.hostname." + rm1Id, rm1Host); + yarnSite.put("yarn.resourcemanager.hostname." 
+ rm2Id, rm2Host); + + // webapp.address.rmX: Extract port from existing webapp.address, or default to 8088 + int webappPort = 8088; + Object webappAddress = yarnSite.get("yarn.resourcemanager.webapp.address"); + if (webappAddress != null && webappAddress.toString().contains(":")) { + try { + String portStr = webappAddress.toString().split(":")[1].trim(); + webappPort = Integer.parseInt(portStr); + } catch (Exception ignored) { + } + } + yarnSite.put("yarn.resourcemanager.webapp.address." + rm1Id, rm1Host + ":" + webappPort); + yarnSite.put("yarn.resourcemanager.webapp.address." + rm2Id, rm2Host + ":" + webappPort); + + // Auto-generate other HA addresses by extracting ports from single-node configs + generateHaAddress(yarnSite, "yarn.resourcemanager.address", rm1Id, rm1Host, rm2Id, rm2Host, 8032); + generateHaAddress(yarnSite, "yarn.resourcemanager.admin.address", rm1Id, rm1Host, rm2Id, rm2Host, 8033); + generateHaAddress( + yarnSite, "yarn.resourcemanager.resource-tracker.address", - ((String) yarnSite.get("yarn.resourcemanager.resource-tracker.address")) - .replace("0.0.0.0", resourcemanagerList.get(0))); - yarnSite.put( - "yarn.resourcemanager.scheduler.address", - ((String) yarnSite.get("yarn.resourcemanager.scheduler.address")) - .replace("0.0.0.0", resourcemanagerList.get(0))); - yarnSite.put( - "yarn.resourcemanager.address", - ((String) yarnSite.get("yarn.resourcemanager.address")) - .replace("0.0.0.0", resourcemanagerList.get(0))); - yarnSite.put( - "yarn.resourcemanager.admin.address", - ((String) yarnSite.get("yarn.resourcemanager.admin.address")) - .replace("0.0.0.0", resourcemanagerList.get(0))); - yarnSite.put( - "yarn.resourcemanager.webapp.address", - ((String) yarnSite.get("yarn.resourcemanager.webapp.address")) - .replace("0.0.0.0", resourcemanagerList.get(0))); - yarnSite.put( - "yarn.resourcemanager.webapp.https.address", - ((String) yarnSite.get("yarn.resourcemanager.webapp.https.address")) - .replace("0.0.0.0", resourcemanagerList.get(0))); + 
rm1Id, + rm1Host, + rm2Id, + rm2Host, + 8031); + generateHaAddress( + yarnSite, "yarn.resourcemanager.scheduler.address", rm1Id, rm1Host, rm2Id, rm2Host, 8030); + + // Remove single-RM keys to avoid conflicts + yarnSite.remove("yarn.resourcemanager.hostname"); + yarnSite.remove("yarn.resourcemanager.address"); + yarnSite.remove("yarn.resourcemanager.admin.address"); + yarnSite.remove("yarn.resourcemanager.resource-tracker.address"); + yarnSite.remove("yarn.resourcemanager.scheduler.address"); + yarnSite.remove("yarn.resourcemanager.webapp.address"); + yarnSite.remove("yarn.resourcemanager.webapp.https.address"); + + } else { + // Single ResourceManager + if (resourcemanagerList != null && !resourcemanagerList.isEmpty()) { + yarnSite.put("yarn.resourcemanager.hostname", MessageFormat.format("{0}", resourcemanagerList.get(0))); + yarnSite.put( + "yarn.resourcemanager.resource-tracker.address", + ((String) yarnSite.get("yarn.resourcemanager.resource-tracker.address")) + .replace("0.0.0.0", resourcemanagerList.get(0))); + yarnSite.put( + "yarn.resourcemanager.scheduler.address", + ((String) yarnSite.get("yarn.resourcemanager.scheduler.address")) + .replace("0.0.0.0", resourcemanagerList.get(0))); + yarnSite.put( + "yarn.resourcemanager.address", + ((String) yarnSite.get("yarn.resourcemanager.address")) + .replace("0.0.0.0", resourcemanagerList.get(0))); + yarnSite.put( + "yarn.resourcemanager.admin.address", + ((String) yarnSite.get("yarn.resourcemanager.admin.address")) + .replace("0.0.0.0", resourcemanagerList.get(0))); + yarnSite.put( + "yarn.resourcemanager.webapp.address", + ((String) yarnSite.get("yarn.resourcemanager.webapp.address")) + .replace("0.0.0.0", resourcemanagerList.get(0))); + yarnSite.put( + "yarn.resourcemanager.webapp.https.address", + ((String) yarnSite.get("yarn.resourcemanager.webapp.https.address")) + .replace("0.0.0.0", resourcemanagerList.get(0))); + } } nodeManagerLogDir = (String) yarnSite.get("yarn.nodemanager.log-dirs"); @@ -303,6 +436,29 
@@ public String getServiceName() { * * @param hdfsSite The HDFS site configuration map to be modified */ + private static void generateHaAddress( + Map yarnSite, + String baseKey, + String rm1Id, + String rm1Host, + String rm2Id, + String rm2Host, + int defaultPort) { + + int port = defaultPort; + Object base = yarnSite.get(baseKey); + if (base != null && base.toString().contains(":")) { + try { + String portStr = base.toString().split(":")[1].trim(); + port = Integer.parseInt(portStr); + } catch (Exception ignored) { + } + } + + yarnSite.put(baseKey + "." + rm1Id, rm1Host + ":" + port); + yarnSite.put(baseKey + "." + rm2Id, rm2Host + ":" + port); + } + private void configureNativeLibraryDependentSettings(Map hdfsSite) { try { // Detect system glibc version to determine native library support diff --git a/bigtop-manager-stack/bigtop-manager-stack-bigtop/src/main/java/org/apache/bigtop/manager/stack/bigtop/v3_3_0/hadoop/HadoopSetup.java b/bigtop-manager-stack/bigtop-manager-stack-bigtop/src/main/java/org/apache/bigtop/manager/stack/bigtop/v3_3_0/hadoop/HadoopSetup.java index b979a012c..70215320f 100644 --- a/bigtop-manager-stack/bigtop-manager-stack-bigtop/src/main/java/org/apache/bigtop/manager/stack/bigtop/v3_3_0/hadoop/HadoopSetup.java +++ b/bigtop-manager-stack/bigtop-manager-stack-bigtop/src/main/java/org/apache/bigtop/manager/stack/bigtop/v3_3_0/hadoop/HadoopSetup.java @@ -71,6 +71,7 @@ public static ShellResult configure(Params params, String componentName) { hadoopGroup, Constants.PERMISSION_755, true); + break; } case "secondarynamenode": { LinuxFileUtils.createDirectories( @@ -79,6 +80,7 @@ public static ShellResult configure(Params params, String componentName) { hadoopGroup, Constants.PERMISSION_755, true); + break; } case "journalnode": { LinuxFileUtils.createDirectories( @@ -87,6 +89,7 @@ public static ShellResult configure(Params params, String componentName) { hadoopGroup, Constants.PERMISSION_755, true); + break; } case "datanode": { 
LinuxFileUtils.createDirectories( @@ -102,6 +105,7 @@ public static ShellResult configure(Params params, String componentName) { dir, hadoopUser, hadoopGroup, Constants.PERMISSION_755, true); } } + break; } case "nodemanager": { if (StringUtils.isNotBlank(hadoopParams.getNodeManagerLogDir())) { @@ -119,7 +123,10 @@ public static ShellResult configure(Params params, String componentName) { dir, hadoopUser, hadoopGroup, Constants.PERMISSION_755, true); } } + break; } + default: + break; } } diff --git a/bigtop-manager-stack/bigtop-manager-stack-bigtop/src/main/java/org/apache/bigtop/manager/stack/bigtop/v3_3_0/hadoop/NameNodeScript.java b/bigtop-manager-stack/bigtop-manager-stack-bigtop/src/main/java/org/apache/bigtop/manager/stack/bigtop/v3_3_0/hadoop/NameNodeScript.java index b57dcb2b1..51fc28e9e 100644 --- a/bigtop-manager-stack/bigtop-manager-stack-bigtop/src/main/java/org/apache/bigtop/manager/stack/bigtop/v3_3_0/hadoop/NameNodeScript.java +++ b/bigtop-manager-stack/bigtop-manager-stack-bigtop/src/main/java/org/apache/bigtop/manager/stack/bigtop/v3_3_0/hadoop/NameNodeScript.java @@ -65,6 +65,7 @@ public ShellResult start(Params params) { List namenodeList = LocalSettings.componentHosts("namenode"); try { if (namenodeList != null && !namenodeList.isEmpty() && hostname.equals(namenodeList.get(0))) { + // 主 NN:仅在“全新部署未格式化”时 format;升级启用 HA 时不能 format HadoopSetup.formatNameNode(hadoopParams); String startCmd = MessageFormat.format("{0}/hdfs --daemon start namenode", hadoopParams.binDir()); ShellResult result = LinuxOSUtils.sudoExecCmd(startCmd, hadoopParams.user()); @@ -73,13 +74,12 @@ public ShellResult start(Params params) { } return result; } else if (namenodeList != null && namenodeList.size() >= 2 && hostname.equals(namenodeList.get(1))) { + // Standby NN:此处保留“自动 bootstrap 后启动”的逻辑,兼容初装直接选 2 个 NN 的 HA 模式 boolean isPrimaryReady = waitForNameNodeReady(namenodeList.get(0), hadoopParams); if (!isPrimaryReady) { throw new StackException("Primary NameNode is not ready, 
cannot bootstrap standby"); } - String bootstrapCmd = MessageFormat.format( - "{0}/hdfs namenode -bootstrapStandby -nonInteractive", hadoopParams.binDir()); - ShellResult bootstrapResult = LinuxOSUtils.sudoExecCmd(bootstrapCmd, hadoopParams.user()); + ShellResult bootstrapResult = bootstrapStandby(hadoopParams); if (bootstrapResult.getExitCode() != 0) { throw new StackException("Failed to bootstrap standby NameNode: " + bootstrapResult.getErrMsg()); } @@ -98,6 +98,41 @@ public ShellResult start(Params params) { } } + /** + * 在启用 HA 流程中由 server 侧通过 custom command 调用:初始化 shared edits + */ + public ShellResult initializeSharedEdits(Params params) { + configure(params); + HadoopParams hadoopParams = (HadoopParams) params; + String cmd = MessageFormat.format( + "{0}/hdfs --config {1} namenode -initializeSharedEdits -nonInteractive", + hadoopParams.binDir(), + hadoopParams.confDir()); + try { + return LinuxOSUtils.sudoExecCmd(cmd, hadoopParams.user()); + } catch (Exception e) { + throw new StackException(e); + } + } + + /** + * 在启用 HA 流程中由 server 侧通过 custom command 调用:bootstrap standby + */ + public ShellResult bootstrapStandby(Params params) { + configure(params); + HadoopParams hadoopParams = (HadoopParams) params; + try { + return bootstrapStandby(hadoopParams); + } catch (Exception e) { + throw new StackException(e); + } + } + + private ShellResult bootstrapStandby(HadoopParams hadoopParams) throws Exception { + String cmd = MessageFormat.format("{0}/hdfs namenode -bootstrapStandby -nonInteractive", hadoopParams.binDir()); + return LinuxOSUtils.sudoExecCmd(cmd, hadoopParams.user()); + } + private boolean waitForNameNodeReady(String namenodeHost, HadoopParams hadoopParams) { String httpPort = hadoopParams.getDfsHttpPort(); long timeout = 5 * 60 * 1000; diff --git a/bigtop-manager-stack/bigtop-manager-stack-bigtop/src/main/java/org/apache/bigtop/manager/stack/bigtop/v3_3_0/hadoop/ZkfcScript.java 
b/bigtop-manager-stack/bigtop-manager-stack-bigtop/src/main/java/org/apache/bigtop/manager/stack/bigtop/v3_3_0/hadoop/ZkfcScript.java index c19cb270e..96ecb8828 100644 --- a/bigtop-manager-stack/bigtop-manager-stack-bigtop/src/main/java/org/apache/bigtop/manager/stack/bigtop/v3_3_0/hadoop/ZkfcScript.java +++ b/bigtop-manager-stack/bigtop-manager-stack-bigtop/src/main/java/org/apache/bigtop/manager/stack/bigtop/v3_3_0/hadoop/ZkfcScript.java @@ -51,6 +51,14 @@ public ShellResult configure(Params params) { @Override public ShellResult init(Params params) { + // 保持原有 init 行为:formatZK + return formatZk(params); + } + + /** + * 在启用 HA 流程中由 server 侧通过 custom command 调用:formatZK + */ + public ShellResult formatZk(Params params) { configure(params); HadoopParams hadoopParams = (HadoopParams) params; diff --git a/bigtop-manager-stack/bigtop-manager-stack-bigtop/src/main/java/org/apache/bigtop/manager/stack/bigtop/v3_3_0/kafka/KafkaSetup.java b/bigtop-manager-stack/bigtop-manager-stack-bigtop/src/main/java/org/apache/bigtop/manager/stack/bigtop/v3_3_0/kafka/KafkaSetup.java index adddb041d..e49e72cf3 100644 --- a/bigtop-manager-stack/bigtop-manager-stack-bigtop/src/main/java/org/apache/bigtop/manager/stack/bigtop/v3_3_0/kafka/KafkaSetup.java +++ b/bigtop-manager-stack/bigtop-manager-stack-bigtop/src/main/java/org/apache/bigtop/manager/stack/bigtop/v3_3_0/kafka/KafkaSetup.java @@ -28,6 +28,9 @@ import lombok.extern.slf4j.Slf4j; import java.text.MessageFormat; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; import static org.apache.bigtop.manager.common.constants.Constants.PERMISSION_644; import static org.apache.bigtop.manager.common.constants.Constants.PERMISSION_755; @@ -37,6 +40,28 @@ @NoArgsConstructor(access = AccessLevel.PRIVATE) public class KafkaSetup { + private static void createKafkaDataDirs(String kafkaDataDir, String kafkaUser, String kafkaGroup) { + // 兼容以下两种写法: + // 1) 单目录:/data/kafka + // 2) 多目录(Kafka 
常见配置):/data1/kafka,/data2/kafka + // 同时容忍空格和重复逗号 + List dirs = Arrays.stream(kafkaDataDir == null ? new String[0] : kafkaDataDir.split(",")) + .map(String::trim) + .filter(s -> !s.isEmpty()) + .distinct() + .collect(Collectors.toList()); + + // 若未解析出任何目录,则保持原行为(可能由上层保证非空) + if (dirs.isEmpty()) { + LinuxFileUtils.createDirectories(kafkaDataDir, kafkaUser, kafkaGroup, PERMISSION_755, true); + return; + } + + for (String dir : dirs) { + LinuxFileUtils.createDirectories(dir, kafkaUser, kafkaGroup, PERMISSION_755, true); + } + } + public static ShellResult configure(Params params) { log.info("Configuring Kafka"); KafkaParams kafkaParams = (KafkaParams) params; @@ -45,7 +70,8 @@ public static ShellResult configure(Params params) { String kafkaUser = kafkaParams.user(); String kafkaGroup = kafkaParams.group(); - LinuxFileUtils.createDirectories(kafkaParams.getKafkaDataDir(), kafkaUser, kafkaGroup, PERMISSION_755, true); + // 支持多数据盘:当配置为逗号分隔的多个目录时,逐个创建 + createKafkaDataDirs(kafkaParams.getKafkaDataDir(), kafkaUser, kafkaGroup); LinuxFileUtils.createDirectories(kafkaParams.getKafkaLogDir(), kafkaUser, kafkaGroup, PERMISSION_755, true); LinuxFileUtils.createDirectories(kafkaParams.getKafkaPidDir(), kafkaUser, kafkaGroup, PERMISSION_755, true); diff --git a/bigtop-manager-ui/src/api/service/index.ts b/bigtop-manager-ui/src/api/service/index.ts index 9b09d6d75..4f12eb351 100644 --- a/bigtop-manager-ui/src/api/service/index.ts +++ b/bigtop-manager-ui/src/api/service/index.ts @@ -61,6 +61,36 @@ export const takeServiceConfigSnapshot = (pathParams: ServiceParams, data: Snaps ) } +export interface EnableHdfsHaReq { + activeNameNodeHost: string + standbyNameNodeHost: string + journalNodeHosts: string[] + zookeeperServiceId: number + zkfcHosts: string[] + nameservice: string +} + +export const enableHdfsHa = (clusterId: number, serviceId: number, data: EnableHdfsHaReq) => { + return post(`/clusters/${clusterId}/services/${serviceId}/actions/enable-hdfs-ha`, data) +} + +export 
interface EnableYarnRmHaReq { + /** rm1 host */ + activeResourceManagerHost: string + /** rm2 host */ + standbyResourceManagerHost: string + /** yarn.resourcemanager.ha.rm-ids,例如 rm1,rm2 */ + rmIds: string[] + /** yarn.resourcemanager.cluster-id */ + yarnClusterId: string + /** zookeeper service id,用于拼接/下发 yarn.resourcemanager.zk-address */ + zookeeperServiceId: number +} + +export const enableYarnRmHa = (clusterId: number, serviceId: number, data: EnableYarnRmHaReq) => { + return post(`/clusters/${clusterId}/services/${serviceId}/actions/enable-yarn-rm-ha`, data) +} + export const recoveryServiceConfigSnapshot = (pathParams: SnapshotRecovery) => { return post( `/clusters/${pathParams.clusterId}/services/${pathParams.id}/config-snapshots/${pathParams.snapshotId}` From b02f15545cd86754102a26afd3b8c67778a86ebe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=90=95=E5=87=AF=E5=8D=8E?= Date: Tue, 13 Jan 2026 16:42:33 +0800 Subject: [PATCH 02/38] ha --- .../bigtop/v3_3_0/hadoop/HadoopParams.java | 16 +- .../src/features/service-management/index.vue | 206 ++++++++++++++++-- .../src/store/job-progress/index.ts | 8 +- 3 files changed, 209 insertions(+), 21 deletions(-) diff --git a/bigtop-manager-stack/bigtop-manager-stack-bigtop/src/main/java/org/apache/bigtop/manager/stack/bigtop/v3_3_0/hadoop/HadoopParams.java b/bigtop-manager-stack/bigtop-manager-stack-bigtop/src/main/java/org/apache/bigtop/manager/stack/bigtop/v3_3_0/hadoop/HadoopParams.java index 7978baa79..e0e34f93d 100644 --- a/bigtop-manager-stack/bigtop-manager-stack-bigtop/src/main/java/org/apache/bigtop/manager/stack/bigtop/v3_3_0/hadoop/HadoopParams.java +++ b/bigtop-manager-stack/bigtop-manager-stack-bigtop/src/main/java/org/apache/bigtop/manager/stack/bigtop/v3_3_0/hadoop/HadoopParams.java @@ -221,13 +221,17 @@ public Map hdfsSite() { } } String journalHttpAddress = (String) hdfsSite.get("dfs.namenode.shared.edits.dir"); - Pattern pattern = Pattern.compile(":(\\d{1,5})"); - Matcher matcher = 
pattern.matcher(journalHttpAddress); - if (matcher.find()) { - journalHttpPort = matcher.group(1); - log.info("find jounalnode port: " + journalHttpPort); + if (StringUtils.isNotBlank(journalHttpAddress)) { + Pattern pattern = Pattern.compile(":(\\d{1,5})"); + Matcher matcher = pattern.matcher(journalHttpAddress); + if (matcher.find()) { + journalHttpPort = matcher.group(1); + log.info("find jounalnode port: " + journalHttpPort); + } else { + log.warn("not found journalnode port!"); + } } else { - log.warn("not found journalnode port!"); + log.warn("dfs.namenode.shared.edits.dir is empty, skip journalnode port parsing"); } String dfsDomainSocketPath = (String) hdfsSite.get("dfs.domain.socket.path"); if (StringUtils.isNotBlank(dfsDomainSocketPath)) { diff --git a/bigtop-manager-ui/src/features/service-management/index.vue b/bigtop-manager-ui/src/features/service-management/index.vue index 7c3e2769f..7ec1b53ce 100644 --- a/bigtop-manager-ui/src/features/service-management/index.vue +++ b/bigtop-manager-ui/src/features/service-management/index.vue @@ -30,12 +30,14 @@ import type { GroupItem } from '@/components/common/button-group/types' import type { ServiceVO } from '@/api/service/types' + import { enableHdfsHa, enableYarnRmHa, getServiceList, type EnableHdfsHaReq, type EnableYarnRmHaReq } from '@/api/service' + interface RouteParams { id: number serviceId: number } - type Key = keyof typeof Command | 'Remove' + type Key = keyof typeof Command | 'Remove' | 'EnableHdfsHa' | 'EnableYarnHa' const { t } = useI18n() const route = useRoute() @@ -61,30 +63,133 @@ { key: '3', title: t('common.configs') } ]) - const actionGroup = computed(() => [ - { - shape: 'default', - type: 'primary', - text: t('common.operation'), - dropdownMenu: [ - { action: 'Start', text: t('common.start', [t('common.service')]) }, - { action: 'Restart', text: t('common.restart', [t('common.service')]) }, - { action: 'Stop', text: t('common.stop', [t('common.service')]) }, - { action: 'Remove', text: 
t('common.remove', [t('common.service')]), divider: true, danger: true } - ], - dropdownMenuClickEvent: (info) => dropdownMenuClick!(info) + const isHadoopService = computed(() => (serviceDetail.value?.name ?? '').toLowerCase() === 'hadoop') + + const actionGroup = computed(() => { + const baseMenu: any[] = [ + { action: 'Start', text: t('common.start', [t('common.service')]) }, + { action: 'Restart', text: t('common.restart', [t('common.service')]) }, + { action: 'Stop', text: t('common.stop', [t('common.service')]) } + ] + + if (isHadoopService.value) { + baseMenu.push( + { action: 'EnableHdfsHa', text: '启用 HDFS HA' }, + { action: 'EnableYarnHa', text: '启用 YARN HA' } + ) } - ]) + + baseMenu.push({ action: 'Remove', text: t('common.remove', [t('common.service')]), divider: true, danger: true }) + + return [ + { + shape: 'default', + type: 'primary', + text: t('common.operation'), + dropdownMenu: baseMenu, + dropdownMenuClickEvent: (info) => dropdownMenuClick!(info) + } + ] + }) const onServiceDeleted = (clusterId: number) => { router.replace({ path: `/cluster-manage/clusters/${clusterId}` }) } + const hdfsHaModalOpen = ref(false) + const yarnHaModalOpen = ref(false) + + const zookeeperServices = ref<{ id: number; displayName?: string; name?: string }[]>([]) + + const hdfsHaForm = reactive({ + activeNameNodeHost: '', + standbyNameNodeHost: '', + journalNodeHosts: [], + zookeeperServiceId: 0, + zkfcHosts: [], + nameservice: 'nameservice1' + }) + + const yarnHaForm = reactive({ + activeResourceManagerHost: '', + standbyResourceManagerHost: '', + rmIds: ['rm1', 'rm2'], + yarnClusterId: 'yarn-cluster', + zookeeperServiceId: 0 + }) + + const getComponentHosts = (compName: string) => { + const comps = serviceDetail.value?.components ?? [] + return comps + .filter((c) => (c.name ?? 
'').toLowerCase() === compName) + .map((c) => c.hostname) + .filter(Boolean) as string[] + } + + const loadZookeeperServices = async () => { + const [clusterId] = componentPayload.value + const data: any = await getServiceList(clusterId, { pageNum: 1, pageSize: 200 }) + const list = (data?.data?.list ?? data?.list ?? []) as any[] + zookeeperServices.value = list.filter((s) => (s.name ?? '').toLowerCase() === 'zookeeper') + } + + const openHdfsHaModal = async () => { + await loadZookeeperServices() + const nnHosts = getComponentHosts('namenode') + const jnHosts = getComponentHosts('journalnode') + + hdfsHaForm.activeNameNodeHost = nnHosts[0] ?? '' + hdfsHaForm.standbyNameNodeHost = nnHosts[1] ?? '' + hdfsHaForm.journalNodeHosts = jnHosts + hdfsHaForm.zkfcHosts = [hdfsHaForm.activeNameNodeHost, hdfsHaForm.standbyNameNodeHost].filter(Boolean) + hdfsHaForm.zookeeperServiceId = zookeeperServices.value[0]?.id ?? 0 + + hdfsHaModalOpen.value = true + } + + const openYarnHaModal = async () => { + await loadZookeeperServices() + const rmHosts = getComponentHosts('resourcemanager') + + yarnHaForm.activeResourceManagerHost = rmHosts[0] ?? '' + yarnHaForm.standbyResourceManagerHost = rmHosts[1] ?? '' + yarnHaForm.zookeeperServiceId = zookeeperServices.value[0]?.id ?? 0 + + yarnHaModalOpen.value = true + } + + const submitHdfsHa = async () => { + const [clusterId, serviceId] = componentPayload.value + const res: any = await enableHdfsHa(clusterId, serviceId, hdfsHaForm) + if (res?.id) { + jobProgressStore.trackJob(clusterId, res.id, res.name ?? 'Enable HDFS HA', getServiceDetail) + } + hdfsHaModalOpen.value = false + } + + const submitYarnHa = async () => { + const [clusterId, serviceId] = componentPayload.value + const res: any = await enableYarnRmHa(clusterId, serviceId, yarnHaForm) + if (res?.id) { + jobProgressStore.trackJob(clusterId, res.id, res.name ?? 
'Enable YARN HA', getServiceDetail) + } + yarnHaModalOpen.value = false + } + const dropdownMenuClick: GroupItem['dropdownMenuClickEvent'] = async ({ key }) => { const [clusterId, serviceId] = componentPayload.value const service = serviceMap.value[clusterId].filter((s) => Number(serviceId) == s.id)[0] const { name: serviceName, displayName } = service + if (key === 'EnableHdfsHa') { + await openHdfsHaModal() + return + } + if (key === 'EnableYarnHa') { + await openYarnHaModal() + return + } + const processParams = { command: key as Key, clusterId, @@ -125,6 +230,79 @@ :desc="serviceDetail?.desc" :action-groups="actionGroup" /> + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +