Commit 5d2f46d7 authored by Jonathan Lambrechts's avatar Jonathan Lambrechts
Browse files

pycodestyle

parent f4b1d3d1
Pipeline #8399 passed with stages
in 2 minutes and 38 seconds
......@@ -35,39 +35,41 @@ import platform
import ctypes as c
def log(txt, title=False) :
def log(txt, title=False):
decorator = " * " if title else ""
print(decorator+txt+decorator)
class ProgressLog :
class ProgressLog:
def __init__(self, msg, title=False) :
def __init__(self, msg, title=False):
self._tic = time.time()
self._last_msg = ""
log(msg, title)
if sys.stdout.isatty() :
if sys.stdout.isatty():
print("")
def log(self, msg) :
if sys.stdout.isatty() :
print("\033[F\033[K",end="") # Cursor up one line and clear
print(msg + " (%.1fs)"%(time.time()-self._tic))
def log(self, msg):
if sys.stdout.isatty():
print("\033[F\033[K", end="") # Cursor up one line and clear
print(msg + " (%.1fs)" % (time.time()-self._tic))
self._last_msg = msg
def end(self) :
if not sys.stdout.isatty() :
def end(self):
if not sys.stdout.isatty():
print(self._last_msg)
_transform_cache = {}
def ensure_valid_points(x, pfrom, pto):
def project_points(x, pfrom, pto):
x = np.asarray(x)[:, :2]
if not pfrom.IsSame(pto):
trans = _transform_cache.get((pfrom,pto),None)
if trans is None :
trans = _transform_cache.get((pfrom, pto), None)
if trans is None:
trans = osr.CoordinateTransformation(pfrom, pto)
_transform_cache[(pfrom,pto)] = trans
_transform_cache[(pfrom, pto)] = trans
x = trans.TransformPoints(x)
x = np.asarray(x)[:, :2]
return x
......@@ -83,6 +85,7 @@ else:
lib = c.CDLL(libpath)
def np2c(a, dtype=np.float64):
tmp = np.require(a, dtype, "C")
r = c.c_void_p(tmp.ctypes.data)
......
......@@ -41,11 +41,16 @@ class Distance:
"""
_tools.log("Create distance field", True)
points = []
progress = _tools.ProgressLog("Sampling features for distance computation")
for icurve,curve in enumerate(_tools.chain(domain._curves, domain._interior_curves)):
msg = "Sampling features for distance computation"
progress = _tools.ProgressLog(msg)
all_curves_iter = _tools.chain(domain._curves, domain._interior_curves)
def size(x, proj):
return _tools.np.full([x.shape[0]], sampling)
for icurve, curve in enumerate(all_curves_iter):
if (tags is None) or (curve.tag in tags):
points.append(_curve_sample(curve, lambda x,proj : _tools.np.full([x.shape[0]],sampling),
None))
points.append(_curve_sample(curve, size, None))
progress.log("{} features sampled".format(icurve+1))
for point in _tools.chain(domain._interior_points):
if (tags is None) or (point.tag in tags):
......@@ -56,7 +61,8 @@ class Distance:
self._tree = _tools.cKDTree(points)
self._projection = domain._projection
def __call__(self, x: _tools.np.ndarray, projection: _tools.osr.SpatialReference
def __call__(self, x: _tools.np.ndarray,
projection: _tools.osr.SpatialReference
) -> _tools.np.ndarray:
"""Computes the distance between each point of x and the curves.
......@@ -82,16 +88,19 @@ class Raster:
Args:
filename: A geotiff file or any other raster supported by gdal.
"""
_tools.log("Create field from raster file \"{}\"".format(filename), True)
msg = "Create field from raster file \"{}\"".format(filename)
_tools.log(msg, True)
src_ds = _tools.gdal.Open(filename)
self._geo_matrix = src_ds.GetGeoTransform()
self._data = src_ds.GetRasterBand(1).ReadAsArray()
self._projection = _tools.osr.SpatialReference()
self._projection.ImportFromWkt(src_ds.GetProjection())
if int(_tools.gdal.__version__.split(".")[0]) >= 3 :
self._projection.SetAxisMappingStrategy(_tools.osr.OAMS_TRADITIONAL_GIS_ORDER)
if int(_tools.gdal.__version__.split(".")[0]) >= 3:
order = _tools.osr.OAMS_TRADITIONAL_GIS_ORDER
self._projection.SetAxisMappingStrategy(order)
def __call__(self, x: _tools.np.ndarray, projection: _tools.osr.SpatialReference
def __call__(self, x: _tools.np.ndarray,
projection: _tools.osr.SpatialReference
) -> _tools.np.ndarray:
"""Evaluates the field value on each point of x.
......@@ -101,9 +110,9 @@ class Raster:
Returns:
The field value on points x. [n]
"""
x = _tools.ensure_valid_points(x, projection, self._projection)
x = _tools.project_points(x, projection, self._projection)
gm = self._geo_matrix
lon,lat = x[:,0], x[:,1]
lon, lat = x[:, 0], x[:, 1]
det = gm[5]*gm[1]-gm[2]*gm[4]
pixx = ((lon-gm[0])*gm[5]-(lat-gm[3])*gm[2])/det
pixy = ((lat-gm[3])*gm[1]-(lon-gm[0])*gm[4])/det
......
......@@ -22,7 +22,8 @@ from . import _tools
__all__ = ["Domain", "CurveType", "coarsen_boundaries"]
MeshSizeCallback = _tools.Callable[[_tools.np.ndarray, _tools.osr.SpatialReference],
MeshSizeCallback = _tools.Callable[[_tools.np.ndarray,
_tools.osr.SpatialReference],
_tools.np.ndarray]
......@@ -60,7 +61,7 @@ def _generate_unique_points(x):
cid += 1
xo = _tools.np.ndarray([cid, 2])
xo[unique_id, :] = x
breakpoints = _tools.np.zeros([xo.shape[0]],dtype=bool)
breakpoints = _tools.np.zeros([xo.shape[0]], dtype=bool)
breakpoints[:nbreakpoints] = True
return xo, unique_id, breakpoints
......@@ -101,11 +102,11 @@ class Domain:
allpoints = _tools.np.row_stack(list(_tools.chain(
(curve.points for curve in curvesiter),
(point.x for point in self._interior_points))))
self._points, unique_id, breakpoints = _generate_unique_points(
self._points, unique_id, breakpts = _generate_unique_points(
allpoints)
# assign points id in curves
cid = 0
touched_interior = _tools.np.zeros((self._points.shape[0],),bool)
touched_interior = _tools.np.zeros((self._points.shape[0],), bool)
for curve in _tools.chain(self._curves, self._interior_curves):
n = curve.points.shape[0]
curve.pointsid = unique_id[cid:cid+n]
......@@ -116,14 +117,14 @@ class Domain:
point.pointid = unique_id[cid]
touched_interior[point.pointid] = True
cid += 1
breakpoints = _tools.np.logical_and(touched_interior,breakpoints)
breakpts = _tools.np.logical_and(touched_interior, breakpts)
# break curves on repeated points
def split_curves(curves, breakpoints, loops=[]):
def split_curves(curves, breakpts, loops=[]):
newcurves = []
newmap = []
for curve in curves:
breaks = _tools.np.where(breakpoints[curve.pointsid[1:-1]])[0]
breaks = _tools.np.where(breakpts[curve.pointsid[1:-1]])[0]
if len(breaks) == 0:
newcurves.append(curve)
newmap.append([len(newcurves)-1])
......@@ -137,15 +138,15 @@ class Domain:
newcurves.append(ncurve)
nc.append(len(newcurves)-1)
newmap.append(nc)
for iloop, loop in enumerate(loops) :
for iloop, loop in enumerate(loops):
newloop = []
for oi, ori in loop:
if ori :
for j in newmap[oi] :
newloop.append((j,ori))
else :
for j in reversed(newmap[oi]) :
newloop.append((j,ori))
if ori:
for j in newmap[oi]:
newloop.append((j, ori))
else:
for j in reversed(newmap[oi]):
newloop.append((j, ori))
loops[iloop] = newloop
return newcurves
......@@ -183,11 +184,11 @@ class Domain:
assert(i != -1)
except AssertionError:
raise ValueError("Invalid topology")
self._curves = split_curves(self._curves, breakpoints, self._curveloops)
self._interior_curves = split_curves(self._interior_curves, breakpoints)
self._curves = split_curves(self._curves, breakpts, self._curveloops)
self._interior_curves = split_curves(self._interior_curves, breakpts)
loopbboxarea = _tools.np.zeros([len(self._curveloops)])
for i, loop in enumerate(self._curveloops):
lpts = _tools.np.row_stack([self._curves[j].points for j, o in loop])
for i, l in enumerate(self._curveloops):
lpts = _tools.np.row_stack([self._curves[j].points for j, o in l])
bbox = _tools.np.max(lpts, axis=0)-_tools.np.min(lpts, axis=0)
loopbboxarea[i] = bbox[0]*bbox[1]
self._curveloops.insert(
......@@ -212,10 +213,11 @@ class Domain:
def _add_shapefile(self, filename, physical_name_field,
interior, points, curve_type):
progress = _tools.ProgressLog("Import features from \"{}\"".format(filename), True)
if filename[-5:] == ".gpkg" :
progress = _tools.ProgressLog(
"Import features from \"{}\"".format(filename), True)
if filename[-5:] == ".gpkg":
driver = _tools.ogr.GetDriverByName('GPKG')
else :
else:
driver = _tools.ogr.GetDriverByName('ESRI Shapefile')
data = driver.Open(filename, 0)
layer = data.GetLayer()
......@@ -232,7 +234,8 @@ class Domain:
"' not found in shapefile")
layerproj = layer.GetSpatialRef()
for i in layer:
phys = i.GetField(physfield) if not (physfield is None) else "boundary"
phys = (i.GetField(physfield)
if not (physfield is None) else "boundary")
self._add_geometry(i.geometry(), phys, layerproj, curve_type,
interior, points)
progress.log("{} features imported".format(count))
......@@ -248,7 +251,7 @@ class Domain:
physical_tag: the points physical tag
projection: the points coordinate system
"""
x = _tools.ensure_valid_points(points, projection, self._projection)
x = _tools.project_points(points, projection, self._projection)
points = list(_Point(p, physical_tag) for p in x)
self._interior_points += points
......@@ -263,7 +266,7 @@ class Domain:
projection: the points coordinate system
curve_type: curve interpolation
"""
points = _tools.ensure_valid_points(points, projection, self._projection)
points = _tools.project_points(points, projection, self._projection)
curve = _Curve(points, physical_tag, curve_type)
self._curves.append(curve)
......@@ -279,7 +282,7 @@ class Domain:
projection: the points coordinate system
curve_type: curve interpolation
"""
points = _tools.ensure_valid_points(points, projection, self._projection)
points = _tools.project_points(points, projection, self._projection)
curve = _Curve(points, physical_tag, curve_type)
self._interior_curves.append(curve)
......@@ -327,12 +330,11 @@ class Domain:
curve_type)
from .gmsh import _curve_sample
def coarsen_boundaries(domain: Domain, x0: _tools.Tuple[float, float],
x0projection: _tools.osr.SpatialReference,
x0_projection: _tools.osr.SpatialReference,
mesh_size: MeshSizeCallback) -> Domain:
""" Creates a new Domain with the same projection and coarsened
boundaries.
......@@ -340,13 +342,13 @@ def coarsen_boundaries(domain: Domain, x0: _tools.Tuple[float, float],
Args:
domain: the domain to coarsen
x0: the coordinates of one point inside the domain.
x0projection: the coordinates system of x0.
x0_projection: the coordinates system of x0.
mesh_size: a function returning the desired mesh element size for given
coordinates
"""
_tools.log("Coarsen boundaries", True)
x0 = _tools.ensure_valid_points(_tools.np.array([x0]), x0projection,
domain._projection)[0]
x0 = _tools.project_points(_tools.np.array([x0]), x0_projection,
domain._projection)[0]
sampled = []
tags = []
maxtag = 1
......@@ -362,7 +364,8 @@ def coarsen_boundaries(domain: Domain, x0: _tools.Tuple[float, float],
if curve.tag not in str2tag:
str2tag[curve.tag] = maxtag
maxtag += 1
tags.append(_tools.np.full(cs.shape[0], str2tag[curve.tag], dtype=_tools.np.int32))
tags.append(_tools.np.full(cs.shape[0], str2tag[curve.tag],
dtype=_tools.np.int32))
progress.log("{} curves sampled".format(icurve+1))
progress.end()
x = _tools.np.vstack(sampled)
......@@ -398,16 +401,22 @@ def coarsen_boundaries(domain: Domain, x0: _tools.Tuple[float, float],
p_l = _tools.c.POINTER(_tools.c.c_int)()
ms = mesh_size(x, domain._projection)
_tools.lib.gen_boundaries_from_points(
_tools.c.c_int(x.shape[0]), _tools.np2c(x), _tools.np2c(tags, _tools.np.int32),
_tools.c.c_int(tri.simplices.shape[0]), _tools.np2c(tri.simplices, _tools.np.int32),
_tools.c.c_int(x.shape[0]), _tools.np2c(x),
_tools.np2c(tags, _tools.np.int32),
_tools.c.c_int(tri.simplices.shape[0]),
_tools.np2c(tri.simplices, _tools.np.int32),
_tools.c.c_int(first), _tools.np2c(ms),
_tools.c.byref(p_xo), _tools.c.byref(n_xo), _tools.c.byref(p_l), _tools.c.byref(n_l))
_tools.c.byref(p_xo), _tools.c.byref(n_xo),
_tools.c.byref(p_l), _tools.c.byref(n_l))
#xptr = _tools.c.POINTER(n_xo.value*2*_tools.c.c_double)
#xbuf = _tools.c.cast(p_xo, xptr).contents
xbuf = _tools.c.cast(p_xo, _tools.c.POINTER(n_xo.value*2*_tools.c.c_double)).contents
xo = _tools.np.ctypeslib.frombuffer(xbuf, dtype=_tools.np.float64)
xo = xo.reshape([-1, 2]).copy()
_tools.lib.libcfree(p_xo)
linesbuf = _tools.c.cast(p_l, _tools.c.POINTER(n_l.value*_tools.c.c_int))
lines = _tools.np.ctypeslib.frombuffer(linesbuf.contents, dtype=_tools.np.int32)
lines = _tools.np.ctypeslib.frombuffer(linesbuf.contents,
dtype=_tools.np.int32)
odomain = Domain(domain._projection)
breaks = _tools.np.where(lines == -1)[0]
tagi2str = dict((i, s) for (s, i) in str2tag.items())
......@@ -421,4 +430,3 @@ def coarsen_boundaries(domain: Domain, x0: _tools.Tuple[float, float],
curve_type=CurveType.POLYLINE)
_tools.lib.libcfree(p_l)
return odomain
......@@ -26,7 +26,7 @@ __all__ = ["mesh", "convert_to_gis"]
gmsh.initialize()
gmsh.option.setNumber("General.Terminal", 1)
gmsh.option.setNumber("General.Verbosity",2)
gmsh.option.setNumber("General.Verbosity", 2)
def _gmsh_curve_geo(curve_type: _geometry.CurveType, pointsid):
......@@ -45,26 +45,26 @@ def _gmsh_curve_geo(curve_type: _geometry.CurveType, pointsid):
def _curve_sample_gmsh_tag(tag, lc, projection):
bounds = gmsh.model.getParametrizationBounds(1,tag)
xi = _tools.np.linspace(bounds[0][0],bounds[1][0],5)
bounds = gmsh.model.getParametrizationBounds(1, tag)
xi = _tools.np.linspace(bounds[0][0], bounds[1][0], 5)
count = 0
x = gmsh.model.getValue(1,tag,xi).reshape([-1,3])
size = lc(x,projection)
while True :
x = gmsh.model.getValue(1, tag, xi).reshape([-1, 3])
size = lc(x, projection)
while True:
count += 1
dist = _tools.np.linalg.norm(x[1:,:]-x[:-1,:],axis=1)
target = _tools.np.minimum(size[1:],size[:-1])
if not _tools.np.any(target<dist) :
dist = _tools.np.linalg.norm(x[1:, :]-x[:-1, :], axis=1)
target = _tools.np.minimum(size[1:], size[:-1])
if not _tools.np.any(target < dist):
break
exi = (xi[1:][target<dist]+xi[:-1][target<dist])/2
ex = gmsh.model.getValue(1,tag,exi).reshape([-1,3])
esize = lc(ex,projection)
xi = _tools.np.hstack([xi,exi])
exi = (xi[1:][target < dist]+xi[:-1][target < dist])/2
ex = gmsh.model.getValue(1, tag, exi).reshape([-1, 3])
esize = lc(ex, projection)
xi = _tools.np.hstack([xi, exi])
s = _tools.np.argsort(xi)
xi = xi[s]
x = _tools.np.row_stack([x,ex])[s,:]
size = _tools.np.hstack([size,esize])[s]
return x[:,:2], xi, size
x = _tools.np.row_stack([x, ex])[s, :]
size = _tools.np.hstack([size, esize])[s]
return x[:, :2], xi, size
def _curve_sample(curve, lc, projection):
......@@ -82,7 +82,8 @@ def _curve_sample(curve, lc, projection):
return r
def mesh(domain: _geometry.Domain, filename: str, mesh_size: _geometry.MeshSizeCallback,
def mesh(domain: _geometry.Domain, filename: str,
mesh_size: _geometry.MeshSizeCallback,
version: float = 4.0, intermediate_file_name: str = None) -> None:
""" Calls gmsh to generate a mesh from a geometry and a mesh size callback
......@@ -146,9 +147,11 @@ def mesh(domain: _geometry.Domain, filename: str, mesh_size: _geometry.MeshSizeC
gmsh.model.setPhysicalName(2, stag, "domain")
# 1D mesh
progress = _tools.ProgressLog("Sample curves for mesh size")
bounds = gmsh.model.getParametrizationBounds(1,2)
for icurve,(dim, tag) in enumerate(gmsh.model.getEntities(1)) :
_, xi, size = _curve_sample_gmsh_tag(tag, lambda x,p : mesh_size(x,p)/2, domain._projection)
bounds = gmsh.model.getParametrizationBounds(1, 2)
for icurve, (dim, tag) in enumerate(gmsh.model.getEntities(1)):
_, xi, size = _curve_sample_gmsh_tag(tag,
lambda x, p: mesh_size(x, p)/2,
domain._projection)
gmsh.model.mesh.setSizeAtParametricPoints(dim, tag, xi, size*2)
progress.log("{} curve sampled".format(icurve+1))
progress.end()
......@@ -163,7 +166,8 @@ def mesh(domain: _geometry.Domain, filename: str, mesh_size: _geometry.MeshSizeC
_tools.log("Generate initial 2D mesh")
_, x, u = gmsh.model.mesh.getNodes()
x = x.reshape([-1, 3])
initlc = _tools.np.linalg.norm(_tools.np.max(x, axis=0)-_tools.np.min(x, axis=0))/100
domain_size = _tools.np.max(x, axis=0)-_tools.np.min(x, axis=0)
initlc = _tools.np.linalg.norm(domain_size)/100
gmsh.option.setNumber("Mesh.CharacteristicLengthFromParametricPoints", 0)
gmsh.option.setNumber("Mesh.CharacteristicLengthMin", initlc)
gmsh.option.setNumber("Mesh.CharacteristicLengthMax", initlc)
......@@ -173,7 +177,7 @@ def mesh(domain: _geometry.Domain, filename: str, mesh_size: _geometry.MeshSizeC
bg_field = gmsh.model.mesh.field.add("PostView")
gmsh.model.mesh.field.setAsBackgroundMesh(bg_field)
for i in range(nadapt):
_tools.log("Generate refined 2D mesh (pass {}/{})".format(i+1,nadapt))
_tools.log("Generate refined 2D mesh (pass {}/{})".format(i+1, nadapt))
node_tags, node_x, _ = gmsh.model.mesh.getNodes(-1, -1)
node_x = node_x.reshape([-1, 3])
nodes_map = dict({tag: i for i, tag in enumerate(node_tags)})
......@@ -185,9 +189,9 @@ def mesh(domain: _geometry.Domain, filename: str, mesh_size: _geometry.MeshSizeC
enode = list(nodes_map[t] for t in enode)
enode = _tools.np.array(enode).reshape([-1, nn])
xnode = node_x[enode, :].swapaxes(1, 2).reshape([-1, nn*3])
data = _tools.np.column_stack([xnode, node_lc[enode]]).reshape([-1])
data = _tools.np.column_stack([xnode, node_lc[enode]])
gmsh.view.addListData(sf_view, "ST"if etype == 2 else "SQ",
enode.shape[0], data)
enode.shape[0], data.reshape([-1]))
gmsh.model.mesh.field.setNumber(bg_field, "ViewTag", sf_view)
if intermediate_file_name is not None:
if intermediate_file_name == "-":
......@@ -198,12 +202,13 @@ def mesh(domain: _geometry.Domain, filename: str, mesh_size: _geometry.MeshSizeC
gmsh.model.mesh.generate(2)
gmsh.view.remove(sf_view)
gmsh.model.mesh.field.remove(bg_field)
_tools.log("Write \"{}\" (msh version {})".format(filename,version))
_tools.log("Write \"{}\" (msh version {})".format(filename, version))
gmsh.option.setNumber("Mesh.MshFileVersion", version)
gmsh.write(filename)
def convert_to_gis(input_filename: str, projection: _tools.osr.SpatialReference,
def convert_to_gis(input_filename: str,
projection: _tools.osr.SpatialReference,
output_filename: str) -> None:
""" Converts a triangular gmsh mesh into shapefile or geopackage.
......@@ -216,7 +221,7 @@ def convert_to_gis(input_filename: str, projection: _tools.osr.SpatialReference,
output_filename : shape file (.shp) or geopackage (.gpkg) file
"""
_tools.log("Convert \"{}\" into \"{}\"".format(input_filename,
output_filename), True)
output_filename), True)
gmsh.model.add(str(_tools.uuid.uuid4()))
gmsh.open(input_filename)
if output_filename.endswith(".shp"):
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment