[SPARK-55476][PYTHON] Refactor broadcast variable protocol #54258
Open: gaogaotiantian wants to merge 1 commit into apache:master from gaogaotiantian:refactor-broadcast
+53 −56
How would we use this info for debugging? I feel like you can just log it here instead of JSON ser/de
I did not quite get the question. We are not using it for debugging. This is part of the protocol. We need this from JVM.
Would you mind describing how we're going to use this in the PR description?
Sorry, use what? This is part of the JVM <-> Python worker protocol. We are not adding any new features. The JVM used to send broadcast variable information to Python integer by integer (with some strings in the middle). Now, instead of that raw, fragile protocol, we send all of the broadcast variable information in a single JSON string.
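For readers unfamiliar with the idea, here is a minimal sketch of what sending the information as one JSON string could look like. It is not the code from this PR: the 4-byte length prefix, the helper names `write_json_message` and `read_json_message`, and every field name in the payload are assumptions made for illustration only.

```python
import io
import json
import struct


def write_json_message(stream, payload):
    """Write a 4-byte big-endian length, then the UTF-8 encoded JSON body."""
    data = json.dumps(payload).encode("utf-8")
    stream.write(struct.pack("!i", len(data)))
    stream.write(data)


def read_json_message(stream):
    """Read one length-prefixed JSON message and return the decoded object."""
    header = stream.read(4)
    if len(header) != 4:
        raise EOFError("stream ended before the length prefix")
    (length,) = struct.unpack("!i", header)
    data = stream.read(length)
    if len(data) != length:
        raise EOFError("stream ended before the full JSON payload")
    return json.loads(data.decode("utf-8"))


# Hypothetical payload: the real field names are not shown in this thread.
example = {
    "broadcasts": [
        {"id": 7, "path": "/tmp/broadcast_7"},
        {"id": 3, "removed": True},
    ],
}

# Round-trip through an in-memory buffer standing in for the socket.
buffer = io.BytesIO()
write_json_message(buffer, example)
buffer.seek(0)
decoded = read_json_message(buffer)
print(decoded["broadcasts"])
```

The reader performs exactly one framed read regardless of how many fields the message carries, whereas a field-by-field protocol needs the reader and writer to agree on the order and count of every individual read.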
I am trying to understand why we need this. Is this purely to make the protocol more stable?
If that's the case, I am not super supportive of this change. This could impact jobs like Structured Streaming (with micro-batches) or ML jobs that disable `spark.python.worker.reuse` (which happens often in practice, to work around problems that come from having a long-living daemon worker). Considering the overhead vs. the benefit, I would prefer to just leave it.
Performance impact is a big red herring. This change introduces two kinds of "overhead":
1. Decoding a small JSON string takes about 1 us. It's probably in the same range on the Scala side.
2. The local network runs at 10 Gbps or more, so an extra 100 bytes takes about 0.1 us.
That's the overhead we introduce for every UDF run.
Currently, without worker reuse, each worker takes a few hundred ms to spawn. I made an optimization a few weeks ago that eliminated 100-200 ms per spawn for reused workers, and no one even noticed it.
1 us is 0.001% of 100 ms. That's literally nothing. If we care about 1 us, we have serious issues with our current UDF. I can save a lot of microseconds in our current code if that's what it takes to make our protocol more stable.
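The arithmetic behind these estimates can be checked with a short script. This is only a rough sketch: the payload is a made-up JSON string of roughly 100 bytes, the 10 Gbps link speed and 100 ms spawn time are the figures quoted in the comment, and the measured decode time will vary by machine.

```python
import json
import timeit

# Hypothetical payload of roughly 100 bytes; field names are invented.
payload = json.dumps(
    {"broadcasts": [{"id": i, "path": f"/tmp/b_{i}"} for i in range(3)]}
)

# Average time for one json.loads call on this machine.
runs = 10_000
decode_seconds = timeit.timeit(lambda: json.loads(payload), number=runs) / runs

# Time to push 100 extra bytes over a 10 Gbps link.
link_bits_per_second = 10e9
extra_bytes = 100
transfer_seconds = extra_bytes * 8 / link_bits_per_second  # 8e-08 s, about 0.1 us

# Ratio of the quoted 1 us decode cost to a 100 ms worker spawn.
fraction_of_spawn = 1e-6 / 0.100  # 1e-05, i.e. 0.001%

print(f"payload size: {len(payload)} bytes")
print(f"json.loads:   {decode_seconds * 1e6:.2f} us per call (machine dependent)")
print(f"transfer:     {transfer_seconds * 1e6:.2f} us")
print(f"fraction:     {fraction_of_spawn * 100:.3f}% of a 100 ms spawn")
```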
My point is that the benefit is not super compelling. If we plan to refactor the whole protocol or something, yeah, we should probably go ahead. But doing this alone doesn't look worthwhile to me.
If we do want to refactor the whole protocol, we had better have a bigger picture first.
This is part of the effort to refactor the whole protocol. I'm doing it piece by piece so that eventually we can have a structured message from the JVM, ideally a multi-phased message. All the initialization (probably including the UDF definition) should be sent in a single message. The message should be relatively resistant to new changes. For example, it shouldn't get stuck if we decide to add something new, and it should report a clear error when the message does not follow the protocol.
JSON is good in the sense that if we decide to add something else to the broadcast variable protocol, it's easy, and if we get it wrong, we can find out quickly too. Otherwise we have to be really careful about where to insert the `read_long`, and if we don't do it correctly, the worker can get stuck at an arbitrary point.
This refactor actually eliminated some unnecessary fields. Our old protocol is so fragile that no one is willing to touch it. I want to gradually convert it to a more structured form. A single switch is a bit too dangerous.
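As a rough illustration of the two properties described above (tolerating newly added fields and failing with a clear error on a malformed message), a parser for such a JSON message might look like the sketch below. The `ProtocolError` class, the `parse_broadcast_entries` helper, and the `broadcasts`/`id` field names are all hypothetical and are not taken from this PR.

```python
import json


class ProtocolError(ValueError):
    """Raised when a message does not have the expected shape."""


# Hypothetical required fields and their expected Python types.
REQUIRED_FIELDS = {"id": int}


def parse_broadcast_entries(raw):
    """Decode a JSON message and validate the fields this reader relies on.

    Unknown extra fields are ignored, so the sender can add new ones
    without breaking this reader.
    """
    try:
        message = json.loads(raw)
    except json.JSONDecodeError as exc:
        raise ProtocolError(f"message is not valid JSON: {exc}") from exc
    if not isinstance(message, dict):
        raise ProtocolError("top-level value must be a JSON object")
    entries = message.get("broadcasts")
    if not isinstance(entries, list):
        raise ProtocolError("missing or non-list 'broadcasts' field")
    for index, entry in enumerate(entries):
        if not isinstance(entry, dict):
            raise ProtocolError(f"broadcasts[{index}] must be a JSON object")
        for name, expected in REQUIRED_FIELDS.items():
            if not isinstance(entry.get(name), expected):
                raise ProtocolError(
                    f"broadcasts[{index}].{name} must be {expected.__name__}"
                )
    return entries


# An extra field the reader does not know about is simply ignored.
entries = parse_broadcast_entries('{"broadcasts": [{"id": 1, "new_field": "x"}]}')

# A malformed message fails immediately with a descriptive error.
try:
    parse_broadcast_entries('{"broadcasts": [{"path": "/tmp/x"}]}')
    error_text = None
except ProtocolError as exc:
    error_text = str(exc)
print(error_text)
```

With a sequence of raw reads, the same mistake would typically show up as a reader blocked on bytes that never arrive, rather than as an exception naming the offending field.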