Run Parallel Conditions

Run Parallel Conditions.

This example demonstrates running multiple SIMA conditions in parallel with different input parameters. It also shows how to post-process results from multiple runs, first in each run and then in the main thread after all runs are completed.

The example uses a Riflex task to analyze a simple beam with different wave conditions.

After each run, the maximum axial force is calculated and stored in the result file. After all runs are completed, a summary file is created with the maximum axial force for each run and the overall maximum.

Environment variable SRE_EXE is used to point to the sre executable of SIMA if exe is not provided directly in create_run.

Source Code

run_parallel.py
  1"""Run Parallel Conditions.
  2
  3This example demonstrates running multiple SIMA conditions in parallel with different input parameters.
  4It also shows how to post-process results from multiple runs, first in each run and then in the main thread after all runs are completed.
  5
  6The example uses a Riflex task to analyze a simple beam with different wave conditions.
  7
  8After each run, the maximum axial force is calculated and stored in the result file.
  9After all runs are completed, a summary file is created with the maximum axial force for each run and the overall maximum.
 10
 11
 12Environment variable `SRE_EXE` is used to point to the sre executable of SIMA if exe is not provided directly in create_run.
 13
 14"""
 15
 16import os
 17import shutil
 18from pathlib import Path
 19import h5py
 20import numpy as np
 21from utils.runner import run_multiple
 22
 23
 24def __create_run(
 25    output_folder: Path, stask: Path, variables: dict, cleanup: bool = False
 26):
 27    """Create a run configuration for a single run with SIMA.
 28
 29    Args:
 30        output_folder (Path): Base directory where all outputs will be stored
 31        stask (Path): Path to the SIMA task file (.stask) to be executed
 32        variables (dict): Dictionary of input variables and their values (e.g., {"Hs": 2.0, "Tp": 5.0})
 33        cleanup (bool, optional): Whether to clean up temporary files after execution. Defaults to False.
 34
 35    Returns:
 36        dict: Run configuration dictionary with name, workspace, commands, and settings
 37    """
 38    case_name = "_".join([f"{key}={value}" for key, value in variables.items()])
 39    ws = output_folder / "tmp" / case_name
 40    result_folder = output_folder / "results" / case_name
 41    if result_folder.exists():
 42        shutil.rmtree(result_folder, ignore_errors=True)
 43
 44    task = "SimpleBeam"
 45    condition = "Initial"
 46    # Create input string from variables
 47    input = ";".join([f"{key}={value}" for key, value in variables.items()])
 48    runType = "dynamic"
 49    # Export all the results into the result file
 50    result_file = result_folder / "result.h5"
 51
 52    commands = []
 53    # Import the Riflex task from an stask file
 54    commands.append("--import")
 55    commands.append(f"file={stask.absolute()}")
 56    # Run dynamic analysis for the specified condition
 57    commands.append("--condition")
 58    commands.append(f"task={task}")
 59    commands.append(f"condition={condition}")
 60    commands.append(f"runType={runType}")
 61    # Export all the results into the result file
 62    commands.append(f"output={result_file}")
 63    commands.append(f"input={input}")
 64    # Copy the result files to the result folder
 65    commands.append("copy=true")
 66    commands.append("paths=*.res;*.inp")
 67    commands.append(f"destination={result_folder}")
 68
 69    # Postprocess function that will be called after the run is completed
 70    def post():
 71        __process_single(result_folder, task, condition)
 72        # Copy the log file to the result folder
 73        shutil.copy(ws / "sima.log", result_folder)
 74        if cleanup:
 75            # Remove workspace folder
 76            shutil.rmtree(ws, ignore_errors=True)
 77
 78    # Provide the path to SRE executable if not set in environment SRE_EXE
 79    exe = None
 80    return {
 81        "name": case_name,
 82        "exe": exe,
 83        "ws": ws,
 84        "commands": commands,
 85        "postprocess": post,
 86        "result_file": result_file,
 87        "task": task,
 88        "condition": condition,
 89        "cleanup": cleanup,
 90    }
 91
 92
 93def __process_single(result_folder: Path, task: str, condition: str):
 94    """Post-process results from a single SIMA run.
 95
 96    Calculates the maximum axial force for the given element and stores it back into the result file.
 97
 98    Args:
 99        result_folder (Path): Directory containing the result files from the analysis
100        task (str): Name of the task
101        condition (str): Name of the condition
102    """
103    file = result_folder / "result.h5"
104    # We want to both read and write to the file
105    with h5py.File(file, "r+") as f:
106        # Get the element from the file
107        element = f[f"/{task}/{condition}/Dynamic/BeamLine/segment_1/element_1"]
108        # Get the axial force from the element
109        axial_force = element["Axial force"][:]
110        # Calculate the maximum axial force
111        max_force = np.max(axial_force)
112        # Store the maximum axial force back into the file
113        element.create_dataset("axial_force_max", data=[max_force])
114
115
116def __process_multiple(runs: list[dict], output_folder: Path):
117    """Perform combined analysis of results from multiple parallel runs.
118
119    Creates a summary file with maximum axial forces from each run and identifies
120    the overall maximum across all conditions.
121    """
122    # Pick up all the result files to do some combined analysis
123    # Write a summary file
124    summary_file = output_folder / "summary.txt"
125    with open(summary_file, "w") as summary:
126        max_forces = []
127        for run in runs:
128            result_file = run["result_file"]
129            task = run["task"]
130            condition = run["condition"]
131            element_path = f"/{task}/{condition}/Dynamic/BeamLine/segment_1/element_1"
132            with h5py.File(result_file, "r") as f:
133                element = f[element_path]
134                axial_force_max = element["axial_force_max"][:]
135                summary.write(f"{run['name']}: {axial_force_max}\n")
136                max_forces.append(axial_force_max)
137        summary.write(f"Max force: {np.max(max_forces)}\n")
138
139
140if __name__ == "__main__":
141    here = Path(__file__).parent.absolute()
142    examples_root = (here / "../../../").resolve()
143    output_folder = examples_root / "output" / "run_parallel"
144    input_folder = examples_root / "input"
145
146    # Flag for cleanup, will remove working folders if True
147    cleanup = True
148
149    if cleanup:
150        shutil.rmtree(output_folder, ignore_errors=True)
151    os.makedirs(output_folder, exist_ok=True)
152
153    stask = input_folder / "riflex" / "simple_beam.stask"
154
155    # Prepare runs before sending them to the executor
156    runs = []
157    runs.append(__create_run(output_folder, stask, {"Hs": 2.0, "Tp": 5.0}, cleanup=cleanup))
158    runs.append(__create_run(output_folder, stask, {"Hs": 3.0, "Tp": 6.0}, cleanup=cleanup))
159    runs.append(__create_run(output_folder, stask, {"Hs": 4.0, "Tp": 7.0}, cleanup=cleanup))
160
161    # Schedule runs
162    run_multiple(runs)
163
164    # Then postprocess after all are done
165    __process_multiple(runs, output_folder)
166
167    tmp_folder = output_folder / "tmp"
168    if cleanup:
169        # Remove tmp folder
170        shutil.rmtree(tmp_folder, ignore_errors=True)