Code Listings

IPS

The Integrated Plasma Simulator (IPS) Framework. This framework enables loose, file-based coupling of a certain class of nuclear fusion simulation codes.

For further design information see

  • Wael Elwasif, David E. Bernholdt, Aniruddha G. Shet, Samantha S. Foley, Randall Bramley, Donald B. Batchelor, and Lee A. Berry, The Design and Implementation of the SWIM Integrated Plasma Simulator, in The 18th Euromicro International Conference on Parallel, Distributed and Network-Based Computing (PDP 2010), 2010.

  • Samantha S. Foley, Wael R. Elwasif, David E. Bernholdt, Aniruddha G. Shet, and Randall Bramley, Extending the Concept of Component Interfaces: Experience with the Integrated Plasma Simulator, in Component-Based High-Performance Computing (CBHPC) 2009, 2009, (extended abstract).

  • D Batchelor, G Alba, E D’Azevedo, G Bateman, DE Bernholdt, L Berry, P Bonoli, R Bramley, J Breslau, M Chance, J Chen, M Choi, W Elwasif, S Foley, G Fu, R Harvey, E Jaeger, S Jardin, T Jenkins, D Keyes, S Klasky, S Kruger, L Ku, V Lynch, D McCune, J Ramos, D Schissel, D Schnack, and J Wright, Advances in Simulation of Wave Interactions with Extended MHD Phenomena, in Horst Simon, editor, SciDAC 2009, 14-18 June 2009, San Diego, California, USA, volume 180 of Journal of Physics: Conference Series, page 012054, Institute of Physics, 2009, 6pp.

  • Samantha S. Foley, Wael R. Elwasif, Aniruddha G. Shet, David E. Bernholdt, and Randall Bramley, Incorporating Concurrent Component Execution in Loosely Coupled Integrated Fusion Plasma Simulation, in Component-Based High-Performance Computing (CBHPC) 2008, 2008, (extended abstract).

  • D. Batchelor, C. Alba, G. Bateman, D. Bernholdt, L. Berry, P. Bonoli, R. Bramley, J. Breslau, M. Chance, J. Chen, M. Choi, W. Elwasif, G. Fu, R. Harvey, E. Jaeger, S. Jardin, T. Jenkins, D. Keyes, S. Klasky, S. Kruger, L. Ku, V. Lynch, D. McCune, J. Ramos, D. Schissel, D. Schnack, and J. Wright, Simulation of Wave Interactions with MHD, in Rick Stevens, editor, SciDAC 2008, 14-17 July 2008, Washington, USA, volume 125 of Journal of Physics: Conference Series, page 012039, Institute of Physics, 2008.

  • Wael R. Elwasif, David E. Bernholdt, Lee A. Berry, and Don B. Batchelor, Component Framework for Coupled Integrated Fusion Plasma Simulation, in HPC-GECO/CompFrame 2007, 21-22 October, Montreal, Quebec, Canada, 2007.

Authors

Wael R. Elwasif, Samantha Foley, Aniruddha G. Shet

Organization

Center for Simulation of RF Wave Interactions with Magnetohydrodynamics

Framework

class ipsframework.ips.Framework(config_file_list, log_file_name, platform_file_name=None, debug=False, verbose_debug=False, cmd_nodes=0, cmd_ppn=0)

Create an IPS Framework Instance to coordinate the execution of IPS simulations

The Framework performs the following main tasks:

  • Initialize the different IPS managers that perform the bulk of the framework functionality

  • Manage communication queues, and route service requests from simulation components to appropriate managers.

  • Provide logging services to IPS managers.

  • Perform shutdown procedure on exit

Parameters
  • config_file_list (list) –

    A list of simulation configuration files to be used in the simulation. Each simulation configuration file must have the following parameters:

    • SIM_ROOT The root directory for the simulation

    • SIM_NAME A name that identifies the simulation

    • LOG_FILE The name of a log file that is used to capture logging and error information for this simulation.

    SIM_ROOT, SIM_NAME, and LOG_FILE must be unique across simulations.

  • log_file_name (str) – A file name where Framework logging messages are placed.

  • platform_file_name (str) – The name of the platform configuration file used in the simulation. If not specified it will try to find the one installed in the share directory.

  • debug (bool) – A flag indicating whether framework debugging messages are enabled (default = False)

  • verbose_debug (bool) – A flag adding more verbose framework debugging (default = False)

  • cmd_nodes (int) – Number of compute nodes given on the command line (default = 0)

  • cmd_ppn (int) – Number of processes per node given on the command line (default = 0)
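The uniqueness requirement on SIM_ROOT, SIM_NAME, and LOG_FILE can be checked before a Framework is constructed. The following is a minimal sketch, not part of ipsframework: it assumes each configuration file has already been parsed into a plain dict, and the helper name find_duplicate_settings is hypothetical.

```python
from collections import Counter

REQUIRED_KEYS = ("SIM_ROOT", "SIM_NAME", "LOG_FILE")


def find_duplicate_settings(configs):
    """Return {key: [values shared by more than one simulation]}.

    `configs` is a list of dicts, one per simulation configuration file.
    Hypothetical helper for illustration; not an ipsframework function.
    """
    duplicates = {}
    for key in REQUIRED_KEYS:
        missing = [i for i, cfg in enumerate(configs) if key not in cfg]
        if missing:
            raise KeyError(f"{key} missing from configuration(s) {missing}")
        counts = Counter(cfg[key] for cfg in configs)
        repeated = sorted(value for value, n in counts.items() if n > 1)
        if repeated:
            duplicates[key] = repeated
    return duplicates


# Two made-up simulations that accidentally share a log file.
configs = [
    {"SIM_ROOT": "/tmp/run_a", "SIM_NAME": "a", "LOG_FILE": "a.log"},
    {"SIM_ROOT": "/tmp/run_b", "SIM_NAME": "b", "LOG_FILE": "a.log"},
]
print(find_duplicate_settings(configs))
```

An empty result means the three required parameters are pairwise distinct across the listed simulations.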

critical(msg, *args)

Produce critical message in simulation log file. See logging.critical() for usage.

debug(msg, *args)

Produce debugging message in simulation log file. See logging.debug() for usage.

error(msg, *args)

Produce error message in simulation log file. See logging.error() for usage.

exception(msg, *args)

Produce exception message in simulation log file. See logging.exception() for usage.

get_inq()
Returns

handle to the Framework’s input queue object

Return type

multiprocessing.Queue

info(msg, *args)

Produce informational message in simulation log file. See logging.info() for usage.

initiate_new_simulation(sim_name)

This method is called by the configuration manager when it dynamically creates a new simulation. It initiates the method invocations for the framework-visible components in the new simulation.

log(msg, *args)

Wrapper for Framework.info().

register_service_handler(service_list, handler)

Register a call back method to handle a list of framework service invocations.

Parameters
  • service_list – a list of service names to call handler when invoked by components. The service name must match the target_method parameter in messages.ServiceRequestMessage.

  • handler – a Python callable object that takes a messages.ServiceRequestMessage.
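The registration pattern described above amounts to a lookup table from service name to callable. The sketch below is a standalone illustration and does not import ipsframework: FakeRequest is a stand-in for messages.ServiceRequestMessage that carries only a target_method and args, and HandlerTable is a hypothetical class.

```python
from collections import namedtuple

# Stand-in for messages.ServiceRequestMessage; only the fields used here.
FakeRequest = namedtuple("FakeRequest", ["target_method", "args"])


class HandlerTable:
    """Toy dispatch table keyed by service name (illustrative only)."""

    def __init__(self):
        self._handlers = {}

    def register_service_handler(self, service_list, handler):
        # Every name in service_list is routed to the same callable.
        for name in service_list:
            self._handlers[name] = handler

    def dispatch(self, msg):
        # The service name must match the message's target_method.
        return self._handlers[msg.target_method](msg)


def data_handler(msg):
    return ("data", msg.target_method, msg.args)


table = HandlerTable()
table.register_service_handler(["stage_state", "update_state"], data_handler)
result = table.dispatch(FakeRequest("update_state", ("state.cdf",)))
print(result)
```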

run()

Run the communication outer loop of the framework.

This method implements the core communication and message dispatch functionality of the framework. The main phases of execution for the framework are:

  1. Invoke the init method on all framework-attached components, blocking pending method call termination.

  2. Generate method invocation messages for the remaining public methods in the framework-centric components (i.e. step and finalize).

  3. Generate a queue of method invocation messages for all public framework-accessible components in the simulations being run. Framework-accessible components are made up of the Init component (if it exists) and the Driver component. The generated messages invoke the public methods init, step, and finalize.

  4. Dispatch method invocations for each framework-centric component and physics simulation in order.

Exceptions that propagate to this method from the managed simulations cause the framework to abort any pending method invocations for the source simulation. Exceptions from a framework-centric component abort further invocations to that component.

When all method invocations have been dispatched (or aborted), configurationManager.ConfigurationManager.terminate_sim() is called to trigger normal termination of all component processes.
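The abort rule for managed simulations can be illustrated with a small standalone loop. This is a sketch under stated assumptions, not the framework's implementation: invocations are modelled as (sim_name, method) pairs, the interleaving order across simulations is made up, and dispatch_all and fake_invoke are hypothetical names.

```python
from collections import deque


def dispatch_all(invocations, invoke):
    """Dispatch (sim_name, method) pairs in order.

    If `invoke` raises for a simulation, the pending invocations for that
    simulation are skipped; other simulations continue.
    """
    pending = deque(invocations)
    aborted = set()
    completed = []
    while pending:
        sim_name, method = pending.popleft()
        if sim_name in aborted:
            continue  # pending call for a failed simulation is dropped
        try:
            invoke(sim_name, method)
            completed.append((sim_name, method))
        except Exception:
            aborted.add(sim_name)
    return completed, aborted


def fake_invoke(sim_name, method):
    # Simulated failure: sim_b raises during its step method.
    if sim_name == "sim_b" and method == "step":
        raise RuntimeError("step failed")


calls = [(s, m) for m in ("init", "step", "finalize") for s in ("sim_a", "sim_b")]
completed, aborted = dispatch_all(calls, fake_invoke)
print(completed, aborted)
```

In this run sim_a completes init, step, and finalize, while sim_b completes only init; its finalize call is never dispatched.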

Returns

Success status

Return type

bool

send_terminate_msg(sim_name, status=0)

This method remotely invokes the method component.Component.terminate() on all components in the IPS simulation sim_name.

Parameters
  • sim_name (str) – The simulation name from which all the components are terminated

  • status (messages.Message.SUCCESS, messages.Message.FAILURE) – message status, defaults to messages.Message.SUCCESS

terminate_all_sims(status=0)

Terminate all active component instances.

This method remotely invokes the method component.Component.terminate() on all components in the IPS simulation.

Parameters

status (messages.Message.SUCCESS, messages.Message.FAILURE) – message status, defaults to messages.Message.SUCCESS

warning(msg, *args)

Produce warning message in simulation log file. See logging.warning() for usage.

Data Manager

class ipsframework.dataManager.DataManager(fwk)

The data manager facilitates the movement and exchange of data files for the simulation.

merge_current_plasma_state(msg)

Merge partial plasma state file with global master. Newly updated plasma state copied to caller’s workdir. Exception raised on copy error.

msg.args:

  1. partial_state_file

  2. target_state_file

  3. log_file: stdout for merge process if not None

process_service_request(msg)

Invokes the appropriate public data manager method for the component specified in msg. Return method’s return value.

stage_state(msg)

Copy plasma state files from source dir to target dir. Return 0. Exception raised on copy error.

msg.args:

  1. state_files

  2. source_dir

  3. target_dir

update_state(msg)

Copy plasma state files from source dir to target dir. Return 0. Exception raised on copy error.

msg.args:

  1. state_files

  2. source_dir

  3. target_dir

Task Manager

class ipsframework.taskManager.TaskInit(nproc, binary, working_dir, tppn, tcpp, tgpp, block, omp, wnodes, wsocks, cmd_args, launch_cmd_extra_args)
binary

Alias for field number 1

block

Alias for field number 6

cmd_args

Alias for field number 10

launch_cmd_extra_args

Alias for field number 11

nproc

Alias for field number 0

omp

Alias for field number 7

tcpp

Alias for field number 4

tgpp

Alias for field number 5

tppn

Alias for field number 3

wnodes

Alias for field number 8

working_dir

Alias for field number 2

wsocks

Alias for field number 9
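The "Alias for field number N" entries are the way named tuple fields are usually rendered in generated documentation, which suggests TaskInit is a named tuple. The sketch below builds a local stand-in with the same field order to show how positional and named access line up; it does not import ipsframework, and the field values are made up.

```python
from collections import namedtuple

# Local stand-in mirroring the documented field order of TaskInit.
TaskInit = namedtuple(
    "TaskInit",
    [
        "nproc", "binary", "working_dir", "tppn", "tcpp", "tgpp",
        "block", "omp", "wnodes", "wsocks", "cmd_args",
        "launch_cmd_extra_args",
    ],
)

task = TaskInit(
    nproc=4,
    binary="/bin/echo",
    working_dir="/tmp",
    tppn=0,
    tcpp=0,
    tgpp=0,
    block=True,
    omp=False,
    wnodes=False,
    wsocks=False,
    cmd_args=("hello",),
    launch_cmd_extra_args=None,
)

# "Alias for field number 6" means task.block and task[6] are the same value.
print(task.block, task[6], TaskInit._fields.index("block"))
```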

class ipsframework.taskManager.TaskManager(fwk)

The task manager is responsible for facilitating component method invocations, and the launching of tasks.

build_launch_cmd(nproc, binary, cmd_args, working_dir, ppn, max_ppn, nodes, accurateNodes, partial_nodes, task_id, cpp=0, omp=False, gpp=0, core_list='', launch_cmd_extra_args=None)

Construct task launch command to be executed by the component.

  • nproc - number of processes to use

  • binary - binary to launch

  • cmd_args - additional command line arguments for the binary

  • working_dir - full path to where the executable will be launched

  • ppn - processes per node value to use

  • max_ppn - maximum possible ppn for this allocation

  • nodes - comma separated list of node ids

  • accurateNodes - if True, launch on nodes in nodes, otherwise the parallel launcher determines the process placement

  • partial_nodes - if True and accurateNodes and task_launch_cmd == ‘mpirun’, a host file is created specifying the exact placement of processes on cores.

  • core_list - used for creating host file with process to core mappings

finish_task(finish_task_msg)

Cleanup after a task launched by a component terminates

finish_task_msg is expected to be of type messages.ServiceRequestMessage

Message args:

  1. task_id: task id of finished task

  2. task_data: return code of task

get_call_id()

Return a new call id

get_task_id()

Return a new task id

init_call(init_call_msg, manage_return=True)

Creates and sends a messages.MethodInvokeMessage from the calling component to the target component. If manage_return is True, a record is added to outstanding_calls. Return call id.

Message args:

  1. method_name

  2. + arguments to be passed on as method arguments.

init_task(init_task_msg)

Allocate resources needed for a new task and build the task launch command using the binary and arguments provided by the requesting component. Return launch command to component via messages.ServiceResponseMessage. Raise exception if task can not be launched at this time (ipsExceptions.BadResourceRequestException, ipsExceptions.InsufficientResourcesException).

init_task_msg is expected to be of type messages.ServiceRequestMessage

Message args:

  1. nproc: number of processes the task needs

  2. binary: full path to the executable to launch

  3. working_dir: full path to directory where the task will be launched

  4. tppn: processes per node for this task. (0 indicates that the default ppn is used.)

  5. block: whether or not to wait until the task can be launched.

  6. wnodes: True for whole node allocation, False otherwise.

  7. wsocks: True for whole socket allocation, False otherwise.

  8. + cmd_args: any arguments for the executable

init_task_pool(init_task_msg)

Allocate resources needed for a new task and build the task launch command using the binary and arguments provided by the requesting component.

init_task_msg is expected to be of type messages.ServiceRequestMessage

Message args:

  1. task_dict: dictionary of task names and objects

initialize(data_mgr, resource_mgr, config_mgr)

Initialize references to other managers and key values from configuration manager.

printCurrTaskTable()

Pretty-print the current task table.

process_service_request(msg)

Invokes the appropriate public task manager method for the component specified in msg. Return method’s return value.

return_call(response_msg)

Handle the response message generated by a component in response to a method invocation on that component.

response_msg is expected to be of type messages.MethodResultMessage

wait_call(wait_msg)

Determine if the call has finished. If finished, return any data or errors. If not finished raise the appropriate blocking or nonblocking exception and try again later.

wait_msg is expected to be of type messages.ServiceRequestMessage

Message args:

  1. call_id: call id for which to wait

  2. blocking: determines whether the wait is blocking or not
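The relationship between a call id and a later blocking or nonblocking wait can be sketched with the standard library. This is an analogy only: the CallTable class is hypothetical, it runs calls in a thread pool rather than sending messages between processes, and it reports "not finished" by returning (False, None) instead of raising an exception as the task manager is documented to do.

```python
import itertools
import threading
from concurrent.futures import ThreadPoolExecutor


class CallTable:
    """Toy record of outstanding calls keyed by call id (illustrative)."""

    def __init__(self):
        self._ids = itertools.count(1)
        self._outstanding = {}
        self._pool = ThreadPoolExecutor(max_workers=2)

    def init_call(self, func, *args):
        call_id = next(self._ids)
        self._outstanding[call_id] = self._pool.submit(func, *args)
        return call_id

    def wait_call(self, call_id, blocking=True):
        future = self._outstanding[call_id]
        if not blocking and not future.done():
            return False, None  # caller should try again later
        result = future.result()  # blocks until the call has finished
        del self._outstanding[call_id]
        return True, result

    def shutdown(self):
        self._pool.shutdown()


gate = threading.Event()


def slow_method():
    gate.wait()  # held open until the main thread releases it
    return "done"


table = CallTable()
call_id = table.init_call(slow_method)
first = table.wait_call(call_id, blocking=False)  # not finished yet
gate.set()
second = table.wait_call(call_id, blocking=True)
table.shutdown()
print(first, second)
```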

Resource Manager

class ipsframework.resourceManager.Allocation(partial_node, nodelist, corelist, ppn, max_ppn, cpp, accurateNodes, cores_allocated)
accurateNodes

Alias for field number 6

corelist

Alias for field number 2

cores_allocated

Alias for field number 7

cpp

Alias for field number 5

max_ppn

Alias for field number 4

nodelist

Alias for field number 1

partial_node

Alias for field number 0

ppn

Alias for field number 3

class ipsframework.resourceManager.ResourceManager(fwk)

The resource manager is responsible for detecting the resources allocated to the framework, allocating resources to task requests, and maintaining the associated bookkeeping.

add_nodes(listOfNodes)

Add node entries to self.nodes. Typically used by initialize() to initialize self.nodes. May be used to add nodes to a dynamic allocation in the future.

listOfNodes is a list of tuples (node name, cores). self.nodes is a dictionary where the keys are the node names and the values are node_structure.Node structures.

Return total number of cores.

begin_RM_report()

Print header information for resource usage reporting file.

check_core_cap(nproc, ppn)

Determine if it is currently possible to allocate nproc processes with a ppn of ppn without further restrictions. Return True and list of nodes to use if successful. Return False and empty list if there are not enough available resources at this time, but it is possible to eventually satisfy the request. Exception raised if the request can never be fulfilled.

check_gpus(ppn, task_gpp)
check_whole_node_cap(nproc, ppn)

Determine if it is currently possible to allocate nproc processes with a ppn of ppn and whole nodes. Return True and list of nodes to use if successful. Return False and empty list if there are not enough available resources at this time, but it is possible to eventually satisfy the request. Exception raised if the request can never be fulfilled.

check_whole_sock_cap(nproc, ppn)

Determine if it is currently possible to allocate nproc processes with a ppn of ppn and whole sockets. Return True and list of nodes to use if successful. Return False and empty list if there are not enough available resources at this time, but it is possible to eventually satisfy the request. Exception raised if the request can never be fulfilled.
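The three check_*_cap methods share one contract: succeed now, fail for now, or fail permanently. The sketch below illustrates that contract only. It is a simplification that assumes every node used receives exactly ppn processes, and check_capacity, InsufficientResources, and the node table layout are hypothetical rather than taken from the resource manager.

```python
class InsufficientResources(Exception):
    """Stand-in for a request that can never be satisfied."""


def check_capacity(nproc, ppn, nodes):
    """Return (True, node_names) or (False, []); raise if impossible.

    `nodes` maps node name -> (available_cores, total_cores).
    """
    needed_nodes = -(-nproc // ppn)  # ceiling division
    # Nodes large enough to ever host ppn processes.
    capable = [name for name, (_, total) in nodes.items() if total >= ppn]
    if len(capable) < needed_nodes:
        raise InsufficientResources(
            f"{nproc} processes at ppn={ppn} need {needed_nodes} nodes"
        )
    # Nodes that have enough free cores right now.
    ready = [name for name in capable if nodes[name][0] >= ppn]
    if len(ready) >= needed_nodes:
        return True, ready[:needed_nodes]
    return False, []


nodes = {"n0": (4, 8), "n1": (8, 8), "n2": (8, 8)}
print(check_capacity(16, 8, nodes))  # enough free nodes now
print(check_capacity(24, 8, nodes))  # possible later, once n0 frees up
```

A request for 32 processes at ppn 8 would raise, because the allocation holds only three nodes.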

get_allocation(comp_id, nproc, task_id, whole_nodes, whole_socks, task_ppn=0, task_cpp=0, task_gpp=0)

Traverse available nodes to return:

If whole_nodes is True:

  • shared_nodes: False

  • nodes: list of node names

  • ppn: processes per node for launching the task

  • max_ppn: processes that can be launched

  • accurateNodes: True if nodes uses the actual names of the nodes, False otherwise.

If whole_nodes is False:

  • shared_nodes: True

  • nodes: list of node names

  • node_file_entries: list of (node, corelist) tuples, where corelist is a list of core names.

    Core names are integers from 0 to n-1 where n is the number of cores on a node.

  • ppn: processes per node for launching the task

  • max_ppn: processes that can be launched

  • accurateNodes: True if nodes uses the actual names of the nodes, False otherwise.

Arguments:

  • nproc: the number of requested processes (int)

  • comp_id: component identifier, must be unique with respect to the framework (string)

  • task_id: task identifier from TM (int)

  • method: name of method (string)

  • task_ppn: ppn for this task (optional) (int)

initialize(dataMngr, taskMngr, configMngr, cmd_nodes=0, cmd_ppn=0)

Initialize resource management structures, references to other managers (dataMngr, taskMngr, configMngr).

Resource information comes from the following in order of priority:

  • command line specification (cmd_nodes, cmd_ppn)

  • detection using parameters from platform config file

  • manual settings from platform config file

The second two sources are obtained through resourceHelper.getResourceList().

printRMState()

Print the node tree to stdout.

process_service_request(msg)
release_allocation(task_id, status)

Set resources allocated to task task_id to available. status is not used, but may be used to correlate resource failures to task failures and implement task relaunch strategies.

report_RM_status(notes='')

Print current RM status to the reporting_file (“resource_usage”). Entries consist of:

  • time in seconds since beginning of time (__init__ of RM)

  • # cores that are available

  • # cores that are allocated

  • % allocated cores

  • # processes launched by task

  • % cores used by processes

  • notes (a description of the event that changed the resource usage)

sendEvent(eventName, info)

Wrapper for constructing and publishing EM events.


class ipsframework.node_structure.Node(name, socks, cores, p)

Models a node in the allocation.

  • name: name of node, typically actual name from resource detection phase.

  • task_ids, owners: identifiers for the tasks and components that are currently using the node.

  • allocated, available: list of sockets that have cores allocated and available. A socket may appear in both lists if it is only partially allocated.

  • sockets: list of sockets belonging to this node

  • avail_cores: number of cores that are currently available.

  • total_cores: total number of cores that can be allocated on this node.

  • status: indicates if the node is ‘UP’ or ‘DOWN’. Currently not used; all nodes are considered functional.

allocate(whole_nodes, whole_sockets, tid, o, procs)

Mark procs number of cores as allocated subject to the values of whole_nodes and whole_sockets. Return the number of cores allocated and their corresponding slots, a list of strings of the form:

<socket name>:<core name>

print_sockets(fname='')

Pretty print of state of sockets.

release(tid, o)

Mark cores used by task tid and component o as available. Return the number of cores released.
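The slot strings returned by allocate() pair a socket name with a core name. The following sketch shows how such strings could be produced from a table of free cores. It ignores the whole_nodes and whole_sockets options, and allocate_slots and the dict layout are hypothetical, not the Node implementation.

```python
def allocate_slots(sockets, procs):
    """Take up to `procs` free cores, filling sockets in order.

    `sockets` maps socket name -> list of free core names and is mutated.
    Returns (number of cores allocated, list of "<socket>:<core>" strings).
    """
    slots = []
    for sock_name, free_cores in sockets.items():
        while free_cores and len(slots) < procs:
            slots.append(f"{sock_name}:{free_cores.pop(0)}")
    return len(slots), slots


# One node with two sockets of two cores each; core names are 0..n-1.
sockets = {"0": [0, 1], "1": [2, 3]}
count, slots = allocate_slots(sockets, 3)
print(count, slots, sockets)
```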

class ipsframework.node_structure.Socket(name, cps, coreids=())

Models a socket in a node.

  • name: identifier for the socket

  • task_ids, owners: identifiers for the tasks and components that are currently using the socket.

  • allocated, available: lists of cores that are allocated and available.

  • cores: list of Core objects belonging to this socket

  • avail_cores: number of cores that are currently available.

  • total_cores: total number of cores that can be allocated on this socket.

allocate(whole, tid, o, num_procs)

Mark num_procs cores as allocated subject to the value of whole. Return a list of strings of the form:

<socket name>:<core name>

print_cores(fname='')

Pretty print of state of cores.

release(tid)

Mark cores that are allocated to task tid as available. Return number of cores set to available.

class ipsframework.node_structure.Core(name)

Models a core of a socket.

  • name: name of core

  • is_available: boolean value indicating the availability of the core.

  • task_id, owner: identifiers of the task and component using the core.

allocate(tid, o)

Mark core as allocated.

release()

Mark core as available.


The Resource Helper file contains all of the code needed to figure out what host we are on and what resources we have. Keeping this out of the resource manager allows it to be tested independently of the IPS.

ipsframework.resourceHelper.getResourceList(services, host, partial_nodes=False)

Using the host information, the resources are detected. Return list of (<node name>, <processes per node>), cores per node, sockets per node, processes per node, and True if the node names are accurate, False otherwise.

ipsframework.resourceHelper.get_checkjob_info()
ipsframework.resourceHelper.get_pbs_info()

Access info about allocation from PBS environment variables:

PBS_NNODES PBS_NODEFILE

ipsframework.resourceHelper.get_qstat_jobinfo()

Use qstat -f $PBS_JOBID to get the number of nodes and ppn of the allocation. Typically works on PBS systems.

ipsframework.resourceHelper.get_qstat_jobinfo2()

A second way to use qstat -f $PBS_JOBID to get the number of nodes and ppn of the allocation. Typically works on PBS systems.

ipsframework.resourceHelper.get_slurm_info()

Access environment variables set by Slurm to get the node names, tasks per node and number of processes.

SLURM_NODELIST, SLURM_TASKS_PER_NODE or SLURM_JOB_TASKS_PER_NODE, SLURM_NPROC
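Slurm writes the tasks-per-node variable in a compact form such as 2(x3),1, meaning two tasks on each of three nodes followed by one task on a fourth. The sketch below expands that form. It is an independent illustration, not the body of get_slurm_info(): the function names are hypothetical, and the environment is passed in as a dict so the example runs outside a Slurm job.

```python
import re

_TASKS_RE = re.compile(r"(\d+)(?:\(x(\d+)\))?")


def parse_tasks_per_node(value):
    """Expand e.g. "2(x3),1" into [2, 2, 2, 1]."""
    counts = []
    for part in value.split(","):
        match = _TASKS_RE.fullmatch(part.strip())
        if match is None:
            raise ValueError(f"unrecognized entry: {part!r}")
        tasks = int(match.group(1))
        repeat = int(match.group(2) or 1)
        counts.extend([tasks] * repeat)
    return counts


def tasks_per_node_from_env(env):
    """Read whichever of the two tasks-per-node variables is set."""
    value = env.get("SLURM_TASKS_PER_NODE") or env.get("SLURM_JOB_TASKS_PER_NODE")
    if value is None:
        raise KeyError("no Slurm tasks-per-node variable is set")
    return parse_tasks_per_node(value)


# Made-up environment; inside a real job os.environ would be passed instead.
fake_env = {"SLURM_TASKS_PER_NODE": "2(x3),1"}
counts = tasks_per_node_from_env(fake_env)
print(counts, sum(counts))
```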

ipsframework.resourceHelper.manual_detection(services)

Use values listed in platform configuration file.

Component

IPS Framework Component

class ipsframework.component.Component(services, config)

Base class for all IPS components. Common set up, connection and invocation actions are implemented here.

Parameters
  • services (ServicesProxy) – service proxy to communicate with framework

  • config (dict) – configuration dictionary for this component

property args
property call_id
checkpoint(timestamp=0.0, **keywords)

Produce some default debugging information before the rest of the code is executed.

property component_id
property config
finalize(timestamp=0.0, **keywords)

Produce some default debugging information before the rest of the code is executed.

init(timestamp=0.0, **keywords)

Produce some default debugging information before the rest of the code is executed.

property method_name
restart(timestamp=0.0, **keywords)

Produce some default debugging information before the rest of the code is executed.

property services
property start_time
step(timestamp=0.0, **keywords)

Produce some default debugging information before the rest of the code is executed.

terminate(status)

Clean up services and call sys.exit.

Component Registry

class ipsframework.componentRegistry.ComponentID(class_name, sim_name)

Object to facilitate the creation, serialization and deserialization of component ids.

all_ids = {}
delimiter = '@'
static deserialize(comp_id_string)

Return the deserialized version of the component id.

get_class_name()

Return class name of component.

get_instance_name()

Return instance name of component id.

get_seq_num()

Return sequence number of component.

get_serialization()

Return serialization.

get_sim_name()

Return simulation name for the component.

seq_num = 0
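A component id built from a class name, a simulation name, and a sequence number can be round-tripped through a delimited string. The sketch below assumes the serialized form is class_name@sim_name@seq_num; only the '@' delimiter is stated in this reference, so the field order and the helper names are assumptions.

```python
DELIMITER = "@"


def serialize(class_name, sim_name, seq_num):
    # Assumed layout: <class_name>@<sim_name>@<seq_num>
    return DELIMITER.join([class_name, sim_name, str(seq_num)])


def deserialize(comp_id_string):
    class_name, sim_name, seq_num = comp_id_string.split(DELIMITER)
    return class_name, sim_name, int(seq_num)


text = serialize("Driver", "sim_a", 3)
print(text, deserialize(text))
```

The round trip relies on neither name containing the delimiter character.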
class ipsframework.componentRegistry.ComponentRegistry(*args, **kwargs)
class RegistryEntry(svc_response_q, invocation_q, component_ref, services, config)

Container for queues and references associated with a component.

addEntry(component_id, svc_response_q, invocation_q, component_ref, services, config)

Create a component registry entry for component_id and its associated queues, component ref, services and configuration information.

getComponentArtifact(component_id, artifact)

Return value of artifact in component_id’s registry entry.

getEntry(component_id)

Return a registry entry.

get_component_ids(sim_name)

Return all of the component ids associated with sim sim_name

removeEntry(component_id)
setComponentArtifact(component_id, artifact, value)

Set the value of artifact in component_id’s registry entry to value.

class ipsframework.componentRegistry.SingletonMeta

Configuration Manager

class ipsframework.configurationManager.ConfigurationManager(fwk, config_file_list, platform_file_name)

The configuration manager is responsible for parsing the simulation and platform configuration files, creating the framework and simulation components, as well as providing an interface for accessing items from the configuration files (e.g., the time loop).

class SimulationData(sim_name, start_time=1674857500.5921588)

Structure to hold simulation data stored into the sim_map entry in the configurationManager class

create_simulation(sim_name, config_file, override, sub_workflow=False)
get_all_simulation_components_map()
get_all_simulation_sim_root()
get_component_map()

Return a dictionary of simulation names and lists of component references. (May only be the driver, and init (if present)???)

get_config_parameter(sim_name, param)

Return value of param from simulation configuration file for sim_name.

get_framework_components()

Return list of framework components.

get_platform_parameter(param, silent=False)

Return value of platform parameter param. If silent is False (default) None is returned when param not found, otherwise an exception is raised.

get_port(sim_name, port_name)

Return a reference to the component from simulation sim_name implementing port port_name.

get_sim_names()

Return list of names of simulations.

get_sim_parameter(sim_name, param)

Return value of param from simulation configuration file for sim_name.

get_simulation_components(sim_name)
initialize(data_mgr, resource_mgr, task_mgr)

Parse the platform and simulation configuration files using the ConfigObj module. Create and initialize simulation(s) and their components, framework components and loggers.

process_service_request(msg)

Invokes public configuration manager method for a component. Return method’s return value.

set_config_parameter(sim_name, param, value, target_sim_name)

Set the configuration parameter param to value value in target_sim_name. If target_sim_name is the framework, all simulations will get the change. Return value.

terminate(status)

Terminates all processes attached to the framework. status is not used.

terminate_sim(sim_name)

Services

IPS Services

class ipsframework.services.RunningTask(process, start_time, timeout, nproc, cores_allocated, command, binary, args)
args

Alias for field number 7

binary

Alias for field number 6

command

Alias for field number 5

cores_allocated

Alias for field number 4

nproc

Alias for field number 3

process

Alias for field number 0

start_time

Alias for field number 1

timeout

Alias for field number 2

class ipsframework.services.ServicesProxy(fwk, fwk_in_q, svc_response_q, sim_conf, log_pipe_name)

The ServicesProxy object is responsible for marshalling invocations of framework services to the framework process using a shared queue. The queue is shared among all components in a simulation. The results from framework services invocations are received via another, component-specific “framework response” queue.

Create a new ServicesProxy object

Parameters
  • fwk (ipsframework.ips.Framework) – Enclosing IPS simulation framework

  • fwk_in_q (multiprocessing.Queue) – Framework input message queue - shared among all service objects

  • svc_response_q (multiprocessing.Queue) – Service response message queue - one per service object.

  • sim_conf (dict) – Simulation configuration dictionary, contains data from the simulation configuration file merged with the platform configuration file.

  • log_pipe_name (str) – Name of logging pipe for use by the IPS logging daemon.
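The two-queue arrangement (one shared request queue, one response queue per component) can be sketched in a single process. This illustration uses queue.Queue instead of multiprocessing.Queue and plain tuples instead of IPS message classes; the component names and the "ok" status are made up.

```python
import queue

# Shared by every component: requests travel to the framework here.
fwk_in_q = queue.Queue()
# One response queue per component.
response_qs = {"comp_a": queue.Queue(), "comp_b": queue.Queue()}


def send_request(sender, target_method, *args):
    fwk_in_q.put((sender, target_method, args))


def framework_step():
    """Serve one request and reply on the sender's own queue."""
    sender, target_method, args = fwk_in_q.get()
    response_qs[sender].put((target_method, "ok", args))


send_request("comp_a", "get_time_loop")
send_request("comp_b", "stage_state", "state.cdf")
framework_step()
framework_step()

reply_a = response_qs["comp_a"].get_nowait()
reply_b = response_qs["comp_b"].get_nowait()
print(reply_a, reply_b)
```

Because replies go to a component-specific queue, a component never has to filter out responses meant for another component.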

add_task(task_pool_name, task_name, nproc, working_dir, binary, *args, **keywords)

Add task task_name to task pool task_pool_name. Remaining arguments are the same as in ServicesProxy.launch_task().

call(component_id, method_name, *args, **keywords)

Invoke method method_name on component component_id with optional arguments *args. Will wait until call is finished. Return result from invoking the method.

Parameters
  • component_id (ComponentID) – Component ID of requested component

  • method_name (str) – component method to call, e.g. init or step

Returns

service response message arguments

call_nonblocking(component_id, method_name, *args, **keywords)

Invoke method method_name on component component_id with optional arguments *args. Will not wait until finished.

Parameters
  • component_id (ComponentID) – Component ID of requested component

  • method_name (str) – component method to call, e.g. init or step

Returns

call_id

Return type

int

checkpoint_components(comp_id_list, time_stamp, Force=False, Protect=False)

Selectively checkpoint components in comp_id_list based on the configuration section CHECKPOINT. If Force is True, the checkpoint will be taken even if the conditions for taking the checkpoint are not met. If Protect is True, then the data from the checkpoint is protected from clean up. Force and Protect are optional and default to False.

The CHECKPOINT_MODE option determines whether the components’ checkpoint methods are invoked.

Possible MODE options are:

ALL:

Checkpoint every time the call is made (equivalent to always setting Force = True)

WALLTIME_REGULAR:

checkpoints are saved upon invocation of the service call checkpoint_components(), when a time interval greater than, or equal to, the value of the configuration parameter WALLTIME_INTERVAL has passed since the last checkpoint. A checkpoint is assumed to have happened (but not actually stored) when the simulation starts. Calls to checkpoint_components() before WALLTIME_INTERVAL seconds have passed since the last successful checkpoint result in a NOOP.

WALLTIME_EXPLICIT:

checkpoints are saved when the simulation wall clock time exceeds one of the (ordered) list of time values (in seconds) specified in the variable WALLTIME_VALUES. Let [t_0, t_1, …, t_n] be the list of wall clock time values specified in the configuration parameter WALLTIME_VALUES. Then checkpoint(T) = True if T >= t_j, for some j in [0,n] and there is no other time T_1, with T > T_1 >= t_j such that checkpoint(T_1) = True. If the test fails, the call results in a NOOP.

PHYSTIME_REGULAR:

checkpoints are saved at regularly spaced “physics time” intervals, specified in the configuration parameter PHYSTIME_INTERVAL. Let PHYSTIME_INTERVAL = PTI, and the physics time stamp argument in the call to checkpoint_components() be pts_i, with i = 0, 1, 2, … Then checkpoint(pts_i) = True if pts_i >= n * PTI for some n in 1, 2, 3, … and pts_i - pts_prev >= PTI, where checkpoint(pts_prev) = True and pts_prev = max(pts_0, pts_1, …, pts_{i-1}). If the test fails, the call results in a NOOP.

PHYSTIME_EXPLICIT:

checkpoints are saved when the physics time equals or exceeds one of the (ordered) list of physics time values (in seconds) specified in the variable PHYSTIME_VALUES. Let [pt_0, pt_1, …, pt_n] be the list of physics time values specified in the configuration parameter PHYSTIME_VALUES. Then checkpoint(pt) = True if pt >= pt_j, for some j in [0,n] and there is no other physics time pt_k, with pt > pt_k >= pt_j such that checkpoint(pt_k) = True. If the test fails, the call results in a NOOP.
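The regular and explicit rules above can be written as two small predicates. The sketch below is one reading of those rules, not the code in checkpoint_components(): the class names are hypothetical, times are plain numbers, and the explicit schedule is read as "a listed value has been reached that no earlier checkpoint already covers".

```python
class RegularSchedule:
    """WALLTIME_REGULAR-style rule: at least `interval` since the last one."""

    def __init__(self, interval, start):
        self.interval = interval
        self.last = start  # a checkpoint is assumed at simulation start

    def should_checkpoint(self, now):
        if now - self.last >= self.interval:
            self.last = now
            return True
        return False  # NOOP


class ExplicitSchedule:
    """*_EXPLICIT-style rule driven by an ordered list of time values."""

    def __init__(self, values):
        self.values = sorted(values)
        self.last = None  # time of the most recent checkpoint, if any

    def should_checkpoint(self, t):
        due = any(
            v <= t and (self.last is None or v > self.last)
            for v in self.values
        )
        if due:
            self.last = t
        return due


regular = RegularSchedule(interval=60, start=0)
regular_result = [regular.should_checkpoint(t) for t in (30, 60, 100, 120, 125)]

explicit = ExplicitSchedule([10, 20, 30])
explicit_result = [explicit.should_checkpoint(t) for t in (5, 12, 15, 35, 40)]
print(regular_result, explicit_result)
```

In the explicit run, the call at time 35 covers both 20 and 30 with a single checkpoint, so the call at 40 is a NOOP.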

The configuration parameter NUM_CHECKPOINT controls how many checkpoints to keep on disk. Checkpoints are deleted in a FIFO manner, based on their creation time. Possible values of NUM_CHECKPOINT are:

  • NUM_CHECKPOINT = n, with n > 0 –> Keep the most recent n checkpoints

  • NUM_CHECKPOINT = 0 –> No checkpoints are made/kept (except when Force = True)

  • NUM_CHECKPOINT < 0 –> Keep ALL checkpoints
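The FIFO retention rule can be sketched as a pure function over checkpoint names ordered oldest first. This is an illustration under assumptions: prune_checkpoints is a hypothetical helper, nothing is deleted from disk, and protected checkpoints (see Protect) are assumed not to count toward the NUM_CHECKPOINT limit.

```python
def prune_checkpoints(checkpoints, num_checkpoint, protected=()):
    """Return (kept, deleted) for checkpoints ordered oldest first."""
    if num_checkpoint < 0:
        return list(checkpoints), []  # keep ALL checkpoints
    # Protected checkpoints are never candidates for deletion.
    candidates = [c for c in checkpoints if c not in protected]
    n_delete = max(len(candidates) - num_checkpoint, 0)
    deleted = candidates[:n_delete]  # oldest first (FIFO)
    kept = [c for c in checkpoints if c not in deleted]
    return kept, deleted


ckpts = ["t1", "t2", "t3", "t4"]
print(prune_checkpoints(ckpts, 2))
print(prune_checkpoints(ckpts, -1))
print(prune_checkpoints(ckpts, 2, protected={"t1"}))
```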

Checkpoints are saved in the directory ${SIM_ROOT}/restart.

cleanup()

Clean up any state from the services. Called by the terminate method in the base class for components.

create_simulation(config_file, override)

Create simulation

create_sub_workflow(sub_name, config_file, override=None, input_dir=None)

Create sub-workflow

create_task_pool(task_pool_name)

Create an empty pool of tasks with the name task_pool_name. Raise exception if duplicate name.

critical(msg, *args)

Produce critical message in simulation log file. See logging.critical() for usage.

debug(msg, *args)

Produce debugging message in simulation log file. See logging.debug() for usage.

error(msg, *args)

Produce error message in simulation log file. See logging.error() for usage.

exception(msg, *args)

Produce exception message in simulation log file. See logging.exception() for usage.

get_config_param(param, silent=False)

Return the value of the configuration parameter param. Raise exception if not found and silent is False.

Parameters
  • param (str) – The parameter requested from simulation config

  • silent (bool) – If True and parameter isn’t found then exception is not raised, default False

Returns

dictionary of given parameter from configuration

Return type

dict

get_finished_tasks(task_pool_name)

Return dictionary of finished tasks and return values in task pool task_pool_name. Raise exception if no active or finished tasks.

get_port(port_name)
Parameters

port_name (str) – port name

Returns

Return a reference to the component implementing port port_name.

Return type

ipsframework.componentRegistry.ComponentID

get_restart_files(restart_root, timeStamp, file_list)

Copy files needed for component restart from the restart directory:

<restart_root>/restart/<timeStamp>/components/$CLASS_${SUB_CLASS}_$NAME_${SEQ_NUM}

to the component’s work directory.

Copying errors are not fatal (exception raised).

get_time_loop()

Return the list of times as specified in the configuration file.

Returns

list of times

Return type

list of float

get_working_dir()

Return the working directory of the calling component.

The structure of the working directory is defined using the configuration parameters CLASS, SUB_CLASS, and NAME of the component configuration section. The structure of the working directory is:

${SIM_ROOT}/work/$CLASS_${SUB_CLASS}_$NAME_<instance_num>
Returns

working directory

Return type

str
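As an illustration of the directory layout described above, the path can be composed as follows. The helper and the example component values are hypothetical. In a component, the value should be obtained from get_working_dir() rather than rebuilt by hand.

```python
import os


def component_work_dir(sim_root, class_name, sub_class, name, instance_num):
    """Compose ${SIM_ROOT}/work/$CLASS_${SUB_CLASS}_$NAME_<instance_num>."""
    leaf = f"{class_name}_{sub_class}_{name}_{instance_num}"
    return os.path.join(sim_root, "work", leaf)


# Example values are made up for illustration.
path = component_work_dir("/tmp/sim", "DRIVERS", "HELLO", "HelloDriver", 1)
```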

info(msg, *args)

Produce informational message in simulation log file. See logging.info() for usage.

kill_all_tasks()

Kill all tasks associated with this component.

kill_task(task_id)

Kill launched task task_id. Return if successful. Raises exceptions if the task or process cannot be found or killed successfully.

Parameters

task_id (int) – task ID

Returns

if successfully killed

Return type

bool

launch_task(nproc, working_dir, binary, *args, **keywords)

Launch binary in working_dir on nproc processes. *args are any arguments to be passed to the binary on the command line. **keywords are any keyword arguments used by the framework to manage how the binary is launched. Keywords may be the following:

  • task_ppn : the processes per node value for this task

  • task_cpp : the cores per process, only used when MPIRUN=srun

  • task_gpp : the gpus per process, only used when MPIRUN=srun

  • omp : if True, the task will be launched with the correct OpenMP environment variables set, only used when MPIRUN=srun

  • block : specifies that this task will block (or raise an exception) if not enough resources are available to run immediately. If True, the task will be retried until it runs. If False, an exception is raised indicating that there are not enough resources, but it is possible to eventually run. (default = True)

  • tag : identifier for the portal. May be used to group related tasks.

  • logfile : file name for stdout (and stderr) to be redirected to for this task. By default stderr is redirected to stdout, and stdout is not redirected.

  • whole_nodes : if True, the task will be given exclusive access to any nodes it is assigned. If False, the task may be assigned nodes that other tasks are using or may use.

  • whole_sockets : if True, the task will be given exclusive access to any sockets of nodes it is assigned. If False, the task may be assigned sockets that other tasks are using or may use.

  • launch_cmd_extra_args : extra command arguments added to the MPIRUN command

Return task_id if successful. May raise exceptions related to opening the logfile, being unable to obtain enough resources to launch the task (InsufficientResourcesException), bad task launch request (ResourceRequestMismatchException, BadResourceRequestException) or problems executing the command. These exceptions may be used to retry launching the task as appropriate.

Note

This is a nonblocking function; users must call a version of ServicesProxy.wait_task() to get the result.

Parameters
  • nproc (int) – number of processes

  • working_dir (str) – change to this directory before launching task

  • binary (str) – command to execute, can include arguments or they can be passed in with *args

Returns

task_id (PID)

Return type

int
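The retry pattern suggested above (launch with block=False, catch the insufficient-resources exception, try again) might look like the sketch below. StubServices and the locally defined exception are hypothetical stand-ins so that the example runs without IPS. In a real component the exception would come from ipsframework.ipsExceptions, and its constructor takes the arguments listed under IPS Exceptions. The helper name, the retry count and the pause are assumptions.

```python
import time


class InsufficientResourcesException(Exception):
    """Simplified stand-in for the IPS exception of the same name."""


class StubServices:
    """Hypothetical stand-in for ServicesProxy, for illustration only."""

    def __init__(self, busy_attempts):
        self.busy_attempts = busy_attempts
        self.calls = 0

    def launch_task(self, nproc, working_dir, binary, *args, **keywords):
        self.calls += 1
        if self.calls <= self.busy_attempts:
            raise InsufficientResourcesException("no free resources yet")
        return 1000 + self.calls  # pretend task_id

    def wait_task(self, task_id, timeout=-1, delay=1):
        return 0  # pretend the task exited cleanly


def launch_with_retry(services, nproc, working_dir, binary, *args,
                      retries=5, pause=0.0, **keywords):
    """Launch without blocking and retry while resources are unavailable."""
    launch_kwargs = dict(keywords, block=False)
    for _ in range(retries):
        try:
            task_id = services.launch_task(nproc, working_dir, binary,
                                           *args, **launch_kwargs)
        except InsufficientResourcesException:
            time.sleep(pause)  # give running tasks time to release resources
            continue
        return services.wait_task(task_id)
    raise RuntimeError(f"could not launch {binary} after {retries} attempts")


services = StubServices(busy_attempts=2)
retval = launch_with_retry(services, 4, "/tmp/work", "/bin/echo", "hello",
                           logfile="echo.log")
```

Only the insufficient-resources case is retried here. The request-mismatch exceptions indicate a request that cannot succeed as written, so the sketch lets them propagate.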

launch_task_pool(task_pool_name, launch_interval=0.0)

Construct messages to task manager to launch each task in task pool. Used by TaskPool to launch tasks in a task_pool.

Parameters
  • task_pool_name (str) – name of task pool

  • launch_interval (float) – time to wait between launching tasks, default 0.0

Returns

active tasks, a dictionary mapping task_name to task_id

Return type

dict

log(msg, *args)

Wrapper for ServicesProxy.info().

merge_current_state(partial_state_file, logfile=None, merge_binary=None)

Merge partial plasma state with global state. Partial plasma state contains only the values that the component contributes to the simulation. Raise exceptions on bad merge. Optional logfile will capture stdout from merge. Optional merge_binary specifies path to executable code to do the merge (default value : “update_state”)

process_events()

Poll for events on subscribed topics.

publish(topicName, eventName, eventBody)

Publish event consisting of eventName and eventBody to topic topicName to the IPS event service.

remove_task_pool(task_pool_name)

Kill all running tasks, clean up all finished tasks, and delete task pool.

save_restart_files(timeStamp, file_list)

Copy files needed for component restart to the restart directory:

${SIM_ROOT}/restart/$timestamp/components/$CLASS_${SUB_CLASS}_$NAME

Copying errors are not fatal (exception raised).

send_portal_event(event_type='COMPONENT_EVENT', event_comment='', event_time=None, elapsed_time=None)

Send event to web portal.

setMonitorURL(url='')

Send event to portal setting the URL where the monitor component will put data.

set_config_param(param, value, target_sim_name=None)

Set configuration parameter param to value. Raise exceptions if the parameter cannot be changed or if there are problems setting the value. This tells the framework to call ipsframework.configurationManager.ConfigurationManager.set_config_parameter() to change the parameter.

Parameters
  • param (str) – The parameter requested from simulation config

  • value – The value to set the parameter

Returns

return value from setting parameter

stage_input_files(input_file_list)

Copy component input files to the component working directory (as obtained via a call to ServicesProxy.get_working_dir()). Input files are assumed to be originally located in the directory variable INPUT_DIR in the component configuration section.

Files are copied using ipsframework.ipsutil.copyFiles().

Parameters

input_file_list (str or Iterable of str) – input files, given as a space-separated string or an iterable of strings

stage_output_files(timeStamp, file_list, keep_old_files=True, save_plasma_state=True)

Copy associated component output files (from the working directory) to the component simulation results directory. Output files are prefixed with the configuration parameter OUTPUT_PREFIX. The simulation results directory has the format:

${SIM_ROOT}/simulation_results/<timeStamp>/components/$CLASS_${SUB_CLASS}_$NAME_${SEQ_NUM}

Additionally, plasma state files are archived for debugging purposes:

${SIM_ROOT}/history/plasma_state/<file_name>_$CLASS_${SUB_CLASS}_$NAME_<timeStamp>

Copying errors are not fatal (exception raised).

stage_state(state_files=None)

Copy current state to work directory.

stage_subflow_output_files(subflow_name='ALL')

Gather outputs from sub-workflows. Sub-workflow output is defined to be the output files from its DRIVER component as they exist in the sub-workflow driver’s work area at the end of the sub-simulation. If subflow_name != ‘ALL’ then output is gathered from only that sub-workflow.

submit_tasks(task_pool_name, block=True, use_dask=False, dask_nodes=1, dask_ppw=None, launch_interval=0.0, use_shifter=False, shifter_args=None, dask_worker_plugin=None, dask_worker_per_gpu=False)

Launch all unfinished tasks in task pool task_pool_name. If block is True, return when all tasks have been launched. If block is False, return when all tasks that can be launched immediately have been launched. Return number of tasks submitted.

Optionally, dask can be used to schedule and run the task pool.

subscribe(topicName, callback)

Subscribe to topic topicName on the IPS event service and register callback as the method to be invoked when an event is published to that topic.

unsubscribe(topicName)

Remove subscription to topic topicName.

update_state(state_files=None)

Copy local (updated) state to global state. If no state files are specified, component configuration specification is used. Raise exceptions upon copy.

update_time_stamp(new_time_stamp=- 1)

Update time stamp on portal.

wait_call(call_id, block=True)

If block is True, return when the call has completed with the return code from the call. If block is False, raise IncompleteCallException if the call has not completed, and return the return value if it has.

Parameters

call_id (int) – call ID

Returns

service response message arguments

wait_call_list(call_id_list, block=True)

Check the status of each of the calls in call_id_list. If block is True, return when all calls are finished. If block is False, raise IncompleteCallException if any of the calls have not completed, otherwise return. The return value is a dictionary of call_ids and return values.

Parameters

call_id_list (list of int) – list of call ID’s

Returns

dict of call_id and return value

Return type

dict

wait_task(task_id, timeout=- 1, delay=1)

Check the status of task task_id. Return the return value of the task when finished successfully. Raise exceptions if the task is not found, or if there are problems finalizing the task.

Parameters
  • task_id (int) – task ID (PID)

  • timeout (float) – maximum time to wait for task to finish, default -1 (no timeout)

  • delay (float) – time to wait before checking if task has timed-out

Returns

return value of task

wait_task_nonblocking(task_id)

Check the status of task task_id. If it has finished, the return value is populated with the actual value, otherwise None is returned. A KeyError exception may be raised if the task is not found.

Parameters

task_id (int) – task ID (PID)

Returns

return value of task if finished else None

wait_tasklist(task_id_list, block=True)

Check the status of a list of tasks. If block is True, return a dictionary of return values when all tasks have completed. If block is False, return a dictionary containing entries for each completed task. Note that the dictionary may be empty. Raise KeyError exception if task_id not found.

Parameters
  • task_id_list (list of int) – list of task_id’s (PID’s) to wait until completed

  • block (bool) – whether to wait until all tasks finish

Returns

dict of task_id and return value

Return type

dict
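With block=False, a caller can poll and do other work between checks. The loop below is a sketch of that pattern. PollingStub is a hypothetical stand-in for ServicesProxy so that the example runs on its own. Whether a finished task is reported again on later non-blocking calls is not stated here, so results are merged in a way that works in either case.

```python
import time


class PollingStub:
    """Hypothetical stand-in for ServicesProxy.wait_tasklist()."""

    def __init__(self, finish_after):
        self.finish_after = dict(finish_after)  # task_id -> poll number when done
        self.polls = 0

    def wait_tasklist(self, task_id_list, block=True):
        self.polls += 1
        done = {}
        for task_id in task_id_list:
            if task_id not in self.finish_after:
                raise KeyError(task_id)
            if self.finish_after[task_id] <= self.polls:
                done[task_id] = 0  # pretend return value
        return done


def poll_until_done(services, task_ids, max_polls=100, pause=0.0):
    """Collect return values with non-blocking calls; return (results, pending)."""
    pending = set(task_ids)
    results = {}
    for _ in range(max_polls):
        if not pending:
            break
        finished = services.wait_tasklist(sorted(pending), block=False)
        results.update(finished)
        pending -= set(finished)
        time.sleep(pause)  # other work could be done here instead
    return results, pending


stub = PollingStub({101: 1, 102: 3})
results, pending = poll_until_done(stub, [101, 102])
```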

warning(msg, *args)

Produce warning message in simulation log file. See logging.warning() for usage.

class ipsframework.services.Task(task_name, nproc, working_dir, binary, *args, **keywords)

Container for task information:

Parameters
  • task_name (str) – task name

  • nproc (int) – number of processes the task needs

  • working_dir (str) – location to launch task from

  • binary (str) – full path to executable to launch

  • *args – arguments for binary

  • **keywords – keyword arguments for launching the task. See ServicesProxy.launch_task() for details.

class ipsframework.services.TaskPool(name, services)

Class to contain and manage a pool of tasks.

add_task(task_name, nproc, working_dir, binary, *args, **keywords)

Create Task object and add to queued_tasks of the task pool. Raise exception if task name already exists in task pool.

Parameters
  • task_name (str) – unique task name

  • nproc (int) – number of processes to run task with

  • working_dir (str) – change to this directory before launching task

  • binary (str) – full path to executable to launch

get_dask_finished_tasks_status()

Return a dictionary of exit status values for all dask tasks that have finished since the last time finished tasks were polled.

Returns

dict mapping task name to exit status

Return type

dict

get_finished_tasks_status()

Return a dictionary of exit status values for all tasks that have finished since the last time finished tasks were polled.

Returns

dict mapping task name to exit status

Return type

dict

submit_dask_tasks(block=True, dask_nodes=1, dask_ppw=None, use_shifter=False, shifter_args=None, dask_worker_plugin=None, dask_worker_per_gpu=False)

Launch tasks in queued_tasks using dask.

One dask worker will be started for each node unless dask_worker_per_gpu is True, in which case one dask worker will be started for every GPU, so dask_nodes times GPUS_PER_NODE workers will be started.

Parameters
  • block (bool) – Unused, this will always return after tasks are submitted

  • dask_nodes (int) – Number of task nodes, default 1

  • dask_ppw (int) – Number of processes per dask worker, default is PROCS_PER_NODE

  • use_shifter (bool) – Option to launch dask scheduler and workers in shifter container

  • dask_worker_plugin (distributed.diagnostics.plugin.WorkerPlugin) – If provided this will be registered as a worker plugin with the dask client

  • dask_worker_per_gpu (bool) – If True then a separate worker will be started for each GPU and bound to that GPU
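The worker-count rule described for submit_dask_tasks() reduces to a one-line calculation. The helper below is hypothetical, and gpus_per_node stands for the GPUS_PER_NODE setting.

```python
def dask_worker_count(dask_nodes, gpus_per_node, dask_worker_per_gpu=False):
    """Number of dask workers started under the rule described above."""
    if dask_worker_per_gpu:
        return dask_nodes * gpus_per_node  # one worker per GPU
    return dask_nodes  # one worker per node


per_node = dask_worker_count(2, 4)
per_gpu = dask_worker_count(2, 4, dask_worker_per_gpu=True)
```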

submit_tasks(block=True, use_dask=False, dask_nodes=1, dask_ppw=None, launch_interval=0.0, use_shifter=False, shifter_args=None, dask_worker_plugin=None, dask_worker_per_gpu=False)

Launch tasks in queued_tasks. Finished tasks are handled before launching new ones. If block is True, the number of tasks submitted is returned after all tasks have been launched and completed. If block is False the number of tasks that can immediately be launched is returned.

If use_dask==True then the tasks are launched with submit_dask_tasks(). One dask worker will be started for each node unless dask_worker_per_gpu is True, in which case one dask worker will be started for every GPU, so dask_nodes times GPUS_PER_NODE workers will be started.

Parameters
  • block (bool) – If True then wait for task to complete, default True

  • use_dask (bool) – If True then use dask to launch tasks, default False

  • dask_nodes (int) – Number of task nodes, only used if use_dask==True

  • dask_ppw (int) – Number of processes per dask worker, default is PROCS_PER_NODE, only used if use_dask==True

  • launch_interval (float) – time to wait between launching tasks, default 0.0

  • use_shifter (bool) – Option to launch dask scheduler and workers in shifter container

  • shifter_args (str) – Optional arguments added to shifter when launching dask scheduler and workers

  • dask_worker_plugin (distributed.diagnostics.plugin.WorkerPlugin) – If provided this will be registered as a worker plugin with the dask client

  • dask_worker_per_gpu (bool) – If True then a separate worker will be started for each GPU and bound to that GPU

terminate_tasks()

Kill all active tasks, clear all queued, blocked and finished tasks.

ipsframework.services.launch(binary, task_name, working_dir, *args, **keywords)

This is used by TaskPool.submit_dask_tasks() as the input to dask.distributed.Client.submit().

Other Utilities

IPS Exceptions

exception ipsframework.ipsExceptions.BadResourceRequestException(caller_id, tid, request, deficit)

Exception raised by the resource manager when a component requests a quantity of resources that can never be satisfied during a get_allocation() call

exception ipsframework.ipsExceptions.BlockedMessageException(msg, reason)

Exception raised by any manager when a blocking service invocation is made, and the invocation result is not readily available.

exception ipsframework.ipsExceptions.GPUResourceRequestMismatchException(caller_id, tid, ppn, gpp, max_gpp)

Exception raised by the resource manager when it is not possible to launch the requested number of GPUs per task

exception ipsframework.ipsExceptions.IncompleteCallException(callID)

Exception raised by the taskManager when a nonblocking wait_call() method is invoked before the call has finished.

exception ipsframework.ipsExceptions.InsufficientResourcesException(caller_id, tid, request, deficit)

Exception raised by the resource manager when not enough resources are available to satisfy an allocate() call

exception ipsframework.ipsExceptions.InvalidResourceSettingsException(t, spn, cpn)

Exception raised by the resource helper to indicate inconsistent resource settings.

exception ipsframework.ipsExceptions.ResourceRequestMismatchException(caller_id, tid, nproc, ppn, max_procs, max_ppn)

Exception raised by the resource manager when it is possible to launch the requested number of processes, but not on the requested number of processes per node.

exception ipsframework.ipsExceptions.ResourceRequestUnequalPartitioningException(caller_id, tid, nproc, ppn, max_procs, max_ppn)

Exception raised by the resource manager when it is possible to launch the requested number of processes, but the requested number of processes and processes per node will result in unequal partitioning of nodes.

IPS Utilities

ipsframework.ipsutil.copyFiles(src_dir, src_file_list, target_dir, prefix='', keep_old=False)

Copy files in src_file_list from src_dir to target_dir with an optional prefix. If keep_old is True, existing files in target_dir will not be overwritten; otherwise existing files can be clobbered (default). Wild-cards in file name specifications are allowed.
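The behaviour described for copyFiles() can be approximated with the standard library. The sketch below illustrates the stated semantics (prefix, keep_old, wild-cards) and is not the IPS implementation. The function name, the returned list of copied paths and the creation of target_dir are assumptions.

```python
import glob
import os
import shutil
import tempfile


def copy_files_sketch(src_dir, src_file_list, target_dir, prefix="", keep_old=False):
    """Copy files matching each pattern from src_dir into target_dir."""
    os.makedirs(target_dir, exist_ok=True)
    copied = []
    for pattern in src_file_list:
        # Wild-cards are expanded relative to src_dir.
        for src in sorted(glob.glob(os.path.join(src_dir, pattern))):
            dst = os.path.join(target_dir, prefix + os.path.basename(src))
            if keep_old and os.path.exists(dst):
                continue  # leave the existing file untouched
            shutil.copy(src, dst)
            copied.append(dst)
    return copied


with tempfile.TemporaryDirectory() as root:
    src_dir = os.path.join(root, "src")
    target_dir = os.path.join(root, "target")
    os.makedirs(src_dir)
    for name in ("a.txt", "b.txt", "c.dat"):
        with open(os.path.join(src_dir, name), "w") as handle:
            handle.write(name)

    first = copy_files_sketch(src_dir, ["*.txt"], target_dir, prefix="run1_")
    first_names = sorted(os.path.basename(p) for p in first)
    # With keep_old=True both targets already exist, so nothing is copied.
    second = copy_files_sketch(src_dir, ["*.txt"], target_dir,
                               prefix="run1_", keep_old=True)
```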

ipsframework.ipsutil.getTimeString(timeArg=None)

Return a string representation of timeArg. timeArg is expected to be an appropriate object to be processed by time.strftime(). If timeArg is None, current time is used.

ipsframework.ipsutil.which(program, alt_paths=None)

class ipsframework.messages.Message(sender_id, receiver_id)

Base class for all IPS messages. Should not be used in actual communication.

FAILURE = 1
SUCCESS = 0
counter = 0
delimiter = ''
get_message_id()
identifier = 'MESSAGE'
class ipsframework.messages.MethodInvokeMessage(sender_id, receiver_id, call_id, target_method, *args, **keywords)

Message used by components to invoke methods on other components.

  • sender_id: component id of the sender

  • receiver_id: component id of the receiver

  • call_id: identifier of the call (generated by caller)

  • target_method: method to be invoked on the receiver

  • *args: arguments to be passed to the target_method

counter = 0
delimiter = '|'
identifier = 'INVOKE'
class ipsframework.messages.MethodResultMessage(sender_id, receiver_id, call_id, status, *args)

Message used to relay the return value after a method invocation.

  • sender_id: component id of the sender (callee)

  • receiver_id: component id of the receiver (caller)

  • call_id: identifier of the call (generated by caller)

  • status: either Message.SUCCESS or Message.FAILURE indicating the success or failure of the invocation.

  • *args: other information to be passed back to the caller.

counter = 0
delimiter = '|'
identifier = 'RESULT'
class ipsframework.messages.ServiceRequestMessage(sender_id, receiver_id, target_comp_id, target_method, *args, **keywords)

Message used by components to request the result of a service action by one of the IPS managers.

  • sender_id: component id of the sender

  • receiver_id: component id of the receiver (framework)

  • target_comp_id: component id of target component (typically framework)

  • target_method: name of method to be invoked on component target_comp_id

  • *args: any number of arguments. These are specific to the target method.

counter = 0
delimiter = '|'
identifier = 'REQUEST'
class ipsframework.messages.ServiceResponseMessage(sender_id, receiver_id, request_msg_id, status, *args)

Message used by managers to respond with the result of the service action to the calling component.

  • sender_id: component id of the sender (framework)

  • receiver_id: component id of the receiver (calling component)

  • request_msg_id: id of request message this is a response to.

  • status: either Message.SUCCESS or Message.FAILURE

  • *args: any number of arguments. These are specific to type of response.

counter = 0
delimiter = '|'
identifier = 'RESPONSE'

Framework Components

class ipsframework.portalBridge.PortalBridge(services, config)

Framework component to communicate with the SWIM web portal.

class SimulationData

Container for simulation data.

check_send_post_responses()
finalize(timestamp=0.0, **keywords)

Produce some default debugging information before the rest of the code is executed.

init(timestamp=0.0, **keywords)

Try to connect to the portal, subscribe to _IPS_MONITOR events and register callback process_event().

init_simulation(sim_name, sim_root)

Create and send information about simulation sim_name living in sim_root so the portal can set up corresponding structures to manage data from the sim.

process_event(topicName, theEvent)

Process a single event theEvent on topic topicName.

send_event(sim_data, event_data)

Send contents of event_data and sim_data to portal.

send_mpo_data(event_data, sim_data)
step(timestamp=0.0, **keywords)

Poll for events.

ipsframework.portalBridge.configure_mpo()
ipsframework.portalBridge.hash_file(file_name)

Return the MD5 hash of a file.

Parameters

file_name – Full path to file

Returns

MD5 of file_name

Return type

str
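A file's MD5 digest can be computed in chunks with hashlib, as in the sketch below. The function name and chunk size are hypothetical, and returning the hexadecimal form of the digest is an assumption about what hash_file() returns.

```python
import hashlib
import os
import tempfile


def md5_of_file(file_name, chunk_size=65536):
    """Return the hexadecimal MD5 digest of the file at file_name."""
    digest = hashlib.md5()
    with open(file_name, "rb") as handle:
        # Read in fixed-size chunks so large files are not loaded into memory at once.
        for chunk in iter(lambda: handle.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "state.dat")
    with open(path, "wb") as handle:
        handle.write(b"hello world")
    file_digest = md5_of_file(path)
```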

ipsframework.portalBridge.send_post(conn, stop, url)

class ipsframework.runspaceInitComponent.runspaceInitComponent(services, config)

Framework component to manage runspace initialization, container file management, and file staging for simulation and analysis runs.

init(timestamp=0.0, **keywords)

Creates base directory, copies IPS and FacetsComposer input files.

step(timestamp=0.0, **keywords)

Copies individual subcomponent input files into working subdirectories.