Coverage for src/rok4/layer.py: 85%
132 statements
« prev ^ index » next coverage.py v7.6.1, created at 2024-10-01 15:35 +0000
« prev ^ index » next coverage.py v7.6.1, created at 2024-10-01 15:35 +0000
1"""Provide classes to use a layer.
3The module contains the following class:
5- `Layer` - Descriptor to broadcast pyramids' data
6"""
8# -- IMPORTS --
10# standard library
11import json
12import os
13import re
14from json.decoder import JSONDecodeError
15from typing import Dict, List, Tuple
17# package
18from rok4.enums import PyramidType
19from rok4.exceptions import FormatError, MissingAttributeError
20from rok4.pyramid import Pyramid
21from rok4.storage import get_data_str, get_infos_from_path, put_data_str
22from rok4.utils import reproject_bbox
class Layer:
    """A data layer, raster or vector

    Attributes:
        __name (str): layer's technical name
        __title (str): layer's title
        __abstract (str): layer's abstract
        __pyramids (List[Dict[str, Union[rok4.pyramid.Pyramid, str]]]): used pyramids, each one with its used bottom and top levels
        __format (str): used pyramids' format
        __tms (rok4.tile_matrix_set.TileMatrixSet): Used grid
        __keywords (List[str]): Keywords
        __levels (Dict[str, rok4.pyramid.Level]): Used pyramids' levels
        __best_level (rok4.pyramid.Level): Used level with the smallest resolution value
        __styles (List[str]): Styles identifiers (only set for raster layers)
        __resampling (str): Interpolation to use for resampling (only set for raster layers)
        __bbox (Tuple[float, float, float, float]): data bounding box, TMS coordinates system
        __geobbox (Tuple[float, float, float, float]): data bounding box, EPSG:4326
    """

    @classmethod
    def from_descriptor(cls, descriptor: str) -> "Layer":
        """Create a layer from its descriptor

        Args:
            descriptor (str): layer's descriptor path

        Raises:
            FormatError: Provided path is not a well formed JSON
            MissingAttributeError: Attribute is missing in the content
            StorageError: Storage read issue (layer descriptor)
            MissingEnvironmentError: Missing object storage informations
            Exception: Used pyramids are not consistent or no level is used

        Returns:
            Layer: a Layer instance
        """
        try:
            data = json.loads(get_data_str(descriptor))
        except JSONDecodeError as e:
            # Keep the decoding error as explicit cause of the format error
            raise FormatError("JSON", descriptor, e) from e

        layer = cls()

        _, _, _, base_name = get_infos_from_path(descriptor)
        # The layer's name is the descriptor's base name without its ".json" extension.
        # Only strip when the extension is really there, to avoid truncating another name.
        if base_name.lower().endswith(".json"):
            layer.__name = base_name[:-5]
        else:
            layer.__name = base_name

        try:
            # Common attributes
            layer.__title = data["title"]
            layer.__abstract = data["abstract"]
            layer.__load_pyramids(data["pyramids"])

            # Optional parameters
            if "keywords" in data:
                layer.__keywords.extend(data["keywords"])

            if layer.type == PyramidType.RASTER:
                # Default resampling has already been set from the pyramids
                if "resampling" in data:
                    layer.__resampling = data["resampling"]

                if "styles" in data:
                    layer.__styles = data["styles"]
                else:
                    layer.__styles = ["normal"]

            # Bounding boxes, native and geographic
            if "bbox" in data:
                layer.__geobbox = (
                    data["bbox"]["south"],
                    data["bbox"]["west"],
                    data["bbox"]["north"],
                    data["bbox"]["east"],
                )
                layer.__bbox = reproject_bbox(layer.__geobbox, "EPSG:4326", layer.__tms.srs, 5)
                # The layer's extent is forced, so limit tiles are recomputed for each level
                for level in layer.__levels.values():
                    level.set_limits_from_bbox(layer.__bbox)
            else:
                layer.__bbox = layer.__best_level.bbox
                layer.__geobbox = reproject_bbox(layer.__bbox, layer.__tms.srs, "EPSG:4326", 5)

        except KeyError as e:
            raise MissingAttributeError(descriptor, e) from e

        return layer

    @classmethod
    def from_parameters(cls, pyramids: List[Dict[str, str]], name: str, **kwargs) -> "Layer":
        """Create a default layer from parameters

        Args:
            pyramids (List[Dict[str, str]]): pyramids to use and extrem levels, bottom and top
            name (str): layer's technical name
            **title (str): Layer's title (will be equal to name if not provided)
            **abstract (str): Layer's abstract (will be equal to name if not provided)
            **styles (List[str]): Styles identifier to authorized for the layer
            **resampling (str): Interpolation to use for resampling

        Raises:
            Exception: name contains forbidden characters, no level is used or used pyramids do not shared same parameters (format, tms...)

        Returns:
            Layer: a Layer instance
        """

        layer = cls()

        # Mandatory information
        # fullmatch is used because "$" with match also accepts a trailing newline
        if not re.fullmatch(r"[A-Za-z0-9_-]*", name):
            raise Exception(
                f"Layer's name have to contain only letters, number, hyphen and underscore, to be URL and storage compliant ({name})"
            )

        layer.__name = name
        layer.__load_pyramids(pyramids)

        # Bounding boxes, native and geographic
        layer.__bbox = layer.__best_level.bbox
        layer.__geobbox = reproject_bbox(layer.__bbox, layer.__tms.srs, "EPSG:4326", 5)

        # Computed information
        layer.__keywords.append(layer.type.name)
        layer.__keywords.append(layer.__name)

        # Optional information: a missing key and an explicit None are handled the same way
        title = kwargs.get("title")
        layer.__title = name if title is None else title

        abstract = kwargs.get("abstract")
        layer.__abstract = name if abstract is None else abstract

        if layer.type == PyramidType.RASTER:
            styles = kwargs.get("styles")
            if styles is not None and len(styles) > 0:
                layer.__styles = styles
            else:
                layer.__styles = ["normal"]

            # Default resampling has already been set from the pyramids
            resampling = kwargs.get("resampling")
            if resampling is not None:
                layer.__resampling = resampling

        return layer

    def __init__(self) -> None:
        """Create an empty layer, to be filled by from_descriptor or from_parameters"""
        # Every attribute is initialized so that reading one on a fresh instance
        # gives None (or an empty container) instead of raising an AttributeError
        self.__name = None
        self.__title = None
        self.__abstract = None
        self.__format = None
        self.__tms = None
        self.__best_level = None
        self.__styles = None
        self.__resampling = None
        self.__bbox = None
        self.__geobbox = None
        self.__levels = {}
        self.__keywords = []
        self.__pyramids = []

    def __load_pyramids(self, pyramids: List[Dict[str, str]]) -> None:
        """Load and check pyramids

        Args:
            pyramids (List[Dict[str, str]]): List of descriptors' paths and optionnaly top and bottom levels

        Raises:
            Exception: Pyramids' do not all own the same format
            Exception: Pyramids' do not all own the same TMS
            Exception: Pyramids' do not all own the same channels number
            Exception: Overlapping in usage pyramids' levels
            Exception: No level is used (no pyramid provided)
        """

        # All pyramids must share the same characteristics
        channels = None
        for p in pyramids:
            pyramid = Pyramid.from_descriptor(p["path"])
            bottom_level = p.get("bottom_level", None)
            top_level = p.get("top_level", None)

            # Without explicit limits, the pyramid's own extreme levels are used
            if bottom_level is None:
                bottom_level = pyramid.bottom_level.id

            if top_level is None:
                top_level = pyramid.top_level.id

            if self.__format is not None and self.__format != pyramid.format:
                raise Exception(
                    f"Used pyramids have to own the same format : {self.__format} != {pyramid.format}"
                )
            else:
                self.__format = pyramid.format

            if self.__tms is not None and self.__tms.id != pyramid.tms.id:
                raise Exception(
                    f"Used pyramids have to use the same TMS : {self.__tms.id} != {pyramid.tms.id}"
                )
            else:
                self.__tms = pyramid.tms

            if self.type == PyramidType.RASTER:
                if channels is not None and channels != pyramid.raster_specifications["channels"]:
                    raise Exception(
                        f"Used RASTER pyramids have to own the same number of channels : {channels} != {pyramid.raster_specifications['channels']}"
                    )
                else:
                    channels = pyramid.raster_specifications["channels"]
                # Default resampling comes from the pyramid (the last loaded one wins)
                self.__resampling = pyramid.raster_specifications["interpolation"]

            levels = pyramid.get_levels(bottom_level, top_level)
            for level in levels:
                if level.id in self.__levels:
                    raise Exception(f"Level {level.id} is present in two used pyramids")
                self.__levels[level.id] = level

            self.__pyramids.append(
                {"pyramid": pyramid, "bottom_level": bottom_level, "top_level": top_level}
            )

        if not self.__levels:
            # Explicit error rather than an obscure IndexError when looking for the best level
            raise Exception("A layer has to use at least one pyramid's level")

        # The best level is the one with the smallest resolution value
        self.__best_level = min(self.__levels.values(), key=lambda level: level.resolution)

    def __str__(self) -> str:
        return f"{self.type.name} layer '{self.__name}'"

    @property
    def serializable(self) -> Dict:
        """Get the dict version of the layer object, descriptor compliant

        Returns:
            Dict: descriptor structured object description
        """
        serialization = {
            "title": self.__title,
            "abstract": self.__abstract,
            # Copy, so that modifying the returned dict does not alter the layer
            "keywords": list(self.__keywords),
            "wmts": {"authorized": True},
            "tms": {"authorized": True},
            "bbox": {
                "south": self.__geobbox[0],
                "west": self.__geobbox[1],
                "north": self.__geobbox[2],
                "east": self.__geobbox[3],
            },
            "pyramids": [
                {
                    "bottom_level": p["bottom_level"],
                    "top_level": p["top_level"],
                    "path": p["pyramid"].descriptor,
                }
                for p in self.__pyramids
            ],
        }

        # WMS, styles and resampling only make sense for raster data
        if self.type == PyramidType.RASTER:
            serialization["wms"] = {
                "authorized": True,
                "crs": ["CRS:84", "IGNF:WGS84G", "EPSG:3857", "EPSG:4258", "EPSG:4326"],
            }

            # The native coordinates system is always authorized
            if self.__tms.srs.upper() not in serialization["wms"]["crs"]:
                serialization["wms"]["crs"].append(self.__tms.srs.upper())

            serialization["styles"] = self.__styles
            serialization["resampling"] = self.__resampling

        return serialization

    def write_descriptor(self, directory: str = None) -> None:
        """Print layer's descriptor as JSON

        Args:
            directory (str, optional): Directory (file or object) where to print the layer's descriptor, called <layer's name>.json. Defaults to None, JSON is printed to standard output.
        """
        content = json.dumps(self.serializable)

        if directory is None:
            print(content)
        else:
            put_data_str(content, os.path.join(directory, f"{self.__name}.json"))

    @property
    def type(self) -> "PyramidType":
        """Get the layer's type, deduced from the used pyramids' format

        Returns:
            PyramidType: VECTOR if format is TIFF_PBF_MVT, RASTER otherwise (also when no pyramid is loaded yet)
        """
        if self.__format == "TIFF_PBF_MVT":
            return PyramidType.VECTOR
        else:
            return PyramidType.RASTER

    @property
    def bbox(self) -> Tuple[float, float, float, float]:
        """Get the data bounding box, TMS coordinates system (None while the layer is not loaded)"""
        return self.__bbox

    @property
    def geobbox(self) -> Tuple[float, float, float, float]:
        """Get the data bounding box, EPSG:4326 (None while the layer is not loaded)"""
        return self.__geobbox