Coverage for src/rok4/layer.py: 85%

132 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2024-10-01 15:35 +0000

1"""Provide classes to use a layer. 

2 

3The module contains the following class:

4 

5- `Layer` - Descriptor to broadcast pyramids' data 

6""" 

7 

8# -- IMPORTS -- 

9 

10# standard library 

11import json 

12import os 

13import re 

14from json.decoder import JSONDecodeError 

15from typing import Dict, List, Tuple 

16 

17# package 

18from rok4.enums import PyramidType 

19from rok4.exceptions import FormatError, MissingAttributeError 

20from rok4.pyramid import Pyramid 

21from rok4.storage import get_data_str, get_infos_from_path, put_data_str 

22from rok4.utils import reproject_bbox 

23 

24 

class Layer:
    """A data layer, raster or vector

    A layer broadcasts the data of one or several pyramids. All used pyramids have to share
    the same format and the same tile matrix set, and each level can be provided by only
    one pyramid.

    Attributes:
        __name (str): layer's technical name
        __title (str): layer's title
        __abstract (str): layer's abstract
        __keywords (List[str]): Keywords
        __pyramids (List[Dict[str, Union[rok4.pyramid.Pyramid, str]]]): used pyramids, one entry per pyramid with the keys "pyramid", "bottom_level" and "top_level"
        __format (str): format shared by the used pyramids
        __tms (rok4.tile_matrix_set.TileMatrixSet): Used grid
        __levels (Dict[str, rok4.pyramid.Level]): Used pyramids' levels, indexed by level identifier
        __best_level (rok4.pyramid.Level): Used level with the lowest resolution value
        __styles (List[str]): Styles' identifiers (raster layers only)
        __resampling (str): Interpolation to use for resampling (raster layers only)
        __bbox (Tuple[float, float, float, float]): data bounding box, TMS coordinates system
        __geobbox (Tuple[float, float, float, float]): data bounding box, EPSG:4326, ordered south, west, north, east
    """

    @classmethod
    def from_descriptor(cls, descriptor: str) -> "Layer":
        """Create a layer from its descriptor

        Args:
            descriptor (str): layer's descriptor path

        Raises:
            FormatError: Provided path is not a well formed JSON
            MissingAttributeError: Attribute is missing in the content
            StorageError: Storage read issue (layer descriptor)
            MissingEnvironmentError: Missing object storage informations

        Returns:
            Layer: a Layer instance
        """
        content = get_data_str(descriptor)

        try:
            data = json.loads(content)
        except JSONDecodeError as e:
            raise FormatError("JSON", descriptor, e) from e

        layer = cls()

        _, _, _, base_name = get_infos_from_path(descriptor)
        # The layer's name is the descriptor's base name without its .json extension.
        # The extension is removed only when present, to not truncate other names.
        layer.__name = base_name[:-5] if base_name.endswith(".json") else base_name

        try:
            # Mandatory attributes
            layer.__title = data["title"]
            layer.__abstract = data["abstract"]
            layer.__load_pyramids(data["pyramids"])

            # Optional parameters
            if "keywords" in data:
                layer.__keywords.extend(data["keywords"])

            if layer.type == PyramidType.RASTER:
                # Without explicit value, the interpolation read from the pyramids is kept
                if "resampling" in data:
                    layer.__resampling = data["resampling"]

                layer.__styles = data["styles"] if "styles" in data else ["normal"]

            # Bounding boxes, native and geographic
            if "bbox" in data:
                layer.__geobbox = (
                    data["bbox"]["south"],
                    data["bbox"]["west"],
                    data["bbox"]["north"],
                    data["bbox"]["east"],
                )
                layer.__bbox = reproject_bbox(layer.__geobbox, "EPSG:4326", layer.__tms.srs, 5)
                # The layer's extent is forced, so limit tiles are calculated again for each level
                for level in layer.__levels.values():
                    level.set_limits_from_bbox(layer.__bbox)
            else:
                layer.__bbox = layer.__best_level.bbox
                layer.__geobbox = reproject_bbox(layer.__bbox, layer.__tms.srs, "EPSG:4326", 5)

        except KeyError as e:
            raise MissingAttributeError(descriptor, e) from e

        return layer

    @classmethod
    def from_parameters(cls, pyramids: List[Dict[str, str]], name: str, **kwargs) -> "Layer":
        """Create a default layer from parameters

        Args:
            pyramids (List[Dict[str, str]]): pyramids to use and extrem levels, bottom and top
            name (str): layer's technical name
            **title (str): Layer's title (will be equal to name if not provided)
            **abstract (str): Layer's abstract (will be equal to name if not provided)
            **styles (List[str]): Styles identifier to authorized for the layer
            **resampling (str): Interpolation to use for resampling

        Raises:
            Exception: name is empty or contains forbidden characters, no pyramid is provided or used pyramids do not shared same parameters (format, tms...)

        Returns:
            Layer: a Layer instance
        """

        layer = cls()

        # Mandatory informations
        # fullmatch is used because '$' also matches just before a trailing newline,
        # and '+' refuses the empty name, which cannot be URL and storage compliant
        if not re.fullmatch(r"[A-Za-z0-9_-]+", name):
            raise Exception(
                f"Layer's name have to contain only letters, number, hyphen and underscore, to be URL and storage compliant ({name})"
            )

        layer.__name = name
        layer.__load_pyramids(pyramids)

        # Bounding boxes, native and geographic
        layer.__bbox = layer.__best_level.bbox
        layer.__geobbox = reproject_bbox(layer.__bbox, layer.__tms.srs, "EPSG:4326", 5)

        # Calculated informations
        layer.__keywords.append(layer.type.name)
        layer.__keywords.append(layer.__name)

        # Optional informations: a missing key and a None value are handled the same way
        title = kwargs.get("title")
        layer.__title = name if title is None else title

        abstract = kwargs.get("abstract")
        layer.__abstract = name if abstract is None else abstract

        if layer.type == PyramidType.RASTER:
            styles = kwargs.get("styles")
            if styles is not None and len(styles) > 0:
                layer.__styles = styles
            else:
                layer.__styles = ["normal"]

            # Without explicit value, the interpolation read from the pyramids is kept
            resampling = kwargs.get("resampling")
            if resampling is not None:
                layer.__resampling = resampling

        return layer

    def __init__(self) -> None:
        """Create an empty layer

        Every attribute is initialized, so accessors never fail with an AttributeError
        on a layer which is not loaded yet. Use from_descriptor or from_parameters to
        get a usable layer.
        """
        self.__name = None
        self.__title = None
        self.__abstract = None
        self.__format = None
        self.__tms = None
        self.__best_level = None
        self.__styles = None
        self.__resampling = None
        self.__bbox = None
        self.__geobbox = None
        self.__levels = {}
        self.__keywords = []
        self.__pyramids = []

    def __load_pyramids(self, pyramids: List[Dict[str, str]]) -> None:
        """Load and check pyramids

        Args:
            pyramids (List[Dict[str, str]]): List of descriptors' paths and optionnaly top and bottom levels

        Raises:
            Exception: No pyramid is provided
            Exception: Pyramids' do not all own the same format
            Exception: Pyramids' do not all own the same TMS
            Exception: Pyramids' do not all own the same channels number
            Exception: Overlapping in usage pyramids' levels
            Exception: Used pyramids do not provide any level
        """

        if not pyramids:
            raise Exception("A layer has to use at least one pyramid")

        # All pyramids have to own the same characteristics
        channels = None
        for p in pyramids:
            pyramid = Pyramid.from_descriptor(p["path"])

            # Without explicit limits, all the pyramid's levels are used
            bottom_level = p.get("bottom_level")
            if bottom_level is None:
                bottom_level = pyramid.bottom_level.id

            top_level = p.get("top_level")
            if top_level is None:
                top_level = pyramid.top_level.id

            if self.__format is not None and self.__format != pyramid.format:
                raise Exception(
                    f"Used pyramids have to own the same format : {self.__format} != {pyramid.format}"
                )
            self.__format = pyramid.format

            if self.__tms is not None and self.__tms.id != pyramid.tms.id:
                raise Exception(
                    f"Used pyramids have to use the same TMS : {self.__tms.id} != {pyramid.tms.id}"
                )
            self.__tms = pyramid.tms

            if self.type == PyramidType.RASTER:
                pyramid_channels = pyramid.raster_specifications["channels"]
                if channels is not None and channels != pyramid_channels:
                    raise Exception(
                        f"Used RASTER pyramids have to own the same number of channels : {channels} != {pyramid_channels}"
                    )
                channels = pyramid_channels
                # The interpolation of the last loaded pyramid is the layer's default one
                self.__resampling = pyramid.raster_specifications["interpolation"]

            for level in pyramid.get_levels(bottom_level, top_level):
                if level.id in self.__levels:
                    raise Exception(f"Level {level.id} is present in two used pyramids")
                self.__levels[level.id] = level

            self.__pyramids.append(
                {"pyramid": pyramid, "bottom_level": bottom_level, "top_level": top_level}
            )

        if not self.__levels:
            raise Exception("Used pyramids do not provide any level")

        # The best level is the one with the lowest resolution value
        self.__best_level = min(self.__levels.values(), key=lambda level: level.resolution)

    def __str__(self) -> str:
        return f"{self.type.name} layer '{self.__name}'"

    @property
    def serializable(self) -> Dict:
        """Get the dict version of the layer object, descriptor compliant

        Returns:
            Dict: descriptor structured object description
        """
        south, west, north, east = self.__geobbox

        serialization = {
            "title": self.__title,
            "abstract": self.__abstract,
            "keywords": self.__keywords,
            "wmts": {"authorized": True},
            "tms": {"authorized": True},
            "bbox": {
                "south": south,
                "west": west,
                "north": north,
                "east": east,
            },
            "pyramids": [
                {
                    "bottom_level": used["bottom_level"],
                    "top_level": used["top_level"],
                    "path": used["pyramid"].descriptor,
                }
                for used in self.__pyramids
            ],
        }

        # WMS, styles and resampling only make sense for raster data
        if self.type == PyramidType.RASTER:
            crs = ["CRS:84", "IGNF:WGS84G", "EPSG:3857", "EPSG:4258", "EPSG:4326"]

            # The native coordinates system is always available
            native_crs = self.__tms.srs.upper()
            if native_crs not in crs:
                crs.append(native_crs)

            serialization["wms"] = {
                "authorized": True,
                "crs": crs,
            }
            serialization["styles"] = self.__styles
            serialization["resampling"] = self.__resampling

        return serialization

    def write_descriptor(self, directory: str = None) -> None:
        """Print layer's descriptor as JSON

        Args:
            directory (str, optional): Directory (file or object) where to print the layer's descriptor, called <layer's name>.json. Defaults to None, JSON is printed to standard output.
        """
        content = json.dumps(self.serializable)

        if directory is None:
            print(content)
            return

        put_data_str(content, os.path.join(directory, f"{self.__name}.json"))

    @property
    def type(self) -> "PyramidType":
        """Get the layer's type, deduced from the format of the used pyramids

        Returns:
            PyramidType: VECTOR if the format is TIFF_PBF_MVT, RASTER otherwise
        """
        if self.__format == "TIFF_PBF_MVT":
            return PyramidType.VECTOR
        return PyramidType.RASTER

    @property
    def bbox(self) -> Tuple[float, float, float, float]:
        """Get the data bounding box, TMS coordinates system

        Returns:
            Tuple[float, float, float, float]: native bounding box, None if the layer is not loaded
        """
        return self.__bbox

    @property
    def geobbox(self) -> Tuple[float, float, float, float]:
        """Get the data bounding box, EPSG:4326

        Returns:
            Tuple[float, float, float, float]: geographic bounding box, None if the layer is not loaded
        """
        return self.__geobbox