Skip to content

Commit f421792

Browse files
committed
replace protobuf with flatbuffers for serialization
1 parent 159e29c commit f421792

File tree

73 files changed

+2035
-858
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

73 files changed

+2035
-858
lines changed

.gitmodules

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,3 +10,6 @@
1010
[submodule "cpp/FlameGraph"]
1111
path = cpp/FlameGraph
1212
url = git@github.com:brendangregg/FlameGraph.git
13+
[submodule "cpp/third-party/flatbuffers"]
14+
path = cpp/third-party/flatbuffers
15+
url = https://github.com/google/flatbuffers.git

cpp/CMakeLists.txt

Lines changed: 12 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,20 @@
11
cmake_minimum_required(VERSION 3.19)
2-
set(CMAKE_CXX_STANDARD 14)
3-
# Set extension name here
2+
3+
set(CMAKE_CXX_STANDARD 20)
44
set(TARGET_NAME pixels)
55
set(DCMAKE_EXPORT_COMPILE_COMMANDS=1)
66
set(EXTENSION_NAME ${TARGET_NAME}_extension)
77
project(${TARGET_NAME})
8+
9+
add_definitions(-DDUCKDB_EXTENSION_LIBRARY)
10+
811
include_directories(include)
12+
include_directories(${CMAKE_CURRENT_BINARY_DIR})
913

1014
set(EXTENSION_SOURCES
1115
pixels-duckdb/pixels_extension.cpp
1216
pixels-duckdb/PixelsScanFunction.cpp
1317
)
14-
add_library(${EXTENSION_NAME} STATIC ${EXTENSION_SOURCES})
15-
16-
find_package(Protobuf REQUIRED)
17-
include_directories(${Protobuf_INCLUDE_DIRS})
18-
19-
include_directories(${CMAKE_CURRENT_BINARY_DIR})
2018

2119
add_subdirectory(pixels-common)
2220
add_subdirectory(pixels-core)
@@ -29,18 +27,18 @@ include_directories(pixels-core/include)
2927
include_directories(${CMAKE_CURRENT_BINARY_DIR})
3028
include_directories(${CMAKE_CURRENT_BINARY_DIR}/pixels-common/liburing/src/include)
3129

32-
target_link_libraries(
33-
${EXTENSION_NAME}
30+
build_static_extension(${TARGET_NAME} ${EXTENSION_SOURCES})
31+
set(PARAMETERS "-warnings")
32+
build_loadable_extension(${TARGET_NAME} ${PARAMETERS} ${EXTENSION_SOURCES})
33+
34+
target_link_libraries(${EXTENSION_NAME}
3435
pixels-common
3536
pixels-core
3637
)
3738

3839
# Add the subdirectory that contains the build_loadable_extension definition
39-
40-
set(PARAMETERS "-warnings")
41-
build_loadable_extension(${TARGET_NAME} ${PARAMETERS} ${EXTENSION_SOURCES})
42-
4340
message("duckdb export set: ${DUCKDB_EXPORT_SET}" )
41+
message("TARGET NAME: ${TARGET_NAME} EXTENSION NAME: ${EXTENSION_NAME}")
4442

4543
install(
4644
TARGETS ${EXTENSION_NAME} pixels-core pixels-common

cpp/Makefile

Lines changed: 26 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
.PHONY: all clean debug release pull update deps
1+
.PHONY: all clean debug release pull update fb-release fb-debug
22

33
all: release
44

@@ -14,21 +14,22 @@ ifeq (${STATIC_LIBCPP}, 1)
1414
endif
1515

1616
ifeq ($(GEN),ninja)
17-
GENERATOR=-G "Ninja"
18-
FORCE_COLOR=-DFORCE_COLORED_OUTPUT=1
17+
GENERATOR=-G "Ninja"
18+
FORCE_COLOR=-DFORCE_COLORED_OUTPUT=1
1919
endif
2020

21-
PROTOBUF_DIR=third-party/protobuf
21+
# remove protobuf, use flatbuffer instead
2222
BUILD_FLAGS=-DEXTENSION_STATIC_BUILD=1 -DBUILD_TPCH_EXTENSION=1 -DBUILD_BENCHMARKS=1 -DBUILD_PARQUET_EXTENSION=1 \
23-
${OSX_BUILD_UNIVERSAL_FLAG} ${STATIC_LIBCPP}
2423

2524
CLIENT_FLAGS :=
25+
PIXELS_BASE_DIR := $(shell dirname $(shell pwd))
2626

27-
# These flags will make DuckDB build the extension
28-
29-
EXTENSION_FLAGS=-DDUCKDB_EXTENSION_NAMES="pixels" -DDUCKDB_EXTENSION_PIXELS_PATH="$(PROJ_DIR)" \
30-
-DDUCKDB_EXTENSION_PIXELS_SHOULD_LINK="TRUE" -DDUCKDB_EXTENSION_PIXELS_INCLUDE_PATH="$(PROJ_DIR)include" \
31-
-DCMAKE_PREFIX_PATH=$(PROJ_DIR)third-party/protobuf/cmake/build -DPIXELS_SRC="$(dirname $(pwd))"
27+
FB_FLAGS=-DUSE_FLATBUFFERS=ON \
28+
-DDUCKDB_EXTENSION_NAMES="pixels" \
29+
-DDUCKDB_EXTENSION_PIXELS_PATH="$(PROJ_DIR)" \
30+
-DDUCKDB_EXTENSION_PIXELS_SHOULD_LINK="TRUE" \
31+
-DDUCKDB_EXTENSION_PIXELS_INCLUDE_PATH="$(PROJ_DIR)include" \
32+
-DPIXELS_SRC="$(PIXELS_BASE_DIR)"
3233

3334
pull:
3435
git submodule init
@@ -37,24 +38,23 @@ pull:
3738
update:
3839
git submodule update --remote --merge pixels-duckdb/duckdb
3940
git -C third-party/googletest checkout v1.15.2
40-
git -C third-party/protobuf checkout v3.21.6
41-
42-
deps:
43-
mkdir -p "${PROTOBUF_DIR}/cmake/build" && cd "third-party/protobuf/cmake/build" && \
44-
cmake -Dprotobuf_BUILD_TESTS=OFF -DCMAKE_BUILD_TYPE=Release ../.. -DCMAKE_POSITION_INDEPENDENT_CODE=ON \
45-
-Dprotobuf_BUILD_SHARED_LIBS=ON -DCMAKE_INSTALL_PREFIX=./ && \
46-
make -j install
4741

4842
clean:
49-
rm -rf build/release
50-
rm -rf build/debug
43+
rm -rf build/fb-release
44+
rm -rf build/fb-debug
5145
cd pixels-duckdb/duckdb && make clean
5246

53-
# Main build
54-
debug: deps
55-
cmake $(GENERATOR) $(FORCE_COLOR) $(EXTENSION_FLAGS) ${CLIENT_FLAGS} -DEXTENSION_STATIC_BUILD=1 -DCMAKE_BUILD_TYPE=Debug ${BUILD_FLAGS} -S pixels-duckdb/duckdb -B build/debug && \
56-
cmake --build build/debug --config Debug
47+
debug: fb-debug
48+
release: fb-release
49+
50+
fb-release:
51+
cmake $(GENERATOR) $(FORCE_COLOR) $(FB_FLAGS) ${CLIENT_FLAGS} \
52+
-DEXTENSION_STATIC_BUILD=1 -DCMAKE_BUILD_TYPE=Release ${BUILD_FLAGS} \
53+
-S pixels-duckdb/duckdb -B build/release && \
54+
cmake --build build/release --config Release
5755

58-
release: deps
59-
cmake $(GENERATOR) $(FORCE_COLOR) $(EXTENSION_FLAGS) ${CLIENT_FLAGS} -DEXTENSION_STATIC_BUILD=1 -DCMAKE_BUILD_TYPE=Release ${BUILD_FLAGS} -S pixels-duckdb/duckdb -B build/release && \
60-
cmake --build build/release --config Release
56+
fb-debug:
57+
cmake $(GENERATOR) $(FORCE_COLOR) $(FB_FLAGS) ${CLIENT_FLAGS} \
58+
-DEXTENSION_STATIC_BUILD=1 -DCMAKE_BUILD_TYPE=Debug ${BUILD_FLAGS} \
59+
-S pixels-duckdb/duckdb -B build/debug && \
60+
cmake --build build/debug --config Debug

cpp/include/PixelsScanFunction.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ namespace duckdb
113113
PixelsScanInitLocal(ExecutionContext &context, TableFunctionInitInput &input,
114114
GlobalTableFunctionState *gstate_p);
115115

116-
static bool PixelsParallelStateNext(ClientContext &context, const PixelsReadBindData &bind_data,
116+
static bool PixelsParallelStateNext(ClientContext &context, PixelsReadBindData &bind_data,
117117
PixelsReadLocalState &scan_data, PixelsReadGlobalState &parallel_state,
118118
bool is_init_state = false);
119119

cpp/pixels-cli/CMakeLists.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
project(pixels-cli)
22

3-
set(CMAKE_CXX_STANDARD 17)
3+
set(CMAKE_CXX_STANDARD 20)
44

55
include(ExternalProject)
66
include(ProcessorCount)

cpp/pixels-cli/include/executor/LoadExecutor.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,6 @@ class LoadExecutor : public CommandExecutor
3636

3737
private:
3838
bool startConsumers(const std::vector <std::string> &inputFiles, Parameters parameters,
39-
const std::vector <std::string> &loadedFiles);
39+
const std::vector <std::string> &loadedFiles, int concurrency);
4040
};
4141
#endif //PIXELS_LOADEXECUTOR_H

cpp/pixels-cli/include/load/PixelsConsumer.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727

2828
#include <vector>
2929
#include <string>
30+
#include <mutex>
3031
#include <load/Parameters.h>
3132

3233
class PixelsConsumer
@@ -39,6 +40,7 @@ class PixelsConsumer
3940

4041
private:
4142
static int GlobalTargetPathId;
43+
static std::mutex globalMutex; // Mutex to protect GlobalTargetPathId
4244
std::vector <std::string> queue;
4345
Parameters parameters;
4446
std::vector <std::string> loadedFiles;

cpp/pixels-cli/lib/executor/LoadExecutor.cpp

Lines changed: 53 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,21 +24,35 @@
2424
*/
2525
#include <executor/LoadExecutor.h>
2626
#include <iostream>
27+
#include <fstream>
28+
#include <filesystem>
2729
#include <encoding/EncodingLevel.h>
2830
#include <physical/storage/LocalFS.h>
2931
#include <load/Parameters.h>
3032
#include <chrono>
33+
#include <thread>
3134
#include <load/PixelsConsumer.h>
3235

3336
void LoadExecutor::execute(const bpo::variables_map &ns, const std::string &command)
3437
{
3538
std::string schema = ns["schema"].as<std::string>();
39+
if (std::filesystem::exists(schema) && std::filesystem::is_regular_file(schema))
40+
{
41+
std::ifstream ifs(schema);
42+
if (ifs.is_open())
43+
{
44+
std::stringstream buffer;
45+
buffer << ifs.rdbuf();
46+
schema = buffer.str();
47+
}
48+
}
3649
std::string origin = ns["origin"].as<std::string>();
3750
std::string target = ns["target"].as<std::string>();
3851
int rowNum = ns["row_num"].as<int>();
3952
std::string regex = ns["row_regex"].as<std::string>();
4053
EncodingLevel encodingLevel = EncodingLevel::from(ns["encoding_level"].as<int>());
4154
bool nullPadding = ns["nulls_padding"].as<bool>();
55+
int concurrency = ns["concurrency"].as<int>();
4256

4357
if (origin.back() != '/')
4458
{
@@ -55,7 +69,7 @@ void LoadExecutor::execute(const bpo::variables_map &ns, const std::string &comm
5569
}
5670

5771
auto startTime = std::chrono::system_clock::now();
58-
if (startConsumers(inputFiles, parameters, loadedFiles))
72+
if (startConsumers(inputFiles, parameters, loadedFiles, concurrency))
5973
{
6074
std::cout << command << " is successful" << std::endl;
6175
}
@@ -65,14 +79,47 @@ void LoadExecutor::execute(const bpo::variables_map &ns, const std::string &comm
6579
}
6680
auto endTime = std::chrono::system_clock::now();
6781
std::chrono::duration<double> elapsedSeconds = endTime - startTime;
68-
std::cout << "Text file in " << origin << " are loaded by 1 thread in "
82+
std::cout << "Text file in " << origin << " are loaded by " << concurrency << " thread(s) in "
6983
<< elapsedSeconds.count() << " seconds." << std::endl;
7084
}
7185

7286
bool LoadExecutor::startConsumers(const std::vector <std::string> &inputFiles, Parameters parameters,
73-
const std::vector <std::string> &loadedFiles)
87+
const std::vector <std::string> &loadedFiles, int concurrency)
7488
{
75-
PixelsConsumer consumer(inputFiles, parameters, loadedFiles);
76-
consumer.run();
89+
if (concurrency <= 1 || inputFiles.size() <= 1)
90+
{
91+
// Single-threaded mode
92+
PixelsConsumer consumer(inputFiles, parameters, loadedFiles);
93+
consumer.run();
94+
}
95+
else
96+
{
97+
// Multi-threaded mode: input files are distributed round-robin across the worker threads
98+
std::vector<std::thread> threads;
99+
int numThreads = std::min(concurrency, static_cast<int>(inputFiles.size()));
100+
std::vector<std::vector<std::string>> inputfilesQueue(numThreads);
101+
int currentThread=0;
102+
for (int i = 0; i < inputFiles.size(); ++i)
103+
{
104+
inputfilesQueue[(currentThread++)%numThreads].push_back(inputFiles[i]);
105+
}
106+
// Each thread gets one file queue to process
107+
for (int i=0;i<numThreads;i++)
108+
{
109+
auto queue=inputfilesQueue[i];
110+
threads.emplace_back([queue, parameters, loadedFiles]() {
111+
PixelsConsumer consumer(queue, parameters, loadedFiles);
112+
consumer.run();
113+
});
114+
}
115+
// Wait for all threads to complete
116+
for (auto &thread : threads)
117+
{
118+
if (thread.joinable())
119+
{
120+
thread.join();
121+
}
122+
}
123+
}
77124
return true;
78-
}
125+
}

cpp/pixels-cli/lib/load/PixelsConsumer.cpp

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,10 @@
3535
#include <fstream>
3636
#include <sstream>
3737
#include <chrono>
38+
#include <mutex>
3839

3940
int PixelsConsumer::GlobalTargetPathId = 0;
41+
std::mutex PixelsConsumer::globalMutex;
4042

4143
PixelsConsumer::PixelsConsumer(const std::vector <std::string> &queue, const Parameters &parameters,
4244
const std::vector <std::string> &loadedFiles)
@@ -106,8 +108,13 @@ void PixelsConsumer::run()
106108
if (initPixelsFile)
107109
{
108110
LocalFS targetStorage;
111+
int fileId;
112+
{
113+
std::lock_guard<std::mutex> lock(globalMutex);
114+
fileId = GlobalTargetPathId++;
115+
}
109116
targetFileName = std::to_string(std::chrono::system_clock::to_time_t(std::chrono::system_clock::now())) + \
110-
"_" + std::to_string(this->loadedFiles.size()) + ".pxl";
117+
"_" + std::to_string(fileId) + ".pxl";
111118
targetFilePath = targetPath + targetFileName;
112119
pixelsWriter = std::make_shared<PixelsWriterImpl>(schema, pixelsStride, rowGroupSize,
113120
targetFilePath, blockSize,
@@ -137,10 +144,7 @@ void PixelsConsumer::run()
137144

138145
if (rowBatch->rowCount == rowBatch->getMaxSize())
139146
{
140-
std::cout << "writing row group to file: " << targetFilePath << " rowCount:" << rowBatch->rowCount
141-
<< std::endl;
142147
pixelsWriter->addRowBatch(rowBatch);
143-
144148
rowBatch->reset();
145149
}
146150

@@ -173,4 +177,4 @@ void PixelsConsumer::run()
173177
this->loadedFiles.push_back(targetFilePath);
174178
}
175179
std::cout << "Exit PixelsConsumer" << std::endl;
176-
}
180+
}

cpp/pixels-cli/main.cpp

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,9 @@ int main()
110110
("encoding_level,e", bpo::value<int>()->default_value(2),
111111
"specify the encoding level for data loading")
112112
("nulls_padding,p", bpo::value<bool>()->default_value(false),
113-
"specify whether nulls padding is enabled");
113+
"specify whether nulls padding is enabled")
114+
("concurrency,c", bpo::value<int>()->default_value(1),
115+
"specify the number of threads for data loading");
114116

115117
bpo::variables_map vm;
116118
try
@@ -127,10 +129,8 @@ int main()
127129
{
128130
std::cerr << "Error parsing options: " << e.what() << "\n";
129131
}
130-
// try {
131-
LoadExecutor *loadExecutor = new LoadExecutor();
132+
std::unique_ptr<LoadExecutor> loadExecutor = std::make_unique<LoadExecutor>();
132133
loadExecutor->execute(vm, command);
133-
// } catch
134134
}
135135
else if (command == "QUERY")
136136
{
@@ -160,6 +160,7 @@ int main()
160160
{
161161
std::cout << "Command " << command << " not found" << std::endl;
162162
}
163+
for (char* p : argv) free(p);
163164
} // end of while loop
164165
return 0;
165-
}
166+
}

0 commit comments

Comments
 (0)