5
5
from time import process_time , time
6
6
7
7
import h5py
8
- import hdf5plugin
9
8
import numpy as np
10
9
import simplekml
11
- from multiprocess import Lock , Process , JoinableQueue
10
+ from multiprocess import JoinableQueue , Lock , Process
12
11
from multiprocess .managers import BaseManager
13
12
14
13
from rocketpy import Function
@@ -170,7 +169,7 @@ def simulate(
170
169
If True, the results will be appended to the existing files. If
171
170
False, the files will be overwritten. Default is False.
172
171
light_mode : bool, optional
173
- If True, tonly variables from the export_list will be saved to
172
+ If True, only variables from the export_list will be saved to
174
173
the output file as a .txt file. If False, all variables will be
175
174
saved to the output file as a .h5 file. Default is False.
176
175
parallel : bool, optional
@@ -180,9 +179,6 @@ def simulate(
180
179
-------
181
180
None
182
181
"""
183
- if append and light_mode is False :
184
- raise ValueError ("Append mode is not available when light_mode is False." )
185
-
186
182
# initialize counters
187
183
self .number_of_simulations = number_of_simulations
188
184
self .iteration_count = self .num_of_loaded_sims if append else 0
@@ -194,22 +190,27 @@ def simulate(
194
190
195
191
# Run simulations
196
192
if parallel :
197
- self ._run_in_parallel (append , light_mode = light_mode )
193
+ self .__run_in_parallel (append , light_mode = light_mode )
198
194
else :
199
- self ._run_in_serial (append , light_mode = light_mode )
195
+ self .__run_in_serial (append , light_mode = light_mode )
200
196
201
- def _run_in_serial (self , append , light_mode ):
197
+ def __run_in_serial (self , append , light_mode ):
202
198
"""
203
199
Runs the monte carlo simulation in serial mode.
204
200
205
- Args:
201
+ Parameters
202
+ ----------
206
203
append: bool
207
204
If True, the results will be appended to the existing files. If
208
205
False, the files will be overwritten.
209
206
light_mode: bool
210
207
If True, only variables from the export_list will be saved to
211
208
the output file as a .txt file. If False, all variables will be
212
209
saved to the output file as a .h5 file.
210
+
211
+ Returns
212
+ -------
213
+ None
213
214
"""
214
215
215
216
# Create data files for inputs, outputs and error logging
@@ -224,12 +225,18 @@ def _run_in_serial(self, append, light_mode):
224
225
input_file = h5py .File (self ._input_file .with_suffix (".h5" ), open_mode )
225
226
output_file = h5py .File (self ._output_file .with_suffix (".h5" ), open_mode )
226
227
error_file = open (self ._error_file , open_mode , encoding = "utf-8" )
228
+
229
+ idx_i = self .__get_initial_sim_idx (input_file )
230
+ idx_o = self .__get_initial_sim_idx (output_file )
231
+
232
+ if idx_i != idx_o :
233
+ raise ValueError ("Input and output files are not synchronized. Append mode is not available." )
227
234
228
235
# Run simulations
229
236
try :
230
237
while self .iteration_count < self .number_of_simulations :
231
238
self .__run_single_simulation (
232
- self .iteration_count , input_file , output_file , light_mode = light_mode
239
+ self .iteration_count + idx_i , input_file , output_file , light_mode = light_mode
233
240
)
234
241
235
242
except KeyboardInterrupt :
@@ -247,11 +254,12 @@ def _run_in_serial(self, append, light_mode):
247
254
input_file , output_file , error_file , light_mode = light_mode
248
255
)
249
256
250
- def _run_in_parallel (self , append , light_mode , n_workers = None ):
257
+ def __run_in_parallel (self , append , light_mode , n_workers = None ):
251
258
"""
252
259
Runs the monte carlo simulation in parallel.
253
-
254
- Args:
260
+
261
+ Parameters
262
+ ----------
255
263
append: bool
256
264
If True, the results will be appended to the existing files. If
257
265
False, the files will be overwritten.
@@ -275,13 +283,12 @@ def _run_in_parallel(self, append, light_mode, n_workers=None):
275
283
inputs_lock = manager .Lock ()
276
284
outputs_lock = manager .Lock ()
277
285
errors_lock = manager .Lock ()
278
- sim_counter = manager .SimCounter ()
279
286
queue = manager .JoinableQueue ()
280
-
287
+
281
288
# Initialize queue
282
289
for _ in range (self .number_of_simulations ):
283
290
queue .put ("RUN" )
284
-
291
+
285
292
for _ in range (n_workers ):
286
293
queue .put ("STOP" )
287
294
@@ -303,27 +310,34 @@ def _run_in_parallel(self, append, light_mode, n_workers=None):
303
310
pass # initialize file
304
311
305
312
else :
306
- with h5py .File (self ._input_file .with_suffix (".h5" ), 'w' ) as _ :
307
- pass # initialize file
308
- with h5py .File (self ._output_file .with_suffix (".h5" ), 'w' ) as _ :
309
- pass # initialize file
310
-
313
+ # Change file extensions to .h5
311
314
file_paths ["input_file" ] = file_paths ["input_file" ].with_suffix (".h5" )
312
315
file_paths ["output_file" ] = file_paths ["output_file" ].with_suffix (".h5" )
313
316
file_paths ["error_file" ] = file_paths ["error_file" ].with_suffix (".h5" )
314
317
318
+ # Initialize files and get initial simulation index
319
+ with h5py .File (file_paths ["input_file" ], open_mode ) as f :
320
+ idx_i = self .__get_initial_sim_idx (f )
321
+ with h5py .File (file_paths ["output_file" ], open_mode ) as f :
322
+ idx_o = self .__get_initial_sim_idx (f )
323
+
324
+ if idx_i != idx_o :
325
+ raise ValueError ("Input and output files are not synchronized. Append mode is not available." )
326
+
315
327
# Initialize error file - always a .txt file
316
328
with open (self ._error_file , mode = open_mode ) as _ :
317
329
pass # initialize file
330
+
331
+ # Initialize simulation counter
332
+ sim_counter = manager .SimCounter (idx_i )
318
333
319
-
320
- print ("Starting monte carlo analysis" , end = "\r " )
334
+ print ("\n Starting monte carlo analysis" , end = "\r " )
321
335
print (f"Number of simulations: { self .number_of_simulations } " )
322
336
323
337
# Creates n_workers processes then starts them
324
338
for _ in range (n_workers ):
325
339
p = Process (
326
- target = self ._run_simulation_worker ,
340
+ target = self .__run_simulation_worker ,
327
341
args = (
328
342
self .environment ,
329
343
self .rocket ,
@@ -352,11 +366,11 @@ def _run_in_parallel(self, append, light_mode, n_workers=None):
352
366
parallel_end = time ()
353
367
354
368
print ("-" * 80 + "\n All workers joined, simulation complete." )
355
- print (f"In total, { sim_counter .get_count ()} simulations were performed." )
369
+ print (f"In total, { sim_counter .get_count () - idx_i } simulations were performed." )
356
370
print ("Simulation took" , parallel_end - parallel_start , "seconds to run." )
357
371
358
372
@staticmethod
359
- def _run_simulation_worker (
373
+ def __run_simulation_worker (
360
374
sto_env ,
361
375
sto_rocket ,
362
376
sto_flight ,
@@ -370,8 +384,9 @@ def _run_simulation_worker(
370
384
):
371
385
"""
372
386
Runs a simulation from a queue.
373
-
374
- Args:
387
+
388
+ Parameters
389
+ ----------
375
390
worker_no: int
376
391
Worker number.
377
392
n_sim: int
@@ -400,7 +415,7 @@ def _run_simulation_worker(
400
415
while True :
401
416
if queue .get () == "STOP" :
402
417
break
403
-
418
+
404
419
sim_idx = sim_counter .increment ()
405
420
sim_start = time ()
406
421
@@ -442,12 +457,16 @@ def _run_simulation_worker(
442
457
443
458
# Write flight setting and results to file
444
459
inputs_lock .acquire ()
445
- with open (file_paths ["input_file" ], mode = 'a' , encoding = "utf-8" ) as f :
460
+ with open (
461
+ file_paths ["input_file" ], mode = 'a' , encoding = "utf-8"
462
+ ) as f :
446
463
f .write (json .dumps (inputs_dict , cls = RocketPyEncoder ) + "\n " )
447
464
inputs_lock .release ()
448
-
465
+
449
466
outputs_lock .acquire ()
450
- with open (file_paths ["output_file" ], mode = 'a' , encoding = "utf-8" ) as f :
467
+ with open (
468
+ file_paths ["output_file" ], mode = 'a' , encoding = "utf-8"
469
+ ) as f :
451
470
f .write (json .dumps (results , cls = RocketPyEncoder ) + "\n " )
452
471
outputs_lock .release ()
453
472
@@ -468,12 +487,12 @@ def _run_simulation_worker(
468
487
469
488
inputs_lock .acquire ()
470
489
with h5py .File (file_paths ["input_file" ], 'a' ) as h5_file :
471
- MonteCarlo .dict_to_h5 (h5_file , '/' , export_inputs )
490
+ MonteCarlo .__dict_to_h5 (h5_file , '/' , export_inputs )
472
491
inputs_lock .release ()
473
492
474
493
outputs_lock .acquire ()
475
494
with h5py .File (file_paths ["output_file" ], 'a' ) as h5_file :
476
- MonteCarlo .dict_to_h5 (h5_file , '/' , export_outputs )
495
+ MonteCarlo .__dict_to_h5 (h5_file , '/' , export_outputs )
477
496
outputs_lock .release ()
478
497
479
498
sim_end = time ()
@@ -482,15 +501,15 @@ def _run_simulation_worker(
482
501
"-" * 80
483
502
+ f"\n Simulation { sim_idx } took { sim_end - sim_start } seconds to run."
484
503
)
485
-
504
+
486
505
except Exception as error :
487
506
print (f"Error on iteration { sim_idx } : { error } " )
488
507
errors_lock .acquire ()
489
508
with open (file_paths ["error_file" ], mode = 'a' , encoding = "utf-8" ) as f :
490
509
f .write (json .dumps (inputs_dict , cls = RocketPyEncoder ) + "\n " )
491
510
errors_lock .release ()
492
511
raise error
493
-
512
+
494
513
finally :
495
514
print ("Worker stopped." )
496
515
@@ -540,8 +559,8 @@ def __run_single_simulation(
540
559
export_inputs = {str (sim_idx ): input_parameters }
541
560
export_outputs = {str (sim_idx ): flight_results }
542
561
543
- self .dict_to_h5 (input_file , '/' , export_inputs )
544
- self .dict_to_h5 (output_file , '/' , export_outputs )
562
+ self .__dict_to_h5 (input_file , '/' , export_inputs )
563
+ self .__dict_to_h5 (output_file , '/' , export_outputs )
545
564
546
565
average_time = (process_time () - self .start_cpu_time ) / self .iteration_count
547
566
estimated_time = int (
@@ -1016,28 +1035,58 @@ def inspect_object_attributes(obj):
1016
1035
return result
1017
1036
1018
1037
@staticmethod
1019
- def dict_to_h5 (h5_file , path , dic ):
1038
+ def __get_initial_sim_idx (file ):
1039
+ """
1040
+ Get the initial simulation index from the filename.
1041
+
1042
+ Parameters
1043
+ ----------
1044
+ filename : str
1045
+ Name of the file to be analyzed.
1046
+
1047
+ Returns
1048
+ -------
1049
+ int
1050
+ Initial simulation index.
1051
+ """
1052
+ if len (file .keys ()) == 0 :
1053
+ return 0
1054
+
1055
+ keys = [int (key ) for key in file .keys ()]
1056
+ return max (keys ) + 1
1057
+
1058
+
1059
+ @staticmethod
1060
+ def __dict_to_h5 (h5_file , path , dic ):
1020
1061
"""
1021
1062
....
1022
1063
"""
1023
1064
for key , item in dic .items ():
1024
- if isinstance (
1025
- item , (np .int64 , np .float64 , int , float )
1026
- ):
1065
+ if isinstance (item , (np .int64 , np .float64 , int , float )):
1027
1066
data = np .array ([[item ]])
1028
- h5_file .create_dataset (path + key , data = data , shape = data .shape , dtype = data .dtype )
1067
+ h5_file .create_dataset (
1068
+ path + key , data = data , shape = data .shape , dtype = data .dtype
1069
+ )
1029
1070
elif isinstance (item , np .ndarray ):
1030
1071
if len (item .shape ) < 2 :
1031
- item = item .reshape (- 1 , 1 ) # Ensure it is a column vector
1032
- h5_file .create_dataset (path + key , data = item , shape = item .shape , dtype = item .dtype )
1072
+ item = item .reshape (- 1 , 1 ) # Ensure it is a column vector
1073
+ h5_file .create_dataset (
1074
+ path + key ,
1075
+ data = item ,
1076
+ shape = item .shape ,
1077
+ dtype = item .dtype ,
1078
+ )
1033
1079
elif isinstance (item , (str , bytes )):
1034
- h5_file .create_dataset (path + key , data = item )
1080
+ h5_file .create_dataset (
1081
+ path + key ,
1082
+ data = item ,
1083
+ )
1035
1084
elif isinstance (item , Function ):
1036
1085
raise TypeError (
1037
1086
"Function objects should be preprocessed before saving."
1038
1087
)
1039
1088
elif isinstance (item , dict ):
1040
- MonteCarlo .dict_to_h5 (h5_file , path + key + '/' , item )
1089
+ MonteCarlo .__dict_to_h5 (h5_file , path + key + '/' , item )
1041
1090
else :
1042
1091
pass # Implement other types as needed
1043
1092
@@ -1054,12 +1103,12 @@ def __init__(self):
1054
1103
1055
1104
1056
1105
class SimCounter :
1057
- def __init__ (self ):
1058
- self .count = 0
1106
+ def __init__ (self , initial_count ):
1107
+ self .count = initial_count
1059
1108
1060
1109
def increment (self ) -> int :
1061
1110
self .count += 1
1062
- return self .count
1111
+ return self .count - 1
1063
1112
1064
1113
def get_count (self ) -> int :
1065
1114
return self .count
0 commit comments