Skip to content

Commit 9bbcf9c

Browse files
authored
check the content of Kinesis messages for JSON/CBOR encoding types (#34)
1 parent af668fa commit 9bbcf9c

File tree

2 files changed

+83
-126
lines changed

2 files changed

+83
-126
lines changed
Lines changed: 49 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -1,83 +1,71 @@
1-
package cloud.localstack.awssdkv1;
1+
package cloud.localstack;
22

33
import cloud.localstack.LocalstackTestRunner;
4+
import cloud.localstack.awssdkv1.TestUtils;
45

56
import com.amazonaws.services.kinesis.AmazonKinesisAsync;
67
import com.amazonaws.services.kinesis.model.CreateStreamRequest;
78
import com.amazonaws.services.kinesis.model.PutRecordRequest;
8-
import com.amazonaws.services.kinesis.model.PutRecordResult;
99
import com.amazonaws.services.kinesis.model.GetRecordsRequest;
1010
import com.amazonaws.services.kinesis.model.GetRecordsResult;
1111
import com.amazonaws.services.kinesis.model.GetShardIteratorRequest;
12-
import com.amazonaws.services.kinesis.model.Record;
13-
import com.amazonaws.internal.SdkInternalList;
12+
import com.amazonaws.SDKGlobalConfiguration;
1413

15-
import static org.junit.Assert.assertEquals;
16-
import static org.junit.Assert.assertNotNull;
1714
import org.junit.Assert;
1815
import org.junit.Test;
1916
import org.junit.runner.RunWith;
2017

21-
import static java.lang.System.out;
2218
import java.util.*;
2319
import java.util.concurrent.TimeUnit;
20+
import java.util.stream.Collectors;
2421
import java.nio.ByteBuffer;
2522

2623
@RunWith(LocalstackTestRunner.class)
2724
public class KinesisConsumerTest {
2825

29-
@Test
30-
public void testGetRecord() throws Exception{
31-
String streamName = "test-s-"+UUID.randomUUID().toString();
32-
AmazonKinesisAsync kinesisClient = TestUtils.getClientKinesisAsync();
33-
34-
CreateStreamRequest createStreamRequest = new CreateStreamRequest();
35-
createStreamRequest.setStreamName(streamName);
36-
createStreamRequest.setShardCount(1);
37-
38-
kinesisClient.createStream(createStreamRequest);
39-
TimeUnit.SECONDS.sleep(2);
40-
41-
PutRecordRequest putRecordRequest = new PutRecordRequest();
42-
putRecordRequest.setPartitionKey("partitionkey");
43-
putRecordRequest.setStreamName(streamName);
44-
putRecordRequest.setData(ByteBuffer.wrap("hello, world!".getBytes()));
45-
46-
String shardId = kinesisClient.putRecord(putRecordRequest).getShardId();
47-
48-
GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest();
49-
getShardIteratorRequest.setShardId(shardId);
50-
getShardIteratorRequest.setShardIteratorType("TRIM_HORIZON");
51-
getShardIteratorRequest.setStreamName(streamName);
52-
53-
String shardIterator = kinesisClient
54-
.getShardIterator(getShardIteratorRequest)
55-
.getShardIterator();
56-
57-
GetRecordsRequest getRecordRequest = new GetRecordsRequest() ;
58-
getRecordRequest.setShardIterator(shardIterator);
59-
60-
Integer limit = 100;
61-
Integer counter = 0;
62-
Boolean recordFound = false;
63-
64-
while (true) {
65-
getRecordRequest.setShardIterator(shardIterator);
66-
GetRecordsResult recordsResponse = kinesisClient.getRecords(getRecordRequest);
67-
68-
List records = recordsResponse.getRecords();
69-
if (records.isEmpty()) {
70-
recordFound = true;
71-
break;
72-
}
73-
74-
if(counter >= limit){
75-
break;
76-
}
77-
78-
counter += 1;
79-
shardIterator = recordsResponse.getNextShardIterator();
80-
}
81-
Assert.assertTrue(recordFound);
82-
}
26+
@Test
27+
public void testGetRecordCBOR() throws Exception {
28+
String streamName = "test-s-" + UUID.randomUUID().toString();
29+
AmazonKinesisAsync kinesisClient = TestUtils.getClientKinesisAsync();
30+
31+
CreateStreamRequest createStreamRequest = new CreateStreamRequest();
32+
createStreamRequest.setStreamName(streamName);
33+
createStreamRequest.setShardCount(1);
34+
35+
kinesisClient.createStream(createStreamRequest);
36+
TimeUnit.SECONDS.sleep(2);
37+
38+
PutRecordRequest putRecordRequest = new PutRecordRequest();
39+
putRecordRequest.setPartitionKey("partitionkey");
40+
putRecordRequest.setStreamName(streamName);
41+
42+
String message = "Hello world!";
43+
putRecordRequest.setData(ByteBuffer.wrap(message.getBytes()));
44+
45+
String shardId = kinesisClient.putRecord(putRecordRequest).getShardId();
46+
47+
GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest();
48+
getShardIteratorRequest.setShardId(shardId);
49+
getShardIteratorRequest.setShardIteratorType("TRIM_HORIZON");
50+
getShardIteratorRequest.setStreamName(streamName);
51+
52+
String shardIterator = kinesisClient.getShardIterator(getShardIteratorRequest).getShardIterator();
53+
54+
GetRecordsRequest getRecordRequest = new GetRecordsRequest();
55+
getRecordRequest.setShardIterator(shardIterator);
56+
57+
getRecordRequest.setShardIterator(shardIterator);
58+
GetRecordsResult recordsResponse = kinesisClient.getRecords(getRecordRequest);
59+
60+
List<String> records = recordsResponse.getRecords().stream().map(r -> new String(r.getData().array()))
61+
.collect(Collectors.toList());
62+
Assert.assertEquals(message, records.get(0));
63+
}
64+
65+
@Test
66+
public void testGetRecordJSON() throws Exception {
67+
System.setProperty(SDKGlobalConfiguration.AWS_CBOR_DISABLE_SYSTEM_PROPERTY, "true");
68+
this.testGetRecordCBOR();
69+
System.setProperty(SDKGlobalConfiguration.AWS_CBOR_DISABLE_SYSTEM_PROPERTY, "false");
70+
}
8371
}

src/test/java/cloud/localstack/awssdkv2/KinesisV2ConsumerTest.java

Lines changed: 34 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -2,84 +2,53 @@
22

33
import cloud.localstack.LocalstackTestRunner;
44

5-
import static org.junit.Assert.assertEquals;
6-
import static org.junit.Assert.assertNotNull;
75
import org.junit.Assert;
86
import org.junit.Test;
97
import org.junit.runner.RunWith;
108

11-
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
12-
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
13-
import software.amazon.awssdk.regions.Region;
149
import software.amazon.awssdk.services.kinesis.model.CreateStreamRequest;
1510
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
16-
import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
17-
import software.amazon.awssdk.http.Protocol;
18-
import software.amazon.awssdk.utils.AttributeMap;
19-
import software.amazon.awssdk.http.nio.netty.*;
2011
import software.amazon.awssdk.core.*;
21-
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
22-
import software.amazon.awssdk.regions.Region;
23-
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
24-
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
25-
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
2612
import software.amazon.awssdk.services.kinesis.model.*;
2713

28-
import static java.lang.System.out;
2914
import java.util.*;
3015
import java.util.concurrent.TimeUnit;
16+
import java.util.stream.Collectors;
3117

3218
@RunWith(LocalstackTestRunner.class)
3319
public class KinesisV2ConsumerTest {
3420

35-
@Test
36-
public void testGetRecord() throws Exception{
37-
String streamName = "test-s-"+UUID.randomUUID().toString();
38-
KinesisAsyncClient kinesisClient = TestUtils.getClientKinesisAsyncV2();
39-
40-
CreateStreamRequest request = CreateStreamRequest.builder()
41-
.streamName(streamName).shardCount(1).build();
42-
CreateStreamResponse response = kinesisClient.createStream(request).get();
43-
Assert.assertNotNull(response);
44-
TimeUnit.SECONDS.sleep(2);
45-
46-
PutRecordRequest putRecordRequest = PutRecordRequest.builder()
47-
.partitionKey("partitionkey")
48-
.streamName(streamName)
49-
.data(SdkBytes.fromUtf8String("hello, world!"))
50-
.build();
51-
String shardId = kinesisClient.putRecord(putRecordRequest).get().shardId();
52-
53-
GetShardIteratorRequest getShardIteratorRequest = GetShardIteratorRequest.builder()
54-
.shardId(shardId)
55-
.shardIteratorType(ShardIteratorType.TRIM_HORIZON)
56-
.streamName(streamName)
57-
.build();
58-
String shardIterator = kinesisClient
59-
.getShardIterator(getShardIteratorRequest)
60-
.get()
61-
.shardIterator();
62-
63-
GetRecordsRequest getRecordRequest = GetRecordsRequest.builder().shardIterator(shardIterator).build();
64-
Integer limit = 100;
65-
Integer counter = 0;
66-
Boolean recordFound = false;
67-
68-
while (true) {
69-
GetRecordsResponse recordsResponse = kinesisClient.getRecords(getRecordRequest).get();
70-
71-
if (recordsResponse.hasRecords()) {
72-
recordFound = true;
73-
break;
74-
}
75-
76-
if(counter >= limit){
77-
break;
78-
}
79-
80-
counter += 1;
81-
shardIterator = recordsResponse.nextShardIterator();
82-
}
83-
Assert.assertTrue(recordFound);
84-
}
21+
@Test
22+
public void testGetRecordCBOR() throws Exception {
23+
String streamName = "test-s-" + UUID.randomUUID().toString();
24+
KinesisAsyncClient kinesisClient = TestUtils.getClientKinesisAsyncV2();
25+
26+
CreateStreamRequest request = CreateStreamRequest.builder().streamName(streamName).shardCount(1).build();
27+
CreateStreamResponse response = kinesisClient.createStream(request).get();
28+
Assert.assertNotNull(response);
29+
TimeUnit.SECONDS.sleep(2);
30+
31+
String message = "hello, world!";
32+
PutRecordRequest putRecordRequest = PutRecordRequest.builder().partitionKey("partitionkey").streamName(streamName)
33+
.data(SdkBytes.fromUtf8String(message)).build();
34+
String shardId = kinesisClient.putRecord(putRecordRequest).get().shardId();
35+
36+
GetShardIteratorRequest getShardIteratorRequest = GetShardIteratorRequest.builder().shardId(shardId)
37+
.shardIteratorType(ShardIteratorType.TRIM_HORIZON).streamName(streamName).build();
38+
String shardIterator = kinesisClient.getShardIterator(getShardIteratorRequest).get().shardIterator();
39+
40+
GetRecordsRequest getRecordRequest = GetRecordsRequest.builder().shardIterator(shardIterator).build();
41+
GetRecordsResponse recordsResponse = kinesisClient.getRecords(getRecordRequest).get();
42+
List<String> records = recordsResponse.records().stream().map(r -> new String(r.data().asUtf8String()))
43+
.collect(Collectors.toList());
44+
45+
Assert.assertEquals(message, records.get(0));
46+
}
47+
48+
@Test
49+
public void testGetRecordJSON() throws Exception {
50+
System.setProperty(SdkSystemSetting.CBOR_ENABLED.property(), "false");
51+
this.testGetRecordCBOR();
52+
System.setProperty(SdkSystemSetting.CBOR_ENABLED.property(), "false");
53+
}
8554
}

0 commit comments

Comments
 (0)