Skip to content

Commit 0e687f7

Browse files
authored
fix(builder): add embedding model (#385)
1 parent 3f863f8 commit 0e687f7

File tree

19 files changed: 177 additions and 163 deletions.

builder/core/src/main/java/com/antgroup/openspg/builder/core/physical/process/LLMNlExtractProcessor.java

Lines changed: 0 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -117,7 +117,6 @@ public List<SubGraphRecord> call() throws Exception {
117117
log.info("LLMNlExtractProcessor invoke Chunks: {}", names);
118118
List<Object> result =
119119
(List<Object>) operatorFactory.invoke(config.getOperatorConfig(), record);
120-
log.info("LLMNlExtractProcessor invoke result: {}", JSON.toJSONString(result));
121120
List<SubGraphRecord> records =
122121
JSON.parseObject(JSON.toJSONString(result), new TypeReference<List<SubGraphRecord>>() {});
123122
node.addTraceLog(

builder/core/src/main/java/com/antgroup/openspg/builder/core/physical/process/VectorizerProcessor.java

Lines changed: 4 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -47,9 +47,12 @@ public List<BaseRecord> process(List<BaseRecord> inputs) {
4747
node.addTraceLog("Start vectorizer processor...");
4848
List<BaseRecord> results = new ArrayList<>();
4949
SubGraphRecord subGraph = new SubGraphRecord(Lists.newArrayList(), Lists.newArrayList());
50+
SubGraphRecord outputs = new SubGraphRecord(Lists.newArrayList(), Lists.newArrayList());
5051

5152
for (BaseRecord record : inputs) {
5253
SubGraphRecord spgRecord = (SubGraphRecord) record;
54+
outputs.getResultNodes().addAll(spgRecord.getResultNodes());
55+
outputs.getResultEdges().addAll(spgRecord.getResultEdges());
5356
Map map = mapper.convertValue(spgRecord, Map.class);
5457
node.addTraceLog(
5558
"invoke vectorizer processor operator:%s", config.getOperatorConfig().getClassName());
@@ -69,7 +72,7 @@ public List<BaseRecord> process(List<BaseRecord> inputs) {
6972
}
7073
results.add(subGraph);
7174
node.addTraceLog("post vectorizer complete...");
72-
node.setOutputs(subGraph);
75+
node.setOutputs(outputs);
7376
node.setStatus(StatusEnum.FINISH);
7477
return results;
7578
}

builder/runner/local/src/main/java/com/antgroup/openspg/builder/runner/local/physical/sink/impl/Neo4jSinkWriter.java

Lines changed: 23 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -30,7 +30,6 @@
3030
import com.antgroup.openspg.cloudext.interfaces.graphstore.model.lpg.record.LPGPropertyRecord;
3131
import com.antgroup.openspg.cloudext.interfaces.graphstore.model.lpg.record.VertexRecord;
3232
import com.antgroup.openspg.cloudext.interfaces.graphstore.model.lpg.schema.EdgeTypeName;
33-
import com.antgroup.openspg.reasoner.runner.local.impl.LocalRunnerThreadPool;
3433
import com.antgroup.openspg.server.common.model.project.Project;
3534
import com.google.common.collect.Lists;
3635
import java.util.ArrayList;
@@ -39,16 +38,23 @@
3938
import java.util.concurrent.ExecutionException;
4039
import java.util.concurrent.ExecutorService;
4140
import java.util.concurrent.Future;
41+
import java.util.concurrent.LinkedBlockingQueue;
42+
import java.util.concurrent.ThreadPoolExecutor;
43+
import java.util.concurrent.TimeUnit;
4244
import java.util.function.Consumer;
4345
import lombok.extern.slf4j.Slf4j;
4446

4547
@Slf4j
4648
public class Neo4jSinkWriter extends BaseSinkWriter<Neo4jSinkNodeConfig> {
4749

50+
private static final int NUM_THREADS = 10;
51+
4852
private ExecuteNode node;
4953
private Neo4jStoreClient client;
5054
private Project project;
5155
private static final String DOT = ".";
56+
ExecutorService nodeExecutor;
57+
ExecutorService edgeExecutor;
5258

5359
public Neo4jSinkWriter(String id, String name, Neo4jSinkNodeConfig config) {
5460
super(id, name, config);
@@ -63,6 +69,22 @@ public void doInit(BuilderContext context) throws BuilderException {
6369
}
6470
client = new Neo4jStoreClient(context.getGraphStoreUrl());
6571
project = JSON.parseObject(context.getProject(), Project.class);
72+
nodeExecutor =
73+
new ThreadPoolExecutor(
74+
NUM_THREADS,
75+
NUM_THREADS,
76+
2 * 60L,
77+
TimeUnit.SECONDS,
78+
new LinkedBlockingQueue<>(1000),
79+
new ThreadPoolExecutor.CallerRunsPolicy());
80+
edgeExecutor =
81+
new ThreadPoolExecutor(
82+
NUM_THREADS,
83+
NUM_THREADS,
84+
2 * 60L,
85+
TimeUnit.SECONDS,
86+
new LinkedBlockingQueue<>(1000),
87+
new ThreadPoolExecutor.CallerRunsPolicy());
6688
}
6789

6890
@Override
@@ -90,8 +112,6 @@ private void batchWriteToNeo4j(List<BaseRecord> records) {
90112
public void writeToNeo4j(SubGraphRecord subGraphRecord) {
91113
subGraphRecord.getResultNodes().forEach(node -> convertProperties(node.getProperties()));
92114
subGraphRecord.getResultEdges().forEach(edge -> convertProperties(edge.getProperties()));
93-
94-
ExecutorService nodeExecutor = LocalRunnerThreadPool.getThreadPoolExecutor(null);
95115
try {
96116
node.addTraceLog("Start Writer Nodes processor...");
97117
List<Future<Void>> nodeFutures =
@@ -102,8 +122,6 @@ public void writeToNeo4j(SubGraphRecord subGraphRecord) {
102122
Thread.currentThread().interrupt();
103123
throw new RuntimeException("Error during node upsert", e);
104124
}
105-
106-
ExecutorService edgeExecutor = LocalRunnerThreadPool.getThreadPoolExecutor(null);
107125
try {
108126
node.addTraceLog("Start Writer Edges processor...");
109127
List<Future<Void>> edgeFutures =

common/util/src/main/java/com/antgroup/openspg/common/util/neo4j/Neo4jDriverManager.java

Lines changed: 10 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -14,9 +14,11 @@
1414

1515
import com.google.common.collect.Maps;
1616
import java.util.Map;
17+
import java.util.concurrent.TimeUnit;
1718
import java.util.concurrent.locks.Lock;
1819
import java.util.concurrent.locks.ReentrantLock;
1920
import org.neo4j.driver.AuthTokens;
21+
import org.neo4j.driver.Config;
2022
import org.neo4j.driver.Driver;
2123
import org.neo4j.driver.GraphDatabase;
2224

@@ -32,7 +34,14 @@ public static Driver getNeo4jDriver(String uri, String user, String password) {
3234
if (instanceMap.get(uniqueKey) == null) {
3335
Driver driver;
3436
try {
35-
driver = GraphDatabase.driver(uri, AuthTokens.basic(user, password));
37+
Config config =
38+
Config.builder()
39+
.withMaxConnectionPoolSize(200)
40+
.withMaxConnectionLifetime(2, TimeUnit.HOURS)
41+
.withMaxTransactionRetryTime(300, TimeUnit.SECONDS)
42+
.withConnectionAcquisitionTimeout(300, TimeUnit.SECONDS)
43+
.build();
44+
driver = GraphDatabase.driver(uri, AuthTokens.basic(user, password), config);
3645
driver.verifyConnectivity();
3746
} catch (Exception e) {
3847
throw new RuntimeException("init Neo4j Client failed :" + uri, e);

dev/release/docker-compose.yml

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -1,3 +1,4 @@
1+
version: "3.7"
12
services:
23
server:
34
restart: always

dev/release/mysql/buildx-release-mysql.sh

Lines changed: 1 addition & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -10,8 +10,6 @@
1010
# or implied.
1111

1212
docker buildx build -f Dockerfile --platform linux/arm64/v8,linux/amd64 --push \
13-
-t spg-registry.cn-hangzhou.cr.aliyuncs.com/spg/openspg-mysql:0.0.3 \
14-
-t spg-registry.cn-hangzhou.cr.aliyuncs.com/spg/openspg-mysql:latest \
15-
-t openspg/openspg-mysql:0.0.3 \
13+
-t openspg/openspg-mysql:0.5 \
1614
-t openspg/openspg-mysql:latest \
1715
.

dev/release/mysql/sql/initdb.sql

Lines changed: 0 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -234,7 +234,6 @@ CREATE TABLE `kg_ontology_ext` (
234234
) AUTO_INCREMENT = 1 DEFAULT CHARSET = utf8mb4 COMMENT = 'schema的扩展属性';
235235

236236
INSERT INTO kg_biz_domain (`id`,`gmt_create`,`gmt_modified`,`name`,`status`,`description`,`global_config`) VALUES(1,'2023-09-01 00:00:00','2023-09-01 00:00:00','defaultTenant','VALID','',null);
237-
INSERT INTO kg_project_info (`id`,`name`,`description`,`status`,`gmt_create`,`gmt_modified`,`namespace`,`biz_domain_id`) VALUES(1,'defaultProject','defaultProject','VALID','2023-09-01 00:00:00','2023-09-01 00:00:00','DEFAULT',1);
238237

239238
INSERT INTO kg_ontology_entity (`id`,`original_id`,`name`,`name_zh`,`entity_category`,`layer`,`description`,`description_zh`,`status`,`with_index`,`scope`,`version`,`version_status`,`gmt_create`,`gmt_modified`,`transformer_id`,`operator_config`,`config`,`unique_name`) VALUES(1,1,'Thing','事物','ADVANCED','EXTENSION','Base class for all schema types, all of which inherit the type either directly or indirectly','所有schema类型的基类,它们都直接或者间接继承该类型','1','TRUE','PUBLIC',44,'ONLINE','2023-09-01 00:00:00','2023-09-01 00:00:00',0,null,null,'Thing');
240239
INSERT INTO kg_ontology_entity (`id`,`original_id`,`name`,`name_zh`,`entity_category`,`layer`,`description`,`description_zh`,`status`,`with_index`,`scope`,`version`,`version_status`,`gmt_create`,`gmt_modified`,`transformer_id`,`operator_config`,`config`,`unique_name`) VALUES(2,2,'Text','文本','BASIC','CORE','文本','基本数据类型-文本','1','TRUE','PUBLIC',0,'ONLINE','2023-09-01 00:00:00','2023-09-01 00:00:00',0,null,'{"constrains":[{"id":"REQUIRE","name":"Required","nameZh":"值非空","value":null},{"id":"UNIQUE","name":"Unique","nameZh":"值唯一","value":null},{"id":"ENUM","name":"Enum","nameZh":"枚举","value":null},{"id":"MULTIVALUE","name":"Multi value","nameZh":"多值","value":null},{"id":"REGULAR","name":"Regular match","nameZh":"正则匹配","value":null}]}','Text');

dev/release/python/Dockerfile

Lines changed: 2 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -43,13 +43,10 @@ RUN if [ "${TARGETPLATFORM}" = "linux/amd64" ]; then \
4343
RUN python3 -m venv /openspg_venv && \
4444
. /openspg_venv/bin/activate && \
4545
export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-$(dpkg --print-architecture) && \
46-
pip3 install openspg-kag==0.0.3.20241022.2 -i https://artifacts.antgroup-inc.cn/artifact/repositories/simple-dev/ && \
46+
pip3 install openspg-kag==0.5.1 && \
4747
pip3 install pemja==0.4.0 && \
4848
pip3 install -U "http://alps-common.oss-cn-hangzhou-zmf.aliyuncs.com/nscommon/shiji/nscommon-0.0.1.tar.gz" &&\
4949
echo "if (tty -s); then \n . /openspg_venv/bin/activate \nfi" >> ~/.bashrc
5050

51-
ADD openspg/dev/release/python/lib/builder*.jar /openspg_venv/lib/python3.8/site-packages/knext/builder/lib
52-
ADD openspg/dev/release/python/lib/reasoner*.jar /openspg_venv/lib/python3.8/site-packages/knext/reasoner/lib
53-
5451
COPY openspg/ /openspg
55-
#RUN git clone --depth=1 https://github.com/OpenSPG/openspg.git
52+
RUN git clone --depth=1 https://github.com/OpenSPG/KAG.git

dev/release/python/build-release-python-aliyun.sh

Lines changed: 16 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -10,7 +10,7 @@
1010
# or implied.
1111

1212
IMAGE="spg-registry.cn-hangzhou.cr.aliyuncs.com/spg/openspg-python"
13-
VERSION="kag1"
13+
VERSION="0.5"
1414
cd ../../../../
1515
docker build -f openspg/dev/release/python/Dockerfile --platform linux/arm64/v8 --push \
1616
-t ${IMAGE}:${VERSION}-arm64 \
@@ -31,4 +31,18 @@ docker manifest create \
3131
${IMAGE}:${VERSION}-amd64 \
3232
${IMAGE}:${VERSION}-arm64
3333

34-
docker manifest push ${IMAGE}:${VERSION}
34+
docker manifest push ${IMAGE}:${VERSION}
35+
36+
if docker manifest inspect ${IMAGE}:${LATEST} &> /dev/null; then
37+
echo "Manifest already exists, removing it..."
38+
docker manifest rm ${IMAGE}:${LATEST}
39+
else
40+
echo "Manifest does not exist, proceeding with creation and push."
41+
fi
42+
43+
docker manifest create \
44+
${IMAGE}:${LATEST} \
45+
${IMAGE}:${VERSION}-amd64 \
46+
${IMAGE}:${VERSION}-arm64
47+
48+
docker manifest push ${IMAGE}:${LATEST}

dev/release/python/build-release-python.sh

Lines changed: 1 addition & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -11,8 +11,6 @@
1111

1212
# for amd64
1313
docker build -f Dockerfile --platform linux/amd64 --push \
14-
-t spg-registry.cn-hangzhou.cr.aliyuncs.com/spg/openspg-python:0.0.3 \
15-
-t spg-registry.cn-hangzhou.cr.aliyuncs.com/spg/openspg-python:latest \
16-
-t openspg/openspg-python:0.0.3 \
14+
-t openspg/openspg-python:0.5 \
1715
-t openspg/openspg-python:latest \
1816
.

0 commit comments

Comments
 (0)