AWS backend - parallelize invocation and reduction #1
Vetchu
left a comment
Another general thought: most function declarations in this file could be made static instead of being inlined (or moved to a separate utils file).
| trials = 3 | ||
| client = boto3.client('lambda', region_name=self.region) | ||
| def process_response(response): |
This whole function could be simplified to: try json.loads(response['Payload'].read()); on any exception, log it at debug level and fall back to an empty dict.
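A minimal sketch of that simplification, assuming the response is shaped like the dict returned by boto3's Lambda `invoke` (a `Payload` entry exposing `.read()`). The module-level `logger` is a stand-in for the `self.logger` used in the PR.

```python
import json
import logging

# Stand-in for self.logger from the PR (assumption).
logger = logging.getLogger(__name__)


def process_response(response):
    """Return the decoded JSON payload, or an empty dict if decoding fails."""
    try:
        return json.loads(response['Payload'].read())
    except Exception as error:
        # Missing 'Payload', unreadable stream, or invalid JSON all end up here.
        logger.debug('Could not decode Lambda payload: %s', error)
        return {}
```

Catching a bare `Exception` keeps the function short, at the cost of hiding the difference between a missing payload and malformed JSON; the debug log is the only place that distinction survives.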
| # Maybe here give info about number of invoked lambda for awsmonitor | ||
| #self.logger.info(f'New lambda - 31') | ||
| while trials: |
This could be more explicit by stating "trials != 0", since that is the condition that ends this while loop.
| #self.logger.info(f'Invocation time: {my_after_time - my_before_time}') | ||
| if 'FunctionError' in response: | ||
| raise RuntimeError('Lambda runtime error') |
The error details from the response should be passed along to the RuntimeError.
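One way this could look, as a sketch only: it assumes the boto3 `invoke` response carries the error kind under `FunctionError` and the error details in the `Payload` stream. The helper name `raise_on_function_error` is hypothetical and not part of the PR.

```python
def raise_on_function_error(response):
    """Raise RuntimeError with the Lambda's own error details, if any."""
    if 'FunctionError' in response:
        # Read the payload as text so the reason reaches the caller verbatim.
        details = response['Payload'].read().decode('utf-8', errors='replace')
        raise RuntimeError(
            f"Lambda runtime error ({response['FunctionError']}): {details}"
        )
```

Note that reading the payload consumes the stream, so this check has to run before any other code tries to read it.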
| )['Body'].read()) | ||
| files = [] | ||
| with concurrent.futures.ThreadPoolExecutor(max_workers=len(filenames)) as executor: |
This construction repeats multiple times; it could be extracted into a separate generic function.
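A possible shape for such a helper, as a sketch: the name `run_in_parallel` and its signature are hypothetical. It keeps the one-worker-per-item sizing from the diff and adds a guard for empty input, because `ThreadPoolExecutor` rejects `max_workers=0`.

```python
import concurrent.futures


def run_in_parallel(func, items):
    """Apply func to every item on its own thread; results keep input order."""
    items = list(items)
    if not items:
        # ThreadPoolExecutor raises ValueError for max_workers <= 0.
        return []
    with concurrent.futures.ThreadPoolExecutor(max_workers=len(items)) as executor:
        # executor.map re-raises the first worker exception when iterated.
        return list(executor.map(func, items))
```

With this in place, a call site such as the download loop could reduce to something like `files = run_in_parallel(download, filenames)`, where `download` stands for whatever per-file function the code already uses.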
| except Exception as error: | ||
| # All other errors | ||
| self.logger.warn(error) | ||
| else: |
Is this else even needed at this point?
The code under this else runs only if no exception was raised, which means the Lambda invocation succeeded and the loop should end straight away. I'm not sure it is strictly needed, so I can change it.
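The retry pattern under discussion can be sketched as follows. This is an illustration under assumptions, not the PR's code: `invoke_with_retries` and the zero-argument `invoke` callable are hypothetical names, the decrement placement is a guess, and the loop uses `trials > 0`, a slightly stricter form of the explicit condition, so that a negative count cannot loop forever.

```python
import logging

# Stand-in for self.logger from the PR (assumption).
logger = logging.getLogger(__name__)


def invoke_with_retries(invoke, trials=3):
    """Call invoke() up to `trials` times and return the first successful result."""
    last_error = None
    while trials > 0:
        trials -= 1
        try:
            response = invoke()
        except Exception as error:
            logger.warning('Invocation failed, %d trial(s) left: %s', trials, error)
            last_error = error
        else:
            # Reached only when invoke() raised nothing: stop retrying.
            return response
    raise RuntimeError('Lambda invocation failed after all trials') from last_error
```

The `else` branch could be replaced by returning directly inside the `try`, but keeping it makes clear that only the call itself is guarded by the `except` clauses.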
| response_payload = response['Payload'] | ||
| lambda_status = response_payload['statusCode'] if 'statusCode' in response_payload else 200 | ||
| if lambda_status != 200: | ||
| raise RuntimeError(f'Lambda status code {lambda_status}') |
Again, the exact reason is not passed along.
| if 'Payload' in response: | ||
| response_payload = response['Payload'] | ||
| lambda_status = response_payload['statusCode'] if 'statusCode' in response_payload else 200 |
Can it ever be missing from the payload?
Payload is the serialized value that the Lambda returns. By default it is a dict with some values, and one of its keys is statusCode, but that key can be omitted. I wrote this without prior knowledge of the Lambda's internals, assuming that some Lambda logic might return a different statusCode early, e.g. to indicate an error. That is not the case here, so I can remove the whole payload handling for a potential speedup.
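If the check were kept, it could be written against the already decoded payload, as in this sketch. The helper name `check_lambda_status` is hypothetical, and the payload is assumed to be a dict produced by `json.loads`. It defaults to 200 when the key is omitted and includes the payload in the error so the reason is not lost.

```python
def check_lambda_status(payload):
    """Return the payload's statusCode (200 if omitted); raise if it is not 200."""
    status = payload.get('statusCode', 200)
    if status != 200:
        # Include the payload so the caller sees why the Lambda reported failure.
        raise RuntimeError(f'Lambda status code {status}: {payload}')
    return status
```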
| pass | ||
| @staticmethod | ||
| def process_execution(s3_client, processing_bucket, num_of_lambdas, logger): |
This name is a bit misleading; maybe something more like "wait_for_all_lambdas" or "await_for_completion".
No description provided.