Skip to content

Commit 019653e

Browse files
authored
Create Kinesis event with all fields (#19)
1 parent 9d4b8f3 commit 019653e

File tree

4 files changed

+123
-21
lines changed

4 files changed

+123
-21
lines changed

src/main/java/cloud/localstack/LambdaExecutor.java

Lines changed: 2 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,11 @@
11
package cloud.localstack;
22

33
import cloud.localstack.lambda.DDBEventParser;
4+
import cloud.localstack.lambda.KinesisEventParser;
45
import cloud.localstack.lambda.S3EventParser;
56
import com.amazonaws.services.lambda.runtime.Context;
67
import com.amazonaws.services.lambda.runtime.RequestHandler;
78
import com.amazonaws.services.lambda.runtime.RequestStreamHandler;
8-
import com.amazonaws.services.lambda.runtime.events.KinesisEvent;
9-
import com.amazonaws.services.lambda.runtime.events.KinesisEvent.KinesisEventRecord;
10-
import com.amazonaws.services.lambda.runtime.events.KinesisEvent.Record;
119
import com.amazonaws.services.lambda.runtime.events.SNSEvent;
1210
import com.amazonaws.services.lambda.runtime.events.SQSEvent;
1311
import com.amazonaws.util.StringInputStream;
@@ -23,13 +21,10 @@
2321
import java.lang.reflect.InvocationTargetException;
2422
import java.lang.reflect.ParameterizedType;
2523
import java.lang.reflect.Type;
26-
import java.nio.ByteBuffer;
2724
import java.nio.charset.StandardCharsets;
2825
import java.nio.file.Files;
2926
import java.nio.file.Paths;
3027
import java.util.Arrays;
31-
import java.util.Base64;
32-
import java.util.Date;
3328
import java.util.LinkedList;
3429
import java.util.List;
3530
import java.util.Map;
@@ -69,21 +64,7 @@ public static void main(String[] args) throws Exception {
6964
}
7065
} else {
7166
if (records.stream().anyMatch(record -> record.containsKey("kinesis") || record.containsKey("Kinesis"))) {
72-
KinesisEvent kinesisEvent = new KinesisEvent();
73-
inputObject = kinesisEvent;
74-
kinesisEvent.setRecords(new LinkedList<>());
75-
for (Map<String, Object> record : records) {
76-
KinesisEventRecord r = new KinesisEventRecord();
77-
kinesisEvent.getRecords().add(r);
78-
Record kinesisRecord = new Record();
79-
Map<String, Object> kinesis = (Map<String, Object>) get(record, "Kinesis");
80-
String dataString = new String(get(kinesis, "Data").toString().getBytes());
81-
byte[] decodedData = Base64.getDecoder().decode(dataString);
82-
kinesisRecord.setData(ByteBuffer.wrap(decodedData));
83-
kinesisRecord.setPartitionKey((String) get(kinesis, "PartitionKey"));
84-
kinesisRecord.setApproximateArrivalTimestamp(new Date());
85-
r.setKinesis(kinesisRecord);
86-
}
67+
inputObject = KinesisEventParser.parse(records);
8768
} else if (records.stream().anyMatch(record -> record.containsKey("Sns"))) {
8869
SNSEvent snsEvent = new SNSEvent();
8970
inputObject = snsEvent;
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
package cloud.localstack.lambda;
2+
3+
import com.amazonaws.services.lambda.runtime.events.KinesisEvent;
4+
import com.amazonaws.services.lambda.runtime.events.KinesisEvent.KinesisEventRecord;
5+
import com.amazonaws.services.lambda.runtime.events.KinesisEvent.Record;
6+
7+
import java.nio.ByteBuffer;
8+
import java.util.Base64;
9+
import java.util.Date;
10+
import java.util.LinkedList;
11+
import java.util.List;
12+
import java.util.Map;
13+
14+
import static cloud.localstack.LambdaExecutor.get;
15+
16+
public class KinesisEventParser {
17+
18+
public static KinesisEvent parse(List<Map<String, Object>> records) {
19+
KinesisEvent kinesisEvent = new KinesisEvent();
20+
kinesisEvent.setRecords(new LinkedList<>());
21+
for (Map<String, Object> record : records) {
22+
KinesisEventRecord r = new KinesisEventRecord();
23+
kinesisEvent.getRecords().add(r);
24+
25+
r.setEventSourceARN((String) get(record, "eventSourceARN"));
26+
r.setEventSource((String) get(record, "eventSource"));
27+
r.setEventName((String) get(record, "eventName"));
28+
r.setEventVersion((String) get(record, "eventVersion"));
29+
r.setEventID((String) get(record, "eventID"));
30+
r.setAwsRegion((String) get(record, "awsRegion"));
31+
r.setInvokeIdentityArn((String) get(record, "invokeIdentityArn"));
32+
33+
//Kinesis
34+
Map<String, Object> kinesis = (Map<String, Object>) get(record, "Kinesis");
35+
String dataString = new String(get(kinesis, "Data").toString().getBytes());
36+
byte[] decodedData = Base64.getDecoder().decode(dataString);
37+
Record kinesisRecord = new Record();
38+
kinesisRecord.setData(ByteBuffer.wrap(decodedData));
39+
kinesisRecord.setPartitionKey((String) get(kinesis, "PartitionKey"));
40+
kinesisRecord.setSequenceNumber((String) get(kinesis, "SequenceNumber"));
41+
kinesisRecord.setKinesisSchemaVersion((String) get(kinesis, "KinesisSchemaVersion"));
42+
kinesisRecord.setApproximateArrivalTimestamp(new Date());
43+
r.setKinesis(kinesisRecord);
44+
}
45+
46+
return kinesisEvent;
47+
}
48+
}
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
package cloud.localstack;
2+
3+
import cloud.localstack.lambda.KinesisEventParser;
4+
import com.amazonaws.services.lambda.runtime.events.KinesisEvent;
5+
import com.fasterxml.jackson.databind.DeserializationFeature;
6+
import com.fasterxml.jackson.databind.MapperFeature;
7+
import com.fasterxml.jackson.databind.ObjectMapper;
8+
import org.junit.Test;
9+
10+
import java.nio.ByteBuffer;
11+
import java.util.List;
12+
import java.util.Map;
13+
14+
import static cloud.localstack.LambdaExecutor.get;
15+
import static cloud.localstack.LambdaExecutor.readFile;
16+
import static org.junit.Assert.assertEquals;
17+
import static org.junit.Assert.assertNotNull;
18+
19+
@SuppressWarnings("unchecked")
20+
public class KinesisEventMappingTest {
21+
22+
@Test
23+
public void testKinesisRecord() throws Exception {
24+
String fileContent = readFile("src/test/resources/KinesisEventLambda.json");
25+
ObjectMapper reader = new ObjectMapper();
26+
reader.configure(MapperFeature.ACCEPT_CASE_INSENSITIVE_PROPERTIES, true);
27+
reader.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
28+
Map<String,Object> map = reader.readerFor(Map.class).readValue(fileContent);
29+
30+
List<Map<String,Object>> records = (List<Map<String, Object>>) get(map, "Records");
31+
32+
KinesisEvent inputObject = KinesisEventParser.parse(records);
33+
assertNotNull(inputObject);
34+
assertEquals(1, inputObject.getRecords().size());
35+
36+
KinesisEvent.KinesisEventRecord record = inputObject.getRecords().get(0);
37+
assertEquals("us-east-2", record.getAwsRegion());
38+
assertEquals("aws:kinesis", record.getEventSource());
39+
assertEquals("shardId-000000000006:49590338271490256608559692538361571095921575989136588898",
40+
record.getEventID());
41+
assertEquals("arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream", record.getEventSourceARN());
42+
assertEquals("aws:kinesis:record", record.getEventName());
43+
assertEquals("arn:aws:iam::123456789012:role/lambda-role", record.getInvokeIdentityArn());
44+
45+
KinesisEvent.Record kinesisRecord = record.getKinesis();
46+
assertEquals("1.0", kinesisRecord.getKinesisSchemaVersion());
47+
assertEquals("1", kinesisRecord.getPartitionKey());
48+
assertEquals("49590338271490256608559692538361571095921575989136588898",
49+
kinesisRecord.getSequenceNumber());
50+
assertEquals(ByteBuffer.wrap("Hello, this is a test.".getBytes()), kinesisRecord.getData());
51+
assertNotNull(kinesisRecord.getApproximateArrivalTimestamp());
52+
}
53+
}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
{
2+
"Records": [
3+
{
4+
"kinesis": {
5+
"kinesisSchemaVersion": "1.0",
6+
"partitionKey": "1",
7+
"sequenceNumber": "49590338271490256608559692538361571095921575989136588898",
8+
"data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==",
9+
"approximateArrivalTimestamp": 1545084650.987
10+
},
11+
"eventSource": "aws:kinesis",
12+
"eventVersion": "1.0",
13+
"eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898",
14+
"eventName": "aws:kinesis:record",
15+
"invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role",
16+
"awsRegion": "us-east-2",
17+
"eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream"
18+
}
19+
]
20+
}

0 commit comments

Comments
 (0)