Reference
Schedulers.complete_tasks — Method

```julia
tsks = complete_tasks(eloop)
```

Given `eloop::ElasticLoop`, return a list of tasks that are complete.
Schedulers.epmap — Method

```julia
epmap([options,] f, tasks, args...; kwargs...)
```

where `f` is the map function and `tasks` is an iterable collection of tasks. The function `f` takes the positional arguments `args` and the keyword arguments `kwargs`. `epmap` is parameterized by `options::SchedulerOptions`, which can be built using `options=SchedulerOptions(;epmap_kwargs...)`, where the `epmap_kwargs` are as follows.
epmap_kwargs
- `retries=0` number of times to retry a task on a given machine before removing that machine from the cluster
- `maxerrors=typemax(Int)` the maximum number of errors before giving up and exiting
- `timeout_multiplier=5` if any (miscellaneous) task takes `timeout_multiplier` times longer than the mean (miscellaneous) task time, abort that task
- `timeout_function_multiplier=5` if any (actual) task takes `timeout_function_multiplier` times longer than the (robust) mean (actual) task time, abort that task
- `null_tsk_runtime_threshold=0` the maximum duration (in seconds) for a task to be considered insignificant or "null" and thus excluded from the timeout measurement
- `skip_tsk_tol_ratio=0` the ratio of the total number of tasks that can be skipped
- `grace_period_ratio=0` the ratio between the "grace period" (once enough tasks are done) and the (robust) average task time
- `skip_tasks_that_timeout=false` skip tasks that exceed the timeout rather than retrying them on a different machine
- `minworkers=Distributed.nworkers` method (or value) giving the minimum number of workers to elastically shrink to
- `maxworkers=Distributed.nworkers` method (or value) giving the maximum number of workers to elastically expand to
- `usemaster=false` assign tasks to the master process?
- `nworkers=Distributed.nworkers` the number of machines currently provisioned for work[1]
- `quantum=()->32` the maximum number of workers to elastically add at a time
- `addprocs=n->Distributed.addprocs(n)` method for adding n processes (will depend on the cluster manager being used)
- `init=pid->nothing` after starting a worker, this method is run on that worker
- `preempt_channel_future=pid->nothing` method for retrieving a `Future` that holds a `Channel` through which preemption events are communicated[2]
- `checkpont_task=tsk->nothing` method that will be run if a preemption event is communicated
- `restart_task=tsk->nothing` method that will be run at the start of a task, and can be used for partially completed tasks that have checkpoint information
- `reporttasks=true` log task assignment
- `journal_init_callback=tsks->nothing` additional method run when initializing the journal
- `journal_task_callback=tsk->nothing` additional method run when journaling a task
Notes
[1] The number of machines provisioned may be greater than the number of workers in the cluster since, with some cluster managers, there may be a delay between the provisioning of a machine and when it is added to the Julia cluster. [2] For example, on Azure Cloud a SPOT instance will be preempted if someone is willing to pay more for it.
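A minimal usage sketch of `epmap` on a local cluster, following the signature `epmap([options,] f, tasks, args...; kwargs...)` above; the map function `g` and the choice of `retries=1` are illustrative assumptions, not part of the package:

```julia
using Distributed, Schedulers
addprocs(2)
@everywhere using Schedulers

# hypothetical map function: each task idles briefly to simulate work
@everywhere g(tsk) = (sleep(0.1); @info "completed task $tsk")

# map g over tasks 1:10, retrying a failed task once per machine before
# that machine is removed from the cluster
epmap(SchedulerOptions(;retries=1), g, 1:10)
rmprocs(workers())
```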
Schedulers.epmapreduce! — Method

```julia
epmapreduce!(result, [options], f, tasks, args...; kwargs...) -> result
```

where `f` is the map function and `tasks` is an iterable collection of tasks to map over. The positional arguments `args` and the keyword arguments `kwargs` are passed to `f`, which has the method signature `f(localresult, task, args...; kwargs...)`. `localresult` is the partial-reduction contribution associated with `result`. `epmapreduce!` is parameterized by `options::SchedulerOptions`, which can be built using `options=SchedulerOptions(;epmap_kwargs...)`, where the `epmap_kwargs` are as follows.
epmap_kwargs
- `reducer! = default_reducer!` the method used to reduce the result; the default is `(x,y)->(x .+= y)`
- `save_checkpoint = default_save_checkpoint` the method used to save checkpoints[1]
- `load_checkpoint = default_load_checkpoint` the method used to load a checkpoint[2]
- `zeros = ()->zeros(eltype(result), size(result))` the method used to initialize partial reductions
- `retries=0` number of times to retry a task on a given machine before removing that machine from the cluster
- `maxerrors=Inf` the maximum number of errors before giving up and exiting
- `timeout_multiplier=5` if any (miscellaneous) task takes `timeout_multiplier` times longer than the mean (miscellaneous) task time, abort that task
- `timeout_function_multiplier=5` if any (actual) task takes `timeout_function_multiplier` times longer than the (robust) mean (actual) task time, abort that task
- `null_tsk_runtime_threshold=0` the maximum duration (in seconds) for a task to be considered insignificant or "null" and thus excluded from the timeout measurement
- `skip_tsk_tol_ratio=0` the ratio of the total number of tasks that can be skipped
- `grace_period_ratio=0` the ratio between the "grace period" (once enough tasks are done) and the (robust) average task time
- `skip_tasks_that_timeout=false` skip tasks that exceed the timeout rather than retrying them on a different machine
- `minworkers=nworkers` method giving the minimum number of workers to elastically shrink to
- `maxworkers=nworkers` method giving the maximum number of workers to elastically expand to
- `usemaster=false` assign tasks to the master process?
- `nworkers=nworkers` the number of machines currently provisioned for work[3]
- `quantum=()->32` the maximum number of workers to elastically add at a time
- `addprocs=n->addprocs(n)` method for adding n processes (will depend on the cluster manager being used)
- `init=pid->nothing` after starting a worker, this method is run on that worker
- `preempt_channel_future=pid->nothing` method for retrieving a `Future` that holds a `Channel` through which preemption events are communicated
- `checkpont_task=tsk->nothing` method that will be run if a preemption event is communicated
- `restart_task=tsk->nothing` method that will be run at the start of a task, and can be used for partially completed tasks that have checkpoint information
- `scratch=["/scratch"]` storage location accessible to all cluster machines (e.g. NFS, Azure blobstore, ...)[4]
- `reporttasks=true` log task assignment
- `journalfile=""` write a journal showing what was computed where to a JSON file
- `journal_init_callback=tsks->nothing` additional method run when initializing the journal
- `journal_task_callback=tsk->nothing` additional method run when journaling a task
- `id=randstring(6)` identifier used for the scratch files
- `reduce_trigger=eloop->nothing` condition for triggering a reduction prior to the completion of the map
- `save_partial_reduction=x->nothing` method for saving a partial reduction triggered by `reduce_trigger`
Notes
[1] The signature is `my_save_checkpoint(checkpoint_file, x)` where `checkpoint_file` is the file that will be written to, and `x` is the data that will be written. [2] The signature is `my_load_checkpoint(checkpoint_file)` where `checkpoint_file` is the file that data will be loaded from. [3] The number of machines provisioned may be greater than the number of workers in the cluster since, with some cluster managers, there may be a delay between the provisioning of a machine and when it is added to the Julia cluster. [4] If more than one scratch location is selected, then checkpoint files will be distributed across those locations. This can be useful if you are, for example, constrained by cloud-storage throughput limits.
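As a sketch of the signatures in notes [1] and [2], custom checkpoint methods could be written with the `Serialization` standard library; the names `my_save_checkpoint` and `my_load_checkpoint` are hypothetical:

```julia
using Serialization

# note [1]: write the data x to checkpoint_file
my_save_checkpoint(checkpoint_file, x) = serialize(checkpoint_file, x)

# note [2]: read the data back from checkpoint_file
my_load_checkpoint(checkpoint_file) = deserialize(checkpoint_file)

# these would then be passed via the options, e.g.:
# options = SchedulerOptions(;save_checkpoint=my_save_checkpoint,
#                             load_checkpoint=my_load_checkpoint)
```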
Examples
Example 1
With the assumption that /scratch is accessible from all workers:
```julia
using Distributed
addprocs(2)
@everywhere using Distributed, Schedulers
@everywhere f(x, tsk) = (x .+= tsk; nothing)
result, tsks = epmapreduce!(zeros(Float32,10), f, 1:100)
rmprocs(workers())
```

Example 2
Using Azure blob storage:
```julia
using Distributed, AzStorage
container = AzContainer("scratch"; storageaccount="mystorageaccount")
mkpath(container)
addprocs(2)
@everywhere using Distributed, Schedulers
@everywhere f(x, tsk) = (x .+= tsk; nothing)
result, tsks = epmapreduce!(zeros(Float32,10), SchedulerOptions(;scratch=container), f, 1:100)
rmprocs(workers())
```

Example 3
With a reduce trigger every 10 minutes:
```julia
using Distributed, Schedulers

function my_reduce_trigger(eloop, tic)
    if time() - tic[] > 600
        trigger_reduction!(eloop)
        tic[] = time()
    end
end

my_save(r, filename) = write(filename, r)

tic = Ref(time())
addprocs(2)
@everywhere using Distributed, Schedulers
@everywhere f(x, tsk) = (x .+= tsk; nothing)
result, tsks = epmapreduce!(zeros(Float32,10), SchedulerOptions(;reduce_trigger=eloop->my_reduce_trigger(eloop, tic), save_partial_reduction=r->my_save(r, "partial.bin")), f, 1:100)
rmprocs(workers())
```

Note that the methods `complete_tasks`, `pending_tasks`, `reduced_tasks`, and `total_tasks` can be useful when designing the `reduce_trigger` method.
Schedulers.pending_tasks — Method

```julia
tsks = pending_tasks(eloop)
```

Given `eloop::ElasticLoop`, return a list of tasks that are still pending.
Schedulers.reduced_tasks — Method

```julia
tsks = reduced_tasks(eloop)
```

Given `eloop::ElasticLoop`, return a list of tasks that are complete and reduced.
Schedulers.total_tasks — Method

```julia
n = total_tasks(eloop)
```

Given `eloop::ElasticLoop`, return the total number of tasks being mapped over.