Coverage for src/rok4/pyramid.py: 75%

497 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2024-10-01 15:35 +0000

1"""Provide classes to use pyramid's data. 

2 

3The module contains the following classes: 

4 

5- `Pyramid` - Data container 

6- `Level` - Level of a pyramid 

7""" 

8 

9# -- IMPORTS -- 

10 

11# standard library 

12import io 

13import json 

14import os 

15import re 

16import tempfile 

17import zlib 

18from json.decoder import JSONDecodeError 

19from typing import Dict, Iterator, List, Tuple 

20 

21# 3rd party 

22import mapbox_vector_tile 

23import numpy 

24from PIL import Image 

25 

26# package 

27from rok4.enums import PyramidType, SlabType, StorageType 

28from rok4.exceptions import FormatError, MissingAttributeError 

29from rok4.storage import ( 

30 copy, 

31 get_data_binary, 

32 get_data_str, 

33 get_infos_from_path, 

34 get_path_from_infos, 

35 put_data_str, 

36 remove, 

37 size_path, 

38) 

39from rok4.tile_matrix_set import TileMatrix, TileMatrixSet 

40from rok4.utils import reproject_point, srs_to_spatialreference 

41 

42# -- GLOBALS -- 

43ROK4_IMAGE_HEADER_SIZE = 2048 

44"""Slab's header size, 2048 bytes""" 

45 

46 

def b36_number_encode(number: int) -> str:
    """Convert base-10 number to base-36

    Used alphabet is '0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ'

    Args:
        number (int): base-10 number, positive or null

    Raises:
        ValueError: the provided number is negative

    Returns:
        str: base-36 number
    """

    if number < 0:
        # divmod() with a negative dividend converges to -1 and never reaches 0:
        # without this guard the conversion loop would never end
        raise ValueError(f"Cannot encode a negative number in base-36 : {number}")

    alphabet = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ"
    base = len(alphabet)

    # Single digit case (handles 0, which the loop below would convert to an empty string)
    if number < base:
        return alphabet[number]

    # Digits are produced from the least significant to the most significant one
    digits = []
    while number != 0:
        number, remainder = divmod(number, base)
        digits.append(alphabet[remainder])

    return "".join(reversed(digits))

71 

72 

def b36_number_decode(number: str) -> int:
    """Convert base-36 number to base-10

    Args:
        number (str): base-36 number

    Returns:
        int: base-10 number
    """
    base = 36
    return int(number, base)

83 

84 

def b36_path_decode(path: str) -> Tuple[int, int]:
    """Get slab's column and row from a base-36 based path

    Slashes and the .tif / .tiff extension are dropped, then characters are read by pairs:
    the first one of each pair is a column digit, the second one a row digit.

    Args:
        path (str): slab's path

    Returns:
        Tuple[int, int]: slab's column and row
    """

    digits = re.sub(r"(\.TIFF?)", "", path.replace("/", "").upper())

    column_digits = []
    row_digits = []

    for position in range(0, len(digits), 2):
        column_digits.append(digits[position])
        row_digits.append(digits[position + 1])

    return b36_number_decode("".join(column_digits)), b36_number_decode("".join(row_digits))

107 

108 

def b36_path_encode(column: int, row: int, slashs: int) -> str:
    """Convert slab indices to base-36 based path, with .tif extension

    Column and row are written in base-36, left padded with '0' to the same length
    (at least slashs + 1), then interleaved: one column digit, one row digit.

    Args:
        column (int): slab's column
        row (int): slab's row
        slashs (int): slashs' number (to split path)

    Returns:
        str: base-36 based path
    """

    column_digits = b36_number_encode(column)
    row_digits = b36_number_encode(row)

    width = max(slashs + 1, len(column_digits), len(row_digits))

    pairs = [
        column_digit + row_digit
        for column_digit, row_digit in zip(
            column_digits.rjust(width, "0"), row_digits.rjust(width, "0")
        )
    ]

    # Each of the `slashs` last pairs is preceded by a separator, the first ones are glued together
    split_at = width - max(slashs, 0)
    glued = "".join(pairs[:split_at])
    separated = "".join(f"/{pair}" for pair in pairs[split_at:])

    return f"{glued}{separated}.tif"

143 

144 

class Level:
    """A pyramid's level, raster or vector

    Attributes:
        __id (str): level's identifier. have to exist in the pyramid's used TMS
        __pyramid (Pyramid): pyramid owning the level
        __tile_limits (Dict[str, int]): minimum and maximum tiles' columns and rows of pyramid's content
        __slab_size (Tuple[int, int]): number of tile in a slab, widthwise and heightwise
        __tables (List[Dict]): for a VECTOR pyramid, description of vector content, tables and attributes
    """

    @classmethod
    def from_descriptor(cls, data: Dict, pyramid: "Pyramid") -> "Level":
        """Create a pyramid's level from the pyramid's descriptor levels element

        Args:
            data (Dict): level's information from the pyramid's descriptor
            pyramid (Pyramid): pyramid containing the level to create

        Raises:
            Exception: different storage or masks presence between the level and the pyramid
            MissingAttributeError: Attribute is missing in the content

        Returns:
            Level: a Level instance
        """
        level = cls()

        level.__pyramid = pyramid

        # Common attributes
        try:
            level.__id = data["id"]
            level.__tile_limits = data["tile_limits"]
            level.__slab_size = (
                data["tiles_per_width"],
                data["tiles_per_height"],
            )

            # Storage information: checked against the pyramid's one and stored in the pyramid
            if pyramid.storage_type.name != data["storage"]["type"]:
                raise Exception(
                    f"Pyramid {pyramid.descriptor} owns levels using different storage types ({ data['storage']['type'] }) than its one ({pyramid.storage_type.name})"
                )

            if pyramid.storage_type == StorageType.FILE:
                pyramid.storage_depth = data["storage"]["path_depth"]

            # The pyramid's private attributes cannot be read from here (pyramid.__descriptor
            # would be mangled to pyramid._Level__descriptor): the public property is used
            if "mask_directory" in data["storage"] or "mask_prefix" in data["storage"]:
                if not pyramid.own_masks:
                    raise Exception(
                        f"Pyramid {pyramid.descriptor} does not define a mask format but level {level.__id} define mask storage informations"
                    )
            else:
                if pyramid.own_masks:
                    raise Exception(
                        f"Pyramid {pyramid.descriptor} define a mask format but level {level.__id} does not define mask storage informations"
                    )

        except KeyError as e:
            raise MissingAttributeError(pyramid.descriptor, f"levels[].{e}") from e

        # Attributes specific to a vector level
        if level.__pyramid.type == PyramidType.VECTOR:
            try:
                level.__tables = data["tables"]

            except KeyError as e:
                raise MissingAttributeError(pyramid.descriptor, f"levels[].{e}") from e

        return level

    @classmethod
    def from_other(cls, other: "Level", pyramid: "Pyramid") -> "Level":
        """Create a pyramid's level from another one

        Identifier, tile limits, slab size (and tables for a VECTOR pyramid) come from the
        cloned level, the owning pyramid is the provided one.

        Args:
            other (Level): level to clone
            pyramid (Pyramid): new pyramid containing the new level

        Returns:
            Level: a Level instance
        """

        level = cls()

        # Common attributes
        level.__id = other.__id
        level.__pyramid = pyramid
        level.__tile_limits = other.__tile_limits
        level.__slab_size = other.__slab_size

        # Attributes specific to a vector level
        if level.__pyramid.type == PyramidType.VECTOR:
            level.__tables = other.__tables

        return level

    def __str__(self) -> str:
        return f"{self.__pyramid.type.name} pyramid's level '{self.__id}' ({self.__pyramid.storage_type.name} storage)"

    @property
    def serializable(self) -> Dict:
        """Get the dict version of the level object, pyramid's descriptor compliant

        The "storage" element depends on the pyramid's storage type (FILE, CEPH or S3) and
        owns mask information only if the pyramid owns masks.

        Returns:
            Dict: pyramid's descriptor structured object description
        """
        serialization = {
            "id": self.__id,
            "tiles_per_width": self.__slab_size[0],
            "tiles_per_height": self.__slab_size[1],
            "tile_limits": self.__tile_limits,
        }

        if self.__pyramid.type == PyramidType.VECTOR:
            serialization["tables"] = self.__tables

        if self.__pyramid.storage_type == StorageType.FILE:
            serialization["storage"] = {
                "type": "FILE",
                "image_directory": f"{self.__pyramid.name}/DATA/{self.__id}",
                "path_depth": self.__pyramid.storage_depth,
            }
            if self.__pyramid.own_masks:
                serialization["storage"][
                    "mask_directory"
                ] = f"{self.__pyramid.name}/MASK/{self.__id}"

        elif self.__pyramid.storage_type == StorageType.CEPH:
            serialization["storage"] = {
                "type": "CEPH",
                "image_prefix": f"{self.__pyramid.name}/DATA_{self.__id}",
                "pool_name": self.__pyramid.storage_root,
            }
            if self.__pyramid.own_masks:
                serialization["storage"]["mask_prefix"] = f"{self.__pyramid.name}/MASK_{self.__id}"

        elif self.__pyramid.storage_type == StorageType.S3:
            serialization["storage"] = {
                "type": "S3",
                "image_prefix": f"{self.__pyramid.name}/DATA_{self.__id}",
                "bucket_name": self.__pyramid.storage_root,
            }
            if self.__pyramid.own_masks:
                serialization["storage"]["mask_prefix"] = f"{self.__pyramid.name}/MASK_{self.__id}"

        return serialization

    @property
    def id(self) -> str:
        """Get the level's identifier"""
        return self.__id

    @property
    def bbox(self) -> Tuple[float, float, float, float]:
        """Return level extent, based on tile limits

        Returns:
            Tuple[float, float, float, float]: level terrain extent (xmin, ymin, xmax, ymax)
        """

        # xmin and ymin come from the tile (min_col, max_row), xmax and ymax from the tile (max_col, min_row)
        min_bbox = self.__pyramid.tms.get_level(self.__id).tile_to_bbox(
            self.__tile_limits["min_col"], self.__tile_limits["max_row"]
        )
        max_bbox = self.__pyramid.tms.get_level(self.__id).tile_to_bbox(
            self.__tile_limits["max_col"], self.__tile_limits["min_row"]
        )

        return (min_bbox[0], min_bbox[1], max_bbox[2], max_bbox[3])

    @property
    def resolution(self) -> str:
        """Get the resolution of the TMS level having the same identifier"""
        return self.__pyramid.tms.get_level(self.__id).resolution

    @property
    def tile_matrix(self) -> TileMatrix:
        """Get the TMS level having the same identifier"""
        return self.__pyramid.tms.get_level(self.__id)

    @property
    def slab_width(self) -> int:
        """Get the number of tiles in a slab, widthwise"""
        return self.__slab_size[0]

    @property
    def slab_height(self) -> int:
        """Get the number of tiles in a slab, heightwise"""
        return self.__slab_size[1]

    @property
    def tile_limits(self) -> Dict[str, int]:
        """Get the tile limits (keys min_row, max_row, min_col and max_col)"""
        return self.__tile_limits

    def is_in_limits(self, column: int, row: int) -> bool:
        """Is the tile indices in limits ?

        Args:
            column (int): tile's column
            row (int): tile's row

        Returns:
            bool: True if tiles' limits contain the provided tile's indices
        """
        return (
            self.__tile_limits["min_row"] <= row
            and self.__tile_limits["max_row"] >= row
            and self.__tile_limits["min_col"] <= column
            and self.__tile_limits["max_col"] >= column
        )

    def set_limits_from_bbox(self, bbox: Tuple[float, float, float, float]) -> None:
        """Set tile limits, based on provided bounding box

        Args:
            bbox (Tuple[float, float, float, float]): terrain extent (xmin, ymin, xmax, ymax), in TMS coordinates system

        """

        col_min, row_min, col_max, row_max = self.__pyramid.tms.get_level(self.__id).bbox_to_tiles(
            bbox
        )
        self.__tile_limits = {
            "min_row": row_min,
            "max_col": col_max,
            "max_row": row_max,
            "min_col": col_min,
        }

372 

373 

374class Pyramid: 

375 """A data pyramid, raster or vector 

376 

377 Attributes: 

378 __name (str): pyramid's name 

379 __descriptor (str): pyramid's descriptor path 

380 __list (str): pyramid's list path 

381 __tms (rok4.tile_matrix_set.TileMatrixSet): Used grid 

382 __levels (Dict[str, Level]): Pyramid's levels 

383 __format (str): Data format 

384 __storage (Dict[str, Union[rok4.enums.StorageType,str,int]]): Pyramid's storage informations (type, root and depth if FILE storage) 

385 __raster_specifications (Dict): If raster pyramid, raster specifications 

386 __content (Dict): Loading status (loaded), slab count (count) and list content (cache). 

387 

388 Example (S3 storage): 

389 

390 { 

391 'cache': { 

392 (<SlabType.DATA: 'DATA'>, '18', 5424, 7526): { 

393 'link': False, 

394 'md5': None, 

395 'root': 'pyramids@localhost:9000/LIMADM', 

396 'slab': 'DATA_18_5424_7526' 

397 } 

398 }, 

399 'count': 1, 

400 'loaded': True 

401 } 

402 """ 

403 

404 @classmethod 

405 def from_descriptor(cls, descriptor: str) -> "Pyramid": 

406 """Create a pyramid from its descriptor 

407 

408 Args: 

409 descriptor (str): pyramid's descriptor path 

410 

411 Raises: 

412 FormatError: Provided path or the descriptor is not a well formed JSON 

413 Exception: Level issue : no one in the pyramid or the used TMS, or level ID not defined in the TMS 

414 MissingAttributeError: Attribute is missing in the content 

415 StorageError: Storage read issue (pyramid descriptor or TMS) 

416 MissingEnvironmentError: Missing object storage informations or TMS root directory 

417 

418 Examples: 

419 

420 S3 stored descriptor 

421 

422 from rok4.pyramid import Pyramid 

423 

424 try: 

425 pyramid = Pyramid.from_descriptor("s3://bucket_name/path/to/descriptor.json") 

426 except Exception as e: 

427 print("Cannot load the pyramid from its descriptor") 

428 

429 Returns: 

430 Pyramid: a Pyramid instance 

431 """ 

432 try: 

433 data = json.loads(get_data_str(descriptor)) 

434 

435 except JSONDecodeError as e: 

436 raise FormatError("JSON", descriptor, e) 

437 

438 pyramid = cls() 

439 

440 pyramid.__storage["type"], path, pyramid.__storage["root"], base_name = get_infos_from_path( 

441 descriptor 

442 ) 

443 pyramid.__name = base_name[:-5] # on supprime l'extension.json 

444 pyramid.__descriptor = descriptor 

445 pyramid.__list = get_path_from_infos( 

446 pyramid.__storage["type"], pyramid.__storage["root"], f"{pyramid.__name}.list" 

447 ) 

448 

449 try: 

450 # Attributs communs 

451 pyramid.__tms = TileMatrixSet(data["tile_matrix_set"]) 

452 pyramid.__format = data["format"] 

453 

454 # Attributs d'une pyramide raster 

455 if pyramid.type == PyramidType.RASTER: 

456 pyramid.__raster_specifications = data["raster_specifications"] 

457 

458 if "mask_format" in data: 

459 pyramid.__masks = True 

460 else: 

461 pyramid.__masks = False 

462 

463 # Niveaux 

464 for level in data["levels"]: 

465 lev = Level.from_descriptor(level, pyramid) 

466 pyramid.__levels[lev.id] = lev 

467 

468 if pyramid.__tms.get_level(lev.id) is None: 

469 raise Exception( 

470 f"Pyramid {descriptor} owns a level with the ID '{lev.id}', not defined in the TMS '{pyramid.tms.name}'" 

471 ) 

472 

473 except KeyError as e: 

474 raise MissingAttributeError(descriptor, e) 

475 

476 if len(pyramid.__levels.keys()) == 0: 

477 raise Exception(f"Pyramid '{descriptor}' has no level") 

478 

479 return pyramid 

480 

481 @classmethod 

482 def from_other(cls, other: "Pyramid", name: str, storage: Dict, **kwargs) -> "Pyramid": 

483 """Create a pyramid from another one 

484 

485 Args: 

486 other (Pyramid): pyramid to clone 

487 name (str): new pyramid's name 

488 storage (Dict[str, Union[str, int]]): new pyramid's storage informations 

489 **mask (bool) : Presence or not of mask (only for RASTER) 

490 

491 Raises: 

492 FormatError: Provided path or the TMS is not a well formed JSON 

493 Exception: Level issue : no one in the pyramid or the used TMS, or level ID not defined in the TMS 

494 MissingAttributeError: Attribute is missing in the content 

495 

496 Returns: 

497 Pyramid: a Pyramid instance 

498 """ 

499 try: 

500 # On convertit le type de stockage selon l'énumération 

501 if type(storage["type"]) is str: 

502 storage["type"] = StorageType[storage["type"]] 

503 

504 if storage["type"] == StorageType.FILE and name.find("/") != -1: 

505 raise Exception(f"A FILE stored pyramid's name cannot contain '/' : '{name}'") 

506 

507 if storage["type"] == StorageType.FILE and "depth" not in storage: 

508 storage["depth"] = 2 

509 

510 pyramid = cls() 

511 

512 # Attributs communs 

513 pyramid.__name = name 

514 pyramid.__storage = storage 

515 pyramid.__masks = other.__masks 

516 

517 pyramid.__descriptor = get_path_from_infos( 

518 pyramid.__storage["type"], pyramid.__storage["root"], f"{pyramid.__name}.json" 

519 ) 

520 pyramid.__list = get_path_from_infos( 

521 pyramid.__storage["type"], pyramid.__storage["root"], f"{pyramid.__name}.list" 

522 ) 

523 pyramid.__tms = other.__tms 

524 pyramid.__format = other.__format 

525 

526 # Attributs d'une pyramide raster 

527 if pyramid.type == PyramidType.RASTER: 

528 if "mask" in kwargs: 

529 pyramid.__masks = kwargs["mask"] 

530 elif other.own_masks: 

531 pyramid.__masks = True 

532 else: 

533 pyramid.__masks = False 

534 pyramid.__raster_specifications = other.__raster_specifications 

535 

536 # Niveaux 

537 for level in other.__levels.values(): 

538 lev = Level.from_other(level, pyramid) 

539 pyramid.__levels[lev.id] = lev 

540 

541 except KeyError as e: 

542 raise MissingAttributeError(pyramid.descriptor, e) 

543 

544 return pyramid 

545 

546 def __init__(self) -> None: 

547 self.__storage = {} 

548 self.__levels = {} 

549 self.__masks = None 

550 

551 self.__content = {"loaded": False, "count": 0, "cache": {}} 

552 

553 def __str__(self) -> str: 

554 return f"{self.type.name} pyramid '{self.__name}' ({self.__storage['type'].name} storage)" 

555 

556 @property 

557 def serializable(self) -> Dict: 

558 """Get the dict version of the pyramid object, descriptor compliant 

559 

560 Returns: 

561 Dict: descriptor structured object description 

562 """ 

563 

564 serialization = {"tile_matrix_set": self.__tms.name, "format": self.__format} 

565 

566 serialization["levels"] = [] 

567 sorted_levels = sorted( 

568 self.__levels.values(), key=lambda level: level.resolution, reverse=True 

569 ) 

570 

571 for level in sorted_levels: 

572 serialization["levels"].append(level.serializable) 

573 

574 if self.type == PyramidType.RASTER: 

575 serialization["raster_specifications"] = self.__raster_specifications 

576 

577 if self.__masks: 

578 serialization["mask_format"] = "TIFF_ZIP_UINT8" 

579 

580 return serialization 

581 

    @property
    def list(self) -> str:
        """Get the pyramid's list path"""
        return self.__list

585 

    @property
    def descriptor(self) -> str:
        """Get the pyramid's descriptor path"""
        return self.__descriptor

589 

    @property
    def name(self) -> str:
        """Get the pyramid's name"""
        return self.__name

593 

    @property
    def tms(self) -> TileMatrixSet:
        """Get the tile matrix set used by the pyramid"""
        return self.__tms

597 

    @property
    def raster_specifications(self) -> Dict:
        """Get raster specifications for a RASTER pyramid

        The attribute is only assigned by the constructors (from_descriptor, from_other)
        when the pyramid's type is RASTER.

        Example:

            RGB pyramid with red nodata

            {
                "channels": 3,
                "nodata": "255,0,0",
                "photometric": "rgb",
                "interpolation": "bicubic"
            }

        Returns:
            Dict: Raster specifications
        """
        return self.__raster_specifications

617 

    @property
    def storage_type(self) -> StorageType:
        """Get the storage type

        Read from the "type" key of the pyramid's storage informations.

        Returns:
            StorageType: FILE, S3 or CEPH
        """
        return self.__storage["type"]

626 

627 @property 

628 def storage_root(self) -> str: 

629 """Get the pyramid's storage root. 

630 

631 If storage is S3, the used cluster is removed. 

632 

633 Returns: 

634 str: Pyramid's storage root 

635 """ 

636 

637 return self.__storage["root"].split("@", 1)[ 

638 0 

639 ] # Suppression de l'éventuel hôte de spécification du cluster S3 

640 

641 @property 

642 def storage_depth(self) -> int: 

643 return self.__storage.get("depth", None) 

644 

645 @property 

646 def storage_s3_cluster(self) -> str: 

647 """Get the pyramid's storage S3 cluster (host name) 

648 

649 Returns: 

650 str: the host if known, None if the default one have to be used or if storage is not S3 

651 """ 

652 if self.__storage["type"] == StorageType.S3: 

653 try: 

654 return self.__storage["root"].split("@")[1] 

655 except IndexError: 

656 return None 

657 else: 

658 return None 

659 

660 @storage_depth.setter 

661 def storage_depth(self, d: int) -> None: 

662 """Set the tree depth for a FILE storage 

663 

664 Args: 

665 d (int): file storage depth 

666 

667 Raises: 

668 Exception: the depth is not equal to the already known depth 

669 """ 

670 if "depth" in self.__storage and self.__storage["depth"] != d: 

671 raise Exception(f"Pyramid {self.__descriptor} owns levels with different path depths") 

672 self.__storage["depth"] = d 

673 

    @property
    def own_masks(self) -> bool:
        """Get the masks' presence flag (None as long as it has not been set by a constructor)"""
        return self.__masks

677 

    @property
    def format(self) -> str:
        """Get the pyramid's data format"""
        return self.__format

681 

    @property
    def channels(self) -> str:
        """Get the "channels" element of the raster specifications

        NOTE(review): the raster_specifications example shows an integer value (3) while
        the return annotation is str — confirm the expected type.
        """
        return self.raster_specifications["channels"]

685 

686 @property 

687 def tile_extension(self) -> str: 

688 if self.__format in [ 

689 "TIFF_RAW_UINT8", 

690 "TIFF_LZW_UINT8", 

691 "TIFF_ZIP_UINT8", 

692 "TIFF_PKB_UINT8", 

693 "TIFF_RAW_FLOAT32", 

694 "TIFF_LZW_FLOAT32", 

695 "TIFF_ZIP_FLOAT32", 

696 "TIFF_PKB_FLOAT32", 

697 ]: 

698 return "tif" 

699 elif self.__format in ["TIFF_JPG_UINT8", "TIFF_JPG90_UINT8"]: 

700 return "jpg" 

701 elif self.__format == "TIFF_PNG_UINT8": 

702 return "png" 

703 elif self.__format == "TIFF_PBF_MVT": 

704 return "pbf" 

705 else: 

706 raise Exception( 

707 f"Unknown pyramid's format ({self.__format}), cannot return the tile extension" 

708 ) 

709 

710 @property 

711 def bottom_level(self) -> "Level": 

712 """Get the best resolution level in the pyramid 

713 

714 Returns: 

715 Level: the bottom level 

716 """ 

717 return sorted(self.__levels.values(), key=lambda level: level.resolution)[0] 

718 

719 @property 

720 def top_level(self) -> "Level": 

721 """Get the low resolution level in the pyramid 

722 

723 Returns: 

724 Level: the top level 

725 """ 

726 return sorted(self.__levels.values(), key=lambda level: level.resolution)[-1] 

727 

728 @property 

729 def type(self) -> PyramidType: 

730 """Get the pyramid's type (RASTER or VECTOR) from its format 

731 

732 Returns: 

733 PyramidType: RASTER or VECTOR 

734 """ 

735 if self.__format == "TIFF_PBF_MVT": 

736 return PyramidType.VECTOR 

737 else: 

738 return PyramidType.RASTER 

739 

740 def load_list(self) -> int: 

741 """Load list content and cache it 

742 

743 If list is already loaded, nothing done 

744 """ 

745 if self.__content["loaded"]: 

746 return self.__content["count"] 

747 

748 for slab, infos in self.list_generator(): 

749 self.__content["cache"][slab] = infos 

750 self.__content["count"] += 1 

751 

752 self.__content["loaded"] = True 

753 

754 return self.__content["count"] 

755 

756 def list_generator( 

757 self, level_id: str = None 

758 ) -> Iterator[Tuple[Tuple[SlabType, str, int, int], Dict]]: 

759 """Get list content 

760 

761 List is copied as temporary file, roots are read and informations about each slab is returned. If list is already loaded, we yield the cached content 

762 Args : 

763 level_id (str) : id of the level for load only one level 

764 

765 Examples: 

766 

767 S3 stored descriptor 

768 

769 from rok4.pyramid import Pyramid 

770 

771 try: 

772 pyramid = Pyramid.from_descriptor("s3://bucket_name/path/to/descriptor.json") 

773 

774 for (slab_type, level, column, row), infos in pyramid.list_generator(): 

775 print(infos) 

776 

777 except Exception as e: 

778 print("Cannot load the pyramid from its descriptor and read the list") 

779 

780 Yields: 

781 Iterator[Tuple[Tuple[SlabType,str,int,int], Dict]]: Slab indices and storage informations 

782 

783 Value example: 

784 

785 ( 

786 (<SlabType.DATA: 'DATA'>, '18', 5424, 7526), 

787 { 

788 'link': False, 

789 'md5': None, 

790 'root': 'pyramids@localhost:9000/LIMADM', 

791 'slab': 'DATA_18_5424_7526' 

792 } 

793 ) 

794 

795 Raises: 

796 StorageError: Unhandled pyramid storage to copy list 

797 MissingEnvironmentError: Missing object storage informations 

798 """ 

799 if self.__content["loaded"]: 

800 for slab, infos in self.__content["cache"].items(): 

801 if level_id is not None: 

802 if slab[1] == level_id: 

803 yield slab, infos 

804 else: 

805 yield slab, infos 

806 else: 

807 # Copie de la liste dans un fichier temporaire (cette liste peut être un objet) 

808 list_obj = tempfile.NamedTemporaryFile(mode="r", delete=False) 

809 list_file = list_obj.name 

810 copy(self.__list, f"file://{list_file}") 

811 list_obj.close() 

812 

813 roots = {} 

814 s3_cluster = self.storage_s3_cluster 

815 

816 with open(list_file) as listin: 

817 # Lecture des racines 

818 for line in listin: 

819 line = line.rstrip() 

820 

821 if line == "#": 

822 break 

823 

824 root_id, root_path = line.split("=", 1) 

825 

826 if s3_cluster is None: 

827 roots[root_id] = root_path 

828 else: 

829 # On a un nom de cluster S3, on l'ajoute au nom du bucket dans les racines 

830 root_bucket, root_path = root_path.split("/", 1) 

831 roots[root_id] = f"{root_bucket}@{s3_cluster}/{root_path}" 

832 

833 # Lecture des dalles 

834 for line in listin: 

835 line = line.rstrip() 

836 

837 parts = line.split(" ", 1) 

838 slab_path = parts[0] 

839 slab_md5 = None 

840 if len(parts) == 2: 

841 slab_md5 = parts[1] 

842 

843 root_id, slab_path = slab_path.split("/", 1) 

844 

845 slab_type, level, column, row = self.get_infos_from_slab_path(slab_path) 

846 infos = { 

847 "root": roots[root_id], 

848 "link": root_id != "0", 

849 "slab": slab_path, 

850 "md5": slab_md5, 

851 } 

852 

853 if level_id is not None: 

854 if level == level_id: 

855 yield ((slab_type, level, column, row), infos) 

856 else: 

857 yield ((slab_type, level, column, row), infos) 

858 

859 remove(f"file://{list_file}") 

860 

861 def get_level(self, level_id: str) -> "Level": 

862 """Get one level according to its identifier 

863 

864 Args: 

865 level_id: Level identifier 

866 

867 Returns: 

868 The corresponding pyramid's level, None if not present 

869 """ 

870 

871 return self.__levels.get(level_id, None) 

872 

873 def get_levels(self, bottom_id: str = None, top_id: str = None) -> List[Level]: 

874 """Get sorted levels in the provided range from bottom to top 

875 

876 Args: 

877 bottom_id (str, optionnal): specific bottom level id. Defaults to None. 

878 top_id (str, optionnal): specific top level id. Defaults to None. 

879 

880 Raises: 

881 Exception: Provided levels are not consistent (bottom > top or not in the pyramid) 

882 

883 Examples: 

884 

885 All levels 

886 

887 from rok4.pyramid import Pyramid 

888 

889 try: 

890 pyramid = Pyramid.from_descriptor("s3://bucket_name/path/to/descriptor.json") 

891 levels = pyramid.get_levels() 

892 

893 except Exception as e: 

894 print("Cannot load the pyramid from its descriptor and get levels") 

895 

896 From pyramid's bottom to provided top (level 5) 

897 

898 from rok4.pyramid import Pyramid 

899 

900 try: 

901 pyramid = Pyramid.from_descriptor("s3://bucket_name/path/to/descriptor.json") 

902 levels = pyramid.get_levels(None, "5") 

903 

904 except Exception as e: 

905 print("Cannot load the pyramid from its descriptor and get levels") 

906 

907 Returns: 

908 List[Level]: asked sorted levels 

909 """ 

910 

911 sorted_levels = sorted(self.__levels.values(), key=lambda level: level.resolution) 

912 

913 levels = [] 

914 

915 begin = False 

916 if bottom_id is None: 

917 # Pas de niveau du bas fourni, on commence tout en bas 

918 begin = True 

919 else: 

920 if self.get_level(bottom_id) is None: 

921 raise Exception( 

922 f"Pyramid {self.name} does not contain the provided bottom level {bottom_id}" 

923 ) 

924 

925 if top_id is not None and self.get_level(top_id) is None: 

926 raise Exception(f"Pyramid {self.name} does not contain the provided top level {top_id}") 

927 

928 end = False 

929 

930 for level in sorted_levels: 

931 if not begin and level.id == bottom_id: 

932 begin = True 

933 

934 if begin: 

935 levels.append(level) 

936 if top_id is not None and level.id == top_id: 

937 end = True 

938 break 

939 else: 

940 continue 

941 

942 if top_id is None: 

943 # Pas de niveau du haut fourni, on a été jusqu'en haut et c'est normal 

944 end = True 

945 

946 if not begin or not end: 

947 raise Exception( 

948 f"Provided levels ids are not consistent to extract levels from the pyramid {self.name}" 

949 ) 

950 

951 return levels 

952 

953 def write_descriptor(self) -> None: 

954 """Write the pyramid's descriptor to the final location (in the pyramid's storage root)""" 

955 

956 content = json.dumps(self.serializable) 

957 put_data_str(content, self.__descriptor) 

958 

959 def get_infos_from_slab_path(self, path: str) -> Tuple[SlabType, str, int, int]: 

960 """Get the slab's indices from its storage path 

961 

962 Args: 

963 path (str): Slab's storage path 

964 

965 Examples: 

966 

967 FILE stored pyramid 

968 

969 from rok4.pyramid import Pyramid 

970 

971 try: 

972 pyramid = Pyramid.from_descriptor("/path/to/descriptor.json") 

973 slab_type, level, column, row = self.get_infos_from_slab_path("DATA/12/00/4A/F7.tif") 

974 # (SlabType.DATA, "12", 159, 367) 

975 except Exception as e: 

976 print("Cannot load the pyramid from its descriptor and convert a slab path") 

977 

978 S3 stored pyramid 

979 

980 from rok4.pyramid import Pyramid 

981 

982 try: 

983 pyramid = Pyramid.from_descriptor("s3://bucket_name/path/to/pyramid.json") 

984 slab_type, level, column, row = self.get_infos_from_slab_path("s3://bucket_name/path/to/pyramid/MASK_15_9164_5846") 

985 # (SlabType.MASK, "15", 9164, 5846) 

986 except Exception as e: 

987 print("Cannot load the pyramid from its descriptor and convert a slab path") 

988 

989 Returns: 

990 Tuple[SlabType, str, int, int]: Slab's type (DATA or MASK), level identifier, slab's column and slab's row 

991 """ 

992 if self.__storage["type"] == StorageType.FILE: 

993 parts = path.split("/") 

994 

995 # Le partie du chemin qui contient la colonne et ligne de la dalle est à la fin, en fonction de la profondeur choisie 

996 # depth = 2 -> on doit utiliser les 3 dernières parties pour la conversion 

997 column, row = b36_path_decode("/".join(parts[-(self.__storage["depth"] + 1) :])) 

998 level = parts[-(self.__storage["depth"] + 2)] 

999 raw_slab_type = parts[-(self.__storage["depth"] + 3)] 

1000 

1001 # Pour être retro compatible avec l'ancien nommage 

1002 if raw_slab_type == "IMAGE": 

1003 raw_slab_type = "DATA" 

1004 

1005 slab_type = SlabType[raw_slab_type] 

1006 

1007 return slab_type, level, column, row 

1008 else: 

1009 parts = re.split(r"[/_]", path) 

1010 column = parts[-2] 

1011 row = parts[-1] 

1012 level = parts[-3] 

1013 raw_slab_type = parts[-4] 

1014 

1015 # Pour être retro compatible avec l'ancien nommage 

1016 if raw_slab_type == "IMG": 

1017 raw_slab_type = "DATA" 

1018 elif raw_slab_type == "MSK": 

1019 raw_slab_type = "MASK" 

1020 

1021 slab_type = SlabType[raw_slab_type] 

1022 

1023 return slab_type, level, int(column), int(row) 

1024 

def get_slab_path_from_infos(
    self, slab_type: SlabType, level: str, column: int, row: int, full: bool = True
) -> str:
    """Build a slab's storage path from its type, level and indices

    For a FILE storage, the relative path is made of the slab type value, the level
    and the base-36 encoded column/row (encoded with the storage depth), joined as
    path components. For any other storage, the relative path is the flat name
    `<slab type value>_<level>_<column>_<row>`.

    Args:
        slab_type (SlabType): DATA or MASK
        level (str): Level identifier
        column (int): Slab's column
        row (int): Slab's row
        full (bool, optional): Full path or just relative path from pyramid storage root. Defaults to True.

    Returns:
        str: Absolute or relative slab's storage path
    """
    storage = self.__storage

    if storage["type"] != StorageType.FILE:
        # Object storages: flat object name, indices are kept in base 10
        relative_path = f"{slab_type.value}_{level}_{column}_{row}"
    else:
        # File storage: column and row are merged in a base-36 encoded sub-path
        relative_path = os.path.join(
            slab_type.value, level, b36_path_encode(column, row, storage["depth"])
        )

    if not full:
        return relative_path

    # Prefix with the storage type, the storage root and the pyramid's name
    return get_path_from_infos(storage["type"], storage["root"], self.__name, relative_path)

1053 

def get_tile_data_binary(self, level: str, column: int, row: int) -> bytes:
    """Get a pyramid's tile as binary data, as stored in the slab

    To get a tile, 3 steps :
        * calculate slab path from tile index
        * read slab index to get offsets and sizes of slab's tiles
        * read the tile into the slab

    Args:
        level (str): Tile's level
        column (int): Tile's column
        row (int): Tile's row

    Limitations:
        Pyramids with one-tile slab are not handled

    Examples:

        FILE stored raster pyramid, to extract a tile containing a point and save it as independent image

            from rok4.pyramid import Pyramid

            try:
                pyramid = Pyramid.from_descriptor("/data/pyramids/SCAN1000.json")
                level, col, row, pcol, prow = pyramid.get_tile_indices(992904.46, 6733643.15, "9", srs = "IGNF:LAMB93")
                data = pyramid.get_tile_data_binary(level, col, row)

                if data is None:
                    print("No data")
                else:
                    tile_name = f"tile_{level}_{col}_{row}.{pyramid.tile_extension}"
                    with open(tile_name, "wb") as image:
                        image.write(data)
                    print(f"Tile written in {tile_name}")

            except Exception as e:
                print(f"Cannot save a pyramid's tile : {e}")

    Raises:
        Exception: Level not found in the pyramid
        NotImplementedError: Pyramid owns one-tile slabs
        MissingEnvironmentError: Missing object storage informations
        StorageError: Storage read issue

    Returns:
        bytes: tile's binary data. None if no data: tile out of the level's limits,
            slab not found, or tile with a null size in the slab's index
    """

    level_object = self.get_level(level)

    if level_object is None:
        raise Exception(f"No level {level} in the pyramid")

    slab_width = level_object.slab_width
    slab_height = level_object.slab_height

    if slab_width == 1 and slab_height == 1:
        raise NotImplementedError("One-tile slab pyramid is not handled")

    if not level_object.is_in_limits(column, row):
        return None

    tiles_per_slab = slab_width * slab_height

    # Slab's indices, and tile's indices relative to the slab
    slab_column, relative_tile_column = divmod(column, slab_width)
    slab_row, relative_tile_row = divmod(row, slab_height)

    # Tile's rank in the slab's index (tiles are listed row by row)
    tile_index = relative_tile_row * slab_width + relative_tile_column

    # Path of the slab containing the wanted tile
    slab_path = self.get_slab_path_from_infos(SlabType.DATA, level, slab_column, slab_row)

    # Read tiles' offsets and sizes in the slab:
    # a ROK4 slab owns a fixed 2048 bytes header,
    # then the offsets are stored (4 bytes each),
    # then the sizes (4 bytes each)
    try:
        binary_index = get_data_binary(
            slab_path,
            (ROK4_IMAGE_HEADER_SIZE, 2 * 4 * tiles_per_slab),
        )
    except FileNotFoundError:
        # A missing slab is simply handled as a lack of data
        return None

    # NOTE(review): the index is decoded with the host's native byte order — confirm
    # this always matches the byte order used to write the slabs
    index_dtype = numpy.dtype("uint32")
    offsets = numpy.frombuffer(binary_index, dtype=index_dtype, count=tiles_per_slab)
    sizes = numpy.frombuffer(
        binary_index, dtype=index_dtype, offset=4 * tiles_per_slab, count=tiles_per_slab
    )

    # Convert numpy scalars to plain Python integers: the storage layer gets regular
    # ints, so any arithmetic done on the range cannot wrap around on 32 bits
    tile_offset = int(offsets[tile_index])
    tile_size = int(sizes[tile_index])

    if tile_size == 0:
        return None

    return get_data_binary(slab_path, (tile_offset, tile_size))

1159 

def get_tile_data_raster(self, level: str, column: int, row: int) -> numpy.ndarray:
    """Get a raster pyramid's tile as 3-dimension numpy ndarray

    First dimension is the row, second one is column, third one is band.

    Args:
        level (str): Tile's level
        column (int): Tile's column
        row (int): Tile's row

    Limitations:
        Packbits (pyramid formats TIFF_PKB_FLOAT32 and TIFF_PKB_UINT8) and LZW (pyramid formats TIFF_LZW_FLOAT32 and TIFF_LZW_UINT8) compressions are not handled.

    Raises:
        Exception: Cannot get raster data for a vector pyramid
        Exception: Level not found in the pyramid
        NotImplementedError: Pyramid owns one-tile slabs
        NotImplementedError: Raster pyramid format not handled
        MissingEnvironmentError: Missing object storage informations
        StorageError: Storage read issue
        FormatError: Cannot decode tile

    Examples:

        FILE stored DTM (raster) pyramid, to get the altitude value at a point in the best level

            from rok4.pyramid import Pyramid

            try:
                pyramid = Pyramid.from_descriptor("/data/pyramids/RGEALTI.json")
                level, col, row, pcol, prow = pyramid.get_tile_indices(44, 5, srs = "EPSG:4326")
                data = pyramid.get_tile_data_raster(level, col, row)

                if data is None:
                    print("No data")
                else:
                    print(data[prow][pcol])

            except Exception as e:
                print(f"Cannot get a pyramid's pixel value : {e}")

    Returns:
        numpy.ndarray: tile's data, as 3-dimension array. None if no data
    """

    if self.type == PyramidType.VECTOR:
        raise Exception("Cannot get tile as raster data : it's a vector pyramid")

    binary_tile = self.get_tile_data_binary(level, column, row)

    if binary_tile is None:
        return None

    level_object = self.get_level(level)
    tile_format = self.__format

    if tile_format in ("TIFF_JPG_UINT8", "TIFF_JPG90_UINT8", "TIFF_PNG_UINT8"):
        codec = "PNG" if tile_format == "TIFF_PNG_UINT8" else "JPEG"
        try:
            # Pillow only parses the header when opening: pixels are decoded when the
            # array is built, so the conversion has to be guarded as well
            data = numpy.asarray(Image.open(io.BytesIO(binary_tile)))
        except Exception as e:
            raise FormatError(codec, "binary tile", e) from e

    elif tile_format in ("TIFF_ZIP_UINT8", "TIFF_ZIP_FLOAT32"):
        sample_type = "uint8" if tile_format == "TIFF_ZIP_UINT8" else "float32"
        try:
            data = numpy.frombuffer(
                zlib.decompress(binary_tile), dtype=numpy.dtype(sample_type)
            )
        except Exception as e:
            raise FormatError("ZIP", "binary tile", e) from e

    elif tile_format in ("TIFF_RAW_UINT8", "TIFF_RAW_FLOAT32"):
        sample_type = "uint8" if tile_format == "TIFF_RAW_UINT8" else "float32"
        data = numpy.frombuffer(binary_tile, dtype=numpy.dtype(sample_type))

    else:
        raise NotImplementedError(f"Cannot get tile as raster data for format {self.__format}")

    # NOTE(review): tile_size[0] is used as the number of rows and tile_size[1] as the
    # number of columns — confirm the order of tile_size for non-square tiles
    tile_size = level_object.tile_matrix.tile_size
    return data.reshape(
        (tile_size[0], tile_size[1], self.__raster_specifications["channels"])
    )

1285 

def get_tile_data_vector(self, level: str, column: int, row: int) -> Dict:
    """Get a vector pyramid's tile as GeoJSON dictionary

    The tile's binary data is decoded with mapbox_vector_tile.

    Args:
        level (str): Tile's level
        column (int): Tile's column
        row (int): Tile's row

    Raises:
        Exception: Cannot get vector data for a raster pyramid
        Exception: Level not found in the pyramid
        NotImplementedError: Pyramid owns one-tile slabs
        NotImplementedError: Vector pyramid format not handled
        MissingEnvironmentError: Missing object storage informations
        StorageError: Storage read issue
        FormatError: Cannot decode tile

    Examples:

        S3 stored vector pyramid, to print a tile as GeoJSON

            from rok4.pyramid import Pyramid

            import json

            try:
                pyramid = Pyramid.from_descriptor("s3://pyramids/vectors/BDTOPO.json")
                level, col, row, pcol, prow = pyramid.get_tile_indices(40.325, 3.123, srs = "EPSG:4326")
                data = pyramid.get_tile_data_vector(level, col, row)

                if data is None:
                    print("No data")
                else:
                    print(json.dumps(data))

            except Exception as e:
                print(f"Cannot print a vector pyramid's tile as GeoJSON : {e}")

    Returns:
        Dict: data, as GeoJSON dictionary. None if no data
    """

    if self.type == PyramidType.RASTER:
        raise Exception("Cannot get tile as vector data : it's a raster pyramid")

    # The level's existence is checked while reading the binary tile
    binary_tile = self.get_tile_data_binary(level, column, row)

    if binary_tile is None:
        return None

    if self.__format != "TIFF_PBF_MVT":
        raise NotImplementedError(f"Cannot get tile as vector data for format {self.__format}")

    try:
        return mapbox_vector_tile.decode(binary_tile)
    except Exception as e:
        raise FormatError("PBF (MVT)", "binary tile", e) from e

1347 

def get_tile_indices(
    self, x: float, y: float, level: str = None, **kwargs
) -> Tuple[str, int, int, int, int]:
    """Get pyramid's tile and pixel indices from point's coordinates

    Coordinates are used as provided, unless a `srs` different from the pyramid's TMS
    one (case-insensitive comparison) is given: they are then reprojected into the
    TMS coordinates system first. If EPSG:4326, x is latitude and y longitude.

    Args:
        x (float): point's x
        y (float): point's y
        level (str, optional): Pyramid's level to take into account, the bottom one if None. Defaults to None.
        **srs (string): spatial reference system of provided coordinates, with authority and code (same as the pyramid's one if not provided)

    Raises:
        Exception: Cannot find level to calculate indices
        RuntimeError: Provided SRS is invalid for OSR

    Examples:

        FILE stored DTM (raster) pyramid, to get the altitude value at a point in the best level

            from rok4.pyramid import Pyramid

            try:
                pyramid = Pyramid.from_descriptor("/data/pyramids/RGEALTI.json")
                level, col, row, pcol, prow = pyramid.get_tile_indices(44, 5, srs = "EPSG:4326")
                data = pyramid.get_tile_data_raster(level, col, row)

                if data is None:
                    print("No data")
                else:
                    print(data[prow][pcol])

            except Exception as e:
                print(f"Cannot get a pyramid's pixel value : {e}")

    Returns:
        Tuple[str, int, int, int, int]: Level identifier, tile's column, tile's row, pixel's (in the tile) column, pixel's row
    """

    # Bottom level by default, replaced by the requested one when provided
    selected_level = self.bottom_level
    if level is not None:
        selected_level = self.get_level(level)

    if selected_level is None:
        raise Exception("Cannot found the level to calculate indices")

    # Reproject only when a SRS is provided and is not already the TMS one
    provided_srs = kwargs.get("srs")
    if provided_srs is not None and provided_srs.upper() != self.__tms.srs.upper():
        source_sr = srs_to_spatialreference(provided_srs)
        x, y = reproject_point((x, y), source_sr, self.__tms.sr)

    level_id = selected_level.id
    indices = selected_level.tile_matrix.point_to_indices(x, y)

    return (level_id,) + indices

1404 

def delete_level(self, level_id: str) -> None:
    """Delete the given level in the description of the pyramid

    The level is removed from the levels held by this object; nothing else is done here.

    Args:
        level_id: Level identifier

    Raises:
        Exception: Cannot find level
    """

    try:
        del self.__levels[level_id]
    except KeyError as e:
        # Only an unknown identifier is reported as a missing level; the original
        # error is kept as the cause
        raise Exception(f"The level {level_id} don't exist in the pyramid") from e

1419 

def add_level(
    self,
    level_id: str,
    tiles_per_width: int,
    tiles_per_height: int,
    tile_limits: Dict[str, int],
) -> None:
    """Add a level in the description of the pyramid

    A level descriptor is built from the arguments and the pyramid's storage
    characteristics, then loaded with Level.from_descriptor. The level is registered
    only if its identifier is defined in the pyramid's TMS.

    Args:
        level_id: Level identifier
        tiles_per_width : Number of tiles in width by slab
        tiles_per_height : Number of tiles in height by slab
        tile_limits : Minimum and maximum tiles' columns and rows of pyramid's content

    Raises:
        Exception: The level's identifier is not defined in the pyramid's TMS
    """

    # Storage part of the descriptor, completed according to the pyramid's storage
    storage_description = {"type": self.storage_type.name}
    if self.own_masks:
        storage_description["mask_prefix"] = True
    if self.storage_type == StorageType.FILE:
        storage_description["path_depth"] = self.storage_depth

    descriptor = {
        "id": level_id,
        "tile_limits": tile_limits,
        "tiles_per_width": tiles_per_width,
        "tiles_per_height": tiles_per_height,
        "storage": storage_description,
    }

    new_level = Level.from_descriptor(descriptor, self)

    if self.__tms.get_level(new_level.id) is not None:
        self.__levels[new_level.id] = new_level
        return

    raise Exception(
        f"Pyramid {self.name} owns a level with the ID '{new_level.id}', not defined in the TMS '{self.tms.name}'"
    )

1456 

@property
def size(self) -> int:
    """Get the size of the pyramid

    The size is computed with size_path on the pyramid's storage path at first
    access, then stored on the object and reused for the next accesses.

    Examples:

        from rok4.pyramid import Pyramid

        try:
            pyramid = Pyramid.from_descriptor("s3://bucket_name/path/to/descriptor.json")
            size = pyramid.size

        except Exception as e:
            print("Cannot load the pyramid from its descriptor and get his size")

    Returns:
        int: size of the pyramid
    """

    # Already computed: reuse the stored value
    if hasattr(self, "_Pyramid__size"):
        return self.__size

    pyramid_path = get_path_from_infos(
        self.__storage["type"], self.__storage["root"], self.__name
    )
    self.__size = size_path(pyramid_path)

    return self.__size