Code Listings
IPS
The Integrated Plasma Simulator (IPS) Framework. This framework enables loose, file-based coupling of certain classes of nuclear fusion simulation codes.
For further design information, see:
Wael Elwasif, David E. Bernholdt, Aniruddha G. Shet, Samantha S. Foley, Randall Bramley, Donald B. Batchelor, and Lee A. Berry, The Design and Implementation of the SWIM Integrated Plasma Simulator, in The 18th Euromicro International Conference on Parallel, Distributed and Network-Based Computing (PDP 2010), 2010.
Samantha S. Foley, Wael R. Elwasif, David E. Bernholdt, Aniruddha G. Shet, and Randall Bramley, Extending the Concept of Component Interfaces: Experience with the Integrated Plasma Simulator, in Component-Based High-Performance Computing (CBHPC) 2009, 2009, (extended abstract).
D. Batchelor, G. Alba, E. D’Azevedo, G. Bateman, D. E. Bernholdt, L. Berry, P. Bonoli, R. Bramley, J. Breslau, M. Chance, J. Chen, M. Choi, W. Elwasif, S. Foley, G. Fu, R. Harvey, E. Jaeger, S. Jardin, T. Jenkins, D. Keyes, S. Klasky, S. Kruger, L. Ku, V. Lynch, D. McCune, J. Ramos, D. Schissel, D. Schnack, and J. Wright, Advances in Simulation of Wave Interactions with Extended MHD Phenomena, in Horst Simon, editor, SciDAC 2009, 14-18 June 2009, San Diego, California, USA, volume 180 of Journal of Physics: Conference Series, page 012054, Institute of Physics, 2009, 6pp.
Samantha S. Foley, Wael R. Elwasif, Aniruddha G. Shet, David E. Bernholdt, and Randall Bramley, Incorporating Concurrent Component Execution in Loosely Coupled Integrated Fusion Plasma Simulation, in Component-Based High-Performance Computing (CBHPC) 2008, 2008, (extended abstract).
D. Batchelor, C. Alba, G. Bateman, D. Bernholdt, L. Berry, P. Bonoli, R. Bramley, J. Breslau, M. Chance, J. Chen, M. Choi, W. Elwasif, G. Fu, R. Harvey, E. Jaeger, S. Jardin, T. Jenkins, D. Keyes, S. Klasky, S. Kruger, L. Ku, V. Lynch, D. McCune, J. Ramos, D. Schissel, D. Schnack, and J. Wright, Simulation of Wave Interactions with MHD, in Rick Stevens, editor, SciDAC 2008, 14-17 July 2008, Washington, USA, volume 125 of Journal of Physics: Conference Series, page 012039, Institute of Physics, 2008.
Wael R. Elwasif, David E. Bernholdt, Lee A. Berry, and Don B. Batchelor, Component Framework for Coupled Integrated Fusion Plasma Simulation, in HPC-GECO/CompFrame 2007, 21-22 October, Montreal, Quebec, Canada, 2007.
- Authors:
Wael R. Elwasif, Samantha Foley, Aniruddha G. Shet
- Organization:
Center for Simulation of RF Wave Interactions with Magnetohydrodynamics
Framework
- class ipsframework.ips.Framework(config_file_list: List[str], log_file_name: str, platform_file_name: str | None = None, debug: bool = False, verbose_debug: bool = False, cmd_nodes: int = 0, cmd_ppn: int = 0)
Create an IPS Framework Instance to coordinate the execution of IPS simulations
The Framework performs the following main tasks:
Initialize the different IPS managers that perform the bulk of the framework functionality
Manage communication queues, and route service requests from simulation components to appropriate managers.
Provide logging services to IPS managers.
Perform shutdown procedure on exit
- Parameters:
config_file_list (list) –
A list of simulation configuration files to be used in the simulation. Each simulation configuration file must have the following parameters:
SIM_ROOT The root directory for the simulation
SIM_NAME A name that identifies the simulation
LOG_FILE The name of a log file that is used to capture logging and error information for this simulation.
SIM_ROOT, SIM_NAME, and LOG_FILE must be unique across simulations.
log_file_name (str) – A file name where Framework logging messages are placed.
platform_file_name (str) – The name of the platform configuration file used in the simulation. If not specified it will try to find the one installed in the share directory.
debug (bool) – A flag indicating whether framework debugging messages are enabled (default = False)
verbose_debug (bool) – A flag adding more verbose framework debugging (default = False)
cmd_nodes (int) – Number of compute nodes, as specified on the command line (default = 0)
cmd_ppn (int) – Number of processes per node, as specified on the command line (default = 0)
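Because SIM_ROOT, SIM_NAME, and LOG_FILE must be unique across simulations, a driver script might check a set of configurations before handing them to the framework. The helper below, check_sim_configs, is hypothetical (it is not part of IPS) and assumes the configuration files have already been parsed into plain dictionaries.

```python
from typing import Dict, List

# Parameters every simulation configuration must define (from the list above).
REQUIRED_KEYS = ("SIM_ROOT", "SIM_NAME", "LOG_FILE")


def check_sim_configs(configs: List[Dict[str, str]]) -> None:
    """Hypothetical pre-flight check: raise ValueError if a required key is
    missing or if a required value is shared by two simulations."""
    seen = {key: set() for key in REQUIRED_KEYS}
    for index, conf in enumerate(configs):
        for key in REQUIRED_KEYS:
            if key not in conf:
                raise ValueError(f"config #{index} is missing {key}")
            value = conf[key]
            if value in seen[key]:
                raise ValueError(f"{key}={value!r} is used by more than one simulation")
            seen[key].add(value)


# Two simulations with distinct roots, names and log files: passes silently.
check_sim_configs([
    {"SIM_ROOT": "/scratch/run_a", "SIM_NAME": "run_a", "LOG_FILE": "run_a.log"},
    {"SIM_ROOT": "/scratch/run_b", "SIM_NAME": "run_b", "LOG_FILE": "run_b.log"},
])
```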
- critical(msg: object, *args)
Produce critical message in simulation log file. See logging.critical() for usage.
- debug(msg: object, *args)
Produce debugging message in simulation log file. See logging.debug() for usage.
- error(msg: object, *args)
Produce error message in simulation log file. See logging.error() for usage.
- exception(msg: object, *args)
Produce exception message in simulation log file. See logging.exception() for usage.
- get_inq()
- Returns:
handle to the Framework’s input queue object
- Return type:
- info(msg: object, *args)
Produce informational message in simulation log file. See logging.info() for usage.
- initiate_new_simulation(sim_name: str)
This is to be called by the configuration manager as part of dynamically creating a new simulation. The purpose here is to initiate the method invocations for the framework-visible components in the new simulation.
- log(msg: object, *args)
Wrapper for Framework.info().
- register_service_handler(service_list: Iterable[str], handler: Callable[[ServiceRequestMessage], None])
Register a callback method to handle a list of framework service invocations.
- Parameters:
service_list – a list of service names for which handler is called when they are invoked by components. The service name must match the target_method parameter in messages.ServiceRequestMessage.
handler – a Python callable object that takes a messages.ServiceRequestMessage.
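The routing that register_service_handler() sets up can be modeled as a dictionary from service names to callables, looked up by the target_method of each incoming request. This is a simplified sketch: ToyDispatcher and the two-field ServiceRequestMessage stand-in are hypothetical and do not reproduce the real messages.ServiceRequestMessage.

```python
from dataclasses import dataclass
from typing import Callable, Dict, Iterable, List


@dataclass
class ServiceRequestMessage:
    """Hypothetical stand-in for messages.ServiceRequestMessage (fields assumed)."""
    target_method: str
    args: tuple = ()


class ToyDispatcher:
    """Toy model of the routing implied by register_service_handler()."""

    def __init__(self) -> None:
        self._handlers: Dict[str, Callable[[ServiceRequestMessage], None]] = {}

    def register_service_handler(self, service_list: Iterable[str],
                                 handler: Callable[[ServiceRequestMessage], None]) -> None:
        # One handler may serve several service names.
        for name in service_list:
            self._handlers[name] = handler

    def dispatch(self, msg: ServiceRequestMessage) -> None:
        # The lookup key is the message's target_method.
        try:
            handler = self._handlers[msg.target_method]
        except KeyError:
            raise KeyError(f"no handler registered for {msg.target_method!r}") from None
        handler(msg)


handled: List[str] = []
dispatcher = ToyDispatcher()
dispatcher.register_service_handler(["stage_state", "update_state"],
                                    lambda m: handled.append(m.target_method))
dispatcher.dispatch(ServiceRequestMessage("update_state"))
print(handled)  # ['update_state']
```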
- run() bool
Run the communication outer loop of the framework.
This method implements the core communication and message dispatch functionality of the framework. The main phases of execution for the framework are:
Invoke the init method on all framework-attached components, blocking pending method call termination.
Generate method invocation messages for the remaining public methods in the framework-centric components (i.e. step and finalize).
Generate a queue of method invocation messages for all public framework-accessible components in the simulations being run. Framework-accessible components are made up of the Init component (if it exists) and the Driver component. The generated messages invoke the public methods init, step, and finalize.
Dispatch method invocations for each framework-centric component and physics simulation in order.
Exceptions that propagate to this method from the managed simulations cause the framework to abort any pending method invocations for the source simulation. Exceptions from framework-centric components abort further invocations to that component.
When all method invocations have been dispatched (or aborted), configurationManager.ConfigurationManager.terminate_sim() is called to trigger normal termination of all component processes.
- Returns:
Success status
- Return type:
bool
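The ordering and abort rules above can be sketched with plain lists. This toy model assumes sequential dispatch and represents each invocation as a (component, method) pair; the real framework exchanges messages with component processes and interleaves simulations, so build_invocations and run_all are illustrative names only.

```python
from typing import Callable, Dict, List, Tuple

Invocation = Tuple[str, str]  # (component, method); stands in for a message


def build_invocations(has_init: bool) -> List[Invocation]:
    """Init component (if it exists) and Driver each get init, step, finalize."""
    components = (["init_comp"] if has_init else []) + ["driver"]
    return [(comp, method) for comp in components
            for method in ("init", "step", "finalize")]


def run_all(sims: Dict[str, List[Invocation]],
            invoke: Callable[[str, Invocation], None]) -> Dict[str, List[Invocation]]:
    """Dispatch each simulation's queue in order; an exception aborts only the
    remaining invocations of that simulation. Returns the completed calls."""
    completed: Dict[str, List[Invocation]] = {}
    for sim_name, queue in sims.items():
        completed[sim_name] = []
        for inv in queue:
            try:
                invoke(sim_name, inv)
            except Exception:
                break  # abort pending invocations for this simulation only
            completed[sim_name].append(inv)
    return completed


def fake_invoke(sim_name: str, inv: Invocation) -> None:
    # Simulate a driver failure in the "bad" simulation.
    if sim_name == "bad" and inv == ("driver", "step"):
        raise RuntimeError("driver step failed")


result = run_all({"good": build_invocations(True), "bad": build_invocations(False)},
                 fake_invoke)
print(len(result["good"]), result["bad"])  # 6 [('driver', 'init')]
```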
- send_terminate_msg(sim_name: str, status=0)
This method remotely invokes the method component.Component.terminate() on all components in the IPS simulation sim_name.
- Parameters:
sim_name (str) – The simulation name from which all the components are terminated
status (messages.Message.SUCCESS, messages.Message.FAILURE) – message status, defaults to messages.Message.SUCCESS
- terminate_all_sims(status=0)
Terminate all active component instances.
This method remotely invokes the method component.Component.terminate() on all components in the IPS simulation.
- Parameters:
status (messages.Message.SUCCESS, messages.Message.FAILURE) – message status, defaults to messages.Message.SUCCESS
- warning(msg: object, *args)
Produce warning message in simulation log file. See logging.warning() for usage.
Data Manager
- class ipsframework.dataManager.DataManager(fwk)
The data manager facilitates the movement and exchange of data files for the simulation.
- merge_current_plasma_state(msg)
Merge partial plasma state file with global master. Newly updated plasma state copied to caller’s workdir. Exception raised on copy error.
msg.args:
partial_state_file
target_state_file
log_file: stdout for merge process if not None
- process_service_request(msg)
Invokes the appropriate public data manager method for the component specified in msg. Return method’s return value.
- stage_state(msg)
Copy plasma state files from source dir to target dir. Return 0. Exception raised on copy error.
msg.args:
state_files
source_dir
target_dir
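The stage_state contract (copy the listed files, return 0, raise on a copy error) can be illustrated with the standard library. This is a minimal sketch, not the DataManager implementation: it takes plain arguments rather than a message object, and the file name plasma_state.nc is a made-up example.

```python
import os
import shutil
import tempfile
from typing import Iterable


def stage_state(state_files: Iterable[str], source_dir: str, target_dir: str) -> int:
    """Copy each named state file from source_dir to target_dir and return 0.
    Copy errors (missing file, bad directory) propagate as OSError."""
    for name in state_files:
        shutil.copy(os.path.join(source_dir, name), os.path.join(target_dir, name))
    return 0


# Usage: stage one file between two scratch directories.
with tempfile.TemporaryDirectory() as src, tempfile.TemporaryDirectory() as dst:
    with open(os.path.join(src, "plasma_state.nc"), "w") as f:
        f.write("placeholder")
    print(stage_state(["plasma_state.nc"], src, dst))  # 0
```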
- update_state(msg)
Copy plasma state files from source dir to target dir. Return 0. Exception raised on copy error.
msg.args:
state_files
source_dir
target_dir
Task Manager
- class ipsframework.taskManager.TaskInit(nproc, binary, working_dir, tppn, tcpp, tgpp, block, omp, wnodes, wsocks, cmd_args, launch_cmd_extra_args)
- binary
Alias for field number 1
- block
Alias for field number 6
- cmd_args
Alias for field number 10
- launch_cmd_extra_args
Alias for field number 11
- nproc
Alias for field number 0
- omp
Alias for field number 7
- tcpp
Alias for field number 4
- tgpp
Alias for field number 5
- tppn
Alias for field number 3
- wnodes
Alias for field number 8
- working_dir
Alias for field number 2
- wsocks
Alias for field number 9
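TaskInit is a named tuple, so each field name is simply an alias for a positional index. The snippet below builds a local replica with the same field order to show the correspondence; it does not import IPS, and the binary path and argument values are invented for illustration.

```python
from collections import namedtuple

# Local replica of the field order documented above; not imported from IPS.
TaskInit = namedtuple(
    "TaskInit",
    ["nproc", "binary", "working_dir", "tppn", "tcpp", "tgpp",
     "block", "omp", "wnodes", "wsocks", "cmd_args", "launch_cmd_extra_args"],
)

task = TaskInit(nproc=4, binary="/opt/codes/solver", working_dir="/scratch/run_a/work",
                tppn=0, tcpp=0, tgpp=0, block=True, omp=False,
                wnodes=False, wsocks=False, cmd_args=["--input", "in.dat"],
                launch_cmd_extra_args=None)

# Each name aliases a positional index, e.g. binary is field number 1.
assert task.binary == task[1]
print(task.wnodes, task[8])  # False False
```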
- class ipsframework.taskManager.TaskManager(fwk)
The task manager is responsible for facilitating component method invocations, and the launching of tasks.
- build_launch_cmd(nproc: int, binary: str, cmd_args: List[str], working_dir, ppn: int, max_ppn: int, nodes: str, accurateNodes: bool, partial_nodes: bool, task_id: int, cpp=0, omp=False, gpp=0, core_list='', launch_cmd_extra_args=None)
Construct task launch command to be executed by the component.
nproc - number of processes to use
binary - binary to launch
cmd_args - additional command line arguments for the binary
working_dir - full path to where the executable will be launched
ppn - processes per node value to use
max_ppn - maximum possible ppn for this allocation
nodes - comma separated list of node ids
accurateNodes - if True, launch on nodes in nodes, otherwise the parallel launcher determines the process placement
partial_nodes - if True and accurateNodes and task_launch_cmd == ‘mpirun’, a host file is created specifying the exact placement of processes on cores.
core_list - used for creating host file with process to core mappings
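For an mpirun-based platform with accurateNodes set, the launch command essentially combines the process count, the node list, and the binary with its arguments. The function below is a hypothetical sketch, not IPS's build_launch_cmd: it assumes Open MPI's mpirun, whose --host option in recent releases accepts a comma-separated host list where host:N declares N slots, and it ignores host files, OpenMP, and GPU settings.

```python
from typing import List


def sketch_mpirun_cmd(nproc: int, binary: str, cmd_args: List[str],
                      nodes: str, ppn: int, accurate_nodes: bool) -> List[str]:
    """Hypothetical: assemble an Open MPI style command as an argument list."""
    cmd = ["mpirun", "-np", str(nproc)]
    if accurate_nodes:
        # Pin the job to the named nodes, declaring ppn slots on each.
        hosts = ",".join(f"{node}:{ppn}" for node in nodes.split(","))
        cmd += ["--host", hosts]
    # Otherwise the launcher decides process placement on its own.
    return cmd + [binary] + list(cmd_args)


print(" ".join(sketch_mpirun_cmd(4, "./solver", ["in.dat"], "node01,node02", 2, True)))
# mpirun -np 4 --host node01:2,node02:2 ./solver in.dat
```

Returning a list rather than a single string avoids shell quoting issues if the command is later passed to subprocess.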
- finish_task(finish_task_msg: ServiceRequestMessage)
Clean up after a task launched by a component terminates.
finish_task_msg is expected to be of type messages.ServiceRequestMessage.
Message args:
task_id: task id of finished task
task_data: return code of task
- get_call_id()
Return a new call id
- get_task_id()
Return a new task id
- init_call(init_call_msg, manage_return=True)
Creates and sends a messages.MethodInvokeMessage from the calling component to the target component. If manage_return is True, a record is added to outstanding_calls. Return call id.
Message args:
method_name
+ arguments to be passed on as method arguments.
- init_task(init_task_msg: ServiceRequestMessage)
Allocate resources needed for a new task and build the task launch command using the binary and arguments provided by the requesting component. Return the launch command to the component via messages.ServiceResponseMessage. Raise an exception if the task cannot be launched at this time (ipsExceptions.BadResourceRequestException, ipsExceptions.InsufficientResourcesException).
init_task_msg is expected to be of type messages.ServiceRequestMessage.
Message args:
nproc: number of processes the task needs
binary: full path to the executable to launch
working_dir: full path to directory where the task will be launched
tppn: processes per node for this task. (0 indicates that the default ppn is used.)
block: whether or not to wait until the task can be launched.
wnodes: True for whole node allocation, False otherwise.
wsocks: True for whole socket allocation, False otherwise.
+ cmd_args: any arguments for the executable
- init_task_pool(init_task_msg: ServiceRequestMessage)
Allocate resources needed for a new task and build the task launch command using the binary and arguments provided by the requesting component.
init_task_msg is expected to be of type messages.ServiceRequestMessage.
Message args:
task_dict: dictionary of task names and objects
- initialize(data_mgr, resource_mgr, config_mgr)
Initialize references to other managers and key values from configuration manager.
- printCurrTaskTable()
Print the task table in a human-readable layout.
- process_service_request(msg)
Invokes the appropriate public task manager method for the component specified in msg. Return method’s return value.
- return_call(response_msg)
Handle the response message generated by a component in response to a method invocation on that component.
response_msg is expected to be of type messages.MethodResultMessage.
- wait_call(wait_msg: ServiceRequestMessage)
Determine if the call has finished. If finished, return any data or errors. If not finished, raise the appropriate blocking or nonblocking exception and try again later.
wait_msg is expected to be of type messages.ServiceRequestMessage.
Message args:
call_id: call id for which to wait
blocking: determines whether the wait is blocking or not
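The interplay of get_call_id(), init_call(), return_call(), and wait_call() amounts to a table of outstanding calls keyed by call id. ToyCallTable is a hypothetical, single-process model of that bookkeeping: it raises a single CallNotFinished exception where the task manager distinguishes blocking from nonblocking waits.

```python
import itertools
from typing import Any, Dict


class CallNotFinished(Exception):
    """Raised when a waited-on call has no result yet (hypothetical)."""


_PENDING = object()  # sentinel marking a call that has not returned


class ToyCallTable:
    """Single-process model of call-id bookkeeping; illustrative only."""

    def __init__(self) -> None:
        self._ids = itertools.count(1)  # source of fresh call ids
        self.outstanding_calls: Dict[int, Any] = {}

    def init_call(self) -> int:
        call_id = next(self._ids)
        self.outstanding_calls[call_id] = _PENDING  # record the pending call
        return call_id

    def return_call(self, call_id: int, result: Any) -> None:
        if call_id not in self.outstanding_calls:
            raise KeyError(f"unknown call id {call_id}")
        self.outstanding_calls[call_id] = result  # component reported back

    def wait_call(self, call_id: int) -> Any:
        result = self.outstanding_calls[call_id]  # KeyError for unknown ids
        if result is _PENDING:
            raise CallNotFinished(call_id)  # caller retries later
        del self.outstanding_calls[call_id]
        return result


table = ToyCallTable()
cid = table.init_call()
try:
    table.wait_call(cid)
except CallNotFinished:
    print("call", cid, "still running")
table.return_call(cid, {"status": "ok"})
print(table.wait_call(cid))  # {'status': 'ok'}
```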
Resource Manager
- class ipsframework.resourceManager.Allocation(partial_node, nodelist, corelist, ppn, max_ppn, cpp, accurateNodes, cores_allocated)
- accurateNodes
Alias for field number 6
- corelist
Alias for field number 2
- cores_allocated
Alias for field number 7
- cpp
Alias for field number 5
- max_ppn
Alias for field number 4
- nodelist
Alias for field number 1
- partial_node
Alias for field number 0
- ppn
Alias for field number 3
- class ipsframework.resourceManager.ResourceManager(fwk)
The resource manager is responsible for detecting the resources allocated to the framework, allocating resources to task requests, and maintaining the associated bookkeeping.
- add_nodes(listOfNodes: list[tuple[str, int]]) int
Add node entries to self.nodes. Typically used by initialize() to initialize self.nodes. May be used to add nodes to a dynamic allocation in the future.
listOfNodes is a list of tuples (node name, cores).
self.nodes is a dictionary where the keys are the node names and the values are node_structure.Node structures.
Return total number of cores.
- begin_RM_report()
Print header information for resource usage reporting file.
- check_core_cap(nproc: int, ppn: int) tuple[bool, str | list[Node]]
Determine if it is currently possible to allocate nproc processes with a ppn of ppn without further restrictions. Return True and list of nodes to use if successful. Return False and empty list if there are not enough available resources at this time, but it is possible to eventually satisfy the request. Exception raised if the request can never be fulfilled.
- check_gpus(ppn, task_gpp)
- check_whole_node_cap(nproc, ppn)
Determine if it is currently possible to allocate nproc processes with a ppn of ppn and whole nodes. Return True and list of nodes to use if successful. Return False and empty list if there are not enough available resources at this time, but it is possible to eventually satisfy the request. Exception raised if the request can never be fulfilled.
- check_whole_sock_cap(nproc, ppn)
Determine if it is currently possible to allocate nproc processes with a ppn of ppn and whole sockets. Return True and list of nodes to use if successful. Return False and empty list if there are not enough available resources at this time, but it is possible to eventually satisfy the request. Exception raised if the request can never be fulfilled.
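The (ok, nodes) convention shared by the check_*_cap methods distinguishes three outcomes: satisfiable now, satisfiable later, and never satisfiable. The sketch below applies that convention to a simplified rule in which every chosen node must have at least ppn free cores; check_core_capacity and InsufficientResources are hypothetical names, and the real methods operate on node_structure.Node objects.

```python
from typing import Dict, List, Tuple


class InsufficientResources(Exception):
    """Hypothetical stand-in for a request that can never be satisfied."""


def check_core_capacity(avail: Dict[str, int], total: Dict[str, int],
                        nproc: int, ppn: int) -> Tuple[bool, List[str]]:
    """Return (True, nodes) if the request fits now, (False, []) if it may fit
    later, and raise if it can never fit on this allocation."""
    nodes_needed = -(-nproc // ppn)  # ceiling division
    # Nodes that could ever host ppn processes, regardless of current load.
    capable = [n for n, cores in total.items() if cores >= ppn]
    if len(capable) < nodes_needed:
        raise InsufficientResources(f"{nproc} procs at ppn={ppn} can never fit")
    ready = [n for n in capable if avail[n] >= ppn]
    if len(ready) < nodes_needed:
        return False, []  # possible later, once other tasks release cores
    return True, ready[:nodes_needed]


total = {"n1": 4, "n2": 4}
print(check_core_capacity({"n1": 4, "n2": 1}, total, 6, 3))  # (False, [])
print(check_core_capacity({"n1": 4, "n2": 4}, total, 6, 3))  # (True, ['n1', 'n2'])
```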
- get_allocation(comp_id, nproc, task_id, whole_nodes, whole_socks, task_ppn=0, task_cpp=0, task_gpp=0)
Traverse available nodes to return:
If whole_nodes is True:
shared_nodes: False
nodes: list of node names
ppn: processes per node for launching the task
max_ppn: processes that can be launched
accurateNodes: True if nodes uses the actual names of the nodes, False otherwise.
If whole_nodes is False:
shared_nodes: True
nodes: list of node names
node_file_entries: list of (node, corelist) tuples, where corelist is a list of core names. Core names are integers from 0 to n-1 where n is the number of cores on a node.
ppn: processes per node for launching the task
max_ppn: processes that can be launched
accurateNodes: True if nodes uses the actual names of the nodes, False otherwise.
Arguments:
nproc: the number of requested processes (int)
comp_id: component identifier, must be unique with respect to the framework (string)
task_id: task identifier from TM (int)
method: name of method (string)
task_ppn: ppn for this task (optional) (int)
- initialize(dataMngr, taskMngr, configMngr, cmd_nodes=0, cmd_ppn=0)
Initialize resource management structures, references to other managers (dataMngr, taskMngr, configMngr).
Resource information comes from the following in order of priority:
command line specification (cmd_nodes, cmd_ppn)
detection using parameters from platform config file
manual settings from platform config file
The latter two sources are obtained through resourceHelper.getResourceList().
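The priority order can be expressed as a chain of fallbacks. In this sketch, resolve_resources is a hypothetical helper, detect and manual are placeholder callables for the two platform-file sources, and treating both cmd_nodes and cmd_ppn as required (0 meaning unset, matching the defaults) is an assumption.

```python
from typing import Callable, Optional, Tuple

Resources = Tuple[int, int]  # (nodes, ppn)


def resolve_resources(cmd_nodes: int, cmd_ppn: int,
                      detect: Callable[[], Optional[Resources]],
                      manual: Callable[[], Resources]) -> Resources:
    """Apply the priority order: command line, then detection, then manual."""
    if cmd_nodes > 0 and cmd_ppn > 0:
        return cmd_nodes, cmd_ppn  # 1. command line specification wins
    detected = detect()
    if detected is not None:
        return detected  # 2. detection using platform config parameters
    return manual()  # 3. manual settings from the platform config file


print(resolve_resources(0, 0, lambda: (2, 32), lambda: (1, 8)))   # (2, 32)
print(resolve_resources(4, 16, lambda: (2, 32), lambda: (1, 8)))  # (4, 16)
print(resolve_resources(0, 0, lambda: None, lambda: (1, 8)))      # (1, 8)
```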
- process_service_request(msg)
- release_allocation(task_id, status)
Set resources allocated to task task_id to available. status is not used, but may be used to correlate resource failures to task failures and implement task relaunch strategies.
- report_RM_status(notes='')
Print current RM status to the reporting_file (“resource_usage”). Entries consist of:
time in seconds since the RM was created (__init__ of RM)
# cores that are available
# cores that are allocated
% allocated cores
# processes launched by task
% cores used by processes
notes (a description of the event that changed the resource usage)
- sendEvent(eventName, info)
Wrapper for constructing and publishing EM events.
- class ipsframework.node_structure.Node(name, socks: int, cores: int, p)
Models a node in the allocation.
name: name of node, typically actual name from resource detection phase.
task_ids, owners: identifiers for the tasks and components that are currently using the node.
allocated, available: list of sockets that have cores allocated and available. A socket may appear in both lists if it is only partially allocated.
sockets: list of sockets belonging to this node
avail_cores: number of cores that are currently available.
total_cores: total number of cores that can be allocated on this node.
status: indicates if the node is ‘UP’ or ‘DOWN’. Currently not used; all nodes are considered functional.
- allocate(whole_nodes, whole_sockets, tid, o, procs)
Mark procs number of cores as allocated subject to the values of whole_nodes and whole_sockets. Return the number of cores allocated and their corresponding slots, a list of strings of the form:
<socket name>:<core name>
- print_sockets(fname='')
Pretty print of state of sockets.
- release(tid, o)
Mark cores used by task tid and component o as available. Return the number of cores released.
- class ipsframework.node_structure.Socket(name, cps, coreids=())
Models a socket in a node.
name: identifier for the socket
task_ids, owners: identifiers for the tasks and components that are currently using the socket.
allocated, available: lists of cores that are allocated and available.
cores: list of Core objects belonging to this socket
avail_cores: number of cores that are currently available.
total_cores: total number of cores that can be allocated on this socket.
- allocate(whole, tid, o, num_procs)
Mark num_procs cores as allocated subject to the value of whole. Return a list of strings of the form:
<socket name>:<core name>
- print_cores(fname='')
Pretty print of state of cores.
- release(tid)
Mark cores that are allocated to task tid as available. Return number of cores set to available.
- class ipsframework.node_structure.Core(name)
Models a core of a socket.
name: name of core
is_available: boolean value indicating the availability of the core.
task_id, owner: identifiers of the task and component using the core.
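The slot strings returned by allocate() pair a socket name with a core name. ToySocket is a simplified, hypothetical model of the Socket and Core bookkeeping: it tracks only which task holds each core and ignores the whole and owner arguments of the real methods.

```python
from typing import Dict, List, Optional


class ToySocket:
    """Cores are tracked by name with the id of the task holding them
    (None if free). Core names follow the 0..n-1 convention."""

    def __init__(self, name: str, cores_per_socket: int) -> None:
        self.name = name
        self.cores: Dict[str, Optional[int]] = {str(i): None for i in range(cores_per_socket)}

    @property
    def avail_cores(self) -> int:
        return sum(1 for owner in self.cores.values() if owner is None)

    def allocate(self, tid: int, num_procs: int) -> List[str]:
        if num_procs > self.avail_cores:
            raise ValueError("not enough free cores on this socket")
        slots = []
        for core, owner in self.cores.items():
            if owner is None and len(slots) < num_procs:
                self.cores[core] = tid  # updating a value, not resizing the dict
                slots.append(f"{self.name}:{core}")  # <socket name>:<core name>
        return slots

    def release(self, tid: int) -> int:
        freed = [core for core, owner in self.cores.items() if owner == tid]
        for core in freed:
            self.cores[core] = None
        return len(freed)


sock = ToySocket("0", 4)
print(sock.allocate(tid=7, num_procs=3))  # ['0:0', '0:1', '0:2']
print(sock.avail_cores)                   # 1
print(sock.release(7))                    # 3
```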
The Resource Helper file contains all of the code needed to figure out what host we are on and what resources we have. Keeping this out of the resource manager allows it to be tested independently of the IPS.
- ipsframework.resourceHelper.getResourceList(services, host, partial_nodes=False)
Using the host information, the resources are detected. Return list of (<node name>, <processes per node>), cores per node, sockets per node, processes per node, and True if the node names are accurate, False otherwise.
- ipsframework.resourceHelper.get_checkjob_info()
- ipsframework.resourceHelper.get_pbs_info()
Access info about allocation from PBS environment variables:
PBS_NNODES
PBS_NODEFILE
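On PBS/Torque systems, the file named by PBS_NODEFILE typically contains one line per allocated slot, so counting repeated host names yields processes per node. nodes_from_pbs_nodefile is a hypothetical helper illustrating that idea; it is not the get_pbs_info implementation.

```python
import os
from collections import Counter
from typing import List, Tuple


def nodes_from_pbs_nodefile(path: str) -> List[Tuple[str, int]]:
    """Return (node name, slots) pairs in first-appearance order."""
    with open(path) as f:
        hosts = [line.strip() for line in f if line.strip()]
    counts = Counter(hosts)
    # dict.fromkeys keeps the first occurrence order of each host.
    return [(host, counts[host]) for host in dict.fromkeys(hosts)]


# Usage inside a PBS job; outside one the variable is unset and nothing prints.
node_file = os.environ.get("PBS_NODEFILE")
if node_file:
    print(nodes_from_pbs_nodefile(node_file))
```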
- ipsframework.resourceHelper.get_platform_info()
Get information about the platform
Used to gather runtime information about the current platform. This can be used for debugging purposes to ensure that the framework is running properly for a given system.
- Returns:
A dictionary containing hostname, cpu count, cpu core id for current running process, and available GPU devices if set; if the platform is supported it will also return CPU affinity
- ipsframework.resourceHelper.get_qstat_jobinfo()
Use qstat -f $PBS_JOBID to get the number of nodes and ppn of the allocation. Typically works on PBS systems.
- ipsframework.resourceHelper.get_qstat_jobinfo2()
A second way to use qstat -f $PBS_JOBID to get the number of nodes and ppn of the allocation. Typically works on PBS systems.
- ipsframework.resourceHelper.get_slurm_info()
Access environment variables set by Slurm to get the node names, tasks per node and number of processes.
SLURM_NODELIST
SLURM_TASKS_PER_NODE or SLURM_JOB_TASKS_PER_NODE
SLURM_NPROC
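SLURM_TASKS_PER_NODE (and SLURM_JOB_TASKS_PER_NODE) use a compressed notation in which 2(x3),1 means three nodes with two tasks each followed by one node with one task. The hypothetical helper below expands that notation; it is not taken from get_slurm_info. Expanding the bracket ranges in SLURM_NODELIST is a separate step, commonly done with scontrol show hostnames.

```python
import re
from typing import List

# One comma-separated entry: a task count, optionally followed by (xN).
_ENTRY = re.compile(r"^(\d+)(?:\(x(\d+)\))?$")


def expand_tasks_per_node(value: str) -> List[int]:
    """Expand e.g. '2(x3),1' into [2, 2, 2, 1]."""
    counts: List[int] = []
    for entry in value.split(","):
        match = _ENTRY.match(entry.strip())
        if match is None:
            raise ValueError(f"unrecognised entry {entry!r}")
        tasks, repeat = int(match.group(1)), int(match.group(2) or 1)
        counts.extend([tasks] * repeat)
    return counts


print(expand_tasks_per_node("2(x3),1"))  # [2, 2, 2, 1]
```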
- ipsframework.resourceHelper.manual_detection(services)
Use values listed in platform configuration file.
Component
IPS Framework Component
- class ipsframework.component.Component(services, config: Dict[str, Any])
Base class for all IPS components. Common set up, connection and invocation actions are implemented here.
- Parameters:
services (ServicesProxy) – service proxy to communicate with framework
config (dict) – configuration dictionary for this component
- property args
- property call_id
- checkpoint(timestamp: float = 0.0, **keywords)
Produce some default debugging information before the rest of the code is executed.
- property component_id
- property config
- finalize(timestamp: float = 0.0, **keywords)
Produce some default debugging information before the rest of the code is executed.
- init(timestamp: float = 0.0, **keywords)
Produce some default debugging information before the rest of the code is executed.
- property method_name
- restart(timestamp: float = 0.0, **keywords)
Produce some default debugging information before the rest of the code is executed.
- property services
- property start_time
Component Registry
- class ipsframework.componentRegistry.ComponentID(class_name, sim_name)
Object to facilitate the creation, serialization and deserialization of component ids.
- all_ids: ClassVar[dict[str, ComponentID]] = {}
- delimiter = '@'
- static deserialize(comp_id_string)
Return the deserialized version of the component id.
- get_class_name()
Return class name of component.
- get_instance_name()
Return instance name of component id.
- get_seq_num()
Return sequence number of component.
- get_serialization()
Return serialization.
- get_sim_name()
Return simulation name for the component.
- seq_num = 0
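ComponentID keeps a class-wide sequence counter and a registry of all ids so that a serialized string can be turned back into an object. The model below mirrors those members under an assumed sim_name@class_name@seq_num layout; the real serialization format may differ, and ToyComponentID is a hypothetical name.

```python
import itertools
from typing import ClassVar, Dict


class ToyComponentID:
    """Hypothetical model of the ComponentID members listed above."""

    delimiter: ClassVar[str] = "@"
    _counter: ClassVar[itertools.count] = itertools.count()
    all_ids: ClassVar[Dict[str, "ToyComponentID"]] = {}

    def __init__(self, class_name: str, sim_name: str) -> None:
        self.class_name = class_name
        self.sim_name = sim_name
        self.seq_num = next(ToyComponentID._counter)  # unique per process
        ToyComponentID.all_ids[self.get_serialization()] = self

    def get_serialization(self) -> str:
        # Assumed layout: sim_name@class_name@seq_num
        return self.delimiter.join([self.sim_name, self.class_name, str(self.seq_num)])

    @staticmethod
    def deserialize(comp_id_string: str) -> "ToyComponentID":
        # Look up the registered instance rather than building a new one.
        return ToyComponentID.all_ids[comp_id_string]


cid = ToyComponentID("Driver", "run_a")
s = cid.get_serialization()
print(s)  # run_a@Driver@0
assert ToyComponentID.deserialize(s) is cid
```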
- class ipsframework.componentRegistry.ComponentRegistry(*args, **kwargs)
- class RegistryEntry(svc_response_q, invocation_q, component_ref, services, config)
Container for queues and references associated with a component.
- addEntry(component_id, svc_response_q, invocation_q, component_ref, services, config)
Create a component registry entry for component_id and its associated queues, component ref, services and configuration information.
- getComponentArtifact(component_id, artifact)
Return value of artifact in component_id’s registry entry.
- getEntry(component_id)
Return a registry entry.
- get_component_ids(sim_name)
Return all of the component ids associated with sim sim_name
- removeEntry(component_id)
- setComponentArtifact(component_id, artifact, value)
Set the value of artifact in component_id’s registry entry to value.
- class ipsframework.componentRegistry.SingletonMeta
Configuration Manager
- class ipsframework.configurationManager.ConfigurationManager(fwk, config_file_list: list[str | PathLike], platform_file_name: str | PathLike)
The configuration manager is responsible for parsing the simulation and platform configuration files, creating the framework and simulation components, as well as providing an interface for accessing items from the configuration files (e.g., the time loop).
- class SimulationData(sim_name, start_time: float | None = None)
Structure to hold simulation data stored in the sim_map entry in the ConfigurationManager class.
- create_simulation(sim_name, config_file, override, sub_workflow=False)
- get_all_simulation_components_map()
- get_all_simulation_sim_root()
- get_component_map()
Return a dictionary of simulation names and lists of component references. (Possibly only the driver and the init component, if present.)
- get_config_parameter(sim_name, param)
Return value of param from simulation configuration file for sim_name.
- get_framework_components()
Return list of framework components.
- get_platform_parameter(param, silent=False)
Return value of platform parameter param. If silent is False (default), None is returned when param is not found; otherwise an exception is raised.
- get_port(sim_name, port_name)
Return a reference to the component from simulation sim_name implementing port port_name.
- get_sim_names()
Return list of names of simulations.
- get_sim_parameter(sim_name, param)
Return value of param from simulation configuration file for sim_name.
- get_simulation_components(sim_name)
- initialize(data_mgr, resource_mgr, task_mgr)
Parse the platform and simulation configuration files using the ConfigObj module. Create and initialize simulation(s) and their components, framework components and loggers.
- process_service_request(msg)
Invokes public configuration manager method for a component. Return method’s return value.
- set_config_parameter(sim_name, param, value, target_sim_name)
Set the configuration parameter param to value value in target_sim_name. If target_sim_name is the framework, all simulations will get the change. Return value.
- terminate(status)
Terminates all processes attached to the framework. status is not used.
- terminate_sim(sim_name)
Services
IPS Services
- class ipsframework.services.DVMPlugin(logger, oversubscribe=False, hwthreads=False)
- class ipsframework.services.RunningTask(process, start_time, timeout, nproc, cores_allocated, command, binary, args)
- class ipsframework.services.ServicesProxy(fwk, fwk_in_q: Queue, svc_response_q: Queue, sim_conf: dict[str, Any], log_pipe_name: str)
The ServicesProxy object is responsible for marshalling invocations of framework services to the framework process using a shared queue. The queue is shared among all components in a simulation. The results from framework services invocations are received via another, component-specific “framework response” queue.
Create a new ServicesProxy object
- Parameters:
fwk (ipsframework.ips.Framework) – Enclosing IPS simulation framework
fwk_in_q (multiprocessing.Queue) – Framework input message queue, shared among all service objects
svc_response_q (multiprocessing.Queue) – Service response message queue, one per service object.
sim_conf (dict) – Simulation configuration dictionary, contains data from the simulation configuration file merged with the platform configuration file.
log_pipe_name (str) – Name of logging pipe for use by the IPS logging daemon.
- add_analysis_data_files(current_data_file_paths: list[str], timestamp: float = 0.0, replace: bool = False) None
If the IPS Portal is available, saves data files to IPS Portal. Files are indexed via specific timestamps.
If a connection to the IPS Portal cannot be verified for this run, this function does nothing.
- Parameters:
current_data_file_paths – list of paths to the current data files we want to copy to the Jupyter directory. These paths may be either absolute paths or IPS-appropriate relative paths. If path is a directory, add all files in directory and preserve directory structure on the IPS Portal.
timestamp – label to assign to the data (currently must be a floating point value)
replace – If True, replace the last data file added with the new data file. If False, simply append the new data file. (default: False) Note that if replace is not True but you attempt to overwrite it, a ValueError will be raised.
- add_task(task_pool_name: str, task_name: str, nproc: int, working_dir: str, binary: str, *args, **keywords)
Add task task_name to task pool task_pool_name. Remaining arguments are the same as in ServicesProxy.launch_task().
- call(component_id: ComponentID, method_name: str, *args, **keywords)
Invoke method method_name on component component_id with optional arguments *args. Will wait until call is finished. Return result from invoking the method.
- Parameters:
component_id (ComponentID) – Component ID of requested component
method_name (str) – component method to call, e.g. init or step
- Returns:
service response message arguments
- call_nonblocking(component_id: ComponentID, method_name: str, *args, **keywords) int
Invoke method method_name on component component_id with optional arguments *args. Will not wait until finished.
- Parameters:
component_id (ComponentID) – Component ID of requested component
method_name (str) – component method to call, e.g. init or step
- Returns:
call_id
- Return type:
int
Selectively checkpoint components in comp_id_list based on the configuration section CHECKPOINT. If Force is True, the checkpoint will be taken even if the conditions for taking the checkpoint are not met. If Protect is True, then the data from the checkpoint is protected from clean up. Force and Protect are optional and default to False.
The CHECKPOINT_MODE option determines whether the components’ checkpoint methods are invoked.
Possible MODE options are:
- ALL:
Checkpoint every time the call is made (equivalent to always setting Force = True)
- WALLTIME_REGULAR:
checkpoints are saved upon invocation of the service call checkpoint_components(), when a time interval greater than, or equal to, the value of the configuration parameter WALLTIME_INTERVAL has passed since the last checkpoint. A checkpoint is assumed to have happened (but not actually stored) when the simulation starts. Calls to checkpoint_components() before WALLTIME_INTERVAL seconds have passed since the last successful checkpoint result in a NOOP.
- WALLTIME_EXPLICIT:
checkpoints are saved when the simulation wall clock time exceeds one of the (ordered) list of time values (in seconds) specified in the variable WALLTIME_VALUES. Let [t_0, t_1, …, t_n] be the list of wall clock time values specified in the configuration parameter WALLTIME_VALUES. Then checkpoint(T) = True if T >= t_j, for some j in [0, n], and there is no other time T_1, with T > T_1 >= t_j, such that checkpoint(T_1) = True. If the test fails, the call results in a NOOP.
- PHYSTIME_REGULAR:
checkpoints are saved at regularly spaced “physics time” intervals, specified in the configuration parameter PHYSTIME_INTERVAL. Let PHYSTIME_INTERVAL = PTI, and let the physics time stamp argument in the call to checkpoint_components() be pts_i, with i = 0, 1, 2, … Then checkpoint(pts_i) = True if pts_i >= n * PTI for some n in 1, 2, 3, …, and pts_i - pts_prev >= PTI, where checkpoint(pts_prev) = True and pts_prev = max(pts_0, pts_1, …, pts_{i-1}). If the test fails, the call results in a NOOP.
- PHYSTIME_EXPLICIT:
checkpoints are saved when the physics time equals or exceeds one of the (ordered) list of physics time values (in seconds) specified in the variable PHYSTIME_VALUES. Let [pt_0, pt_1, …, pt_n] be the list of physics time values specified in the configuration parameter PHYSTIME_VALUES. Then checkpoint(pt) = True if pt >= pt_j, for some j in [0,n] and there is no other physics time pt_k, with pt > pt_k >= pt_j such that checkpoint(pt_k) = True. If the test fails, the call results in a NOOP.
The configuration parameter NUM_CHECKPOINT controls how many checkpoints to keep on disk. Checkpoints are deleted in a FIFO manner, based on their creation time. Possible values of NUM_CHECKPOINT are:
NUM_CHECKPOINT = n, with n > 0 -> keep the most recent n checkpoints
NUM_CHECKPOINT = 0 -> no checkpoints are made/kept (except when Force = True)
NUM_CHECKPOINT < 0 -> keep ALL checkpoints
Checkpoints are saved in the directory
${SIM_ROOT}/restart
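The NUM_CHECKPOINT retention rule amounts to FIFO pruning by creation time. A minimal sketch, assuming the caller already knows each checkpoint's creation time; checkpoints_to_delete is a hypothetical helper, not an IPS function, and it only decides what to delete (actually removing directories under ${SIM_ROOT}/restart is left out).

```python
from typing import List, Tuple


def checkpoints_to_delete(checkpoints: List[Tuple[float, str]],
                          num_checkpoint: int) -> List[str]:
    """Return the names of checkpoints to remove, oldest first.

    `checkpoints` holds (creation_time, name) pairs in any order.
    """
    oldest_first = [name for _, name in sorted(checkpoints)]
    if num_checkpoint <= 0:
        # < 0: keep ALL checkpoints.
        # == 0: regular checkpoints are never made; this sketch assumes the
        # forced checkpoints that do exist are kept.
        return []
    # FIFO: everything except the newest `num_checkpoint` entries is removed.
    return oldest_first[:-num_checkpoint]
```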
- cleanup()
Clean up any state from the services. Called by the terminate method in the base class for components.
- create_simulation(config_file, override)
Create simulation
- create_sub_workflow(sub_name, config_file, override: dict[str, Any] | None = None, input_dir=None)
Create sub-workflow
- Parameters:
sub_name – name of sub-workflow
config_file – configuration file for sub-workflow
override – dictionary of configuration overrides; keys are component names and the items are attribute/key values associated with that component.
input_dir – input directory for sub-workflow components
- Returns:
tuple of simulation name, init component, driver component
- create_task_pool(task_pool_name: str)
Create an empty pool of tasks with the name task_pool_name. Raise exception if duplicate name.
- critical(msg: object, *args)
Produce critical message in simulation log file. See
logging.critical() for usage.
- debug(msg, *args)
Produce debugging message in simulation log file. See
logging.debug() for usage.
- error(msg: object, *args)
Produce error message in simulation log file. See
logging.error() for usage.
- exception(msg: object, *args)
Produce exception message in simulation log file. See
logging.exception() for usage.
- get_config_param(param: str, silent: bool = False, log: bool = True) → Any
Return the value of the configuration parameter param. Raise an exception if it is not found and silent is False.
Config params with special meaning to the framework include:
SIM_ROOT (mandatory)
SIM_NAME (mandatory)
LOG_FILE (mandatory)
LOG_LEVEL
RUN_ID
TOKOMAK_ID
SHOT_NUMBER
OUTPUT_PREFIX
SIMULATION_MODE (either NORMAL or RESTART)
NODE_ALLOCATION_MODE (either SHARED or EXCLUSIVE)
RESTART_TIME
RESTART_ROOT
CHECKPOINT
TIME_LOOP (this should generally be accessed via self.services.get_time_loop())
Any variable defined in the config file can be accessed via this function.
- Parameters:
- Returns:
dictionary of given parameter from configuration
- Return type:
- get_finished_tasks(task_pool_name: str)
Return dictionary of finished tasks and return values in task pool task_pool_name. Raise exception if no active or finished tasks.
- get_port(port_name: str) → ComponentID
- Parameters:
port_name (str) – port name
- Returns:
Return a reference to the component implementing port port_name.
- Return type:
ComponentID
- get_restart_files(restart_root: str, timeStamp: float, file_list: str | list[str]) → None
Copy files needed for component restart from the restart directory:
<restart_root>/restart/<timeStamp>/components/$CLASS_${SUB_CLASS}_$NAME_${SEQ_NUM}
to the component’s work directory.
Copying errors are not fatal (exception raised).
- get_working_dir() → str
Return the working directory of the calling component.
The structure of the working directory is defined using the configuration parameters CLASS, SUB_CLASS, and NAME of the component configuration section. The structure of the working directory is:
${SIM_ROOT}/work/$CLASS_${SUB_CLASS}_$NAME_<instance_num>
- Returns:
working directory
- Return type:
str
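To see where files end up, the working directory layout above and the restart directory layout used by get_restart_files() can be rebuilt with pathlib. This is a minimal sketch with hypothetical helper names; inside a component you would call get_working_dir() rather than computing the path yourself.

```python
from pathlib import Path


def component_tag(cls: str, sub_class: str, name: str) -> str:
    """$CLASS_${SUB_CLASS}_$NAME as used in IPS directory names."""
    return f"{cls}_{sub_class}_{name}"


def working_dir(sim_root: str, cls: str, sub_class: str, name: str,
                instance_num: int) -> Path:
    """${SIM_ROOT}/work/$CLASS_${SUB_CLASS}_$NAME_<instance_num>"""
    tag = f"{component_tag(cls, sub_class, name)}_{instance_num}"
    return Path(sim_root) / "work" / tag


def restart_component_dir(restart_root: str, time_stamp: float, cls: str,
                          sub_class: str, name: str, seq_num: int) -> Path:
    """<restart_root>/restart/<timeStamp>/components/$CLASS_${SUB_CLASS}_$NAME_${SEQ_NUM}

    Assumption: the time stamp is rendered with str(); the framework may
    format it differently.
    """
    tag = f"{component_tag(cls, sub_class, name)}_{seq_num}"
    return Path(restart_root) / "restart" / str(time_stamp) / "components" / tag
```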
- info(msg: object, *args)
Produce informational message in simulation log file. See
logging.info() for usage.
- initialize_jupyter_notebook(source_notebook_path: str, dest_notebook_name: str | None = None) → None
If the IPS Portal is available, this function loads a notebook from source_notebook_path, adds a cell to load the data, and then saves the concatenated notebook to the Portal.
If a connection to the IPS Portal cannot be verified for this run, this function does nothing.
Does not modify the source notebook.
- Parameters:
source_notebook_path: location you want to load the source notebook from. This can be either an absolute path, or an IPS-appropriate relative path.
dest_notebook_name: (optional, default None) filename of the notebook to use when saving it to the IPS Portal. If not provided, this will default to the filename of the source notebook.
- kill_task(task_id: int) → None
Kill launched task task_id. Return if successful. Raises exceptions if the task or process cannot be found or killed successfully.
- Parameters:
task_id (int) – task ID
- Raises:
Exception - if task could not successfully be killed.
- launch_task(nproc: int, working_dir: str, binary: str, *args, **keywords) → int
Launch binary in working_dir on nproc processes. *args are any arguments to be passed to the binary on the command line. **keywords are any keyword arguments used by the framework to manage how the binary is launched. Keywords may be the following:
task_ppn : the processes per node value for this task
task_cpp : the cores per process, only used with MPIRUN=srun commands
task_gpp : the gpus per process, only used with MPIRUN=srun commands
omp : if True, the task will be launched with the correct OpenMP environment variables set, only used when MPIRUN=srun
block : specifies that this task will block (or raise an exception) if not enough resources are available to run immediately. If True, the task will be retried until it runs. If False, an exception is raised indicating that there are not enough resources, but it is possible to eventually run. (default = True)
tag : identifier for the portal. May be used to group related tasks.
logfile : file name for stdout (and stderr) to be redirected to for this task. By default stderr is redirected to stdout, and stdout is not redirected.
whole_nodes : if True, the task will be given exclusive access to any nodes it is assigned. If False, the task may be assigned nodes that other tasks are using or may use.
whole_sockets : if True, the task will be given exclusive access to any sockets of nodes it is assigned. If False, the task may be assigned sockets that other tasks are using or may use.
launch_cmd_extra_args : extra command arguments added to the MPIRUN command
Return task_id if successful. May raise exceptions related to opening the logfile, being unable to obtain enough resources to launch the task (InsufficientResourcesException), bad task launch request (ResourceRequestMismatchException, BadResourceRequestException) or problems executing the command. These exceptions may be used to retry launching the task as appropriate.
Note
This is a nonblocking function; users must use a version of ServicesProxy.wait_task() to get the result.
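The retry pattern mentioned above can be sketched as a small helper that calls launch_task() with block=False and retries while resources are short. This is a minimal sketch: launch_with_retry, its retries/delay parameters, and the locally defined exception class are hypothetical stand-ins so the code runs without IPS.

```python
import time


class InsufficientResourcesException(Exception):
    """Local stand-in so this sketch runs without IPS. In a real component,
    import InsufficientResourcesException from ipsframework.ipsExceptions."""


def launch_with_retry(services, nproc, working_dir, binary, *args,
                      retries=5, delay=1.0, **keywords):
    """Launch via services.launch_task(..., block=False), retrying while
    resources are short. Returns the task_id; re-raises after the last try."""
    if retries < 1:
        raise ValueError("retries must be at least 1")
    for attempt in range(retries):
        try:
            return services.launch_task(nproc, working_dir, binary, *args,
                                        block=False, **keywords)
        except InsufficientResourcesException:
            if attempt == retries - 1:
                raise
            # Give running tasks time to finish and release their resources.
            time.sleep(delay)
```

In a component this might be called as launch_with_retry(self.services, 4, self.services.get_working_dir(), '/path/to/binary', logfile='run.log'), followed by self.services.wait_task(task_id) to collect the result. With the default block=True the framework already retries on its own, so a helper like this is only useful when the caller wants to bound the number of attempts.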
- launch_task_pool(task_pool_name: str, launch_interval: float = 0.0) → dict[str, Any]
Construct messages to task manager to launch each task in task pool. Used by TaskPool to launch tasks in a task_pool.
- log(msg, *args)
Wrapper for
ServicesProxy.info().
- merge_current_state(partial_state_file: str, logfile: str | None = None, merge_binary: str | None = None) → None
Merge partial plasma state with global state. Partial plasma state contains only the values that the component contributes to the simulation. Raise exceptions on bad merge. Optional logfile will capture stdout from merge. Optional merge_binary specifies path to executable code to do the merge (default value: “update_state”).
- publish(topicName: str, eventName: str, eventBody: Any) → None
Publish event consisting of eventName and eventBody to topic topicName to the IPS event service.
- remove_task_pool(task_pool_name: str)
Kill all running tasks, clean up all finished tasks, and delete task pool.
- run_ensemble(template: str | PathLike, variables: dict[str, dict[str, list[str]]], run_dir: str | PathLike, name: str, num_nodes: int, cores_per_instance: int | None = None, oversubscribe: bool = False, hwthreads: bool = False)
Run ensemble of simulations given the template and variables.
variables is a nested dict that looks like this:
variables = {'a_sim_comp': {'A': [3, 2, 4], 'B': [2.34, 5.82, 0.1], 'C': ['bar', 'baz', 'quux']}, 'another_sim_comp': {'D': [7, 5, 9], 'B': [0.775, 0.080, 29.2], 'F': ['xyzzy', 'plud', 'thud']}}
That is, the keys name the simulation components (which need corresponding sections in the config template) and the values are dicts mapping each parameter to a list of values. One ensemble instance is created for each position in these lists, and each instance includes every component. E.g., in the first instance, the parameters A, B, and C of a_sim_comp are set to 3, 2.34, and ‘bar’, respectively; another_sim_comp behaves similarly with its respective parameters.
The ensembles will run under run_dir within a subdirectory uniquely named for each. The subdirectory will contain an IPS config file created from template with ? variables replaced with the values from variables.
TODO be able to specify the number of cores per instance
- Parameters:
template – configuration template file
variables – a dict of variables to pass to the ensemble runs
run_dir – directory in which to run the ensembles
name – ensemble name, or string to prepend to generated instance directory and file names
cores_per_instance – number of cores per ensemble instance
num_nodes – Total number of nodes to allocate for the ensemble runs. There will be one Dask worker assigned to each of these nodes.
oversubscribe – Whether to allow oversubscription of nodes when launching the ensemble runs. Default is False.
hwthreads – Whether to use hardware threads
- Returns:
a list of dicts mapping created subdirs to simulation names and their parameters
- save_restart_files(timeStamp: float, file_list: str | list[str]) → None
Copy files needed for component restart to the restart directory:
${SIM_ROOT}/restart/$timestamp/components/$CLASS_${SUB_CLASS}_$NAME
Copying errors are not fatal (exception raised).
- send_portal_event(event_type: str = 'COMPONENT_EVENT', event_comment: str = '', event_time=None, elapsed_time=None)
Send event to web portal.
- setMonitorURL(url: str = '') → None
Send event to portal setting the URL where the monitor component will put data.
- set_config_param(param: str, value: Any, target_sim_name: str | None = None) → Any
Set configuration parameter param to value. Raise exceptions if the parameter cannot be changed or if there are problems setting the value. This tells the framework to call ipsframework.configurationManager.ConfigurationManager.set_config_parameter() to change the parameter.
- Parameters:
param (str) – name of the parameter to set in the simulation config
value – the value to assign to the parameter
- Returns:
return value from setting parameter
- stage_input_files(input_file_list: str | Iterable[str]) → None
Copy component input files to the component working directory (as obtained via a call to ServicesProxy.get_working_dir()). Input files are assumed to be originally located in the directory variable INPUT_DIR in the component configuration section. Files are copied using ipsframework.ipsutil.copyFiles().
- stage_output_files(timeStamp: float, file_list: str | list[str], keep_old_files: bool = True, save_plasma_state: bool = True) → None
Copy associated component output files (from the working directory) to the component simulation results directory. Output files are prefixed with the configuration parameter OUTPUT_PREFIX. The simulation results directory has the format:
${SIM_ROOT}/simulation_results/<timeStamp>/components/$CLASS_${SUB_CLASS}_$NAME_${SEQ_NUM}
Additionally, plasma state files are archived for debugging purposes:
${SIM_ROOT}/history/plasma_state/<file_name>_$CLASS_${SUB_CLASS}_$NAME_<timeStamp>
Copying errors are not fatal (exception raised).
- stage_subflow_output_files(subflow_name: str = 'ALL') → dict[str, list[str]]
Gather outputs from sub-workflows. Sub-workflow output is defined to be the output files from its DRIVER component as they exist in the sub-workflow driver’s work area at the end of the sub-simulation. If subflow_name != ‘ALL’, then get output from only that sub-flow.
- submit_tasks(task_pool_name, block=True, use_dask=False, dask_nodes=1, dask_ppw=None, launch_interval=0.0, use_shifter=False, shifter_args=None, dask_worker_plugin=None, dask_worker_per_gpu=False, oversubscribe=False, hwthreads=False)
Launch all unfinished tasks in task pool task_pool_name. If block is True, return when all tasks have been launched. If block is False, return when all tasks that can be launched immediately have been launched. Return number of tasks submitted.
Optionally, dask can be used to schedule and run the task pool.
- subscribe(topicName: str, callback: Callable) → None
Subscribe to topic topicName on the IPS event service and register callback as the method to be invoked when an event is published to that topic.
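publish() and subscribe() together form a topic-based publish/subscribe pattern. The sketch below is an in-memory stand-in for illustration only, not the IPS event service: delivery here is immediate and in-process, and the dict-shaped event is an assumption. The callback signature (topic name, event) mirrors PortalBridge.process_event(topicName, theEvent).

```python
from typing import Any, Callable, Dict, List


class InMemoryEventBus:
    """Stand-in for the IPS event service, for illustration only."""

    def __init__(self) -> None:
        self._callbacks: Dict[str, List[Callable[[str, Any], None]]] = {}

    def subscribe(self, topic_name: str,
                  callback: Callable[[str, Any], None]) -> None:
        self._callbacks.setdefault(topic_name, []).append(callback)

    def publish(self, topic_name: str, event_name: str, event_body: Any) -> None:
        # Assumed event shape; only subscribers of this topic are invoked.
        event = {"eventName": event_name, "eventBody": event_body}
        for callback in self._callbacks.get(topic_name, []):
            callback(topic_name, event)
```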
- update_state(state_files: list[str] | None = None) → None
Copy local (updated) state to global state. If no state files are specified, the component configuration specification is used. Raise exceptions if the copy fails.
- wait_call(call_id: int, block: bool = True)
If block is True, return when the call has completed with the return code from the call. If block is False, raise IncompleteCallException if the call has not completed, and return the value if it has.
- Parameters:
call_id (int) – call ID
- Returns:
service response message arguments
- wait_call_list(call_id_list: list[int], block=True)
Check the status of each of the calls in call_id_list. If block is True, return when all calls are finished. If block is False, raise IncompleteCallException if any of the calls have not completed, otherwise return. The return value is a dictionary of call_ids and return values.
- wait_task(task_id: int, timeout: int = -1, delay: int = 1) → int
Check the status of task task_id. Return the return value of the task when finished successfully. Raise exceptions if the task is not found, or if there are problems finalizing the task.
- wait_task_nonblocking(task_id: int) → int | None
Check the status of task task_id. If it has finished, the return value is populated with the actual value, otherwise None is returned. A KeyError exception may be raised if the task is not found.
- Parameters:
task_id (int) – task ID (PID)
- Returns:
return value of task if finished else None
- wait_tasklist(task_id_list: list[int], block: bool = True) → dict[int, int]
Check the status of a list of tasks. If block is True, return a dictionary of return values when all tasks have completed. If block is False, return a dictionary containing entries for each completed task. Note that the dictionary may be empty. Raise KeyError exception if task_id not found.
- warning(msg: object, *args)
Produce warning message in simulation log file. See
logging.warning() for usage.
- class ipsframework.services.Task(task_name: str, nproc: int, working_dir: str, binary: str, *args, **keywords)
Container for task information:
- Parameters:
task_name (str) – task name
nproc (int) – number of processes the task needs
working_dir (str) – location to launch task from
binary (str) – full path to executable to launch
*args – arguments for binary
**keywords – keyword arguments for launching the task. See
ServicesProxy.launch_task() for details.
- class ipsframework.services.TaskPool(name: str, services: ServicesProxy)
Class to contain and manage a pool of tasks.
- IDLE_TIMEOUT = 600
- add_task(task_name: str, nproc: int, working_dir: str, binary: str, *args, **keywords)
Create Task object and add to queued_tasks of the task pool. Raise exception if task name already exists in task pool.
- dask_scheduler = ['/home/docs/checkouts/readthedocs.org/user_builds/ips-framework/envs/latest/bin/dask', 'scheduler']
- dask_worker = ['/home/docs/checkouts/readthedocs.org/user_builds/ips-framework/envs/latest/bin/dask', 'worker']
- get_dask_finished_tasks_status()
Return a dictionary of exit status values for all dask tasks that have finished since the last time finished tasks were polled.
This function also shuts down the dask client. (FIXME The fate of the Dask scheduler and workers is unknown.)
As a further side effect, if there is an associated self.worker_event_logfile, it sends a monitor event for each record found in that file and then removes these log files.
I recommend possibly splitting this into three different, focused functions. One for gathering the exit statuses from all workers. Another for shutting down Dask, which means shutting down the client, scheduler, and workers, not just the client. (Though the scheduler and workers will eventually expire due to timeouts.) And another for creating events from Dask log messages (with a boolean argument to denote whether these log files should be deleted after the fact, i.e., the practitioner may want to look at those even if they’re emitted as IPS events).
This also presumes that the dask workers will return an exit status, presumably of related subprocess calls. FIXME What if we have other Dask tasks that do not return an exit status?
- Returns:
dict mapping task name to exit status
- Return type:
dict
- get_finished_tasks_status()
Return a dictionary of exit status values for all tasks that have finished since the last time finished tasks were polled.
- Returns:
dict mapping task name to exit status
- Return type:
dict
- shifter = None
- submit_dask_tasks(block=True, dask_nodes=1, dask_ppw=None, use_shifter=False, shifter_args=None, dask_worker_plugin=None, dask_worker_per_gpu=False, oversubscribe=False, hwthreads=False)
Launch tasks in queued_tasks using dask.
One dask worker will be started for each node unless dask_worker_per_gpu is True, in which case one dask worker will be started for every GPU, i.e., dask_nodes times GPUS_PER_NODE workers will be started.
- Parameters:
block (bool) – Unused, this will always return after tasks are submitted
dask_nodes (int) – Number of task nodes, default 1
dask_ppw (int) – Number of processes per dask worker, default is PROCS_PER_NODE. However, when running ensembles, dask_ppw will be the “cores per instance”, so PROCS_PER_NODE // dask_ppw will be used to ensure that each dask worker has multiple cores, ideally allocated via a DVM (i.e., prun will be invoked and will coordinate with the DVM to allocate multiple cores to each worker thread). Note that the DVM daemon will be started by the registered Dask worker plugin DVMPlugin.
use_shifter (bool) – Option to launch dask scheduler and workers in shifter container
dask_worker_plugin (distributed.diagnostics.plugin.WorkerPlugin) – If provided this will be registered as a worker plugin with the dask client
dask_worker_per_gpu (bool) – If true then a separate worker will be started for each GPU and bound to that GPU
oversubscribe (bool) – Whether to allow oversubscription of nodes when launching the dask workers. Default is False.
hwthreads (bool) – Whether to use hardware threads
- FIXME consider having n processes instead of n threads given that we’re
likely running in an HPC context. See: https://distributed.dask.org/en/stable/efficiency.html#adjust-between-threads-and-processes
- Returns:
number of tasks submitted
- submit_tasks(block=True, use_dask=False, dask_nodes=1, dask_ppw=None, launch_interval=0.0, use_shifter=False, shifter_args=None, dask_worker_plugin=None, dask_worker_per_gpu=False, oversubscribe=False, hwthreads=False)
Launch tasks in queued_tasks. Finished tasks are handled before launching new ones. If block is True, the number of tasks submitted is returned after all tasks have been launched and completed. If block is False, the number of tasks that can immediately be launched is returned.
If use_dask==True, then the tasks are launched with submit_dask_tasks(). One dask worker will be started for each node unless dask_worker_per_gpu is True, in which case one dask worker will be started for every GPU, i.e., dask_nodes times GPUS_PER_NODE workers will be started.
- Parameters:
block (bool) – If True then wait for task to complete, default True
use_dask (bool) – If True then use dask to launch tasks, default False
dask_nodes (int) – Number of task nodes, only used if use_dask==True
dask_ppw (int) – Number of processes per dask worker, default is PROCS_PER_NODE, only used if use_dask==True
launch_interval (float) – time to wait between launching tasks, default 0.0
use_shifter (bool) – Option to launch dask scheduler and workers in shifter container
shifter_args (str) – Optional arguments added to shifter when launching dask scheduler and workers
dask_worker_plugin (distributed.diagnostics.plugin.WorkerPlugin) – If provided this will be registered as a worker plugin with the dask client
dask_worker_per_gpu (bool) – If true then a separate worker will be started for each GPU and bound to that GPU
oversubscribe (bool) – If True then pass the oversubscribe option to mpirun when launching dask workers
hwthreads (bool) – If True then use hardware threads when launching tasks. Default is False.
- Returns:
number of tasks submitted
- terminate_tasks()
Kill all active tasks, clear all queued, blocked and finished tasks.
- ipsframework.services.launch(binary: Any, task_name: str, working_dir: str | PathLike, *args, **keywords)
This is used by TaskPool.submit_dask_tasks() as the input to dask.distributed.Client.submit().
Valid keywords:
worker_event_logfile : where JSON log messages are written
logfile : where the task output is written; if not specified, STDOUT is used
errfile : where the task error output is written; if not specified, STDOUT is used
task_env : a dictionary of environment variables to set
timeout : the timeout in seconds for the task to complete
cpus_per_proc : the number of cpus per process to use for the task. This implies that the DVMPlugin has set up a DVM daemon for this node.
oversubscribe : if True, then the number of processes can exceed the number of cores on the node. Default is False.
If the worker has the attribute dvm_uri_file, then we are running with a DVM (Distributed Virtual Machine) so the binary needs a prun prepended pointing to that.
If the worker doesn’t have a lock attribute, then we create one by assigning a threading lock to it. This is used to ensure that the worker’s event log is written to in a thread-safe manner.
- Parameters:
binary – The binary to launch. Either a string or a class.
task_name – The name of the task.
working_dir – The working directory in which to run this task
- Returns:
The task name and the return value from running the binary.
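The lazily created lock described above serializes writes to the worker's event log. A minimal sketch of that pattern, assuming one JSON object per line (the exact record format IPS writes is not specified here); log_worker_event is a hypothetical helper, and any object that accepts attributes can stand in for the Dask worker.

```python
import json
import threading
from typing import Any, Dict


def log_worker_event(worker: Any, logfile: str, record: Dict[str, Any]) -> None:
    """Append one JSON record per line to logfile, serialized by worker.lock."""
    if not hasattr(worker, "lock"):
        # Lazily attach a lock, as described above. This check-then-set is
        # itself not atomic, so two threads racing here could briefly end up
        # holding different locks.
        worker.lock = threading.Lock()
    with worker.lock:
        with open(logfile, "a", encoding="utf-8") as f:
            f.write(json.dumps(record) + "\n")
```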
Other Utilities
IPS Exceptions
- exception ipsframework.ipsExceptions.BadResourceRequestException(caller_id, tid, request, deficit)
Exception raised by the resource manager when a component requests a quantity of resources that can never be satisfied during a get_allocation() call
- exception ipsframework.ipsExceptions.BlockedMessageException(msg, reason)
Exception raised by any manager when a blocking service invocation is made, and the invocation result is not readily available.
- exception ipsframework.ipsExceptions.GPUResourceRequestMismatchException(caller_id, tid, ppn, gpp, max_gpp)
Exception raised by the resource manager when the requested number of GPUs per task cannot be satisfied.
- exception ipsframework.ipsExceptions.IncompleteCallException(callID)
Exception raised by the taskManager when a nonblocking wait_call() method is invoked before the call has finished.
- exception ipsframework.ipsExceptions.InsufficientResourcesException(caller_id, tid, request, deficit)
Exception raised by the resource manager when not enough resources are available to satisfy an allocate() call.
- exception ipsframework.ipsExceptions.InvalidResourceSettingsException(t, spn, cpn)
Exception raised by the resource helper to indicate inconsistent resource settings.
- exception ipsframework.ipsExceptions.ResourceRequestMismatchException(caller_id, tid, nproc, ppn, max_procs, max_ppn)
Exception raised by the resource manager when it is possible to launch the requested number of processes, but not on the requested number of processes per node.
- exception ipsframework.ipsExceptions.ResourceRequestUnequalPartitioningException(caller_id, tid, nproc, ppn, max_procs, max_ppn)
Exception raised by the resource manager when it is possible to launch the requested number of processes, but the requested number of processes and processes per node will result in unequal partitioning of nodes.
IPS Utilities
- ipsframework.ipsutil.copyFiles(src_dir: str, src_file_list: str | Iterable[str], target_dir: str, prefix='', keep_old: bool = False)
Copy files in src_file_list from src_dir to target_dir with an optional prefix. If keep_old is
True, existing files in target_dir will not be overridden, otherwise files can be clobbered (default). Wild-cards in file name specification are allowed.
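The copying behavior described above (optional prefix, wildcards, optional protection of existing files) can be sketched with glob and shutil. This is not ipsutil.copyFiles() itself: the helper name is hypothetical, a string argument is assumed to hold whitespace-separated names, and keep_old=True is read here as simply skipping files that already exist in target_dir.

```python
import glob
import os
import shutil
from typing import Iterable, List, Union


def copy_files(src_dir: str, src_file_list: Union[str, Iterable[str]],
               target_dir: str, prefix: str = "",
               keep_old: bool = False) -> List[str]:
    """Copy files (wildcards allowed) from src_dir to target_dir.

    Returns the list of target paths that were written.
    """
    if isinstance(src_file_list, str):
        # Assumption: a single string may hold several whitespace-separated names.
        src_file_list = src_file_list.split()
    os.makedirs(target_dir, exist_ok=True)
    copied = []
    for pattern in src_file_list:
        for src in sorted(glob.glob(os.path.join(src_dir, pattern))):
            if not os.path.isfile(src):
                continue  # skip directories matched by the pattern
            dest = os.path.join(target_dir, prefix + os.path.basename(src))
            if keep_old and os.path.exists(dest):
                continue  # keep_old=True: never overwrite an existing file
            shutil.copy2(src, dest)
            copied.append(dest)
    return copied
```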
- ipsframework.ipsutil.ensemble_instances_to_csv(instances: list[tuple[str, list[tuple[str, dict[str, object]]]]], path: str | PathLike) → None
Take in a structure of variables suitable for passing to services.run_ensemble(), and write a CSV file from it.
So, for example, if the structure looks like this:
variables = {'a_comp': {'A': [3, 2, 4], 'B': [2.34, 5.82, 0.1], 'C': ['bar', 'baz', 'quux']}, 'another_comp': {'D': [7, 5, 9], 'B': [0.775, 0.080, 29.2], 'F': ['xyzzy', 'plud', 'thud']}}
The written CSV file will look like this:
a_comp:A,a_comp:B,a_comp:C,another_comp:D,another_comp:B,another_comp:F
3,2.34,bar,7,0.775,xyzzy
2,5.82,baz,5,0.080,plud
4,0.1,quux,9,29.2,thud
The generated CSV file follows RFC-4180 precisely.
- Parameters:
variables – dictionary of parameters identical to the return value of params_from_csv
path – Path to the CSV file
- ipsframework.ipsutil.getTimeString(timeArg: struct_time | None = None)
Return a string representation of timeArg. timeArg is expected to be an appropriate object to be processed by
time.strftime(). If timeArg is None, current time is used.
- ipsframework.ipsutil.group_ensemble_variables_into_instances(variables: dict[str, dict[str, list[str]]], name: str)
Convert component variables into a list of ensemble instances like this:
[['prefix_0', [['a_sim_comp', {'A': 3, 'B': 2.34, 'C': 'bar'}], ['another_sim_comp', {'D': 7, 'B': 0.775, 'F': 'xyzzy'}]]], ['prefix_1', [['a_sim_comp', {'A': 2, 'B': 5.82, 'C': 'baz'}], ['another_sim_comp', {'D': 5, 'B': 0.08, 'F': 'plud'}]]], ['prefix_2', [['a_sim_comp', {'A': 4, 'B': 0.1, 'C': 'quux'}], ['another_sim_comp', {'D': 9, 'B': 29.2, 'F': 'thud'}]]]]
prefix_n (where prefix is the name argument) corresponds to a specific ensemble instance and will be used as a unique subdirectory name. It is paired with a list of [component_name, dict] pairs, where each dict maps that component’s variables to the values that will later be used to flesh out a config file from a config template file.
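The example above pairs values by position rather than forming every combination: instance i takes the i-th value of each parameter. A minimal sketch that reproduces the example output, assuming all value lists have the same length; group_into_instances is a hypothetical name, not the ipsutil function.

```python
from typing import Any, Dict, List


def group_into_instances(variables: Dict[str, Dict[str, List[Any]]],
                         name: str) -> List[list]:
    """Pair the i-th value of every parameter into instance `<name>_<i>`."""
    lengths = {len(values) for params in variables.values()
               for values in params.values()}
    if len(lengths) != 1:
        raise ValueError("expected at least one parameter, with all value "
                         "lists the same length")
    (count,) = lengths
    instances = []
    for i in range(count):
        components = [[comp, {param: values[i] for param, values in params.items()}]
                      for comp, params in variables.items()]
        instances.append([f"{name}_{i}", components])
    return instances
```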
- ipsframework.ipsutil.params_from_csv(infile: str | PathLike) → dict[str, dict[str, list[str]]]
Read a CSV file and return a dictionary of parameters suitable for passing to services.run_ensemble()
For each simulation, A, with corresponding parameters, name1, name2, …, create columns following the pattern A:name1, A:name2, … in the CSV file. Each row will correspond to the parameter values used in each instance.
So, for example, if the CSV file looks like this:
a_comp:A, a_comp:B, a_comp:C, another_comp:D, another_comp:B, another_comp:F
3, 2.34, bar, 7, 0.775, xyzzy
2, 5.82, baz, 5, 0.080, plud
4, 0.1, quux, 9, 29.2, thud
The returned structure will look like this:
variables = {'a_comp': {'A': [3, 2, 4], 'B': [2.34, 5.82, 0.1], 'C': ['bar', 'baz', 'quux']}, 'another_comp': {'D': [7, 5, 9], 'B': [0.775, 0.080, 29.2], 'F': ['xyzzy', 'plud', 'thud']}}
Note that the corresponding config template file will need to specify sections for a_comp and another_comp that have placeholders for A, B, C, D, and F. The template file will be used to create the config files for each instance, of which there will be three from this example.
- Parameters:
infile – Path to the CSV file
- Returns:
Dictionary of parameters suitable for use in run_ensemble()
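The column-naming convention above maps directly onto Python's csv module. A minimal sketch, assuming values are kept as strings (matching the dict[str, dict[str, list[str]]] return annotation) and taking an iterable of lines instead of a path; params_from_csv_lines is a hypothetical name.

```python
import csv
from typing import Dict, Iterable, List


def params_from_csv_lines(lines: Iterable[str]) -> Dict[str, Dict[str, List[str]]]:
    """Parse 'component:param' CSV columns into {component: {param: [values]}}."""
    # skipinitialspace drops the blanks after each comma, as in "3, 2.34, bar".
    reader = csv.reader(lines, skipinitialspace=True)
    header = next(reader)
    columns = [column.split(":", 1) for column in header]
    result: Dict[str, Dict[str, List[str]]] = {}
    for comp, param in columns:
        result.setdefault(comp, {})[param] = []
    for row in reader:
        for (comp, param), value in zip(columns, row):
            result[comp][param].append(value)
    return result
```

With a file on disk, this would be used as: with open(path, newline="") as f: variables = params_from_csv_lines(f).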
- class ipsframework.messages.Message(sender_id: ComponentID, receiver_id: ComponentID)
Base class for all IPS messages. Should not be used in actual communication.
- FAILURE = 1
- SUCCESS = 0
- counter = 0
- delimiter = ''
- get_message_id()
- identifier = 'MESSAGE'
- class ipsframework.messages.MethodInvokeMessage(sender_id: ComponentID, receiver_id: ComponentID, call_id: int, target_method: str, *args, **keywords)
Message used by components to invoke methods on other components.
sender_id: component id of the sender
receiver_id: component id of the receiver
call_id: identifier of the call (generated by caller)
target_method: method to be invoked on the receiver
*args: arguments to be passed to the target_method
- counter = 0
- delimiter = '|'
- identifier = 'INVOKE'
- class ipsframework.messages.MethodResultMessage(sender_id: ComponentID, receiver_id: ComponentID, call_id: int, status: Literal[0, 1], *args)
Message used to relay the return value after a method invocation.
sender_id: component id of the sender (callee)
receiver_id: component id of the receiver (caller)
call_id: identifier of the call (generated by caller)
status: either Message.SUCCESS or Message.FAILURE indicating the success or failure of the invocation.
*args: other information to be passed back to the caller.
- counter = 0
- delimiter = '|'
- identifier = 'RESULT'
- class ipsframework.messages.ServiceRequestMessage(sender_id: ComponentID, receiver_id: ComponentID, target_comp_id: ComponentID, target_method: str, *args, **keywords)
Message used by components to request the result of a service action by one of the IPS managers.
sender_id: component id of the sender
receiver_id: component id of the receiver (framework)
target_comp_id: component id of target component (typically framework)
target_method: name of method to be invoked on component target_comp_id
*args: any number of arguments. These are specific to the target method.
- counter = 0
- delimiter = '|'
- identifier = 'REQUEST'
- class ipsframework.messages.ServiceResponseMessage(sender_id: ComponentID, receiver_id: ComponentID, request_msg_id: str, status: Literal[0, 1], *args)
Message used by managers to respond with the result of the service action to the calling component.
sender_id: component id of the sender (framework)
receiver_id: component id of the receiver (calling component)
request_msg_id: id of request message this is a response to.
status: either Message.SUCCESS or Message.FAILURE
*args: any number of arguments. These are specific to type of response.
- counter = 0
- delimiter = '|'
- identifier = 'RESPONSE'
Framework Components
- class ipsframework.portalBridge.PortalBridge(services, config)
Framework component to communicate with the SWIM web portal.
- class SimulationData
Container for simulation data.
- check_send_post_responses()
- finalize(timestamp=0.0, **keywords)
Produce some default debugging information before the rest of the code is executed.
- http_req_and_response(manager: UrlRequestProcessManager, event_data)
- init(timestamp=0.0, **keywords)
Try to connect to the portal, subscribe to _IPS_MONITOR events and register callback
process_event().
- init_simulation(sim_name, sim_root)
Create and send information about simulation sim_name living in sim_root so the portal can set up corresponding structures to manage data from the sim.
- process_event(topicName, theEvent)
Process a single event theEvent on topic topicName.
- send_ensemble_variables(sim_data, event_data)
- send_event(sim_data, event_data)
Send contents of event_data and sim_data to portal.
- send_jupyter_notebook(sim_data, event_data)
- send_mpo_data(event_data, sim_data)
- send_notebook_data(sim_data, event_data)
- step(timestamp=0.0, **keywords)
Poll for events.
- ipsframework.portalBridge.hash_file(file_name)
Return the MD5 hash of a file.
- Parameters:
file_name – full path to file
- Returns:
MD5 of file_name
- Return type:
str
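Hashing a file in fixed-size chunks keeps memory use flat even for large output files. A minimal sketch, assuming a hexadecimal digest string is wanted (the return type above is str); md5_of_file is a hypothetical name rather than hash_file itself.

```python
import hashlib


def md5_of_file(file_name: str, chunk_size: int = 1 << 20) -> str:
    """Return the hex MD5 digest of file_name, reading it in chunks."""
    digest = hashlib.md5()
    with open(file_name, "rb") as f:
        # iter() with a sentinel stops once read() returns b"" at end of file.
        for chunk in iter(lambda: f.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()
```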
- ipsframework.portalBridge.send_ensemble_variables(conn: Connection, stop: Event, url: str, api_key: str, username: str)
- ipsframework.portalBridge.send_jupyter_notebook(conn: Connection, stop: Event, url: str, api_key: str, username: str)
- ipsframework.portalBridge.send_jupyter_notebook_data(conn: Connection, stop: Event, url: str, api_key: str, username: str)
- ipsframework.portalBridge.send_post(conn: Connection, stop: Event, url: str)
- class ipsframework.runspaceInitComponent.runspaceInitComponent(services, config)
Framework component to manage runspace initialization, container file management, and file staging for simulation and analysis runs.
- init(timestamp=0.0, **keywords)
Creates base directory, copies IPS and FacetsComposer input files.
- step(timestamp=0.0, **keywords)
Copies individual subcomponent input files into working subdirectories.