Run Concurrent Workflows
This example demonstrates how the Python module concurrent.futures may be used to run workflows with multiple SIMA instances in parallel.
Source Code
run_parallel.py
1"""
2This example demonstrates how the Python module concurrent.futures may be used to run
3workflows with multiple SIMA instances in parallel.
4"""
5import os
6import sys
7import shutil
8from pathlib import Path
9from concurrent.futures import ThreadPoolExecutor
10from simapy.sre import SIMA
11from simapy.sima_writer import SIMAWriter
12from simapy.sima.signals.equallyspacedsignal import EquallySpacedSignal
13
14def run_single(case_num: int, a: float, b: float):
15 print(f'Starting case {case_num}')
16 ws_root = Path(__file__).parent / '..' / '..' / 'output' / 'workflow' / 'concurent'
17 ws = ws_root / f'ws_{case_num}'
18 if ws.exists():
19 shutil.rmtree(ws,ignore_errors=True)
20 os.makedirs(ws,exist_ok=True)
21
22 a_signal = EquallySpacedSignal(name='a')
23 a_signal.value = [a]
24 b_signal = EquallySpacedSignal(name='b')
25 b_signal.value = [b]
26
27 SIMAWriter().write([a_signal, b_signal], ws / 'input.json')
28
29 # Locate stask file in folder 'input' at the root of this repository
30 stask = Path(__file__).parent / '..' / '..' / 'input' / 'workflow' / 'simple_workflow_run.stask'
31
32 commands = []
33 commands.append('--import')
34 commands.append('file='+str(stask.absolute()))
35 commands.append('--run')
36 commands.append('task=WorkflowTask')
37 commands.append('workflow=outer')
38
39 # Requires that the environment is set, but an alternative path may be given
40
41 sima = SIMA()
42 # Create a handler to print the progress while running
43 def __handle_output(line):
44 # "@STATUS "Total" 800 1000"
45 if line.startswith("@STATUS"):
46 # Findt the last number
47 parts = line.split()
48 worked = float(parts[-1-1])
49 total = float(parts[-1])
50 progress = 100 * worked/total
51 print(f"Progress case {case_num}: ",progress,"%")
52
53 sima.console_handler = __handle_output
54 sima.run(working_dir=str(ws), command_args=commands)
55 print(f'Case {case_num} completed successfully')
56
57
58def main():
59 # Run a maximum of cpu_count() - 1 workflows at the same time
60 num_concurrent = os.cpu_count() - 1
61 # Total number of workflow runs
62 num_cases = 10
63 with ThreadPoolExecutor(max_workers=num_concurrent) as executor:
64 jobs = []
65 for i in range(num_cases):
66 # Set some arbitrary values for a and b
67 a = i * 2.0
68 b = (i + 1) * 10.0
69 # Submit job
70 jobs.append(executor.submit(run_single, i, a, b))
71 any_failure = False
72 for i, job in enumerate(jobs):
73 try:
74 job.result()
75 except Exception as e:
76 print(f'Job {i} Failed: {e}')
77 any_failure = True
78
79 if any_failure:
80 print('Runs completed with errors')
81 sys.exit(1)
82 else:
83 print('All runs completed successfully')
84
85if __name__ == "__main__":
86 main()