Code Listings

IPS

The Integrated Plasma Simulator (IPS) Framework. This framework enables loose, file-based coupling of a certain class of nuclear fusion simulation codes.

For further design information see

  • Wael Elwasif, David E. Bernholdt, Aniruddha G. Shet, Samantha S. Foley, Randall Bramley, Donald B. Batchelor, and Lee A. Berry, The Design and Implementation of the SWIM Integrated Plasma Simulator, in The 18th Euromicro International Conference on Parallel, Distributed and Network-Based Computing (PDP 2010), 2010.
  • Samantha S. Foley, Wael R. Elwasif, David E. Bernholdt, Aniruddha G. Shet, and Randall Bramley, Extending the Concept of Component Interfaces: Experience with the Integrated Plasma Simulator, in Component-Based High-Performance Computing (CBHPC) 2009, 2009, (extended abstract).
  • D Batchelor, G Alba, E D’Azevedo, G Bateman, DE Bernholdt, L Berry, P Bonoli, R Bramley, J Breslau, M Chance, J Chen, M Choi, W Elwasif, S Foley, G Fu, R Harvey, E Jaeger, S Jardin, T Jenkins, D Keyes, S Klasky, S Kruger, L Ku, V Lynch, D McCune, J Ramos, D Schissel, D Schnack, and J Wright, Advances in Simulation of Wave Interactions with Extended MHD Phenomena, in Horst Simon, editor, SciDAC 2009, 14-18 June 2009, San Diego, California, USA, volume 180 of Journal of Physics: Conference Series, page 012054, Institute of Physics, 2009, 6pp.
  • Samantha S. Foley, Wael R. Elwasif, Aniruddha G. Shet, David E. Bernholdt, and Randall Bramley, Incorporating Concurrent Component Execution in Loosely Coupled Integrated Fusion Plasma Simulation, in Component-Based High-Performance Computing (CBHPC) 2008, 2008, (extended abstract).
  • D. Batchelor, C. Alba, G. Bateman, D. Bernholdt, L. Berry, P. Bonoli, R. Bramley, J. Breslau, M. Chance, J. Chen, M. Choi, W. Elwasif, G. Fu, R. Harvey, E. Jaeger, S. Jardin, T. Jenkins, D. Keyes, S. Klasky, S. Kruger, L. Ku, V. Lynch, D. McCune, J. Ramos, D. Schissel, D. Schnack, and J. Wright, Simulation of Wave Interactions with MHD, in Rick Stevens, editor, SciDAC 2008, 14-17 July 2008, Washington, USA, volume 125 of Journal of Physics: Conference Series, page 012039, Institute of Physics, 2008.
  • Wael R. Elwasif, David E. Bernholdt, Lee A. Berry, and Don B. Batchelor, Component Framework for Coupled Integrated Fusion Plasma Simulation, in HPC-GECO/CompFrame 2007, 21-22 October, Montreal, Quebec, Canada, 2007.
Authors: Wael R. Elwasif, Samantha Foley, Aniruddha G. Shet
Organization: Center for Simulation of RF Wave Interactions with Magnetohydrodynamics

Framework

class ipsframework.ips.Framework(config_file_list, log_file_name, platform_file_name=None, debug=False, verbose_debug=False, cmd_nodes=0, cmd_ppn=0)
critical(*args)

Produce critical message in simulation log file. Raise exception for bad formatting.

debug(*args)

Produce debugging message in simulation log file. Raise exception for bad formatting.

error(*args)

Produce error message in simulation log file. Raise exception for bad formatting.

exception(*args)

Produce exception message in simulation log file. Raise exception for bad formatting.

get_inq()
Return handle to the Framework’s input queue object (multiprocessing.Queue).
info(*args)

Produce informational message in simulation log file. Raise exception for bad formatting.

log(*args)

Wrapper for Framework.info().

register_service_handler(service_list, handler)

Register a call back method to handle a list of framework service invocations.

  • handler: a Python callable object that takes a messages.ServiceRequestMessage.
  • service_list: a list of service names to call handler when invoked by components.
    The service name must match the target_method parameter in messages.ServiceRequestMessage.
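
The registration pattern above can be sketched as a dictionary keyed by service name. This is only an illustration: `ServiceRequest` and `HandlerRegistry` are hypothetical stand-ins for messages.ServiceRequestMessage and the framework's internal table, and the real implementation may differ.

```python
from dataclasses import dataclass, field


@dataclass
class ServiceRequest:
    """Hypothetical stand-in for messages.ServiceRequestMessage."""
    target_method: str
    args: tuple = field(default_factory=tuple)


class HandlerRegistry:
    """Toy dispatch table: service name -> handler callable."""

    def __init__(self):
        self._handlers = {}

    def register_service_handler(self, service_list, handler):
        # Every name in service_list is routed to the same callable.
        for name in service_list:
            self._handlers[name] = handler

    def dispatch(self, msg):
        # The lookup key is the message's target_method.
        return self._handlers[msg.target_method](msg)


def echo_handler(msg):
    # Illustrative handler: report which service was invoked and with what.
    return (msg.target_method, msg.args)


registry = HandlerRegistry()
registry.register_service_handler(["stage_state", "update_state"], echo_handler)
result = registry.dispatch(ServiceRequest("update_state", ("ps.cdf",)))
```

Keying the table by service name is why the name must match target_method exactly: a mismatch leaves the request without a handler.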
run()

Run the communication outer loop of the framework.

This method implements the core communication and message dispatch functionality of the framework. The main phases of execution for the framework are:

  1. Invoke the init method on all framework-attached components, blocking pending method call termination.
  2. Generate method invocation messages for the remaining public methods in the framework-centric components (i.e. step and finalize).
  3. Generate a queue of method invocation messages for all public framework-accessible components in the simulations being run. Framework-accessible components are made up of the Init component (if it exists) and the Driver component. The generated messages invoke the public methods init, step, and finalize.
  4. Dispatch method invocations for each framework-centric component and physics simulation in order.

Exceptions that propagate to this method from the managed simulations cause the framework to abort any pending method invocations for the source simulation. Exceptions from a framework-centric component abort further invocations to that component.

When all method invocations have been dispatched (or aborted), Framework.terminate_sim() is called to trigger normal termination of all component processes.
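
The queue-and-abort behavior described for run() can be sketched roughly as follows. The function names, the `fake_invoke` callback, and the simulation and component names are all hypothetical; the real framework exchanges messages between processes rather than calling functions directly.

```python
from collections import deque


def build_invocations(sim_components):
    """Queue init, step, finalize for each framework-accessible component."""
    queue = deque()
    for comp in sim_components:
        for method in ("init", "step", "finalize"):
            queue.append((comp, method))
    return queue


def dispatch_all(sim_queues, invoke):
    """Dispatch each simulation's queue in order.

    An exception from a simulation aborts only that simulation's
    pending invocations; other simulations continue.
    """
    completed, aborted = [], []
    for sim_name, queue in sim_queues.items():
        while queue:
            comp, method = queue.popleft()
            try:
                invoke(sim_name, comp, method)
                completed.append((sim_name, comp, method))
            except Exception:
                aborted.append(sim_name)
                queue.clear()  # drop the remaining calls for this simulation
    return completed, aborted


def fake_invoke(sim_name, comp, method):
    # Hypothetical failure: the driver of sim_b fails in step.
    if (sim_name, comp, method) == ("sim_b", "driver", "step"):
        raise RuntimeError("step failed")


queues = {
    "sim_a": build_invocations(["init_comp", "driver"]),
    "sim_b": build_invocations(["driver"]),
}
completed, aborted = dispatch_all(queues, fake_invoke)
```

In this sketch sim_a runs all six invocations, while sim_b stops after its init call because step raised.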

send_terminate_msg(sim_name, status=0)

Invoke terminate(status) on components in a simulation

This method remotely invokes the method terminate() on all components in the IPS simulation sim_name.

terminate_all_sims(status=0)

Terminate all active component instances.

This method remotely invokes the method terminate() on all components in the IPS simulation.

warning(*args)

Produce warning message in simulation log file. Raise exception for bad formatting.

Data Manager

class ipsframework.dataManager.DataManager(fwk)

The data manager facilitates the movement and exchange of data files for the simulation.

merge_current_plasma_state(msg)

Merge partial plasma state file with global master. Newly updated plasma state copied to caller’s workdir. Exception raised on copy error.

msg.args:

  1. partial_state_file
  2. target_state_file
  3. log_file: stdout for merge process if not None
process_service_request(msg)

Invokes the appropriate public data manager method for the component specified in msg. Return method’s return value.

stage_state(msg)

Copy plasma state files from source dir to target dir. Return 0. Exception raised on copy error.

msg.args:

  1. state_files
  2. source_dir
  3. target_dir
update_state(msg)

Copy plasma state files from source dir to target dir. Return 0. Exception raised on copy error.

msg.args:

  1. state_files
  2. source_dir
  3. target_dir

Task Manager

class ipsframework.taskManager.TaskManager(fwk)

The task manager is responsible for facilitating component method invocations, and the launching of tasks.

build_launch_cmd(nproc, binary, cmd_args, working_dir, ppn, max_ppn, nodes, accurateNodes, partial_nodes, task_id, core_list='')

Construct task launch command to be executed by the component.

  • nproc - number of processes to use
  • binary - binary to launch
  • cmd_args - additional command line arguments for the binary
  • working_dir - full path to where the executable will be launched
  • ppn - processes per node value to use
  • max_ppn - maximum possible ppn for this allocation
  • nodes - comma separated list of node ids
  • accurateNodes - if True, launch on nodes in nodes, otherwise the parallel launcher determines the process placement
  • partial_nodes - if True and accurateNodes and task_launch_cmd == ‘mpirun’,
    a host file is created specifying the exact placement of processes on cores.
  • core_list - used for creating host file with process to core mappings
finish_task(finish_task_msg)

Cleanup after a task launched by a component terminates

finish_task_msg is expected to be of type messages.ServiceRequestMessage

Message args:

  1. task_id: task id of finished task
  2. task_data: return code of task
get_call_id()

Return a new call id

get_task_id()

Return a new task id

init_call(init_call_msg, manage_return=True)

Creates and sends a messages.MethodInvokeMessage from the calling component to the target component. If manage_return is True, a record is added to outstanding_calls. Return call id.

Message args:

  1. method_name
  2. + arguments to be passed on as method arguments.
init_task(init_task_msg)

Allocate resources needed for a new task and build the task launch command using the binary and arguments provided by the requesting component. Return launch command to component via messages.ServiceResponseMessage. Raise exception if task cannot be launched at this time (ipsExceptions.BadResourceRequestException, ipsExceptions.InsufficientResourcesException).

init_task_msg is expected to be of type messages.ServiceRequestMessage

Message args:

  1. nproc: number of processes the task needs
  2. binary: full path to the executable to launch
  3. working_dir: full path to directory where the task will be launched
  4. tppn: processes per node for this task. (0 indicates that the default ppn is used.)
  5. block: whether or not to wait until the task can be launched.
  6. wnodes: True for whole node allocation, False otherwise.
  7. wsocks: True for whole socket allocation, False otherwise.
  8. + cmd_args: any arguments for the executable
init_task_pool(init_task_msg)

Allocate resources needed for a new task and build the task launch command using the binary and arguments provided by the requesting component.

init_task_msg is expected to be of type messages.ServiceRequestMessage

Message args:

  1. task_dict: dictionary of task names and objects
initialize(data_mgr, resource_mgr, config_mgr)

Initialize references to other managers and key values from configuration manager.

printCurrTaskTable()

Pretty-print the task table.

process_service_request(msg)

Invokes the appropriate public task manager method for the component specified in msg. Return method’s return value.

return_call(response_msg)

Handle the response message generated by a component in response to a method invocation on that component.

response_msg is expected to be of type messages.MethodResultMessage

wait_call(wait_msg)

Determine if the call has finished. If finished, return any data or errors. If not finished raise the appropriate blocking or nonblocking exception and try again later.

wait_msg is expected to be of type messages.ServiceRequestMessage

Message args:

  1. call_id: call id for which to wait
  2. blocking: determines whether the wait is blocking or not

Resource Manager

class ipsframework.resourceManager.ResourceManager(fwk)

The resource manager is responsible for detecting the resources allocated to the framework, allocating resources to task requests, and maintaining the associated bookkeeping.

add_nodes(listOfNodes)

Add node entries to self.nodes. Typically used by initialize() to initialize self.nodes. May be used to add nodes to a dynamic allocation in the future.

listOfNodes is a list of tuples (node name, cores). self.nodes is a dictionary where the keys are the node names and the values are node_structure.Node structures.

Return total number of cores.

begin_RM_report()

Print header information for resource usage reporting file.

check_core_cap(nproc, ppn)

Determine if it is currently possible to allocate nproc processes with a ppn of ppn without further restrictions. Return True and list of nodes to use if successful. Return False and empty list if there are not enough available resources at this time, but it is possible to eventually satisfy the request. Exception raised if the request can never be fulfilled.

check_whole_node_cap(nproc, ppn)

Determine if it is currently possible to allocate nproc processes with a ppn of ppn and whole nodes. Return True and list of nodes to use if successful. Return False and empty list if there are not enough available resources at this time, but it is possible to eventually satisfy the request. Exception raised if the request can never be fulfilled.

check_whole_sock_cap(nproc, ppn)

Determine if it is currently possible to allocate nproc processes with a ppn of ppn and whole sockets. Return True and list of nodes to use if successful. Return False and empty list if there are not enough available resources at this time, but it is possible to eventually satisfy the request. Exception raised if the request can never be fulfilled.
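
All three check_*_cap methods share the same three-way outcome: succeed now, fail for now, or fail permanently. A minimal sketch of that contract for the unrestricted case is shown below. The node table layout (name mapped to available and total cores) and the `BadResourceRequest` class are assumptions made for illustration, not the resource manager's actual data structures.

```python
class BadResourceRequest(Exception):
    """Stand-in for the exception raised when a request can never be met."""


def check_core_cap(nodes, nproc, ppn):
    """nodes: dict of node name -> (available cores, total cores)."""
    # Number of nodes needed with ppn processes on each (ceiling division).
    needed = -(-nproc // ppn)
    # Nodes that could ever host ppn processes.
    capable = [n for n, (_, total) in nodes.items() if total >= ppn]
    if len(capable) < needed:
        raise BadResourceRequest(f"{nproc} processes at ppn={ppn} never fit")
    # Nodes that can host ppn processes right now.
    ready = [n for n in capable if nodes[n][0] >= ppn]
    if len(ready) >= needed:
        return True, ready[:needed]
    return False, []


nodes = {"n0": (4, 8), "n1": (8, 8)}
ok_now = check_core_cap(nodes, 8, 4)      # fits on both nodes immediately
try_later = check_core_cap(nodes, 16, 8)  # n0 is partly busy, so not yet
try:
    check_core_cap(nodes, 32, 8)          # needs four nodes, only two exist
    never_fits = False
except BadResourceRequest:
    never_fits = True
```

Separating "not now" (a False return) from "never" (an exception) lets the caller decide whether retrying is worthwhile.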

get_allocation(comp_id, nproc, task_id, whole_nodes, whole_socks, task_ppn=0)

Traverse available nodes to return:

If whole_nodes is True:

  • shared_nodes: False
  • nodes: list of node names
  • ppn: processes per node for launching the task
  • max_ppn: processes that can be launched
  • accurateNodes: True if nodes uses the actual names of the nodes, False otherwise.

If whole_nodes is False:

  • shared_nodes: True
  • nodes: list of node names
  • node_file_entries: list of (node, corelist) tuples, where corelist is a list of core names.
    Core names are integers from 0 to n-1 where n is the number of cores on a node.
  • ppn: processes per node for launching the task
  • max_ppn: processes that can be launched
  • accurateNodes: True if nodes uses the actual names of the nodes, False otherwise.

Arguments:

  • nproc: the number of requested processes (int)
  • comp_id: component identifier, must be unique with respect to the framework (string)
  • task_id: task identifier from TM (int)
  • method: name of method (string)
  • task_ppn: ppn for this task (optional) (int)
initialize(dataMngr, taskMngr, configMngr, cmd_nodes=0, cmd_ppn=0)

Initialize resource management structures, references to other managers (dataMngr, taskMngr, configMngr).

Resource information comes from the following in order of priority:

  • command line specification (cmd_nodes, cmd_ppn)
  • detection using parameters from platform config file
  • manual settings from platform config file

The second two sources are obtained through resourceHelper.getResourceList().
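
The priority order above amounts to a simple fallback chain. The sketch below assumes that a value of 0 for cmd_nodes or cmd_ppn means "not given on the command line", which is suggested by the defaults but not stated. The `detect` and `manual` callables are hypothetical placeholders for the two paths through resourceHelper.getResourceList().

```python
def resolve_resources(cmd_nodes, cmd_ppn, detect, manual):
    """Return (nodes, ppn, source) from the highest-priority source available."""
    if cmd_nodes > 0 and cmd_ppn > 0:
        return cmd_nodes, cmd_ppn, "command line"
    detected = detect()      # e.g. scheduler environment; None if unavailable
    if detected is not None:
        return detected[0], detected[1], "detection"
    nodes, ppn = manual()    # values listed in the platform config file
    return nodes, ppn, "platform file"


from_cmd = resolve_resources(2, 16, lambda: (4, 32), lambda: (1, 8))
from_detect = resolve_resources(0, 0, lambda: (4, 32), lambda: (1, 8))
from_manual = resolve_resources(0, 0, lambda: None, lambda: (1, 8))
```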

printRMState()

Print the node tree to stdout.

process_service_request(msg)
release_allocation(task_id, status)

Set resources allocated to task task_id to available. status is not used, but may be used to correlate resource failures to task failures and implement task relaunch strategies.

report_RM_status(notes='')

Print current RM status to the reporting_file (“resource_usage”). Entries consist of:

  • time in seconds since beginning of time (__init__ of RM)
  • # cores that are available
  • # cores that are allocated
  • % allocated cores
  • # processes launched by task
  • % cores used by processes
  • notes (a description of the event that changed the resource usage)
sendEvent(eventName, info)

Wrapper for constructing and publishing EM events.


class ipsframework.node_structure.Node(name, socks, cores, p)

Models a node in the allocation.

  • name: name of node, typically actual name from resource detection phase.
  • task_ids, owners: identifiers for the tasks and components that are currently using the node.
  • allocated, available: list of sockets that have cores allocated and available. A socket may appear in both lists if it is only partially allocated.
  • sockets: list of sockets belonging to this node
  • avail_cores: number of cores that are currently available.
  • total_cores: total number of cores that can be allocated on this node.
  • status: indicates if the node is ‘UP’ or ‘DOWN’. Currently not used; all nodes are considered functional.
allocate(whole_nodes, whole_sockets, tid, o, procs)

Mark procs number of cores as allocated subject to the values of whole_nodes and whole_sockets. Return the number of cores allocated and their corresponding slots, a list of strings of the form:

<socket name>:<core name>
print_sockets(fname='')

Pretty print of state of sockets.

release(tid, o)

Mark cores used by task tid and component o as available. Return the number of cores released.

class ipsframework.node_structure.Socket(name, cps, coreids=[])

Models a socket in a node.

  • name: identifier for the socket
  • task_ids, owners: identifiers for the tasks and components that are currently using the socket.
  • allocated, available: lists of cores that are allocated and available.
  • cores: list of Core objects belonging to this socket
  • avail_cores: number of cores that are currently available.
  • total_cores: total number of cores that can be allocated on this socket.
allocate(whole, tid, o, num_procs)

Mark num_procs cores as allocated subject to the value of whole. Return a list of strings of the form:

<socket name>:<core name>
print_cores(fname='')

Pretty print of state of cores.

release(tid)

Mark cores that are allocated to task tid as available. Return number of cores set to available.

class ipsframework.node_structure.Core(name)

Models a core of a socket.

  • name: name of core
  • is_available: boolean value indicating the availability of the core.
  • task_id, owner: identifiers of the task and component using the core.
allocate(tid, o)

Mark core as allocated.

release()

Mark core as available.
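
The socket and core bookkeeping can be illustrated with a much-reduced model. This is a sketch under stated assumptions, not the node_structure code: it omits the Node level, the whole flag, and the allocated/available lists, and keeps only enough state to show how slots of the form <socket name>:<core name> are produced and released.

```python
class Core:
    def __init__(self, name):
        self.name = name
        self.is_available = True
        self.task_id = None
        self.owner = None

    def allocate(self, tid, o):
        self.is_available = False
        self.task_id, self.owner = tid, o

    def release(self):
        self.is_available = True
        self.task_id = self.owner = None


class Socket:
    def __init__(self, name, cps):
        self.name = name
        # Core names are integers 0..cps-1.
        self.cores = [Core(i) for i in range(cps)]

    @property
    def avail_cores(self):
        return sum(c.is_available for c in self.cores)

    def allocate(self, tid, o, num_procs):
        # Claim up to num_procs free cores and report them as slot strings.
        slots = []
        for core in self.cores:
            if len(slots) == num_procs:
                break
            if core.is_available:
                core.allocate(tid, o)
                slots.append(f"{self.name}:{core.name}")
        return slots

    def release(self, tid):
        # Free every core held by task tid and report how many were freed.
        freed = [c for c in self.cores if c.task_id == tid]
        for core in freed:
            core.release()
        return len(freed)


sock = Socket(0, cps=4)
slots = sock.allocate(tid=7, o="driver", num_procs=3)
left = sock.avail_cores
released = sock.release(7)
```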


The Resource Helper file contains all of the code needed to figure out what host we are on and what resources we have. Taking this out of the resource manager will allow us to test it independently of the IPS.

ipsframework.resourceHelper.getResourceList(services, host, partial_nodes=False)

Using the host information, the resources are detected. Return list of (<node name>, <processes per node>), cores per node, sockets per node, processes per node, and True if the node names are accurate, False otherwise.

ipsframework.resourceHelper.get_checkjob_info()
ipsframework.resourceHelper.get_pbs_info()

Access info about allocation from PBS environment variables:

  • PBS_NNODES
  • PBS_NODEFILE
ipsframework.resourceHelper.get_qstat_jobinfo()

Use qstat -f $PBS_JOBID to get the number of nodes and ppn of the allocation. Typically works on PBS systems.

ipsframework.resourceHelper.get_qstat_jobinfo2()

A second way to use qstat -f $PBS_JOBID to get the number of nodes and ppn of the allocation. Typically works on PBS systems.

ipsframework.resourceHelper.get_slurm_info()

Access environment variables set by Slurm to get the node names, tasks per node and number of processes.

  • SLURM_NODELIST
  • SLURM_TASKS_PER_NODE or SLURM_JOB_TASKS_PER_NODE
  • SLURM_NPROC
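
As an illustration of the kind of parsing involved, the sketch below expands Slurm's compact tasks-per-node notation, where a value such as 2(x3),1 means two tasks on each of three nodes and one task on a fourth. The helper names are hypothetical, and trying SLURM_TASKS_PER_NODE before SLURM_JOB_TASKS_PER_NODE is an assumption. Node list expansion is not covered here.

```python
import re


def expand_tasks_per_node(spec):
    """Expand a string such as '2(x3),1' into one task count per node."""
    counts = []
    for item in spec.split(","):
        match = re.fullmatch(r"(\d+)(?:\(x(\d+)\))?", item.strip())
        if match is None:
            raise ValueError(f"unrecognized entry: {item!r}")
        tasks = int(match.group(1))
        repeat = int(match.group(2) or 1)
        counts.extend([tasks] * repeat)
    return counts


def read_slurm_tasks(environ):
    # Assumed lookup order: SLURM_TASKS_PER_NODE, then SLURM_JOB_TASKS_PER_NODE.
    spec = environ.get("SLURM_TASKS_PER_NODE") or environ.get("SLURM_JOB_TASKS_PER_NODE")
    return expand_tasks_per_node(spec) if spec else []


mixed = expand_tasks_per_node("2(x3),1")
from_job_var = read_slurm_tasks({"SLURM_JOB_TASKS_PER_NODE": "16(x2)"})
outside_slurm = read_slurm_tasks({})
```

Passing the environment as a plain mapping (for example os.environ) keeps the parser testable outside a Slurm allocation.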
ipsframework.resourceHelper.manual_detection(services)

Use values listed in platform configuration file.

Component

class ipsframework.component.Component(services, config)

Base class for all IPS components. Common set up, connection and invocation actions are implemented here.

checkpoint(timestamp=0.0, **keywords)

Produce some default debugging information before the rest of the code is executed.

finalize(timestamp=0.0, **keywords)

Produce some default debugging information before the rest of the code is executed.

init(timestamp=0.0, **keywords)

Produce some default debugging information before the rest of the code is executed.

restart(timestamp=0.0, **keywords)

Produce some default debugging information before the rest of the code is executed.

step(timestamp=0.0, **keywords)

Produce some default debugging information before the rest of the code is executed.

terminate(status)

Clean up services and call sys_exit.

Configuration Manager

class ipsframework.configurationManager.ConfigurationManager(fwk, config_file_list, platform_file_name)

The configuration manager is responsible for parsing the simulation and platform configuration files, creating the framework and simulation components, as well as providing an interface to accessing items from the configuration files (e.g., the time loop).

class SimulationData(sim_name)

Structure to hold simulation data stored into the sim_map entry in the configurationManager class

create_simulation(sim_name, config_file, override, sub_workflow=False)
get_all_simulation_components_map()
get_component_map()

Return a dictionary of simulation names and lists of component references. (May only be the driver, and init (if present)???)

get_config_parameter(sim_name, param)

Return value of param from simulation configuration file for sim_name.

get_framework_components()

Return list of framework components.

get_platform_parameter(param, silent=False)

Return value of platform parameter param. If silent is False (default) None is returned when param not found, otherwise an exception is raised.

get_port(sim_name, port_name)

Return a reference to the component from simulation sim_name implementing port port_name.

get_sim_names()

Return list of names of simulations.

get_sim_parameter(sim_name, param)

Return value of param from simulation configuration file for sim_name.

get_simulation_components(sim_name)
initialize(data_mgr, resource_mgr, task_mgr)

Parse the platform and simulation configuration files using the ConfigObj module. Create and initialize simulation(s) and their components, framework components and loggers.

process_service_request(msg)

Invokes public configuration manager method for a component. Return method’s return value.

set_config_parameter(sim_name, param, value, target_sim_name)

Set the configuration parameter param to value value in target_sim_name. If target_sim_name is the framework, all simulations will get the change. Return value.

terminate(status)

Terminates all processes attached to the framework. status not used.

terminate_sim(sim_name)

Services

class ipsframework.services.ServicesProxy(fwk, fwk_in_q, svc_response_q, sim_conf, log_pipe_name)
add_task(task_pool_name, task_name, nproc, working_dir, binary, *args, **keywords)

Add task task_name to task pool task_pool_name. Remaining arguments are the same as in ServicesProxy.launch_task().

call(component_id, method_name, *args, **keywords)

Invoke method method_name on component component_id with optional arguments *args. Return result from invoking the method.

call_nonblocking(component_id, method_name, *args, **keywords)

Invoke method method_name on component component_id with optional arguments *args. Return call_id.

checkpoint_components(comp_id_list, time_stamp, Force=False, Protect=False)

Selectively checkpoint components in comp_id_list based on the configuration section CHECKPOINT. If Force is True, the checkpoint will be taken even if the conditions for taking the checkpoint are not met. If Protect is True, then the data from the checkpoint is protected from clean up. Force and Protect are optional and default to False.

The CHECKPOINT_MODE option determines whether the components’ checkpoint methods are invoked.

Possible MODE options are:

ALL:
Checkpoint every time the call is made (equivalent to always setting Force = True).
WALLTIME_REGULAR:
checkpoints are saved upon invocation of the service call checkpoint_components(), when a time interval greater than, or equal to, the value of the configuration parameter WALLTIME_INTERVAL has passed since the last checkpoint. A checkpoint is assumed to have happened (but not actually stored) when the simulation starts. Calls to checkpoint_components() before WALLTIME_INTERVAL seconds have passed since the last successful checkpoint result in a NOOP.
WALLTIME_EXPLICIT:
checkpoints are saved when the simulation wall clock time exceeds one of the (ordered) list of time values (in seconds) specified in the variable WALLTIME_VALUES. Let [t_0, t_1, …, t_n] be the list of wall clock time values specified in the configuration parameter WALLTIME_VALUES. Then checkpoint(T) = True if T >= t_j, for some j in [0,n] and there is no other time T_1, with T > T_1 >= t_j such that checkpoint(T_1) = True. If the test fails, the call results in a NOOP.
PHYSTIME_REGULAR:
checkpoints are saved at regularly spaced “physics time” intervals, specified in the configuration parameter PHYSTIME_INTERVAL. Let PHYSTIME_INTERVAL = PTI, and the physics time stamp argument in the call to checkpoint_components() be pts_i, with i = 0, 1, 2, … Then checkpoint(pts_i) = True if pts_i >= n * PTI, for some n in 1, 2, 3, … and pts_i - pts_prev >= PTI, where checkpoint(pts_prev) = True and pts_prev = max(pts_0, pts_1, …, pts_i-1). If the test fails, the call results in a NOOP.
PHYSTIME_EXPLICIT:
checkpoints are saved when the physics time equals or exceeds one of the (ordered) list of physics time values (in seconds) specified in the variable PHYSTIME_VALUES. Let [pt_0, pt_1, …, pt_n] be the list of physics time values specified in the configuration parameter PHYSTIME_VALUES. Then checkpoint(pt) = True if pt >= pt_j, for some j in [0,n] and there is no other physics time pt_k, with pt > pt_k >= pt_j such that checkpoint(pt_k) = True. If the test fails, the call results in a NOOP.
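
The PHYSTIME_REGULAR and PHYSTIME_EXPLICIT rules can be read as small predicates over the current time stamp and the time of the last checkpoint. The sketch below is one interpretation of the rules as written, not the services code. It assumes that the first eligible call checkpoints when no earlier checkpoint exists, and all function names are hypothetical.

```python
def regular_checkpoint(pts, interval, prev_checkpoint=None):
    """PHYSTIME_REGULAR: at least one full interval since the last checkpoint."""
    if pts < interval:           # no n >= 1 with pts >= n * interval
        return False
    if prev_checkpoint is None:  # assumption: first eligible call checkpoints
        return True
    return pts - prev_checkpoint >= interval


def explicit_checkpoint(pt, values, last_checkpoint=None):
    """PHYSTIME_EXPLICIT: some listed value v satisfies last_checkpoint < v <= pt."""
    return any(
        v <= pt and (last_checkpoint is None or last_checkpoint < v)
        for v in values
    )


def simulate(times, decide):
    """Apply a decision rule to successive calls and collect checkpoint times."""
    taken, last = [], None
    for t in times:
        if decide(t, last):
            taken.append(t)
            last = t
    return taken


calls = [0.5, 1.0, 1.5, 2.2, 3.0]
regular = simulate(calls, lambda t, last: regular_checkpoint(t, 1.0, last))
explicit = simulate(calls, lambda t, last: explicit_checkpoint(t, [1.0, 1.2, 3.0], last))
```

With these inputs the regular rule skips the call at 3.0 because less than one interval has passed since 2.2, while the explicit rule fires once for each listed value that has been reached and not yet covered by an earlier checkpoint.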

The configuration parameter NUM_CHECKPOINT controls how many checkpoints to keep on disk. Checkpoints are deleted in a FIFO manner, based on their creation time. Possible values of NUM_CHECKPOINT are:

  • NUM_CHECKPOINT = n, with n > 0 –> Keep the most recent n checkpoints
  • NUM_CHECKPOINT = 0 –> No checkpoints are made/kept (except when Force = True)
  • NUM_CHECKPOINT < 0 –> Keep ALL checkpoints
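
The retention rule can be sketched as a pure function over a list of checkpoints ordered oldest first. The function name is hypothetical, and the special handling of forced checkpoints when NUM_CHECKPOINT = 0 is not modeled.

```python
def prune_checkpoints(checkpoints, num_checkpoint):
    """Return (kept, deleted) for checkpoints ordered oldest first."""
    ordered = list(checkpoints)
    if num_checkpoint < 0:
        return ordered, []  # keep everything
    # FIFO deletion: the oldest entries beyond the limit are removed.
    keep_from = max(len(ordered) - num_checkpoint, 0)
    return ordered[keep_from:], ordered[:keep_from]


keep_two = prune_checkpoints(["t1", "t2", "t3"], 2)
keep_all = prune_checkpoints(["t1", "t2", "t3"], -1)
under_limit = prune_checkpoints(["t1"], 3)
```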

Checkpoints are saved in the directory ${SIM_ROOT}/restart

cleanup()

Clean up any state from the services. Called by the terminate method in the base class for components.

create_simulation(config_file, override)
create_sub_workflow(sub_name, config_file, override=None, input_dir=None)
create_task_pool(task_pool_name)

Create an empty pool of tasks with the name task_pool_name. Raise exception if duplicate name.

critical(*args)

Produce critical message in simulation log file. Raise exception for bad formatting.

debug(*args)

Produce debugging message in simulation log file. Raise exception for bad formatting.

error(*args)

Produce error message in simulation log file. Raise exception for bad formatting.

exception(*args)

Produce exception message in simulation log file. Raise exception for bad formatting.

get_config_param(param, silent=False)

Return the value of the configuration parameter param. Raise exception if not found.

get_finished_tasks(task_pool_name)

Return dictionary of finished tasks and return values in task pool task_pool_name. Raise exception if no active or finished tasks.

get_port(port_name)

Return a reference to the component implementing port port_name.

get_restart_files(restart_root, timeStamp, file_list)

Copy files needed for component restart from the restart directory:

<restart_root>/restart/<timeStamp>/components/$CLASS_${SUB_CLASS}_$NAME_${SEQ_NUM}

to the component’s work directory.

Copying errors are not fatal (exception raised).

get_time_loop()

Return the list of times as specified in the configuration file.

get_working_dir()

Return the working directory of the calling component.

The structure of the working directory is defined using the configuration parameters CLASS, SUB_CLASS, and NAME of the component configuration section. The structure of the working directory is:

${SIM_ROOT}/work/$CLASS_${SUB_CLASS}_$NAME_<instance_num>
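
For illustration, the directory layout above can be composed as follows. The function name and the example values are hypothetical. In a component, the path comes from get_working_dir() rather than being built by hand.

```python
import posixpath


def component_work_dir(sim_root, comp_class, sub_class, name, instance_num):
    """Compose ${SIM_ROOT}/work/$CLASS_${SUB_CLASS}_$NAME_<instance_num>."""
    leaf = f"{comp_class}_{sub_class}_{name}_{instance_num}"
    return posixpath.join(sim_root, "work", leaf)


work_dir = component_work_dir("/scratch/run1", "DRIVERS", "HELLO", "HelloDriver", 1)
```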
info(*args)

Produce informational message in simulation log file. Raise exception for bad formatting.

kill_all_tasks()

Kill all tasks associated with this component.

kill_task(task_id)

Kill launched task task_id. Return if successful. Raises exceptions if the task or process cannot be found or killed successfully.

launch_task(nproc, working_dir, binary, *args, **keywords)

Launch binary in working_dir on nproc processes. *args are any arguments to be passed to the binary on the command line. **keywords are any keyword arguments used by the framework to manage how the binary is launched. Keywords may be the following:

  • task_ppn : the processes per node value for this task
  • block : specifies that this task will block (or raise an exception) if not enough resources are available to run immediately. If True, the task will be retried until it runs. If False, an exception is raised indicating that there are not enough resources, but it is possible to eventually run. (default = True)
  • tag : identifier for the portal. May be used to group related tasks.
  • logfile : file name for stdout (and stderr) to be redirected to for this task. By default stderr is redirected to stdout, and stdout is not redirected.
  • whole_nodes : if True, the task will be given exclusive access to any nodes it is assigned. If False, the task may be assigned nodes that other tasks are using or may use.
  • whole_sockets : if True, the task will be given exclusive access to any sockets of nodes it is assigned. If False, the task may be assigned sockets that other tasks are using or may use.

Return task_id if successful. May raise exceptions related to opening the logfile, being unable to obtain enough resources to launch the task (ipsExceptions.InsufficientResourcesException), bad task launch request (ipsExceptions.ResourceRequestMismatchException, ipsExceptions.BadResourceRequestException) or problems executing the command. These exceptions may be used to retry launching the task as appropriate.

Note

This is a nonblocking function; users must use a version of ServicesProxy.wait_task() to get the result.
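
One way to use the block=False behavior is a bounded retry loop around the launch call. The sketch below runs against a fake services object so that it is self-contained. `FakeServices`, `launch_with_retry`, the local exception class, and the paths are all hypothetical stand-ins rather than IPS code.

```python
class InsufficientResourcesException(Exception):
    """Stand-in for ipsExceptions.InsufficientResourcesException."""


def launch_with_retry(services, nproc, working_dir, binary, *args,
                      attempts=3, **keywords):
    """Request a launch with block=False a few times, then give up."""
    for _ in range(attempts):
        try:
            return services.launch_task(nproc, working_dir, binary, *args,
                                        block=False, **keywords)
        except InsufficientResourcesException:
            continue  # a real caller would wait for resources to free up
    raise InsufficientResourcesException(f"no resources after {attempts} attempts")


class FakeServices:
    """Hypothetical services object that succeeds on the third request."""

    def __init__(self):
        self.requests = 0

    def launch_task(self, nproc, working_dir, binary, *args, **keywords):
        self.requests += 1
        if self.requests < 3:
            raise InsufficientResourcesException()
        return 42  # task_id


services = FakeServices()
task_id = launch_with_retry(services, 4, "/tmp/work", "/bin/true", "-v", task_ppn=2)
```

The returned task_id would then be passed to one of the wait_task variants to collect the result.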

launch_task_pool(task_pool_name, launch_interval=0.0)

Construct messages to task manager to launch each task. Used by TaskPool to launch tasks in a task_pool.

launch_task_resilient(nproc, working_dir, binary, *args, **keywords)

not used

log(*args)

Wrapper for ServicesProxy.info().

merge_current_state(partial_state_file, logfile=None, merge_binary=None)

Merge partial plasma state with global state. Partial plasma state contains only the values that the component contributes to the simulation. Raise exceptions on bad merge. Optional logfile will capture stdout from merge. Optional merge_binary specifies path to executable code to do the merge (default value : “update_state”)

process_events()

Poll for events on subscribed topics.

publish(topicName, eventName, eventBody)

Publish event consisting of eventName and eventBody to topic topicName to the IPS event service.

remove_task_pool(task_pool_name)

Kill all running tasks, clean up all finished tasks, and delete task pool.

save_restart_files(timeStamp, file_list)

Copy files needed for component restart to the restart directory:

${SIM_ROOT}/restart/$timestamp/components/$CLASS_${SUB_CLASS}_$NAME

Copying errors are not fatal (exception raised).

send_portal_event(event_type='COMPONENT_EVENT', event_comment='')

Send event to web portal.

setMonitorURL(url='')

Send event to portal setting the URL where the monitor component will put data.

set_config_param(param, value, target_sim_name=None)

Set configuration parameter param to value. Raise exceptions if the parameter cannot be changed or if there are problems setting the value.

stage_input_files(input_file_list)

Copy component input files to the component working directory (as obtained via a call to ServicesProxy.get_working_dir()). Input files are assumed to be originally located in the directory variable INPUT_DIR in the component configuration section.

stage_output_files(timeStamp, file_list, keep_old_files=True, save_plasma_state=True)

Copy associated component output files (from the working directory) to the component simulation results directory. Output files are prefixed with the configuration parameter OUTPUT_PREFIX. The simulation results directory has the format:

${SIM_ROOT}/simulation_results/<timeStamp>/components/$CLASS_${SUB_CLASS}_$NAME_${SEQ_NUM}

Additionally, plasma state files are archived for debugging purposes:

${SIM_ROOT}/history/plasma_state/<file_name>_$CLASS_${SUB_CLASS}_$NAME_<timeStamp>

Copying errors are not fatal (exception raised).

stage_state(state_files=None)

Copy current state to work directory.

stage_subflow_output_files(subflow_name='ALL')
submit_tasks(task_pool_name, block=True, use_dask=False, dask_nodes=1, dask_ppn=None, launch_interval=0.0)

Launch all unfinished tasks in task pool task_pool_name. If block is True, return when all tasks have been launched. If block is False, return when all tasks that can be launched immediately have been launched. Return number of tasks submitted.

subscribe(topicName, callback)

Subscribe to topic topicName on the IPS event service and register callback as the method to be invoked when an event is published to that topic.

unsubscribe(topicName)

Remove subscription to topic topicName.
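
The subscribe/unsubscribe pattern can be sketched without a running framework. StandInEventService below is a hypothetical in-memory stand-in, not the IPS event service, and its deliver() method is invented for the demonstration. The callback signature (topicName, theEvent) is an assumption modelled on PortalBridge.process_event(), and the event payload is made up.

```python
class StandInEventService:
    """Hypothetical in-memory stand-in for the IPS event service."""

    def __init__(self):
        self._callbacks = {}

    def subscribe(self, topicName, callback):
        # Register one callback per topic, mirroring subscribe(topicName, callback).
        self._callbacks[topicName] = callback

    def unsubscribe(self, topicName):
        # Remove the subscription if it exists.
        self._callbacks.pop(topicName, None)

    def deliver(self, topicName, theEvent):
        # Invented for this sketch: hand an event to the registered callback.
        callback = self._callbacks.get(topicName)
        if callback is not None:
            callback(topicName, theEvent)


received = []


def on_event(topicName, theEvent):
    # Assumed callback signature: topic name first, then the event itself.
    received.append((topicName, theEvent))


service = StandInEventService()
service.subscribe("_IPS_MONITOR", on_event)
service.deliver("_IPS_MONITOR", {"note": "first"})
service.unsubscribe("_IPS_MONITOR")
service.deliver("_IPS_MONITOR", {"note": "second"})  # ignored: no subscriber left
```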

update_state(state_files=None)

Copy local (updated) state to global state. If no state files are specified, the component configuration specification is used. Raise exceptions upon copy errors.

update_time_stamp(new_time_stamp=-1)

Update time stamp on portal.

wait_call(call_id, block=True)

If block is True, return when the call has completed, with the return code from the call. If block is False, raise ipsExceptions.IncompleteCallException if the call has not completed, and return the return value if it has.

wait_call_list(call_id_list, block=True)

Check the status of each of the calls in call_id_list. If block is True, return when all calls are finished. If block is False, raise ipsExceptions.IncompleteCallException if any of the calls have not completed, otherwise return. The return value is a dictionary of call_ids and return values.
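
A non-blocking wait is typically wrapped in a polling loop. The sketch below shows that pattern against a stand-in object: both StandInServices and the local IncompleteCallException class are hypothetical substitutes so the example runs without the framework, and poll_call with its max_polls parameter is an illustrative helper rather than an IPS function.

```python
class IncompleteCallException(Exception):
    """Stand-in for ipsExceptions.IncompleteCallException."""


class StandInServices:
    """Hypothetical stand-in whose call completes after a few polls."""

    def __init__(self, polls_needed, result):
        self._remaining = polls_needed
        self._result = result

    def wait_call(self, call_id, block=True):
        if block:
            self._remaining = 0  # pretend we waited until completion
        if self._remaining > 0:
            self._remaining -= 1
            raise IncompleteCallException(call_id)
        return self._result


def poll_call(services, call_id, max_polls=10):
    """Poll a call without blocking; return (result, number_of_polls)."""
    for attempt in range(1, max_polls + 1):
        try:
            return services.wait_call(call_id, block=False), attempt
        except IncompleteCallException:
            pass  # the caller could do other useful work here
    # Give up on polling and fall back to a blocking wait.
    return services.wait_call(call_id, block=True), max_polls


result, polls = poll_call(StandInServices(polls_needed=2, result=0), call_id=7)
```

Bounding the number of polls and then falling back to block=True avoids spinning indefinitely on a call that is slow to finish.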

wait_task(task_id, timeout=-1, delay=1)

Check the status of task task_id. Return the return value of the task when finished successfully. Raise exceptions if the task is not found, or if there are problems finalizing the task.

wait_task_nonblocking(task_id)

Check the status of task task_id. If it has finished, the return value is populated with the actual value, otherwise None is returned. A KeyError exception may be raised if the task is not found.

wait_task_resilient(task_id)

not used

wait_tasklist(task_id_list, block=True)

Check the status of a list of tasks. If block is True, return a dictionary of return values when all tasks have completed. If block is False, return a dictionary containing entries for each completed task. Note that the dictionary may be empty. Raise KeyError exception if task_id not found.
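
Because a non-blocking wait_tasklist() may return a partial or empty dictionary, callers usually accumulate results over several polls. The following sketch shows one way to do that. StandInServices is a hypothetical substitute that completes one task per poll, and drain is an illustrative helper, not part of the framework. Whether the real call reports a finished task once or on every poll is not stated above, so the loop is written to tolerate either.

```python
class StandInServices:
    """Hypothetical stand-in: each non-blocking poll completes one more task."""

    def __init__(self, return_values):
        self._unfinished = dict(return_values)  # task_id -> return value

    def wait_tasklist(self, task_id_list, block=True):
        for task_id in task_id_list:
            if task_id not in self._unfinished:
                raise KeyError(task_id)
        # Blocking: everything finishes. Non-blocking: only the first task does.
        batch = list(task_id_list) if block else list(task_id_list)[:1]
        return {task_id: self._unfinished.pop(task_id) for task_id in batch}


def drain(services, task_ids):
    """Collect return values for all tasks using non-blocking polls."""
    pending = list(task_ids)
    results = {}
    while pending:
        finished = services.wait_tasklist(pending, block=False)
        results.update(finished)
        # Only keep asking about tasks that have not reported yet.
        pending = [task_id for task_id in pending if task_id not in results]
        # A real driver would do other work or sleep briefly here.
    return results


results = drain(StandInServices({1: 0, 2: 0, 3: 1}), [1, 2, 3])
```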

warning(*args)

Produce warning message in simulation log file. Raise exception for bad formatting.

class ipsframework.services.Task(task_name, nproc, working_dir, binary, *args, **keywords)

Container for task information:

  • name: task name
  • nproc: number of processes the task needs
  • working_dir: location to launch task from
  • binary: full path to executable to launch
  • *args: arguments for binary
  • **keywords: keyword arguments for launching the task. See ServicesProxy.launch_task() for details.
class ipsframework.services.TaskPool(name, services)

Class to contain and manage a pool of tasks.

add_task(task_name, nproc, working_dir, binary, *args, **keywords)

Create Task object and add to queued_tasks of the task pool. Raise exception if task name already exists in task pool.

dask = None
distributed = None
get_dask_finished_tasks_status()
get_finished_tasks_status()

Return a dictionary of exit status values for all tasks that have finished since the last time finished tasks were polled.

submit_dask_tasks(block=True, dask_nodes=1, dask_ppn=None)
submit_tasks(block=True, use_dask=False, dask_nodes=1, dask_ppn=None, launch_interval=0.0)

Launch tasks in queued_tasks. Finished tasks are handled before launching new ones. If block is True, the number of tasks submitted is returned after all tasks have been launched and completed. If block is False, the number of tasks that can immediately be launched is returned.

terminate_tasks()

Kill all active tasks, clear all queued, blocked and finished tasks.

Other Utilities

IPS Exceptions

exception ipsframework.ipsExceptions.AllocatedNodeDownException(identifier, tid, comp_id)

Exception is raised when an allocated node is discovered to be faulty. The task manager should catch the exception and do something with it.

exception ipsframework.ipsExceptions.BadResourceRequestException(caller_id, tid, request, deficit)

Exception raised by the resource manager when a component requests a quantity of resources that can never be satisfied during a get_allocation() call.

exception ipsframework.ipsExceptions.BlockedMessageException(msg, reason)

Exception raised by any manager when a blocking service invocation is made and the invocation result is not readily available.

exception ipsframework.ipsExceptions.IncompleteCallException(callID)

Exception raised by the taskManager when a nonblocking wait_call() method is invoked before the call has finished.

exception ipsframework.ipsExceptions.InsufficientResourcesException(caller_id, tid, request, deficit)

Exception raised by the resource manager when not enough resources are available to satisfy an allocate() call.

exception ipsframework.ipsExceptions.InvalidResourceSettingsException(t, spn, cpn)

Exception raised by the resource helper to indicate inconsistent resource settings.

exception ipsframework.ipsExceptions.NonexistentResourceException(identifier)

Exception raised whenever an attempt is made to use nonexistent resources (nodes).

exception ipsframework.ipsExceptions.ReleaseMismatchException(caller_id, tid, old_alc, old_avc, new_alc, new_avc)

Exception raised by the resource manager when a release allocation request accounting yields unexpected results.

exception ipsframework.ipsExceptions.ResourceRequestMismatchException(caller_id, tid, nproc, ppn, max_procs, max_ppn)

Exception raised by the resource manager when it is possible to launch the requested number of processes, but not with the requested number of processes per node.

IPS Utilities

ipsframework.ipsutil.copyFiles(src_dir, src_file_list, target_dir, prefix='', keep_old=False)

Copy files in src_file_list from src_dir to target_dir with an optional prefix. If keep_old is True, existing files in target_dir will not be overwritten; otherwise files can be clobbered (default). Wildcards in file name specifications are allowed.
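
The documented behaviour can be approximated with the standard library. The sketch below is an illustrative re-implementation, not the ipsframework code: the name copy_files_sketch is hypothetical, src_file_list is assumed to be a list of names or wildcard patterns, and the returned list of copied paths is added here only to make the effect visible.

```python
import glob
import os
import shutil
import tempfile


def copy_files_sketch(src_dir, src_file_list, target_dir, prefix="", keep_old=False):
    """Approximate the documented copy semantics (not the ipsframework code)."""
    os.makedirs(target_dir, exist_ok=True)
    copied = []
    for pattern in src_file_list:
        # Expand wildcards relative to the source directory.
        for src in sorted(glob.glob(os.path.join(src_dir, pattern))):
            dest = os.path.join(target_dir, prefix + os.path.basename(src))
            if keep_old and os.path.exists(dest):
                continue  # leave the existing file untouched
            shutil.copy(src, dest)
            copied.append(dest)
    return copied


with tempfile.TemporaryDirectory() as root:
    src = os.path.join(root, "work")
    dst = os.path.join(root, "results")
    os.makedirs(src)
    for fname in ("a.dat", "b.dat", "notes.txt"):
        with open(os.path.join(src, fname), "w") as handle:
            handle.write(fname)
    first_pass = copy_files_sketch(src, ["*.dat"], dst, prefix="run1_")
    copied_names = sorted(os.path.basename(path) for path in first_pass)
    # With keep_old=True the files copied above are not overwritten.
    second_pass = copy_files_sketch(src, ["*.dat"], dst, prefix="run1_", keep_old=True)
```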

ipsframework.ipsutil.getTimeString(timeArg=None)

Return a string representation of timeArg. timeArg is expected to be an appropriate object to be processed by time.strftime(). If timeArg is None, current time is used.
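
A minimal sketch of this behaviour, assuming a format string: the format used by ipsutil is not stated here, so the fmt default below is an assumption, as are the function name get_time_string_sketch and the choice of local time when timeArg is None.

```python
import time


def get_time_string_sketch(time_arg=None, fmt="%Y-%m-%d %H:%M:%S"):
    """Format a time tuple with time.strftime(); fmt is an assumed format."""
    if time_arg is None:
        # Assumption: "current time" means local time.
        time_arg = time.localtime()
    return time.strftime(fmt, time_arg)


epoch_utc = get_time_string_sketch(time.gmtime(0))
now_string = get_time_string_sketch()
```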

ipsframework.ipsutil.which(program, alt_paths=None)
class ipsframework.messages.ExitMessage(sender_id, receiver_id, status, *args)

Message used to communicate the exit status of a component.

  • sender_id: component id that is telling the component to die (framework)
  • receiver_id: component id that is to die
  • status: either Message.SUCCESS or Message.FAILURE indicating if the exit request is due to the simulation finishing successfully or in error.
  • *args: other information passed to the component to die.
counter = 0
delimiter = '|'
identifier = 'EXIT'
class ipsframework.messages.Message(sender_id, receiver_id)

Base class for all IPS messages. Should not be used in actual communication.

FAILURE = 1
SUCCESS = 0
counter = 0
delimiter = ''
get_message_id()
identifier = 'MESSAGE'
class ipsframework.messages.MethodInvokeMessage(sender_id, receiver_id, call_id, target_method, *args, **keywords)

Message used by components to invoke methods on other components.

  • sender_id: component id of the sender
  • receiver_id: component id of the receiver
  • call_id: identifier of the call (generated by caller)
  • target_method: method to be invoked on the receiver
  • *args: arguments to be passed to the target_method
counter = 0
delimiter = '|'
identifier = 'INVOKE'
class ipsframework.messages.MethodResultMessage(sender_id, receiver_id, call_id, status, *args)

Message used to relay the return value after a method invocation.

  • sender_id: component id of the sender (callee)
  • receiver_id: component id of the receiver (caller)
  • call_id: identifier of the call (generated by caller)
  • status: either Message.SUCCESS or Message.FAILURE indicating the success or failure of the invocation.
  • *args: other information to be passed back to the caller.
counter = 0
delimiter = '|'
identifier = 'RESULT'
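
The relationship between an invocation and its result can be sketched with plain records. The Invoke and Result tuples below are hypothetical stand-ins that carry only the documented fields; they are not the ipsframework message classes, and the answer helper and its handlers dictionary are invented for illustration. The constants mirror Message.SUCCESS and Message.FAILURE.

```python
from collections import namedtuple

SUCCESS = 0  # mirrors Message.SUCCESS
FAILURE = 1  # mirrors Message.FAILURE

# Stand-in records carrying only the documented fields.
Invoke = namedtuple("Invoke", "sender_id receiver_id call_id target_method args")
Result = namedtuple("Result", "sender_id receiver_id call_id status args")


def answer(invoke, handlers):
    """Build the result record for an invoke record (callee side)."""
    try:
        value = handlers[invoke.target_method](*invoke.args)
        status, payload = SUCCESS, (value,)
    except Exception as exc:
        status, payload = FAILURE, (str(exc),)
    # Sender and receiver swap roles; call_id is echoed back unchanged
    # so the caller can match the result to its request.
    return Result(invoke.receiver_id, invoke.sender_id, invoke.call_id, status, payload)


request = Invoke("driver", "worker", 1, "step", (0.5,))
reply = answer(request, {"step": lambda t: t * 2})
failed = answer(Invoke("driver", "worker", 2, "missing", ()), {})
```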
class ipsframework.messages.ServiceRequestMessage(sender_id, receiver_id, target_comp_id, target_method, *args, **keywords)

Message used by components to request the result of a service action by one of the IPS managers.

  • sender_id: component id of the sender
  • receiver_id: component id of the receiver (framework)
  • target_comp_id: component id of target component (typically framework)
  • target_method: name of method to be invoked on component target_comp_id
  • *args: any number of arguments. These are specific to the target method.
counter = 0
delimiter = '|'
identifier = 'REQUEST'
class ipsframework.messages.ServiceResponseMessage(sender_id, receiver_id, request_msg_id, status, *args)

Message used by managers to respond with the result of the service action to the calling component.

  • sender_id: component id of the sender (framework)
  • receiver_id: component id of the receiver (calling component)
  • request_msg_id: id of request message this is a response to.
  • status: either Message.SUCCESS or Message.FAILURE
  • *args: any number of arguments. These are specific to type of response.
counter = 0
delimiter = '|'
identifier = 'RESPONSE'

Framework Components

class ipsframework.portalBridge.PortalBridge(services, config)

Framework component to communicate with the SWIM web portal.

class SimulationData

Container for simulation data.

finalize(timestamp=0.0, **keywords)

Produce some default debugging information before the rest of the code is executed.

get_elapsed_time()

Return total elapsed time since simulation started in seconds (including a possible fraction).

init(timestamp=0.0, **keywords)

Try to connect to the portal, subscribe to _IPS_MONITOR events and register callback process_event().

init_simulation(sim_name, sim_root)

Create and send information about simulation sim_name living in sim_root so the portal can set up corresponding structures to manage data from the sim.

process_event(topicName, theEvent)

Process a single event theEvent on topic topicName.

send_event(sim_data, event_data)

Send contents of event_data and sim_data to portal.

send_mpo_data(event_data, sim_data)
step(timestamp=0.0, **keywords)

Poll for events.

ipsframework.portalBridge.configure_mpo()
ipsframework.portalBridge.hash_file(file_name)

Return the MD5 hash of a file.

  • file_name: full path to file
  • Returns: MD5 of file_name (str)
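
Computing an MD5 digest of a file is commonly done in chunks so that large files need not be read into memory at once. The sketch below shows that approach with hashlib. The name hash_file_sketch and the chunk_size parameter are hypothetical, and returning the hexadecimal digest is an assumption about the string form; this is not necessarily how portalBridge implements it.

```python
import hashlib
import os
import tempfile


def hash_file_sketch(file_name, chunk_size=65536):
    """Return the MD5 hex digest of a file, reading it in chunks."""
    md5 = hashlib.md5()
    with open(file_name, "rb") as handle:
        # iter() with a sentinel stops when read() returns an empty bytes object.
        for chunk in iter(lambda: handle.read(chunk_size), b""):
            md5.update(chunk)
    return md5.hexdigest()


# Demonstrate on a small temporary file.
with tempfile.NamedTemporaryFile(delete=False) as tmp:
    tmp.write(b"hello")
    tmp_path = tmp.name
try:
    digest = hash_file_sketch(tmp_path)
finally:
    os.remove(tmp_path)
```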


ipsframework.runspaceInitComponent.catch_and_go(func_to_decorate)
class ipsframework.runspaceInitComponent.runspaceInitComponent(services, config)

Framework component to manage runspace initialization, container file management, and file staging for simulation and analysis runs.

init(**original_kwargs)
step(**original_kwargs)