Support Cancellation of Spark Jobs#665
Conversation
|
Thank you for your pull request. An admin will review this request soon. |
There was a problem hiding this comment.
It seems kinda weird to me that InterruptibleIterator is actually implementing hasNext, which then you override here. Maybe it should have a method exceptionIfThreadInterrupted. It seems like the trait is not actually implementing hasNext at all, its just supplying a utility methods for implementations.
There was a problem hiding this comment.
Yes, this seems odd to me, too. Is there some reason why InterruptibleIterator can't be implemented more like CompletionIterator? In other words, class InterruptibleIteratorDecorator just becomes class InterruptibleIterator, which extends Iterator; and your InterruptibleIterator trait becomes object InterruptibleIterator, which defs a function called something like notInterrupted to replace all of the calls to InterruptibleIterator.hasNext -- i.e. the peculiar super.hasNext calls become something like InterruptibleIterator.notInterrupted.
|
This looks great! I have wanted this feature for a long time. I made a couple of minor style comments. I only have one question about the implementation -- it seems like you register the cores as freed as soon as the killTask request gets sent. Do the tasks really die immediately? Should it wait for some acknowledgement that the task really has been killed? I guess its not horrible to have too many tasks running on an executor for a little while, so if it adds a lot of complication for a rare corner case, maybe we can forget it. |
|
Thanks for your comments., I'll incorporate them in the update i will add shortly with TPCH performance #s. |
There was a problem hiding this comment.
Should we be catching Throwable here?
There was a problem hiding this comment.
yes, you are right., we should be catching throwable here.
|
There was one major issue I ran into when I tried to do this before. I am not sure if this still applies: When canceling tasks that were reading files from HDFS, I noticed that the sockets open in (As a generalization, I guess we are relying on user-defined classes behaving appropriately when we interrupt a thread ?) |
|
Hey Ram, as I'm looking at this more closely, one question on the Iterator design: why do we need CompletionIterator to also extend InterruptibleIterator? Can't we just catch the InterruptedException in the code that's running the task and run the cleanup procedure there? |
|
(Or more generally, catch it on a call to next or hasNext; basically the on-complete callbacks should be called even if the task throws an exception, so InterruptedException doesn't need to be different). |
|
Hey Matei, |
|
After merging this PR into master @ 7dcda9a, CancellationSuite "Cancel Task" is not completing for me. |
|
Oh ok will take a look. The test might not have been as well thought out as I imagined, it seems to work on my machine but possibly has a race condition. Sent from my iPhone On Jul 10, 2013, at 12:20 PM, Mark Hamstra notifications@github.com wrote:
|
There was a problem hiding this comment.
Yes, that is actually true. To do this properly we'll need to do some kind of reference-counting on the stages (keep a list of which jobs currently want to run this stage). One difference here is that killJob is called by the user and for the first use case, of Shark, it's probably going to be fine. But it would be good to either track this properly or send a warning.
There was a problem hiding this comment.
That's pretty much the conclusion that I was arriving at. I'll work on the reference-counting refactoring. Should be doable independently of this PR and only require a minimal change here once it is done.
There was a problem hiding this comment.
Cool, that would be great to have.
|
Thank you for your pull request. An admin will review this request soon. |
|
Ram - I am closing this one because it is going to be subsumed by #935. |
…in"... ... java.lang.ClassNotFoundException: org.apache.spark.broadcast.TorrentBroadcastFactory Author: witgo <witgo@qq.com> Closes mesos#665 from witgo/SPARK-1734 and squashes the following commits: cacf238 [witgo] SPARK-1734: spark-submit throws an exception: Exception in thread "main" java.lang.ClassNotFoundException: org.apache.spark.broadcast.TorrentBroadcastFactory
Hi
This patch allows us to cancel runaway Spark queries by issuing a killJob command passing in the jobId. The approach taken here is to let the DAG scheduler clean up its state about a job that is currently executing, and propagate an interrupt to the running task. Each task runs within a Future to properly respond to interrupts and cancel execution. Every Iterator that iterates over data (either via CacheManager/ or BlockFetcher or the HadoopRDD) is wrapped by an InterruptibleIterator which checks for the interrupt and cleans up state accordingly and exits.
I have tested the performance of this patch against master on a bunch of internal queries and the performance is not impacted by this patch.
I am in the process of obtaining TPCH benchmarks with and without the patch which i will attach here.
In the meanwhile, please review and let me know if the design needs changes.