Reference

Schedulers.epmap (Method)
epmap([options,] f, tasks, args...; kwargs...)

where f is the map function, and tasks is an iterable collection of tasks. The function f takes the positional arguments args and the keyword arguments kwargs. epmap is parameterized using options::SchedulerOptions, which can be built with options=SchedulerOptions(;epmap_kwargs...), where epmap_kwargs are as follows.

epmap_kwargs

  • retries=0 number of times to retry a task on a given machine before removing that machine from the cluster
  • maxerrors=typemax(Int) the maximum number of errors before we give up and exit
  • timeout_multiplier=5 if any task takes timeout_multiplier times longer than the mean task time, then abort that task
  • skip_tasks_that_timeout=false skip tasks that exceed the timeout, or retry them on a different machine
  • minworkers=Distributed.nworkers method (or value) giving the minimum number of workers to elastically shrink to
  • maxworkers=Distributed.nworkers method (or value) giving the maximum number of workers to elastically expand to
  • usemaster=false assign tasks to the master process?
  • nworkers=Distributed.nworkers the number of machines currently provisioned for work[1]
  • quantum=()->32 the maximum number of workers to elastically add at a time
  • addprocs=n->Distributed.addprocs(n) method for adding n processes (will depend on the cluster manager being used)
  • init=pid->nothing after starting a worker, this method is run on that worker.
  • preempted=()->false method for determining if a machine was pre-empted (removed on purpose)[2]
  • reporttasks=true log task assignment
  • journal_init_callback=tsks->nothing additional method when initializing the journal
  • journal_task_callback=tsk->nothing additional method when journaling a task

Notes

[1] The number of machines provisioned may be greater than the number of workers in the cluster since, with some cluster managers, there may be a delay between the provisioning of a machine and when it is added to the Julia cluster.
[2] For example, on Azure Cloud a SPOT instance will be pre-empted if someone is willing to pay more for it.
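
By analogy with the epmapreduce! examples later in this section, a minimal usage sketch of epmap might look as follows; the map function g and the option values shown are illustrative assumptions, not part of the API.

using Distributed, Schedulers
addprocs(2)
@everywhere using Distributed, Schedulers
# hypothetical map function; each task is passed as the first positional argument
@everywhere g(tsk) = println("task $tsk ran on process $(myid())")
epmap(SchedulerOptions(;retries=2), g, 1:10)
rmprocs(workers())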

Schedulers.epmapreduce! (Method)
epmapreduce!(result, [options], f, tasks, args...; kwargs...) -> result

where f is the map function, and tasks is an iterable set of tasks to map over. The positional arguments args and the named arguments kwargs are passed to f, which has the method signature f(localresult, task, args...; kwargs...). localresult is the associated partial reduction contribution to result. epmapreduce! is parameterized by options::SchedulerOptions, which can be built with options=SchedulerOptions(;epmap_kwargs...), where epmap_kwargs are as follows.

epmap_kwargs

  • reducer! = default_reducer! the method used to reduce the result. The default is (x,y)->(x .+= y)
  • save_checkpoint = default_save_checkpoint the method used to save checkpoints[1]
  • load_checkpoint = default_load_checkpoint the method used to load a checkpoint[2]
  • zeros = ()->zeros(eltype(result), size(result)) the method used to initialize partial reductions
  • retries=0 number of times to retry a task on a given machine before removing that machine from the cluster
  • maxerrors=Inf the maximum number of errors before we give up and exit
  • timeout_multiplier=5 if any task takes timeout_multiplier times longer than the mean task time, then abort that task
  • skip_tasks_that_timeout=false skip tasks that exceed the timeout, or retry them on a different machine
  • minworkers=nworkers method giving the minimum number of workers to elastically shrink to
  • maxworkers=nworkers method giving the maximum number of workers to elastically expand to
  • usemaster=false assign tasks to the master process?
  • nworkers=nworkers the number of machines currently provisioned for work[3]
  • quantum=()->32 the maximum number of workers to elastically add at a time
  • addprocs=n->addprocs(n) method for adding n processes (will depend on the cluster manager being used)
  • init=pid->nothing after starting a worker, this method is run on that worker.
  • scratch=["/scratch"] storage location accesible to all cluster machines (e.g NFS, Azure blobstore,...)[4]
  • reporttasks=true log task assignment
  • journalfile="" write a journal showing what was computed where to a json file
  • journal_init_callback=tsks->nothing additional method when intializing the journal
  • journal_task_callback=tsk->nothing additional method when journaling a task
  • id=randstring(6) identifier used for the scratch files
  • reduce_trigger=eloop->nothing condition for triggering a reduction prior to the completion of the map
  • save_partial_reduction=x->nothing method for saving a partial reduction triggered by reduce_trigger

Notes

[1] The signature is my_save_checkpoint(checkpoint_file, x), where checkpoint_file is the file that will be written to, and x is the data that will be written.
[2] The signature is my_load_checkpoint(checkpoint_file), where checkpoint_file is the file that data will be loaded from. A sketch of both checkpoint methods follows these notes.
[3] The number of machines provisioned may be greater than the number of workers in the cluster since, with some cluster managers, there may be a delay between the provisioning of a machine and when it is added to the Julia cluster.
[4] If more than one scratch location is selected, then checkpoint files will be distributed across those locations. This can be useful if you are, for example, constrained by cloud storage throughput limits.
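
As a minimal sketch of the checkpoint hooks described in notes [1] and [2], assuming plain Julia serialization is an acceptable storage format, the methods might look like the following; the names my_save_checkpoint and my_load_checkpoint are hypothetical.

using Schedulers, Serialization

# hypothetical checkpoint methods matching the signatures in notes [1] and [2]
my_save_checkpoint(checkpoint_file, x) = serialize(checkpoint_file, x)
my_load_checkpoint(checkpoint_file) = deserialize(checkpoint_file)

options = SchedulerOptions(;save_checkpoint=my_save_checkpoint, load_checkpoint=my_load_checkpoint)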

Examples

Example 1

With the assumption that /scratch is accessible from all workers:

using Distributed
addprocs(2)
@everywhere using Distributed, Schedulers
@everywhere f(x, tsk) = (x .+= tsk; nothing)
result,tsks = epmapreduce!(zeros(Float32,10), f, 1:100)
rmprocs(workers())

Example 2

Using Azure blob storage:

using Distributed, AzStorage
container = AzContainer("scratch"; storageaccount="mystorageaccount")
mkpath(container)
addprocs(2)
@everywhere using Distributed, Schedulers
@everywhere f(x, tsk) = (x .+= tsk; nothing)
result,tsks = epmapreduce!(zeros(Float32,10), SchedulerOptions(;scratch=container), f, 1:100)
rmprocs(workers())

Example 3

With a reduce trigger every 10 minutes:

using Distributed, Schedulers

function my_reduce_trigger(eloop, tic)
    if time() - tic[] > 600
        trigger_reduction!(eloop)
        tic[] = time()
    end
end

my_save(r, filename) = write(filename, r)

tic = Ref(time())
addprocs(2)
@everywhere using Distributed, Schedulers
@everywhere f(x, tsk) = (x .+= tsk; nothing)
result,tsks = epmapreduce!(zeros(Float32,10), SchedulerOptions(;reduce_trigger=eloop->my_reduce_trigger(eloop, tic), save_partial_reduction=r->my_save(r, "partial.bin")), f, 1:100)

Note that the methods complete_tasks, pending_tasks, reduced_tasks, and total_tasks can be useful when designing the reduce_trigger method.
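
For instance, a sketch of a progress-based trigger, assuming complete_tasks(eloop) returns the collection of tasks completed so far (check its docstring in your version of Schedulers), could fire once more than half of the tasks are done; the name my_progress_trigger is hypothetical.

# hypothetical trigger: reduce once more than half of the tasks are complete
function my_progress_trigger(eloop)
    # assumes complete_tasks returns the completed task collection
    if length(complete_tasks(eloop)) > total_tasks(eloop) ÷ 2
        trigger_reduction!(eloop)
    end
end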

Schedulers.total_tasks (Method)
n = total_tasks(eloop)

Given eloop::ElasticLoop, return the number of total tasks that are being mapped over.
