Skip to content
Merged
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
164 changes: 104 additions & 60 deletions src/main/java/io/kestra/plugin/aws/s3/Copy.java
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
package io.kestra.plugin.aws.s3;

import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.annotations.PluginProperty;
Expand All @@ -8,23 +9,29 @@
import io.kestra.plugin.aws.AbstractConnection;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.constraints.NotNull;
import lombok.*;
import lombok.Builder;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.ToString;
import lombok.experimental.SuperBuilder;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.CopyObjectRequest;
import software.amazon.awssdk.services.s3.model.CopyObjectResponse;
import software.amazon.awssdk.transfer.s3.S3TransferManager;
import software.amazon.awssdk.transfer.s3.model.CopyRequest;
import software.amazon.awssdk.transfer.s3.model.CompletedCopy;
import io.kestra.plugin.aws.s3.models.S3ServerSideEncryption;
import software.amazon.awssdk.services.s3.S3AsyncClient;

@SuperBuilder
@ToString
@EqualsAndHashCode
@Getter
@NoArgsConstructor
@Plugin(
examples = {
@Example(
full = true,
code = """
examples = {
@Example(
full = true,
code = """
id: aws_s3_copy
namespace: company.team

Expand All @@ -41,114 +48,149 @@
bucket: "my-bucket2"
key: "path/to/file2"
"""
)
}
)
}
)
@Schema(
title = "Copy a file between S3 buckets."
title = "Copy a file between S3 buckets."
)
public class Copy extends AbstractConnection implements AbstractS3, RunnableTask<Copy.Output> {

@Schema(
title = "The source bucket and key."
title = "The source bucket and key."
)
@PluginProperty
private CopyObjectFrom from;

@Schema(
title = "The destination bucket and key."
title = "The destination bucket and key."
)
@PluginProperty
private CopyObject to;

@Schema(
title = "Whether to delete the source file after download."
title = "Whether to delete the source file after download."
)
@Builder.Default
private Property<Boolean> delete = Property.ofValue(false);

@Override
public Output run(RunContext runContext) throws Exception {
try (S3Client client = this.client(runContext)) {
CopyObjectRequest.Builder builder = CopyObjectRequest.builder()
.sourceBucket(runContext.render(this.from.bucket).as(String.class).orElseThrow())
.sourceKey(runContext.render(this.from.key).as(String.class).orElseThrow())
.destinationBucket(runContext.render(this.to.bucket != null ? this.to.bucket : this.from.bucket).as(String.class).orElseThrow())
.destinationKey(runContext.render(this.to.key != null ? this.to.key : this.from.key).as(String.class).orElseThrow());

try (
S3AsyncClient s3AsyncClient = this.asyncClient(runContext);
S3TransferManager transferManager = S3TransferManager.builder()
.s3Client(s3AsyncClient)
.build()) {

CopyObjectRequest.Builder copyObjectBuilder = CopyObjectRequest.builder()
.sourceBucket(runContext.render(this.from.bucket).as(String.class).orElseThrow())
.sourceKey(runContext.render(this.from.key).as(String.class).orElseThrow())
.destinationBucket(
runContext.render(
this.to.bucket != null ? this.to.bucket : this.from.bucket
).as(String.class).orElseThrow()
)
.destinationKey(
runContext.render(
this.to.key != null ? this.to.key : this.from.key
).as(String.class).orElseThrow()
);

// Optional version ID
if (this.from.versionId != null) {
builder.sourceVersionId(runContext.render(this.from.versionId).as(String.class).orElseThrow());
copyObjectBuilder.sourceVersionId(
runContext.render(this.from.versionId).as(String.class).orElseThrow()
);
}

// Server-side encryption
if (this.to != null && this.to.serverSideEncryption != null) {
S3ServerSideEncryption rSse = runContext
.render(this.to.serverSideEncryption)
.as(S3ServerSideEncryption.class)
.orElse(null);

if (rSse != null && rSse != S3ServerSideEncryption.NONE) {
builder.serverSideEncryption(
software.amazon.awssdk.services.s3.model.ServerSideEncryption.valueOf(rSse.name())
S3ServerSideEncryption sse = runContext
.render(this.to.serverSideEncryption)
.as(S3ServerSideEncryption.class)
.orElse(null);

if (sse != null && sse != S3ServerSideEncryption.NONE) {
copyObjectBuilder.serverSideEncryption(
software.amazon.awssdk.services.s3.model.ServerSideEncryption.valueOf(sse.name())
);

// If using AWS_KMS encryption, set the KMS key ID
if (sse == S3ServerSideEncryption.AWS_KMS && this.to.kmsKeyId != null) {
copyObjectBuilder.ssekmsKeyId(
runContext.render(this.to.kmsKeyId).as(String.class).orElseThrow()
);
}
}
}

CopyObjectRequest copyObjectRequest = copyObjectBuilder.build();

// TransferManager copy (parallel & multipart aware)
CopyRequest copyRequest = CopyRequest.builder()
.copyObjectRequest(copyObjectRequest)
.build();

CopyObjectRequest request = builder.build();
CopyObjectResponse response = client.copyObject(request);
CompletedCopy completedCopy = transferManager
.copy(copyRequest)
.completionFuture()
.join();

if (runContext.render(this.delete).as(Boolean.class).orElseThrow()) {
// Optional delete source
if (runContext.render(this.delete).as(Boolean.class).orElse(false)) {
Delete.builder()
.id(this.id)
.type(Delete.class.getName())
.region(this.region)
.endpointOverride(this.endpointOverride)
.accessKeyId(this.accessKeyId)
.secretKeyId(this.secretKeyId)
.sessionToken(this.sessionToken)
.stsRoleSessionName(this.stsRoleSessionName)
.stsRoleExternalId(this.stsRoleExternalId)
.stsRoleSessionDuration(this.stsRoleSessionDuration)
.stsRoleArn(this.stsRoleArn)
.stsEndpointOverride(this.stsEndpointOverride)
.bucket(Property.ofValue(request.sourceBucket()))
.key(Property.ofValue(request.sourceKey()))
.build()
.run(runContext);
.id(this.id)
.type(Delete.class.getName())
.region(this.region)
.endpointOverride(this.endpointOverride)
.accessKeyId(this.accessKeyId)
.secretKeyId(this.secretKeyId)
.sessionToken(this.sessionToken)
.stsRoleArn(this.stsRoleArn)
.stsRoleExternalId(this.stsRoleExternalId)
.stsRoleSessionName(this.stsRoleSessionName)
.stsRoleSessionDuration(this.stsRoleSessionDuration)
.stsEndpointOverride(this.stsEndpointOverride)
.bucket(Property.ofValue(copyObjectRequest.sourceBucket()))
.key(Property.ofValue(copyObjectRequest.sourceKey()))
.build()
.run(runContext);
}

return Output
.builder()
.bucket(request.destinationBucket())
.key(request.destinationKey())
.eTag(response.copyObjectResult().eTag())
.build();
return Output.builder()
.bucket(copyObjectRequest.destinationBucket())
.key(copyObjectRequest.destinationKey())
.eTag(completedCopy.response().copyObjectResult().eTag())
.build();
}
}

@SuperBuilder(toBuilder = true)
@Getter
@NoArgsConstructor
public static class CopyObject {

@Schema(
title = "The bucket name"
title = "The bucket name"
)
@NotNull
Property<String> bucket;

@Schema(
title = "The bucket key"
title = "The bucket key"
)
@NotNull
Property<String> key;

@Schema(
title = "Server side encryption to apply to the target object.",
description = "Example: AES256 or AWS_KMS"
title = "Server side encryption to apply to the target object.",
description = "Example: AES256 or AWS_KMS"
)
private Property<S3ServerSideEncryption> serverSideEncryption;

@Schema(
title = "KMS Key ARN or Key ID to use when server side encryption is AWS_KMS"
title = "KMS Key ARN or Key ID to use when server side encryption is AWS_KMS"
)
private Property<String> kmsKeyId;
}
Expand All @@ -157,8 +199,9 @@ public static class CopyObject {
@Getter
@NoArgsConstructor
public static class CopyObjectFrom extends CopyObject {

@Schema(
title = "The specific version of the object."
title = "The specific version of the object."
)
private Property<String> versionId;
}
Expand All @@ -167,6 +210,7 @@ public static class CopyObjectFrom extends CopyObject {
@Getter
@NoArgsConstructor
public static class Output extends ObjectOutput implements io.kestra.core.models.tasks.Output {

private String bucket;
private String key;
}
Expand Down