Skip to content

Commit 918a1fb

Browse files
authored
fix(builder): update version 0.5.1 #andy (#407)
1 parent 7838257 commit 918a1fb

File tree

27 files changed

+551
-306
lines changed

27 files changed

+551
-306
lines changed

builder/core/src/main/java/com/antgroup/openspg/builder/core/logical/LogicalPlan.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@
3030
@AllArgsConstructor
3131
public class LogicalPlan implements Serializable {
3232

33+
private static final long serialVersionUID = -4487139289740223319L;
34+
3335
/** DAG (Directed Acyclic Graph) of the logical execution plan. */
3436
private final Graph<BaseLogicalNode<?>, DefaultEdge> dag;
3537

builder/core/src/main/java/com/antgroup/openspg/builder/core/physical/PhysicalPlan.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@
3030
@AllArgsConstructor
3131
public class PhysicalPlan implements Serializable {
3232

33+
private static final long serialVersionUID = -5866035535857620657L;
34+
3335
/** DAG (Directed Acyclic Graph) of the physical execution plan. */
3436
private final Graph<BaseProcessor<?>, DefaultEdge> dag;
3537

builder/core/src/main/java/com/antgroup/openspg/builder/core/physical/process/LLMNlExtractProcessor.java

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import java.util.concurrent.ExecutionException;
3333
import java.util.concurrent.Future;
3434
import java.util.concurrent.LinkedBlockingQueue;
35+
import java.util.concurrent.RejectedExecutionHandler;
3536
import java.util.concurrent.ThreadPoolExecutor;
3637
import java.util.concurrent.TimeUnit;
3738
import lombok.extern.slf4j.Slf4j;
@@ -41,14 +42,16 @@ public class LLMNlExtractProcessor extends BasePythonProcessor<LLMNlExtractNodeC
4142

4243
private ExecuteNode node;
4344

44-
private static final ThreadPoolExecutor executor =
45-
new ThreadPoolExecutor(
46-
30,
47-
60,
48-
60 * 60,
49-
TimeUnit.SECONDS,
50-
new LinkedBlockingQueue<>(1000),
51-
new ThreadPoolExecutor.CallerRunsPolicy());
45+
private static final RejectedExecutionHandler handler =
46+
(r, executor) -> {
47+
try {
48+
executor.getQueue().put(r);
49+
} catch (InterruptedException e) {
50+
Thread.currentThread().interrupt();
51+
}
52+
};
53+
54+
private static ThreadPoolExecutor executor;
5255

5356
public LLMNlExtractProcessor(String id, String name, LLMNlExtractNodeConfig config) {
5457
super(id, name, config);
@@ -58,6 +61,16 @@ public LLMNlExtractProcessor(String id, String name, LLMNlExtractNodeConfig conf
5861
public void doInit(BuilderContext context) throws BuilderException {
5962
super.doInit(context);
6063
this.node = context.getExecuteNodes().get(getId());
64+
if (executor == null) {
65+
executor =
66+
new ThreadPoolExecutor(
67+
context.getModelExecuteNum(),
68+
context.getModelExecuteNum(),
69+
60 * 60,
70+
TimeUnit.SECONDS,
71+
new LinkedBlockingQueue<>(100),
72+
handler);
73+
}
6174
}
6275

6376
@Override

builder/core/src/main/java/com/antgroup/openspg/builder/core/physical/process/ParagraphSplitProcessor.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,10 @@ public List<ChunkRecord.Chunk> readFile(String fileUrl) {
146146
case "json":
147147
className = "JSONReader";
148148
break;
149+
case "doc":
150+
case "docx":
151+
className = "DocxReader";
152+
break;
149153
}
150154
node.addTraceLog("invoke chunk operator:%s", className);
151155
pythonInterpreter.exec("from kag.builder.component.reader import " + className);

builder/core/src/main/java/com/antgroup/openspg/builder/core/physical/utils/CommonUtils.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ public static List<BaseSPGRecord> convertNodes(
9393
return records;
9494
}
9595

96-
private static String labelPrefix(String namespace, String label) {
96+
public static String labelPrefix(String namespace, String label) {
9797
if (label.contains(DOT)) {
9898
return label;
9999
}

builder/core/src/main/java/com/antgroup/openspg/builder/core/runtime/BuilderContext.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@
2626
@Accessors(chain = true)
2727
public class BuilderContext implements Serializable {
2828

29+
private static final long serialVersionUID = 2446709406202543546L;
30+
2931
private long projectId;
3032
private String project;
3133
private String jobName;
@@ -41,6 +43,7 @@ public class BuilderContext implements Serializable {
4143
private int batchSize = 1;
4244
private int parallelism = 1;
4345
private boolean enableLeadTo;
46+
private Integer modelExecuteNum = 5;
4447

4548
private Map<String, ExecuteNode> executeNodes;
4649
private String schemaUrl;

builder/runner/local/src/main/java/com/antgroup/openspg/builder/runner/local/physical/sink/impl/Neo4jSinkWriter.java

Lines changed: 25 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -39,10 +39,12 @@
3939
import java.util.concurrent.ExecutorService;
4040
import java.util.concurrent.Future;
4141
import java.util.concurrent.LinkedBlockingQueue;
42+
import java.util.concurrent.RejectedExecutionHandler;
4243
import java.util.concurrent.ThreadPoolExecutor;
4344
import java.util.concurrent.TimeUnit;
4445
import java.util.function.Consumer;
4546
import lombok.extern.slf4j.Slf4j;
47+
import org.apache.commons.lang3.StringUtils;
4648

4749
@Slf4j
4850
public class Neo4jSinkWriter extends BaseSinkWriter<Neo4jSinkNodeConfig> {
@@ -53,8 +55,16 @@ public class Neo4jSinkWriter extends BaseSinkWriter<Neo4jSinkNodeConfig> {
5355
private Neo4jStoreClient client;
5456
private Project project;
5557
private static final String DOT = ".";
56-
ExecutorService nodeExecutor;
57-
ExecutorService edgeExecutor;
58+
ExecutorService executor;
59+
60+
RejectedExecutionHandler handler =
61+
(r, executor) -> {
62+
try {
63+
executor.getQueue().put(r);
64+
} catch (InterruptedException e) {
65+
Thread.currentThread().interrupt();
66+
}
67+
};
5868

5969
public Neo4jSinkWriter(String id, String name, Neo4jSinkNodeConfig config) {
6070
super(id, name, config);
@@ -69,22 +79,14 @@ public void doInit(BuilderContext context) throws BuilderException {
6979
}
7080
client = new Neo4jStoreClient(context.getGraphStoreUrl());
7181
project = JSON.parseObject(context.getProject(), Project.class);
72-
nodeExecutor =
73-
new ThreadPoolExecutor(
74-
NUM_THREADS,
75-
NUM_THREADS,
76-
2 * 60L,
77-
TimeUnit.SECONDS,
78-
new LinkedBlockingQueue<>(1000),
79-
new ThreadPoolExecutor.CallerRunsPolicy());
80-
edgeExecutor =
82+
executor =
8183
new ThreadPoolExecutor(
8284
NUM_THREADS,
8385
NUM_THREADS,
8486
2 * 60L,
8587
TimeUnit.SECONDS,
86-
new LinkedBlockingQueue<>(1000),
87-
new ThreadPoolExecutor.CallerRunsPolicy());
88+
new LinkedBlockingQueue<>(100),
89+
handler);
8890
}
8991

9092
@Override
@@ -115,7 +117,7 @@ public void writeToNeo4j(SubGraphRecord subGraphRecord) {
115117
try {
116118
node.addTraceLog("Start Writer Nodes processor...");
117119
List<Future<Void>> nodeFutures =
118-
submitTasks(nodeExecutor, subGraphRecord.getResultNodes(), this::writeNode);
120+
submitTasks(executor, subGraphRecord.getResultNodes(), this::writeNode);
119121
awaitAllTasks(nodeFutures);
120122
node.addTraceLog("Writer Nodes succeed");
121123
} catch (InterruptedException | ExecutionException e) {
@@ -125,7 +127,7 @@ public void writeToNeo4j(SubGraphRecord subGraphRecord) {
125127
try {
126128
node.addTraceLog("Start Writer Edges processor...");
127129
List<Future<Void>> edgeFutures =
128-
submitTasks(edgeExecutor, subGraphRecord.getResultEdges(), this::writeEdge);
130+
submitTasks(executor, subGraphRecord.getResultEdges(), this::writeEdge);
129131
awaitAllTasks(edgeFutures);
130132
node.addTraceLog("Writer Edges succeed");
131133
} catch (InterruptedException | ExecutionException e) {
@@ -174,7 +176,10 @@ private void writeNode(SubGraphRecord.Node node) {
174176
try {
175177
Long statr = System.currentTimeMillis();
176178
RecordAlterOperationEnum operation = context.getOperation();
177-
if (node.getId() == null || node.getName() == null) {
179+
if (StringUtils.isBlank(node.getId())
180+
|| StringUtils.isBlank(node.getName())
181+
|| StringUtils.isBlank(node.getLabel())) {
182+
log.info(String.format("write Node ignore node:%s", JSON.toJSONString(node)));
178183
return;
179184
}
180185
String label = labelPrefix(node.getLabel());
@@ -212,7 +217,10 @@ private void writeEdge(SubGraphRecord.Edge edge) {
212217
try {
213218
Long statr = System.currentTimeMillis();
214219
RecordAlterOperationEnum operation = context.getOperation();
215-
if (edge.getFrom() == null || edge.getTo() == null) {
220+
if (StringUtils.isBlank(edge.getFrom())
221+
|| StringUtils.isBlank(edge.getTo())
222+
|| StringUtils.isBlank(edge.getLabel())) {
223+
log.info(String.format("write Edge ignore edge:%s", JSON.toJSONString(edge)));
216224
return;
217225
}
218226
List<EdgeRecord> edgeRecords = Lists.newArrayList();

common/util/src/main/java/com/antgroup/openspg/common/util/neo4j/Neo4jDriverManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ public static Driver getNeo4jDriver(String uri, String user, String password) {
3737
Config config =
3838
Config.builder()
3939
.withMaxConnectionPoolSize(200)
40-
.withMaxConnectionLifetime(2, TimeUnit.HOURS)
40+
.withMaxConnectionLifetime(4, TimeUnit.HOURS)
4141
.withMaxTransactionRetryTime(300, TimeUnit.SECONDS)
4242
.withConnectionAcquisitionTimeout(300, TimeUnit.SECONDS)
4343
.build();

common/util/src/main/java/com/antgroup/openspg/common/util/neo4j/Neo4jGraphUtils.java

Lines changed: 47 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313

1414
package com.antgroup.openspg.common.util.neo4j;
1515

16+
import com.antgroup.openspg.common.util.Md5Utils;
1617
import com.antgroup.openspg.common.util.tuple.Tuple2;
1718
import com.antgroup.openspg.core.schema.model.predicate.IndexTypeEnum;
1819
import com.antgroup.openspg.core.schema.model.predicate.Property;
@@ -21,6 +22,7 @@
2122
import java.util.HashMap;
2223
import java.util.List;
2324
import java.util.Map;
25+
import java.util.UUID;
2426
import java.util.concurrent.TimeUnit;
2527
import java.util.stream.Collectors;
2628
import lombok.extern.slf4j.Slf4j;
@@ -35,8 +37,6 @@
3537
@Slf4j
3638
public class Neo4jGraphUtils {
3739

38-
private static final String ALL_GRAPH = "allGraph";
39-
4040
private Driver driver;
4141
private String database;
4242
private Neo4jIndexUtils neo4jIndex;
@@ -127,22 +127,22 @@ public List<String> getAllLabels() {
127127
return labels;
128128
}
129129

130-
public void createAllGraph() {
130+
public void createAllGraph(String allGraph) {
131131
Session session = driver.session(SessionConfig.forDatabase(this.database));
132132
String existsQuery =
133133
String.format(
134134
"CALL gds.graph.exists('%s') YIELD exists "
135135
+ "WHERE exists "
136136
+ "CALL gds.graph.drop('%s') YIELD graphName "
137137
+ "RETURN graphName",
138-
ALL_GRAPH, ALL_GRAPH);
138+
allGraph, allGraph);
139139

140140
Result result = session.run(existsQuery);
141141
ResultSummary summary = result.consume();
142142
log.debug(
143143
"create pagerank graph exists graph_name: {} database: {} succeed "
144144
+ "executed: {} consumed: {}",
145-
ALL_GRAPH,
145+
allGraph,
146146
database,
147147
summary.resultAvailableAfter(TimeUnit.MILLISECONDS),
148148
summary.resultConsumedAfter(TimeUnit.MILLISECONDS));
@@ -152,13 +152,33 @@ public void createAllGraph() {
152152
"CALL gds.graph.project('%s','*','*') "
153153
+ "YIELD graphName, nodeCount AS nodes, relationshipCount AS rels "
154154
+ "RETURN graphName, nodes, rels",
155-
ALL_GRAPH);
155+
allGraph);
156156

157157
result = session.run(projectQuery);
158158
summary = result.consume();
159159
log.debug(
160160
"create pagerank graph graph_name: {} database: {} succeed " + "executed: {} consumed: {}",
161-
ALL_GRAPH,
161+
allGraph,
162+
database,
163+
summary.resultAvailableAfter(TimeUnit.MILLISECONDS),
164+
summary.resultConsumedAfter(TimeUnit.MILLISECONDS));
165+
}
166+
167+
public void dropAllGraph(String allGraph) {
168+
Session session = driver.session(SessionConfig.forDatabase(this.database));
169+
String existsQuery =
170+
String.format(
171+
"CALL gds.graph.exists('%s') YIELD exists "
172+
+ "WHERE exists "
173+
+ "CALL gds.graph.drop('%s') YIELD graphName "
174+
+ "RETURN graphName",
175+
allGraph, allGraph);
176+
177+
Result result = session.run(existsQuery);
178+
ResultSummary summary = result.consume();
179+
log.debug(
180+
"drop pagerank graph graph_name: {} database: {} succeed executed: {} consumed: {}",
181+
allGraph,
162182
database,
163183
summary.resultAvailableAfter(TimeUnit.MILLISECONDS),
164184
summary.resultConsumedAfter(TimeUnit.MILLISECONDS));
@@ -167,12 +187,18 @@ public void createAllGraph() {
167187
public List<Map<String, Object>> getPageRankScores(
168188
List<Map<String, String>> startNodes, String targetType) {
169189
Session session = driver.session(SessionConfig.forDatabase(this.database));
170-
createAllGraph();
171-
return session.writeTransaction(tx -> getPageRankScores(tx, startNodes, targetType));
190+
String allGraph = "allGraph_" + Md5Utils.md5Of(UUID.randomUUID().toString());
191+
createAllGraph(allGraph);
192+
try {
193+
return session.writeTransaction(
194+
tx -> getPageRankScores(tx, allGraph, startNodes, targetType));
195+
} finally {
196+
dropAllGraph(allGraph);
197+
}
172198
}
173199

174200
private List<Map<String, Object>> getPageRankScores(
175-
Transaction tx, List<Map<String, String>> startNodes, String returnType) {
201+
Transaction tx, String allGraph, List<Map<String, String>> startNodes, String returnType) {
176202
List<String> matchClauses = new ArrayList<>();
177203
List<String> matchIdentifiers = new ArrayList<>();
178204

@@ -205,7 +231,7 @@ private List<Map<String, Object>> getPageRankScores(
205231
+ "RETURN id(m) AS g_id, gds.util.asNode(nodeId).id AS id, score "
206232
+ "ORDER BY score DESC",
207233
matchQuery,
208-
ALL_GRAPH,
234+
allGraph,
209235
matchIdentifierStr,
210236
Neo4jCommonUtils.escapeNeo4jIdentifier(returnType));
211237

@@ -231,6 +257,16 @@ public void createDatabase(String database) {
231257
});
232258
}
233259

260+
public void dropDatabase(String database) {
261+
Session session = driver.session(SessionConfig.forDatabase(this.database));
262+
session.writeTransaction(
263+
tx -> {
264+
tx.run(String.format("DROP DATABASE %s IF EXISTS", database));
265+
tx.commit();
266+
return null;
267+
});
268+
}
269+
234270
public void deleteAllData(String database) {
235271
if (!this.database.equals(database)) {
236272
throw new IllegalArgumentException(

dev/release/docker-compose.yml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,13 @@ services:
1717
command: [
1818
"java",
1919
"-Dfile.encoding=UTF-8",
20-
"-Xms4096m",
21-
"-Xmx4096m",
20+
"-Xms2048m",
21+
"-Xmx8192m",
2222
"-jar",
2323
"arks-sofaboot-0.0.1-SNAPSHOT-executable.jar",
2424
'--server.repository.impl.jdbc.host=mysql',
2525
'--server.repository.impl.jdbc.password=openspg',
26+
'--builder.model.execute.num=5',
2627
'--cloudext.graphstore.url=neo4j://release-openspg-neo4j:7687?user=neo4j&password=neo4j@openspg&database=neo4j',
2728
'--cloudext.searchengine.url=neo4j://release-openspg-neo4j:7687?user=neo4j&password=neo4j@openspg&database=neo4j'
2829
]

0 commit comments

Comments
 (0)