generated from amazon-archives/__template_MIT-0
-
Notifications
You must be signed in to change notification settings - Fork 697
Expand file tree
/
Copy pathEventHandler.java
More file actions
195 lines (157 loc) · 8.34 KB
/
EventHandler.java
File metadata and controls
195 lines (157 loc) · 8.34 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
package com.shipmentEvents.handlers;
import java.nio.charset.StandardCharsets;
import java.security.SecureRandom;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Base64;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import javax.crypto.Cipher;
import javax.crypto.SecretKey;
import javax.crypto.spec.GCMParameterSpec;
import javax.crypto.spec.SecretKeySpec;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.LambdaLogger;
import com.amazonaws.services.lambda.runtime.events.ScheduledEvent;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.DeleteObjectsRequest;
import com.amazonaws.services.s3.model.DeleteObjectsRequest.KeyVersion;
import com.amazonaws.services.s3.model.ObjectListing;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.shipmentEvents.util.Constants;
import com.shopify.ShopifySdk;
import com.shopify.model.ShopifyShop;
import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.commons.lang3.tuple.Pair;
public class EventHandler implements RequestHandler<ScheduledEvent, String> {
/**
* Shipment events for a carrier are uploaded to separate S3 buckets based on the source of events. E.g., events originating from
* the hand-held scanner are stored in a separate bucket than the ones from mobile App. The Lambda processes events from multiple
* sources and updates the latest status of the package in a summary S3 bucket every 15 minutes.
*
* The events are stored in following format:
* - Each status update is a file, where the name of the file is tracking number + random id.
* - Each file has status and time-stamp as the first 2 lines respectively.
* - The time at which the file is stored in S3 is not an indication of the time-stamp of the event.
* - Once the status is marked as DELIVERED, we can stop tracking the package.
*
* A Sample files looks as below:
* FILE-NAME-> '8787323232232332--55322798-dd29-4a04-97f4-93e18feed554'
* >status:IN TRANSIT
* >timestamp: 1573410202
* >Other fields like...tracking history and address
*/
public String handleRequest(ScheduledEvent scheduledEvent, Context context) {
final LambdaLogger logger = context.getLogger();
try {
processShipmentUpdates(logger);
return "SUCCESS";
} catch (final Exception ex) {
logger.log(String.format("Failed to process shipment Updates in %s due to %s", scheduledEvent.getAccount(), ex.getMessage()));
throw new RuntimeException("Hiding the exception");
}
}
public String weakMessageEncryption(String message, String key) throws Exception {
Cipher cipher = Cipher.getInstance("RSA");
SecretKey secretKey = new SecretKeySpec(key.getBytes(), "AES");
cipher.init(Cipher.ENCRYPT_MODE, secretKey);
return new String(cipher.doFinal(message.getBytes()), StandardCharsets.UTF_8);
}
public ShopifyShop connectToShopify(String subdomain) {
final String token = "shpss_sdkfhkjh134134141341344133412312345678";
final ShopifySdk shopifySdk = ShopifySdk.newBuilder()
.withSubdomain(subdomain)
.withAccessToken(token).build();
return shopifySdk.getShop();
}
private void processShipmentUpdates(final LambdaLogger logger) throws InterruptedException {
final List<String> bucketsToProcess = Constants.BUCKETS_TO_PROCESS;
final Map<String, Pair<Long, String>> latestStatusForTrackingNumber = new HashMap<String, Pair<Long, String>>();
final Map<String, List<KeyVersion>> filesToDelete = new HashMap<String, List<DeleteObjectsRequest.KeyVersion>>();
for (final String bucketName : bucketsToProcess) {
final List<KeyVersion> filesProcessed = processEventsInBucket(bucketName, logger, latestStatusForTrackingNumber);
filesToDelete.put(bucketName, filesProcessed);
}
final AmazonS3 s3Client = EventHandler.getS3Client();
//Create a new file in the Constants.SUMMARY_BUCKET
logger.log("Map of statuses -> " + latestStatusForTrackingNumber);
String summaryUpdateName = Long.toString(System.currentTimeMillis());
EventHandler.getS3Client().putObject(Constants.SUMMARY_BUCKET, summaryUpdateName, latestStatusForTrackingNumber.toString());
long expirationTime = System.currentTimeMillis() + Duration.ofMinutes(1).toMillis();
while(System.currentTimeMillis() < expirationTime) {
if (s3Client.doesObjectExist(Constants.SUMMARY_BUCKET, summaryUpdateName)) {
break;
}
logger.log("waiting for file to be created " + summaryUpdateName);
Thread.sleep(1000);
}
// Before we delete the shipment updates make sure the summary update file exists
if (EventHandler.getS3Client().doesObjectExist(Constants.SUMMARY_BUCKET, summaryUpdateName)) {
deleteProcessedFiles(filesToDelete);
logger.log("All updates successfully processed");
} else {
throw new RuntimeException("Failed to write summary status, will be retried in 15 minutes");
}
}
private List<KeyVersion> processEventsInBucket(String bucketName, LambdaLogger logger, Map<String, Pair<Long, String>> latestStatusForTrackingNumber) {
final AmazonS3 s3Client = EventHandler.getS3Client();
logger.log("Processing Bucket: " + bucketName);
ObjectListing files = s3Client.listObjects(bucketName);
List<KeyVersion> filesProcessed = new ArrayList<DeleteObjectsRequest.KeyVersion>();
for (Iterator<?> iterator = files.getObjectSummaries().iterator(); iterator.hasNext(); ) {
S3ObjectSummary summary = (S3ObjectSummary) iterator.next();
logger.log("Reading Object: " + summary.getKey());
String trackingNumber = summary.getKey().split("--")[0];
Pair<Long, String> lastKnownStatus = latestStatusForTrackingNumber.get(trackingNumber);
// Check if this shipment has already been delivered, skip this file
if (lastKnownStatus != null && "DELIVERED".equals(lastKnownStatus.getRight())) {
continue;
}
String fileContents = s3Client.getObjectAsString(bucketName, summary.getKey());
if (!isValidFile(fileContents)) {
logger.log(String.format("Skipping invalid file %s", summary.getKey()));
continue;
}
if (!fileContents.contains("\n")) {
}
String[] lines = fileContents.split("\n");
String line1 = lines[0];
String line2 = lines[1];
String status = line1.split(":")[1];
Long timeStamp = Long.parseLong(line2.split(":")[1]);
if (null == lastKnownStatus || lastKnownStatus.getLeft() < timeStamp) {
lastKnownStatus = new MutablePair<Long, String>(timeStamp, status);
latestStatusForTrackingNumber.put(trackingNumber, lastKnownStatus);
}
//Add to list of processed files
filesProcessed.add(new KeyVersion(summary.getKey()));
logger.log("logging Contents of the file" + fileContents);
}
return filesProcessed;
}
private void deleteProcessedFiles(Map<String, List<KeyVersion>> filesToDelete) {
final AmazonS3 s3Client = EventHandler.getS3Client();
for (Entry<String, List<KeyVersion>> entry : filesToDelete.entrySet()) {
final DeleteObjectsRequest deleteRequest = new DeleteObjectsRequest(entry.getKey()).withKeys(entry.getValue()).withQuiet(false);
s3Client.deleteObjects(deleteRequest);
}
}
private boolean isValidFile(String fileContents) {
if (!fileContents.contains("\n")) {
return false;
}
String[] lines = fileContents.split("\n");
for (String l: lines) {
if (!l.contains(":")) {
return false;
}
}
return true;
}
public static AmazonS3 getS3Client() {
return AmazonS3ClientBuilder.standard().withRegion(Regions.DEFAULT_REGION).build();
}
}