Skip to content

Commit 16ac544

Browse files
authored
add simple tests for Kinesis consumers using v1 and v2 SDKs (#28)
1 parent 2fe795f commit 16ac544

File tree

2 files changed

+168
-0
lines changed

2 files changed

+168
-0
lines changed
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
package cloud.localstack.awssdkv1;
2+
3+
import cloud.localstack.LocalstackTestRunner;
4+
5+
import com.amazonaws.services.kinesis.AmazonKinesisAsync;
6+
import com.amazonaws.services.kinesis.model.CreateStreamRequest;
7+
import com.amazonaws.services.kinesis.model.PutRecordRequest;
8+
import com.amazonaws.services.kinesis.model.PutRecordResult;
9+
import com.amazonaws.services.kinesis.model.GetRecordsRequest;
10+
import com.amazonaws.services.kinesis.model.GetRecordsResult;
11+
import com.amazonaws.services.kinesis.model.GetShardIteratorRequest;
12+
import com.amazonaws.services.kinesis.model.Record;
13+
import com.amazonaws.internal.SdkInternalList;
14+
15+
import static org.junit.Assert.assertEquals;
16+
import static org.junit.Assert.assertNotNull;
17+
import org.junit.Assert;
18+
import org.junit.Test;
19+
import org.junit.runner.RunWith;
20+
21+
import static java.lang.System.out;
22+
import java.util.*;
23+
import java.util.concurrent.TimeUnit;
24+
import java.nio.ByteBuffer;
25+
26+
@RunWith(LocalstackTestRunner.class)
27+
public class KinesisConsumerTest {
28+
29+
@Test
30+
public void testGetRecord() throws Exception{
31+
String streamName = "test-s-"+UUID.randomUUID().toString();
32+
AmazonKinesisAsync kinesisClient = TestUtils.getClientKinesisAsync();
33+
34+
CreateStreamRequest createStreamRequest = new CreateStreamRequest();
35+
createStreamRequest.setStreamName(streamName);
36+
createStreamRequest.setShardCount(1);
37+
38+
kinesisClient.createStream(createStreamRequest);
39+
TimeUnit.SECONDS.sleep(2);
40+
41+
PutRecordRequest putRecordRequest = new PutRecordRequest();
42+
putRecordRequest.setPartitionKey("partitionkey");
43+
putRecordRequest.setStreamName(streamName);
44+
putRecordRequest.setData(ByteBuffer.wrap("hello, world!".getBytes()));
45+
46+
String shardId = kinesisClient.putRecord(putRecordRequest).getShardId();
47+
48+
GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest();
49+
getShardIteratorRequest.setShardId(shardId);
50+
getShardIteratorRequest.setShardIteratorType("TRIM_HORIZON");
51+
getShardIteratorRequest.setStreamName(streamName);
52+
53+
String shardIterator = kinesisClient
54+
.getShardIterator(getShardIteratorRequest)
55+
.getShardIterator();
56+
57+
GetRecordsRequest getRecordRequest = new GetRecordsRequest() ;
58+
getRecordRequest.setShardIterator(shardIterator);
59+
60+
Integer limit = 100;
61+
Integer counter = 0;
62+
Boolean recordFound = false;
63+
64+
while (true) {
65+
getRecordRequest.setShardIterator(shardIterator);
66+
GetRecordsResult recordsResponse = kinesisClient.getRecords(getRecordRequest);
67+
68+
List records = recordsResponse.getRecords();
69+
if (records.isEmpty()) {
70+
recordFound = true;
71+
break;
72+
}
73+
74+
if(counter >= limit){
75+
break;
76+
}
77+
78+
counter += 1;
79+
shardIterator = recordsResponse.getNextShardIterator();
80+
}
81+
Assert.assertTrue(recordFound);
82+
}
83+
}
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
package cloud.localstack.awssdkv2;
2+
3+
import cloud.localstack.LocalstackTestRunner;
4+
5+
import static org.junit.Assert.assertEquals;
6+
import static org.junit.Assert.assertNotNull;
7+
import org.junit.Assert;
8+
import org.junit.Test;
9+
import org.junit.runner.RunWith;
10+
11+
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
12+
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
13+
import software.amazon.awssdk.regions.Region;
14+
import software.amazon.awssdk.services.kinesis.model.CreateStreamRequest;
15+
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
16+
import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
17+
import software.amazon.awssdk.http.Protocol;
18+
import software.amazon.awssdk.utils.AttributeMap;
19+
import software.amazon.awssdk.http.nio.netty.*;
20+
import software.amazon.awssdk.core.*;
21+
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
22+
import software.amazon.awssdk.regions.Region;
23+
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
24+
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
25+
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
26+
import software.amazon.awssdk.services.kinesis.model.*;
27+
28+
import static java.lang.System.out;
29+
import java.util.*;
30+
import java.util.concurrent.TimeUnit;
31+
32+
@RunWith(LocalstackTestRunner.class)
33+
public class KinesisV2ConsumerTest {
34+
35+
@Test
36+
public void testGetRecord() throws Exception{
37+
String streamName = "test-s-"+UUID.randomUUID().toString();
38+
KinesisAsyncClient kinesisClient = TestUtils.getClientKinesisAsyncV2();
39+
40+
CreateStreamRequest request = CreateStreamRequest.builder()
41+
.streamName(streamName).shardCount(1).build();
42+
CreateStreamResponse response = kinesisClient.createStream(request).get();
43+
Assert.assertNotNull(response);
44+
TimeUnit.SECONDS.sleep(2);
45+
46+
PutRecordRequest putRecordRequest = PutRecordRequest.builder()
47+
.partitionKey("partitionkey")
48+
.streamName(streamName)
49+
.data(SdkBytes.fromUtf8String("hello, world!"))
50+
.build();
51+
String shardId = kinesisClient.putRecord(putRecordRequest).get().shardId();
52+
53+
GetShardIteratorRequest getShardIteratorRequest = GetShardIteratorRequest.builder()
54+
.shardId(shardId)
55+
.shardIteratorType(ShardIteratorType.TRIM_HORIZON)
56+
.streamName(streamName)
57+
.build();
58+
String shardIterator = kinesisClient
59+
.getShardIterator(getShardIteratorRequest)
60+
.get()
61+
.shardIterator();
62+
63+
GetRecordsRequest getRecordRequest = GetRecordsRequest.builder().shardIterator(shardIterator).build();
64+
Integer limit = 100;
65+
Integer counter = 0;
66+
Boolean recordFound = false;
67+
68+
while (true) {
69+
GetRecordsResponse recordsResponse = kinesisClient.getRecords(getRecordRequest).get();
70+
71+
if (recordsResponse.hasRecords()) {
72+
recordFound = true;
73+
break;
74+
}
75+
76+
if(counter >= limit){
77+
break;
78+
}
79+
80+
counter += 1;
81+
shardIterator = recordsResponse.nextShardIterator();
82+
}
83+
Assert.assertTrue(recordFound);
84+
}
85+
}

0 commit comments

Comments
 (0)