Commit 33e98f7f authored by Olivier Lantsoght's avatar Olivier Lantsoght
Browse files

[MBsysPy][MbsResults] Implement specific class into dirdyn, invdyn and solvekin

parent c78f7350
......@@ -25,10 +25,11 @@ from ..mbs_utilities import str_to_bytes
from ..mbs_utilities import mbs_msg
# importing MbsysPy classes
from .mbs_dirdyn import MbsResult
from .mbs_results import MbsResult
# importing libraries
from .._mbsysc_loader.loadlibs import libmodules
from .._mbsysc_loader.loadlibs import libutilities
# =============================================================================
# Global parameter of the current module
......@@ -197,9 +198,10 @@ class MbsInvdyn(object):
self.set_options(**kwargs)
error2 = 0 # Default value
if not self.store_results:
error = libmodules.mbs_run_invdyn(self.mbs_invdyn_ptr, self.mbs.mbs_data_ptr)
error2 = 0 # Unused for this module
else:
# save2file forced to 1 because if buffers don't have the complete
# results, results are loaded from files.
......@@ -210,17 +212,39 @@ class MbsInvdyn(object):
results_loaded = False
# Results(buffer) memory is kept BUT FILES WILL BE WRITTEN LATER
if error >= 0 and self.get_options("save2file"):
results_loaded = self._load_results()
# finish function is required to close the module and write the results to disk.
error2 = libmodules.mbs_invdyn_finish(self.mbs_invdyn_ptr, self.mbs.mbs_data_ptr)
if error2 >= 0 and error >= 0 and not results_loaded:
mbs_msg("The beginning of the integration is not available in the buffer.\n"
"The complete results are loaded from files.\n")
filename = bytes_to_str(ctypes.string_at(self.mbs_invdyn_ptr.contents.buffers[0].contents.filename))
self.results.load_results_from_file(filename, module=6)
if error >= 0:
if self.get_options("save2file"):
results_loaded = self.results.load_results_from_buffer(self.mbs_invdyn_ptr,
self.get_options("t0"),
self.get_options("resfilename"))
# If failed to load from buffer, save the user output (and vector) name
if not results_loaded:
results_filename = bytes_to_str(ctypes.string_at(self.mbs_dirdyn_ptr.contents.buffers[0].contents.filename))
# Check if user vector output have been defined
user_output_vector_filenames = []
nb_user_output_vector = libutilities.get_output_vector_nb()
first_buffer_id = self.buffer_nb - nb_user_output_vector
for i in range(nb_user_output_vector):
vector_name = bytes_to_str(self.mbs_dirdyn_ptr.contents.buffers[first_buffer_id + i].contents.filename)
user_output_vector_filenames.append(os.path.basename(vector_name))
# Check if user auto output have been used
user_output_filenames = []
nbOutput = self.mbs_dirdyn_ptr.contents.user_buffer.contents.nx
for i in range(nbOutput):
name = bytes_to_str(self.mbs_dirdyn_ptr.contents.user_buffer.contents.names[i])
user_output_filenames.append(self.get_options("resfilename") + '_' + name + '.res')
# finish function is required to close the module and write the results to disk.
error2 = libmodules.mbs_invdyn_finish(self.mbs_invdyn_ptr, self.mbs.mbs_data_ptr)
if error2 >= 0 and self.get_options("save2file") and not results_loaded:
mbs_msg("The beginning of the integration is not available in the buffer.\n"
"The complete results are loaded from files.\n")
self.results.load_results_from_file(results_filename,
user_output=user_output_filenames,
user_vector=user_output_vector_filenames)
# Unassign user functions
if self.mbs.opt_load_c < 2:
......@@ -252,69 +276,6 @@ class MbsInvdyn(object):
return self.results
def _load_results(self):
"""
Load the results from the buffers.
If the beginning of the integration is not available in the buffers,
the complete results are loaded from files.
Returns
-------
bool
True if the results have been fully loaded from memory.
False if the results must be loaded from files.
"""
# c_mbs_invdyn_write_buffers(self.mbs_invdyn_ptr)
size1 = self.mbs_invdyn_ptr.contents.buffers[0].contents.index
if size1 == 0:
size1 = self.mbs_invdyn_ptr.contents.buffers[0].contents.size
size2 = self.mbs_invdyn_ptr.contents.buffers[0].contents.nx + 1
# array are initialized to the time pointer so as to start index of joints at 1(we have to ensure contiguity between t and x in buffers ! ! !)
self.results.q = np.copy(np.ctypeslib.as_array(self.mbs_invdyn_ptr.contents.buffers[0].contents.tx, (size1, size2)))
# get time array from the q buffer
self.results.t = self.results.q[:, 0]
if not self.results.t[0] == self.get_options("t0"):
self.results.q = []
self.results.t = []
return False
# get qd and qdd buffer
self.results.qd = np.copy(np.ctypeslib.as_array(self.mbs_invdyn_ptr.contents.buffers[1].contents.tx, (size1, size2)))
self.results.qdd = np.copy(np.ctypeslib.as_array(self.mbs_invdyn_ptr.contents.buffers[2].contents.tx, (size1, size2)))
size2 = self.mbs_invdyn_ptr.contents.buffers[3].contents.nx + 1
self.results.Qq = np.copy(np.ctypeslib.as_array(self.mbs_invdyn_ptr.contents.buffers[3].contents.tx, (size1, size2)))
if self.module_name == "MbsInvdyn":
size2 = self.mbs_invdyn_ptr.contents.buffers[4].contents.nx + 1
self.results.Qa = np.copy(np.ctypeslib.as_array(self.mbs_invdyn_ptr.contents.buffers[4].contents.tx, (size1, size2)))
buffer_id = 5
else:
buffer_id = 4
if self.mbs.Nlink:
size2 = self.mbs_invdyn_ptr.contents.buffers[buffer_id].contents.nx + 1
self.results.Z = np.copy(np.ctypeslib.as_array(self.mbs_invdyn_ptr.contents.buffers[buffer_id].contents.tx, (size1, size2)))
self.results.Zd = np.copy(np.ctypeslib.as_array(self.mbs_invdyn_ptr.contents.buffers[buffer_id + 1].contents.tx, (size1, size2)))
self.results.Fl = np.copy(np.ctypeslib.as_array(self.mbs_invdyn_ptr.contents.buffers[buffer_id + 2].contents.tx, (size1, size2)))
buffer_id = buffer_id + 3
if self.mbs.nqc:
size2 = self.mbs_invdyn_ptr.contents.buffers[buffer_id].contents.nx + 1
self.results.Qc = np.copy(np.ctypeslib.as_array(self.mbs_invdyn_ptr.contents.buffers[buffer_id].contents.tx, (size1, size2)))
buffer_id = buffer_id + 1
if self.mbs.nhu:
size2 = self.mbs_invdyn_ptr.contents.buffers[buffer_id].contents.nx + 1
self.results.Lambda = np.copy(np.ctypeslib.as_array(self.mbs_invdyn_ptr.contents.buffers[buffer_id].contents.tx, (size1, size2)))
if self.mbs_invdyn_ptr.contents.user_buffer.contents.nx:
size = self.mbs_invdyn_ptr.contents.user_buffer.contents.index
nbOutput = self.mbs_invdyn_ptr.contents.user_buffer.contents.nx
user_out = np.copy(np.ctypeslib.as_array(self.mbs_invdyn_ptr.contents.user_buffer.contents.X[0], (nbOutput, size)))
for i in range(nbOutput):
name = bytes_to_str(self.mbs_invdyn_ptr.contents.user_buffer.contents.names[i])
self.results.outputs[name] = user_out[i, :]
return True
def set_user_fct_from_file(self, function_name, user_path, user_file):
"""
Load a user function from a file chosen by the user instead of the default one in the userfctR folder.
......@@ -329,8 +290,6 @@ class MbsInvdyn(object):
path to the new user function file.
user_file : str
name of the new user function file.
"""
self.mbs.__set_user_fct_from_file__(function_name, user_path, user_file)
......@@ -346,7 +305,6 @@ class MbsInvdyn(object):
name of the user function to replace.
user_fct_ptr : ptr
new user function pointer.
"""
self.mbs.__set_user_fct_from_ptr__(function_name, user_fct_ptr)
......
......@@ -25,12 +25,13 @@ from ..mbs_utilities import str_to_bytes
from ..mbs_utilities import mbs_msg
# importing MbsysPy classes
from .mbs_dirdyn import MbsResult
from .mbs_results import MbsResult
from .mbs_invdyn import MbsInvdyn
# importing libraries
from .._mbsysc_loader.loadlibs import libmodules
from .._mbsysc_loader.loadlibs import libutilities
# =============================================================================
# Global parameter of the current module
......@@ -162,9 +163,9 @@ class MbsSolvekin(MbsInvdyn):
self.set_options(**kwargs)
error_2 = 0 # Default value
if not self.store_results:
error_1 = libmodules.mbs_run_solvekin(self.mbs_solvekin_ptr, self.mbs.mbs_data_ptr)
error_2 = 0 # Unused for this path
else:
# save2file forced to 1 because if buffers don't have the complete
# results, results are loaded from files.
......@@ -175,16 +176,38 @@ class MbsSolvekin(MbsInvdyn):
# Results (buffer) memory is kept BUT FILES WILL BE WRITTEN LATER
results_loaded = False
if error_1 >= 0 and self.get_options("save2file"):
results_loaded = self._load_results()
error_2 = libmodules.mbs_solvekin_finish(self.mbs_solvekin_ptr, self.mbs.mbs_data_ptr)
if error_2 >= 0 and error_1 >= 0 and not results_loaded:
mbs_msg("The beginning of the integration is not available in the buffer.\n"
"The complete results are loaded from files.\n")
filename = bytes_to_str(ctypes.string_at(self.mbs_solvekin_ptr.contents.buffers[0].contents.filename))
self.results.load_results_from_file(filename, module=5)
if error_1 >= 0:
if self.get_options("save2file"):
results_loaded = self.results.load_results_from_buffer(self.mbs_solvekin_ptr,
self.get_options("t0"),
self.get_options("resfilename"))
# If failed to load from buffer, save the user output (and vector) name
if not results_loaded:
results_filename = bytes_to_str(ctypes.string_at(self.mbs_dirdyn_ptr.contents.buffers[0].contents.filename))
# Check if user vector output have been defined
user_output_vector_filenames = []
nb_user_output_vector = libutilities.get_output_vector_nb()
first_buffer_id = self.buffer_nb - nb_user_output_vector
for i in range(nb_user_output_vector):
vector_name = bytes_to_str(self.mbs_dirdyn_ptr.contents.buffers[first_buffer_id + i].contents.filename)
user_output_vector_filenames.append(os.path.basename(vector_name))
# Check if user auto output have been used
user_output_filenames = []
nbOutput = self.mbs_dirdyn_ptr.contents.user_buffer.contents.nx
for i in range(nbOutput):
name = bytes_to_str(self.mbs_dirdyn_ptr.contents.user_buffer.contents.names[i])
user_output_filenames.append(self.get_options("resfilename") + '_' + name + '.res')
error_2 = libmodules.mbs_solvekin_finish(self.mbs_solvekin_ptr, self.mbs.mbs_data_ptr)
if error_2 >= 0 and error_1 >= 0 and not results_loaded:
mbs_msg("The beginning of the integration is not available in the buffer.\n"
"The complete results are loaded from files.\n")
self.results.load_results_from_file(results_filename,
user_output=user_output_filenames,
user_vector=user_output_vector_filenames)
# Unassign user functions
if self.mbs.opt_load_c < 2:
......@@ -216,52 +239,6 @@ class MbsSolvekin(MbsInvdyn):
return self.results
def _load_results(self):
"""
Load the results.
If the beginning of the integration is not available in the buffers,
the complete results are loaded from files. If the option 'motion' is
set to 'oneshot', the results are loaded from MbsData
Returns
-------
bool
True if the results have been fully loaded from memory.
False if the results must be loaded from files.
"""
if self.get_options("motion") == "oneshot":
self.results.q = np.array(self.mbs.q)
self.results.qd = np.array(self.mbs.qd)
self.results.qdd = np.array(self.mbs.qdd)
else:
# c_mbs_invdyn_write_buffers(self.mbs_solvekin_ptr)
size1 = self.mbs_solvekin_ptr.contents.buffers[0].contents.index
if size1 == 0:
size1 = self.mbs_solvekin_ptr.contents.buffers[0].contents.size
size2 = self.mbs_solvekin_ptr.contents.buffers[0].contents.nx + 1
# array are initialized to the time pointer so as to start index of joints at 1 (we have to ensure contiguity between t and x in buffers ! ! !)
self.results.q = np.copy(np.ctypeslib.as_array(self.mbs_solvekin_ptr.contents.buffers[0].contents.tx, (size1, size2)))
# get time array from the q buffer
self.results.t = self.results.q[:, 0]
if not self.results.t[0] == self.get_options("t0"):
self.results.q = []
self.results.t = []
return False
# get qd and qdd buffer
self.results.qd = np.copy(np.ctypeslib.as_array(self.mbs_solvekin_ptr.contents.buffers[1].contents.tx, (size1, size2)))
self.results.qdd = np.copy(np.ctypeslib.as_array(self.mbs_solvekin_ptr.contents.buffers[2].contents.tx, (size1, size2)))
if self.mbs_solvekin_ptr.contents.user_buffer.contents.nx:
size = self.mbs_solvekin_ptr.contents.user_buffer.contents.index
nbOutput = self.mbs_solvekin_ptr.contents.user_buffer.contents.nx
user_out = np.copy(np.ctypeslib.as_array(self.mbs_solvekin_ptr.contents.user_buffer.contents.X[0], (nbOutput, size)))
for i in range(nbOutput):
name = bytes_to_str(self.mbs_solvekin_ptr.contents.user_buffer.contents.names[i])
self.results.outputs[name] = user_out[i, :]
return True
def set_user_fct_from_file(self, function_name, user_path, user_file):
"""
Load a user function from a file chosen by the user instead of the default one in the userfctR folder.
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment