Code Listings
IPS
The Integrated Plasma Simulator (IPS) Framework. This framework enables loose, file-based coupling of a certain class of nuclear fusion simulation codes.
For further design information, see:
Wael Elwasif, David E. Bernholdt, Aniruddha G. Shet, Samantha S. Foley, Randall Bramley, Donald B. Batchelor, and Lee A. Berry, The Design and Implementation of the SWIM Integrated Plasma Simulator, in The 18th Euromicro International Conference on Parallel, Distributed and Network-Based Computing (PDP 2010), 2010.
Samantha S. Foley, Wael R. Elwasif, David E. Bernholdt, Aniruddha G. Shet, and Randall Bramley, Extending the Concept of Component Interfaces: Experience with the Integrated Plasma Simulator, in Component-Based High-Performance Computing (CBHPC) 2009, 2009, (extended abstract).
D. Batchelor, G. Alba, E. D’Azevedo, G. Bateman, D. E. Bernholdt, L. Berry, P. Bonoli, R. Bramley, J. Breslau, M. Chance, J. Chen, M. Choi, W. Elwasif, S. Foley, G. Fu, R. Harvey, E. Jaeger, S. Jardin, T. Jenkins, D. Keyes, S. Klasky, S. Kruger, L. Ku, V. Lynch, D. McCune, J. Ramos, D. Schissel, D. Schnack, and J. Wright, Advances in Simulation of Wave Interactions with Extended MHD Phenomena, in Horst Simon, editor, SciDAC 2009, 14-18 June 2009, San Diego, California, USA, volume 180 of Journal of Physics: Conference Series, page 012054, Institute of Physics, 2009, 6pp.
Samantha S. Foley, Wael R. Elwasif, Aniruddha G. Shet, David E. Bernholdt, and Randall Bramley, Incorporating Concurrent Component Execution in Loosely Coupled Integrated Fusion Plasma Simulation, in Component-Based High-Performance Computing (CBHPC) 2008, 2008, (extended abstract).
D. Batchelor, C. Alba, G. Bateman, D. Bernholdt, L. Berry, P. Bonoli, R. Bramley, J. Breslau, M. Chance, J. Chen, M. Choi, W. Elwasif, G. Fu, R. Harvey, E. Jaeger, S. Jardin, T. Jenkins, D. Keyes, S. Klasky, S. Kruger, L. Ku, V. Lynch, D. McCune, J. Ramos, D. Schissel, D. Schnack, and J. Wright, Simulation of Wave Interactions with MHD, in Rick Stevens, editor, SciDAC 2008, 14-17 July 2008, Washington, USA, volume 125 of Journal of Physics: Conference Series, page 012039, Institute of Physics, 2008.
Wael R. Elwasif, David E. Bernholdt, Lee A. Berry, and Don B. Batchelor, Component Framework for Coupled Integrated Fusion Plasma Simulation, in HPC-GECO/CompFrame 2007, 21-22 October, Montreal, Quebec, Canada, 2007.
- Authors
Wael R. Elwasif, Samantha Foley, Aniruddha G. Shet
- Organization
Center for Simulation of RF Wave Interactions with Magnetohydrodynamics
Framework
- class ipsframework.ips.Framework(config_file_list, log_file_name, platform_file_name=None, debug=False, verbose_debug=False, cmd_nodes=0, cmd_ppn=0)
Create an IPS Framework instance to coordinate the execution of IPS simulations.
The Framework performs the following main tasks:
Initialize the different IPS managers that perform the bulk of the framework functionality.
Manage communication queues, and route service requests from simulation components to the appropriate managers.
Provide logging services to IPS managers.
Perform the shutdown procedure on exit.
- Parameters
config_file_list (list) –
A list of simulation configuration files to be used in the simulation. Each simulation configuration file must have the following parameters:
SIM_ROOT The root directory for the simulation
SIM_NAME A name that identifies the simulation
LOG_FILE The name of a log file that is used to capture logging and error information for this simulation.
SIM_ROOT, SIM_NAME, and LOG_FILE must be unique across simulations.
log_file_name (str) – A file name where Framework logging messages are placed.
platform_file_name (str) – The name of the platform configuration file used in the simulation. If not specified, the framework tries to find the one installed in the share directory.
debug (bool) – A flag indicating whether framework debugging messages are enabled (default = False)
verbose_debug (bool) – A flag enabling more verbose framework debugging (default = False)
cmd_nodes (int) – Number of compute nodes, as specified on the command line (default = 0)
cmd_ppn (int) – Number of processes per node, as specified on the command line (default = 0)
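The uniqueness requirement on SIM_ROOT, SIM_NAME, and LOG_FILE can be checked before a run is launched. The following is a minimal sketch, assuming each simulation configuration file has already been parsed into a plain dict; check_sim_configs is a hypothetical helper, not part of the IPS API.

```python
from collections import defaultdict

# Parameters that every simulation configuration must define and that
# must be unique across simulations (per the Framework parameters above).
REQUIRED_UNIQUE = ("SIM_ROOT", "SIM_NAME", "LOG_FILE")


def check_sim_configs(configs):
    """Validate a list of parsed simulation configurations (plain dicts).

    Hypothetical helper: raises ValueError if a required parameter is
    missing or if two simulations share the same value for it.
    """
    for param in REQUIRED_UNIQUE:
        seen = defaultdict(list)
        for index, conf in enumerate(configs):
            if param not in conf:
                raise ValueError(f"configuration {index} is missing {param}")
            seen[conf[param]].append(index)
        for value, owners in seen.items():
            if len(owners) > 1:
                raise ValueError(f"{param}={value!r} is shared by configurations {owners}")


sims = [
    {"SIM_ROOT": "/scratch/run_a", "SIM_NAME": "run_a", "LOG_FILE": "run_a.log"},
    {"SIM_ROOT": "/scratch/run_b", "SIM_NAME": "run_b", "LOG_FILE": "run_b.log"},
]
check_sim_configs(sims)  # passes silently
```

The paths and names in sims are made up for illustration.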
- critical(msg, *args)
Produce critical message in simulation log file. See logging.critical() for usage.
- debug(msg, *args)
Produce debugging message in simulation log file. See logging.debug() for usage.
- error(msg, *args)
Produce error message in simulation log file. See logging.error() for usage.
- exception(msg, *args)
Produce exception message in simulation log file. See logging.exception() for usage.
- get_inq()
- Returns
handle to the Framework’s input queue object
- Return type
- info(msg, *args)
Produce informational message in simulation log file. See logging.info() for usage.
- initiate_new_simulation(sim_name)
This is to be called by the configuration manager as part of dynamically creating a new simulation. The purpose here is to initiate the method invocations for the framework-visible components in the new simulation.
- log(msg, *args)
Wrapper for Framework.info().
- register_service_handler(service_list, handler)
Register a callback method to handle a list of framework service invocations.
- Parameters
service_list – a list of service names for which handler is called when invoked by components. The service name must match the target_method parameter in messages.ServiceRequestMessage.
handler – a Python callable object that takes a messages.ServiceRequestMessage.
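Handler registration amounts to a dispatch table keyed by service name. The sketch below shows that idea under simplifying assumptions: ServiceRequest is a hypothetical stand-in for messages.ServiceRequestMessage that carries only target_method and args, and ServiceRouter is illustrative, not the Framework's implementation.

```python
from dataclasses import dataclass, field


@dataclass
class ServiceRequest:
    """Stand-in for messages.ServiceRequestMessage (hypothetical, simplified)."""
    target_method: str
    args: tuple = field(default_factory=tuple)


class ServiceRouter:
    """Maps service names to handler callables, in the spirit of
    Framework.register_service_handler (illustrative only)."""

    def __init__(self):
        self._handlers = {}

    def register_service_handler(self, service_list, handler):
        for name in service_list:
            self._handlers[name] = handler

    def dispatch(self, msg):
        # Route on the message's target_method, as the documentation describes.
        try:
            handler = self._handlers[msg.target_method]
        except KeyError:
            raise LookupError(f"no handler registered for {msg.target_method!r}") from None
        return handler(msg)


router = ServiceRouter()
router.register_service_handler(["stage_state", "update_state"], lambda m: 0)
router.register_service_handler(["get_sim_names"], lambda m: ["run_a"])
print(router.dispatch(ServiceRequest("stage_state")))  # 0
```

One handler can serve several service names, which mirrors how a single manager exposes many public methods through one entry point.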
- run()
Run the communication outer loop of the framework.
This method implements the core communication and message dispatch functionality of the framework. The main phases of execution for the framework are:
Invoke the init method on all framework-attached components, blocking pending method call termination.
Generate method invocation messages for the remaining public methods in the framework-centric components (i.e. step and finalize).
Generate a queue of method invocation messages for all public framework-accessible components in the simulations being run. Framework-accessible components are made up of the Init component (if it exists) and the Driver component. The generated messages invoke the public methods init, step, and finalize.
Dispatch method invocations for each framework-centric component and physics simulation in order.
Exceptions that propagate to this method from the managed simulations cause the framework to abort any pending method invocations for the source simulation. Exceptions from framework-centric components abort further invocations to that component.
When all method invocations have been dispatched (or aborted), configurationManager.ConfigurationManager.terminate_sim() is called to trigger normal termination of all component processes.
- Returns
Success status
- Return type
- send_terminate_msg(sim_name, status=0)
This method remotely invokes the method component.Component.terminate() on all components in the IPS simulation sim_name.
- Parameters
sim_name (str) – The simulation name from which all the components are terminated
status (messages.Message.SUCCESS, messages.Message.FAILURE) – message status, defaults to messages.Message.SUCCESS
- terminate_all_sims(status=0)
Terminate all active component instances.
This method remotely invokes the method component.Component.terminate() on all components in all IPS simulations.
- Parameters
status (messages.Message.SUCCESS, messages.Message.FAILURE) – message status, defaults to messages.Message.SUCCESS
- warning(msg, *args)
Produce warning message in simulation log file. See logging.warning() for usage.
Data Manager
- class ipsframework.dataManager.DataManager(fwk)
The data manager facilitates the movement and exchange of data files for the simulation.
- merge_current_plasma_state(msg)
Merge partial plasma state file with global master. Newly updated plasma state copied to caller’s workdir. Exception raised on copy error.
msg.args:
partial_state_file
target_state_file
log_file: stdout for merge process if not None
- process_service_request(msg)
Invokes the appropriate public data manager method for the component specified in msg. Return method’s return value.
- stage_state(msg)
Copy plasma state files from source dir to target dir. Return 0. Exception raised on copy error.
msg.args:
state_files
source_dir
target_dir
- update_state(msg)
Copy plasma state files from source dir to target dir. Return 0. Exception raised on copy error.
msg.args:
state_files
source_dir
target_dir
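The contract shared by stage_state and update_state (copy the named files, return 0, raise on a copy error) can be sketched with the standard library alone. stage_files is a hypothetical function that takes the three msg.args values directly rather than a message object; the file name plasma_state.nc is invented for the example.

```python
import os
import shutil
import tempfile


def stage_files(state_files, source_dir, target_dir):
    """Copy each file in state_files from source_dir to target_dir.

    Minimal sketch of the stage_state/update_state contract: return 0 on
    success and let any copy error propagate as an exception.
    """
    for name in state_files:
        src = os.path.join(source_dir, name)
        # shutil.copy raises OSError (e.g. FileNotFoundError) on failure.
        shutil.copy(src, target_dir)
    return 0


with tempfile.TemporaryDirectory() as src, tempfile.TemporaryDirectory() as dst:
    with open(os.path.join(src, "plasma_state.nc"), "w") as fh:
        fh.write("dummy")
    print(stage_files(["plasma_state.nc"], src, dst))  # 0
    print(os.listdir(dst))  # ['plasma_state.nc']
```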
Task Manager
- class ipsframework.taskManager.TaskInit(nproc, binary, working_dir, tppn, tcpp, tgpp, block, omp, wnodes, wsocks, cmd_args, launch_cmd_extra_args)
- binary
Alias for field number 1
- block
Alias for field number 6
- cmd_args
Alias for field number 10
- launch_cmd_extra_args
Alias for field number 11
- nproc
Alias for field number 0
- omp
Alias for field number 7
- tcpp
Alias for field number 4
- tgpp
Alias for field number 5
- tppn
Alias for field number 3
- wnodes
Alias for field number 8
- working_dir
Alias for field number 2
- wsocks
Alias for field number 9
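The "Alias for field number N" entries are the docstrings Python generates for collections.namedtuple fields. The sketch below rebuilds a look-alike with the same field order to show that attribute and index access agree; the example values are made up, and this is not the IPS source itself.

```python
from collections import namedtuple

# Field order reproduces the "Alias for field number N" entries above.
TaskInit = namedtuple(
    "TaskInit",
    ["nproc", "binary", "working_dir", "tppn", "tcpp", "tgpp",
     "block", "omp", "wnodes", "wsocks", "cmd_args", "launch_cmd_extra_args"],
)

task = TaskInit(4, "/usr/bin/hostname", "/tmp/work", 0, 0, 0,
                True, False, False, False, ("-s",), None)
print(task.working_dir, task[2])  # /tmp/work /tmp/work
```

The same reading applies to the Allocation and RunningTask records documented below.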
- class ipsframework.taskManager.TaskManager(fwk)
The task manager is responsible for facilitating component method invocations and the launching of tasks.
- build_launch_cmd(nproc, binary, cmd_args, working_dir, ppn, max_ppn, nodes, accurateNodes, partial_nodes, task_id, cpp=0, omp=False, gpp=0, core_list='', launch_cmd_extra_args=None)
Construct task launch command to be executed by the component.
nproc - number of processes to use
binary - binary to launch
cmd_args - additional command line arguments for the binary
working_dir - full path to where the executable will be launched
ppn - processes per node value to use
max_ppn - maximum possible ppn for this allocation
nodes - comma separated list of node ids
accurateNodes - if True, launch on nodes in nodes; otherwise the parallel launcher determines the process placement
partial_nodes - if True and accurateNodes and task_launch_cmd == ‘mpirun’, a host file is created specifying the exact placement of processes on cores.
core_list - used for creating host file with process to core mappings
- finish_task(finish_task_msg)
Clean up after a task launched by a component terminates.
finish_task_msg is expected to be of type messages.ServiceRequestMessage
Message args:
task_id: task id of finished task
task_data: return code of task
- get_call_id()
Return a new call id.
- get_task_id()
Return a new task id.
- init_call(init_call_msg, manage_return=True)
Creates and sends a messages.MethodInvokeMessage from the calling component to the target component. If manage_return is True, a record is added to outstanding_calls. Return call id.
Message args:
method_name
+ arguments to be passed on as method arguments.
- init_task(init_task_msg)
Allocate resources needed for a new task and build the task launch command using the binary and arguments provided by the requesting component. Return launch command to component via messages.ServiceResponseMessage. Raise exception if task cannot be launched at this time (ipsExceptions.BadResourceRequestException, ipsExceptions.InsufficientResourcesException).
init_task_msg is expected to be of type messages.ServiceRequestMessage
Message args:
nproc: number of processes the task needs
binary: full path to the executable to launch
working_dir: full path to directory where the task will be launched
tppn: processes per node for this task. (0 indicates that the default ppn is used.)
block: whether or not to wait until the task can be launched.
wnodes: True for whole node allocation, False otherwise.
wsocks: True for whole socket allocation, False otherwise.
+ cmd_args: any arguments for the executable
- init_task_pool(init_task_msg)
Allocate resources needed for a new task and build the task launch command using the binary and arguments provided by the requesting component.
init_task_msg is expected to be of type messages.ServiceRequestMessage
Message args:
task_dict: dictionary of task names and objects
- initialize(data_mgr, resource_mgr, config_mgr)
Initialize references to other managers and key values from the configuration manager.
- printCurrTaskTable()
Pretty-print the task table.
- process_service_request(msg)
Invokes the appropriate public task manager method for the component specified in msg. Return method’s return value.
- return_call(response_msg)
Handle the response message generated by a component in response to a method invocation on that component.
response_msg is expected to be of type messages.MethodResultMessage
- wait_call(wait_msg)
Determine if the call has finished. If finished, return any data or errors. If not finished, raise the appropriate blocking or nonblocking exception and try again later.
wait_msg is expected to be of type messages.ServiceRequestMessage
Message args:
call_id: call id for which to wait
blocking: determines whether the wait is blocking or not
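The interplay of get_call_id, init_call, return_call, and wait_call is a small piece of bookkeeping around outstanding_calls. The sketch below models it under simplifying assumptions: CallTracker and CallNotFinished are hypothetical names, and "not finished" is signalled by raising immediately rather than by the blocking/nonblocking retry logic the TaskManager uses.

```python
import itertools


class CallNotFinished(Exception):
    """Raised when a result is requested for a call that is still pending."""


class CallTracker:
    """Hypothetical sketch of call-id bookkeeping (get_call_id, init_call,
    return_call, wait_call); not the TaskManager implementation."""

    def __init__(self):
        self._ids = itertools.count(1)
        self.outstanding_calls = {}  # call_id -> result, or None while pending
        self._finished = set()

    def init_call(self, manage_return=True):
        call_id = next(self._ids)
        if manage_return:
            self.outstanding_calls[call_id] = None
        return call_id

    def return_call(self, call_id, result):
        self.outstanding_calls[call_id] = result
        self._finished.add(call_id)

    def wait_call(self, call_id):
        # A real framework would block or retry later; here we only signal.
        if call_id not in self._finished:
            raise CallNotFinished(call_id)
        self._finished.discard(call_id)
        return self.outstanding_calls.pop(call_id)


tracker = CallTracker()
cid = tracker.init_call()
try:
    tracker.wait_call(cid)
except CallNotFinished:
    print("call", cid, "still running")
tracker.return_call(cid, {"status": "ok"})
print(tracker.wait_call(cid))  # {'status': 'ok'}
```

A separate finished set is kept so that a method that legitimately returns None is not mistaken for a pending call.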
Resource Manager
- class ipsframework.resourceManager.Allocation(partial_node, nodelist, corelist, ppn, max_ppn, cpp, accurateNodes, cores_allocated)
- accurateNodes
Alias for field number 6
- corelist
Alias for field number 2
- cores_allocated
Alias for field number 7
- cpp
Alias for field number 5
- max_ppn
Alias for field number 4
- nodelist
Alias for field number 1
- partial_node
Alias for field number 0
- ppn
Alias for field number 3
- class ipsframework.resourceManager.ResourceManager(fwk)
The resource manager is responsible for detecting the resources allocated to the framework, allocating resources to task requests, and maintaining the associated bookkeeping.
- add_nodes(listOfNodes)
Add node entries to self.nodes. Typically used by initialize() to initialize self.nodes. May be used to add nodes to a dynamic allocation in the future.
listOfNodes is a list of tuples (node name, cores). self.nodes is a dictionary where the keys are the node names and the values are node_structure.Node structures.
Return total number of cores.
- begin_RM_report()
Print header information for the resource usage reporting file.
- check_core_cap(nproc, ppn)
Determine if it is currently possible to allocate nproc processes with a ppn of ppn without further restrictions. Return True and list of nodes to use if successful. Return False and empty list if there are not enough available resources at this time, but it is possible to eventually satisfy the request. Exception raised if the request can never be fulfilled.
- check_gpus(ppn, task_gpp)
- check_whole_node_cap(nproc, ppn)
Determine if it is currently possible to allocate nproc processes with a ppn of ppn and whole nodes. Return True and list of nodes to use if successful. Return False and empty list if there are not enough available resources at this time, but it is possible to eventually satisfy the request. Exception raised if the request can never be fulfilled.
- check_whole_sock_cap(nproc, ppn)
Determine if it is currently possible to allocate nproc processes with a ppn of ppn and whole sockets. Return True and list of nodes to use if successful. Return False and empty list if there are not enough available resources at this time, but it is possible to eventually satisfy the request. Exception raised if the request can never be fulfilled.
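All three check_*_cap methods share a three-way outcome: fits now, may fit later, or can never fit. The sketch below illustrates that contract for the whole-node case under stated assumptions: nodes is a hypothetical dict of node name to (available_cores, total_cores), a node counts as free only if it is completely idle, and RuntimeError stands in for the IPS exception classes.

```python
def check_whole_node_cap(nproc, ppn, nodes):
    """Sketch of the three-outcome contract shared by the check_*_cap methods.

    nodes maps node name -> (available_cores, total_cores). This simplified
    version only considers whole, completely idle nodes.
    Returns (True, node_list) if the request fits now, (False, []) if it may
    fit later, and raises if it can never fit in this allocation.
    """
    nodes_needed = -(-nproc // ppn)  # ceiling division
    usable = [name for name, (_, total) in nodes.items() if total >= ppn]
    if len(usable) < nodes_needed:
        # IPS uses its own exception classes; RuntimeError stands in here.
        raise RuntimeError(f"request for {nproc} procs at ppn={ppn} can never be satisfied")
    idle = [name for name in usable if nodes[name][0] == nodes[name][1]]
    if len(idle) >= nodes_needed:
        return True, idle[:nodes_needed]
    return False, []


nodes = {"n0": (8, 8), "n1": (2, 8), "n2": (8, 8)}
print(check_whole_node_cap(16, 8, nodes))  # (True, ['n0', 'n2'])
print(check_whole_node_cap(24, 8, nodes))  # (False, [])
```

The "never" case is judged against total capacity, while the "not now" case is judged against current availability; that distinction is what lets a blocking launch wait instead of failing.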
- get_allocation(comp_id, nproc, task_id, whole_nodes, whole_socks, task_ppn=0, task_cpp=0, task_gpp=0)
Traverse available nodes to return:
If whole_nodes is True:
shared_nodes: False
nodes: list of node names
ppn: processes per node for launching the task
max_ppn: processes that can be launched
accurateNodes: True if nodes uses the actual names of the nodes, False otherwise.
If whole_nodes is False:
shared_nodes: True
nodes: list of node names
node_file_entries: list of (node, corelist) tuples, where corelist is a list of core names. Core names are integers from 0 to n-1 where n is the number of cores on a node.
ppn: processes per node for launching the task
max_ppn: processes that can be launched
accurateNodes: True if nodes uses the actual names of the nodes, False otherwise.
Arguments:
nproc: the number of requested processes (int)
comp_id: component identifier, must be unique with respect to the framework (string)
task_id: task identifier from TM (int)
method: name of method (string)
task_ppn: ppn for this task (optional) (int)
- initialize(dataMngr, taskMngr, configMngr, cmd_nodes=0, cmd_ppn=0)
Initialize resource management structures and references to other managers (dataMngr, taskMngr, configMngr).
Resource information comes from the following in order of priority:
command line specification (cmd_nodes, cmd_ppn)
detection using parameters from platform config file
manual settings from platform config file
The last two sources are obtained through resourceHelper.getResourceList().
- printRMState()
Print the node tree to stdout.
- process_service_request(msg)
- release_allocation(task_id, status)
Set resources allocated to task task_id to available. status is not used, but may be used to correlate resource failures to task failures and implement task relaunch strategies.
- report_RM_status(notes='')
Print current RM status to the reporting_file (“resource_usage”). Entries consist of:
time in seconds since the RM was initialized (__init__ of RM)
# cores that are available
# cores that are allocated
% allocated cores
# processes launched by task
% cores used by processes
notes (a description of the event that changed the resource usage)
- sendEvent(eventName, info)
Wrapper for constructing and publishing EM events.
- class ipsframework.node_structure.Node(name, socks, cores, p)
Models a node in the allocation.
name: name of node, typically actual name from resource detection phase.
task_ids, owners: identifiers for the tasks and components that are currently using the node.
allocated, available: list of sockets that have cores allocated and available. A socket may appear in both lists if it is only partially allocated.
sockets: list of sockets belonging to this node
avail_cores: number of cores that are currently available.
total_cores: total number of cores that can be allocated on this node.
status: indicates if the node is ‘UP’ or ‘DOWN’. Currently not used; all nodes are considered functional.
- allocate(whole_nodes, whole_sockets, tid, o, procs)
Mark procs cores as allocated subject to the values of whole_nodes and whole_sockets. Return the number of cores allocated and their corresponding slots, a list of strings of the form:
<socket name>:<core name>
- print_sockets(fname='')
Pretty print of state of sockets.
- release(tid, o)
Mark cores used by task tid and component o as available. Return the number of cores released.
- class ipsframework.node_structure.Socket(name, cps, coreids=())
Models a socket in a node.
name: identifier for the socket
task_ids, owners: identifiers for the tasks and components that are currently using the socket.
allocated, available: lists of cores that are allocated and available.
cores: list of Core objects belonging to this socket
avail_cores: number of cores that are currently available.
total_cores: total number of cores that can be allocated on this socket.
- allocate(whole, tid, o, num_procs)
Mark num_procs cores as allocated subject to the value of whole. Return a list of strings of the form:
<socket name>:<core name>
- print_cores(fname='')
Pretty print of state of cores.
- release(tid)
Mark cores that are allocated to task tid as available. Return number of cores set to available.
- class ipsframework.node_structure.Core(name)
Models a core of a socket.
name: name of core
is_available: boolean value indicating the availability of the core.
task_id, owner: identifiers of the task and component using the core.
- allocate(tid, o)
Mark core as allocated.
- release()
Mark core as available.
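The Node / Socket / Core hierarchy can be illustrated with a reduced Socket and Core pair that produces the documented <socket name>:<core name> slot strings. These are simplified stand-ins, not the node_structure classes; in particular, the assumption that a whole-socket request requires an entirely free socket and then reserves every core on it is this sketch's reading of "subject to the value of whole".

```python
class Core:
    """Simplified stand-in for node_structure.Core."""

    def __init__(self, name):
        self.name = name
        self.is_available = True
        self.task_id = None
        self.owner = None

    def allocate(self, tid, o):
        self.is_available, self.task_id, self.owner = False, tid, o

    def release(self):
        self.is_available, self.task_id, self.owner = True, None, None


class Socket:
    """Simplified stand-in for node_structure.Socket (not the IPS code)."""

    def __init__(self, name, cps):
        self.name = name
        self.cores = [Core(str(i)) for i in range(cps)]  # core names 0..cps-1

    @property
    def avail_cores(self):
        return sum(c.is_available for c in self.cores)

    def allocate(self, whole, tid, o, num_procs):
        free = [c for c in self.cores if c.is_available]
        if whole and len(free) != len(self.cores):
            raise ValueError(f"socket {self.name} is partially allocated")
        if num_procs > len(free):
            raise ValueError(f"socket {self.name} has only {len(free)} free cores")
        # Assumed: a whole-socket request reserves every core, even if num_procs is smaller.
        chosen = free if whole else free[:num_procs]
        slots = []
        for core in chosen:
            core.allocate(tid, o)
            slots.append(f"{self.name}:{core.name}")  # <socket name>:<core name>
        return slots

    def release(self, tid):
        released = 0
        for core in self.cores:
            if core.task_id == tid:
                core.release()
                released += 1
        return released


sock = Socket("0", cps=4)
print(sock.allocate(False, tid=7, o="driver", num_procs=2))  # ['0:0', '0:1']
print(sock.avail_cores)  # 2
print(sock.release(7))  # 2
```

A Node would hold a list of such sockets and aggregate their avail_cores, which is how the same slot strings propagate up to the node-level allocate.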
The Resource Helper file contains all of the code needed to figure out what host we are on and what resources we have. Keeping this code outside the resource manager allows it to be tested independently of the IPS.
- ipsframework.resourceHelper.getResourceList(services, host, partial_nodes=False)
Detect the resources using the host information. Return list of (<node name>, <processes per node>), cores per node, sockets per node, processes per node, and True if the node names are accurate, False otherwise.
- ipsframework.resourceHelper.get_checkjob_info()
- ipsframework.resourceHelper.get_pbs_info()
Access info about allocation from PBS environment variables:
PBS_NNODES
PBS_NODEFILE
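PBS_NODEFILE names a file that, on typical PBS systems, lists one host name per line, repeated once per allocated processor slot. The sketch below turns such a file into the (<node name>, <processes per node>) list shape that getResourceList returns; read_pbs_nodefile is a hypothetical helper, and whether get_pbs_info parses the file this way is not stated here.

```python
import os
import tempfile
from collections import Counter


def read_pbs_nodefile(path=None):
    """Return [(node name, slots)] from a PBS node file.

    Assumes the common PBS convention that the node file lists one host name
    per line, repeated once per allocated processor slot.
    """
    path = path or os.environ["PBS_NODEFILE"]
    with open(path) as fh:
        hosts = [line.strip() for line in fh if line.strip()]
    counts = Counter(hosts)
    # Counter preserves first-seen order, so node order matches the file.
    return list(counts.items())


with tempfile.NamedTemporaryFile("w", suffix=".nodes", delete=False) as fh:
    fh.write("nid001\nnid001\nnid002\nnid002\n")
print(read_pbs_nodefile(fh.name))  # [('nid001', 2), ('nid002', 2)]
os.remove(fh.name)
```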
- ipsframework.resourceHelper.get_qstat_jobinfo()
Use qstat -f $PBS_JOBID to get the number of nodes and ppn of the allocation. Typically works on PBS systems.
- ipsframework.resourceHelper.get_qstat_jobinfo2()
A second way to use qstat -f $PBS_JOBID to get the number of nodes and ppn of the allocation. Typically works on PBS systems.
- ipsframework.resourceHelper.get_slurm_info()
Access environment variables set by Slurm to get the node names, tasks per node and number of processes.
SLURM_NODELIST
SLURM_TASKS_PER_NODE or SLURM_JOB_TASKS_PER_NODE
SLURM_NPROC
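Slurm writes SLURM_TASKS_PER_NODE in a compressed form where COUNT(xREPEAT) stands for REPEAT consecutive nodes with COUNT tasks each. The sketch below expands that string into one count per node; expand_tasks_per_node is a hypothetical helper, and whether get_slurm_info decodes the variable this way is not stated here. Expanding the SLURM_NODELIST range syntax is a separate step (scontrol show hostnames does it on Slurm systems).

```python
import re


def expand_tasks_per_node(value):
    """Expand Slurm's compressed tasks-per-node string into a per-node list.

    Slurm writes repeated counts as COUNT(xREPEAT), e.g. "2(x3),1" means three
    nodes with 2 tasks each followed by one node with 1 task.
    """
    counts = []
    for item in value.split(","):
        match = re.fullmatch(r"(\d+)(?:\(x(\d+)\))?", item.strip())
        if match is None:
            raise ValueError(f"unrecognized tasks-per-node entry: {item!r}")
        tasks, repeat = int(match.group(1)), int(match.group(2) or 1)
        counts.extend([tasks] * repeat)
    return counts


print(expand_tasks_per_node("2(x3),1"))  # [2, 2, 2, 1]
```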
- ipsframework.resourceHelper.manual_detection(services)
Use values listed in the platform configuration file.
Component
IPS Framework Component
- class ipsframework.component.Component(services, config)
Base class for all IPS components. Common setup, connection and invocation actions are implemented here.
- Parameters
services (ServicesProxy) – service proxy to communicate with framework
config (dict) – configuration dictionary for this component
- property args
- property call_id
- checkpoint(timestamp=0.0, **keywords)
Produce some default debugging information before the rest of the code is executed.
- property component_id
- property config
- finalize(timestamp=0.0, **keywords)
Produce some default debugging information before the rest of the code is executed.
- init(timestamp=0.0, **keywords)
Produce some default debugging information before the rest of the code is executed.
- property method_name
- restart(timestamp=0.0, **keywords)
Produce some default debugging information before the rest of the code is executed.
- property services
- property start_time
- step(timestamp=0.0, **keywords)
Produce some default debugging information before the rest of the code is executed.
- terminate(status)
Clean up services and call sys_exit.
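Components are usually written by subclassing Component and overriding init, step, and finalize, which the framework invokes as described under Framework.run(). The sketch below uses a local stand-in base class so it runs without IPS installed; in a real component the base class would come from ipsframework and self.services would be a ServicesProxy. HeatingModel and its behavior are hypothetical.

```python
class Component:
    """Stand-in for ipsframework.component.Component so the sketch runs
    without IPS installed; only the constructor arguments and the
    init/step/finalize signatures documented above are mirrored."""

    def __init__(self, services, config):
        self.services = services
        self.config = config

    def init(self, timestamp=0.0, **keywords):
        pass

    def step(self, timestamp=0.0, **keywords):
        pass

    def finalize(self, timestamp=0.0, **keywords):
        pass


class HeatingModel(Component):
    """Hypothetical physics component overriding the three lifecycle methods."""

    def init(self, timestamp=0.0, **keywords):
        self.history = []

    def step(self, timestamp=0.0, **keywords):
        # A real component would launch its executable via self.services here.
        self.history.append(timestamp)

    def finalize(self, timestamp=0.0, **keywords):
        return len(self.history)


comp = HeatingModel(services=None, config={"NAME": "heating"})
comp.init(0.0)
for t in (0.0, 0.5, 1.0):
    comp.step(t)
print(comp.finalize(1.0))  # 3
```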
Component Registry
- class ipsframework.componentRegistry.ComponentID(class_name, sim_name)
Object to facilitate the creation, serialization and deserialization of component ids.
- all_ids = {}
- delimiter = '@'
- static deserialize(comp_id_string)
Return the deserialized version of the component id.
- get_class_name()
Return class name of component.
- get_instance_name()
Return instance name of component id.
- get_seq_num()
Return sequence number of component.
- get_serialization()
Return the serialized form of the component id.
- get_sim_name()
Return simulation name for the component.
- seq_num = 0
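The delimiter attribute suggests that a component id is serialized by joining its parts with '@'. The sketch below is a simplified look-alike that round-trips through a string; the field order (sim_name, class_name, sequence number) is an assumption, not taken from the IPS source.

```python
import itertools


class ComponentID:
    """Simplified look-alike of ipsframework.componentRegistry.ComponentID.

    Assumption: the serialized form joins sim_name, class_name and a
    sequence number with the '@' delimiter; the actual field order used by
    IPS may differ.
    """

    delimiter = "@"
    _counter = itertools.count(0)

    def __init__(self, class_name, sim_name, seq_num=None):
        self.class_name = class_name
        self.sim_name = sim_name
        self.seq_num = next(self._counter) if seq_num is None else seq_num

    def get_serialization(self):
        return self.delimiter.join([self.sim_name, self.class_name, str(self.seq_num)])

    @staticmethod
    def deserialize(comp_id_string):
        sim_name, class_name, seq = comp_id_string.split(ComponentID.delimiter)
        return ComponentID(class_name, sim_name, int(seq))


cid = ComponentID("HeatingModel", "run_a")
text = cid.get_serialization()
print(text)  # run_a@HeatingModel@0
copy = ComponentID.deserialize(text)
print(copy.get_serialization() == text)  # True
```

Any scheme of this kind only round-trips if the delimiter never appears inside a simulation or class name.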
- class ipsframework.componentRegistry.ComponentRegistry(*args, **kwargs)
- class RegistryEntry(svc_response_q, invocation_q, component_ref, services, config)
Container for queues and references associated with a component.
- addEntry(component_id, svc_response_q, invocation_q, component_ref, services, config)
Create a component registry entry for component_id and its associated queues, component reference, services and configuration information.
- getComponentArtifact(component_id, artifact)
Return value of artifact in component_id’s registry entry.
- getEntry(component_id)
Return a registry entry.
- get_component_ids(sim_name)
Return all of the component ids associated with simulation sim_name.
- removeEntry(component_id)
- setComponentArtifact(component_id, artifact, value)
Set the value of artifact in component_id’s registry entry to value.
- class ipsframework.componentRegistry.SingletonMeta
Configuration Manager
- class ipsframework.configurationManager.ConfigurationManager(fwk, config_file_list, platform_file_name)
The configuration manager is responsible for parsing the simulation and platform configuration files, creating the framework and simulation components, and providing an interface for accessing items from the configuration files (e.g., the time loop).
- class SimulationData(sim_name, start_time=1674857500.5921588)
Structure to hold simulation data stored in the sim_map entry in the configurationManager class.
- create_simulation(sim_name, config_file, override, sub_workflow=False)
- get_all_simulation_components_map()
- get_all_simulation_sim_root()
- get_component_map()
Return a dictionary of simulation names and lists of component references. (Possibly only the driver and the init component, if present.)
- get_config_parameter(sim_name, param)
Return value of param from simulation configuration file for sim_name.
- get_framework_components()
Return list of framework components.
- get_platform_parameter(param, silent=False)
Return value of platform parameter param. If silent is False (default), None is returned when param is not found; otherwise an exception is raised.
- get_port(sim_name, port_name)
Return a reference to the component from simulation sim_name implementing port port_name.
- get_sim_names()
Return list of names of simulations.
- get_sim_parameter(sim_name, param)
Return value of param from simulation configuration file for sim_name.
- get_simulation_components(sim_name)
- initialize(data_mgr, resource_mgr, task_mgr)
Parse the platform and simulation configuration files using the ConfigObj module. Create and initialize simulation(s) and their components, framework components and loggers.
- process_service_request(msg)
Invokes public configuration manager method for a component. Return method’s return value.
- set_config_parameter(sim_name, param, value, target_sim_name)
Set the configuration parameter param to value value in target_sim_name. If target_sim_name is the framework, all simulations will get the change. Return value.
- terminate(status)
Terminates all processes attached to the framework. status not used.
- terminate_sim(sim_name)
Services
IPS Services
- class ipsframework.services.RunningTask(process, start_time, timeout, nproc, cores_allocated, command, binary, args)
- args
Alias for field number 7
- binary
Alias for field number 6
- command
Alias for field number 5
- cores_allocated
Alias for field number 4
- nproc
Alias for field number 3
- process
Alias for field number 0
- start_time
Alias for field number 1
- timeout
Alias for field number 2
- class ipsframework.services.ServicesProxy(fwk, fwk_in_q, svc_response_q, sim_conf, log_pipe_name)
The ServicesProxy object is responsible for marshalling invocations of framework services to the framework process using a shared queue. The queue is shared among all components in a simulation. The results from framework service invocations are received via another, component-specific “framework response” queue.
Create a new ServicesProxy object
- Parameters
fwk (ipsframework.ips.Framework) – Enclosing IPS simulation framework
fwk_in_q (multiprocessing.Queue) – Framework input message queue - shared among all service objects
svc_response_q (multiprocessing.Queue) – Service response message queue - one per service object.
sim_conf (dict) – Simulation configuration dictionary, contains data from the simulation configuration file merged with the platform configuration file.
log_pipe_name (str) – Name of logging pipe for use by the IPS logging daemon.
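The queue topology described above (one input queue shared by all components, one response queue per component) can be sketched in a single process. This sketch swaps in threads and queue.Queue for separate processes and multiprocessing.Queue; framework_loop, MiniServicesProxy, and the service names are hypothetical.

```python
import queue
import threading


def framework_loop(fwk_in_q, response_queues, services):
    """Serve requests from the shared input queue until a None sentinel arrives.

    Each request is (component_id, service_name, args); the reply goes to the
    requesting component's own response queue.
    """
    while True:
        request = fwk_in_q.get()
        if request is None:
            break
        component_id, service_name, args = request
        result = services[service_name](*args)
        response_queues[component_id].put(result)


class MiniServicesProxy:
    """Hypothetical, thread-based stand-in for ServicesProxy's queue usage."""

    def __init__(self, component_id, fwk_in_q, svc_response_q):
        self.component_id = component_id
        self.fwk_in_q = fwk_in_q              # shared by all components
        self.svc_response_q = svc_response_q  # private to this component

    def invoke(self, service_name, *args):
        self.fwk_in_q.put((self.component_id, service_name, args))
        return self.svc_response_q.get()  # block until the framework replies


fwk_in_q = queue.Queue()
responses = {"driver": queue.Queue(), "worker": queue.Queue()}
services = {"get_sim_names": lambda: ["run_a"], "add": lambda a, b: a + b}
fwk = threading.Thread(target=framework_loop, args=(fwk_in_q, responses, services))
fwk.start()

driver = MiniServicesProxy("driver", fwk_in_q, responses["driver"])
worker = MiniServicesProxy("worker", fwk_in_q, responses["worker"])
print(driver.invoke("get_sim_names"))  # ['run_a']
print(worker.invoke("add", 2, 3))      # 5
fwk_in_q.put(None)  # stop the framework loop
fwk.join()
```

Tagging each request with the caller's id is what lets a single shared input queue serve many components while every reply still reaches only the component that asked.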
- add_task(task_pool_name, task_name, nproc, working_dir, binary, *args, **keywords)
Add task task_name to task pool task_pool_name. Remaining arguments are the same as in ServicesProxy.launch_task().
- call(component_id, method_name, *args, **keywords)
Invoke method method_name on component component_id with optional arguments *args. Will wait until call is finished. Return result from invoking the method.
- Parameters
component_id (ComponentID) – Component ID of requested component
method_name (str) – component method to call, e.g. init or step
- Returns
service response message arguments
- call_nonblocking(component_id, method_name, *args, **keywords)
Invoke method method_name on component component_id with optional arguments *args. Will not wait until finished.
- Parameters
component_id (ComponentID) – Component ID of requested component
method_name (str) – component method to call, e.g. init or step
- Returns
call_id
- Return type
- checkpoint_components(comp_id_list, time_stamp, Force=False, Protect=False)
Selectively checkpoint components in comp_id_list based on the configuration section CHECKPOINT. If Force is True, the checkpoint will be taken even if the conditions for taking the checkpoint are not met. If Protect is True, then the data from the checkpoint is protected from clean up. Force and Protect are optional and default to False.
The CHECKPOINT_MODE option determines if the components' checkpoint methods are invoked.
Possible MODE options are:
- ALL:
Checkpoint every time the call is made (equivalent to always setting Force = True).
- WALLTIME_REGULAR:
Checkpoints are saved upon invocation of the service call checkpoint_components() when a time interval greater than, or equal to, the value of the configuration parameter WALLTIME_INTERVAL has passed since the last checkpoint. A checkpoint is assumed to have happened (but not actually stored) when the simulation starts. Calls to checkpoint_components() before WALLTIME_INTERVAL seconds have passed since the last successful checkpoint result in a NOOP.
- WALLTIME_EXPLICIT:
Checkpoints are saved when the simulation wall clock time exceeds one of the (ordered) list of time values (in seconds) specified in the variable WALLTIME_VALUES. Let [t_0, t_1, …, t_n] be the list of wall clock time values specified in the configuration parameter WALLTIME_VALUES. Then checkpoint(T) = True if T >= t_j for some j in [0, n], and there is no other time T_1, with T > T_1 >= t_j, such that checkpoint(T_1) = True. If the test fails, the call results in a NOOP.
- PHYSTIME_REGULAR:
Checkpoints are saved at regularly spaced “physics time” intervals, specified in the configuration parameter PHYSTIME_INTERVAL. Let PHYSTIME_INTERVAL = PTI, and let the physics time stamp argument in the call to checkpoint_components() be pts_i, with i = 0, 1, 2, … Then checkpoint(pts_i) = True if pts_i >= n * PTI for some n in 1, 2, 3, …, and pts_i - pts_prev >= PTI, where pts_prev = max(pts_0, pts_1, …, pts_(i-1)) and checkpoint(pts_prev) = True. If the test fails, the call results in a NOOP.
- PHYSTIME_EXPLICIT:
Checkpoints are saved when the physics time equals or exceeds one of the (ordered) list of physics time values (in seconds) specified in the variable PHYSTIME_VALUES. Let [pt_0, pt_1, …, pt_n] be the list of physics time values specified in the configuration parameter PHYSTIME_VALUES. Then checkpoint(pt) = True if pt >= pt_j for some j in [0, n], and there is no other physics time pt_k, with pt > pt_k >= pt_j, such that checkpoint(pt_k) = True. If the test fails, the call results in a NOOP.
The configuration parameter NUM_CHECKPOINT controls how many checkpoints to keep on disk. Checkpoints are deleted in a FIFO manner, based on their creation time. Possible values of NUM_CHECKPOINT are:
NUM_CHECKPOINT = n, with n > 0 –> Keep the most recent n checkpoints
NUM_CHECKPOINT = 0 –> No checkpoints are made/kept (except when Force = True)
NUM_CHECKPOINT < 0 –> Keep ALL checkpoints
Checkpoints are saved in the directory ${SIM_ROOT}/restart.
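The WALLTIME_REGULAR rule, the *_EXPLICIT rule, and the NUM_CHECKPOINT retention policy can be written as small pure functions. This is a sketch of one reading of the rules above, not the ServicesProxy code: the function names are hypothetical, the explicit rule is reduced to "the largest listed value not exceeding t has not yet been covered by a checkpoint", and PHYSTIME_REGULAR is omitted because its exact definition of pts_prev admits more than one reading.

```python
import bisect


def regular_due(now, last, interval):
    """WALLTIME_REGULAR rule: due once `interval` seconds have elapsed since
    the last checkpoint (the run start counts as a checkpoint)."""
    return now - last >= interval


def explicit_due(t, values, last):
    """*_EXPLICIT rule: due if some listed value t_j <= t has not already been
    covered by an earlier checkpoint (last is None if none was taken)."""
    j = bisect.bisect_right(values, t) - 1  # index of largest t_j <= t
    if j < 0:
        return False
    return last is None or last < values[j]


def prune(checkpoints, num_checkpoint):
    """NUM_CHECKPOINT rule applied to checkpoints ordered oldest first.
    Returns (kept, deleted)."""
    if num_checkpoint < 0:
        return list(checkpoints), []
    cut = len(checkpoints) - num_checkpoint
    if cut <= 0:
        return list(checkpoints), []
    return checkpoints[cut:], checkpoints[:cut]


values, last, taken = [10.0, 20.0, 30.0], None, []
for t in [5.0, 12.0, 15.0, 25.0, 40.0]:
    if explicit_due(t, values, last):
        taken.append(t)
        last = t
print(taken)  # [12.0, 25.0, 40.0]
print(prune(["ck1", "ck2", "ck3"], 2))  # (['ck2', 'ck3'], ['ck1'])
print(regular_due(now=3700.0, last=0.0, interval=3600.0))  # True
```

Checking only the largest listed value not exceeding t is enough: if that value is already covered by the last checkpoint, every smaller listed value is covered too.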
- cleanup()¶
Clean up any state from the services. Called by the terminate method in the base class for components.
- create_simulation(config_file, override)¶
Create simulation
- create_sub_workflow(sub_name, config_file, override=None, input_dir=None)¶
Create sub-workflow
- create_task_pool(task_pool_name)¶
Create an empty pool of tasks with the name task_pool_name. Raise exception if duplicate name.
- critical(msg, *args)¶
Produce critical message in simulation log file. See logging.critical() for usage.
- debug(msg, *args)¶
Produce debugging message in simulation log file. See logging.debug() for usage.
- error(msg, *args)¶
Produce error message in simulation log file. See logging.error() for usage.
- exception(msg, *args)¶
Produce exception message in simulation log file. See logging.exception() for usage.
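Because these methods take msg, *args in the same way as the standard logging functions, the message template and its values are passed separately rather than pre-formatted. The sketch below demonstrates that calling convention with a plain standard-library logger, not the IPS services object; in a component the corresponding call would be along the lines of self.services.info("step %d finished at t = %.2f s", 3, 1.5), assuming the usual self.services attribute. The logger name is arbitrary.

```python
import io
import logging

# Standalone stand-in for the simulation log: a logger writing to an in-memory stream.
stream = io.StringIO()
handler = logging.StreamHandler(stream)
handler.setFormatter(logging.Formatter("%(levelname)s %(message)s"))
logger = logging.getLogger("ips_logging_sketch")
logger.addHandler(handler)
logger.setLevel(logging.DEBUG)
logger.propagate = False  # keep records out of the root logger

# msg is a %-style template; *args are substituted only when the record is emitted.
logger.info("step %d finished at t = %.2f s", 3, 1.5)
logger.debug("skipped output: %s", "no new data")

print(stream.getvalue(), end="")
# INFO step 3 finished at t = 1.50 s
# DEBUG skipped output: no new data
```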
- get_config_param(param, silent=False)¶
Return the value of the configuration parameter param. Raise exception if not found and silent is False.
- get_finished_tasks(task_pool_name)¶
Return dictionary of finished tasks and return values in task pool task_pool_name. Raise exception if no active or finished tasks.
- get_port(port_name)¶
- Parameters
port_name (str) – port name
- Returns
Return a reference to the component implementing port port_name.
- Return type
- get_restart_files(restart_root, timeStamp, file_list)¶
Copy files needed for component restart from the restart directory:
<restart_root>/restart/<timeStamp>/components/$CLASS_${SUB_CLASS}_$NAME_${SEQ_NUM}
to the component’s work directory.
Copying errors are not fatal (exception raised).
- get_time_loop()¶
Return the list of times as specified in the configuration file.
- Returns
list of times
- Return type
list of float
- get_working_dir()¶
Return the working directory of the calling component.
The working directory path is built from the configuration parameters CLASS, SUB_CLASS, and NAME of the component configuration section, and has the structure:
${SIM_ROOT}/work/$CLASS_${SUB_CLASS}_$NAME_<instance_num>
- Returns
working directory
- Return type
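The naming scheme for the working directory can be illustrated with a hypothetical helper. The SIM_ROOT, CLASS, SUB_CLASS and NAME values in the example are invented, and how the framework assigns <instance_num> is not modeled.

```python
import os


def component_work_dir(sim_root: str, cls: str, sub_class: str, name: str,
                       instance_num: int) -> str:
    """Return ${SIM_ROOT}/work/$CLASS_${SUB_CLASS}_$NAME_<instance_num> (hypothetical helper)."""
    return os.path.join(sim_root, "work", f"{cls}_{sub_class}_{name}_{instance_num}")


print(component_work_dir("/scratch/my_sim", "WORKERS", "EXAMPLE", "hello", 2))
# on POSIX: /scratch/my_sim/work/WORKERS_EXAMPLE_hello_2
```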
- info(msg, *args)¶
Produce informational message in simulation log file. See logging.info() for usage.
- kill_all_tasks()¶
Kill all tasks associated with this component.
- kill_task(task_id)¶
Kill launched task task_id. Return if successful. Raises exceptions if the task or process cannot be found or killed successfully.
- launch_task(nproc, working_dir, binary, *args, **keywords)¶
Launch binary in working_dir on nproc processes. *args are any arguments to be passed to the binary on the command line. **keywords are any keyword arguments used by the framework to manage how the binary is launched. Keywords may be the following:
- task_ppn : the processes per node value for this task
- task_cpp : the cores per process, only used when MPIRUN=srun
- task_gpp : the GPUs per process, only used when MPIRUN=srun
- omp : if True, the task will be launched with the correct OpenMP environment variables set, only used when MPIRUN=srun
- block : specifies that this task will block (or raise an exception) if not enough resources are available to run immediately. If True, the task will be retried until it runs. If False, an exception is raised indicating that there are not enough resources, but it is possible to eventually run. (default = True)
- tag : identifier for the portal. May be used to group related tasks.
- logfile : file name for stdout (and stderr) to be redirected to for this task. By default stderr is redirected to stdout, and stdout is not redirected.
- whole_nodes : if True, the task will be given exclusive access to any nodes it is assigned. If False, the task may be assigned nodes that other tasks are using or may use.
- whole_sockets : if True, the task will be given exclusive access to any sockets of nodes it is assigned. If False, the task may be assigned sockets that other tasks are using or may use.
- launch_cmd_extra_args : extra command arguments added to the MPIRUN command
Return task_id if successful. May raise exceptions related to opening the logfile, being unable to obtain enough resources to launch the task (InsufficientResourcesException), bad task launch request (ResourceRequestMismatchException, BadResourceRequestException) or problems executing the command. These exceptions may be used to retry launching the task as appropriate.
Note
This is a nonblocking function; users must use a version of ServicesProxy.wait_task() to get the result.
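A hedged sketch of the retry pattern mentioned above, written as a plain function that takes the component's services object (normally self.services in a component). The helper name launch_and_wait and its retry parameters are hypothetical. It passes block=False so that launch_task raises instead of waiting, retries on whichever exception types are supplied through retry_on (in a component, for example, ipsframework.ipsExceptions.InsufficientResourcesException), and then calls wait_task() because launch_task is nonblocking.

```python
import time


def launch_and_wait(services, nproc, working_dir, binary, *args,
                    retry_on, max_attempts=5, delay=1.0, **keywords):
    """Launch a task with services.launch_task(), retrying on retry_on, then wait for it."""
    # Ask launch_task to raise instead of blocking when resources are short.
    keywords.setdefault("block", False)
    for attempt in range(1, max_attempts + 1):
        try:
            task_id = services.launch_task(nproc, working_dir, binary, *args, **keywords)
            break
        except retry_on as exc:
            if attempt == max_attempts:
                raise
            services.warning("launch attempt %d failed (%s), retrying", attempt, exc)
            time.sleep(delay)
    # launch_task only starts the task; wait_task returns its return value.
    return services.wait_task(task_id)
```

In a component step this might be called as launch_and_wait(self.services, 4, self.services.get_working_dir(), binary, logfile="model.log", retry_on=(InsufficientResourcesException,)), where binary and the log file name are placeholders.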
- launch_task_pool(task_pool_name, launch_interval=0.0)¶
Construct messages to task manager to launch each task in task pool. Used by TaskPool to launch tasks in a task_pool.
- log(msg, *args)¶
Wrapper for ServicesProxy.info().
- merge_current_state(partial_state_file, logfile=None, merge_binary=None)¶
Merge partial plasma state with global state. Partial plasma state contains only the values that the component contributes to the simulation. Raise exceptions on bad merge. Optional logfile will capture stdout from merge. Optional merge_binary specifies path to executable code to do the merge (default value : “update_state”)
- process_events()¶
Poll for events on subscribed topics.
- publish(topicName, eventName, eventBody)¶
Publish event consisting of eventName and eventBody to topic topicName to the IPS event service.
- remove_task_pool(task_pool_name)¶
Kill all running tasks, clean up all finished tasks, and delete task pool.
- save_restart_files(timeStamp, file_list)¶
Copy files needed for component restart to the restart directory:
${SIM_ROOT}/restart/$timestamp/components/$CLASS_${SUB_CLASS}_$NAME
Copying errors are not fatal (exception raised).
- send_portal_event(event_type='COMPONENT_EVENT', event_comment='', event_time=None, elapsed_time=None)¶
Send event to web portal.
- setMonitorURL(url='')¶
Send event to portal setting the URL where the monitor component will put data.
- set_config_param(param, value, target_sim_name=None)¶
Set configuration parameter param to value. Raise exceptions if the parameter cannot be changed or if there are problems setting the value. This tells the framework to call ipsframework.configurationManager.ConfigurationManager.set_config_parameter() to change the parameter.
- Parameters
param (str) – The parameter requested from simulation config
value – The value to set the parameter to
- Returns
return value from setting parameter
- stage_input_files(input_file_list)¶
Copy component input files to the component working directory (as obtained via a call to ServicesProxy.get_working_dir()). Input files are assumed to be originally located in the directory variable INPUT_DIR in the component configuration section. Files are copied using ipsframework.ipsutil.copyFiles().
- Parameters
input_file_list (str or Iterable of str) – input files, as a space-separated string or an iterable of strings
- stage_output_files(timeStamp, file_list, keep_old_files=True, save_plasma_state=True)¶
Copy associated component output files (from the working directory) to the component simulation results directory. Output files are prefixed with the configuration parameter OUTPUT_PREFIX. The simulation results directory has the format:
${SIM_ROOT}/simulation_results/<timeStamp>/components/$CLASS_${SUB_CLASS}_$NAME_${SEQ_NUM}
Additionally, plasma state files are archived for debugging purposes:
${SIM_ROOT}/history/plasma_state/<file_name>_$CLASS_${SUB_CLASS}_$NAME_<timeStamp>
Copying errors are not fatal (exception raised).
- stage_state(state_files=None)¶
Copy current state to work directory.
- stage_subflow_output_files(subflow_name='ALL')¶
Gather outputs from sub-workflows. Sub-workflow output is defined to be the output files from its DRIVER component as they exist in the sub-workflow driver’s work area at the end of the sub-simulation. If subflow_name != ‘ALL’, then get output from only that sub-flow.
- submit_tasks(task_pool_name, block=True, use_dask=False, dask_nodes=1, dask_ppw=None, launch_interval=0.0, use_shifter=False, shifter_args=None, dask_worker_plugin=None, dask_worker_per_gpu=False)¶
Launch all unfinished tasks in task pool task_pool_name. If block is True, return when all tasks have been launched. If block is False, return when all tasks that can be launched immediately have been launched. Return number of tasks submitted.
Optionally, dask can be used to schedule and run the task pool.
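A sketch of a typical task-pool round trip through the services object. ServicesProxy.add_task() is not documented in this section; its argument order here (pool name first, then the TaskPool.add_task() arguments) is an assumption, and the helper name, pool name and case arguments are hypothetical. Because finished tasks may be reported only once per poll, the sketch accumulates results until every submitted task has been seen before removing the pool, which would otherwise kill running tasks.

```python
import time


def run_case_pool(services, pool_name, working_dir, binary, cases,
                  nproc=1, poll_interval=1.0):
    """Run one task per case in a task pool; return (number submitted, {task name: return value})."""
    services.create_task_pool(pool_name)
    for case in cases:
        # Assumed signature: add_task(task_pool_name, task_name, nproc, working_dir, binary, *args)
        services.add_task(pool_name, f"case_{case}", nproc, working_dir, binary, str(case))
    # block=True: return once all tasks have been launched.
    n_submitted = services.submit_tasks(pool_name, block=True)
    results = {}
    # Accumulate finished tasks until every submitted task has reported.
    while len(results) < n_submitted:
        results.update(services.get_finished_tasks(pool_name))
        if len(results) < n_submitted:
            time.sleep(poll_interval)
    # All tasks are done, so removing the pool kills nothing still running.
    services.remove_task_pool(pool_name)
    return n_submitted, results
```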
- subscribe(topicName, callback)¶
Subscribe to topic topicName on the IPS event service and register callback as the method to be invoked when an event is published to that topic.
- unsubscribe(topicName)¶
Remove subscription to topic topicName.
- update_state(state_files=None)¶
Copy local (updated) state to global state. If no state files are specified, component configuration specification is used. Raise exceptions upon copy.
- update_time_stamp(new_time_stamp=-1)¶
Update time stamp on portal.
- wait_call(call_id, block=True)¶
If block is True, return when the call has completed with the return code from the call. If block is False, raise IncompleteCallException if the call has not completed, or return its value if it has.
- Parameters
call_id (int) – call ID
- Returns
service response message arguments
- wait_call_list(call_id_list, block=True)¶
Check the status of each of the calls in call_id_list. If block is True, return when all calls are finished. If block is False, raise IncompleteCallException if any of the calls have not completed, otherwise return. The return value is a dictionary of call_ids and return values.
- Parameters
call_id_list (list of int) – list of call ID’s
- Returns
dict of call_id and return value
- Return type
- wait_task(task_id, timeout=-1, delay=1)¶
Check the status of task task_id. Return the return value of the task when finished successfully. Raise exceptions if the task is not found, or if there are problems finalizing the task.
- wait_task_nonblocking(task_id)¶
Check the status of task task_id. If it has finished, the return value is populated with the actual value, otherwise None is returned. A KeyError exception may be raised if the task is not found.
- Parameters
task_id (int) – task ID (PID)
- Returns
return value of task if finished else None
- wait_tasklist(task_id_list, block=True)¶
Check the status of a list of tasks. If block is True, return a dictionary of return values when all tasks have completed. If block is False, return a dictionary containing entries for each completed task. Note that the dictionary may be empty. Raise KeyError exception if task_id not found.
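A sketch of collecting results without blocking, using wait_tasklist() with block=False. The helper name and polling interval are hypothetical; passing services.process_events as on_idle is one way to keep handling subscribed events while waiting. Only still-pending task IDs are queried on each pass.

```python
import time


def collect_results(services, task_ids, interval=1.0, on_idle=None):
    """Poll wait_tasklist(block=False) until every task in task_ids has finished."""
    pending = set(task_ids)
    results = {}
    while pending:
        # Nonblocking: returns entries only for tasks that have completed (possibly none).
        done = services.wait_tasklist(sorted(pending), block=False)
        results.update(done)
        pending.difference_update(done)
        if pending:
            if on_idle is not None:
                on_idle()  # e.g. services.process_events
            time.sleep(interval)
    return results
```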
- warning(msg, *args)¶
Produce warning message in simulation log file. See logging.warning() for usage.
- class ipsframework.services.Task(task_name, nproc, working_dir, binary, *args, **keywords)¶
Container for task information:
- Parameters
name (str) – task name
nproc (int) – number of processes the task needs
working_dir (str) – location to launch task from
binary (str) – full path to executable to launch
*args – arguments for binary
**keywords – keyword arguments for launching the task. See ServicesProxy.launch_task() for details.
- class ipsframework.services.TaskPool(name, services)¶
Class to contain and manage a pool of tasks.
- add_task(task_name, nproc, working_dir, binary, *args, **keywords)¶
Create Task object and add to queued_tasks of the task pool. Raise exception if task name already exists in task pool.
- get_dask_finished_tasks_status()¶
Return a dictionary of exit status values for all dask tasks that have finished since the last time finished tasks were polled.
- Returns
dict mapping task name to exit status
- Return type
- get_finished_tasks_status()¶
Return a dictionary of exit status values for all tasks that have finished since the last time finished tasks were polled.
- Returns
dict mapping task name to exit status
- Return type
- submit_dask_tasks(block=True, dask_nodes=1, dask_ppw=None, use_shifter=False, shifter_args=None, dask_worker_plugin=None, dask_worker_per_gpu=False)¶
Launch tasks in queued_tasks using dask.
One dask worker will be started for each node unless dask_worker_per_gpu is True, in which case one dask worker will be started for every GPU, so dask_nodes times GPUS_PER_NODE workers will be started.
- Parameters
block (bool) – Unused, this will always return after tasks are submitted
dask_nodes (int) – Number of task nodes, default 1
dask_ppw (int) – Number of processes per dask worker, default is PROCS_PER_NODE
use_shifter (bool) – Option to launch dask scheduler and workers in shifter container
dask_worker_plugin (distributed.diagnostics.plugin.WorkerPlugin) – If provided this will be registered as a worker plugin with the dask client
dask_worker_per_gpu (bool) – If true then a separate worker will be started for each GPU and bound to that GPU
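The worker-count arithmetic described above, as a small hypothetical helper; procs_per_node and gpus_per_node stand in for the PROCS_PER_NODE and GPUS_PER_NODE settings. It only restates the documented defaults and does not model how processes are distributed when one worker runs per GPU.

```python
def dask_layout(dask_nodes=1, procs_per_node=1, gpus_per_node=0,
                dask_ppw=None, dask_worker_per_gpu=False):
    """Return (number of dask workers, processes per worker) per the rules above."""
    if dask_worker_per_gpu:
        workers = dask_nodes * gpus_per_node  # one worker per GPU
    else:
        workers = dask_nodes  # one worker per node
    # dask_ppw defaults to PROCS_PER_NODE when not given
    ppw = procs_per_node if dask_ppw is None else dask_ppw
    return workers, ppw


print(dask_layout(dask_nodes=2, procs_per_node=64, gpus_per_node=4))                            # (2, 64)
print(dask_layout(dask_nodes=2, procs_per_node=64, gpus_per_node=4, dask_worker_per_gpu=True))  # (8, 64)
```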
- submit_tasks(block=True, use_dask=False, dask_nodes=1, dask_ppw=None, launch_interval=0.0, use_shifter=False, shifter_args=None, dask_worker_plugin=None, dask_worker_per_gpu=False)¶
Launch tasks in queued_tasks. Finished tasks are handled before launching new ones. If block is True, the number of tasks submitted is returned after all tasks have been launched and completed. If block is False, the number of tasks that can immediately be launched is returned.
If use_dask==True then the tasks are launched with submit_dask_tasks(). One dask worker will be started for each node unless dask_worker_per_gpu is True, in which case one dask worker will be started for every GPU, so dask_nodes times GPUS_PER_NODE workers will be started.
- Parameters
block (bool) – If True then wait for task to complete, default True
use_dask (bool) – If True then use dask to launch tasks, default False
dask_nodes (int) – Number of task nodes, only used if use_dask==True
dask_ppw (int) – Number of processes per dask worker, default is PROCS_PER_NODE, only used if use_dask==True
launch_interval (float) – time to wait between launching tasks, default 0.0
use_shifter (bool) – Option to launch dask scheduler and workers in shifter container
shifter_args (str) – Optional arguments added to shifter when launching dask scheduler and workers
dask_worker_plugin (distributed.diagnostics.plugin.WorkerPlugin) – If provided this will be registered as a worker plugin with the dask client
dask_worker_per_gpu (bool) – If true then a separate worker will be started for each GPU and bound to that GPU
- terminate_tasks()¶
Kill all active tasks, clear all queued, blocked and finished tasks.
- ipsframework.services.launch(binary, task_name, working_dir, *args, **keywords)¶
This is used by TaskPool.submit_dask_tasks() as the input to dask.distributed.Client.submit().
Other Utilities¶
IPS Exceptions¶
- exception ipsframework.ipsExceptions.BadResourceRequestException(caller_id, tid, request, deficit)¶
Exception raised by the resource manager when a component requests a quantity of resources that can never be satisfied during a get_allocation() call.
- exception ipsframework.ipsExceptions.BlockedMessageException(msg, reason)¶
Exception raised by any manager when a blocking service invocation is made, and the invocation result is not readily available.
- exception ipsframework.ipsExceptions.GPUResourceRequestMismatchException(caller_id, tid, ppn, gpp, max_gpp)¶
Exception raised by the resource manager when it is not possible to launch the requested number of GPUs per task.
- exception ipsframework.ipsExceptions.IncompleteCallException(callID)¶
Exception raised by the taskManager when a nonblocking wait_call() method is invoked before the call has finished.
- exception ipsframework.ipsExceptions.InsufficientResourcesException(caller_id, tid, request, deficit)¶
Exception raised by the resource manager when not enough resources are available to satisfy an allocate() call.
- exception ipsframework.ipsExceptions.InvalidResourceSettingsException(t, spn, cpn)¶
Exception raised by the resource helper to indicate inconsistent resource settings.
- exception ipsframework.ipsExceptions.ResourceRequestMismatchException(caller_id, tid, nproc, ppn, max_procs, max_ppn)¶
Exception raised by the resource manager when it is possible to launch the requested number of processes, but not on the requested number of processes per node.
- exception ipsframework.ipsExceptions.ResourceRequestUnequalPartitioningException(caller_id, tid, nproc, ppn, max_procs, max_ppn)¶
Exception raised by the resource manager when it is possible to launch the requested number of processes, but the requested number of processes and processes per node will result in unequal partitioning of nodes.
IPS Utilities¶
- ipsframework.ipsutil.copyFiles(src_dir, src_file_list, target_dir, prefix='', keep_old=False)¶
Copy files in src_file_list from src_dir to target_dir with an optional prefix. If keep_old is True, existing files in target_dir will not be overwritten; otherwise files can be clobbered (default). Wild-cards in file name specification are allowed.
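The documented behaviour of copyFiles() (wild-cards, optional prefix, optional no-clobber) can be approximated with the standard library. This is a sketch, not the IPS implementation: copy_files is a hypothetical name, accepting a space-separated string and raising when a pattern matches nothing are assumptions, and with keep_old=True the sketch simply skips an existing target file.

```python
import glob
import os
import shutil


def copy_files(src_dir, src_file_list, target_dir, prefix="", keep_old=False):
    """Copy files matching src_file_list from src_dir to target_dir; return the copied paths."""
    if isinstance(src_file_list, str):
        src_file_list = src_file_list.split()  # assumption: space-separated names
    copied = []
    for pattern in src_file_list:
        matches = [p for p in glob.glob(os.path.join(src_dir, pattern)) if os.path.isfile(p)]
        if not matches:
            raise FileNotFoundError(f"no file matching {pattern!r} in {src_dir}")
        for src in sorted(matches):
            dest = os.path.join(target_dir, prefix + os.path.basename(src))
            if keep_old and os.path.exists(dest):
                continue  # keep the existing file instead of overwriting it
            shutil.copy2(src, dest)  # copies contents and metadata
            copied.append(dest)
    return copied
```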
- ipsframework.ipsutil.getTimeString(timeArg=None)¶
Return a string representation of timeArg. timeArg is expected to be an appropriate object to be processed by time.strftime(). If timeArg is None, current time is used.
- ipsframework.ipsutil.which(program, alt_paths=None)¶
- class ipsframework.messages.Message(sender_id, receiver_id)¶
Base class for all IPS messages. Should not be used in actual communication.
- FAILURE = 1¶
- SUCCESS = 0¶
- counter = 0¶
- delimiter = ''¶
- get_message_id()¶
- identifier = 'MESSAGE'¶
- class ipsframework.messages.MethodInvokeMessage(sender_id, receiver_id, call_id, target_method, *args, **keywords)¶
Message used by components to invoke methods on other components.
sender_id: component id of the sender
receiver_id: component id of the receiver
call_id: identifier of the call (generated by caller)
target_method: method to be invoked on the receiver
*args: arguments to be passed to the target_method
- counter = 0¶
- delimiter = '|'¶
- identifier = 'INVOKE'¶
- class ipsframework.messages.MethodResultMessage(sender_id, receiver_id, call_id, status, *args)¶
Message used to relay the return value after a method invocation.
sender_id: component id of the sender (callee)
receiver_id: component id of the receiver (caller)
call_id: identifier of the call (generated by caller)
status: either Message.SUCCESS or Message.FAILURE indicating the success or failure of the invocation.
*args: other information to be passed back to the caller.
- counter = 0¶
- delimiter = '|'¶
- identifier = 'RESULT'¶
- class ipsframework.messages.ServiceRequestMessage(sender_id, receiver_id, target_comp_id, target_method, *args, **keywords)¶
Message used by components to request the result of a service action by one of the IPS managers.
sender_id: component id of the sender
receiver_id: component id of the receiver (framework)
target_comp_id: component id of target component (typically framework)
target_method: name of method to be invoked on component target_comp_id
*args: any number of arguments. These are specific to the target method.
- counter = 0¶
- delimiter = '|'¶
- identifier = 'REQUEST'¶
- class ipsframework.messages.ServiceResponseMessage(sender_id, receiver_id, request_msg_id, status, *args)¶
Message used by managers to respond with the result of the service action to the calling component.
sender_id: component id of the sender (framework)
receiver_id: component id of the receiver (calling component)
request_msg_id: id of request message this is a response to.
status: either Message.SUCCESS or Message.FAILURE
*args: any number of arguments. These are specific to type of response.
- counter = 0¶
- delimiter = '|'¶
- identifier = 'RESPONSE'¶
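The class attributes identifier, delimiter and counter suggest how message IDs are formed. The following is a hypothetical reconstruction for illustration only, not the framework's code: the classes are renamed SketchMessage and SketchRequestMessage, and it assumes an ID of the form identifier + delimiter + a per-class counter, assigned once on the first call to get_message_id().

```python
class SketchMessage:
    """Hypothetical stand-in for ipsframework.messages.Message."""
    SUCCESS = 0
    FAILURE = 1
    identifier = "MESSAGE"
    delimiter = ""
    counter = 0

    def __init__(self, sender_id, receiver_id):
        self.sender_id = sender_id
        self.receiver_id = receiver_id
        self.message_id = None

    def get_message_id(self):
        # Assign the ID lazily and only once, using a counter per message class.
        if self.message_id is None:
            cls = type(self)
            cls.counter += 1
            self.message_id = f"{cls.identifier}{cls.delimiter}{cls.counter}"
        return self.message_id


class SketchRequestMessage(SketchMessage):
    """Hypothetical stand-in for ServiceRequestMessage."""
    identifier = "REQUEST"
    delimiter = "|"
    counter = 0  # own counter, so request IDs are numbered independently

    def __init__(self, sender_id, receiver_id, target_comp_id, target_method, *args, **keywords):
        super().__init__(sender_id, receiver_id)
        self.target_comp_id = target_comp_id
        self.target_method = target_method
        self.args = args
        self.keywords = keywords
```

Under these assumptions, the first two request messages would receive the IDs REQUEST|1 and REQUEST|2.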
Framework Components¶
- class ipsframework.portalBridge.PortalBridge(services, config)¶
Framework component to communicate with the SWIM web portal.
- class SimulationData¶
Container for simulation data.
- check_send_post_responses()¶
- finalize(timestamp=0.0, **keywords)¶
Produce some default debugging information before the rest of the code is executed.
- init(timestamp=0.0, **keywords)¶
Try to connect to the portal, subscribe to _IPS_MONITOR events and register callback process_event().
- init_simulation(sim_name, sim_root)¶
Create and send information about simulation sim_name living in sim_root so the portal can set up corresponding structures to manage data from the sim.
- process_event(topicName, theEvent)¶
Process a single event theEvent on topic topicName.
- send_event(sim_data, event_data)¶
Send contents of event_data and sim_data to portal.
- send_mpo_data(event_data, sim_data)¶
- step(timestamp=0.0, **keywords)¶
Poll for events.
- ipsframework.portalBridge.configure_mpo()¶
- ipsframework.portalBridge.hash_file(file_name)¶
Return the MD5 hash of a file.
- Parameters
file_name (str) – Full path to file
- Returns
MD5 of file_name
- Return type
str
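The string form of the hash is not specified beyond its type. The following standard-library sketch assumes the hexadecimal digest and reads the file in chunks so that large files need not fit in memory; md5_of_file is a hypothetical name.

```python
import hashlib


def md5_of_file(file_name: str, chunk_size: int = 1 << 16) -> str:
    """Return the hex MD5 digest of file_name, reading it in chunks."""
    digest = hashlib.md5()
    with open(file_name, "rb") as f:
        # Read fixed-size chunks until read() returns b"" at end of file.
        for chunk in iter(lambda: f.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()
```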
- ipsframework.portalBridge.send_post(conn, stop, url)¶
- class ipsframework.runspaceInitComponent.runspaceInitComponent(services, config)¶
Framework component to manage runspace initialization, container file management, and file staging for simulation and analysis runs.
- init(timestamp=0.0, **keywords)¶
Creates base directory, copies IPS and FacetsComposer input files.
- step(timestamp=0.0, **keywords)¶
Copies individual subcomponent input files into working subdirectories.