Run Concurrent Workflows

This example demonstrates how the Python module concurrent.futures may be used to run workflows with multiple SIMA instances in parallel.

main()[source]
run_single(case_num, a, b)[source]

Source Code

run_parallel.py
 1"""
 2This example demonstrates how the Python module concurrent.futures may be used to run
 3workflows with multiple SIMA instances in parallel.
 4"""
 5import os
 6import sys
 7import shutil
 8from pathlib import Path
 9from concurrent.futures import ThreadPoolExecutor
10from simapy.sre import SIMA
11from simapy.sima_writer import SIMAWriter
12from simapy.sima.signals.equallyspacedsignal import EquallySpacedSignal
13
14def run_single(case_num: int, a: float, b: float):
15    print(f'Starting case {case_num}')
16    ws_root = Path(__file__).parent / '..' / '..' / 'output' / 'workflow' / 'concurent'
17    ws = ws_root / f'ws_{case_num}'
18    if ws.exists():
19        shutil.rmtree(ws,ignore_errors=True)
20    os.makedirs(ws,exist_ok=True)
21
22    a_signal = EquallySpacedSignal(name='a')
23    a_signal.value = [a]
24    b_signal = EquallySpacedSignal(name='b')
25    b_signal.value = [b]
26
27    SIMAWriter().write([a_signal, b_signal], ws / 'input.json')
28
29    # Locate stask file in folder 'input' at the root of this repository
30    stask = Path(__file__).parent / '..' / '..' / 'input' / 'workflow' / 'simple_workflow_run.stask'
31
32    commands = []
33    commands.append('--import')
34    commands.append('file='+str(stask.absolute()))
35    commands.append('--run')
36    commands.append('task=WorkflowTask')
37    commands.append('workflow=outer')
38
39    # Requires that the environment is set, but an alternative path may be given
40
41    sima = SIMA()
42    # Create a handler to print the progress while running
43    def __handle_output(line):
44        # "@STATUS "Total" 800 1000"
45        if line.startswith("@STATUS"):
46            # Findt the last number
47            parts = line.split()
48            worked = float(parts[-1-1])
49            total = float(parts[-1])
50            progress = 100 * worked/total
51            print(f"Progress case {case_num}: ",progress,"%")
52
53    sima.console_handler = __handle_output
54    sima.run(working_dir=str(ws), command_args=commands)
55    print(f'Case {case_num} completed successfully')
56
57
58def main():
59    # Run a maximum of cpu_count() - 1 workflows at the same time
60    num_concurrent = os.cpu_count() - 1
61    # Total number of workflow runs
62    num_cases = 10
63    with ThreadPoolExecutor(max_workers=num_concurrent) as executor:
64        jobs = []
65        for i in range(num_cases):
66            # Set some arbitrary values for a and b
67            a = i * 2.0
68            b = (i + 1) * 10.0
69            # Submit job
70            jobs.append(executor.submit(run_single, i, a, b))
71        any_failure = False
72        for i, job in enumerate(jobs):
73            try:
74                job.result()
75            except Exception as e:
76                print(f'Job {i} Failed: {e}')
77                any_failure = True
78
79    if any_failure:
80        print('Runs completed with errors')
81        sys.exit(1)
82    else:
83        print('All runs completed successfully')
84
85if __name__ == "__main__":
86    main()