Skip to content

Commit e98452a

Browse files
[GOBBLIN-2231] Implement IcebergSource to enable copying Iceberg data files to any dest (#4146)
* Implement IcebergSource to enable copying Iceberg data files to any dest * Introduce partition-aware discovery & lookback period based copy for IcebergSource * Added unit tests for IcebergSource * Added unit tests for IcebergFileStreamHelper * Add FilePathWithPartition class in IcebergTable * Address review comments * Fix indentation * Handling for yyyy-MM-dd-00 partition format * Remove unused import * Address review comment * Fix core tests error * Fix failing test * Fix failing test
1 parent 93be76b commit e98452a

File tree

7 files changed

+2351
-1
lines changed

7 files changed

+2351
-1
lines changed
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.gobblin.data.management.copy.iceberg;
19+
20+
import java.io.IOException;
21+
import java.util.Iterator;
22+
23+
import org.apache.commons.lang3.NotImplementedException;
24+
25+
import lombok.extern.slf4j.Slf4j;
26+
27+
import org.apache.gobblin.configuration.WorkUnitState;
28+
import org.apache.gobblin.data.management.copy.FileAwareInputStream;
29+
import org.apache.gobblin.source.extractor.filebased.FileBasedExtractor;
30+
31+
/**
32+
* Extractor for file streaming mode that creates FileAwareInputStream for each file.
33+
*
34+
* This extractor is used when {@code iceberg.record.processing.enabled=false} to stream
35+
* Iceberg table files as binary data to destinations like Azure, HDFS
36+
*
37+
* Each "record" is a {@link FileAwareInputStream} representing one file from
38+
* the Iceberg table. The downstream writer handles streaming the file content.
39+
*/
40+
@Slf4j
41+
public class IcebergFileStreamExtractor extends FileBasedExtractor<String, FileAwareInputStream> {
42+
43+
public IcebergFileStreamExtractor(WorkUnitState workUnitState) throws IOException {
44+
super(workUnitState, new IcebergFileStreamHelper(workUnitState));
45+
}
46+
47+
@Override
48+
public String getSchema() {
49+
// For file streaming, schema is not used by IdentityConverter; returning a constant
50+
return "FileAwareInputStream";
51+
}
52+
53+
@Override
54+
public Iterator<FileAwareInputStream> downloadFile(String filePath) throws IOException {
55+
throw new NotImplementedException("Not yet implemented");
56+
}
57+
58+
}
Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.gobblin.data.management.copy.iceberg;
19+
20+
import java.io.IOException;
21+
import java.io.InputStream;
22+
import java.util.List;
23+
24+
import org.apache.hadoop.conf.Configuration;
25+
import org.apache.hadoop.fs.FileSystem;
26+
import org.apache.hadoop.fs.Path;
27+
28+
import lombok.extern.slf4j.Slf4j;
29+
30+
import org.apache.gobblin.configuration.ConfigurationKeys;
31+
import org.apache.gobblin.configuration.State;
32+
import org.apache.gobblin.source.extractor.filebased.FileBasedHelperException;
33+
import org.apache.gobblin.source.extractor.filebased.TimestampAwareFileBasedHelper;
34+
35+
/**
36+
* File-based helper for Iceberg file streaming operations.
37+
*
38+
* This helper supports file streaming mode where Iceberg table files
39+
* are streamed as binary data without record-level processing.
40+
*/
41+
@Slf4j
42+
public class IcebergFileStreamHelper implements TimestampAwareFileBasedHelper {
43+
44+
private final State state;
45+
private final Configuration configuration;
46+
private FileSystem fileSystem;
47+
48+
public IcebergFileStreamHelper(State state) {
49+
this.state = state;
50+
this.configuration = new Configuration();
51+
52+
// Add any Hadoop configuration from job properties
53+
for (String key : state.getPropertyNames()) {
54+
if (key.startsWith("fs.") || key.startsWith("hadoop.")) {
55+
configuration.set(key, state.getProp(key));
56+
}
57+
}
58+
}
59+
60+
@Override
61+
public void connect() throws FileBasedHelperException {
62+
try {
63+
this.fileSystem = FileSystem.get(configuration);
64+
log.info("Connected to Iceberg file stream helper with FileSystem: {}", fileSystem.getClass().getSimpleName());
65+
} catch (IOException e) {
66+
throw new FileBasedHelperException("Failed to initialize FileSystem for Iceberg file streaming", e);
67+
}
68+
}
69+
70+
@Override
71+
public List<String> ls(String path) throws FileBasedHelperException {
72+
try {
73+
// For Iceberg, file discovery is handled by IcebergSource
74+
// This method returns files from work unit configuration
75+
List<String> filesToPull = state.getPropAsList(ConfigurationKeys.SOURCE_FILEBASED_FILES_TO_PULL, "");
76+
log.debug("Returning {} files for processing", filesToPull.size());
77+
return filesToPull;
78+
} catch (Exception e) {
79+
throw new FileBasedHelperException("Failed to list files", e);
80+
}
81+
}
82+
83+
@Override
84+
public InputStream getFileStream(String filePath) throws FileBasedHelperException {
85+
try {
86+
Path path = new Path(filePath);
87+
FileSystem fs = getFileSystemForPath(path);
88+
return fs.open(path);
89+
} catch (IOException e) {
90+
throw new FileBasedHelperException("Failed to get file stream for: " + filePath, e);
91+
}
92+
}
93+
94+
@Override
95+
public long getFileSize(String filePath) throws FileBasedHelperException {
96+
try {
97+
Path path = new Path(filePath);
98+
FileSystem fs = getFileSystemForPath(path);
99+
return fs.getFileStatus(path).getLen();
100+
} catch (IOException e) {
101+
throw new FileBasedHelperException("Failed to get file size for: " + filePath, e);
102+
}
103+
}
104+
105+
@Override
106+
public long getFileMTime(String filePath) throws FileBasedHelperException {
107+
try {
108+
Path path = new Path(filePath);
109+
FileSystem fs = getFileSystemForPath(path);
110+
return fs.getFileStatus(path).getModificationTime();
111+
} catch (IOException e) {
112+
throw new FileBasedHelperException("Failed to get file modification time for: " + filePath, e);
113+
}
114+
}
115+
116+
private FileSystem getFileSystemForPath(Path path) throws IOException {
117+
// If path has a different scheme than the default FileSystem, get scheme-specific FS
118+
if (path.toUri().getScheme() != null &&
119+
!path.toUri().getScheme().equals(fileSystem.getUri().getScheme())) {
120+
return path.getFileSystem(configuration);
121+
}
122+
return fileSystem;
123+
}
124+
125+
@Override
126+
public void close() throws IOException {
127+
if (fileSystem != null) {
128+
try {
129+
fileSystem.close();
130+
log.info("Closed Iceberg file stream helper and FileSystem connection");
131+
} catch (IOException e) {
132+
log.warn("Error closing FileSystem connection", e);
133+
throw e;
134+
}
135+
} else {
136+
log.debug("Closing Iceberg file stream helper - no FileSystem to close");
137+
}
138+
}
139+
140+
}

0 commit comments

Comments
 (0)