Skip to content

Commit 626b0e0

Browse files
committed
Throw an error depending on the expiration date of TemporalConfigStorage
1 parent 432517d commit 626b0e0

File tree

5 files changed

+84
-5
lines changed

5 files changed

+84
-5
lines changed

digdag-spi/src/main/java/io/digdag/spi/Storage.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,4 +35,14 @@ default Optional<DirectUploadHandle> getDirectUploadHandle(String key)
3535
{
3636
return Optional.absent();
3737
}
38+
39+
default Optional<Long> getDirectDownloadExpiration()
40+
{
41+
return Optional.absent();
42+
}
43+
44+
default Optional<Long> getDirectUploadExpiration()
45+
{
46+
return Optional.absent();
47+
}
3848
}

digdag-standards/src/main/java/io/digdag/standards/command/KubernetesCommandExecutor.java

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -225,6 +225,8 @@ CommandStatus runOnKubernetes(final CommandContext context,
225225
nextStatus.set("cluster_name", FACTORY.textNode(client.getConfig().getName()));
226226
nextStatus.set("pod_name", FACTORY.textNode(pod.getName()));
227227
nextStatus.set("pod_creation_timestamp", FACTORY.numberNode(pod.getCreationTimestamp()));
228+
nextStatus.set("in_temporal_config_storage_expiration", FACTORY.numberNode(inConfigStorage.getDirectDownloadExpiration().get()));
229+
nextStatus.set("out_temporal_config_storage_expiration", FACTORY.numberNode(outConfigStorage.getDirectUploadExpiration().get()));
228230
nextStatus.set("io_directory", FACTORY.textNode(ioDirectoryPath.toString()));
229231
nextStatus.set("executor_state", FACTORY.objectNode());
230232
return createCommandStatus(pod, false, nextStatus);
@@ -256,6 +258,21 @@ CommandStatus getCommandStatusFromKubernetes(final CommandContext context,
256258
log(logMessage, clog);
257259
nextExecutorState.set("log_offset", FACTORY.numberNode(offset + logMessage.length())); // update log_offset
258260
}
261+
else if (isLaunchingLongerThanInConfigStorageExpiration(previousStatusJson)) {
    // The pod is still launching although the expiration of the input
    // TemporalConfigStorage has already passed, so give up: delete the pod
    // and fail the task.
    TaskRequest request = context.getTaskRequest();
    long attemptId = request.getAttemptId();
    long taskId = request.getTaskId();

    final String message = s("Pod launch timeout: attempt=%d, task=%d", attemptId, taskId);
    logger.warn(message);

    // pod.getName() is a String, so it has to be formatted with %s. With %d the
    // formatter (assuming s() delegates to String.format) throws
    // IllegalFormatConversionException before the pod is deleted and before the
    // TaskExecutionException below is raised.
    logger.info(s("Delete pod %s", pod.getName()));
    client.deletePod(pod.getName());

    // Throw exception to stop the task as failure
    throw new TaskExecutionException(message);
}
259276
else { // 'waiting'
260277
// Write pod status to the command logger to avoid users confusing. For example, the container
261278
// waits starting if it will take long time to download container images.
@@ -276,7 +293,7 @@ CommandStatus getCommandStatusFromKubernetes(final CommandContext context,
276293
final InputStream in = outConfigStorage.getContentInputStream(outputArchiveKey);
277294
ProjectArchives.extractTarArchive(context.getLocalProjectPath(), in); // runtime exception
278295
}
279-
else if (defaultPodTTL.isPresent() && isRunningLongerThanTTL(previousStatusJson)) {
296+
else if (isRunningLongerThanOutConfigStorageExpiration(previousStatusJson) || (defaultPodTTL.isPresent() && isRunningLongerThanTTL(previousStatusJson))) {
280297
TaskRequest request = context.getTaskRequest();
281298
long attemptId = request.getAttemptId();
282299
long taskId = request.getTaskId();
@@ -307,6 +324,22 @@ protected List<String> setArgumentsAfterScriptCommandLine()
307324
return ImmutableList.of();
308325
}
309326

327+
private boolean isLaunchingLongerThanInConfigStorageExpiration(final ObjectNode previousStatusJson)
328+
{
329+
long creationTimestamp = previousStatusJson.get("pod_creation_timestamp").asLong();
330+
long inTemporalConfigStorageExpiration = previousStatusJson.get("in_temporal_config_storage_expiration").asLong();
331+
long currentTimestamp = Instant.now().getEpochSecond();
332+
return currentTimestamp > creationTimestamp + inTemporalConfigStorageExpiration;
333+
}
334+
335+
private boolean isRunningLongerThanOutConfigStorageExpiration(final ObjectNode previousStatusJson)
336+
{
337+
long creationTimestamp = previousStatusJson.get("pod_creation_timestamp").asLong();
338+
long outTemporalConfigStorageExpiration = previousStatusJson.get("out_temporal_config_storage_expiration").asLong();
339+
long currentTimestamp = Instant.now().getEpochSecond();
340+
return currentTimestamp > creationTimestamp + outTemporalConfigStorageExpiration;
341+
}
342+
310343
private boolean isRunningLongerThanTTL(final ObjectNode previousStatusJson)
311344
{
312345
long creationTimestamp = previousStatusJson.get("pod_creation_timestamp").asLong();

digdag-standards/src/main/java/io/digdag/standards/command/kubernetes/TemporalConfigStorage.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import java.io.IOException;
1313
import java.io.InputStream;
1414
import java.nio.file.Path;
15+
import com.google.common.base.Optional;
1516

1617
public class TemporalConfigStorage
1718
{
@@ -75,4 +76,14 @@ public InputStream getContentInputStream(final String key)
7576
throw Throwables.propagate(e);
7677
}
7778
}
79+
80+
public Optional<Long> getDirectDownloadExpiration()
81+
{
82+
return storage.getDirectDownloadExpiration();
83+
}
84+
85+
public Optional<Long> getDirectUploadExpiration()
86+
{
87+
return storage.getDirectUploadExpiration();
88+
}
7889
}

digdag-storage-gcs/src/main/java/io/digdag/storage/gcs/GCSStorage.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,7 @@ public void list(String objectPrefix, FileListing callback)
144144
@Override
145145
public Optional<DirectDownloadHandle> getDirectDownloadHandle(String object)
146146
{
147-
final long secondsToExpire = config.get("direct_download_expiration", Long.class, 10L*60);
147+
final long secondsToExpire = getDirectDownloadExpiration().get();
148148

149149
BlobInfo blobInfo = BlobInfo.newBuilder(bucket, object).build();
150150
URL signedUrl = this.storage.signUrl(blobInfo, secondsToExpire, TimeUnit.SECONDS, Storage.SignUrlOption.httpMethod(HttpMethod.GET), Storage.SignUrlOption.withV4Signature());
@@ -156,7 +156,7 @@ public Optional<DirectDownloadHandle> getDirectDownloadHandle(String object)
156156
@Override
157157
public Optional<DirectUploadHandle> getDirectUploadHandle(String object)
158158
{
159-
final long secondsToExpire = config.get("direct_upload_expiration", Long.class, 10L*60);
159+
final long secondsToExpire = getDirectUploadExpiration().get();
160160

161161
BlobInfo blobInfo = BlobInfo.newBuilder(bucket, object).build();
162162
URL signedUrl = this.storage.signUrl(blobInfo, secondsToExpire, TimeUnit.SECONDS, Storage.SignUrlOption.httpMethod(HttpMethod.PUT), Storage.SignUrlOption.withV4Signature());
@@ -165,6 +165,18 @@ public Optional<DirectUploadHandle> getDirectUploadHandle(String object)
165165
return Optional.of(DirectUploadHandle.of(url));
166166
}
167167

168+
@Override
169+
public Optional<Long> getDirectDownloadExpiration()
170+
{
171+
return Optional.of(config.get("direct_download_expiration", Long.class, 10L*60));
172+
}
173+
174+
@Override
175+
public Optional<Long> getDirectUploadExpiration()
176+
{
177+
return Optional.of(config.get("direct_upload_expiration", Long.class, 10L*60));
178+
}
179+
168180
private <T> T getWithRetry(String message, Callable<T> callable)
169181
throws StorageException
170182
{

digdag-storage-s3/src/main/java/io/digdag/storage/s3/S3Storage.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -200,7 +200,7 @@ public void list(String keyPrefix, FileListing callback)
200200
@Override
201201
public Optional<DirectDownloadHandle> getDirectDownloadHandle(String key)
202202
{
203-
final long secondsToExpire = config.get("direct_download_expiration", Long.class, 10L*60);
203+
final long secondsToExpire = getDirectDownloadExpiration().get();
204204

205205
GeneratePresignedUrlRequest req = new GeneratePresignedUrlRequest(bucket, key);
206206
req.setExpiration(Date.from(Instant.now().plusSeconds(secondsToExpire)));
@@ -213,7 +213,8 @@ public Optional<DirectDownloadHandle> getDirectDownloadHandle(String key)
213213
@Override
214214
public Optional<DirectUploadHandle> getDirectUploadHandle(String key)
215215
{
216-
final long secondsToExpire = config.get("direct_upload_expiration", Long.class, 10L*60);
216+
final long secondsToExpire = getDirectUploadExpiration().get();
217+
217218

218219
GeneratePresignedUrlRequest req = new GeneratePresignedUrlRequest(bucket, key);
219220
req.setMethod(HttpMethod.PUT);
@@ -224,6 +225,18 @@ public Optional<DirectUploadHandle> getDirectUploadHandle(String key)
224225
return Optional.of(DirectUploadHandle.of(url));
225226
}
226227

228+
@Override
229+
public Optional<Long> getDirectDownloadExpiration()
230+
{
231+
return Optional.of(config.get("direct_download_expiration", Long.class, 10L*60));
232+
}
233+
234+
@Override
235+
public Optional<Long> getDirectUploadExpiration()
236+
{
237+
return Optional.of(config.get("direct_upload_expiration", Long.class, 10L*60));
238+
}
239+
227240
private <T> T getWithRetry(String message, Callable<T> callable)
228241
throws StorageFileNotFoundException
229242
{

0 commit comments

Comments
 (0)