Coverage for src/rok4_tools/tmsizer_utils/processors/map.py: 79%

120 statements  

« prev     ^ index     » next       coverage.py v7.5.0, created at 2024-04-30 07:30 +0000

1"""Provide processor tranforming data. Output data is emitted as the input processor reading progresses 

2 

3The module contains the following classes: 

4 

5- `Gettile2tileindexProcessor` - Extract tile index from a WMTS GetTile URL 

6- `Tileindex2gettileProcessor` - Generate WMTS GetTile query parameters from tile index 

7- `Tileindex2pointProcessor` - Generate the tile's center coordinates from tile index 

8- `Geometry2tileindexProcessor` - Generate all tiles' indices intersecting the input geometry 

9""" 

10 

11import sys 

12from typing import Dict, List, Tuple, Union, Iterator 

13from math import floor 

14from urllib.parse import urlparse, parse_qs 

15 

16from osgeo import gdal, ogr, osr 

17ogr.UseExceptions() 

18osr.UseExceptions() 

19gdal.UseExceptions() 

20 

21from rok4.tile_matrix_set import TileMatrixSet 

22from rok4.utils import bbox_to_geometry 

23 

24from rok4_tools.tmsizer_utils.processors.processor import Processor 

25 

26class Gettile2tileindexProcessor(Processor): 

27 """Processor extracting tile index from a WMTS GetTile URL 

28 

29 Accepted input format is "GETTILE_PARAMS" and output format is "TILE_INDEX" 

30 

31 Attributes: 

32 __input (Processor): Processor from which data is read 

33 __level (str, optional): Tile matrix identifier to filter data 

34 """ 

35 

36 input_formats_allowed = ["GETTILE_PARAMS"] 

37 

38 def __init__(self, input: Processor, **options): 

39 """Constructor method 

40 

41 Args: 

42 input (Processor): Processor from which data is read 

43 **level (str, optional): Tile matrix identifier to filter data 

44 

45 Raises: 

46 ValueError: Input format is not allowed 

47 ValueError: Provided level is not in the pivot TMS 

48 """ 

49 

50 if input.format not in self.input_formats_allowed: 

51 raise ValueError(f"Input format {input.format} is not handled for Gettile2tileindexProcessor : allowed formats are {self.input_formats_allowed}") 

52 

53 super().__init__("TILE_INDEX") 

54 

55 self.__input = input 

56 if "level" in options.keys(): 

57 if self.tms.get_level(options["level"]) is None: 

58 raise ValueError(f"Provided level is not in the TMS") 

59 self.__level = options["level"] 

60 else: 

61 self.__level = None 

62 

63 def process(self) -> Iterator[Tuple[str, int, int]]: 

64 """Read an item from the input processor and extract tile index 

65 

66 Used query parameters are TILEMATRIXSET, TILEMATRIX, TILECOL and TILEROW. If one is missing, item is passed.  

67 TILEMATRISET have to be the pivot TMS's name (or just preffixed by its name). TILEMATRIX have to be present in the pivot TMS.  

68 If a filtering level is provided, different TILEMATRIX are passed. 

69 

70 Examples: 

71 

72 Get tile index 

73 

74 from rok4_tools.tmsizer_utils.processors.map import Gettile2tileindexProcessor 

75 

76 try: 

77 # Creation of Processor source_processor with format GETTILE_PARAMS 

78 processor = Gettile2tileindexProcessor(source_processor, level="19" ) 

79 for item in processor.process(): 

80 (level, col, row) = item 

81 

82 except Exception as e: 

83 print("{e}") 

84 

85 Yields: 

86 Iterator[Tuple[str, int, int]]: Tile index (level, col, row) 

87 """ 

88 

89 if self.__input.format == "GETTILE_PARAMS": 

90 for item in self.__input.process(): 

91 self._processed += 1 

92 

93 qs = parse_qs(urlparse(item.upper()).query) 

94 try: 

95 

96 # On se limite à un niveau et ce n'est pas celui de la requête 

97 if self.__level is not None and qs["TILEMATRIX"][0] != self.__level: 

98 continue 

99 

100 # La requête n'utilise pas le TMS en entrée 

101 if qs["TILEMATRIXSET"][0] != self.tms.name and not qs["TILEMATRIXSET"][0].startswith(f"{self.tms.name}_"): 

102 continue 

103 

104 # La requête demande un niveau que le TMS ne possède pas 

105 if self.tms.get_level(qs["TILEMATRIX"][0]) is None: 

106 continue 

107 

108 yield (str(qs["TILEMATRIX"][0]),int(qs["TILECOL"][0]),int(qs["TILEROW"][0])) 

109 except Exception as e: 

110 # La requête n'est pas un gettile ou n'est pas valide, il manque un paramètre, ou il a un mauvais format 

111 # on la passe simplement 

112 pass 

113 

114 def __str__(self) -> str: 

115 return f"Gettile2tileindexProcessor : {self._processed} {self.__input.format} items processed, extracting tile's indices" 

116 

117 

118class Tileindex2gettileProcessor(Processor): 

119 """Processor generating WMTS GetTile query parameters from tile index 

120 

121 Accepted input format is "TILE_INDEX" and output format is "GETTILE_PARAMS" 

122 

123 Attributes: 

124 __input (Processor): Processor from which data is read 

125 """ 

126 

127 input_formats_allowed = ["TILE_INDEX"] 

128 

129 def __init__(self, input: Processor): 

130 """Constructor method 

131 

132 Args: 

133 input (Processor): Processor from which data is read 

134 

135 Raises: 

136 ValueError: Input format is not allowed 

137 """ 

138 

139 if input.format not in self.input_formats_allowed: 

140 raise ValueError(f"Input format {input.format} is not handled for Tileindex2gettileProcessor : allowed formats are {self.input_formats_allowed}") 

141 

142 super().__init__("GETTILE_PARAMS") 

143 

144 self.__input = input 

145 

146 def process(self) -> Iterator[str]: 

147 """Read a tile index from the input processor and generate WMTS GetTile query parameters 

148 

149 Examples: 

150 

151 Get GetTile query parameters 

152 

153 from rok4_tools.tmsizer_utils.processors.map import Tileindex2gettileProcessor 

154 

155 try: 

156 # Creation of Processor source_processor with format TILE_INDEX 

157 processor = Tileindex2gettileProcessor(source_processor) 

158 for item in processor.process(): 

159 query_parameters = item 

160 

161 except Exception as e: 

162 print("{e}") 

163 

164 Yields: 

165 Iterator[str]: GetTile query parameters TILEMATRIXSET=<tms>&TILEMATRIX=<level>&TILECOL=<col>&TILEROW=<row> 

166 """ 

167 

168 if self.__input.format == "TILE_INDEX": 

169 for item in self.__input.process(): 

170 self._processed += 1 

171 

172 (level, col, row) = item 

173 

174 yield f"TILEMATRIXSET={self.tms.name}&TILEMATRIX={level}&TILECOL={col}&TILEROW={row}" 

175 

176 def __str__(self) -> str: 

177 return f"Tileindex2gettileProcessor : {self._processed} {self.__input.format} items processed, generating GetTile's query parameters" 

178 

179class Tileindex2pointProcessor(Processor): 

180 """Processor generating the tile's center coordinates from tile index 

181 

182 Accepted input format is "TILE_INDEX" and output format is "POINT" 

183 

184 Attributes: 

185 __input (Processor): Processor from which data is read 

186 """ 

187 

188 input_formats_allowed = ["TILE_INDEX"] 

189 

190 def __init__(self, input: Processor): 

191 """Constructor method 

192 

193 Args: 

194 input (Processor): Processor from which data is read 

195 

196 Raises: 

197 ValueError: Input format is not allowed 

198 """ 

199 

200 if input.format not in self.input_formats_allowed: 

201 raise ValueError(f"Input format {input.format} is not handled for Tileindex2pointProcessor : allowed formats are {self.input_formats_allowed}") 

202 

203 super().__init__("POINT") 

204 

205 self.__input = input 

206 

207 def process(self) -> Iterator[Tuple[float, float]]: 

208 """Read a tile index from the input processor and generate the tile's center coordinates 

209 

210 Examples: 

211 

212 Get tile's center coordinates 

213 

214 from rok4_tools.tmsizer_utils.processors.map import Tileindex2pointProcessor 

215 

216 try: 

217 # Creation of Processor source_processor with format TILE_INDEX 

218 processor = Tileindex2pointProcessor(source_processor) 

219 for item in processor.process(): 

220 (x, y) = item 

221 

222 except Exception as e: 

223 print("{e}") 

224 

225 Yields: 

226 Iterator[Tuple[float, float]]: point coordinates (x,y) 

227 """ 

228 

229 if self.__input.format == "TILE_INDEX": 

230 for item in self.__input.process(): 

231 self._processed += 1 

232 

233 (level, col, row) = item 

234 try: 

235 bb = self.tms.get_level(level).tile_to_bbox(col, row) 

236 

237 x_center = bb[0] + (bb[2] - bb[0]) / 2; 

238 y_center = bb[1] + (bb[3] - bb[1]) / 2; 

239 

240 yield (x_center, y_center) 

241 except Exception as e: 

242 # Le niveau n'est pas valide, on passe simplement 

243 pass 

244 

245 def __str__(self) -> str: 

246 return f"Tileindex2pointProcessor : {self._processed} {self.__input.format} items processed, extracting tile's center coordinates" 

247 

248 

249class Geometry2tileindexProcessor(Processor): 

250 """Processor generating the tile's center coordinates from tile index 

251 

252 Accepted input format is "GEOMETRY" and output format is "TILE_INDEX" 

253 

254 Attributes: 

255 __input (Processor): Processor from which data is read 

256 __geometry_format (str): Format of input string geometries. "WKT", "WKB" or "GeoJSON" 

257 __level (str): Tile matrix identifier to define intersecting tiles 

258 """ 

259 

260 input_formats_allowed = ["GEOMETRY"] 

261 geometry_formats_allowed = ["WKT", "WKB", "GeoJSON"] 

262 

263 def __init__(self, input: Processor, **options): 

264 """Constructor method 

265 

266 Args: 

267 input (Processor): Processor from which data is read 

268 **format (str): Format of input string geometries. "WKT", "WKB" or "GeoJSON" 

269 **level (str): Tile matrix identifier to define intersecting tiles 

270 

271 Raises: 

272 ValueError: Input format is not allowed 

273 KeyError: A mandatory option is missing 

274 ValueError: A mandatory option is not valid 

275 ValueError: Provided level is not in the pivot TMS 

276 """ 

277 

278 if input.format not in self.input_formats_allowed: 

279 raise ValueError(f"Input format {input.format} is not handled for Geometry2tileindexProcessor : allowed formats are {self.input_formats_allowed}") 

280 

281 super().__init__("TILE_INDEX") 

282 

283 self.__input = input 

284 

285 try: 

286 

287 if options["format"] not in self.geometry_formats_allowed: 

288 raise ValueError(f"Option 'format' for an input geometry is not handled ({options['format']}) : allowed formats are {self.geometry_formats_allowed}") 

289 

290 self.__geometry_format = options["format"] 

291 

292 if self.tms.get_level(options["level"]) is None: 

293 raise ValueError(f"Provided level is not in the TMS") 

294 

295 self.__level = options["level"] 

296 

297 except KeyError as e: 

298 raise KeyError(f"Option {e} is required to generate tile indices from geometries") 

299 

300 def process(self) -> Iterator[Tuple[str, int, int]]: 

301 """Read a geometry from the input processor and extract tile index 

302 

303 Geometry is parsed according to provided format. To determine intersecting tiles, geometry have to be a Polygon or a MultiPolygon.  

304 For an input geometry, all intersecting tiles for the provided level are yielded 

305 

306 Examples: 

307 

308 Get intersecting tiles' indices 

309 

310 from rok4_tools.tmsizer_utils.processors.map import Geometry2tileindexProcessor 

311 

312 try: 

313 # Creation of Processor source_processor with format GEOMETRY 

314 processor = Geometry2tileindexProcessor(source_processor, level="15", format="GeoJSON" ) 

315 for item in processor.process(): 

316 (level, col, row) = item 

317 

318 except Exception as e: 

319 print("{e}") 

320 

321 Yields: 

322 Iterator[Tuple[str, int, int]]: Tile index (level, col, row) 

323 """ 

324 

325 tile_matrix = self.tms.get_level(self.__level) 

326 

327 if self.__input.format == "GEOMETRY": 

328 

329 for item in self.__input.process(): 

330 self._processed += 1 

331 

332 try: 

333 geom = None 

334 if self.__geometry_format == "WKT": 

335 geom = ogr.ForceToMultiPolygon(ogr.CreateGeometryFromWkt(item)) 

336 elif self.__geometry_format == "GeoJSON": 

337 geom = ogr.ForceToMultiPolygon(ogr.CreateGeometryFromJson(item)) 

338 elif self.__geometry_format == "WKB": 

339 geom = ogr.ForceToMultiPolygon(ogr.CreateGeometryFromWkb(item)) 

340 

341 for i in range(0, geom.GetGeometryCount()): 

342 g = geom.GetGeometryRef(i) 

343 xmin, xmax, ymin, ymax = g.GetEnvelope() 

344 col_min, row_min, col_max, row_max = tile_matrix.bbox_to_tiles((xmin, ymin, xmax, ymax)) 

345 

346 for col in range(col_min, col_max + 1): 

347 for row in range(row_min, row_max + 1): 

348 tg = bbox_to_geometry(tile_matrix.tile_to_bbox(col, row)) 

349 if g.Intersects(tg): 

350 yield (self.__level, col, row) 

351 

352 except Exception as e: 

353 # La géométrie n'est pas valide, on la passe simplement 

354 print(e) 

355 continue 

356 

357 def __str__(self) -> str: 

358 return f"Geometry2tileindexProcessor : {self._processed} {self.__input.format} items processed (format {self.__geometry_format}), extracting intersecting tile's indices"