Skip to content

Aws backend - parallelize invocation and reduction#1

Open
Vetchu wants to merge 3 commits into
CloudPyRDF:AWS-backendfrom
Kamilbur:AWS-backend
Open

Aws backend - parallelize invocation and reduction#1
Vetchu wants to merge 3 commits into
CloudPyRDF:AWS-backendfrom
Kamilbur:AWS-backend

Conversation

@Vetchu
Copy link
Copy Markdown
Member

@Vetchu Vetchu commented Jul 24, 2021

No description provided.

Copy link
Copy Markdown
Member Author

@Vetchu Vetchu left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another general thought is moving most function declarations to static in this file instead of inlining them (or moving these to separate file, utils)

Comment thread PyRDF/backend/AWS.py Outdated
trials = 3

client = boto3.client('lambda', region_name=self.region)
def process_response(response):
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this whole function could be simplified to "try json.loads(response[payload].read) except exception e debug log and set empty dict"

Comment thread PyRDF/backend/AWS.py Outdated
# Maybe here give info about number of invoked lambda for awsmonitor
#self.logger.info(f'New lambda - 31')

while trials:
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could be more explicit by stating "trials != 0", as that's what ends this while

Comment thread PyRDF/backend/AWS.py Outdated
#self.logger.info(f'Invocation time: {my_after_time - my_before_time}')

if 'FunctionError' in response:
raise RuntimeError('Lambda runtime error')
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The error from response should be passed along to RuntimeError

Comment thread PyRDF/backend/AWS.py
)['Body'].read())

files = []
with concurrent.futures.ThreadPoolExecutor(max_workers=len(filenames)) as executor:
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this construction repeats multiple times, could go as separate generic function

Comment thread PyRDF/backend/AWS.py Outdated
except Exception as error:
# All other errors
self.logger.warn(error)
else:
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this else even needed at this point?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code under this else is executed only if there is no exception, what indicates that the loop should be ended straight away, cause lambda invocation have been successful. Idk, can change it.

Comment thread PyRDF/backend/AWS.py Outdated
response_payload = response['Payload']
lambda_status = response_payload['statusCode'] if 'statusCode' in response_payload else 200
if lambda_status != 200:
raise RuntimeError(f'Lambda status code {lambda_status}')
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Again the exact reason is not passed further

Comment thread PyRDF/backend/AWS.py Outdated

if 'Payload' in response:
response_payload = response['Payload']
lambda_status = response_payload['statusCode'] if 'statusCode' in response_payload else 200
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can it ever not be in payload?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Payload is a serialized value that lambda returns. By default it is dict with some values and one of the keys is statusCode, but it can be omitted. It was done without prior knowledge of lambda's insides and with an assumption that some lambda logic may be based on the early return of other statusCode f.e. to indicate error. This is not the case here, so can remove this whole paylod thing for a potential speedup.

Comment thread PyRDF/backend/AWS.py Outdated
pass

@staticmethod
def process_execution(s3_client, processing_bucket, num_of_lambdas, logger):
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this name is a bit misleading, maybe something more of "wait_for_all_lambdas" or "await_for_completion"

Copy link
Copy Markdown
Member Author

@Vetchu Vetchu left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another general thought is moving most function declarations to static in this file instead of inlining them (or moving these to separate file, utils)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants