Reference

Schedulers.complete_tasks — Method

```julia
tsks = complete_tasks(eloop)
```

Given `eloop::ElasticLoop`, return a list of tasks that are complete.
Schedulers.epmap — Method

```julia
epmap([options,] f, tasks, args...; kwargs...)
```

where `f` is the map function and `tasks` is an iterable collection of tasks. The function `f` takes the positional arguments `args` and the keyword arguments `kwargs`. `epmap` is parameterized using `options::SchedulerOptions`, which can be built using `options=SchedulerOptions(;epmap_kwargs...)`. The `epmap_kwargs` are as follows.
epmap_kwargs

- `retries=0` : number of times to retry a task on a given machine before removing that machine from the cluster
- `maxerrors=typemax(Int)` : the maximum number of errors before we give up and exit
- `timeout_multiplier=5` : if any (miscellaneous) task takes `timeout_multiplier` longer than the mean (miscellaneous) task time, then abort that task
- `timeout_function_multiplier=5` : if any (actual) task takes `timeout_function_multiplier` longer than the (robust) mean (actual) task time, then abort that task
- `null_tsk_runtime_threshold=0` : the maximum duration (in seconds) for a task to be considered insignificant or 'null' and thus not included in the timeout measurement
- `skip_tsk_tol_ratio=0` : the ratio of the total number of tasks that can be skipped
- `grace_period_ratio=0` : the ratio between the "grace period" (once enough tasks are done) and the (robust) average task time
- `skip_tasks_that_timeout=false` : skip tasks that exceed the timeout rather than retrying them on a different machine
- `minworkers=Distributed.nworkers` : method (or value) giving the minimum number of workers to elastically shrink to
- `maxworkers=Distributed.nworkers` : method (or value) giving the maximum number of workers to elastically expand to
- `usemaster=false` : assign tasks to the master process?
- `nworkers=Distributed.nworkers` : the number of machines currently provisioned for work[1]
- `quantum=()->32` : the maximum number of workers to elastically add at a time
- `addprocs=n->Distributed.addprocs(n)` : method for adding n processes (will depend on the cluster manager being used)
- `init=pid->nothing` : after starting a worker, this method is run on that worker
- `preempt_channel_future=pid->nothing` : method for retrieving a `Future` that holds a `Channel` through which preemption events are communicated[2]
- `checkpoint_task=tsk->nothing` : method that will be run if a preemption event is communicated
- `restart_task=tsk->nothing` : method that will be run at the start of a task; can be used for partially completed tasks that have checkpoint information
- `reporttasks=true` : log task assignment
- `journal_init_callback=tsks->nothing` : additional method run when initializing the journal
- `journal_task_callback=tsk->nothing` : additional method run when journaling a task
Notes

[1] The number of machines provisioned may be greater than the number of workers in the cluster since, with some cluster managers, there may be a delay between the provisioning of a machine and when it is added to the Julia cluster.

[2] For example, on Azure Cloud a SPOT instance will be preempted if someone is willing to pay more for it.
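The epmap docstring above carries no usage example, so here is a minimal sketch of how the pieces fit together. The function `g`, the option values, and the task range are illustrative assumptions, not recommendations from the package docs.

```julia
# Hedged sketch of epmap usage on a local cluster. The option values
# (retries, maxworkers, reporttasks) are illustrative only.
using Distributed, Schedulers

addprocs(2)
@everywhere using Schedulers

# The map function receives each task followed by the trailing positional
# arguments passed to epmap (here, the scale factor 2).
@everywhere g(tsk, scale) = println("task $tsk -> $(scale * tsk)")

options = SchedulerOptions(; retries=1, maxworkers=4, reporttasks=false)
epmap(options, g, 1:10, 2)

rmprocs(workers())
```

Note that `epmap` returns `nothing`; for a map with a reduction, use `epmapreduce!` below.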
Schedulers.epmapreduce! — Method

```julia
epmapreduce!(result, [options], f, tasks, args...; kwargs...) -> result
```

where `f` is the map function and `tasks` are an iterable set of tasks to map over. The positional arguments `args` and the keyword arguments `kwargs` are passed to `f`, which has the method signature `f(localresult, task, args...; kwargs...)`. `localresult` is the associated partial reduction contribution to `result`. `epmapreduce!` is parameterized by `options::SchedulerOptions`, which can be built using `options=SchedulerOptions(;epmap_kwargs...)`. The `epmap_kwargs` are as follows.
epmap_kwargs

- `reducer! = default_reducer!` : the method used to reduce the result; the default is `(x,y)->(x .+= y)`
- `save_checkpoint = default_save_checkpoint` : the method used to save checkpoints[1]
- `load_checkpoint = default_load_checkpoint` : the method used to load a checkpoint[2]
- `zeros = ()->zeros(eltype(result), size(result))` : the method used to initialize partial reductions
- `retries=0` : number of times to retry a task on a given machine before removing that machine from the cluster
- `maxerrors=Inf` : the maximum number of errors before we give up and exit
- `timeout_multiplier=5` : if any (miscellaneous) task takes `timeout_multiplier` longer than the mean (miscellaneous) task time, then abort that task
- `timeout_function_multiplier=5` : if any (actual) task takes `timeout_function_multiplier` longer than the (robust) mean (actual) task time, then abort that task
- `null_tsk_runtime_threshold=0` : the maximum duration (in seconds) for a task to be considered insignificant or 'null' and thus not included in the timeout measurement
- `skip_tsk_tol_ratio=0` : the ratio of the total number of tasks that can be skipped
- `grace_period_ratio=0` : the ratio between the "grace period" (once enough tasks are done) and the (robust) average task time
- `skip_tasks_that_timeout=false` : skip tasks that exceed the timeout rather than retrying them on a different machine
- `minworkers=nworkers` : method giving the minimum number of workers to elastically shrink to
- `maxworkers=nworkers` : method giving the maximum number of workers to elastically expand to
- `usemaster=false` : assign tasks to the master process?
- `nworkers=nworkers` : the number of machines currently provisioned for work[3]
- `quantum=()->32` : the maximum number of workers to elastically add at a time
- `addprocs=n->addprocs(n)` : method for adding n processes (will depend on the cluster manager being used)
- `init=pid->nothing` : after starting a worker, this method is run on that worker
- `preempt_channel_future=pid->nothing` : method for retrieving a `Future` that holds a `Channel` through which preemption events are communicated
- `checkpoint_task=tsk->nothing` : method that will be run if a preemption event is communicated
- `restart_task=tsk->nothing` : method that will be run at the start of a task; can be used for partially completed tasks that have checkpoint information
- `scratch=["/scratch"]` : storage location accessible to all cluster machines (e.g. NFS, Azure blob store, ...)[4]
- `reporttasks=true` : log task assignment
- `journalfile=""` : write a journal showing what was computed where to a JSON file
- `journal_init_callback=tsks->nothing` : additional method run when initializing the journal
- `journal_task_callback=tsk->nothing` : additional method run when journaling a task
- `id=randstring(6)` : identifier used for the scratch files
- `reduce_trigger=eloop->nothing` : condition for triggering a reduction prior to the completion of the map
- `save_partial_reduction=x->nothing` : method for saving a partial reduction triggered by `reduce_trigger`
Notes

[1] The signature is `my_save_checkpoint(checkpoint_file, x)` where `checkpoint_file` is the file that will be written to, and `x` is the data that will be written.

[2] The signature is `my_load_checkpoint(checkpoint_file)` where `checkpoint_file` is the file that data will be loaded from.

[3] The number of machines provisioned may be greater than the number of workers in the cluster since, with some cluster managers, there may be a delay between the provisioning of a machine and when it is added to the Julia cluster.

[4] If more than one scratch location is selected, then checkpoint files will be distributed across those locations. This can be useful if you are, for example, constrained by cloud storage throughput limits.
Examples

Example 1

With the assumption that /scratch is accessible from all workers:

```julia
using Distributed
addprocs(2)
@everywhere using Distributed, Schedulers
@everywhere f(x, tsk) = (x .+= tsk; nothing)
result,tsks = epmapreduce!(zeros(Float32,10), f, 1:100)
rmprocs(workers())
```
Example 2

Using Azure blob storage:

```julia
using Distributed, AzStorage
container = AzContainer("scratch"; storageaccount="mystorageaccount")
mkpath(container)
addprocs(2)
@everywhere using Distributed, Schedulers
@everywhere f(x, tsk) = (x .+= tsk; nothing)
result,tsks = epmapreduce!(zeros(Float32,10), SchedulerOptions(;scratch=container), f, 1:100)
rmprocs(workers())
```
Example 3

With a reduce trigger every 10 minutes:

```julia
using Distributed, Schedulers
addprocs(2)
@everywhere using Distributed, Schedulers
@everywhere f(x, tsk) = (x .+= tsk; nothing)

function my_reduce_trigger(eloop, tic)
    if time() - tic[] > 600
        trigger_reduction!(eloop)
        tic[] = time()
    end
end

my_save(r, filename) = write(filename, r)

tic = Ref(time())
result,tsks = epmapreduce!(zeros(Float32,10),
    SchedulerOptions(;reduce_trigger=eloop->my_reduce_trigger(eloop, tic),
        save_partial_reduction=r->my_save(r, "partial.bin")),
    f, 1:100)
```
Note that the methods `complete_tasks`, `pending_tasks`, `reduced_tasks`, and `total_tasks` can be useful when designing the `reduce_trigger` method.
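As an alternative to the time-based trigger in Example 3, a progress-based trigger can be sketched with these accessors. This is an illustrative assumption about how one might combine them, not an example from the package docs.

```julia
# Hedged sketch: trigger a partial reduction once at least half of the
# tasks have completed, using the accessor methods listed in this reference.
using Schedulers

function halfway_trigger(eloop)
    if length(complete_tasks(eloop)) >= total_tasks(eloop) ÷ 2
        trigger_reduction!(eloop)
    end
end

# Passed via: SchedulerOptions(; reduce_trigger=halfway_trigger)
```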
Schedulers.pending_tasks — Method

```julia
tsks = pending_tasks(eloop)
```

Given `eloop::ElasticLoop`, return a list of tasks that are still pending.

Schedulers.reduced_tasks — Method

```julia
tsks = reduced_tasks(eloop)
```

Given `eloop::ElasticLoop`, return a list of tasks that are complete and reduced.

Schedulers.total_tasks — Method

```julia
n = total_tasks(eloop)
```

Given `eloop::ElasticLoop`, return the number of total tasks that are being mapped over.