# MigFlow - Copyright (C) <2010-2018> # # # List of the contributors to the development of MigFlow: see AUTHORS file. # Description and complete License: see LICENSE file. # # This program (MigFlow) is free software: # you can redistribute it and/or modify it under the terms of the GNU Lesser General # Public License as published by the Free Software Foundation, either version # 3 of the License, or (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with this program (see COPYING and COPYING.LESSER files). If not, # see . """Model for Immersed Granular Flow -- Particles user interface Contact: jonathan.lambrechts@uclouvain.be Webpage: www.migflow.be MigFlow computes immersed granular flows using an unresolved FEM-DEM model. The discrete phase is computed by updating iteratively the particle velocities until a set of velocities respecting the non-interpenetration contact law for the next time step is found """ from __future__ import division from . import mesh from . import VTK import shutil import numpy as np import os import sys from ctypes import * import signal signal.signal(signal.SIGINT, signal.SIG_DFL) dir_path = os.path.dirname(os.path.realpath(__file__)) lib2 = np.ctypeslib.load_library("libscontact2",dir_path) lib2f = np.ctypeslib.load_library("libscontact2f",dir_path) lib3 = np.ctypeslib.load_library("libscontact3",dir_path) is_64bits = sys.maxsize > 2**32 def _np2c(a,dtype=np.float64) : tmp = np.require(a,dtype,"C") r = c_void_p(tmp.ctypes.data) r.tmp = tmp return r class ParticleProblem : """Create the numerical structure containing all the physical particles that appear in the problem""" def _get_matrix(self, fName, ncol) : f = getattr(self._lib,"particleProblem"+fName) f.restype = c_void_p ptr = f(self._p) size = cast(ptr-(8 if is_64bits else 4), POINTER(c_size_t)).contents.value//8 buf = (size*c_double).from_address(ptr) return np.ctypeslib.as_array(buf).reshape([-1,ncol]) def _get_idx(self, fName) : f = getattr(self._lib,"particleProblem"+fName) f.restype = c_void_p ptr = f(self._p) size = cast(ptr-(8 if is_64bits else 4), POINTER(c_size_t)).contents.value//4 buf = (size*c_int).from_address(ptr) return np.ctypeslib.as_array(buf) def __init__(self, dim, friction_enabled=False) : """Build the particles contanier structure. Keyword arguments: dim -- dimension of the problem Raises: ValueError -- if dimension differs from 2 or 3 NameError -- if C builder cannot be found """ self._dim = dim self._friction_enabled = friction_enabled if dim == 2 : self._lib = lib2f if friction_enabled else lib2 self._coord_type = c_double*2 elif dim == 3 : self._lib = lib3 self._coord_type = c_double*3 else : raise ValueError("Dimension should be 2 or 3.") self._lib.particleProblemNew.restype = c_void_p self._p = c_void_p(self._lib.particleProblemNew()) if self._p == None : raise NameError("Cannot create particle problem.") def __del__(self): """Delete the particle solver structure.""" if(self._p is not None) : self._lib.particleProblemDelete(self._p) def volume(self): """Return the list of particle volumes.""" if self._dim == 2 : return np.pi * self._get_matrix("Particle", 2)[:, [0]]**2 else : return 4./3.*np.pi * self._get_matrix("Particle", 2)[:, [0]]**3 def r(self) : """Return the list of particle radii.""" return self._get_matrix("Particle", 2)[:, 0][:,None] def mass(self): """Return the list of particle masses.""" return self._get_matrix("Particle", 2)[:, 1][:,None] def position(self): """Return the list of particle centre position vectors.""" return self._get_matrix("Position", self._dim) def velocity(self): """Return the list of particle velocity vectors.""" return self._get_matrix("Velocity", self._dim) def omega(self): """Return the list of particle angular velocity. Only in 2D when friction is enabled. """ assert self._friction_enabled return self._get_matrix("Omega", 1) def disk_tag(self): return self._get_idx("DiskTag") def disk_material(self): return self._get_idx("DiskMaterial") def segments(self): return self._get_matrix("Segment",3*self._dim+(1 if self._friction_enabled else 0)) def segment_tag(self): return self._get_idx("SegmentTag") def segment_material(self): return self._get_idx("SegmentMaterial") def triangles(self): if self._dim == 3 : return self._get_matrix("Triangle",12) else : return np.ndarray([0,12]) def triangle_tag(self): if self._dim == 3 : return self._get_idx("TriangleTag") else : return np.array([],np.int32) def triangle_material(self): if self._dim == 3 : return self._get_idx("TriangleMaterial") else : return np.array([],np.int32) def set_boundary_velocity(self, tag, v) : self._lib.particleProblemGetTagId.restype = c_size_t tag = self._lib.particleProblemGetTagId(self._p, tag.encode()) disks = self._get_matrix("Disk",2*self._dim+1) diskTag = self._get_idx("DiskTag") disks[diskTag == tag, self._dim:2*self._dim] = v segments = self._get_matrix("Segment",3*self._dim) segmentTag = self._get_idx("SegmentTag") segments[segmentTag == tag, 2*self._dim:3*self._dim] = v if self._dim == 3 : triangles = self._get_matrix("Triangle",12) triangleTag = self._get_idx("TriangleTag") triangles[triangleTag == tag, 9:12] = v def set_tangent_boundary_velocity(self, tag, v): """ Sets the tangent velocity of the frictional boundary. Only in 2D when friction is enabled. Keyword arguments: tag -- Name of the boundary v -- velocity """ assert self._friction_enabled self._lib.particleProblemSetBoundaryTangentVelocity(self._p,tag.encode(),c_double(v)) def set_use_queue(self, use_queue = True): """ Sets the use of the queue in the resolution of the contacts.""" if use_queue: self._lib.particleProblemSetUseQueue(self._p,c_int(1)) else: self._lib.particleProblemSetUseQueue(self._p,c_int(0)) def set_friction_coefficient(self, mu, mat0="default",mat1="default") : """ Sets the friction coefficient between two materials. Only in 2D when friction is enabled. Keyword arguments: mu -- value of the friction coefficient mat0 -- first material mat1 -- second material """ assert self._friction_enabled self._lib.particleProblemSetFrictionCoefficient(self._p, c_double(mu), mat0.encode(),mat1.encode()) def iterate(self, dt, forces,tol=1e-8) : """Compute iteratively the set of velocities that respect the non-interpenetration constrain. Keyword arguments: dt -- numerical time step forces -- list of vectors containing the forces applied on the particles tol -- optional argument defining the interpenetration tolerance to stop the NLGS iterations of the NSCD """ self._get_matrix("ExternalForces",self._dim)[:] = forces self._lib.particleProblemIterate(self._p, c_double(np.min(self.r())), c_double(dt), c_double(tol), c_int(-1)) def write_vtk(self, odir, i, t) : """Write output files for post-visualization.""" v = self.velocity() if self._friction_enabled : omega = self.omega() else : omega = np.zeros((v.shape[0],1)) x = self.position() material = self._get_idx("ParticleMaterial").reshape(-1,1) if self._dim == 2 : v = np.insert(v,self._dim,0,axis=1) x = np.insert(x,self._dim,0,axis=1) data = [("Mass",self.mass()), ("Radius",self.r()),("Velocity",v),("Omega",omega),("Material",material)] nmat = self._lib.particleProblemNMaterial(self._p) self._lib.particleProblemGetMaterialTagName.restype = c_char_p tags = list([self._lib.particleProblemGetMaterialTagName(self._p,i) for i in range(nmat)]) VTK.write(odir+"/particles",i,t,None,x,data,vtktype="vtp",field_data=[(b"MaterialNames",VTK.string_array_encode(tags))]) xdisk = self._get_matrix("Disk",2*self._dim+(2 if self._friction_enabled else 1))[:,:self._dim] nd = xdisk.shape[0] xsegment = self.segments()[:,:self._dim*2].reshape([-1,self._dim]) ns = xsegment.shape[0]/2 xtriangles = self.triangles()[:,:self._dim*3].reshape([-1,self._dim]) nt = xtriangles.shape[0]/3 x = np.vstack([xdisk,xsegment,xtriangles]) if self._dim == 2 : x = np.insert(x,self._dim,0,axis=1) connectivity = np.arange(0,x.shape[0],dtype=np.int32) types = np.repeat(np.array([1,3,5],dtype=np.int32),[nd,ns,nt]) offsets = np.cumsum(np.repeat([1,2,3],[nd,ns,nt]),dtype=np.int32) tags = np.array(np.hstack([self.disk_tag(),self.segment_tag(),self.triangle_tag()])) materials = np.array(np.hstack([self.disk_material(),self.segment_material(),self.triangle_material()])) ntagname = self._lib.particleProblemNTag(self._p) self._lib.particleProblemGetTagName.restype = c_char_p tagnames = list([self._lib.particleProblemGetTagName(self._p,i) for i in range(ntagname)]) VTK.write(odir+"/boundaries",i,t,(types,connectivity,offsets),x,cell_data=[("Tag",tags.reshape([-1,1])),("Material",materials.reshape([-1,1]))],field_data=[(b"TagNames",VTK.string_array_encode(tagnames))]) self._lib.particleProblemContactN.restype = c_size_t fdata = [] for (ctype,name) in ((0,b"particle_particle"),(1,b"particle_disk"),(2,b"particle_segment"),(3,b"particle_triangle")) : n = self._lib.particleProblemContactN(self._p,c_int(ctype)) v = np.ndarray((n,2),dtype=np.float64,order="C") o = np.ndarray((n,2),dtype=np.uint64,order="C") self._lib.particleProblemContact(self._p,c_int(ctype),c_void_p(o.ctypes.data),c_void_p(v.ctypes.data)) fdata.append((name,v[:,[0]])) fdata.append((name+b"_t",v[:,[1]])) fdata.append((name+b"_idx",o)) x = np.array([[0.,0.,0.]]) VTK.write(odir+"/contacts",i,t,None,x,field_data=fdata,vtktype="vtp") VTK.write_multipart(odir+"/particle_problem",["particles_%05d.vtp"%i,"boundaries_%05d.vtu"%i,"contacts_%05d.vtp"%i],i,t) def read_vtk(self,dirname,i) : """Read an existing output file to set the particle data.""" x,_,d,cdata,fdata = VTK.read(dirname+"/particles_%05d.vtp"%i) matnames = VTK.string_array_decode(fdata["MaterialNames"]) matmap = {} for j,n in enumerate(matnames) : matmap[j] = self._lib.particleProblemGetMaterialTagId(self._p,n) partmat =np.vectorize(matmap.get)(d["Material"]) self._lib.particleProblemClearParticles(self._p) self._lib.particleProblemAddParticles(self._p,c_int(x.shape[0]), _np2c(x[:,:self._dim]), _np2c(d["Mass"]), _np2c(d["Radius"]), _np2c(d["Velocity"][:,:self._dim]),_np2c(d["Omega"]),_np2c(partmat,dtype=np.int32)) x,cells,d,cdata,fdata = VTK.read(dirname+"/boundaries_%05d.vtu"%i) tagnames = VTK.string_array_decode(fdata["TagNames"]) tagmap = {} for j,n in enumerate(tagnames) : tagmap[j] = self._lib.particleProblemGetTagId(self._p,n) self._lib.particleProblemClearBoundaries(self._p) offsets = np.hstack([[0],cells["offsets"]]) el = cells["connectivity"] tags = np.vectorize(tagmap.get)(cdata["Tag"]) materials = cdata["Material"] for idx in np.where(cells["types"] == 1)[0] : x0 = x[el[offsets[idx]],:self._dim] self._lib.particleProblemAddBoundaryDiskTagId(self._p, self._coord_type(*x0), c_double(0.), c_int(tags[idx,0]), c_int(materials[idx,0])) for idx in np.where(cells["types"] == 3)[0] : x0 = x[el[offsets[idx]],:self._dim] x1 = x[el[offsets[idx]+1],:self._dim] self._lib.particleProblemAddBoundarySegmentTagId(self._p, self._coord_type(*x0), self._coord_type(*x1), c_int(tags[idx,0]), c_int(materials[idx,0])) for idx in np.where(cells["types"] == 5)[0] : x0 = x[el[offsets[idx]],:self._dim] x1 = x[el[offsets[idx]+1],:self._dim] x2 = x[el[offsets[idx]+2],:self._dim] self._lib.particleProblemAddBoundaryTriangleTagId(self._p, self._coord_type(*x0), self._coord_type(*x1), self._coord_type(*x2), c_int(tags[idx,0]), c_int(materials[idx,0])) _,_,_,_,fdata = VTK.read(dirname+"/contacts_%05d.vtp"%i) ks = ("particle_particle","particle_disk","particle_segment","particle_triangle") v = np.vstack([fdata[k] for k in ks]) vt = np.vstack([fdata[k+"_t"] for k in ks]) v = np.hstack([v,vt]) o = np.vstack([fdata[k+"_idx"] for k in ks]) t = np.repeat([0,1,2,3],[fdata[k].shape[0] for k in ks]) self._lib.particleProblemSetContacts(self._p,c_int(t.shape[0]),_np2c(o,np.uint64),_np2c(v),_np2c(t,np.int32)) def add_boundary_disk(self, x0, r, tag, material="default") : self._lib.particleProblemAddBoundaryDisk.restype = c_size_t return self._lib.particleProblemAddBoundaryDisk(self._p, self._coord_type(*x0), c_double(r), tag.encode(),material.encode()) def add_boundary_segment(self, x0, x1, tag, material="default") : self._lib.particleProblemAddBoundarySegment.restype = c_size_t return self._lib.particleProblemAddBoundarySegment(self._p, self._coord_type(*x0), self._coord_type(*x1), tag.encode(),material.encode()) def add_boundary_triangle(self, x0, x1, x2, tag, material="default") : if self._dim != 3 : raise NameError("Triangle boundaries only available in 3D") self._lib.particleProblemAddBoundaryTriangle.restype = c_size_t return self._lib.particleProblemAddBoundaryTriangle(self._p, self._coord_type(*x0), self._coord_type(*x1), self._coord_type(*x2), tag.encode(),material.encode()) def add_particle(self, x, r, m, tag="default"): """Add a particle in the particle container. Keyword arguments: x -- tuple to set the centre particle position r -- particle radius m -- particle mass tag -- particle tag """ self._lib.particleProblemAddParticle(self._p, self._coord_type(*x), c_double(r), c_double(m), tag.encode()) def load_msh_boundaries(self, filename, tags, shift=None,material="default") : """Load the numerical domain and set the physical boundaries the particles cannot go through. Keyword arguments: filename -- name of the msh file tags -- tags of physical boundary defined in the msh file shift -- optional argument to shift the numerical domain material -- material of the boundary """ if shift == None : shift = [0]*self._dim m = mesh.Mesh(filename) self._addv = set() if self._dim == 2 : pnum = [m.getPhysicalNumber(1, i) for i in tags] for ent in m.entities : if ent.dimension != 1 or not [i for i in pnum if i in ent.physicals]: continue tag = 0 if not ent.physicals else ent.physicals[0] stag = m.getPhysicalName(ent.dimension, tag) or str(tag) for i, el in enumerate(ent.elements) : for v in el.vertices : if v[3] in self._addv : continue self._addv.add(v[3]) self.add_boundary_disk((v[0] + shift[0], v[1] + shift[1]), 0.000, stag,material) v0 = el.vertices[1] v1 = el.vertices[0] self.add_boundary_segment((v0[0] + shift[0], v0[1] + shift[1]), (v1[0] + shift[0], v1[1] + shift[1]), stag,material) else : self._adde = set() pnum = [m.getPhysicalNumber(2, i) for i in tags] for ent in m.entities : if ent.dimension !=2 or not [i for i in pnum if i in ent.physicals]: continue tag = 0 if not ent.physicals else ent.physicals[0] stag = m.getPhysicalName(ent.dimension, tag) or str(tag) for i, el in enumerate(ent.elements) : for j in range(3): v0 = el.vertices[j] if not v0[3] in self._addv : self._addv.add(v0[3]) self.add_boundary_disk((v0[0]+shift[0],v0[1]+shift[1],v0[2]+shift[2]),0.,stag,material) v1 = el.vertices[(j+1)%3] if (not (v0[3],v1[3]) in self._adde) and (not (v1[3],v0[3]) in self._adde) : self._adde.add((v0[3],v1[3])) self.add_boundary_segment( (v0[0]+shift[0],v0[1]+shift[1],v0[2]+shift[2]), (v1[0]+shift[0],v1[1]+shift[1],v1[2]+shift[2]), stag,material) v0 = el.vertices[1] v1 = el.vertices[0] v2 = el.vertices[2] self.add_boundary_triangle( (v0[0]+shift[0],v0[1]+shift[1],v0[2]+shift[2]), (v1[0]+shift[0],v1[1]+shift[1],v1[2]+shift[2]), (v2[0]+shift[0],v2[1]+shift[1],v2[2]+shift[2]), stag,material)