Dask

The ability to use Dask for task pool scheduling has been added and can be used by setting use_dask=True in submit_tasks().

You can decide how many nodes to use by setting dask_nodes, one Dask worker will be created on every node, Dask will always use an entire node, using all cores on the node.

The workflow added to IPS using Dask allows for more than just running binary executables, you can run python functions and class methods.

An example showing this is the following, where we are adding an executable (in this case sleep), a function that sleeps (myFun) and a method that sleeps (myMethod) respectively to a task pool and submitting the task pool with self.services.submit_tasks('pool', use_dask=True).

The driver.py in the most simplest form would be

from ipsframework import Component


class Driver(Component):
    def step(self, timestamp=0.0):
        worker_comp = self.services.get_port('WORKER')
        self.services.call(worker_comp, 'step', 0.0)

And the dask_worker.py is

import copy
from time import sleep
from ipsframework import Component


def myFun(*args):
    print(f"myFun({args[0]})")
    sleep(float(args[0]))
    return 0


class DaskWorker(Component):
    def step(self, timestamp=0.0):
        cwd = self.services.get_working_dir()
        self.services.create_task_pool('pool')

        duration = 0.5
        self.services.add_task('pool', 'binary', 1, cwd, self.EXECUTABLE, duration)
        self.services.add_task('pool', 'function', 1, cwd, myFun, duration)
        self.services.add_task('pool', 'method', 1, cwd, copy.copy(self).myMethod, duration)

        ret_val = self.services.submit_tasks('pool',
                                             use_dask=True,
                                             dask_nodes=1)
        print('ret_val =', ret_val)
        exit_status = self.services.get_finished_tasks('pool')
        print('exit_status = ', exit_status)

    def myMethod(self, *args):
        print(f"myMethod({args[0]})")
        sleep(float(args[0]))
        return 0

A simple config to run this is, dask_sim.config

SIM_NAME = dask_example
SIM_ROOT = $PWD
LOG_FILE = log
LOG_LEVEL = INFO
SIMULATION_MODE = NORMAL

[PORTS]
    NAMES = DRIVER WORKER
    [[DRIVER]]
        IMPLEMENTATION = driver

    [[WORKER]]
        IMPLEMENTATION = dask_worker

[driver]
    CLASS = DRIVER
    SUB_CLASS =
    NAME = Driver
    NPROC = 1
    BIN_PATH =
    INPUT_FILES =
    OUTPUT_FILES =
    SCRIPT = $PWD/driver.py

[dask_worker]
    CLASS = DASK_WORKER
    SUB_CLASS =
    NAME = DaskWorker
    NPROC = 1
    BIN_PATH =
    INPUT_FILES =
    OUTPUT_FILES =
    SCRIPT = $PWD/dask_worker.py
    EXECUTABLE = $PWD/sleep

This is executed with ips.py --config dask_sim.config --platform platform.conf and the output shows each different task type executing:

...
ret_val = 3
myFun(0.5)
myMethod(0.5)
/bin/sleep 0.5
exit_status =  {'binary': 0, 'method': 0, 'function': 0}
...

The output simulation log includes the start and end time of each task with in the pool with the elapsed time as expected, a trimmed JSON simulation log is shown:

{
  "code": "DASK_WORKER__DaskWorker",
  "eventtype": "IPS_LAUNCH_DASK_TASK",
  "walltime": "2.33",
  "comment": "task_name = method, Target = myMethod(0.5)",
}
{
  "code": "DASK_WORKER__DaskWorker",
  "eventtype": "IPS_LAUNCH_DASK_TASK",
  "walltime": "2.33",
  "comment": "task_name = function, Target = myFun(0.5)",
}
{
  "code": "DASK_WORKER__DaskWorker",
  "eventtype": "IPS_LAUNCH_DASK_TASK",
  "walltime": "2.33",
  "state": "Running",
  "comment": "task_name = binary, Target = sleep 0.5",
}
{
  "code": "DASK_WORKER__DaskWorker",
  "eventtype": "IPS_TASK_END",
  "walltime": "2.83",
  "comment": "task_name = method, elapsed time = 0.50s",
}
{
  "code": "DASK_WORKER__DaskWorker",
  "eventtype": "IPS_TASK_END",
  "walltime": "2.83",
  "comment": "task_name = function, elapsed time = 0.50s",
}
{
  "code": "DASK_WORKER__DaskWorker",
  "eventtype": "IPS_TASK_END",
  "walltime": "2.85",
  "state": "Running",
  "comment": "task_name = binary, elapsed time = 0.52s",
}

Running dask in shifter

Shifter is a resource for running docker containers on HPC. Documentation can be found here.

An option use_shifter has been added to submit_tasks() that will run the Dask scheduler and workers run inside the shifter container. Additional arguments to be passed to shifter when launching dask (such as –image and –module) can be set by the shifter_args parameters,

You will need to match the versions of Dask within the shifter container to the version running outside. This is because the Dask scheduler and workers run inside the container while IPS has the sk client outside.

As an example would be using the module python/3.8-anaconda-2020.11 and the docker image continuumio/anaconda3:2020.11 which will have the same environment.

You will need to have IPS installed in the conda environment python -m pip install ipsframework. IPS is not required inside the shifter container, only the Dask scheduler and workers are running inside.

To pull down the docker into shifter run:

shifterimg pull continuumio/anaconda3:2020.11

You can entry the shifter container and check it’s contents with:

shifter --image=continuumio/anaconda3:2020.11 /bin/bash

You batch script should then look like:

#!/bin/bash
...
#SBATCH --image=continuumio/anaconda3:2020.11

module load python/3.8-anaconda-2020.11

ips.py --config=ips.conf --platform=platform.conf

Running with worker plugin

There is the ability to set a WorkerPlugin on the dask worker using the dask_worker_plugin option in submit_tasks().

Using a WorkerPlugin in combination with shifter allows you to do things like coping files out of the Temporary XFS file system. An example of that is

from distributed.diagnostics.plugin import WorkerPlugin

class CopyWorkerPlugin(WorkerPlugin):
    def __init__(self, tmp_dir, target_dir):
        self.tmp_dir = tmp_dir
        self.target_dir = target_dir

    def teardown(self, worker):
        os.system(f"cp {self.tmp_dir}/* {self.target_dir}")

class Worker(Component):
    def step(self, timestamp=0.0):
        cwd = self.services.get_working_dir()
        tmp_xfs_dir = '/tmp'

        self.services.create_task_pool('pool')
        self.services.add_task('pool', 'task_1', 1, tmp_xfs_dir, 'executable')

        worker_plugin = CopyWorkerPlugin(tmp_xfs_dir, cwd)

        ret_val = self.services.submit_tasks('pool',
                                             use_dask=True, use_shifter=True,
                                             dask_worker_plugin=worker_plugin)

        exit_status = self.services.get_finished_tasks('pool')

where the batch script has the temporary XFS filesystem mounted as

#SBATCH --volume="/global/cscratch1/sd/$USER/tmpfiles:/tmp:perNodeCache=size=1G"

Continuous Archiving

Another example is a WorkerPlugin that will continuously create a tar archive of the output data at a regular interval while tasks are executing. This is useful should the workflow fail or is canceled before everything is finished. It creates a separate achieve for each node/worker since the temporary XFS filesystem is unique per node. This example creates an archive of all the data in the working directory every 60 seconds and again when everything is finished.

def file_daemon(worker_id, evt, source_dir, target_dir):
    cmd = f"tar -caf {target_dir}/{worker_id}_archive.tar.gz -C {source_dir} ."

    while not evt.wait(60):  # interval which to archive data
        os.system(cmd)

    os.system(cmd)

class ContinuousArchivingWorkerPlugin(WorkerPlugin):
    def __init__(self, tmp_dir, target_dir):
        self.tmp_dir = tmp_dir
        self.target_dir = target_dir

    def setup(self, worker):
        self.evt = Event()
        self.thread = Thread(target=file_daemon, args=(worker.id, self.evt, self.tmp_dir, self.target_dir))
        self.thread.start()

    def teardown(self, worker):
        self.evt.set()  # tells the thread to exit
        self.thread.join()

class Worker(Component):
    def step(self, timestamp=0.0):
        cwd = self.services.get_working_dir()
        tmp_xfs_dir = '/tmp'

        self.services.create_task_pool('pool')
        self.services.add_task('pool', 'task_1', 1, tmp_xfs_dir, 'executable')

        worker_plugin = ContinuousArchivingWorkerPlugin(tmp_xfs_dir, cwd)

        ret_val = self.services.submit_tasks('pool',
                                             use_dask=True, use_shifter=True,
                                             dask_worker_plugin=worker_plugin)

        exit_status = self.services.get_finished_tasks('pool')

where the batch script has the temporary XFS filesystem mounted as

#SBATCH --volume="/global/cscratch1/sd/$USER/tmpfiles:/tmp:perNodeCache=size=1G"