Peano
Loading...
Searching...
No Matches
ParticleVTUReader.py
Go to the documentation of this file.
1"""!
2This file contains the ParticleVTUReader class, which hosts
3methods to extract particle data from .vtu or .pvd files
4generated by Peano.
5"""
6
7import os
8import errno
9import numpy as np
10from typing import Union
11
12try:
13 import vtuIO
14except ModuleNotFoundError:
15 raise ModuleNotFoundError(
16 "You need the 'VTUinterface' package to use the ParticleVTUReader"
17 )
18
19
21 """!
22 Clean up a string intended to be a particle attribute
23 from illegal characters.
24
25 Returns
26 -------
27
28 cleaned_attr: str
29 cleaned attribute string.
30 """
31
32 cleaned_attr = attr.strip()
33
34 # keep all illegal characters and their replacements
35 # in this dict. Keys will be replaced by values.
36 illegal_character_replacements = {
37 "-": "_",
38 " ": "_",
39 }
40
41 for key in illegal_character_replacements:
42 cleaned_attr = cleaned_attr.replace(key, illegal_character_replacements[key])
43
44 return cleaned_attr
45
46
47class VTUParticleSet(object):
48 """!
49 Container for particle data. All particle fields shall
50 have the same name as specified in the .vtu files, and
51 be accessible as an attribute of the instantiated object.
52
53 The individual particle values are instances of numpy arrays.
54
55 Methods
56 -------
57
58 get_attribute_list(self):
59 return list of all available particle fields
60
61 show_attribute_list(self):
62 prints out all available particle fields
63
64 merge(self, other):
65 merge another VTUParticleSet into this one.
66 """
67
68 def __init__(self, particle_attributes: list):
69 """
70 Parameters
71 ----------
72 particle_attributes : list of strings
73 contains names of all particle fields
74 """
75
76 self._nattributes = 0
77 # "translation" from particle fields in input .vtu file to
78 # cleaned up attributes of this object
80
81 if len(particle_attributes) == 0:
82 raise ValueError(
83 "Got empty particle attributes list. Can't work with that."
84 )
85
86 cleaned_attributes = []
87 for att in particle_attributes:
88 if not isinstance(att, str):
89 raise ValueError("Attribute '", att, "' is not a string?")
90 cleaned_attr = _clean_attribute_string(att)
91 setattr(self, cleaned_attr, None)
92
93 self._nattributes += 1
94 self._attribute_translator[att] = cleaned_attr
95 cleaned_attributes.append(cleaned_attr)
96
97 self._particle_attribute_list = cleaned_attributes
98
99 return
100
102 """!
103 Return the list of available attributes.
104 """
105 return self._particle_attribute_list
106
108 """!
109 Print the list of available attributes.
110 """
111 print("Available particle fields are:", self._particle_attribute_list)
112 return
113
114 def merge(self, other):
115 """!
116 Merge two VTUParticleSet objects into one (into self).
117 """
118
119 if self._nattributes != other._nattributes:
120 raise ValueError("Unequal number of attributes in datasets, can't merge")
121
122 for attr in self._particle_attribute_list:
123 this_arr = getattr(self, attr)
124 that_arr = getattr(other, attr)
125 newattr = np.concatenate((this_arr, that_arr))
126 setattr(self, attr, newattr)
127
128 return
129
130 def __str__(self):
131 return "Particle Data Container"
132
133 def __repr__(self):
134 return self.__str__()
135
136
137class ParticleVTUReader(object):
138 """!
139 Reads in particle fields from a single .vtu file, or all .vtu
140 at a given snapshot time specified in the provided .pvd file.
141
142
143 Methods
144 -------
145
146 get_all_vtufiles(self):
147 Read in and return all .vtu file names from the .pvd file
148 provided by pvdfilename at initialisation.
149
150 get_snapshot_time(self):
151 Returns the simulation time of the read-in snapshot
152
153 get_snapshot_times(pvdfile):
154 Returns all simulation times stored in pvd file
155
156 load(self):
157 actually read in the data, and return the particle data as a
158 VTUParticleSet object.
159
160 read_single_vtu_file(self, vtufilename: str):
161 Read in particle data from a single .vtu file, and return the filled
162 out VTUParticleSet object containing the particle data.
163 """
164
166 self,
167 vtufile: Union[str, None] = None,
168 snapshot_time: Union[float, None] = None,
169 pvdfile: Union[str, None] = None,
170 verbose: bool = False,
171 ):
172 """!
173 At initialization, either the ``vtufile`` argument needs to
174 be provided, or both the ``snapshot_time`` and ``pvdfile``
175 arguments in order to specify what data to read.
176
177 If ``vtufile`` is given, the ParticleVTUReader will read in a
178 single .vtu file.
179
180 If ``snapshot_time`` and ``pvdfile`` arguments are given, the
181 ParticleVTUReader will read in all .vtu files corresponding to the
182 snapshot at the provided ``snapshot_time``.
183
184 Parameters
185 ----------
186
187 vtufile: str
188 path to .vtu file to be read in
189
190 snapshot_time: float
191 time of the snapshot to extract
192
193 pvdfile: str
194 path to .pvd file to be read in
195
196 verbose: bool
197 how talkative the reading process should be
198
199 """
200
201 self._verbose = verbose
202 self._mode = None
203 self._vtufile = vtufile
204 self._pvdfile = pvdfile
205 self._snapshot_time = snapshot_time
206 # have we read in the snapshot time?
207 self._read_time = False
209
210 # First check that we have all necessary requirements
211 if vtufile is None:
212 if (snapshot_time is None) or (pvdfile is None):
213 raise ValueError(
214 "You need to specify either the `vtufile` argument, or "
215 + "*both* the `snapshot_time` and `pvdfile` arguments so I "
216 + "know what to read."
217 )
218 else:
219 self._mode = "multiple"
220 if self._verbose:
221 print("Setting mode to 'multiple'")
222 else:
223 self._mode = "single"
224 if self._verbose:
225 print("Setting mode to 'single'")
226
227 return
228
229 def load(self):
230 """!
231 Actually read in the data.
232
233 Returns
234 -------
235
236 particleData: VTUParticleSet
237 object containing all read-in particle data as attributes.
238 """
239
240 if self._mode == "single":
241 # read in the single file
242 return self.read_single_vtu_file(self._vtufile)
243
244 elif self._mode == "multiple":
245 vtufiles = self._get_snapshot_vtu_files_from_pvd(
246 self._pvdfile, self._snapshot_time
247 )
248 partdata = None
249 for f in vtufiles:
250 fullpath = self._get_full_vtufile_path(f)
251 new_data = self.read_single_vtu_file(fullpath)
252 if partdata is None:
253 partdata = new_data
254 else:
255 partdata.merge(new_data)
256
257 return partdata
258
259 else:
260 raise ValueError("Unknown mode? How did we get here?")
261
263 """!
264 Return the time/timestep information of the read snapshot.
265 This is only available if you read in the .pvd file, not
266 the .vtu file. If the .vtu file has been read, this method
267 returns `None`. Otherwise, it returns the time of the
268 snapshot, which may vary (slightly) from the snapshot time
269 you specified with when loading the data.
270 """
271
272 if self._read_time:
273 return self._snapshot_time_read
274 else:
275 return None
276
278 """!
279 Read in and return all .vtu file names from the .pvd file
280 provided by pvdfilename at initialisation.
281
282 Returns
283 -------
284
285 vtufilenames: list
286 list of .vtu filenames in the .pvd file, with their
287 full path.
288 """
289
290 if self._pvdfile is None:
291 raise ValueError(
292 """
293No .pvd file has been specified.
294You need to provide a .pvd file name during initialisation of
295the ParticleVTUReader() object.
296 """
297 )
298
299 # backend specified to vtuIO default to avoid warning messages
300 pvdfile = vtuIO.PVDIO(self._pvdfile, interpolation_backend="vtk")
301 allvtufiles = pvdfile.vtufilenames
302 fullpathfiles = [self._get_full_vtufile_path(f) for f in allvtufiles]
303 return fullpathfiles
304
305 def _get_full_vtufile_path(self, vtufile: str):
306 """!
307 The .pvd file will provide relative path names to its
308 own file path for the .vtu files.
309 This function returns the full path of the .vtu file.
310 """
311 if self._pvdfile is None:
312 raise ValueError(
313 """
314No .pvd file has been specified.
315You need to provide a .pvd file name during initialisation of
316the ParticleVTUReader() object.
317 """
318 )
319 pathprefix = os.path.dirname(self._pvdfile)
320 fullpath = os.path.join(pathprefix, vtufile)
321 return fullpath
322
323 def _get_snapshot_vtu_files_from_pvd(self, pvdfilename: str, time: float):
324 """!
325 Read in data from the .pvd file provided by pvdfilename.
326
327 Parameters
328 ----------
329
330 pvdfilename: str
331 filename to read in from
332
333 time: float
334 snapshot time to read in
335
336
337 Returns
338 -------
339
340 vtufilenames: list
341 list of .vtu filenames to read from
342 """
343
344 if not os.path.exists(pvdfilename):
345 raise FileNotFoundError(
346 errno.ENOENT, os.strerror(errno.ENOENT), pvdfilename
347 )
348
349 # 'interpolation_backend' is used to generate plots with seemingly
350 # continuous data. That's not what we're using this module for. We
351 # only want to read in particle data. So I use here the default
352 # 'interpolation_backend', which gets set anyway if it weren't
353 # specified, with the bonus that this way, no annoying warnings are
354 # printed.
355 pvdfile = vtuIO.PVDIO(pvdfilename, interpolation_backend="vtk")
356 timesteps = pvdfile.timesteps
357 if self._verbose:
358 print("Available snapshot time steps:", np.unique(timesteps))
359 allvtufiles = pvdfile.vtufilenames
360
361 # Find snapshots with time closest to desired output time
362 timediff = np.abs(timesteps - time)
363 snap_indexes = timediff == timediff.min()
364
365 # make sure we found something
366 if not snap_indexes.any():
367 errmsg = f"""
368 Couldn't find any snapshots?
369 Looking for snashots with time={self._snapshot_time}
370 pvd file provides following times: {timesteps}
371 pvd file provides following file names: {allvtufiles}
372 """
373 raise ValueError(errmsg)
374
375 # check that we found the correct snapshot time
376 close_enough = True
377 if self._snapshot_time == 0.0:
378 if timediff.min() > 1.0e-5:
379 close_enough = False
380
381 else:
382 if np.abs(timediff.min() / self._snapshot_time) > 1.0e-4:
383 close_enough = False
384
385 if self._verbose or not close_enough:
386 print(f"WARNING: You requested snapshot at time={self._snapshot_time}")
387 print(
388 "WARNING: Closest snapshot time I found is:",
389 timesteps[snap_indexes][0],
390 "; reading that in now",
391 )
392
393 vtufiles = np.array(allvtufiles)[snap_indexes]
394 vtufiles = vtufiles.tolist()
395
396 # store snapshot time
397 self._read_time = True
398 self._snapshot_time_read = timesteps[snap_indexes][0]
399
400 return vtufiles
401
402 def read_single_vtu_file(self, vtufilename: str):
403 """!
404 Read in particle data from a single .vtu file, and return the filled
405 out VTUParticleSet object containing the particle data.
406
407 Parameters
408 ----------
409
410 vtufilename: str
411 .vtu filename to read in from.
412
413
414 Returns
415 -------
416
417 particleData: VTUParticleSet
418 object containing all read-in particle data as attributes.
419 """
420
421 if not os.path.exists(vtufilename):
422 raise FileNotFoundError(
423 errno.ENOENT, os.strerror(errno.ENOENT), vtufilename
424 )
425
426 # 'interpolation_backend' is used to generate plots with seemingly
427 # continuous data. That's not what we're using this module for. We
428 # only want to read in particle data. So I use here the default
429 # 'interpolation_backend', which gets set anyway if it weren't
430 # specified, with the bonus that this way, no annoying warnings are
431 # printed.
432 vtufile = vtuIO.VTUIO(vtufilename, interpolation_backend="vtk")
433 point_field_names = vtufile.get_point_field_names()
434
435 print("-- Reading from", vtufilename)
436 if self._verbose:
437 print("File", vtufilename, "has particle fields:", point_field_names)
438
439 data = VTUParticleSet(point_field_names)
440
441 for attr in point_field_names:
442 part_fields = vtufile.get_point_field(attr)
443 setattr(data, data._attribute_translator[attr], part_fields)
444
445 if self._verbose:
446 print(
447 "File",
448 vtufilename,
449 "read in field",
450 attr,
451 "with shape",
452 part_fields.shape,
453 )
454
455 return data
456
457 def get_snapshot_times(pvdfilename: str):
458 """!
459 Read in available snapshot times from the .pvd file provided by
460 pvdfilename.
461
462 Parameters
463 ----------
464
465 pvdfilename: str
466 filename to read in from
467
468 Returns
469 -------
470
471 times: np.array
472 np.array of available snapshot times
473 """
474
475 if not os.path.exists(pvdfilename):
476 raise FileNotFoundError(
477 errno.ENOENT, os.strerror(errno.ENOENT), pvdfilename
478 )
479
480 # 'interpolation_backend' is used to generate plots with seemingly
481 # continuous data. That's not what we're using this module for. We
482 # only want to read in particle data. So I use here the default
483 # 'interpolation_backend', which gets set anyway if it weren't
484 # specified, with the bonus that this way, no annoying warnings are
485 # printed.
486 pvdfile = vtuIO.PVDIO(pvdfilename, interpolation_backend="vtk")
487 timesteps = pvdfile.timesteps
488 return timesteps
Reads in particle fields from a single .vtu file, or all .vtu at a given snapshot time specified in t...
get_snapshot_time(self)
Return the time/timestep information of the read snapshot.
_get_full_vtufile_path(self, str vtufile)
The .pvd file will provide relative path names to its own file path for the .vtu files.
read_single_vtu_file(self, str vtufilename)
Read in particle data from a single .vtu file, and return the filled out VTUParticleSet object contai...
__init__(self, Union[str, None] vtufile=None, Union[float, None] snapshot_time=None, Union[str, None] pvdfile=None, bool verbose=False)
At initialization, either the vtufile argument needs to be provided, or both the snapshot_time and pv...
get_all_vtufiles(self)
Read in and return all .vtu file names from the .pvd file provided by pvdfilename at initialisation.
get_snapshot_times(str pvdfilename)
Read in available snapshot times from the .pvd file provided by pvdfilename.
_get_snapshot_vtu_files_from_pvd(self, str pvdfilename, float time)
Read in data from the .pvd file provided by pvdfilename.
merge(self, other)
Merge two VTUParticleSet objects into one (into self).
_clean_attribute_string(str attr)
Clean up a string intended to be a particle attribute from illegal characters.