Dask¶
The ability to use Dask for task pool scheduling has been added and can be used by setting use_dask=True in submit_tasks(). You can decide how many nodes to use by setting dask_nodes; one Dask worker is created on every node, and Dask always uses an entire node, taking all of its cores.
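In its simplest form the call looks like the following sketch, where 'pool' is a task pool created earlier with create_task_pool() (a full worked example follows below):

# Submit an existing task pool through Dask, using two nodes
# (one Dask worker per node).
ret_val = self.services.submit_tasks('pool', use_dask=True, dask_nodes=2)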
The workflow added to IPS using Dask allows for more than just running binary executables; you can also run Python functions and class methods.
An example showing this is the following, where we add an executable (in this case sleep), a function that sleeps (myFun), and a method that sleeps (myMethod) to a task pool, then submit the task pool with self.services.submit_tasks('pool', use_dask=True). The driver.py in its simplest form would be
from ipsframework import Component


class Driver(Component):
    def step(self, timestamp=0.0):
        worker_comp = self.services.get_port('WORKER')
        self.services.call(worker_comp, 'step', 0.0)
And the dask_worker.py is
import copy
from time import sleep

from ipsframework import Component


def myFun(*args):
    print(f"myFun({args[0]})")
    sleep(float(args[0]))
    return 0


class DaskWorker(Component):
    def step(self, timestamp=0.0):
        cwd = self.services.get_working_dir()
        self.services.create_task_pool('pool')
        duration = 0.5
        self.services.add_task('pool', 'binary', 1, cwd, self.EXECUTABLE, duration)
        self.services.add_task('pool', 'function', 1, cwd, myFun, duration)
        self.services.add_task('pool', 'method', 1, cwd, copy.copy(self).myMethod, duration)
        ret_val = self.services.submit_tasks('pool',
                                             use_dask=True,
                                             dask_nodes=1)
        print('ret_val =', ret_val)
        exit_status = self.services.get_finished_tasks('pool')
        print('exit_status = ', exit_status)

    def myMethod(self, *args):
        print(f"myMethod({args[0]})")
        sleep(float(args[0]))
        return 0
A simple config to run this is dask_sim.config:
SIM_NAME = dask_example
SIM_ROOT = $PWD
LOG_FILE = log
LOG_LEVEL = INFO
SIMULATION_MODE = NORMAL
[PORTS]
NAMES = DRIVER WORKER
[[DRIVER]]
IMPLEMENTATION = driver
[[WORKER]]
IMPLEMENTATION = dask_worker
[driver]
CLASS = DRIVER
SUB_CLASS =
NAME = Driver
NPROC = 1
BIN_PATH =
INPUT_FILES =
OUTPUT_FILES =
SCRIPT = $PWD/driver.py
[dask_worker]
CLASS = DASK_WORKER
SUB_CLASS =
NAME = DaskWorker
NPROC = 1
BIN_PATH =
INPUT_FILES =
OUTPUT_FILES =
SCRIPT = $PWD/dask_worker.py
EXECUTABLE = $PWD/sleep
This is executed with ips.py --config dask_sim.config --platform platform.conf and the output shows each different task type executing:
...
ret_val = 3
myFun(0.5)
myMethod(0.5)
/bin/sleep 0.5
exit_status = {'binary': 0, 'method': 0, 'function': 0}
...
The output simulation log includes the start and end time of each task in the pool, along with the elapsed time, as expected. A trimmed JSON simulation log is shown:
{
"code": "DASK_WORKER__DaskWorker",
"eventtype": "IPS_LAUNCH_DASK_TASK",
"walltime": "2.33",
"comment": "task_name = method, Target = myMethod(0.5)",
}
{
"code": "DASK_WORKER__DaskWorker",
"eventtype": "IPS_LAUNCH_DASK_TASK",
"walltime": "2.33",
"comment": "task_name = function, Target = myFun(0.5)",
}
{
"code": "DASK_WORKER__DaskWorker",
"eventtype": "IPS_LAUNCH_DASK_TASK",
"walltime": "2.33",
"state": "Running",
"comment": "task_name = binary, Target = sleep 0.5",
}
{
"code": "DASK_WORKER__DaskWorker",
"eventtype": "IPS_TASK_END",
"walltime": "2.83",
"comment": "task_name = method, elapsed time = 0.50s",
}
{
"code": "DASK_WORKER__DaskWorker",
"eventtype": "IPS_TASK_END",
"walltime": "2.83",
"comment": "task_name = function, elapsed time = 0.50s",
}
{
"code": "DASK_WORKER__DaskWorker",
"eventtype": "IPS_TASK_END",
"walltime": "2.85",
"state": "Running",
"comment": "task_name = binary, elapsed time = 0.52s",
}
Running Dask in shifter¶
Shifter is a resource for running Docker containers on HPC systems. Documentation can be found here.
An option use_shifter has been added to submit_tasks() that will run the Dask scheduler and workers inside the shifter container. Additional arguments to be passed to shifter when launching Dask (such as --image and --module) can be set with the shifter_args parameter, as in the sketch below.
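A call using these options might look like the following sketch; the exact form shifter_args accepts (a single string of flags is assumed here) should be checked against the submit_tasks() documentation:

# Minimal sketch: run the task pool with the Dask scheduler and workers
# inside shifter. The string form of shifter_args is an assumption.
ret_val = self.services.submit_tasks('pool',
                                     use_dask=True,
                                     use_shifter=True,
                                     shifter_args='--image=continuumio/anaconda3:2020.11')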
You will need to match the version of Dask inside the shifter container to the version running outside, because the Dask scheduler and workers run inside the container while IPS runs the Dask client outside.
An example would be using the module python/3.8-anaconda-2020.11 and the Docker image continuumio/anaconda3:2020.11, which have the same environment.
You will need to have IPS installed in the conda environment (python -m pip install ipsframework). IPS is not required inside the shifter container; only the Dask scheduler and workers run inside it.
To pull the Docker image down into shifter, run:
shifterimg pull continuumio/anaconda3:2020.11
You can enter the shifter container and check its contents with:
shifter --image=continuumio/anaconda3:2020.11 /bin/bash
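A quick way to confirm the Dask versions match is to run the same check inside the container and again in the host conda environment; this snippet is only an illustrative check, not part of IPS:

# Run this both inside the shifter container and in the host conda
# environment; the reported versions should agree.
import dask
import distributed

print("dask:", dask.__version__)
print("distributed:", distributed.__version__)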
Your batch script should then look like:
#!/bin/bash
...
#SBATCH --image=continuumio/anaconda3:2020.11
module load python/3.8-anaconda-2020.11
ips.py --config=ips.conf --platform=platform.conf
Running with worker plugin¶
There is the ability to set a WorkerPlugin on the Dask workers using the dask_worker_plugin option in submit_tasks().
Using a WorkerPlugin in combination with shifter allows you to do things like copying files out of the temporary XFS file system. An example of that is
import os

from distributed.diagnostics.plugin import WorkerPlugin

from ipsframework import Component


class CopyWorkerPlugin(WorkerPlugin):
    def __init__(self, tmp_dir, target_dir):
        self.tmp_dir = tmp_dir
        self.target_dir = target_dir

    def teardown(self, worker):
        # copy everything out of the temporary XFS mount before the worker exits
        os.system(f"cp {self.tmp_dir}/* {self.target_dir}")


class Worker(Component):
    def step(self, timestamp=0.0):
        cwd = self.services.get_working_dir()
        tmp_xfs_dir = '/tmp'
        self.services.create_task_pool('pool')
        self.services.add_task('pool', 'task_1', 1, tmp_xfs_dir, 'executable')
        worker_plugin = CopyWorkerPlugin(tmp_xfs_dir, cwd)
        ret_val = self.services.submit_tasks('pool',
                                             use_dask=True, use_shifter=True,
                                             dask_worker_plugin=worker_plugin)
        exit_status = self.services.get_finished_tasks('pool')
where the batch script has the temporary XFS filesystem mounted as
#SBATCH --volume="/global/cscratch1/sd/$USER/tmpfiles:/tmp:perNodeCache=size=1G"
Continuous Archiving¶
Another example is a WorkerPlugin that continuously creates a tar archive of the output data at a regular interval while tasks are executing. This is useful should the workflow fail or be canceled before everything is finished. It creates a separate archive for each node/worker, since the temporary XFS filesystem is unique per node. This example creates an archive of all the data in the working directory every 60 seconds and again when everything is finished.
import os
from threading import Event, Thread

from distributed.diagnostics.plugin import WorkerPlugin

from ipsframework import Component


def file_daemon(worker_id, evt, source_dir, target_dir):
    # archive everything in source_dir into a per-worker tarball
    cmd = f"tar -caf {target_dir}/{worker_id}_archive.tar.gz -C {source_dir} ."
    while not evt.wait(60):  # interval at which to archive data
        os.system(cmd)
    os.system(cmd)  # archive one final time on shutdown


class ContinuousArchivingWorkerPlugin(WorkerPlugin):
    def __init__(self, tmp_dir, target_dir):
        self.tmp_dir = tmp_dir
        self.target_dir = target_dir

    def setup(self, worker):
        self.evt = Event()
        self.thread = Thread(target=file_daemon, args=(worker.id, self.evt, self.tmp_dir, self.target_dir))
        self.thread.start()

    def teardown(self, worker):
        self.evt.set()  # tells the thread to exit
        self.thread.join()


class Worker(Component):
    def step(self, timestamp=0.0):
        cwd = self.services.get_working_dir()
        tmp_xfs_dir = '/tmp'
        self.services.create_task_pool('pool')
        self.services.add_task('pool', 'task_1', 1, tmp_xfs_dir, 'executable')
        worker_plugin = ContinuousArchivingWorkerPlugin(tmp_xfs_dir, cwd)
        ret_val = self.services.submit_tasks('pool',
                                             use_dask=True, use_shifter=True,
                                             dask_worker_plugin=worker_plugin)
        exit_status = self.services.get_finished_tasks('pool')
where the batch script has the temporary XFS filesystem mounted as
#SBATCH --volume="/global/cscratch1/sd/$USER/tmpfiles:/tmp:perNodeCache=size=1G"