Examples

Parallel map

using Distributed

addprocs(5)
@everywhere using Distributed, Schedulers
@everywhere function foo(tsk)
    @info "sleeping for task $tsk on worker $(myid()) for 60 seconds"
    sleep(60)
end

epmap(foo, 1:20)
rmprocs(workers())

Parallel map reduce

The parallel map reduce method epmapreduce! allocates storage on each worker process for a local reduction. Once the tasks are exhausted, the local reductions are combined into a final reduction. The storage allocated on each worker is dictated by the first argument to epmapreduce!, which also holds the final reduced result.

using Distributed

addprocs(5)
@everywhere using Distributed, Schedulers
@everywhere function foo(y, tsk)
    y .+= tsk
    @info "sleeping for task $tsk on worker $(myid()) for 60 seconds"
    sleep(60)
end
x,tsks = epmapreduce!(zeros(Float64,5), foo, 1:10) # x=[55.0,55.0,55.0,55.0,55.0]

Note that y contains the partial reduction for the tasks assigned to its Julia process. x is the final reduced result, and tsks is a list of any tasks that were aborted. In general, we expect this list to be empty.
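For example, one might check the returned task list before using the result. This is only a sketch, using the x and tsks returned by the call above:

isempty(tsks) || @warn "some tasks were aborted" tsks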

Parallel map reduce with structured data

By default the reduction assumes that the object being reduced is a Julia array. However, this can be customized to use an arbitrary object by specifying the zeros and reducer! options in SchedulerOptions, as shown below.

using Distributed

addprocs(5)
@everywhere using Distributed, Schedulers
@everywhere function foo(y, tsk)
    a = y.a
    b = y.b
    a .+= tsk
    b .+= 2*tsk
    nothing
end

@everywhere my_zeros() = (a=zeros(Float64,5), b=zeros(Float64,5))
@everywhere function my_reducer!(x,y)
    x.a .+= y.a
    x.b .+= y.b
    nothing
end
options = SchedulerOptions(zeros = my_zeros, reducer! = my_reducer!)
x,tsks = epmapreduce!(my_zeros(), options, foo, 1:10)

Parameterization

Both epmap and epmapreduce! can be controlled by a parameter set defined in options::SchedulerOptions, which can be passed as the first argument to epmap or as the second argument to epmapreduce!. The parameters that can be set are described in the epmap and epmapreduce! documentation.
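As a minimal sketch of this calling convention, reusing the minworkers and maxworkers options from the elasticity example below and the foo methods defined in the examples above:

options = SchedulerOptions(;minworkers=5, maxworkers=10)
epmap(options, foo, 1:20)                                     # options passed as the first argument
x, tsks = epmapreduce!(zeros(Float64,5), options, foo, 1:10)  # options passed as the second argument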

Parallel map with elasticity

As an example of parameterizing epmap, we consider allowing the compute cluster to grow and shrink during the map. options::SchedulerOptions includes options.minworkers and options.maxworkers to control this elasticity.

using Distributed

addprocs(10)
@everywhere using Distributed, Schedulers
@everywhere function foo(tsk)
    @info "sleeping for task $tsk on worker $(myid()) for 60 seconds"
    sleep(60)
end

epmap(SchedulerOptions(;minworkers=5, maxworkers=15), foo, 1:20)
rmprocs(workers())

In addition, options.quantum and options.addprocs can be used to control how workers are added, while Distributed.rmprocs is used to shrink the number of workers. Further, note that options.nworkers allows you to specify a method that returns the current number of allocated workers. This is useful for cloud computing where there is latency associated with provisioning; see, for example, the nworkers_provisioned method in AzManagers.jl. As an example of elasticity, the number of workers will shrink when the number of remaining tasks is less than the number of Julia workers, or when a worker is deemed unusable due to fault handling. Note that as epmap elastically adds workers, methods and packages defined in Main are automatically loaded on the added workers.
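As a rough sketch of these options, the field names below follow the options mentioned above, while the particular values and the default-style methods shown are illustrative assumptions; consult the SchedulerOptions documentation for the exact semantics:

using Distributed, Schedulers

options = SchedulerOptions(;
    minworkers = 5,                          # keep at least 5 workers
    maxworkers = 15,                         # grow to at most 15 workers
    quantum = 5,                             # controls how many workers are added at a time
    addprocs = n->Distributed.addprocs(n),   # method used to add n workers
    nworkers = Distributed.nworkers)         # method returning the number of provisioned workers
epmap(options, foo, 1:20)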