|
3 | 3 | import base64 |
4 | 4 | import json |
5 | 5 | import zlib |
6 | | -from typing import TYPE_CHECKING |
| 6 | +from typing import TYPE_CHECKING, Any |
7 | 7 |
|
8 | 8 | from aws_lambda_powertools.utilities.data_classes.cloud_watch_logs_event import ( |
9 | 9 | CloudWatchLogsDecodedData, |
@@ -100,19 +100,59 @@ def kinesis(self) -> KinesisStreamRecordPayload: |
100 | 100 | return KinesisStreamRecordPayload(self["kinesis"]) |
101 | 101 |
|
102 | 102 |
|
| 103 | +class KinesisStreamWindow(DictWrapper): |
| 104 | + @property |
| 105 | + def start(self) -> str: |
| 106 | + """The time window started""" |
| 107 | + return self["start"] |
| 108 | + |
| 109 | + @property |
| 110 | + def end(self) -> str: |
| 111 | + """The time window will end""" |
| 112 | + return self["end"] |
| 113 | + |
| 114 | + |
103 | 115 | class KinesisStreamEvent(DictWrapper): |
104 | 116 | """Kinesis stream event |
105 | 117 |
|
106 | 118 | Documentation: |
107 | 119 | -------------- |
108 | 120 | - https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis.html |
| 121 | + - https://docs.aws.amazon.com/lambda/latest/dg/services-kinesis-windows.html |
109 | 122 | """ |
110 | 123 |
|
111 | 124 | @property |
112 | 125 | def records(self) -> Iterator[KinesisStreamRecord]: |
113 | 126 | for record in self["Records"]: |
114 | 127 | yield KinesisStreamRecord(record) |
115 | 128 |
|
| 129 | + @property |
| 130 | + def window(self) -> KinesisStreamWindow | None: |
| 131 | + window = self.get("window") |
| 132 | + if window: |
| 133 | + return KinesisStreamWindow(window) |
| 134 | + return window |
| 135 | + |
| 136 | + @property |
| 137 | + def state(self) -> dict[str, Any]: |
| 138 | + return self.get("state") or {} |
| 139 | + |
| 140 | + @property |
| 141 | + def shard_id(self) -> str | None: |
| 142 | + return self.get("shardId") |
| 143 | + |
| 144 | + @property |
| 145 | + def event_source_arn(self) -> str | None: |
| 146 | + return self.get("eventSourceARN") |
| 147 | + |
| 148 | + @property |
| 149 | + def is_final_invoke_for_window(self) -> bool | None: |
| 150 | + return self.get("isFinalInvokeForWindow") |
| 151 | + |
| 152 | + @property |
| 153 | + def is_window_terminated_early(self) -> bool | None: |
| 154 | + return self.get("isWindowTerminatedEarly") |
| 155 | + |
116 | 156 |
|
117 | 157 | def extract_cloudwatch_logs_from_event(event: KinesisStreamEvent) -> list[CloudWatchLogsDecodedData]: |
118 | 158 | return [CloudWatchLogsDecodedData(record.kinesis.data_zlib_compressed_as_json()) for record in event.records] |
|
0 commit comments