 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
-import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.Function;
 import java.util.function.Predicate;
-import java.util.stream.Collectors;

 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.iceberg.StructLike;
 import org.apache.iceberg.TableMetadata;
 import org.apache.iceberg.TableProperties;
-import org.apache.iceberg.util.SerializationUtil;

-import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.ImmutableList;
 import com.google.common.base.Preconditions;
@@ -101,48 +97,16 @@ Collection<CopyEntity> generateCopyEntities(FileSystem targetFs, CopyConfigurati
     // TODO: Refactor the IcebergDataset::generateCopyEntities to avoid code duplication
     // Differences are getting data files, copying ancestor permission and adding post publish steps
     String fileSet = this.getFileSetId();
-    List<CopyEntity> copyEntities = Lists.newArrayList();
     IcebergTable srcIcebergTable = getSrcIcebergTable();
     List<DataFile> srcDataFiles = srcIcebergTable.getPartitionSpecificDataFiles(this.partitionFilterPredicate);
-    Map<Path, DataFile> destDataFileBySrcPath = calcDestDataFileBySrcPath(srcDataFiles);
-    Configuration defaultHadoopConfiguration = new Configuration();
-
-    for (Map.Entry<Path, FileStatus> entry : calcSrcFileStatusByDestFilePath(destDataFileBySrcPath).entrySet()) {
-      Path destPath = entry.getKey();
-      FileStatus srcFileStatus = entry.getValue();
-      // TODO: should be the same FS each time; try creating once, reusing thereafter, to not recreate wastefully
-      FileSystem actualSourceFs = getSourceFileSystemFromFileStatus(srcFileStatus, defaultHadoopConfiguration);
-
-      CopyableFile fileEntity = CopyableFile.fromOriginAndDestination(
-          actualSourceFs, srcFileStatus, targetFs.makeQualified(destPath), copyConfig)
-          .fileSet(fileSet)
-          .datasetOutputPath(targetFs.getUri().getPath())
-          .build();
-
-      fileEntity.setSourceData(getSourceDataset(this.sourceFs));
-      fileEntity.setDestinationData(getDestinationDataset(targetFs));
-      copyEntities.add(fileEntity);
-    }
-
-    // Adding this check to avoid adding post publish step when there are no files to copy.
-    List<DataFile> destDataFiles = new ArrayList<>(destDataFileBySrcPath.values());
-    if (CollectionUtils.isNotEmpty(destDataFiles)) {
-      copyEntities.add(createOverwritePostPublishStep(destDataFiles));
-    }
-
-    log.info("~{}~ generated {} copy entities", fileSet, copyEntities.size());
-    return copyEntities;
-  }

-  private Map<Path, DataFile> calcDestDataFileBySrcPath(List<DataFile> srcDataFiles)
-      throws IcebergTable.TableNotFoundException {
-    String fileSet = this.getFileSetId();
-    Map<Path, DataFile> destDataFileBySrcPath = new ConcurrentHashMap<>(srcDataFiles.size());
     if (srcDataFiles.isEmpty()) {
       log.warn("~{}~ found no data files for partition col : {} with partition value : {} to copy", fileSet,
           this.partitionColumnName, this.partitionColValue);
-      return destDataFileBySrcPath;
+      return new ArrayList<>(0);
     }
+
+    // Get source & destination write data locations to update data file paths
     TableMetadata srcTableMetadata = getSrcIcebergTable().accessTableMetadata();
     TableMetadata destTableMetadata = getDestIcebergTable().accessTableMetadata();
     PartitionSpec partitionSpec = destTableMetadata.spec();
@@ -160,17 +124,58 @@ private Map<Path, DataFile> calcDestDataFileBySrcPath(List<DataFile> srcDataFile
           destWriteDataLocation
       );
     }
-    srcDataFiles.forEach(dataFile -> {
+
+    List<CopyEntity> copyEntities = getIcebergPartitionCopyEntities(targetFs, srcDataFiles, srcWriteDataLocation, destWriteDataLocation, partitionSpec, copyConfig);
+    // Avoid adding the post-publish step when there are no files to copy.
+    if (CollectionUtils.isNotEmpty(copyEntities)) {
+      copyEntities.add(createOverwritePostPublishStep());
+    }
+
+    log.info("~{}~ generated {} copy entities", fileSet, copyEntities.size());
+    return copyEntities;
+  }
+
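The post-publish overwrite step is appended only when at least one copy entity was generated, so an empty partition never triggers a commit. A minimal, self-contained sketch of this null-safe gating via commons-collections (class name and string values here are illustrative, not from the source):

import java.util.ArrayList;
import java.util.List;
import org.apache.commons.collections.CollectionUtils;

public class ConditionalPostPublishDemo {
  public static void main(String[] args) {
    List<String> copyEntities = new ArrayList<>();
    // isNotEmpty is null-safe, unlike !copyEntities.isEmpty(), so the
    // commit step is skipped for both an empty and a null list.
    if (CollectionUtils.isNotEmpty(copyEntities)) {
      copyEntities.add("post-publish: overwrite partitions");
    }
    System.out.println(copyEntities); // [] -- no commit step when nothing was copied
  }
}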
+  private List<CopyEntity> getIcebergPartitionCopyEntities(
+      FileSystem targetFs,
+      List<DataFile> srcDataFiles,
+      String srcWriteDataLocation,
+      String destWriteDataLocation,
+      PartitionSpec partitionSpec,
+      CopyConfiguration copyConfig) {
+    String fileSet = this.getFileSetId();
+    Configuration defaultHadoopConfiguration = new Configuration();
+    List<CopyEntity> copyEntities = Collections.synchronizedList(new ArrayList<>(srcDataFiles.size()));
+    Function<Path, FileStatus> getFileStatus = CheckedExceptionFunction.wrapToTunneled(this.sourceFs::getFileStatus);
+
+    srcDataFiles.parallelStream().forEach(dataFile -> {
+      // Create the destination data file from the source data file by replacing the source path with the destination path
       String srcFilePath = dataFile.path().toString();
       Path updatedDestFilePath = relocateDestPath(srcFilePath, srcWriteDataLocation, destWriteDataLocation);
       log.debug("~{}~ Path changed from Src : {} to Dest : {}", fileSet, srcFilePath, updatedDestFilePath);
-      destDataFileBySrcPath.put(new Path(srcFilePath), DataFiles.builder(partitionSpec)
+      DataFile destDataFile = DataFiles.builder(partitionSpec)
           .copy(dataFile)
           .withPath(updatedDestFilePath.toString())
-          .build());
+          .build();
+
+      // Get the file status of the source file
+      FileStatus srcFileStatus = getFileStatus.apply(new Path(srcFilePath));
+      try {
+        // TODO: should be the same FS each time; try creating once, reusing thereafter, to not recreate wastefully
+        FileSystem actualSourceFs = getSourceFileSystemFromFileStatus(srcFileStatus, defaultHadoopConfiguration);
+        // Create the copyable file entity
+        CopyableFile fileEntity = CopyableFile.fromOriginAndDestination(actualSourceFs, srcFileStatus,
+            targetFs.makeQualified(updatedDestFilePath), copyConfig).fileSet(fileSet)
+            .datasetOutputPath(targetFs.getUri().getPath()).build();
+        fileEntity.setSourceData(getSourceDataset(this.sourceFs));
+        fileEntity.setDestinationData(getDestinationDataset(targetFs));
+        // Pair the destination data file with its copyable Iceberg partition file
+        IcebergPartitionCopyableFile icebergPartitionCopyableFile = new IcebergPartitionCopyableFile(fileEntity, destDataFile);
+        copyEntities.add(icebergPartitionCopyableFile);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
     });
-    log.info("~{}~ created {} destination data files", fileSet, destDataFileBySrcPath.size());
-    return destDataFileBySrcPath;
+    return copyEntities;
   }

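forEach on a parallel stream invokes the lambda from multiple worker threads at once, so the shared copyEntities list must be thread-safe; hence Collections.synchronizedList around the ArrayList. A minimal sketch of why a plain list would not do (demo names are illustrative):

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.IntStream;

public class ParallelCollectDemo {
  public static void main(String[] args) {
    // A plain ArrayList here could drop elements or throw
    // ArrayIndexOutOfBoundsException under concurrent add() calls.
    List<Integer> results = Collections.synchronizedList(new ArrayList<>());
    IntStream.range(0, 1_000).parallel().forEach(results::add);
    System.out.println(results.size()); // reliably 1000
  }
}

Collecting via collect(Collectors.toList()) would avoid explicit synchronization, but forEach keeps the per-file try/catch above straightforward.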
   private Path relocateDestPath(String curPathStr, String prefixToBeReplaced, String prefixToReplaceWith) {
@@ -186,43 +191,17 @@ private Path addUUIDToPath(String filePathStr) {
     return new Path(fileDir, newFileName);
   }

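relocateDestPath swaps the source write-location prefix for the destination prefix, and addUUIDToPath prepends a UUID to the file name so repeated or concurrent copies of the same file cannot collide at the destination. A rough standalone sketch of the combined idea, using plain strings instead of Hadoop Path (the helper below is hypothetical, not the actual implementation):

import java.util.UUID;

public class RelocatePathDemo {
  // Hypothetical stand-in for relocateDestPath + addUUIDToPath: swap the
  // location prefix, then prefix the file name with a random UUID.
  static String relocate(String srcPath, String srcPrefix, String destPrefix) {
    String moved = destPrefix + srcPath.substring(srcPrefix.length());
    int slash = moved.lastIndexOf('/');
    return moved.substring(0, slash + 1) + UUID.randomUUID() + "-" + moved.substring(slash + 1);
  }

  public static void main(String[] args) {
    System.out.println(relocate("/warehouse/src_db/tbl/data/p=1/part-00000.parquet",
        "/warehouse/src_db/tbl/data", "/warehouse/dest_db/tbl/data"));
    // e.g. /warehouse/dest_db/tbl/data/p=1/<uuid>-part-00000.parquet
  }
}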
-  private Map<Path, FileStatus> calcSrcFileStatusByDestFilePath(Map<Path, DataFile> destDataFileBySrcPath)
-      throws IOException {
-    Function<Path, FileStatus> getFileStatus = CheckedExceptionFunction.wrapToTunneled(this.sourceFs::getFileStatus);
-    Map<Path, FileStatus> srcFileStatusByDestFilePath = new ConcurrentHashMap<>();
-    try {
-      srcFileStatusByDestFilePath = destDataFileBySrcPath.entrySet()
-          .parallelStream()
-          .collect(Collectors.toConcurrentMap(entry -> new Path(entry.getValue().path().toString()),
-              entry -> getFileStatus.apply(entry.getKey())));
-    } catch (CheckedExceptionFunction.WrappedIOException wrapper) {
-      wrapper.rethrowWrapped();
-    }
-    return srcFileStatusByDestFilePath;
-  }
-
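Both the removed helper and the surviving getFileStatus wrapper rely on CheckedExceptionFunction.wrapToTunneled, which lets an IOException-throwing call be used inside a stream by tunneling the checked exception through an unchecked wrapper that the caller can unwrap afterwards. A generic JDK-only stand-in for the pattern (the Gobblin utility differs in detail):

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.function.Function;

public class TunneledFunctionDemo {
  // IOException-throwing analogue of java.util.function.Function
  interface CheckedFunction<T, R> { R apply(T t) throws IOException; }

  // Wrap the checked call so it satisfies Function; the IOException
  // "tunnels" out as UncheckedIOException and is unwrapped by the caller.
  static <T, R> Function<T, R> wrapToTunneled(CheckedFunction<T, R> f) {
    return t -> {
      try {
        return f.apply(t);
      } catch (IOException e) {
        throw new UncheckedIOException(e);
      }
    };
  }

  public static void main(String[] args) throws IOException {
    Function<String, Integer> safeLen = wrapToTunneled(s -> {
      if (s.isEmpty()) throw new IOException("empty input");
      return s.length();
    });
    try {
      Arrays.asList("alpha", "beta").stream().map(safeLen).forEach(System.out::println);
    } catch (UncheckedIOException tunneled) {
      throw tunneled.getCause(); // rethrow the original IOException, like rethrowWrapped()
    }
  }
}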
-  private PostPublishStep createOverwritePostPublishStep(List<DataFile> destDataFiles) {
-    List<String> serializedDataFiles = getBase64EncodedDataFiles(destDataFiles);
-
+  private PostPublishStep createOverwritePostPublishStep() {
     IcebergOverwritePartitionsStep icebergOverwritePartitionStep = new IcebergOverwritePartitionsStep(
         this.getDestIcebergTable().getTableId().toString(),
         this.partitionColumnName,
         this.partitionColValue,
-        serializedDataFiles,
         this.properties
     );

     return new PostPublishStep(this.getFileSetId(), Maps.newHashMap(), icebergOverwritePartitionStep, 0);
   }

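The destination DataFiles were previously Base64-serialized into strings for the overwrite step; after this change each IcebergPartitionCopyableFile carries its DataFile directly. For reference, a minimal sketch of the kind of Base64 round-trip the removed code performed (a plain-JDK approximation of Iceberg's SerializationUtil, not its actual code):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Base64;

public class Base64SerdeDemo {
  // Java-serialize an object and Base64-encode the bytes so it can travel
  // inside string-typed job properties.
  static String serializeToBase64(Serializable obj) throws IOException {
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
      oos.writeObject(obj);
    }
    return Base64.getEncoder().encodeToString(bos.toByteArray());
  }

  static Object deserializeFromBase64(String base64) throws IOException, ClassNotFoundException {
    byte[] bytes = Base64.getDecoder().decode(base64);
    try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
      return ois.readObject();
    }
  }

  public static void main(String[] args) throws Exception {
    String encoded = serializeToBase64("a-data-file-stand-in");
    System.out.println(deserializeFromBase64(encoded)); // a-data-file-stand-in
  }
}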
-  private List<String> getBase64EncodedDataFiles(List<DataFile> destDataFiles) {
-    List<String> base64EncodedDataFiles = new ArrayList<>(destDataFiles.size());
-    for (DataFile dataFile : destDataFiles) {
-      base64EncodedDataFiles.add(SerializationUtil.serializeToBase64(dataFile));
-    }
-    return base64EncodedDataFiles;
-  }
-
   private Predicate<StructLike> createPartitionFilterPredicate() throws IOException {
     //TODO: Refactor it later using factory or other way to support different types of filter predicate
     // Also take into consideration creation of Expression Filter to be used in overwrite api