Changes from all commits
38 commits
e878ad1
enable ha
Dec 25, 2025
b02f155
ha
Jan 13, 2026
48e2a83
ha2
Jan 14, 2026
d0798b7
haenable4
Jan 19, 2026
48591e1
haenable5
Jan 20, 2026
5d114b1
haenable6
Jan 22, 2026
ecf2061
haenable7
Jan 23, 2026
8270416
haenable8
Jan 27, 2026
6d68723
haenable9
Jan 27, 2026
d51b2e0
haenable10
Jan 27, 2026
790440c
haenable11
Jan 29, 2026
742dd50
haenable12
Jan 29, 2026
44dee07
haenable1
Jan 29, 2026
8f713f6
haenable2
Jan 30, 2026
aa18338
haenable4
Jan 30, 2026
9d5985c
haenable5
Feb 2, 2026
ca23368
haenableall
Feb 3, 2026
0daa0a8
Update KafkaSetup.java
lvkaihua Feb 3, 2026
6e1bea3
Update index.vue
lvkaihua Feb 4, 2026
ac2b0db
Update HadoopParams.java
lvkaihua Feb 4, 2026
b98af46
Update NameNodeScript.java
lvkaihua Feb 4, 2026
fcfdc62
Update HdfsHaService.java
lvkaihua Feb 4, 2026
f07dcdf
Update YarnHaService.java
lvkaihua Feb 4, 2026
433f5f4
Update YarnHaServiceImpl.java
lvkaihua Feb 4, 2026
2d034b8
spotless
Feb 4, 2026
4ee2646
Update EnableHdfsHaJob.java
lvkaihua Feb 4, 2026
3263538
Update EnableYarnRmHaJob.java
lvkaihua Feb 4, 2026
8376ea3
Update ZkfcScript.java
lvkaihua Feb 4, 2026
502e851
Update HadoopParams.java
lvkaihua Feb 4, 2026
4dd2f54
Update index.ts
lvkaihua Feb 4, 2026
811a7d9
Update AbstractComponentStageTest.java
lvkaihua Feb 4, 2026
a771a6e
Update ComponentStartTaskTest.java
lvkaihua Feb 4, 2026
92e7c07
Update ComponentStartTaskTest.java
lvkaihua Feb 4, 2026
b96cf26
Update AbstractComponentStageTest.java
lvkaihua Feb 4, 2026
bf1f9bf
Update ComponentStartTaskTest.java
lvkaihua Feb 4, 2026
171016b
Update ComponentStartTaskTest.java
lvkaihua Feb 4, 2026
12161b5
Update AbstractComponentStageTest.java
lvkaihua Feb 4, 2026
426c16b
Update ComponentStartTaskTest.java
lvkaihua Feb 5, 2026
Command.java
@@ -39,6 +39,8 @@ public enum Command {
CHECK("check", "Check"),
CONFIGURE("configure", "Configure"),
CUSTOM("custom", "Custom"),
ENABLE_YARN_RM_HA("enable-yarn-rm-ha", "EnableYarnRmHa"),
ENABLE_HDFS_HA("enable-hdfs-ha", "EnableHdfsHa"),

// Internal use only, not available for API call
INIT("init", "Init"),
ComponentConfigureJobFactory.java (new file)
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.bigtop.manager.server.command.factory.component;

import org.apache.bigtop.manager.common.enums.Command;
import org.apache.bigtop.manager.server.command.CommandIdentifier;
import org.apache.bigtop.manager.server.command.job.Job;
import org.apache.bigtop.manager.server.command.job.JobContext;
import org.apache.bigtop.manager.server.command.job.component.ComponentConfigureJob;
import org.apache.bigtop.manager.server.enums.CommandLevel;

import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;

import lombok.extern.slf4j.Slf4j;

@Slf4j
@Component
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public class ComponentConfigureJobFactory extends AbstractComponentJobFactory {

@Override
public CommandIdentifier getCommandIdentifier() {
return new CommandIdentifier(CommandLevel.COMPONENT, Command.CONFIGURE);
}

@Override
public Job createJob(JobContext jobContext) {
return new ComponentConfigureJob(jobContext);
}
}
ComponentCustomJobFactory.java (new file)
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.bigtop.manager.server.command.factory.component;

import org.apache.bigtop.manager.common.enums.Command;
import org.apache.bigtop.manager.server.command.CommandIdentifier;
import org.apache.bigtop.manager.server.command.job.Job;
import org.apache.bigtop.manager.server.command.job.JobContext;
import org.apache.bigtop.manager.server.command.job.component.ComponentCustomJob;
import org.apache.bigtop.manager.server.enums.CommandLevel;

import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;

import lombok.extern.slf4j.Slf4j;

@Slf4j
@Component
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public class ComponentCustomJobFactory extends AbstractComponentJobFactory {

@Override
public CommandIdentifier getCommandIdentifier() {
return new CommandIdentifier(CommandLevel.COMPONENT, Command.CUSTOM);
}

@Override
public Job createJob(JobContext jobContext) {
return new ComponentCustomJob(jobContext);
}
}
EnableHdfsHaJobFactory.java (new file)
@@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.bigtop.manager.server.command.factory.service;

import org.apache.bigtop.manager.common.enums.Command;
import org.apache.bigtop.manager.server.command.CommandIdentifier;
import org.apache.bigtop.manager.server.command.job.Job;
import org.apache.bigtop.manager.server.command.job.JobContext;
import org.apache.bigtop.manager.server.command.job.service.EnableHdfsHaJob;
import org.apache.bigtop.manager.server.enums.CommandLevel;

import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;

import lombok.extern.slf4j.Slf4j;

@Slf4j
@Component
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public class EnableHdfsHaJobFactory extends AbstractServiceJobFactory {

@Override
public CommandIdentifier getCommandIdentifier() {
return new CommandIdentifier(CommandLevel.SERVICE, Command.ENABLE_HDFS_HA);
}

@Override
public Job createJob(JobContext jobContext) {
log.info(
"EnableHdfsHaJobFactory creating job, jobFactoryClassSource={}, jobClassSource={}, stageClassSource={}, taskClassSource={}, commandDTO.command={}, commandDTO.customCommandLen={}",
getCodeSource(EnableHdfsHaJobFactory.class),
getCodeSource(EnableHdfsHaJob.class),
getCodeSource(org.apache.bigtop.manager.server.command.stage.ComponentCustomStage.class),
getCodeSource(org.apache.bigtop.manager.server.command.task.ComponentCustomTask.class),
jobContext == null || jobContext.getCommandDTO() == null
? null
: jobContext.getCommandDTO().getCommand(),
jobContext == null
|| jobContext.getCommandDTO() == null
|| jobContext.getCommandDTO().getCustomCommand() == null
? null
: jobContext.getCommandDTO().getCustomCommand().length());
return new EnableHdfsHaJob(jobContext);
}

private static String getCodeSource(Class<?> clazz) {
try {
if (clazz == null
|| clazz.getProtectionDomain() == null
|| clazz.getProtectionDomain().getCodeSource() == null) {
return "null";
}
return String.valueOf(clazz.getProtectionDomain().getCodeSource().getLocation());
} catch (Exception e) {
return "error:" + e.getMessage();
}
}
}
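
Note: the getCodeSource helper above reads a class's ProtectionDomain to report which JAR or classes directory it was loaded from, which helps spot stale or duplicate artifacts on the classpath. A minimal standalone sketch of the same JDK mechanism, independent of any Bigtop Manager classes, is:

// Minimal sketch (not part of the PR) of the diagnostic used in getCodeSource:
// resolving where a class was loaded from via its ProtectionDomain.
import java.security.CodeSource;

public class CodeSourceExample {
    public static void main(String[] args) {
        // Application classes report the JAR or classes directory they were
        // loaded from; core JDK classes typically report a null CodeSource.
        CodeSource appSource = CodeSourceExample.class.getProtectionDomain().getCodeSource();
        CodeSource jdkSource = String.class.getProtectionDomain().getCodeSource();
        System.out.println(appSource == null ? "null" : appSource.getLocation());
        System.out.println(jdkSource == null ? "null" : jdkSource.getLocation());
    }
}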
EnableYarnRmHaJobFactory.java (new file)
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.bigtop.manager.server.command.factory.service;

import org.apache.bigtop.manager.common.enums.Command;
import org.apache.bigtop.manager.server.command.CommandIdentifier;
import org.apache.bigtop.manager.server.command.job.Job;
import org.apache.bigtop.manager.server.command.job.JobContext;
import org.apache.bigtop.manager.server.command.job.service.EnableYarnRmHaJob;
import org.apache.bigtop.manager.server.enums.CommandLevel;

import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;

import lombok.extern.slf4j.Slf4j;

@Slf4j
@Component
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public class EnableYarnRmHaJobFactory extends AbstractServiceJobFactory {

@Override
public CommandIdentifier getCommandIdentifier() {
return new CommandIdentifier(CommandLevel.SERVICE, Command.ENABLE_YARN_RM_HA);
}

@Override
public Job createJob(JobContext jobContext) {
return new EnableYarnRmHaJob(jobContext);
}
}
JobCacheHelper.java
@@ -44,6 +44,8 @@
import org.apache.bigtop.manager.server.model.dto.StackDTO;
import org.apache.bigtop.manager.server.utils.StackUtils;

import org.apache.commons.lang3.StringUtils;

import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
@@ -75,6 +77,12 @@ private static void initialize() {
}

public static void sendJobCache(Long jobId, List<String> hostnames) {
sendJobCache(jobId, hostnames, null);
}

public static void sendJobCache(
Long jobId, List<String> hostnames, Map<String, List<String>> overrideComponentHosts) {
final Map<String, List<String>> finalOverrideComponentHosts = overrideComponentHosts;
if (!INITIALIZED.get()) {
initialize();
}
@@ -102,6 +110,7 @@ public static void sendJobCache(Long jobId, List<String> hostnames) {
JobCachePayload copiedPayload =
JsonUtils.readFromString(JsonUtils.writeAsString(payload), JobCachePayload.class);
genClusterPayload(copiedPayload, clusterId);
mergeOverrideComponentHosts(copiedPayload, finalOverrideComponentHosts);
JobCacheRequest request = JobCacheRequest.newBuilder()
.setJobId(jobId)
.setPayload(JsonUtils.writeAsString(copiedPayload))
@@ -162,6 +171,31 @@ private static void genClusterPayload(JobCachePayload payload, Long clusterId) {
payload.setHosts(hosts);
}

private static void mergeOverrideComponentHosts(
JobCachePayload payload, Map<String, List<String>> overrideComponentHosts) {
if (overrideComponentHosts == null || overrideComponentHosts.isEmpty()) {
return;
}
if (payload.getComponentHosts() == null) {
payload.setComponentHosts(new HashMap<>());
}
final Map<String, List<String>> target = payload.getComponentHosts();

overrideComponentHosts.forEach((component, hosts) -> {
if (StringUtils.isBlank(component) || hosts == null || hosts.isEmpty()) {
return;
}
List<String> filtered = hosts.stream()
.filter(StringUtils::isNotBlank)
.map(String::trim)
.distinct()
.toList();
if (!filtered.isEmpty()) {
target.put(component.toLowerCase(), new ArrayList<>(filtered));
}
});
}

private static void genGlobalPayload(JobCachePayload payload) {
List<RepoPO> repoPOList = repoDao.findAll();
Map<String, Map<String, String>> serviceConfigMap = getServiceConfigMap(0L);
@@ -220,13 +254,11 @@ private static Map<String, List<String>> getComponentHostMap(Long clusterId) {
List<ComponentPO> componentPOList = componentDao.findByQuery(query);
Map<String, List<String>> hostMap = new HashMap<>();
componentPOList.forEach(x -> {
if (hostMap.containsKey(x.getName())) {
hostMap.get(x.getName()).add(x.getHostname());
} else {
List<String> list = new ArrayList<>();
list.add(x.getHostname());
hostMap.put(x.getName(), list);
if (StringUtils.isBlank(x.getHostname())) {
return;
}
hostMap.computeIfAbsent(x.getName(), k -> new ArrayList<>())
.add(x.getHostname().trim());
});

return hostMap;
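
The new overload of sendJobCache lets a job override the component-to-host mapping resolved from the database before the payload is shipped to agents, presumably so the HA jobs can target the hosts chosen for the new NameNode and ResourceManager roles. A self-contained sketch of the normalization mergeOverrideComponentHosts applies (blank hostnames dropped, values trimmed and de-duplicated, component keys lower-cased), with a plain-JDK predicate standing in for StringUtils.isNotBlank:

// Standalone sketch (plain JDK, no Bigtop Manager classes, hypothetical host names)
// of the normalization mergeOverrideComponentHosts applies before overriding the
// payload's component-host map.
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MergeOverrideSketch {
    public static void main(String[] args) {
        // Hosts resolved from the database for the cluster (hypothetical values).
        Map<String, List<String>> target = new HashMap<>();
        target.put("namenode", new ArrayList<>(List.of("node1")));

        // Override requested by the job, e.g. two NameNodes for HDFS HA.
        Map<String, List<String>> overrides =
                Map.of("NameNode", List.of(" node1 ", "node2", "node2", " "));

        overrides.forEach((component, hosts) -> {
            List<String> filtered = hosts.stream()
                    .filter(h -> h != null && !h.isBlank()) // stands in for StringUtils::isNotBlank
                    .map(String::trim)
                    .distinct()
                    .toList();
            if (!filtered.isEmpty()) {
                target.put(component.toLowerCase(), new ArrayList<>(filtered));
            }
        });

        System.out.println(target); // {namenode=[node1, node2]}
    }
}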
AbstractJob.java
@@ -42,6 +42,7 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;

@Slf4j
@@ -75,6 +76,10 @@ public AbstractJob(JobContext jobContext) {
if (CollectionUtils.isEmpty(stages)) {
throw new ApiException(ApiExceptionEnum.JOB_HAS_NO_STAGES);
}

for (Stage stage : stages) {
stage.init();
}
}

protected void injectBeans() {
@@ -92,6 +97,26 @@ protected void beforeCreateStages() {

protected abstract void createStages();

protected Map<String, List<String>> buildOverrideComponentHosts() {
Map<String, List<String>> m = new java.util.HashMap<>();
if (jobContext == null
|| jobContext.getCommandDTO() == null
|| jobContext.getCommandDTO().getComponentCommands() == null) {
return m;
}
for (org.apache.bigtop.manager.server.model.dto.command.ComponentCommandDTO cc :
jobContext.getCommandDTO().getComponentCommands()) {
if (cc == null
|| cc.getComponentName() == null
|| cc.getHostnames() == null
|| cc.getHostnames().isEmpty()) {
continue;
}
m.put(cc.getComponentName().toLowerCase(), cc.getHostnames());
}
return m;
}

@Override
public void beforeRun() {
jobPO.setState(JobState.PROCESSING.getName());
@@ -113,7 +138,7 @@ public void run() {
.flatMap(List::stream)
.distinct()
.toList();
JobCacheHelper.sendJobCache(jobPO.getId(), hostnames);
JobCacheHelper.sendJobCache(jobPO.getId(), hostnames, buildOverrideComponentHosts());

LinkedBlockingQueue<Stage> queue = new LinkedBlockingQueue<>(stages);
while (!queue.isEmpty()) {
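
In run(), the override map now comes from buildOverrideComponentHosts, which turns the componentCommands carried in the CommandDTO into a lower-cased component-to-hosts map. A self-contained sketch of that mapping, using a hypothetical stand-in record instead of the real ComponentCommandDTO:

// Self-contained sketch (stand-in record, not the real ComponentCommandDTO) of
// the mapping buildOverrideComponentHosts performs: component name -> requested
// hosts, with keys lower-cased and null or empty entries skipped.
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class OverrideHostsSketch {

    // Hypothetical stand-in for the fields of ComponentCommandDTO used above.
    record ComponentCommand(String componentName, List<String> hostnames) {}

    static Map<String, List<String>> buildOverrideComponentHosts(List<ComponentCommand> commands) {
        Map<String, List<String>> m = new HashMap<>();
        if (commands == null) {
            return m;
        }
        for (ComponentCommand cc : commands) {
            if (cc == null
                    || cc.componentName() == null
                    || cc.hostnames() == null
                    || cc.hostnames().isEmpty()) {
                continue;
            }
            m.put(cc.componentName().toLowerCase(), cc.hostnames());
        }
        return m;
    }

    public static void main(String[] args) {
        List<ComponentCommand> commands = List.of(
                new ComponentCommand("NameNode", List.of("node1", "node2")),
                new ComponentCommand("ZKFC", List.of("node1", "node2")));
        // Prints the two components keyed in lower case (map order unspecified).
        System.out.println(buildOverrideComponentHosts(commands));
    }
}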