diff --git a/bigtop-manager-common/src/main/java/org/apache/bigtop/manager/common/enums/Command.java b/bigtop-manager-common/src/main/java/org/apache/bigtop/manager/common/enums/Command.java index 3ca6f2b9e..ad235c50d 100644 --- a/bigtop-manager-common/src/main/java/org/apache/bigtop/manager/common/enums/Command.java +++ b/bigtop-manager-common/src/main/java/org/apache/bigtop/manager/common/enums/Command.java @@ -39,6 +39,8 @@ public enum Command { CHECK("check", "Check"), CONFIGURE("configure", "Configure"), CUSTOM("custom", "Custom"), + ENABLE_YARN_RM_HA("enable-yarn-rm-ha", "EnableYarnRmHa"), + ENABLE_HDFS_HA("enable-hdfs-ha", "EnableHdfsHa"), // Internal use only, not available for API call INIT("init", "Init"), diff --git a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/factory/component/ComponentConfigureJobFactory.java b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/factory/component/ComponentConfigureJobFactory.java new file mode 100644 index 000000000..ab43a952d --- /dev/null +++ b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/factory/component/ComponentConfigureJobFactory.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bigtop.manager.server.command.factory.component; + +import org.apache.bigtop.manager.common.enums.Command; +import org.apache.bigtop.manager.server.command.CommandIdentifier; +import org.apache.bigtop.manager.server.command.job.Job; +import org.apache.bigtop.manager.server.command.job.JobContext; +import org.apache.bigtop.manager.server.command.job.component.ComponentConfigureJob; +import org.apache.bigtop.manager.server.enums.CommandLevel; + +import org.springframework.beans.factory.config.ConfigurableBeanFactory; +import org.springframework.context.annotation.Scope; +import org.springframework.stereotype.Component; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +@Component +@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) +public class ComponentConfigureJobFactory extends AbstractComponentJobFactory { + + @Override + public CommandIdentifier getCommandIdentifier() { + return new CommandIdentifier(CommandLevel.COMPONENT, Command.CONFIGURE); + } + + @Override + public Job createJob(JobContext jobContext) { + return new ComponentConfigureJob(jobContext); + } +} diff --git a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/factory/component/ComponentCustomJobFactory.java b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/factory/component/ComponentCustomJobFactory.java new file mode 100644 index 000000000..933e06ed2 --- /dev/null +++ b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/factory/component/ComponentCustomJobFactory.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bigtop.manager.server.command.factory.component; + +import org.apache.bigtop.manager.common.enums.Command; +import org.apache.bigtop.manager.server.command.CommandIdentifier; +import org.apache.bigtop.manager.server.command.job.Job; +import org.apache.bigtop.manager.server.command.job.JobContext; +import org.apache.bigtop.manager.server.command.job.component.ComponentCustomJob; +import org.apache.bigtop.manager.server.enums.CommandLevel; + +import org.springframework.beans.factory.config.ConfigurableBeanFactory; +import org.springframework.context.annotation.Scope; +import org.springframework.stereotype.Component; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +@Component +@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) +public class ComponentCustomJobFactory extends AbstractComponentJobFactory { + + @Override + public CommandIdentifier getCommandIdentifier() { + return new CommandIdentifier(CommandLevel.COMPONENT, Command.CUSTOM); + } + + @Override + public Job createJob(JobContext jobContext) { + return new ComponentCustomJob(jobContext); + } +} diff --git a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/factory/service/EnableHdfsHaJobFactory.java b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/factory/service/EnableHdfsHaJobFactory.java new file mode 100644 index 000000000..41584d437 --- /dev/null 
+++ b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/factory/service/EnableHdfsHaJobFactory.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bigtop.manager.server.command.factory.service; + +import org.apache.bigtop.manager.common.enums.Command; +import org.apache.bigtop.manager.server.command.CommandIdentifier; +import org.apache.bigtop.manager.server.command.job.Job; +import org.apache.bigtop.manager.server.command.job.JobContext; +import org.apache.bigtop.manager.server.command.job.service.EnableHdfsHaJob; +import org.apache.bigtop.manager.server.enums.CommandLevel; + +import org.springframework.beans.factory.config.ConfigurableBeanFactory; +import org.springframework.context.annotation.Scope; +import org.springframework.stereotype.Component; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +@Component +@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) +public class EnableHdfsHaJobFactory extends AbstractServiceJobFactory { + + @Override + public CommandIdentifier getCommandIdentifier() { + return new CommandIdentifier(CommandLevel.SERVICE, Command.ENABLE_HDFS_HA); + } + + @Override + public Job createJob(JobContext jobContext) { + 
log.info( + "EnableHdfsHaJobFactory creating job, jobFactoryClassSource={}, jobClassSource={}, stageClassSource={}, taskClassSource={}, commandDTO.command={}, commandDTO.customCommandLen={}", + getCodeSource(EnableHdfsHaJobFactory.class), + getCodeSource(EnableHdfsHaJob.class), + getCodeSource(org.apache.bigtop.manager.server.command.stage.ComponentCustomStage.class), + getCodeSource(org.apache.bigtop.manager.server.command.task.ComponentCustomTask.class), + jobContext == null || jobContext.getCommandDTO() == null + ? null + : jobContext.getCommandDTO().getCommand(), + jobContext == null + || jobContext.getCommandDTO() == null + || jobContext.getCommandDTO().getCustomCommand() == null + ? null + : jobContext.getCommandDTO().getCustomCommand().length()); + return new EnableHdfsHaJob(jobContext); + } + + private static String getCodeSource(Class clazz) { + try { + if (clazz == null + || clazz.getProtectionDomain() == null + || clazz.getProtectionDomain().getCodeSource() == null) { + return "null"; + } + return String.valueOf(clazz.getProtectionDomain().getCodeSource().getLocation()); + } catch (Exception e) { + return "error:" + e.getMessage(); + } + } +} diff --git a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/factory/service/EnableYarnRmHaJobFactory.java b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/factory/service/EnableYarnRmHaJobFactory.java new file mode 100644 index 000000000..275526e6d --- /dev/null +++ b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/factory/service/EnableYarnRmHaJobFactory.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bigtop.manager.server.command.factory.service; + +import org.apache.bigtop.manager.common.enums.Command; +import org.apache.bigtop.manager.server.command.CommandIdentifier; +import org.apache.bigtop.manager.server.command.job.Job; +import org.apache.bigtop.manager.server.command.job.JobContext; +import org.apache.bigtop.manager.server.command.job.service.EnableYarnRmHaJob; +import org.apache.bigtop.manager.server.enums.CommandLevel; + +import org.springframework.beans.factory.config.ConfigurableBeanFactory; +import org.springframework.context.annotation.Scope; +import org.springframework.stereotype.Component; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +@Component +@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) +public class EnableYarnRmHaJobFactory extends AbstractServiceJobFactory { + + @Override + public CommandIdentifier getCommandIdentifier() { + return new CommandIdentifier(CommandLevel.SERVICE, Command.ENABLE_YARN_RM_HA); + } + + @Override + public Job createJob(JobContext jobContext) { + return new EnableYarnRmHaJob(jobContext); + } +} diff --git a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/helper/JobCacheHelper.java b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/helper/JobCacheHelper.java index 63c988dca..40cd6842f 100644 --- 
a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/helper/JobCacheHelper.java +++ b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/helper/JobCacheHelper.java @@ -44,6 +44,8 @@ import org.apache.bigtop.manager.server.model.dto.StackDTO; import org.apache.bigtop.manager.server.utils.StackUtils; +import org.apache.commons.lang3.StringUtils; + import java.util.ArrayList; import java.util.Comparator; import java.util.HashMap; @@ -75,6 +77,12 @@ private static void initialize() { } public static void sendJobCache(Long jobId, List hostnames) { + sendJobCache(jobId, hostnames, null); + } + + public static void sendJobCache( + Long jobId, List hostnames, Map> overrideComponentHosts) { + final Map> finalOverrideComponentHosts = overrideComponentHosts; if (!INITIALIZED.get()) { initialize(); } @@ -102,6 +110,7 @@ public static void sendJobCache(Long jobId, List hostnames) { JobCachePayload copiedPayload = JsonUtils.readFromString(JsonUtils.writeAsString(payload), JobCachePayload.class); genClusterPayload(copiedPayload, clusterId); + mergeOverrideComponentHosts(copiedPayload, finalOverrideComponentHosts); JobCacheRequest request = JobCacheRequest.newBuilder() .setJobId(jobId) .setPayload(JsonUtils.writeAsString(copiedPayload)) @@ -162,6 +171,31 @@ private static void genClusterPayload(JobCachePayload payload, Long clusterId) { payload.setHosts(hosts); } + private static void mergeOverrideComponentHosts( + JobCachePayload payload, Map> overrideComponentHosts) { + if (overrideComponentHosts == null || overrideComponentHosts.isEmpty()) { + return; + } + if (payload.getComponentHosts() == null) { + payload.setComponentHosts(new HashMap<>()); + } + final Map> target = payload.getComponentHosts(); + + overrideComponentHosts.forEach((component, hosts) -> { + if (StringUtils.isBlank(component) || hosts == null || hosts.isEmpty()) { + return; + } + List filtered = hosts.stream() + .filter(StringUtils::isNotBlank) + 
.map(String::trim) + .distinct() + .toList(); + if (!filtered.isEmpty()) { + target.put(component.toLowerCase(), new ArrayList<>(filtered)); + } + }); + } + private static void genGlobalPayload(JobCachePayload payload) { List repoPOList = repoDao.findAll(); Map> serviceConfigMap = getServiceConfigMap(0L); @@ -220,13 +254,11 @@ private static Map> getComponentHostMap(Long clusterId) { List componentPOList = componentDao.findByQuery(query); Map> hostMap = new HashMap<>(); componentPOList.forEach(x -> { - if (hostMap.containsKey(x.getName())) { - hostMap.get(x.getName()).add(x.getHostname()); - } else { - List list = new ArrayList<>(); - list.add(x.getHostname()); - hostMap.put(x.getName(), list); + if (StringUtils.isBlank(x.getHostname())) { + return; } + hostMap.computeIfAbsent(x.getName(), k -> new ArrayList<>()) + .add(x.getHostname().trim()); }); return hostMap; diff --git a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/job/AbstractJob.java b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/job/AbstractJob.java index 64e838769..34e83aa5b 100644 --- a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/job/AbstractJob.java +++ b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/job/AbstractJob.java @@ -42,6 +42,7 @@ import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.concurrent.LinkedBlockingQueue; @Slf4j @@ -75,6 +76,10 @@ public AbstractJob(JobContext jobContext) { if (CollectionUtils.isEmpty(stages)) { throw new ApiException(ApiExceptionEnum.JOB_HAS_NO_STAGES); } + + for (Stage stage : stages) { + stage.init(); + } } protected void injectBeans() { @@ -92,6 +97,26 @@ protected void beforeCreateStages() { protected abstract void createStages(); + protected Map> buildOverrideComponentHosts() { + Map> m = new java.util.HashMap<>(); + if (jobContext == null + || jobContext.getCommandDTO() == null + || 
jobContext.getCommandDTO().getComponentCommands() == null) { + return m; + } + for (org.apache.bigtop.manager.server.model.dto.command.ComponentCommandDTO cc : + jobContext.getCommandDTO().getComponentCommands()) { + if (cc == null + || cc.getComponentName() == null + || cc.getHostnames() == null + || cc.getHostnames().isEmpty()) { + continue; + } + m.put(cc.getComponentName().toLowerCase(), cc.getHostnames()); + } + return m; + } + @Override public void beforeRun() { jobPO.setState(JobState.PROCESSING.getName()); @@ -113,7 +138,7 @@ public void run() { .flatMap(List::stream) .distinct() .toList(); - JobCacheHelper.sendJobCache(jobPO.getId(), hostnames); + JobCacheHelper.sendJobCache(jobPO.getId(), hostnames, buildOverrideComponentHosts()); LinkedBlockingQueue queue = new LinkedBlockingQueue<>(stages); while (!queue.isEmpty()) { diff --git a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/job/component/ComponentConfigureJob.java b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/job/component/ComponentConfigureJob.java new file mode 100644 index 000000000..e5904c4f3 --- /dev/null +++ b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/job/component/ComponentConfigureJob.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *    https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.bigtop.manager.server.command.job.component;

import org.apache.bigtop.manager.common.enums.Command;
import org.apache.bigtop.manager.server.command.helper.ComponentStageHelper;
import org.apache.bigtop.manager.server.command.job.JobContext;
import org.apache.bigtop.manager.server.model.dto.CommandDTO;

import java.util.List;
import java.util.Map;

/**
 * Job that re-applies configuration to a set of components. Stage creation is
 * delegated entirely to {@link ComponentStageHelper} with the CONFIGURE command.
 */
public class ComponentConfigureJob extends AbstractComponentJob {

    public ComponentConfigureJob(JobContext jobContext) {
        super(jobContext);
    }

    /** One CONFIGURE stage group per component resolved from the command payload. */
    @Override
    protected void createStages() {
        CommandDTO command = jobContext.getCommandDTO();
        Map<String, List<String>> hostsByComponent = getComponentHostsMap();

        stages.addAll(ComponentStageHelper.createComponentStages(hostsByComponent, Command.CONFIGURE, command));
    }

    @Override
    public String getName() {
        return "Configure components";
    }
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *    https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.bigtop.manager.server.command.job.component;

import org.apache.bigtop.manager.server.command.job.JobContext;
import org.apache.bigtop.manager.server.command.stage.ComponentCustomStage;
import org.apache.bigtop.manager.server.command.stage.StageContext;
import org.apache.bigtop.manager.server.exception.ServerException;
import org.apache.bigtop.manager.server.model.dto.ServiceDTO;
import org.apache.bigtop.manager.server.utils.StackUtils;

import java.util.List;
import java.util.Locale;
import java.util.Map;

/**
 * Job for a component custom command: one {@link ComponentCustomStage} per
 * component that has at least one target host.
 *
 * <p>Requires {@code commandDTO.customCommand} to be non-blank; the same custom
 * command string is passed to every stage.
 */
public class ComponentCustomJob extends AbstractComponentJob {

    public ComponentCustomJob(JobContext jobContext) {
        super(jobContext);
    }

    @Override
    protected void createStages() {
        String customCommand = jobContext.getCommandDTO().getCustomCommand();
        if (customCommand == null || customCommand.isBlank()) {
            throw new ServerException("customCommand must not be blank for CUSTOM command");
        }

        Map<String, List<String>> componentHostsMap = getComponentHostsMap();
        for (Map.Entry<String, List<String>> e : componentHostsMap.entrySet()) {
            String componentName = e.getKey();
            List<String> hostnames = e.getValue();
            // A null component name cannot be resolved to a service; skipping it
            // here avoids an NPE inside the stack lookup below.
            if (componentName == null || hostnames == null || hostnames.isEmpty()) {
                continue;
            }
            // Locale.ROOT: component names are identifiers, not locale-sensitive text.
            componentName = componentName.toLowerCase(Locale.ROOT);

            StageContext stageContext = StageContext.fromCommandDTO(jobContext.getCommandDTO());
            stageContext.setHostnames(hostnames);
            stageContext.setComponentName(componentName);

            // AbstractComponentStage#createTaskContext() will call
            // StackUtils.getServiceDTO(stageContext.getServiceName()),
            // so serviceName must be set here. Fail loudly if the component
            // is unknown to the stack instead of NPE-ing on getName().
            ServiceDTO serviceDTO = StackUtils.getServiceDTOByComponentName(componentName);
            if (serviceDTO == null) {
                throw new ServerException("Unknown component, no stack service found for: " + componentName);
            }
            stageContext.setServiceName(serviceDTO.getName());

            stages.add(new ComponentCustomStage(stageContext, customCommand));
        }
    }

    @Override
    public String getName() {
        return "Custom component command";
    }
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *    https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.bigtop.manager.server.command.job.service;

import org.apache.bigtop.manager.common.enums.Command;
import org.apache.bigtop.manager.common.utils.JsonUtils;
import org.apache.bigtop.manager.dao.po.ComponentPO;
import org.apache.bigtop.manager.dao.po.HostPO;
import org.apache.bigtop.manager.dao.po.ServicePO;
import org.apache.bigtop.manager.server.command.helper.ComponentStageHelper;
import org.apache.bigtop.manager.server.command.job.JobContext;
import org.apache.bigtop.manager.server.command.stage.ComponentCustomStage;
import org.apache.bigtop.manager.server.command.stage.StageContext;
import org.apache.bigtop.manager.server.command.stage.WaitPortStage;
import org.apache.bigtop.manager.server.command.stage.WaitUrlStage;
import org.apache.bigtop.manager.server.exception.ServerException;
import org.apache.bigtop.manager.server.model.dto.CommandDTO;
import org.apache.bigtop.manager.server.model.dto.command.ComponentCommandDTO;
import org.apache.bigtop.manager.server.model.req.EnableHdfsHaReq;

import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;

import lombok.extern.slf4j.Slf4j;

import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.stream.Collectors;

/**
 * Job that converts a non-HA HDFS deployment into an HA one.
 *
 * <p>Stage order: (1) install/configure/start JournalNodes and wait for their
 * RPC port, (2) install/configure ZKFCs, (3) stop the active NameNode, run
 * {@code initializeSharedEdits} and {@code formatZk}, (4) restart the active
 * NameNode + its ZKFC and wait for it to report "active" over JMX,
 * (5) bootstrap and start the standby NameNode + its ZKFC, (6) retire the
 * SecondaryNameNode record, register the standby NameNode component, restart
 * DataNodes and re-configure the YARN components.
 *
 * <p>The request payload rides inside {@code CommandDTO.customCommand} as
 * {@code "enableHdfsHa:<json>"} (see {@link #parseReq(CommandDTO)}).
 */
@Slf4j
public class EnableHdfsHaJob extends AbstractServiceJob {

    private static final String HADOOP_SERVICE_NAME = "hadoop";
    private static final String NAMENODE_COMPONENT_NAME = "namenode";
    private static final String SECONDARY_NAMENODE_COMPONENT_NAME = "secondarynamenode";

    /** Prefix marking the JSON-encoded {@link EnableHdfsHaReq} in customCommand. */
    private static final String CUSTOM_COMMAND_PREFIX = "enableHdfsHa:";

    public EnableHdfsHaJob(JobContext jobContext) {
        super(jobContext);
    }

    @Override
    protected void createStages() {
        CommandDTO commandDTO = jobContext.getCommandDTO();
        EnableHdfsHaReq req = parseReq(commandDTO);
        validateReq(req);

        Map<String, List<String>> componentHostsMap = getComponentHostsMap();
        Map<String, List<String>> activeNN = Map.of(NAMENODE_COMPONENT_NAME, List.of(req.getActiveNameNodeHost()));
        Map<String, List<String>> standbyNN = Map.of(NAMENODE_COMPONENT_NAME, List.of(req.getStandbyNameNodeHost()));

        // 1. Prepare JournalNodes and ZKFCs (install/configure).
        Map<String, List<String>> jn = pick(componentHostsMap, "journalnode");
        stages.addAll(ComponentStageHelper.createComponentStages(jn, Command.ADD, commandDTO));
        stages.addAll(ComponentStageHelper.createComponentStages(jn, Command.CONFIGURE, commandDTO));
        stages.addAll(ComponentStageHelper.createComponentStages(jn, Command.START, commandDTO));
        // 8485 is the JournalNode RPC port; poll every second, give up after 10 minutes.
        stages.add(new WaitPortStage(
                createStageContext("journalnode", req.getJournalNodeHosts(), commandDTO),
                req.getJournalNodeHosts(),
                8485,
                10 * 60_000L,
                1000L));

        Map<String, List<String>> zkfcHosts = pick(componentHostsMap, "zkfc");
        stages.addAll(ComponentStageHelper.createComponentStages(zkfcHosts, Command.ADD, commandDTO));
        stages.addAll(ComponentStageHelper.createComponentStages(zkfcHosts, Command.CONFIGURE, commandDTO));

        // 2. Initialize the active NameNode (NN1).
        stages.addAll(ComponentStageHelper.createComponentStages(activeNN, Command.STOP, commandDTO));
        stages.add(new ComponentCustomStage(
                createStageContext("namenode", List.of(req.getActiveNameNodeHost()), commandDTO),
                "initializeSharedEdits"));
        stages.add(new ComponentCustomStage(
                createStageContext("zkfc", List.of(req.getActiveNameNodeHost()), commandDTO), "formatZk"));

        // 3. Start the active NameNode and its ZKFC, then wait until it reports active.
        stages.addAll(ComponentStageHelper.createComponentStages(activeNN, Command.START, commandDTO));
        stages.addAll(ComponentStageHelper.createComponentStages(
                Map.of("zkfc", List.of(req.getActiveNameNodeHost())), Command.START, commandDTO));
        stages.add(new WaitUrlStage(
                createStageContext("namenode", List.of(req.getActiveNameNodeHost()), commandDTO),
                List.of(req.getActiveNameNodeHost()),
                "http://{host}:9870/jmx?qry=Hadoop:service=NameNode,name=NameNodeStatus",
                "active",
                10 * 60_000L,
                3000L));

        // 4. Initialize and start the standby NameNode (NN2) and its ZKFC.
        stages.addAll(ComponentStageHelper.createComponentStages(standbyNN, Command.ADD, commandDTO));
        stages.addAll(ComponentStageHelper.createComponentStages(standbyNN, Command.CONFIGURE, commandDTO));
        stages.add(new ComponentCustomStage(
                createStageContext("namenode", List.of(req.getStandbyNameNodeHost()), commandDTO), "bootstrapStandby"));
        stages.addAll(ComponentStageHelper.createComponentStages(standbyNN, Command.START, commandDTO));
        stages.addAll(ComponentStageHelper.createComponentStages(
                Map.of("zkfc", List.of(req.getStandbyNameNodeHost())), Command.START, commandDTO));

        // 5. Finalize.
        // NOTE(review): these two calls mutate the component table during stage
        // CREATION, i.e. before any stage has actually run — if the job later
        // fails mid-way the DB already reflects the post-HA topology. Consider
        // moving them into a dedicated stage; kept as-is to preserve behavior.
        // Convert secondarynamenode on the standby host to a standby namenode.
        removeSecondaryNameNode(req.getStandbyNameNodeHost());
        // Ensure the standby host has a namenode component record.
        ensureStandbyNameNodeComponent(req.getStandbyNameNodeHost());

        Map<String, List<String>> dn = pick(componentHostsMap, "datanode");
        stages.addAll(ComponentStageHelper.createComponentStages(dn, Command.RESTART, commandDTO));

        Map<String, List<String>> yarn = pick(componentHostsMap, "resourcemanager", "nodemanager", "history_server");
        stages.addAll(ComponentStageHelper.createComponentStages(yarn, Command.CONFIGURE, commandDTO));

        if (stages.isEmpty()) {
            throw new IllegalStateException("EnableHdfsHaJob has no stages to execute");
        }
    }

    @Override
    public String getName() {
        return "Enable HDFS HA";
    }

    /**
     * Component -> hostnames taken from the command's component commands,
     * keyed by lower-cased component name. On duplicate names the FIRST entry
     * wins ({@code toMap} merge function). Entries with a null component name
     * are skipped instead of crashing the collector with an NPE.
     */
    @Override
    protected Map<String, List<String>> getComponentHostsMap() {
        List<ComponentCommandDTO> ccs = jobContext.getCommandDTO().getComponentCommands();
        if (ccs == null) {
            return new HashMap<>();
        }
        return ccs.stream()
                .filter(cc -> cc != null && cc.getComponentName() != null)
                .collect(Collectors.toMap(
                        // Locale.ROOT: identifiers must not vary with the JVM locale.
                        cc -> cc.getComponentName().toLowerCase(Locale.ROOT),
                        cc -> cc.getHostnames() == null ? List.of() : cc.getHostnames(),
                        (a, b) -> a));
    }

    /** Sub-map of {@code all} restricted to the given component names (empty lists dropped). */
    private static Map<String, List<String>> pick(Map<String, List<String>> all, String... componentNames) {
        Map<String, List<String>> m = new HashMap<>();
        for (String name : componentNames) {
            List<String> hosts = all.get(name);
            if (hosts != null && !hosts.isEmpty()) {
                m.put(name, hosts);
            }
        }
        return m;
    }

    /**
     * Extracts the {@link EnableHdfsHaReq} JSON payload from
     * {@code customCommand} ("enableHdfsHa:&lt;json&gt;").
     *
     * @throws ServerException when the prefix is missing, the payload is empty,
     *                         or the JSON cannot be parsed
     */
    private EnableHdfsHaReq parseReq(CommandDTO commandDTO) {
        String cc = commandDTO.getCustomCommand();
        if (StringUtils.isBlank(cc) || !cc.startsWith(CUSTOM_COMMAND_PREFIX)) {
            throw new ServerException(
                    "EnableHdfsHaJob requires customCommand with prefix '" + CUSTOM_COMMAND_PREFIX + "'");
        }
        String json = cc.substring(CUSTOM_COMMAND_PREFIX.length());
        if (StringUtils.isBlank(json)) {
            throw new ServerException("EnableHdfsHaReq payload is empty in customCommand");
        }
        try {
            return JsonUtils.readFromString(json, EnableHdfsHaReq.class);
        } catch (Exception e) {
            throw new ServerException("Failed to parse EnableHdfsHaReq from customCommand: " + e.getMessage());
        }
    }

    /**
     * Validates the HA request: distinct active/standby hosts, a nameservice
     * name, a quorum (>= 3) of JournalNodes, and at least one ZKFC host.
     */
    private void validateReq(EnableHdfsHaReq req) {
        if (req == null) {
            throw new ServerException("request is required");
        }
        if (StringUtils.isBlank(req.getActiveNameNodeHost()) || StringUtils.isBlank(req.getStandbyNameNodeHost())) {
            throw new ServerException("activeNameNodeHost/standbyNameNodeHost must not be blank");
        }
        if (req.getActiveNameNodeHost().equalsIgnoreCase(req.getStandbyNameNodeHost())) {
            throw new ServerException("activeNameNodeHost and standbyNameNodeHost must be different");
        }
        if (StringUtils.isBlank(req.getNameservice())) {
            throw new ServerException("nameservice must not be blank");
        }
        if (CollectionUtils.isEmpty(req.getJournalNodeHosts())
                || req.getJournalNodeHosts().size() < 3) {
            throw new ServerException("journalNodeHosts must be provided and have at least 3 nodes");
        }
        if (CollectionUtils.isEmpty(req.getZkfcHosts())) {
            throw new ServerException("zkfcHosts must not be empty");
        }
    }

    /** Builds a stage context for the hadoop service targeting the given component/hosts. */
    private StageContext createStageContext(String componentName, List<String> hostnames, CommandDTO commandDTO) {
        StageContext stageContext = StageContext.fromCommandDTO(commandDTO);
        stageContext.setHostnames(hostnames);
        stageContext.setServiceName(HADOOP_SERVICE_NAME);
        stageContext.setComponentName(componentName);
        return stageContext;
    }

    /** Deletes the SecondaryNameNode component record on {@code hostname}, if present. */
    private void removeSecondaryNameNode(String hostname) {
        log.info("Attempting to remove Secondary NameNode on host: {}", hostname);
        ComponentPO secondaryNameNode = componentDao.findByNameAndHostname(SECONDARY_NAMENODE_COMPONENT_NAME, hostname);
        if (secondaryNameNode != null) {
            log.info(
                    "Found Secondary NameNode component with ID {} on host {}. Deleting it.",
                    secondaryNameNode.getId(),
                    hostname);
            componentDao.deleteById(secondaryNameNode.getId());
        } else {
            log.info("No Secondary NameNode component found on host {}. Nothing to remove.", hostname);
        }
    }

    /**
     * Creates a NameNode component record for the standby host if one does not
     * already exist. Looks up the host and the hadoop service to wire the
     * foreign keys; fails with {@link ServerException} if either is missing.
     */
    private void ensureStandbyNameNodeComponent(String hostname) {
        log.info("Ensuring NameNode component exists on standby host: {}", hostname);
        ComponentPO nameNode = componentDao.findByNameAndHostname(NAMENODE_COMPONENT_NAME, hostname);
        if (nameNode == null) {
            log.info("NameNode component not found on standby host {}. Creating a new entry.", hostname);
            Long clusterId = jobContext.getCommandDTO().getClusterId();
            HostPO hostPO = hostDao.findByHostname(hostname);
            if (hostPO == null) {
                throw new ServerException("Host not found in database: " + hostname);
            }
            ServicePO servicePO = serviceDao.findByClusterIdAndName(clusterId, HADOOP_SERVICE_NAME);
            if (servicePO == null) {
                throw new ServerException("Service 'hadoop' not found for clusterId: " + clusterId);
            }

            ComponentPO standbyNameNodePO = new ComponentPO();
            standbyNameNodePO.setName(NAMENODE_COMPONENT_NAME);
            standbyNameNodePO.setDisplayName("NameNode");
            standbyNameNodePO.setClusterId(clusterId);
            standbyNameNodePO.setHostId(hostPO.getId());
            standbyNameNodePO.setServiceId(servicePO.getId());
            standbyNameNodePO.setStatus(org.apache.bigtop.manager.server.enums.HealthyStatusEnum.UNKNOWN.getCode());
            componentDao.save(standbyNameNodePO);
            log.info("Successfully created NameNode component entry for standby host {}.", hostname);
        } else {
            log.info("NameNode component already exists on standby host {}. Nothing to do.", hostname);
        }
    }
}
You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bigtop.manager.server.command.job.service; + +import org.apache.bigtop.manager.common.enums.Command; +import org.apache.bigtop.manager.server.command.helper.ComponentStageHelper; +import org.apache.bigtop.manager.server.command.job.JobContext; +import org.apache.bigtop.manager.server.model.dto.CommandDTO; + +import java.util.List; +import java.util.Map; + +public class EnableYarnRmHaJob extends AbstractServiceJob { + + public EnableYarnRmHaJob(JobContext jobContext) { + super(jobContext); + } + + @Override + protected void createStages() { + CommandDTO commandDTO = jobContext.getCommandDTO(); + Map> componentHostsMap = getComponentHostsMap(); + + // 1. CONFIGURE resourcemanager + stages.addAll(ComponentStageHelper.createComponentStages(componentHostsMap, Command.CONFIGURE, commandDTO)); + // 2. STOP resourcemanager + stages.addAll(ComponentStageHelper.createComponentStages(componentHostsMap, Command.STOP, commandDTO)); + // 3. START resourcemanager + stages.addAll(ComponentStageHelper.createComponentStages(componentHostsMap, Command.START, commandDTO)); + + if (stages.isEmpty()) { + throw new IllegalStateException("EnableYarnRmHaJob has no stages to execute. 
Check componentHostsMap"); + } + } + + @Override + public String getName() { + return "Enable YARN RM HA"; + } + + @Override + protected Map<String, List<String>> getComponentHostsMap() { + return jobContext.getCommandDTO().getComponentCommands().stream() + .collect(java.util.stream.Collectors.toMap( + cc -> cc.getComponentName().toLowerCase(), cc -> cc.getHostnames())); + } +} diff --git a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/job/service/ServiceAddJob.java b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/job/service/ServiceAddJob.java index 4d3d286fe..ea50c6df3 100644 --- a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/job/service/ServiceAddJob.java +++ b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/job/service/ServiceAddJob.java @@ -43,11 +43,14 @@ import org.apache.commons.collections4.CollectionUtils; +import lombok.extern.slf4j.Slf4j; + import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +@Slf4j public class ServiceAddJob extends AbstractServiceJob { public ServiceAddJob(JobContext jobContext) { @@ -159,17 +162,28 @@ private void saveService(ServiceCommandDTO serviceCommand) { // Persist services StackDTO stackDTO = StackUtils.getServiceStack(serviceName); ServiceDTO serviceDTO = StackUtils.getServiceDTO(serviceName); - ServicePO servicePO = ServiceConverter.INSTANCE.fromDTO2PO(serviceDTO); - servicePO.setClusterId(clusterId); - servicePO.setStack(StackUtils.getFullStackName(stackDTO)); - servicePO.setStatus(HealthyStatusEnum.UNHEALTHY.getCode()); - serviceDao.save(servicePO); + ServicePO servicePO = serviceDao.findByClusterIdAndName(clusterId, serviceName); + if (servicePO == null) { + servicePO = ServiceConverter.INSTANCE.fromDTO2PO(serviceDTO); + servicePO.setClusterId(clusterId); + servicePO.setStack(StackUtils.getFullStackName(stackDTO)); + servicePO.setStatus(HealthyStatusEnum.UNHEALTHY.getCode());
+ serviceDao.save(servicePO); + } else { + log.warn("Service [{}] already exists in cluster [{}], skipping creation.", serviceName, clusterId); + } // Persist components List componentPOList = new ArrayList<>(); for (ComponentHostDTO componentHostDTO : serviceCommand.getComponentHosts()) { String componentName = componentHostDTO.getComponentName(); - List hostPOList = hostDao.findAllByHostnames(componentHostDTO.getHostnames()); + List hostnames = componentHostDTO.getHostnames(); + if (CollectionUtils.isEmpty(hostnames)) { + log.info("Skipping component [{}] because no hosts are assigned.", componentName); + continue; + } + + List hostPOList = hostDao.findAllByHostnames(hostnames); for (HostPO hostPO : hostPOList) { ComponentDTO componentDTO = StackUtils.getComponentDTO(componentName); diff --git a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/stage/AbstractComponentStage.java b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/stage/AbstractComponentStage.java index 5908cbad2..f025c3f29 100644 --- a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/stage/AbstractComponentStage.java +++ b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/stage/AbstractComponentStage.java @@ -19,8 +19,11 @@ package org.apache.bigtop.manager.server.command.stage; import org.apache.bigtop.manager.dao.po.ClusterPO; +import org.apache.bigtop.manager.dao.po.ServicePO; import org.apache.bigtop.manager.dao.repository.ClusterDao; +import org.apache.bigtop.manager.dao.repository.ServiceDao; import org.apache.bigtop.manager.server.command.task.TaskContext; +import org.apache.bigtop.manager.server.holder.SessionUserHolder; import org.apache.bigtop.manager.server.holder.SpringContextHolder; import org.apache.bigtop.manager.server.model.dto.ComponentDTO; import org.apache.bigtop.manager.server.model.dto.ServiceDTO; @@ -30,6 +33,8 @@ public abstract class AbstractComponentStage extends 
AbstractStage { private ClusterDao clusterDao; + private ServiceDao serviceDao; + private ClusterPO clusterPO; public AbstractComponentStage(StageContext stageContext) { @@ -41,6 +46,7 @@ protected void injectBeans() { super.injectBeans(); this.clusterDao = SpringContextHolder.getBean(ClusterDao.class); + this.serviceDao = SpringContextHolder.getBean(ServiceDao.class); } @Override @@ -63,9 +69,12 @@ protected TaskContext createTaskContext(String hostname) { ComponentDTO componentDTO = StackUtils.getComponentDTO(stageContext.getComponentName()); TaskContext taskContext = new TaskContext(); + taskContext.setOperatorId(SessionUserHolder.getUserId()); taskContext.setHostname(hostname); taskContext.setClusterId(clusterPO == null ? null : clusterPO.getId()); taskContext.setClusterName(clusterPO == null ? null : clusterPO.getName()); + ServicePO servicePO = serviceDao.findByClusterIdAndName(taskContext.getClusterId(), serviceDTO.getName()); + taskContext.setServiceId(servicePO == null ? null : servicePO.getId()); taskContext.setServiceName(serviceDTO.getName()); taskContext.setComponentName(componentDTO.getName()); taskContext.setComponentDisplayName(componentDTO.getDisplayName()); diff --git a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/stage/AbstractStage.java b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/stage/AbstractStage.java index 7aeab038d..77d5a9dda 100644 --- a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/stage/AbstractStage.java +++ b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/stage/AbstractStage.java @@ -57,7 +57,10 @@ public AbstractStage(StageContext stageContext) { injectBeans(); beforeCreateTasks(); + } + @Override + public void init() { for (String hostname : stageContext.getHostnames()) { tasks.add(createTask(hostname)); } diff --git 
a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/stage/ComponentCustomStage.java b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/stage/ComponentCustomStage.java new file mode 100644 index 000000000..03ce7f632 --- /dev/null +++ b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/stage/ComponentCustomStage.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bigtop.manager.server.command.stage; + +import org.apache.bigtop.manager.server.command.task.ComponentCustomTask; +import org.apache.bigtop.manager.server.command.task.Task; +import org.apache.bigtop.manager.server.utils.StackUtils; + +import org.apache.commons.lang3.StringUtils; + +import lombok.extern.slf4j.Slf4j; + +/** + * Stage for component custom command. 
+ */ +@Slf4j +public class ComponentCustomStage extends AbstractComponentStage { + + private final String customCommand; + + public ComponentCustomStage(StageContext stageContext, String customCommand) { + super(stageContext); + if (customCommand == null || customCommand.isBlank()) { + throw new IllegalArgumentException( + "customCommand must not be blank for ComponentCustomStage, stageContext=" + stageContext); + } + this.customCommand = customCommand; + } + + @Override + protected Task createTask(String hostname) { + log.info( + "ComponentCustomStage.createTask: this={}, customCommand='{}', len={}, blank={}, stageContext={}, hostname={}", + System.identityHashCode(this), + customCommand, + customCommand == null ? null : customCommand.length(), + customCommand == null ? null : customCommand.isBlank(), + getStageContext(), + hostname); + return new ComponentCustomTask(createTaskContext(hostname), customCommand); + } + + @Override + public String getName() { + String componentDisplay = + StackUtils.getComponentDTO(stageContext.getComponentName()).getDisplayName(); + String stageName = String.format("Custom: %s (%s)", componentDisplay, customCommand); + // Limit the length to prevent DataTruncation + return StringUtils.abbreviate(stageName, 32); + } +} diff --git a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/stage/Stage.java b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/stage/Stage.java index 91f55b8be..8f05ed9f3 100644 --- a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/stage/Stage.java +++ b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/stage/Stage.java @@ -25,6 +25,8 @@ public interface Stage { + void init(); + String getName(); void beforeRun(); diff --git a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/stage/WaitPortStage.java 
b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/stage/WaitPortStage.java new file mode 100644 index 000000000..38410307c --- /dev/null +++ b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/stage/WaitPortStage.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bigtop.manager.server.command.stage; + +import org.apache.bigtop.manager.server.command.task.PortCheckTask; +import org.apache.bigtop.manager.server.command.task.Task; +import org.apache.bigtop.manager.server.command.task.TaskContext; + +import java.util.List; + +/** + * A stage to wait for a given port to become available on a list of hosts. 
+ */ +public class WaitPortStage extends AbstractStage { + + private final int port; + private final long timeoutMs; + private final long intervalMs; + + public WaitPortStage(StageContext stageContext, List<String> hosts, int port, long timeoutMs, long intervalMs) { + super(stageContext); + this.port = port; + this.timeoutMs = timeoutMs; + this.intervalMs = intervalMs; + this.stageContext.setHostnames(hosts); + } + + @Override + protected void beforeCreateTasks() { + // No-op + } + + @Override + protected Task createTask(String hostname) { + TaskContext taskContext = new TaskContext(); + taskContext.setClusterId(stageContext.getClusterId()); + taskContext.setClusterName(stageContext.getClusterName()); + taskContext.setHostname(hostname); + taskContext.setServiceName(stageContext.getServiceName()); + taskContext.setComponentName(stageContext.getComponentName()); + taskContext.setComponentDisplayName(stageContext.getComponentName()); + taskContext.setUserGroup(stageContext.getUserGroup()); + taskContext.setRootDir(stageContext.getRootDir()); + + return new PortCheckTask(taskContext, hostname, port, timeoutMs, intervalMs); + } + + @Override + public String getName() { + return "Wait ports " + port; + } +} diff --git a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/stage/WaitUrlStage.java b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/stage/WaitUrlStage.java new file mode 100644 index 000000000..95ead0751 --- /dev/null +++ b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/stage/WaitUrlStage.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bigtop.manager.server.command.stage; + +import org.apache.bigtop.manager.server.command.task.Task; +import org.apache.bigtop.manager.server.command.task.TaskContext; +import org.apache.bigtop.manager.server.command.task.UrlCheckTask; + +import java.util.List; + +/** + * Server-side stage to wait for a URL to contain expected content. + */ +public class WaitUrlStage extends AbstractStage { + + private final String urlTemplate; + private final String expectedContent; + private final long timeoutMs; + private final long intervalMs; + + public WaitUrlStage( + StageContext stageContext, + List hosts, + String urlTemplate, + String expectedContent, + long timeoutMs, + long intervalMs) { + super(stageContext); + this.urlTemplate = urlTemplate; + this.expectedContent = expectedContent; + this.timeoutMs = timeoutMs; + this.intervalMs = intervalMs; + this.stageContext.setHostnames(hosts); + } + + @Override + protected void beforeCreateTasks() { + // No-op + } + + @Override + protected Task createTask(String hostname) { + TaskContext taskContext = new TaskContext(); + taskContext.setClusterId(stageContext.getClusterId()); + taskContext.setClusterName(stageContext.getClusterName()); + taskContext.setHostname(hostname); + taskContext.setServiceName(stageContext.getServiceName()); + taskContext.setComponentName(stageContext.getComponentName()); + 
taskContext.setComponentDisplayName(stageContext.getComponentName()); + taskContext.setUserGroup(stageContext.getUserGroup()); + taskContext.setRootDir(stageContext.getRootDir()); + + String url = urlTemplate.replace("{host}", hostname); + return new UrlCheckTask(taskContext, url, expectedContent, timeoutMs, intervalMs); + } + + @Override + public String getName() { + return "Wait URL contains '" + expectedContent + "'"; + } +} diff --git a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/task/AbstractTask.java b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/task/AbstractTask.java index 818b868ab..93febe222 100644 --- a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/task/AbstractTask.java +++ b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/task/AbstractTask.java @@ -25,6 +25,7 @@ import org.apache.bigtop.manager.dao.po.TaskPO; import org.apache.bigtop.manager.dao.repository.HostDao; import org.apache.bigtop.manager.dao.repository.TaskDao; +import org.apache.bigtop.manager.server.holder.SessionUserHolder; import org.apache.bigtop.manager.server.holder.SpringContextHolder; import lombok.extern.slf4j.Slf4j; @@ -62,6 +63,9 @@ protected String getCustomCommand() { @Override public void beforeRun() { + if (taskContext.getOperatorId() != null && SessionUserHolder.getUserId() == null) { + SessionUserHolder.setUserId(taskContext.getOperatorId()); + } taskPO.setState(JobState.PROCESSING.getName()); taskDao.partialUpdateById(taskPO); } diff --git a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/task/ComponentAddTask.java b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/task/ComponentAddTask.java index 53469e186..211a6bcbb 100644 --- a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/task/ComponentAddTask.java +++ 
b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/task/ComponentAddTask.java @@ -21,11 +21,21 @@ import org.apache.bigtop.manager.common.constants.ComponentCategories; import org.apache.bigtop.manager.common.enums.Command; import org.apache.bigtop.manager.dao.po.ComponentPO; +import org.apache.bigtop.manager.dao.po.HostPO; import org.apache.bigtop.manager.dao.query.ComponentQuery; import org.apache.bigtop.manager.server.enums.HealthyStatusEnum; +import org.apache.bigtop.manager.server.exception.ServerException; import org.apache.bigtop.manager.server.model.dto.ComponentDTO; +import org.apache.bigtop.manager.server.model.dto.StackDTO; import org.apache.bigtop.manager.server.utils.StackUtils; +import org.apache.commons.collections4.CollectionUtils; + +import lombok.extern.slf4j.Slf4j; + +import java.util.List; + +@Slf4j public class ComponentAddTask extends AbstractComponentTask { public ComponentAddTask(TaskContext taskContext) { @@ -48,9 +58,42 @@ public void onSuccess() { .hostname(hostname) .name(componentName) .build(); - ComponentPO componentPO = componentDao.findByQuery(componentQuery).get(0); + List componentPOList = componentDao.findByQuery(componentQuery); + ComponentPO componentPO; + boolean isNew = CollectionUtils.isEmpty(componentPOList); + if (isNew) { + log.info("Component [{}] on host [{}] not found in DB, creating new entry.", componentName, hostname); + componentPO = new ComponentPO(); + } else { + componentPO = componentPOList.get(0); + } ComponentDTO componentDTO = StackUtils.getComponentDTO(componentName); + + // If new or existing but incomplete, fill in the details + if (isNew || componentPO.getHostId() == null || componentPO.getDisplayName() == null) { + log.info( + "Populating full component details for component [{}] on host [{}]. 
New entry: {}", + componentName, + hostname, + isNew); + HostPO hostPO = hostDao.findByHostname(hostname); + if (hostPO == null) { + throw new ServerException("Host not found in database: " + hostname); + } + StackDTO stackDTO = StackUtils.getServiceStack(taskContext.getServiceName()); + + componentPO.setName(componentName); + componentPO.setDisplayName(componentDTO.getDisplayName()); + componentPO.setHostname(hostname); + componentPO.setClusterId(taskContext.getClusterId()); + componentPO.setHostId(hostPO.getId()); + componentPO.setServiceId(taskContext.getServiceId()); + componentPO.setServiceName(taskContext.getServiceName()); + componentPO.setServiceUser(taskContext.getServiceUser()); + componentPO.setStack(stackDTO.getStackName() + "-" + stackDTO.getStackVersion()); + } + if (componentDTO.getCategory().equalsIgnoreCase(ComponentCategories.CLIENT)) { // Client components should always be healthy after added componentPO.setStatus(HealthyStatusEnum.HEALTHY.getCode()); @@ -59,7 +102,11 @@ public void onSuccess() { componentPO.setStatus(HealthyStatusEnum.UNHEALTHY.getCode()); } - componentDao.partialUpdateById(componentPO); + if (componentPO.getId() == null) { + componentDao.save(componentPO); + } else { + componentDao.partialUpdateById(componentPO); + } } @Override diff --git a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/task/ComponentCustomTask.java b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/task/ComponentCustomTask.java new file mode 100644 index 000000000..623628e04 --- /dev/null +++ b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/task/ComponentCustomTask.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bigtop.manager.server.command.task; + +import org.apache.bigtop.manager.common.enums.Command; + +import lombok.extern.slf4j.Slf4j; + +/** + * Component custom command task. + * + * It will send Command=CUSTOM and customCommand to agent. + */ +@Slf4j +public class ComponentCustomTask extends AbstractComponentTask { + + private final String customCommand; + + public ComponentCustomTask(TaskContext taskContext, String customCommand) { + super(taskContext); + if (customCommand == null || customCommand.isBlank()) { + throw new IllegalArgumentException("customCommand must not be blank for ComponentCustomTask"); + } + this.customCommand = customCommand; + } + + @Override + protected Command getCommand() { + return Command.CUSTOM; + } + + @Override + protected String getCustomCommand() { + return customCommand; + } + + @Override + public String getName() { + return "Custom " + taskContext.getComponentDisplayName() + " (" + String.valueOf(customCommand) + ") on " + + taskContext.getHostname(); + } +} diff --git a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/task/ComponentStartTask.java b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/task/ComponentStartTask.java index 44620e04a..b116da570 100644 --- a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/task/ComponentStartTask.java 
+++ b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/task/ComponentStartTask.java @@ -20,9 +20,21 @@ import org.apache.bigtop.manager.common.enums.Command; import org.apache.bigtop.manager.dao.po.ComponentPO; +import org.apache.bigtop.manager.dao.po.HostPO; import org.apache.bigtop.manager.dao.query.ComponentQuery; import org.apache.bigtop.manager.server.enums.HealthyStatusEnum; +import org.apache.bigtop.manager.server.exception.ServerException; +import org.apache.bigtop.manager.server.model.dto.ComponentDTO; +import org.apache.bigtop.manager.server.model.dto.StackDTO; +import org.apache.bigtop.manager.server.utils.StackUtils; +import org.apache.commons.collections4.CollectionUtils; + +import lombok.extern.slf4j.Slf4j; + +import java.util.List; + +@Slf4j public class ComponentStartTask extends AbstractComponentTask { public ComponentStartTask(TaskContext taskContext) { @@ -45,9 +57,52 @@ public void onSuccess() { .hostname(hostname) .name(componentName) .build(); - ComponentPO componentPO = componentDao.findByQuery(componentQuery).get(0); + List componentPOList = componentDao.findByQuery(componentQuery); + ComponentPO componentPO; + boolean isNew = CollectionUtils.isEmpty(componentPOList); + if (isNew) { + log.warn( + "Component [{}] on host [{}] not found in DB during START, creating new entry. This may indicate an issue in the ADD task.", + componentName, + hostname); + componentPO = new ComponentPO(); + } else { + componentPO = componentPOList.get(0); + } + + ComponentDTO componentDTO = StackUtils.getComponentDTO(componentName); + + // If new or existing but incomplete, fill in the details + if (isNew || componentPO.getHostId() == null || componentPO.getDisplayName() == null) { + log.info( + "Populating full component details for component [{}] on host [{}]. 
New entry: {}", + componentName, + hostname, + isNew); + HostPO hostPO = hostDao.findByHostname(hostname); + if (hostPO == null) { + throw new ServerException("Host not found in database: " + hostname); + } + StackDTO stackDTO = StackUtils.getServiceStack(taskContext.getServiceName()); + + componentPO.setName(componentName); + componentPO.setDisplayName(componentDTO.getDisplayName()); + componentPO.setHostname(hostname); + componentPO.setClusterId(taskContext.getClusterId()); + componentPO.setHostId(hostPO.getId()); + componentPO.setServiceId(taskContext.getServiceId()); + componentPO.setServiceName(taskContext.getServiceName()); + componentPO.setServiceUser(taskContext.getServiceUser()); + componentPO.setStack(stackDTO.getStackName() + "-" + stackDTO.getStackVersion()); + } + componentPO.setStatus(HealthyStatusEnum.HEALTHY.getCode()); - componentDao.partialUpdateById(componentPO); + + if (componentPO.getId() == null) { + componentDao.save(componentPO); + } else { + componentDao.partialUpdateById(componentPO); + } } @Override diff --git a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/task/ComponentStopTask.java b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/task/ComponentStopTask.java index 73e0f4c42..680e0c649 100644 --- a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/task/ComponentStopTask.java +++ b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/task/ComponentStopTask.java @@ -45,7 +45,11 @@ public void onSuccess() { .hostname(hostname) .name(componentName) .build(); - ComponentPO componentPO = componentDao.findByQuery(componentQuery).get(0); + java.util.List<ComponentPO> componentPOList = componentDao.findByQuery(componentQuery); + if (componentPOList == null || componentPOList.isEmpty()) { + return; + } + ComponentPO componentPO = componentPOList.get(0); componentPO.setStatus(HealthyStatusEnum.UNHEALTHY.getCode());
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *    https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.bigtop.manager.server.command.task;

import org.apache.bigtop.manager.common.enums.Command;

import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/**
 * Server-side task: wait for a TCP port to become reachable on a host.
 *
 * <p>Unlike agent-dispatched tasks, {@link #run()} is overridden to execute the
 * check entirely on the server; {@link #doRun(String, Integer)} is therefore a no-op.
 */
@Slf4j
public class PortCheckTask extends AbstractTask {

    /** Per-attempt TCP connect timeout (was an inline magic number). */
    private static final int CONNECT_TIMEOUT_MS = 2000;

    private final String targetHost;
    private final int targetPort;
    private final long timeoutMs;
    private final long intervalMs;

    /**
     * @param taskContext task bookkeeping context
     * @param targetHost  host whose port must become reachable
     * @param targetPort  TCP port to probe
     * @param timeoutMs   overall deadline for the whole wait
     * @param intervalMs  sleep between connect attempts
     */
    public PortCheckTask(TaskContext taskContext, String targetHost, int targetPort, long timeoutMs, long intervalMs) {
        super(taskContext);
        this.targetHost = targetHost;
        this.targetPort = targetPort;
        this.timeoutMs = timeoutMs;
        this.intervalMs = intervalMs;
    }

    @Override
    protected Command getCommand() {
        // Server-side only; CUSTOM is a placeholder since no agent command is sent.
        return Command.CUSTOM;
    }

    @Override
    protected String getCustomCommand() {
        return "port_check";
    }

    @Override
    protected Boolean doRun(String hostname, Integer grpcPort) {
        // Not used: run() is overridden to execute entirely on the server.
        return true;
    }

    /**
     * Polls the target port until it accepts a TCP connection or the deadline passes.
     *
     * @return {@code true} if the port became reachable within {@code timeoutMs}
     */
    @Override
    public Boolean run() {
        boolean ok = waitForPortOpen(targetHost, targetPort, timeoutMs, intervalMs);
        if (ok) {
            onSuccess();
        } else {
            onFailure();
        }
        return ok;
    }

    private static boolean waitForPortOpen(String host, int port, long timeoutMs, long intervalMs) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        Exception last = null;

        while (System.currentTimeMillis() < deadline) {
            try (Socket socket = new Socket()) {
                socket.connect(new InetSocketAddress(host, port), CONNECT_TIMEOUT_MS);
                return true;
            } catch (IOException | RuntimeException e) {
                // Was catch (Throwable): never swallow Errors (OOM, linkage errors) here.
                last = e;
                try {
                    Thread.sleep(intervalMs);
                } catch (InterruptedException ie) {
                    // Preserve interrupt status and stop waiting.
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }

        log.warn("Port check timeout for {}:{}, lastError={}", host, port, last == null ? null : last.getMessage());
        return false;
    }

    @Override
    public String getName() {
        return "Wait port " + targetHost + ":" + targetPort;
    }
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *    https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.bigtop.manager.server.command.task;

import org.apache.bigtop.manager.common.enums.Command;

import lombok.extern.slf4j.Slf4j;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URI;
import java.net.URL;
import java.util.stream.Collectors;

/**
 * Server-side task: poll an HTTP URL until its body contains an expected string.
 *
 * <p>Runs entirely on the server; {@link #doRun(String, Integer)} is a no-op.
 */
@Slf4j
public class UrlCheckTask extends AbstractTask {

    /** Per-request connect/read timeout. */
    private static final int HTTP_TIMEOUT_MS = 2000;

    private final String targetUrl;
    private final String expectedContent;
    private final long timeoutMs;
    private final long intervalMs;

    public UrlCheckTask(
            TaskContext taskContext, String targetUrl, String expectedContent, long timeoutMs, long intervalMs) {
        super(taskContext);
        this.targetUrl = targetUrl;
        this.expectedContent = expectedContent;
        this.timeoutMs = timeoutMs;
        this.intervalMs = intervalMs;
    }

    @Override
    protected Command getCommand() {
        return Command.CUSTOM;
    }

    @Override
    protected String getCustomCommand() {
        return "url_check";
    }

    @Override
    protected Boolean doRun(String hostname, Integer grpcPort) {
        return true; // Server-side task
    }

    @Override
    public Boolean run() {
        log.info(
                "Starting URL check for [{}] with timeout {}ms, waiting for content: [{}]",
                targetUrl,
                timeoutMs,
                expectedContent);
        boolean isReady = waitForUrlContent();
        if (isReady) {
            log.info("URL [{}] is now ready.", targetUrl);
            onSuccess();
        } else {
            log.error("URL check timed out for [{}]. It did not become ready within {}ms.", targetUrl, timeoutMs);
            onFailure();
        }
        return isReady;
    }

    private boolean waitForUrlContent() {
        long deadline = System.currentTimeMillis() + timeoutMs;
        Throwable lastException = null;

        while (System.currentTimeMillis() < deadline) {
            HttpURLConnection connection = null;
            try {
                // URI.create(...).toURL() replaces the URL(String) constructor deprecated in JDK 20.
                URL url = URI.create(targetUrl).toURL();
                connection = (HttpURLConnection) url.openConnection();
                connection.setConnectTimeout(HTTP_TIMEOUT_MS);
                connection.setReadTimeout(HTTP_TIMEOUT_MS);
                connection.setRequestMethod("GET");

                if (connection.getResponseCode() == HttpURLConnection.HTTP_OK) {
                    try (BufferedReader reader =
                            new BufferedReader(new InputStreamReader(connection.getInputStream()))) {
                        // Join with "\n": joining with no separator could fabricate a match
                        // spanning a line boundary (e.g. "ab"+"cd" -> "abcd" matches "bc").
                        String response = reader.lines().collect(Collectors.joining("\n"));
                        if (response.contains(expectedContent)) {
                            return true;
                        }
                    }
                }
            } catch (Exception e) {
                lastException = e;
            } finally {
                if (connection != null) {
                    connection.disconnect();
                }
            }

            try {
                Thread.sleep(intervalMs);
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                log.warn("URL check for [{}] was interrupted.", targetUrl);
                return false;
            }
        }

        log.warn(
                "URL check failed for [{}]. Last error: {}",
                targetUrl,
                lastException != null ? lastException.getMessage() : "N/A");
        return false;
    }

    @Override
    public String getName() {
        return "Wait for URL " + targetUrl;
    }
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *    https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.bigtop.manager.server.command.validator;

import org.apache.bigtop.manager.common.enums.Command;
import org.apache.bigtop.manager.server.command.CommandIdentifier;
import org.apache.bigtop.manager.server.enums.ApiExceptionEnum;
import org.apache.bigtop.manager.server.enums.CommandLevel;
import org.apache.bigtop.manager.server.exception.ApiException;

import org.apache.commons.lang3.StringUtils;

import org.springframework.stereotype.Component;

import java.util.List;

/**
 * Validates component-level CUSTOM commands: a non-blank custom command name
 * and at least one target component must be supplied.
 */
@Component
public class ComponentCustomValidator implements CommandValidator {

    @Override
    public List<CommandIdentifier> getCommandIdentifiers() {
        return List.of(new CommandIdentifier(CommandLevel.COMPONENT, Command.CUSTOM));
    }

    /**
     * @throws ApiException if the custom command name is blank or no component
     *                      commands are present in the request
     */
    @Override
    public void validate(ValidatorContext context) {
        String customCommand = context.getCommandDTO().getCustomCommand();
        if (StringUtils.isBlank(customCommand)) {
            throw new ApiException(ApiExceptionEnum.OPERATION_FAILED, "customCommand must not be blank");
        }
        if (context.getCommandDTO().getComponentCommands() == null
                || context.getCommandDTO().getComponentCommands().isEmpty()) {
            throw new ApiException(ApiExceptionEnum.OPERATION_FAILED, "componentCommands must not be empty");
        }
    }
}
+ */ +package org.apache.bigtop.manager.server.controller; + +import org.apache.bigtop.manager.server.annotations.Audit; +import org.apache.bigtop.manager.server.model.req.EnableHdfsHaReq; +import org.apache.bigtop.manager.server.model.vo.CommandVO; +import org.apache.bigtop.manager.server.service.CommandService; +import org.apache.bigtop.manager.server.service.HdfsHaService; +import org.apache.bigtop.manager.server.utils.ResponseEntity; + +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +import io.swagger.v3.oas.annotations.Operation; +import io.swagger.v3.oas.annotations.tags.Tag; + +import jakarta.annotation.Resource; +import jakarta.validation.Valid; + +@Tag(name = "HDFS HA Controller") +@RestController +@RequestMapping("/clusters/{clusterId}/services/{serviceId}/actions") +public class HdfsHaController { + + @Resource + private HdfsHaService hdfsHaService; + + @Resource + private CommandService commandService; + + @Audit + @Operation(summary = "enableHdfsHa", description = "Enable HDFS HA for an existing single NameNode cluster") + @PostMapping("/enable-hdfs-ha") + public ResponseEntity enableHdfsHa( + @PathVariable Long clusterId, @PathVariable Long serviceId, @RequestBody @Valid EnableHdfsHaReq req) { + return ResponseEntity.success(hdfsHaService.buildEnableHdfsHaCommand(clusterId, serviceId, req)); + } +} diff --git a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/controller/YarnHaController.java b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/controller/YarnHaController.java new file mode 100644 index 000000000..25136d9e6 --- /dev/null +++ b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/controller/YarnHaController.java @@ -0,0 
+1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bigtop.manager.server.controller; + +import org.apache.bigtop.manager.server.annotations.Audit; +import org.apache.bigtop.manager.server.model.dto.CommandDTO; +import org.apache.bigtop.manager.server.model.req.EnableYarnRmHaReq; +import org.apache.bigtop.manager.server.model.vo.CommandVO; +import org.apache.bigtop.manager.server.service.CommandService; +import org.apache.bigtop.manager.server.service.YarnHaService; +import org.apache.bigtop.manager.server.utils.ResponseEntity; + +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +import io.swagger.v3.oas.annotations.Operation; +import io.swagger.v3.oas.annotations.tags.Tag; + +import jakarta.annotation.Resource; +import jakarta.validation.Valid; + +@Tag(name = "YARN HA Controller") +@RestController +@RequestMapping("/clusters/{clusterId}/services/{serviceId}/actions") +public class YarnHaController { + + @Resource + private 
YarnHaService yarnHaService; + + @Resource + private CommandService commandService; + + @Audit + @Operation(summary = "enableYarnRmHa", description = "Enable YARN ResourceManager HA") + @PostMapping("/enable-yarn-rm-ha") + public ResponseEntity enableYarnRmHa( + @PathVariable Long clusterId, @PathVariable Long serviceId, @RequestBody @Valid EnableYarnRmHaReq req) { + + CommandDTO commandDTO = yarnHaService.buildEnableYarnRmHaCommand(clusterId, serviceId, req); + return ResponseEntity.success(commandService.command(commandDTO)); + } +} diff --git a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/enums/ApiExceptionEnum.java b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/enums/ApiExceptionEnum.java index b93fcced3..c60e1be01 100644 --- a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/enums/ApiExceptionEnum.java +++ b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/enums/ApiExceptionEnum.java @@ -82,6 +82,7 @@ public enum ApiExceptionEnum { // Command Exceptions -- 18000 ~ 18999 COMMAND_NOT_FOUND(18000, LocaleKeys.COMMAND_NOT_FOUND), COMMAND_NOT_SUPPORTED(18001, LocaleKeys.COMMAND_NOT_SUPPORTED), + INVALID_PARAMS(18002, LocaleKeys.OPERATION_FAILED), // LLM Exceptions -- 19000 ~ 19999 PLATFORM_NOT_FOUND(19000, LocaleKeys.PLATFORM_NOT_FOUND), diff --git a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/model/req/EnableHdfsHaReq.java b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/model/req/EnableHdfsHaReq.java new file mode 100644 index 000000000..1952d23a8 --- /dev/null +++ b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/model/req/EnableHdfsHaReq.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *    https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.bigtop.manager.server.model.req;

import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;

import jakarta.validation.constraints.NotBlank;
import jakarta.validation.constraints.NotEmpty;
import jakarta.validation.constraints.Size;
import java.util.List;

/**
 * Request body for enabling HDFS NameNode HA.
 * Bean-validation constraints mirror the documented requirements
 * (e.g. at least 3 JournalNodes for a quorum).
 */
@Data
public class EnableHdfsHaReq {

    @NotBlank
    @Schema(description = "Active NameNode hostname", example = "nn-a")
    private String activeNameNodeHost;

    @NotBlank
    @Schema(description = "Standby NameNode hostname", example = "nn-b")
    private String standbyNameNodeHost;

    // @Size(min = 3) enforces the ">=3" requirement the schema only described.
    @NotEmpty
    @Size(min = 3)
    @Schema(description = "JournalNode hostnames, must be >=3", example = "[\"jn-1\",\"jn-2\",\"jn-3\"]")
    private List<String> journalNodeHosts;

    @Schema(description = "Zookeeper service id (legacy, optional)", example = "12")
    private Long zookeeperServiceId;

    @Schema(
            description =
                    "Zookeeper hosts (preferred). If specified, server will build quorum as host:2181 and ignore zookeeperServiceId.",
            example = "[\"zk-1\",\"zk-2\",\"zk-3\"]")
    private List<String> zookeeperHosts;

    @NotEmpty
    @Schema(description = "ZKFC hostnames (usually 2 namenodes)", example = "[\"nn-a\",\"nn-b\"]")
    private List<String> zkfcHosts;

    @NotBlank
    @Schema(description = "HDFS nameservice, allow rename", example = "nameservice1")
    private String nameservice;
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *    https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.bigtop.manager.server.model.req;

import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;

import jakarta.validation.constraints.NotBlank;
import jakarta.validation.constraints.NotEmpty;
import jakarta.validation.constraints.Size;
import java.util.List;

/**
 * Request body for enabling YARN ResourceManager HA.
 * RM HA requires at least two rm-ids, enforced via bean validation.
 */
@Data
public class EnableYarnRmHaReq {

    @NotBlank
    @Schema(description = "Active ResourceManager hostname (rm1)", example = "rm-a")
    private String activeResourceManagerHost;

    @NotBlank
    @Schema(description = "Standby ResourceManager hostname (rm2)", example = "rm-b")
    private String standbyResourceManagerHost;

    // @Size(min = 2): a single rm-id cannot form an HA pair.
    @NotEmpty
    @Size(min = 2)
    @Schema(description = "RM IDs used by yarn.resourcemanager.ha.rm-ids", example = "[\"rm1\",\"rm2\"]")
    private List<String> rmIds;

    @NotBlank
    @Schema(description = "YARN cluster id (yarn.resourcemanager.cluster-id)", example = "yarn-cluster")
    private String yarnClusterId;

    @Schema(description = "Zookeeper service id (legacy, optional)", example = "12")
    private Long zookeeperServiceId;

    @Schema(
            description =
                    "Zookeeper hosts (preferred). If specified, server will build zk-address as host:2181 and ignore zookeeperServiceId.",
            example = "[\"zk-1\",\"zk-2\",\"zk-3\"]")
    private List<String> zookeeperHosts;
}
You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bigtop.manager.server.service; + +import org.apache.bigtop.manager.server.model.req.EnableHdfsHaReq; + +public interface HdfsHaService { + org.apache.bigtop.manager.server.model.vo.CommandVO buildEnableHdfsHaCommand( + Long clusterId, Long serviceId, EnableHdfsHaReq req); +} diff --git a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/service/YarnHaService.java b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/service/YarnHaService.java new file mode 100644 index 000000000..91f162e6f --- /dev/null +++ b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/service/YarnHaService.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.bigtop.manager.server.service; + +import org.apache.bigtop.manager.server.model.dto.CommandDTO; +import org.apache.bigtop.manager.server.model.req.EnableYarnRmHaReq; + +public interface YarnHaService { + CommandDTO buildEnableYarnRmHaCommand(Long clusterId, Long serviceId, EnableYarnRmHaReq req); +} diff --git a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/service/impl/HdfsHaServiceImpl.java b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/service/impl/HdfsHaServiceImpl.java new file mode 100644 index 000000000..84d47c158 --- /dev/null +++ b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/service/impl/HdfsHaServiceImpl.java @@ -0,0 +1,373 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.bigtop.manager.server.service.impl; + +import org.apache.bigtop.manager.common.enums.Command; +import org.apache.bigtop.manager.common.utils.JsonUtils; +import org.apache.bigtop.manager.dao.po.ComponentPO; +import org.apache.bigtop.manager.dao.po.ServiceConfigPO; +import org.apache.bigtop.manager.dao.po.ServicePO; +import org.apache.bigtop.manager.dao.query.ComponentQuery; +import org.apache.bigtop.manager.dao.repository.ComponentDao; +import org.apache.bigtop.manager.dao.repository.ServiceConfigDao; +import org.apache.bigtop.manager.dao.repository.ServiceDao; +import org.apache.bigtop.manager.server.enums.CommandLevel; +import org.apache.bigtop.manager.server.exception.ServerException; +import org.apache.bigtop.manager.server.model.dto.CommandDTO; +import org.apache.bigtop.manager.server.model.dto.PropertyDTO; +import org.apache.bigtop.manager.server.model.dto.command.ComponentCommandDTO; +import org.apache.bigtop.manager.server.model.req.EnableHdfsHaReq; +import org.apache.bigtop.manager.server.service.CommandService; +import org.apache.bigtop.manager.server.service.HdfsHaService; + +import org.apache.commons.collections4.CollectionUtils; +import org.apache.commons.lang3.StringUtils; + +import org.springframework.dao.CannotAcquireLockException; +import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Propagation; +import org.springframework.transaction.annotation.Transactional; + +import com.fasterxml.jackson.core.type.TypeReference; + +import jakarta.annotation.Resource; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; + +@Service +public class HdfsHaServiceImpl implements HdfsHaService { + + private static final String ZOOKEEPER_SERVICE_NAME = "zookeeper"; + + @Resource + private ServiceDao serviceDao; + + @Resource + private ServiceConfigDao serviceConfigDao; + + 
@Resource + private ComponentDao componentDao; + + @Resource + private CommandService commandService; + + @Override + public org.apache.bigtop.manager.server.model.vo.CommandVO buildEnableHdfsHaCommand( + Long clusterId, Long serviceId, EnableHdfsHaReq req) { + // 0. Validate prerequisites (components exist, ZK is available, etc.) + validatePrerequisites(clusterId, serviceId, req); + + // 1. Write HA configurations to core-site.xml and hdfs-site.xml + writeHaConfigurationWithRetry(clusterId, serviceId, req); + + // 2. Trigger a single service-level job (EnableHdfsHaJob) which orchestrates stages internally. + // IMPORTANT: Do NOT use SERVICE CONFIGURE here, otherwise it will restart all hadoop components (including + // YARN). + CommandDTO commandDTO = new CommandDTO(); + commandDTO.setClusterId(clusterId); + commandDTO.setCommandLevel(CommandLevel.SERVICE); + commandDTO.setCommand(Command.ENABLE_HDFS_HA); + + // Embed request payload into customCommand for the service job to consume. + commandDTO.setCustomCommand("enableHdfsHa:" + JsonUtils.writeAsString(req)); + + // Component host map used by EnableHdfsHaJob + List componentCommands = new ArrayList<>(); + + // HDFS components + componentCommands.add(componentCommand("journalnode", req.getJournalNodeHosts())); + componentCommands.add( + componentCommand("namenode", List.of(req.getActiveNameNodeHost(), req.getStandbyNameNodeHost()))); + componentCommands.add(componentCommand("zkfc", req.getZkfcHosts())); + + // DataNode hosts: restart to pick up HA config + List datanodeHosts = getHostsByComponent(clusterId, "datanode"); + if (CollectionUtils.isNotEmpty(datanodeHosts)) { + componentCommands.add(componentCommand("datanode", datanodeHosts)); + } + + // YARN components: configure only (EnableHdfsHaJob will not stop/start them) + List rmHosts = getHostsByComponent(clusterId, "resourcemanager"); + if (CollectionUtils.isNotEmpty(rmHosts)) { + componentCommands.add(componentCommand("resourcemanager", rmHosts)); + } + 
List nmHosts = getHostsByComponent(clusterId, "nodemanager"); + if (CollectionUtils.isNotEmpty(nmHosts)) { + componentCommands.add(componentCommand("nodemanager", nmHosts)); + } + List hsHosts = getHostsByComponent(clusterId, "history_server"); + if (CollectionUtils.isNotEmpty(hsHosts)) { + componentCommands.add(componentCommand("history_server", hsHosts)); + } + + commandDTO.setComponentCommands(componentCommands); + + return commandService.command(commandDTO); + } + + private void writeHaConfigurationWithRetry(Long clusterId, Long serviceId, EnableHdfsHaReq req) { + int maxAttempts = 3; + long sleepMs = 200; + CannotAcquireLockException last = null; + for (int i = 1; i <= maxAttempts; i++) { + try { + writeHaConfigurationInNewTx(clusterId, serviceId, req); + return; + } catch (CannotAcquireLockException e) { + last = e; + if (i == maxAttempts) { + break; + } + try { + Thread.sleep(sleepMs); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + throw e; + } + sleepMs *= 2; + } + } + throw last; + } + + @Transactional(propagation = Propagation.REQUIRES_NEW) + protected void writeHaConfigurationInNewTx(Long clusterId, Long serviceId, EnableHdfsHaReq req) { + writeHaConfiguration(clusterId, serviceId, req); + } + + private ComponentCommandDTO componentCommand(String name, List hosts) { + ComponentCommandDTO cc = new ComponentCommandDTO(); + cc.setComponentName(name); + cc.setHostnames(hosts == null ? 
List.of() : hosts); + return cc; + } + + private List getHostsByComponent(Long clusterId, String componentName) { + ComponentQuery q = ComponentQuery.builder() + .clusterId(clusterId) + .name(componentName) + .build(); + List list = componentDao.findByQuery(q); + if (CollectionUtils.isEmpty(list)) { + return List.of(); + } + return list.stream() + .map(ComponentPO::getHostname) + .filter(StringUtils::isNotBlank) + .distinct() + .toList(); + } + + private void validatePrerequisites(Long clusterId, Long serviceId, EnableHdfsHaReq req) { + if (clusterId == null) { + throw new ServerException("clusterId is required"); + } + if (serviceId == null) { + throw new ServerException("serviceId is required"); + } + if (req == null) { + throw new ServerException("request body is required"); + } + if (StringUtils.isBlank(req.getActiveNameNodeHost()) || StringUtils.isBlank(req.getStandbyNameNodeHost())) { + throw new ServerException("activeNameNodeHost/standbyNameNodeHost must not be blank"); + } + if (req.getActiveNameNodeHost().equalsIgnoreCase(req.getStandbyNameNodeHost())) { + throw new ServerException("activeNameNodeHost and standbyNameNodeHost must be different"); + } + if (StringUtils.isBlank(req.getNameservice())) { + throw new ServerException("nameservice must not be blank"); + } + if (CollectionUtils.isEmpty(req.getJournalNodeHosts()) + || req.getJournalNodeHosts().size() < 3) { + throw new ServerException("journalNodeHosts must be provided and have at least 3 nodes"); + } + if (CollectionUtils.isEmpty(req.getZkfcHosts())) { + throw new ServerException("zkfcHosts must not be empty"); + } + + // Verify required components exist on specified hosts + // NameNode is required before enabling HA. + assertComponentExists(clusterId, "namenode", req.getActiveNameNodeHost()); + assertComponentExists(clusterId, "namenode", req.getStandbyNameNodeHost()); + + // journalnode/zkfc may not exist before enable HA, because this action will ADD/install them. 
+ + // Validate ZK quorum can be generated (must for automatic failover) + String zk = buildZkAddress(clusterId, req); + if (StringUtils.isBlank(zk)) { + throw new ServerException( + "Failed to build ha.zookeeper.quorum, please check zookeeper hosts/service and components"); + } + } + + private void assertComponentExists(Long clusterId, String componentName, String hostname) { + ComponentQuery q = ComponentQuery.builder() + .clusterId(clusterId) + .name(componentName) + .hostname(hostname) + .build(); + List list = componentDao.findByQuery(q); + if (CollectionUtils.isEmpty(list)) { + throw new ServerException("Component not found in DB: component=" + componentName + ", host=" + hostname + + ", clusterId=" + clusterId); + } + } + + private void writeHaConfiguration(Long clusterId, Long serviceId, EnableHdfsHaReq req) { + // Update core-site.xml + Map coreSiteUpdates = buildCoreSiteUpdates(clusterId, req); + upsertServiceConfigProperties(clusterId, serviceId, "core-site", coreSiteUpdates); + + // Update hdfs-site.xml + Map hdfsSiteUpdates = buildHdfsSiteUpdates(req); + upsertServiceConfigProperties(clusterId, serviceId, "hdfs-site", hdfsSiteUpdates); + } + + private Map buildCoreSiteUpdates(Long clusterId, EnableHdfsHaReq req) { + Map m = new HashMap<>(); + m.put("fs.defaultFS", "hdfs://" + req.getNameservice()); + + String zkAddress = buildZkAddress(clusterId, req); + if (StringUtils.isNotBlank(zkAddress)) { + m.put("ha.zookeeper.quorum", zkAddress); + } + return m; + } + + private Map buildHdfsSiteUpdates(EnableHdfsHaReq req) { + String nameservice = req.getNameservice(); + String nn1Host = req.getActiveNameNodeHost(); + String nn2Host = req.getStandbyNameNodeHost(); + + if (CollectionUtils.isEmpty(req.getJournalNodeHosts()) + || req.getJournalNodeHosts().size() < 3) { + throw new ServerException("JournalNode hosts must be provided and have at least 3 nodes."); + } + String journalQuorum = + req.getJournalNodeHosts().stream().map(h -> h + 
":8485").collect(Collectors.joining(";")); + + Map m = new HashMap<>(); + m.put("dfs.nameservices", nameservice); + m.put("dfs.ha.namenodes." + nameservice, "nn1,nn2"); + m.put("dfs.namenode.rpc-address." + nameservice + ".nn1", nn1Host + ":8020"); + m.put("dfs.namenode.rpc-address." + nameservice + ".nn2", nn2Host + ":8020"); + m.put("dfs.namenode.http-address." + nameservice + ".nn1", nn1Host + ":9870"); + m.put("dfs.namenode.http-address." + nameservice + ".nn2", nn2Host + ":9870"); + m.put("dfs.namenode.shared.edits.dir", "qjournal://" + journalQuorum + "/" + nameservice); + m.put( + "dfs.client.failover.proxy.provider." + nameservice, + "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"); + m.put("dfs.ha.automatic-failover.enabled", "true"); + m.put("dfs.ha.fencing.methods", "shell(/bin/true)"); + + // Clean up single-node keys + m.put("__delete__.dfs.namenode.rpc-address", ""); + m.put("__delete__.dfs.namenode.http-address", ""); + m.put("__delete__.dfs.namenode.https-address", ""); + + return m; + } + + private static final String CUSTOM_COMMAND_PREFIX = "enableHdfsHa:"; + + private String buildZkAddress(Long clusterId, EnableHdfsHaReq req) { + // Preferred: zookeeperHosts from request + if (CollectionUtils.isNotEmpty(req.getZookeeperHosts())) { + return req.getZookeeperHosts().stream() + .filter(StringUtils::isNotBlank) + .map(h -> h.trim() + ":2181") + .distinct() + .collect(Collectors.joining(",")); + } + + Long zookeeperServiceId = req.getZookeeperServiceId(); + if (zookeeperServiceId == null) { + return ""; + } + + ServicePO zkService = serviceDao.findById(zookeeperServiceId); + if (zkService == null || !ZOOKEEPER_SERVICE_NAME.equalsIgnoreCase(zkService.getName())) { + throw new ServerException("zookeeperServiceId must point to a valid Zookeeper service."); + } + + ServiceConfigPO zooCfg = serviceConfigDao.findByServiceIdAndName(zookeeperServiceId, "zoo.cfg"); + if (zooCfg == null || 
StringUtils.isBlank(zooCfg.getPropertiesJson())) { + throw new ServerException("zoo.cfg not found or empty for zookeeper service: " + zookeeperServiceId); + } + + Map props = JsonUtils.readFromString(zooCfg.getPropertiesJson()); + String clientPort = props.getOrDefault("clientPort", "2181").toString().trim(); + + ComponentQuery query = ComponentQuery.builder() + .serviceId(zookeeperServiceId) + .name("zookeeper_server") + .build(); + List zkHosts = componentDao.findByQuery(query).stream() + .map(ComponentPO::getHostname) + .filter(StringUtils::isNotBlank) + .distinct() + .toList(); + + if (CollectionUtils.isEmpty(zkHosts)) { + return ""; + } + + return zkHosts.stream().map(h -> h.trim() + ":" + clientPort).collect(Collectors.joining(",")); + } + + private void upsertServiceConfigProperties( + Long clusterId, Long serviceId, String configName, Map updates) { + ServiceConfigPO po = serviceConfigDao.findByServiceIdAndName(serviceId, configName); + if (po == null) { + po = new ServiceConfigPO(); + po.setClusterId(clusterId); + po.setServiceId(serviceId); + po.setName(configName); + po.setPropertiesJson("[]"); // Initialize with empty JSON array + serviceConfigDao.save(po); + po = serviceConfigDao.findByServiceIdAndName(serviceId, configName); + } + + List properties = new ArrayList<>(); + if (StringUtils.isNotBlank(po.getPropertiesJson())) { + properties.addAll(JsonUtils.readFromString(po.getPropertiesJson(), new TypeReference<>() {})); + } + + Map propsMap = + properties.stream().collect(Collectors.toMap(PropertyDTO::getName, Function.identity(), (a, b) -> b)); + + for (Map.Entry e : updates.entrySet()) { + String k = e.getKey(); + if (k != null && k.startsWith("__delete__.")) { + propsMap.remove(k.substring("__delete__.".length())); + } else { + PropertyDTO prop = propsMap.getOrDefault(k, new PropertyDTO()); + prop.setName(k); + prop.setValue(e.getValue()); + propsMap.put(k, prop); + } + } + po.setPropertiesJson(JsonUtils.writeAsString(new 
ArrayList<>(propsMap.values()))); + serviceConfigDao.partialUpdateByIds(List.of(po)); + } +} diff --git a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/service/impl/YarnHaServiceImpl.java b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/service/impl/YarnHaServiceImpl.java new file mode 100644 index 000000000..de7fbbb71 --- /dev/null +++ b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/service/impl/YarnHaServiceImpl.java @@ -0,0 +1,289 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *    https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.bigtop.manager.server.service.impl;

import org.apache.bigtop.manager.common.enums.Command;
import org.apache.bigtop.manager.common.utils.JsonUtils;
import org.apache.bigtop.manager.dao.po.ComponentPO;
import org.apache.bigtop.manager.dao.po.ServiceConfigPO;
import org.apache.bigtop.manager.dao.po.ServicePO;
import org.apache.bigtop.manager.dao.query.ComponentQuery;
import org.apache.bigtop.manager.dao.repository.ComponentDao;
import org.apache.bigtop.manager.dao.repository.ServiceConfigDao;
import org.apache.bigtop.manager.dao.repository.ServiceDao;
import org.apache.bigtop.manager.server.enums.CommandLevel;
import org.apache.bigtop.manager.server.exception.ServerException;
import org.apache.bigtop.manager.server.model.dto.CommandDTO;
import org.apache.bigtop.manager.server.model.dto.PropertyDTO;
import org.apache.bigtop.manager.server.model.dto.command.ComponentCommandDTO;
import org.apache.bigtop.manager.server.model.req.EnableYarnRmHaReq;
import org.apache.bigtop.manager.server.service.YarnHaService;

import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;

import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import com.fasterxml.jackson.core.type.TypeReference;

import jakarta.annotation.Resource;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

/**
 * Enables ResourceManager HA for a Hadoop service: writes the required
 * {@code yarn-site} properties and builds the command DTO that drives the
 * subsequent component operations.
 */
@Service
public class YarnHaServiceImpl implements YarnHaService {

    private static final String HADOOP_SERVICE_NAME = "hadoop";
    private static final String ZOOKEEPER_SERVICE_NAME = "zookeeper";

    @Resource
    private ServiceDao serviceDao;

    @Resource
    private ServiceConfigDao serviceConfigDao;

    @Resource
    private ComponentDao componentDao;

    /**
     * Validates the target service, persists the HA yarn-site updates and
     * returns the command describing the resourcemanager hosts to operate on.
     *
     * @throws ServerException when the service is missing, not 'hadoop', or the request is invalid
     */
    @Override
    @Transactional
    public CommandDTO buildEnableYarnRmHaCommand(Long clusterId, Long serviceId, EnableYarnRmHaReq req) {
        ServicePO servicePO = serviceDao.findById(serviceId);
        if (servicePO == null) {
            throw new ServerException("Service not found: " + serviceId);
        }
        if (!HADOOP_SERVICE_NAME.equalsIgnoreCase(servicePO.getName())) {
            throw new ServerException(
                    "enable-yarn-rm-ha only supports service 'hadoop', but got: " + servicePO.getName());
        }

        Map<String, String> yarnSiteUpdates = buildYarnSiteUpdates(clusterId, serviceId, req);
        upsertServiceConfigProperties(clusterId, serviceId, "yarn-site", yarnSiteUpdates);

        CommandDTO commandDTO = new CommandDTO();
        commandDTO.setClusterId(clusterId);
        commandDTO.setCommandLevel(CommandLevel.SERVICE);
        commandDTO.setCommand(Command.ENABLE_YARN_RM_HA);

        ComponentCommandDTO rmCmd = new ComponentCommandDTO();
        rmCmd.setComponentName("resourcemanager");
        rmCmd.setHostnames(List.of(req.getActiveResourceManagerHost(), req.getStandbyResourceManagerHost()));
        commandDTO.setComponentCommands(List.of(rmCmd));

        return commandDTO;
    }

    /**
     * Builds the yarn-site property updates for RM HA: per-rm-id hostnames and
     * service addresses (ports carried over from the existing non-HA keys), the
     * ZK quorum, and {@code __delete__.}-prefixed markers that remove the now
     * obsolete single-RM keys.
     */
    private Map<String, String> buildYarnSiteUpdates(Long clusterId, Long serviceId, EnableYarnRmHaReq req) {
        if (StringUtils.isBlank(req.getActiveResourceManagerHost())
                || StringUtils.isBlank(req.getStandbyResourceManagerHost())) {
            throw new ServerException("active/standby resourcemanager host must not be blank");
        }
        if (CollectionUtils.isEmpty(req.getRmIds()) || req.getRmIds().size() < 2) {
            throw new ServerException("rmIds must be provided and contain at least 2 ids");
        }
        // Blank ids/cluster-id would silently produce broken yarn-site keys, so fail fast.
        if (StringUtils.isBlank(req.getRmIds().get(0)) || StringUtils.isBlank(req.getRmIds().get(1))) {
            throw new ServerException("rmIds must not contain blank ids");
        }
        if (StringUtils.isBlank(req.getYarnClusterId())) {
            throw new ServerException("yarnClusterId must not be blank");
        }

        String rm1Id = req.getRmIds().get(0);
        String rm2Id = req.getRmIds().get(1);

        Map<String, String> m = new HashMap<>();
        m.put("yarn.resourcemanager.ha.enabled", "true");
        m.put("yarn.resourcemanager.ha.rm-ids", String.join(",", req.getRmIds()));
        m.put("yarn.resourcemanager.cluster-id", req.getYarnClusterId());

        // hostname.rmX
        m.put("yarn.resourcemanager.hostname." + rm1Id, req.getActiveResourceManagerHost());
        m.put("yarn.resourcemanager.hostname." + rm2Id, req.getStandbyResourceManagerHost());

        int webappPort =
                resolvePortFromExistingKey(serviceId, "yarn-site", "yarn.resourcemanager.webapp.address", 8088);
        m.put("yarn.resourcemanager.webapp.address." + rm1Id, req.getActiveResourceManagerHost() + ":" + webappPort);
        m.put("yarn.resourcemanager.webapp.address." + rm2Id, req.getStandbyResourceManagerHost() + ":" + webappPort);

        int rmAddressPort = resolvePortFromExistingKey(serviceId, "yarn-site", "yarn.resourcemanager.address", 8032);
        int rmAdminPort =
                resolvePortFromExistingKey(serviceId, "yarn-site", "yarn.resourcemanager.admin.address", 8033);
        int rmRtPort = resolvePortFromExistingKey(
                serviceId, "yarn-site", "yarn.resourcemanager.resource-tracker.address", 8031);
        int rmSchedulerPort =
                resolvePortFromExistingKey(serviceId, "yarn-site", "yarn.resourcemanager.scheduler.address", 8030);

        m.put("yarn.resourcemanager.address." + rm1Id, req.getActiveResourceManagerHost() + ":" + rmAddressPort);
        m.put("yarn.resourcemanager.address." + rm2Id, req.getStandbyResourceManagerHost() + ":" + rmAddressPort);

        m.put("yarn.resourcemanager.admin.address." + rm1Id, req.getActiveResourceManagerHost() + ":" + rmAdminPort);
        m.put("yarn.resourcemanager.admin.address." + rm2Id, req.getStandbyResourceManagerHost() + ":" + rmAdminPort);

        m.put(
                "yarn.resourcemanager.resource-tracker.address." + rm1Id,
                req.getActiveResourceManagerHost() + ":" + rmRtPort);
        m.put(
                "yarn.resourcemanager.resource-tracker.address." + rm2Id,
                req.getStandbyResourceManagerHost() + ":" + rmRtPort);

        m.put(
                "yarn.resourcemanager.scheduler.address." + rm1Id,
                req.getActiveResourceManagerHost() + ":" + rmSchedulerPort);
        m.put(
                "yarn.resourcemanager.scheduler.address." + rm2Id,
                req.getStandbyResourceManagerHost() + ":" + rmSchedulerPort);

        String zkAddress = buildZkAddress(clusterId, req);
        if (StringUtils.isNotBlank(zkAddress)) {
            m.put("yarn.resourcemanager.zk-address", zkAddress);
        }

        m.put("__delete__.yarn.resourcemanager.hostname", "");
        m.put("__delete__.yarn.resourcemanager.address", "");
        m.put("__delete__.yarn.resourcemanager.admin.address", "");
        m.put("__delete__.yarn.resourcemanager.resource-tracker.address", "");
        m.put("__delete__.yarn.resourcemanager.scheduler.address", "");
        m.put("__delete__.yarn.resourcemanager.webapp.address", "");
        m.put("__delete__.yarn.resourcemanager.webapp.https.address", "");
        return m;
    }

    /**
     * Extracts the port from an existing {@code host:port} config value, falling
     * back to {@code defaultPort} when the key is absent, portless or unparsable.
     */
    private int resolvePortFromExistingKey(Long serviceId, String configName, String key, int defaultPort) {
        try {
            Map<String, String> existing = getExistingConfigAsMap(serviceId, configName);
            String value = existing.get(key);
            if (StringUtils.isBlank(value) || !value.contains(":")) {
                return defaultPort;
            }
            String portStr = value.split(":")[1].trim();
            return Integer.parseInt(portStr);
        } catch (Exception ignored) {
            // Best effort: a malformed value simply falls back to the default port.
        }
        return defaultPort;
    }

    /** Reads an existing service config as a name→value map; empty map when missing/unreadable. */
    private Map<String, String> getExistingConfigAsMap(Long serviceId, String configName) {
        Map<String, String> map = new HashMap<>();
        try {
            ServiceConfigPO cfg = serviceConfigDao.findByServiceIdAndName(serviceId, configName);
            if (cfg == null || StringUtils.isBlank(cfg.getPropertiesJson())) {
                return map;
            }
            List<PropertyDTO> properties = JsonUtils.readFromString(cfg.getPropertiesJson(), new TypeReference<>() {});
            for (PropertyDTO p : properties) {
                if (p.getName() != null && p.getValue() != null) {
                    map.put(p.getName(), p.getValue().toString());
                }
            }
        } catch (Exception ignored) {
            // Best effort: treat unreadable config as empty.
        }
        return map;
    }

    /**
     * Resolves the ZooKeeper quorum string ({@code host:port,...}).
     * Prefers explicit {@code zookeeperHosts} from the request; otherwise derives
     * it from the service referenced by {@code zookeeperServiceId}. Returns ""
     * when nothing can be resolved.
     */
    private String buildZkAddress(Long clusterId, EnableYarnRmHaReq req) {
        // Preferred: zookeeperHosts from request
        if (CollectionUtils.isNotEmpty(req.getZookeeperHosts())) {
            return req.getZookeeperHosts().stream()
                    .filter(StringUtils::isNotBlank)
                    .map(h -> h.trim() + ":2181") // Assume default port 2181
                    .distinct()
                    .collect(Collectors.joining(","));
        }

        // Fallback: zookeeperServiceId
        Long zookeeperServiceId = req.getZookeeperServiceId();
        if (zookeeperServiceId == null) {
            return "";
        }

        ServicePO zkService = serviceDao.findById(zookeeperServiceId);
        if (zkService == null) {
            throw new ServerException("zookeeper service not found: " + zookeeperServiceId);
        }
        if (!ZOOKEEPER_SERVICE_NAME.equalsIgnoreCase(zkService.getName())) {
            throw new ServerException(
                    "zookeeperServiceId must point to service 'zookeeper', but got: " + zkService.getName());
        }

        ServiceConfigPO zooCfg = serviceConfigDao.findByServiceIdAndName(zookeeperServiceId, "zoo.cfg");
        if (zooCfg == null || StringUtils.isBlank(zooCfg.getPropertiesJson())) {
            return "";
        }

        List<PropertyDTO> props = JsonUtils.readFromString(zooCfg.getPropertiesJson(), new TypeReference<>() {});
        String clientPort = "2181";
        for (PropertyDTO prop : props) {
            // Guard against a null value, which would otherwise yield a "host:null" quorum.
            if ("clientPort".equals(prop.getName()) && prop.getValue() != null) {
                clientPort = String.valueOf(prop.getValue()).trim();
                break;
            }
        }

        ComponentQuery query = ComponentQuery.builder()
                .serviceId(zookeeperServiceId)
                .name("zookeeper_server")
                .build();
        List<String> zkHosts = componentDao.findByQuery(query).stream()
                .map(ComponentPO::getHostname)
                .filter(StringUtils::isNotBlank)
                .distinct()
                .toList();

        if (CollectionUtils.isEmpty(zkHosts)) {
            return "";
        }

        String finalClientPort = clientPort;
        return zkHosts.stream().map(h -> h.trim() + ":" + finalClientPort).collect(Collectors.joining(","));
    }

    /**
     * Creates or updates the service config named {@code configName}, merging {@code updates}
     * into its existing properties. An update key prefixed with {@code __delete__.} removes
     * the corresponding property instead of setting it.
     */
    private void upsertServiceConfigProperties(
            Long clusterId, Long serviceId, String configName, Map<String, String> updates) {
        ServiceConfigPO po = serviceConfigDao.findByServiceIdAndName(serviceId, configName);
        if (po == null) {
            po = new ServiceConfigPO();
            po.setClusterId(clusterId);
            po.setServiceId(serviceId);
            po.setName(configName);
            po.setPropertiesJson("[]"); // Initialize with empty JSON array
            serviceConfigDao.save(po);
            po = serviceConfigDao.findByServiceIdAndName(serviceId, configName);
            if (po == null) {
                // Defensive: the reload should always succeed right after a save.
                throw new ServerException(
                        "Failed to create service config: " + configName + " for serviceId=" + serviceId);
            }
        }

        List<PropertyDTO> properties = new ArrayList<>();
        if (StringUtils.isNotBlank(po.getPropertiesJson())) {
            properties.addAll(JsonUtils.readFromString(po.getPropertiesJson(), new TypeReference<>() {}));
        }

        // Last-write-wins on duplicate names, matching the previous merge behavior.
        Map<String, PropertyDTO> propsMap =
                properties.stream().collect(Collectors.toMap(PropertyDTO::getName, Function.identity(), (a, b) -> b));

        for (Map.Entry<String, String> entry : updates.entrySet()) {
            String key = entry.getKey();
            if (key != null && key.startsWith("__delete__.")) {
                propsMap.remove(key.substring("__delete__.".length()));
            } else {
                PropertyDTO prop = propsMap.getOrDefault(key, new PropertyDTO());
                prop.setName(key);
                prop.setValue(entry.getValue());
                propsMap.put(key, prop);
            }
        }
        po.setPropertiesJson(JsonUtils.writeAsString(new ArrayList<>(propsMap.values())));
        serviceConfigDao.partialUpdateByIds(List.of(po));
    }
}
ComponentStatusRequest.newBuilder() - .setStackName( - CaseUtils.toLowerCase(componentDetailsPO.getStack().split("-")[0])) - .setStackVersion(componentDetailsPO.getStack().split("-")[1]) + .setStackName(CaseUtils.toLowerCase(stack.split("-")[0])) + .setStackVersion(stack.split("-")[1]) .setServiceName(componentDetailsPO.getServiceName()) .setServiceUser(componentDetailsPO.getServiceUser()) .setComponentName(componentDetailsPO.getName()) @@ -99,8 +116,9 @@ public void execute() { componentDao.partialUpdateByIds(componentPOList); // Update services - Map> componentPOMap = - componentPOList.stream().collect(Collectors.groupingBy(ComponentPO::getServiceId)); + Map> componentPOMap = componentPOList.stream() + .filter(c -> c.getServiceId() != null) + .collect(Collectors.groupingBy(ComponentPO::getServiceId)); for (Map.Entry> entry : componentPOMap.entrySet()) { Long serviceId = entry.getKey(); List components = entry.getValue(); diff --git a/bigtop-manager-server/src/main/resources/ddl/MySQL-DDL-CREATE.sql b/bigtop-manager-server/src/main/resources/ddl/MySQL-DDL-CREATE.sql index aeab69311..13318dee4 100644 --- a/bigtop-manager-server/src/main/resources/ddl/MySQL-DDL-CREATE.sql +++ b/bigtop-manager-server/src/main/resources/ddl/MySQL-DDL-CREATE.sql @@ -212,7 +212,7 @@ CREATE TABLE `job` ( `id` BIGINT(20) UNSIGNED NOT NULL AUTO_INCREMENT, `name` VARCHAR(255), - `context` TEXT NOT NULL, + `context` LONGTEXT NOT NULL, `state` VARCHAR(32) NOT NULL, `cluster_id` BIGINT(20) UNSIGNED DEFAULT NULL, `create_time` DATETIME DEFAULT CURRENT_TIMESTAMP, @@ -226,10 +226,10 @@ CREATE TABLE `job` CREATE TABLE `stage` ( `id` BIGINT(20) UNSIGNED NOT NULL AUTO_INCREMENT, - `name` VARCHAR(32) NOT NULL, + `name` VARCHAR(255) NOT NULL, `service_name` VARCHAR(255), `component_name` VARCHAR(255), - `context` TEXT, + `context` LONGTEXT, `order` INTEGER, `state` VARCHAR(32) NOT NULL, `cluster_id` BIGINT(20) UNSIGNED DEFAULT NULL, @@ -253,7 +253,7 @@ CREATE TABLE `task` `component_name` 
VARCHAR(255), `command` VARCHAR(255), `custom_command` VARCHAR(255), - `content` TEXT, + `content` LONGTEXT, `context` TEXT NOT NULL, `state` VARCHAR(255), `cluster_id` BIGINT, diff --git a/bigtop-manager-server/src/main/resources/ddl/PostgreSQL-DDL-CREATE.sql b/bigtop-manager-server/src/main/resources/ddl/PostgreSQL-DDL-CREATE.sql index 35feb63ff..7f79d37be 100644 --- a/bigtop-manager-server/src/main/resources/ddl/PostgreSQL-DDL-CREATE.sql +++ b/bigtop-manager-server/src/main/resources/ddl/PostgreSQL-DDL-CREATE.sql @@ -232,7 +232,7 @@ CREATE INDEX idx_job_cluster_id ON job (cluster_id); CREATE TABLE stage ( id BIGINT CHECK (id > 0) NOT NULL GENERATED ALWAYS AS IDENTITY, - name VARCHAR(32) NOT NULL, + name VARCHAR(255) NOT NULL, service_name VARCHAR(255), component_name VARCHAR(255), context TEXT, diff --git a/bigtop-manager-server/src/test/java/org/apache/bigtop/manager/server/command/stage/AbstractComponentStageTest.java b/bigtop-manager-server/src/test/java/org/apache/bigtop/manager/server/command/stage/AbstractComponentStageTest.java index cdc385e17..0eaa7a54a 100644 --- a/bigtop-manager-server/src/test/java/org/apache/bigtop/manager/server/command/stage/AbstractComponentStageTest.java +++ b/bigtop-manager-server/src/test/java/org/apache/bigtop/manager/server/command/stage/AbstractComponentStageTest.java @@ -20,11 +20,17 @@ import org.apache.bigtop.manager.common.utils.Environments; import org.apache.bigtop.manager.dao.po.ClusterPO; +import org.apache.bigtop.manager.dao.po.ServicePO; +import org.apache.bigtop.manager.dao.repository.ClusterDao; +import org.apache.bigtop.manager.dao.repository.ServiceDao; import org.apache.bigtop.manager.server.command.task.TaskContext; +import org.apache.bigtop.manager.server.holder.SessionUserHolder; +import org.apache.bigtop.manager.server.holder.SpringContextHolder; import org.apache.bigtop.manager.server.utils.StackUtils; import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.BeforeAll; +import 
org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; @@ -44,16 +50,41 @@ public class AbstractComponentStageTest { @Mock private AbstractComponentStage stage; + @Mock + private ClusterDao clusterDao; + + @Mock + private ServiceDao serviceDao; + private static MockedStatic mocked; + private MockedStatic springContextHolderMocked; + private MockedStatic sessionUserHolderMocked; - @BeforeAll - public static void setup() { + @BeforeEach + public void setup() { mocked = mockStatic(Environments.class); when(Environments.isDevMode()).thenReturn(true); + springContextHolderMocked = mockStatic(SpringContextHolder.class); + when(SpringContextHolder.getBean(ClusterDao.class)).thenReturn(clusterDao); + when(SpringContextHolder.getBean(ServiceDao.class)).thenReturn(serviceDao); + + sessionUserHolderMocked = mockStatic(SessionUserHolder.class); + when(SessionUserHolder.getUserId()).thenReturn(1001L); + StackUtils.parseStack(); } + @AfterEach + public void tearDown() { + if (springContextHolderMocked != null) { + springContextHolderMocked.close(); + } + if (sessionUserHolderMocked != null) { + sessionUserHolderMocked.close(); + } + } + @AfterAll public static void teardown() { mocked.close(); @@ -75,6 +106,15 @@ public void testCreateTaskContext() { ReflectionTestUtils.setField(stage, "stageContext", stageContext); ReflectionTestUtils.setField(stage, "clusterPO", clusterPO); + // Mock serviceDao.findByClusterIdAndName to avoid NPE in createTaskContext + ServicePO servicePO = new ServicePO(); + servicePO.setId(2L); + when(serviceDao.findByClusterIdAndName(any(), any())).thenReturn(servicePO); + + // Directly set the mocked serviceDao and clusterDao to avoid injection issues + ReflectionTestUtils.setField(stage, "serviceDao", serviceDao); + ReflectionTestUtils.setField(stage, "clusterDao", clusterDao); + 
doCallRealMethod().when(stage).createTaskContext(any()); TaskContext taskContext = stage.createTaskContext("host1"); @@ -86,5 +126,7 @@ public void testCreateTaskContext() { assertEquals("zookeeper", taskContext.getServiceUser()); assertEquals("test", taskContext.getUserGroup()); assertEquals("/opt", taskContext.getRootDir()); + assertEquals(1001L, taskContext.getOperatorId()); + assertEquals(2L, taskContext.getServiceId()); } } diff --git a/bigtop-manager-server/src/test/java/org/apache/bigtop/manager/server/command/task/ComponentStartTaskTest.java b/bigtop-manager-server/src/test/java/org/apache/bigtop/manager/server/command/task/ComponentStartTaskTest.java index d7e53e4f5..6357f863d 100644 --- a/bigtop-manager-server/src/test/java/org/apache/bigtop/manager/server/command/task/ComponentStartTaskTest.java +++ b/bigtop-manager-server/src/test/java/org/apache/bigtop/manager/server/command/task/ComponentStartTaskTest.java @@ -21,11 +21,15 @@ import org.apache.bigtop.manager.common.enums.Command; import org.apache.bigtop.manager.common.utils.JsonUtils; import org.apache.bigtop.manager.dao.po.ComponentPO; +import org.apache.bigtop.manager.dao.po.HostPO; import org.apache.bigtop.manager.dao.po.TaskPO; import org.apache.bigtop.manager.dao.repository.ComponentDao; import org.apache.bigtop.manager.dao.repository.HostDao; import org.apache.bigtop.manager.dao.repository.TaskDao; import org.apache.bigtop.manager.server.holder.SpringContextHolder; +import org.apache.bigtop.manager.server.model.dto.ComponentDTO; +import org.apache.bigtop.manager.server.model.dto.StackDTO; +import org.apache.bigtop.manager.server.utils.StackUtils; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -65,6 +69,8 @@ public class ComponentStartTaskTest { @Mock private ComponentDao componentDao; + private MockedStatic stackUtilsMocked; + @Spy private TaskContext taskContext; @@ -80,6 +86,20 @@ public void setUp() { 
when(SpringContextHolder.getBean(TaskDao.class)).thenReturn(taskDao); when(SpringContextHolder.getBean(ComponentDao.class)).thenReturn(componentDao); + // Mock StackUtils static methods + stackUtilsMocked = mockStatic(StackUtils.class); + ComponentDTO componentDTO = new ComponentDTO(); + componentDTO.setName("TestComponentName"); + componentDTO.setDisplayName("TestComponentDisplayName"); + stackUtilsMocked + .when(() -> StackUtils.getComponentDTO("TestComponentName")) + .thenReturn(componentDTO); + + StackDTO stackDTO = new StackDTO("test-stack", "1.0.0"); + stackUtilsMocked + .when(() -> StackUtils.getServiceStack("TestServiceName")) + .thenReturn(stackDTO); + componentStartTask = mock(ComponentStartTask.class); taskContext.setComponentDisplayName("TestComponentDisplayName"); @@ -102,6 +122,9 @@ public void setUp() { @AfterEach public void tearDown() { springContextHolderMockedStatic.close(); + if (stackUtilsMocked != null) { + stackUtilsMocked.close(); + } } @Test @@ -120,8 +143,18 @@ public void testGetCommand() { @Test public void testOnSuccess() { doCallRealMethod().when(componentStartTask).onSuccess(); + + // Mock hostDao.findByHostname to avoid Host not found exception + HostPO hostPO = new HostPO(); + hostPO.setId(1L); + hostPO.setHostname("TestHostname"); + when(hostDao.findByHostname("TestHostname")).thenReturn(hostPO); + List componentPOS = new ArrayList<>(); - componentPOS.add(new ComponentPO()); + ComponentPO existing = new ComponentPO(); + existing.setId(1L); + existing.setName("TestComponentName"); + componentPOS.add(existing); when(componentDao.findByQuery(any())).thenReturn(componentPOS); componentStartTask.onSuccess(); diff --git a/bigtop-manager-stack/bigtop-manager-stack-bigtop/src/main/java/org/apache/bigtop/manager/stack/bigtop/v3_3_0/hadoop/HadoopParams.java b/bigtop-manager-stack/bigtop-manager-stack-bigtop/src/main/java/org/apache/bigtop/manager/stack/bigtop/v3_3_0/hadoop/HadoopParams.java index 6e262d4e8..c38e5fc12 100644 --- 
a/bigtop-manager-stack/bigtop-manager-stack-bigtop/src/main/java/org/apache/bigtop/manager/stack/bigtop/v3_3_0/hadoop/HadoopParams.java +++ b/bigtop-manager-stack/bigtop-manager-stack-bigtop/src/main/java/org/apache/bigtop/manager/stack/bigtop/v3_3_0/hadoop/HadoopParams.java @@ -41,6 +41,7 @@ import java.util.Map; import java.util.regex.Matcher; import java.util.regex.Pattern; +import java.util.stream.Collectors; @Getter @Slf4j @@ -48,6 +49,19 @@ @NoArgsConstructor public class HadoopParams extends BigtopParams { + private String resolveNameService() { + try { + Map hdfsSite = LocalSettings.configurations(getServiceName(), "hdfs-site"); + Object ns = hdfsSite.get("dfs.nameservices"); + if (ns != null && StringUtils.isNotBlank(ns.toString())) { + return ns.toString().split("\\s*,\\s*")[0].trim(); + } + } catch (Exception e) { + // ignore and fallback + } + return "nameservice1"; + } + private final String hadoopLogDir = "/var/log/hadoop"; private final String hadoopPidDir = "/var/run/hadoop"; @@ -127,8 +141,9 @@ public Map coreSite() { coreSite.put( "fs.defaultFS", ((String) coreSite.get("fs.defaultFS")).replace("localhost", namenodeList.get(0))); } else if (!namenodeList.isEmpty() && namenodeList.size() == 2) { + String nameservice = resolveNameService(); coreSite.put( - "fs.defaultFS", ((String) coreSite.get("fs.defaultFS")).replace("localhost:8020", "nameservice1")); + "fs.defaultFS", ((String) coreSite.get("fs.defaultFS")).replace("localhost:8020", nameservice)); coreSite.put("ha.zookeeper.quorum", zkString); } return coreSite; @@ -144,36 +159,111 @@ public Map hdfsSite() { Map hdfsSite = LocalSettings.configurations(getServiceName(), "hdfs-site"); List namenodeList = LocalSettings.componentHosts("namenode"); List journalNodeList = LocalSettings.componentHosts("journalnode"); - if (!namenodeList.isEmpty() && namenodeList.size() == 1) { - hdfsSite.put( - "dfs.namenode.rpc-address", - ((String) hdfsSite.get("dfs.namenode.rpc-address")).replace("0.0.0.0", 
namenodeList.get(0))); - hdfsSite.put( - "dfs.datanode.https.address", - ((String) hdfsSite.get("dfs.datanode.https.address")).replace("0.0.0.0", namenodeList.get(0))); - hdfsSite.put( - "dfs.namenode.https-address", - ((String) hdfsSite.get("dfs.namenode.https-address")).replace("0.0.0.0", namenodeList.get(0))); - } else if (!namenodeList.isEmpty() && namenodeList.size() == 2) { + + String nameservice = resolveNameService(); + boolean haByConfig = false; + Object haNn = hdfsSite.get("dfs.ha.namenodes." + nameservice); + Object haRpc1 = hdfsSite.get("dfs.namenode.rpc-address." + nameservice + ".nn1"); + Object haRpc2 = hdfsSite.get("dfs.namenode.rpc-address." + nameservice + ".nn2"); + if ((haNn != null && StringUtils.isNotBlank(haNn.toString())) + || (haRpc1 != null && StringUtils.isNotBlank(haRpc1.toString())) + || (haRpc2 != null && StringUtils.isNotBlank(haRpc2.toString()))) { + haByConfig = true; + } + + if (haByConfig) { + // HA mode: do not rely on components.json namenode list, because it may be stale. + // During enable-ha bootstrap, journalnode components may be in the process of being installed, + // so allow falling back when journalnode list is not ready yet. + if (journalNodeList == null || journalNodeList.size() < 3) { + log.warn( + "JournalNode host list is not ready (size < 3), skip HA hdfs-site generation for now and fall back to non-HA config. journalNodeList={}", + journalNodeList); + haByConfig = false; + } + + List filteredJournalNodes = journalNodeList.stream() + .filter(StringUtils::isNotBlank) + .map(String::trim) + .distinct() + .toList(); + if (filteredJournalNodes.size() < 3) { + log.warn( + "JournalNode host list is invalid after filtering blanks (size < 3), skip HA hdfs-site generation for now and fall back to non-HA config. 
journalNodeList={} filteredJournalNodes={}", + journalNodeList, + filteredJournalNodes); + haByConfig = false; + } + + String journalQuorum = + filteredJournalNodes.stream().map(x -> x + ":8485").collect(Collectors.joining(";")); + + hdfsSite.remove("dfs.namenode.rpc-address"); + hdfsSite.remove("dfs.namenode.https-address"); hdfsSite.remove("dfs.namenode.http-address"); + hdfsSite.put("dfs.ha.automatic-failover.enabled", "true"); - hdfsSite.put("dfs.nameservices", "nameservice1"); - hdfsSite.put("dfs.ha.namenodes.nameservice1", "nn1,nn2"); - hdfsSite.put("dfs.namenode.rpc-address.nameservice1.nn1", namenodeList.get(0) + ":8020"); - hdfsSite.put("dfs.namenode.rpc-address.nameservice1.nn2", namenodeList.get(1) + ":8020"); - hdfsSite.put("dfs.namenode.http-address.nameservice1.nn1", namenodeList.get(0) + ":9870"); - hdfsSite.put("dfs.namenode.http-address.nameservice1.nn2", namenodeList.get(1) + ":9870"); - hdfsSite.put( - "dfs.namenode.shared.edits.dir", - "qjournal://" + journalNodeList.get(0) + ":8485;" + journalNodeList.get(1) + ":8485;" - + journalNodeList.get(2) + ":8485" + "/nameservice1"); - hdfsSite.put("dfs.journalnode.edits.dir", "/hadoop/dfs/journal"); - hdfsSite.put( - "dfs.client.failover.proxy.provider.nameservice1", - "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"); - hdfsSite.put("dfs.journalnode.edits.dir", "/hadoop/dfs/journal"); - hdfsSite.put("dfs.ha.fencing.methods", "shell(/bin/true)"); - hdfsSite.put("dfs.replication", "3"); + hdfsSite.put("dfs.nameservices", nameservice); + if (hdfsSite.get("dfs.ha.namenodes." + nameservice) == null) { + hdfsSite.put("dfs.ha.namenodes." + nameservice, "nn1,nn2"); + } + hdfsSite.put("dfs.namenode.shared.edits.dir", "qjournal://" + journalQuorum + "/" + nameservice); + + // Ensure required HA keys exist (respect existing values if present) + if (hdfsSite.get("dfs.namenode.rpc-address." 
+ nameservice + ".nn1") == null + && namenodeList != null + && namenodeList.size() >= 1) { + hdfsSite.put("dfs.namenode.rpc-address." + nameservice + ".nn1", namenodeList.get(0) + ":8020"); + } + if (hdfsSite.get("dfs.namenode.rpc-address." + nameservice + ".nn2") == null + && namenodeList != null + && namenodeList.size() >= 2) { + hdfsSite.put("dfs.namenode.rpc-address." + nameservice + ".nn2", namenodeList.get(1) + ":8020"); + } + if (hdfsSite.get("dfs.namenode.http-address." + nameservice + ".nn1") == null + && namenodeList != null + && namenodeList.size() >= 1) { + hdfsSite.put("dfs.namenode.http-address." + nameservice + ".nn1", namenodeList.get(0) + ":9870"); + } + if (hdfsSite.get("dfs.namenode.http-address." + nameservice + ".nn2") == null + && namenodeList != null + && namenodeList.size() >= 2) { + hdfsSite.put("dfs.namenode.http-address." + nameservice + ".nn2", namenodeList.get(1) + ":9870"); + } + + if (hdfsSite.get("dfs.client.failover.proxy.provider." + nameservice) == null) { + hdfsSite.put( + "dfs.client.failover.proxy.provider." 
+ nameservice, + "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"); + } + if (hdfsSite.get("dfs.ha.fencing.methods") == null) { + hdfsSite.put("dfs.ha.fencing.methods", "shell(/bin/true)"); + } + if (hdfsSite.get("dfs.replication") == null) { + hdfsSite.put("dfs.replication", "3"); + } + + } else if (namenodeList != null && !namenodeList.isEmpty()) { + // Single NN mode + String nnHost = namenodeList.get(0); + + Object rpcAddr = hdfsSite.get("dfs.namenode.rpc-address"); + if (rpcAddr == null) { + throw new IllegalArgumentException("Missing required hdfs-site key: dfs.namenode.rpc-address"); + } + hdfsSite.put("dfs.namenode.rpc-address", rpcAddr.toString().replace("0.0.0.0", nnHost)); + + Object dnHttpsAddr = hdfsSite.get("dfs.datanode.https.address"); + if (dnHttpsAddr == null) { + throw new IllegalArgumentException("Missing required hdfs-site key: dfs.datanode.https.address"); + } + hdfsSite.put("dfs.datanode.https.address", dnHttpsAddr.toString().replace("0.0.0.0", nnHost)); + + Object nnHttpsAddr = hdfsSite.get("dfs.namenode.https-address"); + if (nnHttpsAddr == null) { + throw new IllegalArgumentException("Missing required hdfs-site key: dfs.namenode.https-address"); + } + hdfsSite.put("dfs.namenode.https-address", nnHttpsAddr.toString().replace("0.0.0.0", nnHost)); } // Configure native library dependent settings @@ -181,10 +271,21 @@ public Map hdfsSite() { dfsDataDir = (String) hdfsSite.get("dfs.datanode.data.dir"); dfsNameNodeDir = (String) hdfsSite.get("dfs.namenode.name.dir"); - nameNodeFormattedDirs = Arrays.stream(dfsNameNodeDir.split(",")) - .map(x -> x + "/namenode-formatted/") - .toList(); - String dfsHttpAddress = (String) hdfsSite.get("dfs.namenode.http-address.nameservice1.nn1"); + if (StringUtils.isNotBlank(dfsNameNodeDir)) { + nameNodeFormattedDirs = Arrays.stream(dfsNameNodeDir.split(",")) + .map(x -> x + "/namenode-formatted/") + .toList(); + } else { + nameNodeFormattedDirs = List.of(); + 
log.warn("dfs.namenode.name.dir is empty, skip namenode formatted dirs generation"); + } + + String resolvedNameService = resolveNameService(); + String dfsHttpAddress = (String) hdfsSite.get("dfs.namenode.http-address." + resolvedNameService + ".nn1"); + if (StringUtils.isBlank(dfsHttpAddress)) { + // backward compatibility / older templates + dfsHttpAddress = (String) hdfsSite.get("dfs.namenode.http-address.nameservice1.nn1"); + } if (dfsHttpAddress != null && dfsHttpAddress.contains(":")) { String[] parts = dfsHttpAddress.split(":"); if (parts.length >= 2) { @@ -192,13 +293,17 @@ public Map hdfsSite() { } } String journalHttpAddress = (String) hdfsSite.get("dfs.namenode.shared.edits.dir"); - Pattern pattern = Pattern.compile(":(\\d{1,5})"); - Matcher matcher = pattern.matcher(journalHttpAddress); - if (matcher.find()) { - journalHttpPort = matcher.group(1); - log.info("find jounalnode port: " + journalHttpPort); + if (StringUtils.isNotBlank(journalHttpAddress)) { + Pattern pattern = Pattern.compile(":(\\d{1,5})"); + Matcher matcher = pattern.matcher(journalHttpAddress); + if (matcher.find()) { + journalHttpPort = matcher.group(1); + log.info("find jounalnode port: " + journalHttpPort); + } else { + log.warn("not found journalnode port!"); + } } else { - log.warn("not found journalnode port!"); + log.warn("dfs.namenode.shared.edits.dir is empty, skip journalnode port parsing"); } String dfsDomainSocketPath = (String) hdfsSite.get("dfs.domain.socket.path"); if (StringUtils.isNotBlank(dfsDomainSocketPath)) { @@ -220,32 +325,138 @@ public Map yarnLog4j() { public Map yarnSite() { Map yarnSite = LocalSettings.configurations(getServiceName(), "yarn-site"); List resourcemanagerList = LocalSettings.componentHosts("resourcemanager"); - if (!resourcemanagerList.isEmpty()) { - yarnSite.put("yarn.resourcemanager.hostname", MessageFormat.format("{0}", resourcemanagerList.get(0))); - yarnSite.put( - "yarn.resourcemanager.resource-tracker.address", - ((String) 
yarnSite.get("yarn.resourcemanager.resource-tracker.address")) - .replace("0.0.0.0", resourcemanagerList.get(0))); - yarnSite.put( - "yarn.resourcemanager.scheduler.address", - ((String) yarnSite.get("yarn.resourcemanager.scheduler.address")) - .replace("0.0.0.0", resourcemanagerList.get(0))); - yarnSite.put( - "yarn.resourcemanager.address", - ((String) yarnSite.get("yarn.resourcemanager.address")) - .replace("0.0.0.0", resourcemanagerList.get(0))); - yarnSite.put( - "yarn.resourcemanager.admin.address", - ((String) yarnSite.get("yarn.resourcemanager.admin.address")) - .replace("0.0.0.0", resourcemanagerList.get(0))); - yarnSite.put( - "yarn.resourcemanager.webapp.address", - ((String) yarnSite.get("yarn.resourcemanager.webapp.address")) - .replace("0.0.0.0", resourcemanagerList.get(0))); - yarnSite.put( - "yarn.resourcemanager.webapp.https.address", - ((String) yarnSite.get("yarn.resourcemanager.webapp.https.address")) - .replace("0.0.0.0", resourcemanagerList.get(0))); + + // YARN ResourceManager HA + // When there are >= 2 RMs, or `yarn.resourcemanager.ha.enabled=true` is explicitly set, + // enter HA mode. In HA mode, do not set single-RM keys like `yarn.resourcemanager.hostname` + // to avoid conflicts. 
+ boolean haEnabledByConfig = false; + Object haEnabledValue = yarnSite.get("yarn.resourcemanager.ha.enabled"); + if (haEnabledValue != null) { + haEnabledByConfig = + "true".equalsIgnoreCase(haEnabledValue.toString().trim()); + } + boolean haMode = (resourcemanagerList != null && resourcemanagerList.size() >= 2) || haEnabledByConfig; + + if (haMode && resourcemanagerList != null && resourcemanagerList.size() >= 2) { + String rm1Host = resourcemanagerList.get(0); + String rm2Host = resourcemanagerList.get(1); + + // rm-ids: Use existing config if present, otherwise default to rm1,rm2 + String rmIds = "rm1,rm2"; + Object rmIdsObj = yarnSite.get("yarn.resourcemanager.ha.rm-ids"); + if (rmIdsObj != null && StringUtils.isNotBlank(rmIdsObj.toString())) { + rmIds = rmIdsObj.toString().trim(); + } + String[] rmIdArr = rmIds.split("\\s*,\\s*"); + String rm1Id = rmIdArr.length > 0 && StringUtils.isNotBlank(rmIdArr[0]) ? rmIdArr[0] : "rm1"; + String rm2Id = rmIdArr.length > 1 && StringUtils.isNotBlank(rmIdArr[1]) ? 
rmIdArr[1] : "rm2"; + + yarnSite.put("yarn.resourcemanager.ha.enabled", "true"); + yarnSite.put("yarn.resourcemanager.ha.rm-ids", rm1Id + "," + rm2Id); + + // cluster-id: Respect if set by server, otherwise provide a stable default + if (yarnSite.get("yarn.resourcemanager.cluster-id") == null + || StringUtils.isBlank( + yarnSite.get("yarn.resourcemanager.cluster-id").toString())) { + yarnSite.put("yarn.resourcemanager.cluster-id", "yarn-cluster"); + } + + // zk-address: Respect if set by server, otherwise auto-generate like in coreSite() + Object zkAddr = yarnSite.get("yarn.resourcemanager.zk-address"); + if (zkAddr == null || StringUtils.isBlank(zkAddr.toString())) { + try { + List zookeeperServerHosts = LocalSettings.componentHosts("zookeeper_server"); + Map ZKPort = LocalSettings.configurations("zookeeper", "zoo.cfg"); + String clientPort = (String) ZKPort.get("clientPort"); + StringBuilder zkString = new StringBuilder(); + for (int i = 0; i < zookeeperServerHosts.size(); i++) { + String host = zookeeperServerHosts.get(i); + if (host == null || host.trim().isEmpty()) { + continue; + } + zkString.append(host.trim()).append(":").append(clientPort); + if (i != zookeeperServerHosts.size() - 1) { + zkString.append(","); + } + } + if (zkString.length() > 0) { + yarnSite.put("yarn.resourcemanager.zk-address", zkString.toString()); + } + } catch (Exception e) { + log.warn("Failed to auto-generate yarn.resourcemanager.zk-address", e); + } + } + + // Set hostname.rmX + yarnSite.put("yarn.resourcemanager.hostname." + rm1Id, rm1Host); + yarnSite.put("yarn.resourcemanager.hostname." 
+ rm2Id, rm2Host); + + // webapp.address.rmX: Extract port from existing webapp.address, or default to 8088 + int webappPort = 8088; + Object webappAddress = yarnSite.get("yarn.resourcemanager.webapp.address"); + if (webappAddress != null && webappAddress.toString().contains(":")) { + try { + String portStr = webappAddress.toString().split(":")[1].trim(); + webappPort = Integer.parseInt(portStr); + } catch (Exception ignored) { + } + } + yarnSite.put("yarn.resourcemanager.webapp.address." + rm1Id, rm1Host + ":" + webappPort); + yarnSite.put("yarn.resourcemanager.webapp.address." + rm2Id, rm2Host + ":" + webappPort); + + // Auto-generate other HA addresses by extracting ports from single-node configs + generateHaAddress(yarnSite, "yarn.resourcemanager.address", rm1Id, rm1Host, rm2Id, rm2Host, 8032); + generateHaAddress(yarnSite, "yarn.resourcemanager.admin.address", rm1Id, rm1Host, rm2Id, rm2Host, 8033); + generateHaAddress( + yarnSite, "yarn.resourcemanager.resource-tracker.address", rm1Id, rm1Host, rm2Id, rm2Host, 8031); + generateHaAddress(yarnSite, "yarn.resourcemanager.scheduler.address", rm1Id, rm1Host, rm2Id, rm2Host, 8030); + + // Remove single-RM keys to avoid conflicts + yarnSite.remove("yarn.resourcemanager.hostname"); + yarnSite.remove("yarn.resourcemanager.address"); + yarnSite.remove("yarn.resourcemanager.admin.address"); + yarnSite.remove("yarn.resourcemanager.resource-tracker.address"); + yarnSite.remove("yarn.resourcemanager.scheduler.address"); + yarnSite.remove("yarn.resourcemanager.webapp.address"); + yarnSite.remove("yarn.resourcemanager.webapp.https.address"); + + } else { + // Single ResourceManager + if (resourcemanagerList != null && !resourcemanagerList.isEmpty()) { + String rmHost = resourcemanagerList.get(0); + yarnSite.put("yarn.resourcemanager.hostname", MessageFormat.format("{0}", rmHost)); + + String rt = (String) yarnSite.get("yarn.resourcemanager.resource-tracker.address"); + if (rt != null) { + 
yarnSite.put("yarn.resourcemanager.resource-tracker.address", rt.replace("0.0.0.0", rmHost)); + } + + String scheduler = (String) yarnSite.get("yarn.resourcemanager.scheduler.address"); + if (scheduler != null) { + yarnSite.put("yarn.resourcemanager.scheduler.address", scheduler.replace("0.0.0.0", rmHost)); + } + + String addr = (String) yarnSite.get("yarn.resourcemanager.address"); + if (addr != null) { + yarnSite.put("yarn.resourcemanager.address", addr.replace("0.0.0.0", rmHost)); + } + + String admin = (String) yarnSite.get("yarn.resourcemanager.admin.address"); + if (admin != null) { + yarnSite.put("yarn.resourcemanager.admin.address", admin.replace("0.0.0.0", rmHost)); + } + + String webapp = (String) yarnSite.get("yarn.resourcemanager.webapp.address"); + if (webapp != null) { + yarnSite.put("yarn.resourcemanager.webapp.address", webapp.replace("0.0.0.0", rmHost)); + } + + String https = (String) yarnSite.get("yarn.resourcemanager.webapp.https.address"); + if (https != null) { + yarnSite.put("yarn.resourcemanager.webapp.https.address", https.replace("0.0.0.0", rmHost)); + } + } } nodeManagerLogDir = (String) yarnSite.get("yarn.nodemanager.log-dirs"); @@ -303,6 +514,29 @@ public String getServiceName() { * * @param hdfsSite The HDFS site configuration map to be modified */ + private static void generateHaAddress( + Map yarnSite, + String baseKey, + String rm1Id, + String rm1Host, + String rm2Id, + String rm2Host, + int defaultPort) { + + int port = defaultPort; + Object base = yarnSite.get(baseKey); + if (base != null && base.toString().contains(":")) { + try { + String portStr = base.toString().split(":")[1].trim(); + port = Integer.parseInt(portStr); + } catch (Exception ignored) { + } + } + + yarnSite.put(baseKey + "." + rm1Id, rm1Host + ":" + port); + yarnSite.put(baseKey + "." 
+ rm2Id, rm2Host + ":" + port); + } + private void configureNativeLibraryDependentSettings(Map hdfsSite) { try { // Detect system glibc version to determine native library support diff --git a/bigtop-manager-stack/bigtop-manager-stack-bigtop/src/main/java/org/apache/bigtop/manager/stack/bigtop/v3_3_0/hadoop/HadoopSetup.java b/bigtop-manager-stack/bigtop-manager-stack-bigtop/src/main/java/org/apache/bigtop/manager/stack/bigtop/v3_3_0/hadoop/HadoopSetup.java index b979a012c..7b9b19039 100644 --- a/bigtop-manager-stack/bigtop-manager-stack-bigtop/src/main/java/org/apache/bigtop/manager/stack/bigtop/v3_3_0/hadoop/HadoopSetup.java +++ b/bigtop-manager-stack/bigtop-manager-stack-bigtop/src/main/java/org/apache/bigtop/manager/stack/bigtop/v3_3_0/hadoop/HadoopSetup.java @@ -71,6 +71,7 @@ public static ShellResult configure(Params params, String componentName) { hadoopGroup, Constants.PERMISSION_755, true); + break; } case "secondarynamenode": { LinuxFileUtils.createDirectories( @@ -79,6 +80,7 @@ public static ShellResult configure(Params params, String componentName) { hadoopGroup, Constants.PERMISSION_755, true); + break; } case "journalnode": { LinuxFileUtils.createDirectories( @@ -87,6 +89,7 @@ public static ShellResult configure(Params params, String componentName) { hadoopGroup, Constants.PERMISSION_755, true); + break; } case "datanode": { LinuxFileUtils.createDirectories( @@ -102,6 +105,7 @@ public static ShellResult configure(Params params, String componentName) { dir, hadoopUser, hadoopGroup, Constants.PERMISSION_755, true); } } + break; } case "nodemanager": { if (StringUtils.isNotBlank(hadoopParams.getNodeManagerLogDir())) { @@ -119,7 +123,10 @@ public static ShellResult configure(Params params, String componentName) { dir, hadoopUser, hadoopGroup, Constants.PERMISSION_755, true); } } + break; } + default: + break; } } @@ -265,12 +272,20 @@ public static void formatNameNode(HadoopParams hadoopParams) { } } - private static boolean 
checkAllJournalNodesPortReachable(HadoopParams hadoopParams) throws InterruptedException { + public static boolean checkAllJournalNodesPortReachable(HadoopParams hadoopParams) throws InterruptedException { + // Only required for HDFS HA (qjournal). In single NameNode mode, JournalNode is not used. List journalNodeList = LocalSettings.componentHosts("journalnode"); - String port = hadoopParams.getJournalHttpPort(); if (journalNodeList == null || journalNodeList.isEmpty()) { - throw new IllegalArgumentException("JournalNode host list cannot be empty!"); + log.info("JournalNode host list is empty, skip JournalNode reachability check (single NameNode mode)"); + return true; } + + String port = hadoopParams.getJournalHttpPort(); + if (StringUtils.isBlank(port)) { + log.warn("JournalNode http port is empty, skip JournalNode reachability check"); + return true; + } + int retryCount = 0; int maxRetry = 100; long retryIntervalMs = 2000; @@ -278,12 +293,10 @@ private static boolean checkAllJournalNodesPortReachable(HadoopParams hadoopPara while (retryCount < maxRetry) { boolean allReachable = true; for (String host : journalNodeList) { - boolean isReachable = false; Socket socket = null; try { socket = new Socket(); socket.connect(new InetSocketAddress(host, Integer.parseInt(port)), connectTimeoutMs); - isReachable = true; log.info("JournalNode [{}:{}] is reachable.", host, port); } catch (Exception e) { allReachable = false; diff --git a/bigtop-manager-stack/bigtop-manager-stack-bigtop/src/main/java/org/apache/bigtop/manager/stack/bigtop/v3_3_0/hadoop/NameNodeScript.java b/bigtop-manager-stack/bigtop-manager-stack-bigtop/src/main/java/org/apache/bigtop/manager/stack/bigtop/v3_3_0/hadoop/NameNodeScript.java index b57dcb2b1..bbad37827 100644 --- a/bigtop-manager-stack/bigtop-manager-stack-bigtop/src/main/java/org/apache/bigtop/manager/stack/bigtop/v3_3_0/hadoop/NameNodeScript.java +++ 
b/bigtop-manager-stack/bigtop-manager-stack-bigtop/src/main/java/org/apache/bigtop/manager/stack/bigtop/v3_3_0/hadoop/NameNodeScript.java @@ -63,39 +63,57 @@ public ShellResult start(Params params) { HadoopParams hadoopParams = (HadoopParams) params; String hostname = hadoopParams.hostname(); List namenodeList = LocalSettings.componentHosts("namenode"); + // The first namenode in the list is the one that formats the cluster. + if (namenodeList != null && !namenodeList.isEmpty() && hostname.equals(namenodeList.get(0))) { + // Only format if not already formatted. + HadoopSetup.formatNameNode(hadoopParams); + } + + // For both active and standby, the start command is the same. + // The role is determined by ZKFC at runtime. + String startCmd = MessageFormat.format("{0}/hdfs --daemon start namenode", hadoopParams.binDir()); try { - if (namenodeList != null && !namenodeList.isEmpty() && hostname.equals(namenodeList.get(0))) { - HadoopSetup.formatNameNode(hadoopParams); - String startCmd = MessageFormat.format("{0}/hdfs --daemon start namenode", hadoopParams.binDir()); - ShellResult result = LinuxOSUtils.sudoExecCmd(startCmd, hadoopParams.user()); - if (result.getExitCode() != 0) { - throw new StackException("Failed to start primary NameNode: " + result.getErrMsg()); - } - return result; - } else if (namenodeList != null && namenodeList.size() >= 2 && hostname.equals(namenodeList.get(1))) { - boolean isPrimaryReady = waitForNameNodeReady(namenodeList.get(0), hadoopParams); - if (!isPrimaryReady) { - throw new StackException("Primary NameNode is not ready, cannot bootstrap standby"); - } - String bootstrapCmd = MessageFormat.format( - "{0}/hdfs namenode -bootstrapStandby -nonInteractive", hadoopParams.binDir()); - ShellResult bootstrapResult = LinuxOSUtils.sudoExecCmd(bootstrapCmd, hadoopParams.user()); - if (bootstrapResult.getExitCode() != 0) { - throw new StackException("Failed to bootstrap standby NameNode: " + bootstrapResult.getErrMsg()); - } + return 
LinuxOSUtils.sudoExecCmd(startCmd, hadoopParams.user()); + } catch (Exception e) { + throw new StackException(e); + } + } - String startCmd = MessageFormat.format("{0}/hdfs --daemon start namenode", hadoopParams.binDir()); - ShellResult startResult = LinuxOSUtils.sudoExecCmd(startCmd, hadoopParams.user()); - if (startResult.getExitCode() != 0) { - throw new StackException("Failed to start standby NameNode: " + startResult.getErrMsg()); - } - return startResult; - } else { - throw new StackException("Current host is not in NameNode HA list: " + hostname); + public ShellResult initializeSharedEdits(Params params) { + configure(params); + HadoopParams hadoopParams = (HadoopParams) params; + try { + boolean allJnReachable = HadoopSetup.checkAllJournalNodesPortReachable(hadoopParams); + if (!allJnReachable) { + throw new StackException("Cannot initializeSharedEdits: Some JournalNodes are unreachable."); } } catch (Exception e) { throw new StackException(e); } + + String cmd = MessageFormat.format( + "{0}/hdfs --config {1} namenode -initializeSharedEdits -nonInteractive", + hadoopParams.binDir(), hadoopParams.confDir()); + try { + return LinuxOSUtils.sudoExecCmd(cmd, hadoopParams.user()); + } catch (Exception e) { + throw new StackException(e); + } + } + + public ShellResult bootstrapStandby(Params params) { + configure(params); + HadoopParams hadoopParams = (HadoopParams) params; + try { + return bootstrapStandby(hadoopParams); + } catch (Exception e) { + throw new StackException(e); + } + } + + private ShellResult bootstrapStandby(HadoopParams hadoopParams) throws Exception { + String cmd = MessageFormat.format("{0}/hdfs namenode -bootstrapStandby -nonInteractive", hadoopParams.binDir()); + return LinuxOSUtils.sudoExecCmd(cmd, hadoopParams.user()); } private boolean waitForNameNodeReady(String namenodeHost, HadoopParams hadoopParams) { diff --git 
a/bigtop-manager-stack/bigtop-manager-stack-bigtop/src/main/java/org/apache/bigtop/manager/stack/bigtop/v3_3_0/hadoop/ZkfcScript.java b/bigtop-manager-stack/bigtop-manager-stack-bigtop/src/main/java/org/apache/bigtop/manager/stack/bigtop/v3_3_0/hadoop/ZkfcScript.java index c19cb270e..ba97d672c 100644 --- a/bigtop-manager-stack/bigtop-manager-stack-bigtop/src/main/java/org/apache/bigtop/manager/stack/bigtop/v3_3_0/hadoop/ZkfcScript.java +++ b/bigtop-manager-stack/bigtop-manager-stack-bigtop/src/main/java/org/apache/bigtop/manager/stack/bigtop/v3_3_0/hadoop/ZkfcScript.java @@ -51,6 +51,10 @@ public ShellResult configure(Params params) { @Override public ShellResult init(Params params) { + return formatZk(params); + } + + public ShellResult formatZk(Params params) { configure(params); HadoopParams hadoopParams = (HadoopParams) params; @@ -68,9 +72,30 @@ public ShellResult start(Params params) { configure(params); HadoopParams hadoopParams = (HadoopParams) params; - String cmd = MessageFormat.format("{0}/hdfs --daemon start zkfc", hadoopParams.binDir()); + // Ensure ZKFC is formatted before starting. + // If the parent znode does not exist, zkfc will fail with: + // "Parent znode does not exist. Run with -formatZK flag to initialize ZooKeeper." + // Make this step idempotent: try start, if it indicates not formatted then format and retry. + String startCmd = MessageFormat.format("{0}/hdfs --daemon start zkfc", hadoopParams.binDir()); try { - return LinuxOSUtils.sudoExecCmd(cmd, hadoopParams.user()); + ShellResult startRes = LinuxOSUtils.sudoExecCmd(startCmd, hadoopParams.user()); + if (startRes.getExitCode() == 0) { + return startRes; + } + + String err = (startRes.getErrMsg() == null ? "" : startRes.getErrMsg()); + String out = (startRes.getOutput() == null ? 
"" : startRes.getOutput()); + String combined = out + "\n" + err; + if (combined.contains("Parent znode does not exist") || combined.contains("-formatZK")) { + log.warn("ZKFC not formatted, attempting to formatZK and retry start. stdout/stderr: {}", combined); + ShellResult fmt = formatZk(params); + if (fmt.getExitCode() != 0) { + return fmt; + } + return LinuxOSUtils.sudoExecCmd(startCmd, hadoopParams.user()); + } + + return startRes; } catch (Exception e) { throw new StackException(e); } diff --git a/bigtop-manager-stack/bigtop-manager-stack-core/src/main/java/org/apache/bigtop/manager/stack/core/executor/StackExecutor.java b/bigtop-manager-stack/bigtop-manager-stack-core/src/main/java/org/apache/bigtop/manager/stack/core/executor/StackExecutor.java index dd0278bca..a0bd5dd97 100644 --- a/bigtop-manager-stack/bigtop-manager-stack-core/src/main/java/org/apache/bigtop/manager/stack/core/executor/StackExecutor.java +++ b/bigtop-manager-stack/bigtop-manager-stack-core/src/main/java/org/apache/bigtop/manager/stack/core/executor/StackExecutor.java @@ -73,10 +73,20 @@ public static ShellResult execute(ComponentCommandPayload payload) { String command = payload.getCommand().equalsIgnoreCase(Command.CUSTOM.getCode()) ? payload.getCustomCommand() : payload.getCommand(); + if (command == null || command.isBlank()) { + throw new StackException("CUSTOM command requires non-blank customCommand, payload: {0}", payload); + } Script script = getCommandScript(payload); String methodName = CaseUtils.toCamelCase(command, CaseUtils.SEPARATOR_UNDERSCORE, false); - Method method = script.getClass().getMethod(methodName, Params.class); + Method method; + try { + method = script.getClass().getMethod(methodName, Params.class); + } catch (NoSuchMethodException e) { + // Backward/forward compatibility: allow customCommand to be already in camelCase. 
+ methodName = command; + method = script.getClass().getMethod(methodName, Params.class); + } Params params = PARAMS_MAP .get(payload.getServiceName()) diff --git a/bigtop-manager-ui/src/api/service/index.ts b/bigtop-manager-ui/src/api/service/index.ts index 9b09d6d75..a8bf7db44 100644 --- a/bigtop-manager-ui/src/api/service/index.ts +++ b/bigtop-manager-ui/src/api/service/index.ts @@ -61,6 +61,39 @@ export const takeServiceConfigSnapshot = (pathParams: ServiceParams, data: Snaps ) } +export interface EnableHdfsHaReq { + activeNameNodeHost: string + standbyNameNodeHost: string + journalNodeHosts: string[] + zookeeperServiceId?: number + zookeeperHosts?: string[] + zkfcHosts: string[] + nameservice: string +} + +export const enableHdfsHa = (clusterId: number, serviceId: number, data: EnableHdfsHaReq) => { + return post(`/clusters/${clusterId}/services/${serviceId}/actions/enable-hdfs-ha`, data) +} + +export interface EnableYarnRmHaReq { + /** rm1 host */ + activeResourceManagerHost: string + /** rm2 host */ + standbyResourceManagerHost: string + /** yarn.resourcemanager.ha.rm-ids,rm1,rm2 */ + rmIds: string[] + /** yarn.resourcemanager.cluster-id */ + yarnClusterId: string + /** zookeeper service id */ + zookeeperServiceId?: number + /** zookeeper hosts */ + zookeeperHosts?: string[] +} + +export const enableYarnRmHa = (clusterId: number, serviceId: number, data: EnableYarnRmHaReq) => { + return post(`/clusters/${clusterId}/services/${serviceId}/actions/enable-yarn-rm-ha`, data) +} + export const recoveryServiceConfigSnapshot = (pathParams: SnapshotRecovery) => { return post( `/clusters/${pathParams.clusterId}/services/${pathParams.id}/config-snapshots/${pathParams.snapshotId}` diff --git a/bigtop-manager-ui/src/features/service-management/index.vue b/bigtop-manager-ui/src/features/service-management/index.vue index 7c3e2769f..66377f9c6 100644 --- a/bigtop-manager-ui/src/features/service-management/index.vue +++ 
b/bigtop-manager-ui/src/features/service-management/index.vue @@ -18,6 +18,7 @@ -->