Distributed computing with Julia and Slurm

Recently I had a project where I had to run several thousand different tests of an algorithm across ranges of parameter values. Each test takes between 5 and 40 minutes to run on a single CPU. This is a fairly common scenario, and one that can be tackled quite easily with the massive parallelism offered by distributed computing clusters.

These clusters are often run by universities or labs and are usually shared between many hundreds of users across multiple institutions. To manage compute resources and handle job submission and queueing, workload managers such as Slurm and PBS are used. Users submit jobs to the job queue, and these are then allocated compute nodes and memory to execute their work.

There are two main methods that I have seen for running a large number of tests over a range of parameter tuples on high performance clusters:

1. Submit many individual jobs, each for a single node. Each job essentially acts as a single program on a single CPU and runs a single parameter tuple, often defined through shell scripting and command-line arguments. In this case, distribution of computation is handled by the HPC's queueing system.
2. Submit a single job spanning many nodes which runs all required parameter sets. Distribution of computation is handled by the job itself, rather than the queueing system.
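As a concrete illustration of the first method, a Slurm job array can launch one single-CPU job per parameter tuple. This is only a sketch under assumed names: a `params.txt` file with one whitespace-separated parameter tuple per line, and a hypothetical `run_single.jl` script that reads its parameters from the command line.

```bash
#!/bin/bash
#SBATCH --time=40:00
#SBATCH --ntasks=1
#SBATCH --array=1-1000

# Pick the parameter tuple for this array task (line N of params.txt)
PARAMS=$(sed -n "${SLURM_ARRAY_TASK_ID}p" params.txt)

# run_single.jl is a hypothetical script that runs one test from ARGS
julia run_single.jl $PARAMS
```

Each array task is scheduled independently by Slurm, which is exactly the "distribution handled by the queueing system" trade-off described above.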

The first method is often simpler to set up, but adds manual complexity: it requires more shell scripting and separate result collection. The second method, by unifying all the computation into a single job, greatly simplifies the required batch script and allows all the results to be processed in the main language (in my case, Julia).

My solution to this problem is a combination of the RemoteChannel example from the Julia Manual and an example on GitHub by magerton. A full example of my solution can be found in this gist.

The code

Most of the necessary functionality is built into Julia's standard library in the Distributed module and is detailed in the manual. Another key component is the ClusterManagers package, which adds support for the job queue systems typically used on shared high performance compute clusters.

The first step in distributed computing in Julia is adding the worker processes. When working locally this is done with the addprocs function, which starts a Julia worker process for each CPU core; each worker can run code independently. The ClusterManagers package adds the ability to start worker processes on the compute resources (essentially one worker per CPU core) allocated through Slurm (and many other job management systems used on HPCs). The number of workers is equal to the number of tasks requested through sbatch, which we can read directly from the SLURM_NTASKS environment variable.

```julia
using Distributed, ClusterManagers

addprocs_slurm(parse(Int, ENV["SLURM_NTASKS"]))
# addprocs() # if only on a local machine
```

Because all the individual workers are independent Julia processes, we need to ensure that all functions that will be doing work are defined and compiled on all workers. This is done with the `@everywhere` macro. In this case I'm just declaring a single function with `@everywhere`; however, this could also load a module (`@everywhere using JuMP`) or include a file (`@everywhere include("workfunctions.jl")`).1

```julia
@everywhere function work_function(a, b, c)
    exec_time = a * rand()
    return (val=a*c, string="a, b, c = $a, $b, $c", exec_time)
end
```

To coordinate our workers, we need two queues: one for jobs that still need to be done, and one for results that are ready to be processed. A RemoteChannel is a queue-like FIFO data structure. Elements can be added to the queue with put! and removed with take!.

```julia
const jobqueue = RemoteChannel(() -> Channel{Tuple}(32))
const resultsqueue = RemoteChannel(() -> Channel{NamedTuple}(32))
```

Our workers will repeatedly remove a job from the job queue, execute it, and put the result in the results queue. Calls to take! on a channel block if the channel is empty, and calls to put! block if the channel is full.

```julia
@everywhere function dowork(jobs, results)
    while true
        # fn, args... = take!(jobs) # only available in Julia 1.6
        job = take!(jobs)             # Take the next available job
        fn, args = job[1], job[2:end]
        result = eval(fn)(args...)    # Run the job
        put!(results, result)         # Send the result
    end
end
```

Next we need to start the main loop on every worker. Right now they won't actually be doing any work, because we haven't submitted anything to the queue.

```julia
foreach(
    pid -> remote_do(dowork, pid, jobqueue, resultsqueue),
    workers()
)
```

In this setup, a job consists of a tuple of the function name and the arguments for that function. A nicer alternative would be for each job to be a closure containing the function to be executed; however, I found that sending closures through remote channels did not work.2

```julia
jobs = [
    (:work_function, 1, 2, 3),
    (:work_function, 2, 4, 8),
    (:work_function, 8, 4, 2),
    (:work_function, 3, 2, 1),
]
```

Jobs can be submitted for execution by adding them to the jobqueue channel with put!. Because put! blocks if the channel is full, it is important to add jobs to the queue asynchronously to avoid deadlocks.

```julia
submit_job(job) = put!(jobqueue, job)
@async foreach(submit_job, jobs)
```

Once the jobs are submitted, each of our workers will continuously take a job from the queue, execute it, and put the result into the results queue. I've found the most convenient form for the results is a NamedTuple, as it is directly compatible with the Tables.jl interface. This means you can write it directly to a CSV file or push it to a DataFrame or database as needed.

```julia
# Saving to a CSV file
n = length(jobs)
csvfile = "results.csv"
for i in 1:n
    results = take!(resultsqueue)
    @info "Obtained result ($i/$n)" pairs(results)
    # Append to the existing CSV file if it already exists.
    # This ensures we only get a single header
    CSV.write(csvfile, [results]; append=isfile(csvfile))
end
```

```julia
# Saving to a DataFrame
n = length(jobs)
d = DataFrame()
for i in 1:n
    results = take!(resultsqueue)
    @info "Obtained result ($i/$n)" pairs(results)
    push!(d, results)
end
# ... further processing of dataframe
```

Finally the workers need to be stopped gracefully.

```julia
rmprocs(workers())
```
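Putting the pieces together, the whole pattern can be exercised on a local machine before submitting to Slurm. The following is a condensed, self-contained sketch with my own variable names (not the original gist), using addprocs instead of addprocs_slurm:

```julia
using Distributed
addprocs(2)  # local workers; on the cluster this would be addprocs_slurm

# Work functions must be defined on all workers before the loop starts
@everywhere work(a, b) = (total = a + b,)

jobq = RemoteChannel(() -> Channel{Tuple}(8))
resq = RemoteChannel(() -> Channel{NamedTuple}(8))

@everywhere function loop(jobs, results)
    while true
        job = take!(jobs)
        fn, args = job[1], job[2:end]
        put!(results, eval(fn)(args...))
    end
end

# Start the work loop on every worker, then submit jobs asynchronously
foreach(pid -> remote_do(loop, pid, jobq, resq), workers())
jobs = [(:work, 1, 2), (:work, 3, 4)]
@async foreach(j -> put!(jobq, j), jobs)

# Collect one result per job (arrival order is not guaranteed)
results = [take!(resq) for _ in jobs]
rmprocs(workers())
```

Running this in the REPL is a quick way to check the plumbing before paying the queue wait on a real cluster.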

Submitting the job

The Slurm batch script for this job is very simple, because all the inputs, outputs, and data processing are contained within the Julia script. The following is an example of a run script.

```bash
#!/bin/bash
#SBATCH --time=3:00
#SBATCH --ntasks=4

# Makes the julia binary available. This will depend on your HPC setup
module load julia/1.5

# Run the job with full optimisations. Add in --project="." if you have a
# Project/Manifest with dependencies
julia --optimize=3 run.jl
```

The number of tasks determines the number of worker processes. Optimisation is enabled when running the script, as this can improve runtime at the cost of higher initial compilation time.

The above script (saved as run.sh) can be submitted to Slurm with sbatch run.sh.

Conclusions and future improvements

Using this method I was able to run jobs with up to 48 CPUs (although there is no reason more would not work) spread over several compute nodes. These jobs maintained a very high average CPU utilisation (~95%), with load balanced across all CPU cores for the whole duration of the Slurm job.

There are a few improvements that can be made:

• All jobs right now are expected to return the same type of results. If different types of jobs are submitted, the results would need to be handled separately in the results processing loop.
• I have arbitrarily set the length of the jobs and results queue to 32. There may be fewer interruptions if this is increased, especially if the number of workers is large.
• Error handling within jobs is currently not really supported. In my jobs I simply wrap the work in a try ... catch block within the job functions and set the results to contain NaN values on failure. This isn't ideal, and a better system of reporting errors would be useful.
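On the last point, one possible improvement is a small wrapper that captures exceptions and returns them as data, so failed jobs remain visible in the results table instead of silently producing NaNs. This is a hedged sketch, not part of the original solution; the function and field names are hypothetical.

```julia
using Distributed

# Hypothetical wrapper (not in the original gist): run a work function and
# report failures as data instead of NaN placeholders.
@everywhere function run_job_safely(fn, args...)
    try
        (status = :ok, result = fn(args...), error = "")
    catch err
        (status = :error, result = nothing, error = sprint(showerror, err))
    end
end
```

A worker's dowork loop could then put! these named tuples on the results queue unchanged, and the results-processing loop could filter or log entries by their status field.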

Full example scripts can be found in this gist.

1. Due to world age issues, any work functions (or functions that are called from work functions) need to be defined/included before the main loop for the workers is started. ↩︎

2. I’m not quite sure why, but I would guess it has something to do with compiled code from one worker not being compatible with the other. I also have no idea how closures would be stored in the channel. I wasn’t able to get any meaningful error messages so I settled on sending the function name and arguments. It works fine, although looks a little uglier. ↩︎