RunGraphs: Trigger the execution of multiple graphs #106
    logger = logging.getLogger(__name__)


    def run_graphs(proxies, numthreads=multiprocessing.cpu_count()):
Does this numthreads parameter really determine the number of event loops that are going to run in parallel?
Correct, the function spawns exactly `numthreads` Python `threading.Thread` objects. Each thread submits a separate Spark job, which in turn runs a distributed event loop on the cluster. So `numthreads` is also the number of event loops running concurrently on the cluster.
The main reason for the parameter is to avoid spawning too many Python threads at once, for example in an analysis with more samples than threads available on the driver.
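To make the thread cap concrete, here is a minimal sketch of the pattern described above, not the code in this PR. The helper name `run_concurrently` is hypothetical, and each task is a plain zero-argument callable standing in for "trigger one Spark job"; a fixed number of `threading.Thread` workers drain a shared queue, so no more than `numthreads` tasks are ever in flight.

```python
import queue
import threading


def run_concurrently(tasks, numthreads=4):
    """Run zero-argument callables on at most `numthreads` worker threads.

    Hypothetical helper for illustration; each callable stands in for the
    submission of one job.
    """
    tasks = list(tasks)
    pending = queue.Queue()
    for index, task in enumerate(tasks):
        pending.put((index, task))

    # Results are stored by position so the output order matches the input.
    results = [None] * len(tasks)

    def worker():
        # Each worker keeps pulling tasks until the queue is drained.
        while True:
            try:
                index, task = pending.get_nowait()
            except queue.Empty:
                return
            results[index] = task()

    # Never start more threads than there are tasks.
    threads = [threading.Thread(target=worker)
               for _ in range(min(numthreads, len(tasks)))]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return results
```

With ten tasks and `numthreads=3`, only three threads exist at any time and the remaining tasks wait in the queue, which is the situation with more samples than driver threads mentioned above.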
Yes, I agree it's good to have it as a limiting factor. What I'm not so sure about is the default, since a priori the number of cores on my client machine is not necessarily related to the number of concurrent jobs I can submit to a particular Spark cluster.
Fair point, I just picked that number as a first approximation. I think it should be somewhere between 4 and 8. In any case it's a parameter, so a user can ask for more threads to submit more concurrent jobs.
RunGraphs defaults to 4 concurrent job submissions for the moment.
The RunGraphs function is inspired by ROOT::RDF::RunGraphs. In PyRDF, this function dispatches the concurrent execution of multiple computation graphs to the backend in use. If the backend doesn't implement this functionality, it defaults to running the distributed graphs sequentially.
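The fallback behaviour can be sketched roughly as follows. This is an illustration under assumptions, not PyRDF's actual API: `dispatch_graphs`, `SequentialOnlyBackend`, `ThreadedBackend`, and the `execute` / `run_graphs` method names are all hypothetical, and each graph is modelled as a zero-argument callable.

```python
from concurrent.futures import ThreadPoolExecutor


def dispatch_graphs(graphs, backend, numthreads=4):
    """Run `graphs` through `backend`, concurrently if the backend supports it."""
    graphs = list(graphs)
    # Assumption: a backend advertises concurrent support by defining a
    # `run_graphs` method. Otherwise fall back to one graph at a time.
    concurrent_runner = getattr(backend, "run_graphs", None)
    if callable(concurrent_runner):
        return concurrent_runner(graphs, numthreads)
    return [backend.execute(graph) for graph in graphs]


class SequentialOnlyBackend:
    """Hypothetical backend that can only execute one graph at a time."""

    def execute(self, graph):
        return graph()


class ThreadedBackend(SequentialOnlyBackend):
    """Hypothetical backend that submits each graph from its own thread."""

    def run_graphs(self, graphs, numthreads):
        with ThreadPoolExecutor(max_workers=numthreads) as pool:
            # pool.map preserves the input order of the graphs.
            return list(pool.map(self.execute, graphs))
```

Checking for the method at call time keeps the user-facing function identical for every backend; only the execution strategy changes.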
The new `run_graphs` function is inspired by the logic of ROOT::RDF::RunGraphs. Its main use for now is to trigger the execution of multiple Spark jobs concurrently; each job is submitted in a different thread, as suggested by the Spark docs. This implementation is open to suggestions, as are the name of the function and where it should live in PyRDF. A first test is added to check that the functionality works.