Module rok4.Pyramid
Provide classes to use pyramid's data.
The module contains the following classes:
Expand source code
"""Provide classes to use pyramid's data.
The module contains the following classes:
- `Pyramid` - Data container
- `Level` - Level of a pyramid
"""
from typing import Dict, List, Tuple, Union, Iterator
import json
from json.decoder import JSONDecodeError
import os
import re
import numpy
import zlib
import io
import mapbox_vector_tile
from PIL import Image
from rok4.Exceptions import *
from rok4.TileMatrixSet import TileMatrixSet, TileMatrix
from rok4.Storage import *
from rok4.Utils import *
class PyramidType(Enum):
    """Kind of data a pyramid holds: raster images or vector tiles."""

    RASTER = "RASTER"
    VECTOR = "VECTOR"
class SlabType(Enum):
    """Kind of slab stored in a pyramid."""

    # Slab of data, raster or vector
    DATA = "DATA"
    # Slab of mask, only for raster pyramid: image with one band, 0 is nodata, other values are data
    MASK = "MASK"
# Number of bytes skipped at the beginning of a slab before reading the tiles' index
# (offsets then sizes, 4 bytes each), see Pyramid.get_tile_data_binary
ROK4_IMAGE_HEADER_SIZE = 2048
"""Slab's header size, 2048 bytes"""
def b36_number_encode(number: int) -> str:
    """Convert a non-negative base-10 number to base-36

    Used alphabet is '0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ'

    Args:
        number (int): base-10 number, positive or zero

    Raises:
        ValueError: the number is negative, it cannot be written with the alphabet

    Returns:
        str: base-36 number
    """
    alphabet = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ"

    if number < 0:
        # Floor division never brings a negative number to 0 (divmod(-1, 36) == (-1, 35)),
        # so the conversion loop below would never end
        raise ValueError(f"Cannot encode a negative number in base-36 : {number}")

    if number < len(alphabet):
        return alphabet[number]

    # Digits are produced from the least significant one, they are reversed at the end
    digits = []
    while number != 0:
        number, remainder = divmod(number, len(alphabet))
        digits.append(alphabet[remainder])

    return "".join(reversed(digits))
def b36_number_decode(number: str) -> int:
    """Read a base-36 written number as a base-10 integer

    Args:
        number (str): base-36 number

    Returns:
        int: base-10 number
    """
    base = 36
    return int(number, base)
def b36_path_decode(path: str) -> Tuple[int, int]:
    """Extract slab's column and row from a base-36 based path

    Slashes and the .tif / .tiff extension are ignored. Remaining characters are read
    by pairs: the first one of each pair is a column digit, the second one a row digit.

    Args:
        path (str): slab's path

    Returns:
        Tuple[int, int]: slab's column and row
    """
    digits = re.sub(r"(\.TIFF?)", "", path.replace("/", "").upper())

    column_digits = []
    row_digits = []
    for position in range(0, len(digits), 2):
        column_digits.append(digits[position])
        row_digits.append(digits[position + 1])

    # Base-36 to base-10 conversion of both interleaved numbers
    return int("".join(column_digits), 36), int("".join(row_digits), 36)
def b36_path_encode(column: int, row: int, slashs: int) -> str:
    """Build the base-36 based path of a slab from its indices, with .tif extension

    Column and row are written in base-36, left padded with zeros to the same length
    (at least slashs + 1 digits), then interleaved: column digit, row digit, ...
    Slashes are inserted between pairs, starting from the end of the path.

    Args:
        column (int): slab's column
        row (int): slab's row
        slashs (int): slashs' number (to split path)

    Returns:
        str: base-36 based path
    """
    column_digits = b36_number_encode(column)
    row_digits = b36_number_encode(row)

    width = max(slashs + 1, len(column_digits), len(row_digits))
    column_digits = column_digits.rjust(width, "0")
    row_digits = row_digits.rjust(width, "0")

    encoded = ""
    # Pairs are consumed from the least significant digits, so the path is built backwards
    for column_digit, row_digit in zip(reversed(column_digits), reversed(row_digits)):
        encoded = column_digit + row_digit + encoded
        if slashs > 0:
            encoded = "/" + encoded
            slashs -= 1

    return f"{encoded}.tif"
class Level:
    """A pyramid's level, raster or vector

    Attributes:
        __id (str): level's identifier. have to exist in the pyramid's used TMS
        __pyramid (Pyramid): pyramid owning the level
        __tile_limits (Dict[str, int]): minimum and maximum tiles' columns and rows of pyramid's content
        __slab_size (Tuple[int, int]): number of tile in a slab, widthwise and heightwise
        __tables (List[Dict]): for a VECTOR pyramid, description of vector content, tables and attributes
    """

    @classmethod
    def from_descriptor(cls, data: Dict, pyramid: "Pyramid") -> "Level":
        """Create a pyramid's level from the pyramid's descriptor levels element

        Args:
            data (Dict): level's information from the pyramid's descriptor
            pyramid (Pyramid): pyramid containing the level to create

        Raises:
            Exception: different storage or masks presence between the level and the pyramid
            MissingAttributeError: Attribute is missing in the content

        Returns:
            Level: a Level instance
        """
        level = cls()
        level.__pyramid = pyramid

        # Common attributes
        try:
            level.__id = data["id"]
            level.__tile_limits = data["tile_limits"]
            level.__slab_size = (
                data["tiles_per_width"],
                data["tiles_per_height"],
            )

            # Storage informations: validated against the pyramid's ones and stored in the pyramid
            if pyramid.storage_type.name != data["storage"]["type"]:
                raise Exception(
                    f"Pyramid {pyramid.descriptor} owns levels using different storage types ({ data['storage']['type'] }) than its one ({pyramid.storage_type.name})"
                )

            if pyramid.storage_type == StorageType.FILE:
                pyramid.storage_depth = data["storage"]["path_depth"]

            # The pyramid's private attributes cannot be reached from here (name mangling
            # would look for pyramid._Level__descriptor), the public property is used
            if "mask_directory" in data["storage"] or "mask_prefix" in data["storage"]:
                if not pyramid.own_masks:
                    raise Exception(
                        f"Pyramid {pyramid.descriptor} does not define a mask format but level {level.__id} define mask storage informations"
                    )
            else:
                if pyramid.own_masks:
                    raise Exception(
                        f"Pyramid {pyramid.descriptor} define a mask format but level {level.__id} does not define mask storage informations"
                    )
        except KeyError as e:
            raise MissingAttributeError(pyramid.descriptor, f"levels[].{e}") from e

        # Attributes for a vector level
        if level.__pyramid.type == PyramidType.VECTOR:
            try:
                level.__tables = data["tables"]
            except KeyError as e:
                raise MissingAttributeError(pyramid.descriptor, f"levels[].{e}") from e

        return level

    @classmethod
    def from_other(cls, other: "Level", pyramid: "Pyramid") -> "Level":
        """Create a pyramid's level from another one

        Identifier, tile limits, slab size (and tables for a vector pyramid) are taken
        from the cloned level, the owner is the provided pyramid.

        Args:
            other (Level): level to clone
            pyramid (Pyramid): new pyramid containing the new level

        Returns:
            Level: a Level instance
        """
        level = cls()

        # Common attributes
        level.__id = other.__id
        level.__pyramid = pyramid
        level.__tile_limits = other.__tile_limits
        level.__slab_size = other.__slab_size

        # Attributes for a vector level
        if level.__pyramid.type == PyramidType.VECTOR:
            level.__tables = other.__tables

        return level

    def __str__(self) -> str:
        return f"{self.__pyramid.type.name} pyramid's level '{self.__id}' ({self.__pyramid.storage_type.name} storage)"

    @property
    def serializable(self) -> Dict:
        """Get the dict version of the level object, pyramid's descriptor compliant

        Returns:
            Dict: pyramid's descriptor structured object description
        """
        serialization = {
            "id": self.__id,
            "tiles_per_width": self.__slab_size[0],
            "tiles_per_height": self.__slab_size[1],
            "tile_limits": self.__tile_limits,
        }

        if self.__pyramid.type == PyramidType.VECTOR:
            serialization["tables"] = self.__tables

        if self.__pyramid.storage_type == StorageType.FILE:
            serialization["storage"] = {
                "type": "FILE",
                "image_directory": f"{self.__pyramid.name}/DATA/{self.__id}",
                "path_depth": self.__pyramid.storage_depth,
            }
            if self.__pyramid.own_masks:
                serialization["storage"][
                    "mask_directory"
                ] = f"{self.__pyramid.name}/MASK/{self.__id}"

        elif self.__pyramid.storage_type == StorageType.CEPH:
            serialization["storage"] = {
                "type": "CEPH",
                "image_prefix": f"{self.__pyramid.name}/DATA_{self.__id}",
                "pool_name": self.__pyramid.storage_root,
            }
            if self.__pyramid.own_masks:
                serialization["storage"]["mask_prefix"] = f"{self.__pyramid.name}/MASK_{self.__id}"

        elif self.__pyramid.storage_type == StorageType.S3:
            serialization["storage"] = {
                "type": "S3",
                "image_prefix": f"{self.__pyramid.name}/DATA_{self.__id}",
                "bucket_name": self.__pyramid.storage_root,
            }
            if self.__pyramid.own_masks:
                serialization["storage"]["mask_prefix"] = f"{self.__pyramid.name}/MASK_{self.__id}"

        return serialization

    @property
    def id(self) -> str:
        """Level's identifier"""
        return self.__id

    @property
    def bbox(self) -> Tuple[float, float, float, float]:
        """Return level extent, based on tile limits

        Returns:
            Tuple[float, float, float, float]: level terrain extent (xmin, ymin, xmax, ymax)
        """
        # xmin and ymin come from the tile (min_col, max_row), xmax and ymax from the tile (max_col, min_row)
        min_bbox = self.__pyramid.tms.get_level(self.__id).tile_to_bbox(
            self.__tile_limits["min_col"], self.__tile_limits["max_row"]
        )
        max_bbox = self.__pyramid.tms.get_level(self.__id).tile_to_bbox(
            self.__tile_limits["max_col"], self.__tile_limits["min_row"]
        )

        return (min_bbox[0], min_bbox[1], max_bbox[2], max_bbox[3])

    @property
    def resolution(self) -> str:
        """Resolution of the TMS level having the same identifier"""
        return self.__pyramid.tms.get_level(self.__id).resolution

    @property
    def tile_matrix(self) -> "TileMatrix":
        """TMS level having the same identifier"""
        return self.__pyramid.tms.get_level(self.__id)

    @property
    def slab_width(self) -> int:
        """Number of tiles in a slab, widthwise"""
        return self.__slab_size[0]

    @property
    def slab_height(self) -> int:
        """Number of tiles in a slab, heightwise"""
        return self.__slab_size[1]

    def is_in_limits(self, column: int, row: int) -> bool:
        """Is the tile indices in limits ?

        Args:
            column (int): tile's column
            row (int): tile's row

        Returns:
            bool: True if tiles' limits contain the provided tile's indices
        """
        return (
            self.__tile_limits["min_row"] <= row
            and self.__tile_limits["max_row"] >= row
            and self.__tile_limits["min_col"] <= column
            and self.__tile_limits["max_col"] >= column
        )

    def set_limits_from_bbox(self, bbox: Tuple[float, float, float, float]) -> None:
        """Set tile limits, based on provided bounding box

        Args:
            bbox (Tuple[float, float, float, float]): terrain extent (xmin, ymin, xmax, ymax), in TMS coordinates system
        """
        col_min, row_min, col_max, row_max = self.__pyramid.tms.get_level(self.__id).bbox_to_tiles(
            bbox
        )
        self.__tile_limits = {
            "min_row": row_min,
            "max_col": col_max,
            "max_row": row_max,
            "min_col": col_min,
        }
class Pyramid:
"""A data pyramid, raster or vector
Attributes:
__name (str): pyramid's name
__descriptor (str): pyramid's descriptor path
__list (str): pyramid's list path
__tms (rok4.TileMatrixSet.TileMatrixSet): Used grid
__levels (Dict[str, Level]): Pyramid's levels
__format (str): Data format
__storage (Dict[str, Union[rok4.Storage.StorageType,str,int]]): Pyramid's storage informations (type, root and depth if FILE storage)
__raster_specifications (Dict): If raster pyramid, raster specifications
__content (Dict): Loading status (loaded) and list content (cache).
Example (S3 storage):
{
'cache': {
(<SlabType.DATA: 'DATA'>, '18', 5424, 7526): {
'link': False,
'md5': None,
'root': 'pyramids@localhost:9000/LIMADM',
'slab': 'DATA_18_5424_7526'
}
},
'loaded': True
}
"""
@classmethod
def from_descriptor(cls, descriptor: str) -> "Pyramid":
    """Load a pyramid from its JSON descriptor

    Storage type and root are deduced from the descriptor path, the pyramid's name is
    the descriptor base name without its 5 last characters (the ".json" extension) and
    the list is expected next to the descriptor, as "<name>.list".

    Args:
        descriptor (str): pyramid's descriptor path

    Raises:
        FormatError: Provided path or the TMS is not a well formed JSON
        Exception: Level issue : no one in the pyramid or the used TMS, or level ID not defined in the TMS
        MissingAttributeError: Attribute is missing in the content
        StorageError: Storage read issue (pyramid descriptor or TMS)
        MissingEnvironmentError: Missing object storage informations or TMS root directory

    Examples:

        S3 stored descriptor

            from rok4.Pyramid import Pyramid

            try:
                pyramid = Pyramid.from_descriptor("s3://bucket_name/path/to/descriptor.json")
            except Exception as e:
                print("Cannot load the pyramid from its descriptor")

    Returns:
        Pyramid: a Pyramid instance
    """
    try:
        data = json.loads(get_data_str(descriptor))
    except JSONDecodeError as e:
        raise FormatError("JSON", descriptor, e)

    pyramid = cls()

    storage_type, path, storage_root, base_name = get_infos_from_path(descriptor)
    pyramid.__storage["type"] = storage_type
    pyramid.__storage["root"] = storage_root

    # The ".json" extension is removed
    pyramid.__name = base_name[:-5]
    pyramid.__descriptor = descriptor
    pyramid.__list = get_path_from_infos(storage_type, storage_root, f"{pyramid.__name}.list")

    try:
        # Common attributes
        pyramid.__tms = TileMatrixSet(data["tile_matrix_set"])
        pyramid.__format = data["format"]

        # Raster pyramid's attributes
        if pyramid.type == PyramidType.RASTER:
            pyramid.__raster_specifications = data["raster_specifications"]
            pyramid.__masks = "mask_format" in data

        # Levels
        for level_data in data["levels"]:
            loaded_level = Level.from_descriptor(level_data, pyramid)
            pyramid.__levels[loaded_level.id] = loaded_level

            if pyramid.__tms.get_level(loaded_level.id) is None:
                raise Exception(
                    f"Pyramid {descriptor} owns a level with the ID '{loaded_level.id}', not defined in the TMS '{pyramid.tms.name}'"
                )
    except KeyError as e:
        raise MissingAttributeError(descriptor, e)

    if not pyramid.__levels:
        raise Exception(f"Pyramid '{descriptor}' has no level")

    return pyramid
@classmethod
def from_other(cls, other: "Pyramid", name: str, storage: Dict) -> "Pyramid":
    """Create a pyramid from another one

    Grid, format, masks presence, raster specifications and levels are cloned from the
    other pyramid. Name and storage are the provided ones. The provided storage
    dictionary is not modified: the new pyramid works on its own copy.

    Args:
        other (Pyramid): pyramid to clone
        name (str): new pyramid's name
        storage (Dict[str, Union[str, int]]): new pyramid's storage informations (type, root, and optionally depth for a FILE storage, 2 by default)

    Raises:
        Exception: a FILE stored pyramid's name contains '/'
        MissingAttributeError: storage informations are incomplete or the storage type is unknown

    Returns:
        Pyramid: a Pyramid instance
    """
    try:
        # Working on a copy: the caller's dictionary can be reused for another pyramid
        storage = dict(storage)

        # The storage type is converted according to the enumeration (if not already done)
        if not isinstance(storage["type"], StorageType):
            storage["type"] = StorageType[storage["type"]]

        if storage["type"] == StorageType.FILE and name.find("/") != -1:
            raise Exception(f"A FILE stored pyramid's name cannot contain '/' : '{name}'")

        if storage["type"] == StorageType.FILE and "depth" not in storage:
            storage["depth"] = 2

        pyramid = cls()

        # Common attributes
        pyramid.__name = name
        pyramid.__storage = storage
        pyramid.__masks = other.__masks

        pyramid.__descriptor = get_path_from_infos(
            pyramid.__storage["type"], pyramid.__storage["root"], f"{pyramid.__name}.json"
        )
        pyramid.__list = get_path_from_infos(
            pyramid.__storage["type"], pyramid.__storage["root"], f"{pyramid.__name}.list"
        )
        pyramid.__tms = other.__tms
        pyramid.__format = other.__format

        # Raster pyramid's attributes
        if pyramid.type == PyramidType.RASTER:
            pyramid.__masks = True if other.own_masks else False
            pyramid.__raster_specifications = other.__raster_specifications

        # Levels
        for source_level in other.__levels.values():
            cloned_level = Level.from_other(source_level, pyramid)
            pyramid.__levels[cloned_level.id] = cloned_level

    except KeyError as e:
        # No descriptor is read here: the cloned pyramid's one is used to identify the context
        raise MissingAttributeError(other.descriptor, e) from e

    return pyramid
def __init__(self) -> None:
self.__storage = {}
self.__levels = {}
self.__masks = None
self.__content = {"loaded": False, "cache": {}}
def __str__(self) -> str:
    """One-line description: pyramid's type, name and storage type"""
    type_name = self.type.name
    storage_name = self.__storage["type"].name
    return f"{type_name} pyramid '{self.__name}' ({storage_name} storage)"
@property
def serializable(self) -> Dict:
    """Get the dict version of the pyramid object, descriptor compliant

    Levels are listed by decreasing resolution. Raster specifications and mask format
    are only written for a RASTER pyramid.

    Returns:
        Dict: descriptor structured object description
    """
    serialization = {"tile_matrix_set": self.__tms.name, "format": self.__format}

    by_decreasing_resolution = sorted(
        self.__levels.values(), key=lambda lev: lev.resolution, reverse=True
    )
    serialization["levels"] = [lev.serializable for lev in by_decreasing_resolution]

    if self.type != PyramidType.RASTER:
        return serialization

    serialization["raster_specifications"] = self.__raster_specifications
    if self.__masks:
        serialization["mask_format"] = "TIFF_ZIP_UINT8"

    return serialization
@property
def list(self) -> str:
return self.__list
@property
def descriptor(self) -> str:
return self.__descriptor
@property
def name(self) -> str:
return self.__name
@property
def tms(self) -> TileMatrixSet:
return self.__tms
@property
def raster_specifications(self) -> Dict:
"""Get raster specifications for a RASTER pyramid
Example:
{
"channels": 3,
"nodata": "255,0,0",
"photometric": "rgb",
"interpolation": "bicubic"
}
Returns:
Dict: Raster specifications, None if VECTOR pyramid
"""
return self.__raster_specifications
@property
def storage_type(self) -> StorageType:
"""Get the storage type
Returns:
StorageType: FILE, S3 or CEPH
"""
return self.__storage["type"]
@property
def storage_root(self) -> str:
"""Get the pyramid's storage root.
If storage is S3, the used cluster is removed.
Returns:
str: Pyramid's storage root
"""
return self.__storage["root"].split("@", 1)[
0
] # Suppression de l'éventuel hôte de spécification du cluster S3
@property
def storage_depth(self) -> int:
return self.__storage.get("depth", None)
@property
def storage_s3_cluster(self) -> str:
    """Get the pyramid's storage S3 cluster (host name)

    The host is the second "@" separated part of the stored root.

    Returns:
        str: the host if known, None if the default one have to be used or if storage is not S3
    """
    if self.__storage["type"] != StorageType.S3:
        return None

    root_parts = self.__storage["root"].split("@")
    if len(root_parts) < 2:
        # No host in the root: the default cluster is used
        return None

    return root_parts[1]
@storage_depth.setter
def storage_depth(self, d: int) -> None:
"""Set the tree depth for a FILE storage
Args:
d (int): file storage depth
Raises:
Exception: the depth is not equal to the already known depth
"""
if "depth" in self.__storage and self.__storage["depth"] != d:
raise Exception(
f"Pyramid {pyramid.__descriptor} owns levels with different path depths"
)
self.__storage["depth"] = d
@property
def own_masks(self) -> bool:
return self.__masks
@property
def format(self) -> str:
return self.__format
@property
def tile_extension(self) -> str:
if self.__format in [
"TIFF_RAW_UINT8",
"TIFF_LZW_UINT8",
"TIFF_ZIP_UINT8",
"TIFF_PKB_UINT8",
"TIFF_RAW_FLOAT32",
"TIFF_LZW_FLOAT32",
"TIFF_ZIP_FLOAT32",
"TIFF_PKB_FLOAT32",
]:
return "tif"
elif self.__format in ["TIFF_JPG_UINT8", "TIFF_JPG90_UINT8"]:
return "jpg"
elif self.__format == "TIFF_PNG_UINT8":
return "png"
elif self.__format == "TIFF_PBF_MVT":
return "pbf"
else:
raise Exception(
f"Unknown pyramid's format ({self.__format}), cannot return the tile extension"
)
@property
def bottom_level(self) -> "Level":
"""Get the best resolution level in the pyramid
Returns:
Level: the bottom level
"""
return sorted(self.__levels.values(), key=lambda l: l.resolution)[0]
@property
def top_level(self) -> "Level":
"""Get the low resolution level in the pyramid
Returns:
Level: the top level
"""
return sorted(self.__levels.values(), key=lambda l: l.resolution)[-1]
@property
def type(self) -> PyramidType:
    """Get the pyramid's type (RASTER or VECTOR) from its format

    Only the TIFF_PBF_MVT format is vector, any other one is considered as raster.

    Returns:
        PyramidType: RASTER or VECTOR
    """
    is_vector = self.__format == "TIFF_PBF_MVT"
    return PyramidType.VECTOR if is_vector else PyramidType.RASTER
def load_list(self) -> None:
"""Load list content and cache it
If list is already loaded, nothing done
"""
if self.__content["loaded"]:
return
for slab, infos in self.list_generator():
self.__content["cache"][slab] = infos
self.__content["loaded"] = True
def list_generator(self) -> Iterator[Tuple[Tuple[SlabType, str, int, int], Dict]]:
"""Get list content
List is copied as temporary file, roots are read and informations about each slab is returned. If list is already loaded, we yield the cached content
Examples:
S3 stored descriptor
from rok4.Pyramid import Pyramid
try:
pyramid = Pyramid.from_descriptor("s3://bucket_name/path/to/descriptor.json")
for (slab_type, level, column, row), infos in pyramid.list_generator():
print(infos)
except Exception as e:
print("Cannot load the pyramid from its descriptor and read the list")
Yields:
Iterator[Tuple[Tuple[SlabType,str,int,int], Dict]]: Slab indices and storage informations
Value example:
(
(<SlabType.DATA: 'DATA'>, '18', 5424, 7526),
{
'link': False,
'md5': None,
'root': 'pyramids@localhost:9000/LIMADM',
'slab': 'DATA_18_5424_7526'
}
)
"""
if self.__content["loaded"]:
for slab, infos in self.__content["cache"].items():
yield slab, infos
else:
# Copie de la liste dans un fichier temporaire (cette liste peut être un objet)
list_obj = tempfile.NamedTemporaryFile(mode="r", delete=False)
list_file = list_obj.name
copy(self.__list, f"file://{list_file}")
list_obj.close()
roots = {}
s3_cluster = self.storage_s3_cluster
with open(list_file, "r") as listin:
# Lecture des racines
for line in listin:
line = line.rstrip()
if line == "#":
break
root_id, root_path = line.split("=", 1)
if s3_cluster is None:
roots[root_id] = root_path
else:
# On a un nom de cluster S3, on l'ajoute au nom du bucket dans les racines
root_bucket, root_path = root_path.split("/", 1)
roots[root_id] = f"{root_bucket}@{s3_cluster}/{root_path}"
# Lecture des dalles
for line in listin:
line = line.rstrip()
parts = line.split(" ", 1)
slab_path = parts[0]
slab_md5 = None
if len(parts) == 2:
slab_md5 = parts[1]
root_id, slab_path = slab_path.split("/", 1)
slab_type, level, column, row = self.get_infos_from_slab_path(slab_path)
infos = {
"root": roots[root_id],
"link": root_id != "0",
"slab": slab_path,
"md5": slab_md5,
}
yield ((slab_type, level, column, row), infos)
remove(f"file://{list_file}")
def get_level(self, level_id: str) -> "Level":
"""Get one level according to its identifier
Args:
level_id: Level identifier
Returns:
The corresponding pyramid's level, None if not present
"""
return self.__levels.get(level_id, None)
def get_levels(self, bottom_id: str = None, top_id: str = None) -> List[Level]:
"""Get sorted levels in the provided range from bottom to top
Args:
bottom_id (str, optionnal): specific bottom level id. Defaults to None.
top_id (str, optionnal): specific top level id. Defaults to None.
Raises:
Exception: Provided levels are not consistent (bottom > top or not in the pyramid)
Examples:
All levels
from rok4.Pyramid import Pyramid
try:
pyramid = Pyramid.from_descriptor("s3://bucket_name/path/to/descriptor.json")
levels = pyramid.get_levels()
except Exception as e:
print("Cannot load the pyramid from its descriptor and get levels")
From pyramid's bottom to provided top (level 5)
from rok4.Pyramid import Pyramid
try:
pyramid = Pyramid.from_descriptor("s3://bucket_name/path/to/descriptor.json")
levels = pyramid.get_levels(None, "5")
except Exception as e:
print("Cannot load the pyramid from its descriptor and get levels")
Returns:
List[Level]: asked sorted levels
"""
sorted_levels = sorted(self.__levels.values(), key=lambda l: l.resolution)
levels = []
begin = False
if bottom_id is None:
# Pas de niveau du bas fourni, on commence tout en bas
begin = True
else:
if self.get_level(bottom_id) is None:
raise Exception(
f"Pyramid {self.name} does not contain the provided bottom level {bottom_id}"
)
if top_id is not None and self.get_level(top_id) is None:
raise Exception(f"Pyramid {self.name} does not contain the provided top level {top_id}")
end = False
for l in sorted_levels:
if not begin and l.id == bottom_id:
begin = True
if begin:
levels.append(l)
if top_id is not None and l.id == top_id:
end = True
break
else:
continue
if top_id is None:
# Pas de niveau du haut fourni, on a été jusqu'en haut et c'est normal
end = True
if not begin or not end:
raise Exception(
f"Provided levels ids are not consistent to extract levels from the pyramid {self.name}"
)
return levels
def write_descriptor(self) -> None:
    """Serialize the pyramid as JSON and write it at the descriptor's path (in the pyramid's storage root)"""
    put_data_str(json.dumps(self.serializable), self.__descriptor)
def get_infos_from_slab_path(self, path: str) -> Tuple[SlabType, str, int, int]:
    """Get the slab's indices from its storage path

    For a FILE storage, the end of the path is read as "<type>/<level>/<base-36 path>".
    For an object storage, the end of the path is read as "<type>_<level>_<column>_<row>".
    Former slab's type namings (IMAGE, IMG and MSK) are still understood.

    Args:
        path (str): Slab's storage path

    Examples:

        FILE stored pyramid

            from rok4.Pyramid import Pyramid

            try:
                pyramid = Pyramid.from_descriptor("/path/to/descriptor.json")
                slab_type, level, column, row = pyramid.get_infos_from_slab_path("DATA/12/00/4A/F7.tif")
                # (SlabType.DATA, "12", 159, 367)
            except Exception as e:
                print("Cannot load the pyramid from its descriptor and convert a slab path")

        S3 stored pyramid

            from rok4.Pyramid import Pyramid

            try:
                pyramid = Pyramid.from_descriptor("s3://bucket_name/path/to/pyramid.json")
                slab_type, level, column, row = pyramid.get_infos_from_slab_path("s3://bucket_name/path/to/pyramid/MASK_15_9164_5846")
                # (SlabType.MASK, "15", 9164, 5846)
            except Exception as e:
                print("Cannot load the pyramid from its descriptor and convert a slab path")

    Returns:
        Tuple[SlabType, str, int, int]: Slab's type (DATA or MASK), level identifier, slab's column and slab's row
    """
    if self.__storage["type"] != StorageType.FILE:
        parts = re.split(r"[/_]", path)
        row = parts[-1]
        column = parts[-2]
        level = parts[-3]

        # To be backward compatible with the former naming
        legacy_names = {"IMG": "DATA", "MSK": "MASK"}
        raw_slab_type = legacy_names.get(parts[-4], parts[-4])

        slab_type = SlabType[raw_slab_type]
        return slab_type, level, int(column), int(row)

    depth = self.__storage["depth"]
    parts = path.split("/")

    # Column and row are encoded in the end of the path, according to the storage depth
    # depth = 2 -> the 3 last parts are used for the conversion
    column, row = b36_path_decode("/".join(parts[-(depth + 1) :]))
    level = parts[-(depth + 2)]
    raw_slab_type = parts[-(depth + 3)]

    # To be backward compatible with the former naming
    if raw_slab_type == "IMAGE":
        raw_slab_type = "DATA"

    return SlabType[raw_slab_type], level, column, row
def get_slab_path_from_infos(
    self, slab_type: SlabType, level: str, column: int, row: int, full: bool = True
) -> str:
    """Build the slab's storage path from its indices

    For a FILE storage, the relative path is "<type>/<level>/<base-36 path>.tif". For an
    object storage, it is "<type>_<level>_<column>_<row>".

    Args:
        slab_type (SlabType): DATA or MASK
        level (str): Level identifier
        column (int): Slab's column
        row (int): Slab's row
        full (bool, optional): Full path or just relative path from pyramid storage root. Defaults to True.

    Returns:
        str: Absolute or relative slab's storage path
    """
    storage_type = self.__storage["type"]

    if storage_type == StorageType.FILE:
        encoded_indices = b36_path_encode(column, row, self.__storage["depth"])
        slab_path = os.path.join(slab_type.value, level, encoded_indices)
    else:
        slab_path = f"{slab_type.value}_{level}_{column}_{row}"

    if not full:
        return slab_path

    return get_path_from_infos(storage_type, self.__storage["root"], self.__name, slab_path)
def get_tile_data_binary(self, level: str, column: int, row: int) -> str:
    """Get a pyramid's tile as binary string

    To get a tile, 3 steps :
        * calculate slab path from tile index
        * read slab index to get offsets and sizes of slab's tiles
        * read the tile into the slab

    Args:
        level (str): Tile's level
        column (int): Tile's column
        row (int): Tile's row

    Limitations:
        Pyramids with one-tile slab are not handled

    Examples:

        FILE stored raster pyramid, to extract a tile containing a point and save it as independent image

            from rok4.Pyramid import Pyramid

            try:
                pyramid = Pyramid.from_descriptor("/data/pyramids/SCAN1000.json")
                level, col, row, pcol, prow = pyramid.get_tile_indices(992904.46, 6733643.15, "9", srs = "IGNF:LAMB93")
                data = pyramid.get_tile_data_binary(level, col, row)

                if data is None:
                    print("No data")
                else:
                    tile_name = f"tile_{level}_{col}_{row}.{pyramid.tile_extension}"
                    with open(tile_name, "wb") as image:
                        image.write(data)
                    print (f"Tile written in {tile_name}")

            except Exception as e:
                print(f"Cannot save a pyramid's tile : {e}")

    Raises:
        Exception: Level not found in the pyramid
        NotImplementedError: Pyramid owns one-tile slabs
        MissingEnvironmentError: Missing object storage informations
        StorageError: Storage read issue

    Returns:
        str: data, as binary string, None if no data (tile out of the level's limits, missing slab or empty tile)
    """
    level_object = self.get_level(level)
    if level_object is None:
        raise Exception(f"No level {level} in the pyramid")

    slab_width = level_object.slab_width
    slab_height = level_object.slab_height

    if slab_width == 1 and slab_height == 1:
        raise NotImplementedError("One-tile slab pyramid is not handled")

    if not level_object.is_in_limits(column, row):
        return None

    # Slab's indices, and tile's indices in the slab
    slab_column, relative_tile_column = divmod(column, slab_width)
    slab_row, relative_tile_row = divmod(row, slab_height)

    # Tile's rank in the slab's index
    tile_index = relative_tile_row * slab_width + relative_tile_column

    # Path of the slab containing the wanted tile
    slab_path = self.get_slab_path_from_infos(SlabType.DATA, level, slab_column, slab_row)

    # A ROK4 slab starts with a fixed size header (2048 bytes),
    # followed by the tiles' offsets (4 bytes each)
    # then the tiles' sizes (4 bytes each)
    tiles_count = slab_width * slab_height
    try:
        binary_index = get_data_binary(
            slab_path,
            (ROK4_IMAGE_HEADER_SIZE, 2 * 4 * tiles_count),
        )
    except FileNotFoundError:
        # A missing slab is simply handled as a lack of data
        return None

    index_type = numpy.dtype("uint32")
    offsets = numpy.frombuffer(binary_index, dtype=index_type, count=tiles_count)
    sizes = numpy.frombuffer(
        binary_index, dtype=index_type, offset=4 * tiles_count, count=tiles_count
    )

    if sizes[tile_index] == 0:
        return None

    return get_data_binary(slab_path, (offsets[tile_index], sizes[tile_index]))
def get_tile_data_raster(self, level: str, column: int, row: int) -> numpy.ndarray:
    """Get a raster pyramid's tile as 3-dimension numpy ndarray

    First dimension is the row, second one is column, third one is band.

    Args:
        level (str): Tile's level
        column (int): Tile's column
        row (int): Tile's row

    Limitations:
        Packbits (pyramid formats TIFF_PKB_FLOAT32 and TIFF_PKB_UINT8) and LZW (pyramid formats TIFF_LZW_FLOAT32 and TIFF_LZW_UINT8) compressions are not handled.

    Raises:
        Exception: Cannot get raster data for a vector pyramid
        Exception: Level not found in the pyramid
        NotImplementedError: Pyramid owns one-tile slabs
        NotImplementedError: Raster pyramid format not handled
        MissingEnvironmentError: Missing object storage informations
        StorageError: Storage read issue
        FormatError: Cannot decode tile

    Examples:

        FILE stored DTM (raster) pyramid, to get the altitude value at a point in the best level

            from rok4.Pyramid import Pyramid

            try:
                pyramid = Pyramid.from_descriptor("/data/pyramids/RGEALTI.json")
                level, col, row, pcol, prow = pyramid.get_tile_indices(44, 5, srs = "EPSG:4326")
                data = pyramid.get_tile_data_raster(level, col, row)

                if data is None:
                    print("No data")
                else:
                    print(data[prow][pcol])

            except Exception as e:
                print(f"Cannot get a pyramid's pixel value : {e}")

    Returns:
        numpy.ndarray: data, as numpy array, None if no data
    """
    if self.type == PyramidType.VECTOR:
        raise Exception("Cannot get tile as raster data : it's a vector pyramid")

    binary_tile = self.get_tile_data_binary(level, column, row)
    if binary_tile is None:
        return None

    level_object = self.get_level(level)
    data_format = self.__format

    def as_tile(flat_samples):
        # Flat samples are organized with the tile matrix's tile size and the pyramid's channels number
        flat_samples.shape = (
            level_object.tile_matrix.tile_size[0],
            level_object.tile_matrix.tile_size[1],
            self.__raster_specifications["channels"],
        )
        return flat_samples

    # Formats decoded as image: JPEG and PNG
    if data_format in ("TIFF_JPG_UINT8", "TIFF_JPG90_UINT8", "TIFF_PNG_UINT8"):
        codec = "PNG" if data_format == "TIFF_PNG_UINT8" else "JPEG"
        try:
            img = Image.open(io.BytesIO(binary_tile))
        except Exception as e:
            raise FormatError(codec, "binary tile", e)
        return numpy.asarray(img)

    # Formats read as samples buffer, raw or deflate compressed
    if data_format in ("TIFF_RAW_UINT8", "TIFF_RAW_FLOAT32"):
        sample_type = "uint8" if data_format == "TIFF_RAW_UINT8" else "float32"
        return as_tile(numpy.frombuffer(binary_tile, dtype=numpy.dtype(sample_type)))

    if data_format in ("TIFF_ZIP_UINT8", "TIFF_ZIP_FLOAT32"):
        sample_type = "uint8" if data_format == "TIFF_ZIP_UINT8" else "float32"
        try:
            data = numpy.frombuffer(
                zlib.decompress(binary_tile), dtype=numpy.dtype(sample_type)
            )
        except Exception as e:
            raise FormatError("ZIP", "binary tile", e)
        return as_tile(data)

    raise NotImplementedError(f"Cannot get tile as raster data for format {self.__format}")
def get_tile_data_vector(self, level: str, column: int, row: int) -> Dict:
    """Get a vector pyramid's tile as GeoJSON dictionnary

    Args:
        level (str): Tile's level
        column (int): Tile's column
        row (int): Tile's row

    Raises:
        Exception: Cannot get vector data for a raster pyramid
        Exception: Level not found in the pyramid
        NotImplementedError: Pyramid owns one-tile slabs
        NotImplementedError: Vector pyramid format not handled
        MissingEnvironmentError: Missing object storage informations
        StorageError: Storage read issue
        FormatError: Cannot decode tile

    Examples:

        S3 stored vector pyramid, to print a tile as GeoJSON

            from rok4.Pyramid import Pyramid
            import json

            try:
                pyramid = Pyramid.from_descriptor("s3://pyramids/vectors/BDTOPO.json")
                level, col, row, pcol, prow = pyramid.get_tile_indices(40.325, 3.123, srs = "EPSG:4326")
                data = pyramid.get_tile_data_vector(level, col, row)

                if data is None:
                    print("No data")
                else:
                    print(json.dumps(data))

            except Exception as e:
                print(f"Cannot print a vector pyramid's tile as GeoJSON : {e}")

    Returns:
        Dict: data, as decoded by mapbox_vector_tile. None if no data
    """
    if self.type == PyramidType.RASTER:
        raise Exception("Cannot get tile as vector data : it's a raster pyramid")

    # The level's existence is checked while reading the binary tile
    binary_tile = self.get_tile_data_binary(level, column, row)
    if binary_tile is None:
        return None

    if self.__format != "TIFF_PBF_MVT":
        raise NotImplementedError(f"Cannot get tile as vector data for format {self.__format}")

    try:
        return mapbox_vector_tile.decode(binary_tile)
    except Exception as e:
        # The decoding error is kept as cause, to ease the diagnosis
        raise FormatError("PBF (MVT)", "binary tile", e) from e
def get_tile_indices(
    self, x: float, y: float, level: str = None, **kwargs
) -> Tuple[str, int, int, int, int]:
    """Get pyramid's tile and pixel indices from point's coordinates

    Used coordinates system have to be the pyramid one. If EPSG:4326, x is latitude and y longitude.

    Args:
        x (float): point's x
        y (float): point's y
        level (str, optional): Pyramid's level to take into account, the bottom one if None. Defaults to None.
        **srs (string): spatial reference system of provided coordinates, with authority and code (same as the pyramid's one if not provided)

    Raises:
        Exception: Cannot find level to calculate indices
        RuntimeError: Provided SRS is invalid for OSR

    Examples:

        FILE stored DTM (raster) pyramid, to get the altitude value at a point in the best level

            from rok4.Pyramid import Pyramid

            try:
                pyramid = Pyramid.from_descriptor("/data/pyramids/RGEALTI.json")
                level, col, row, pcol, prow = pyramid.get_tile_indices(44, 5, srs = "EPSG:4326")
                data = pyramid.get_tile_data_raster(level, col, row)

                if data is None:
                    print("No data")
                else:
                    print(data[prow][pcol])

            except Exception as e:
                print(f"Cannot get a pyramid's pixel value : {e}")

    Returns:
        Tuple[str, int, int, int, int]: Level identifier, tile's column, tile's row, pixel's (in the tile) column, pixel's row
    """
    level_object = self.bottom_level
    if level is not None:
        level_object = self.get_level(level)

    if level_object is None:
        raise Exception("Cannot found the level to calculate indices")

    # Reproject only when a SRS is provided and differs (case insensitive) from the TMS one
    srs = kwargs.get("srs")
    if srs is not None and srs.upper() != self.__tms.srs.upper():
        sr = srs_to_spatialreference(srs)
        x, y = reproject_point((x, y), sr, self.__tms.sr)

    return (level_object.id,) + level_object.tile_matrix.point_to_indices(x, y)
@property
def size(self) -> int:
    """Get the size of the pyramid

    The size is calculated with size_path at the first access, then kept in the instance and reused.

    Examples:

            from rok4.Pyramid import Pyramid

            try:
                pyramid = Pyramid.from_descriptor("s3://bucket_name/path/to/descriptor.json")
                size = pyramid.size

            except Exception as e:
                print("Cannot load the pyramid from its descriptor and get his size")

    Returns:
        int: size of the pyramid
    """
    # Already calculated : the stored value is returned as is
    if hasattr(self, "_Pyramid__size"):
        return self.__size

    pyramid_path = get_path_from_infos(
        self.__storage["type"], self.__storage["root"], self.__name
    )
    self.__size = size_path(pyramid_path)
    return self.__size
Global variables
var ROK4_IMAGE_HEADER_SIZE
-
Slab's header size, 2048 bytes
Functions
def b36_number_decode(number: str) ‑> int
-
Convert base-36 number to base-10
Args
number
:str
- base-36 number
Returns
int
- base-10 number
Expand source code
def b36_number_decode(number: str) -> int:
    """Convert base-36 number to base-10

    Args:
        number (str): base-36 number

    Returns:
        int: base-10 number
    """
    base10 = int(number, 36)
    return base10
def b36_number_encode(number: int) ‑> str
-
Convert base-10 number to base-36
Used alphabet is '0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ'
Args
number
:int
- base-10 number
Returns
str
- base-36 number
Expand source code
def b36_number_encode(number: int) -> str:
    """Convert base-10 number to base-36

    Used alphabet is '0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ'

    Args:
        number (int): base-10 number, positive or null

    Raises:
        ValueError: provided number is negative

    Returns:
        str: base-36 number
    """
    alphabet = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ"

    if number < 0:
        # divmod floors the quotient : a negative number would never reach 0 and the loop below would never end
        raise ValueError(f"Cannot encode a negative number in base-36 : {number}")

    if number < len(alphabet):
        return alphabet[number]

    # Digits are produced from the least significant to the most significant one
    digits = []
    while number != 0:
        number, i = divmod(number, len(alphabet))
        digits.append(alphabet[i])

    return "".join(reversed(digits))
def b36_path_decode(path: str) ‑> Tuple[int, int]
-
Get slab's column and row from a base-36 based path
Args
path
:str
- slab's path
Returns
Tuple[int, int]
- slab's column and row
Expand source code
def b36_path_decode(path: str) -> Tuple[int, int]:
    """Get slab's column and row from a base-36 based path

    Slashes and the .TIF / .TIFF extension (case insensitive) are removed, then characters are read
    two by two : the first one belongs to the column, the second one to the row.

    Args:
        path (str): slab's path

    Returns:
        Tuple[int, int]: slab's column and row
    """
    cleaned = re.sub(r"(\.TIFF?)", "", path.replace("/", "").upper())

    column_digits = []
    row_digits = []
    for position in range(0, len(cleaned), 2):
        column_digits.append(cleaned[position])
        row_digits.append(cleaned[position + 1])

    return b36_number_decode("".join(column_digits)), b36_number_decode("".join(row_digits))
def b36_path_encode(column: int, row: int, slashs: int) ‑> str
-
Convert slab indices to base-36 based path, with .tif extension
Args
column
:int
- slab's column
row
:int
- slab's row
slashs
:int
- number of slashes (used to split the path)
Returns
str
- base-36 based path
Expand source code
def b36_path_encode(column: int, row: int, slashs: int) -> str:
    """Convert slab indices to base-36 based path, with .tif extension

    Column and row are encoded in base-36, left padded with '0' to the same length (at least slashs + 1),
    then interleaved (column's character first). Slashes are inserted between the last pairs of characters.

    Args:
        column (int): slab's column
        row (int): slab's row
        slashs (int): slashs' number (to split path)

    Returns:
        str: base-36 based path
    """
    column36 = b36_number_encode(column)
    row36 = b36_number_encode(row)

    width = max(slashs + 1, len(column36), len(row36))
    pairs = [c + r for c, r in zip(column36.rjust(width, "0"), row36.rjust(width, "0"))]

    # The last pairs (as many as wanted slashes) are each preceded by a slash
    split = width - max(slashs, 0)
    head = "".join(pairs[:split])
    tail = "".join("/" + pair for pair in pairs[split:])

    return f"{head}{tail}.tif"
Classes
class Level
-
A pyramid's level, raster or vector
Attributes
__id
:str
- level's identifier. It has to exist in the TMS used by the pyramid
__tile_limits
:Dict[str, int]
- minimum and maximum tiles' columns and rows of pyramid's content
__slab_size
:Tuple[int, int]
- number of tile in a slab, widthwise and heightwise
__tables
:List[Dict]
- for a VECTOR pyramid, description of vector content, tables and attributes
Expand source code
class Level:
    """A pyramid's level, raster or vector

    Attributes:
        __id (str): level's identifier. have to exist in the pyramid's used TMS
        __tile_limits (Dict[str, int]): minimum and maximum tiles' columns and rows of pyramid's content
        __slab_size (Tuple[int, int]): number of tile in a slab, widthwise and heightwise
        __tables (List[Dict]): for a VECTOR pyramid, description of vector content, tables and attributes
    """

    @classmethod
    def from_descriptor(cls, data: Dict, pyramid: "Pyramid") -> "Level":
        """Create a pyramid's level from the pyramid's descriptor levels element

        Args:
            data (Dict): level's information from the pyramid's descriptor
            pyramid (Pyramid): pyramid containing the level to create

        Raises:
            Exception: different storage or masks presence between the level and the pyramid
            MissingAttributeError: Attribute is missing in the content

        Returns:
            Level: a Level instance
        """
        level = cls()

        level.__pyramid = pyramid

        # Common attributes
        try:
            level.__id = data["id"]
            level.__tile_limits = data["tile_limits"]
            level.__slab_size = (
                data["tiles_per_width"],
                data["tiles_per_height"],
            )

            # Storage information : validated here and stored in the pyramid
            if pyramid.storage_type.name != data["storage"]["type"]:
                raise Exception(
                    f"Pyramid {pyramid.descriptor} owns levels using different storage types ({data['storage']['type']}) than its one ({pyramid.storage_type.name})"
                )

            if pyramid.storage_type == StorageType.FILE:
                pyramid.storage_depth = data["storage"]["path_depth"]

            # The public 'descriptor' property is used in messages : inside this class,
            # 'pyramid.__descriptor' would be mangled to 'pyramid._Level__descriptor' and fail
            if "mask_directory" in data["storage"] or "mask_prefix" in data["storage"]:
                if not pyramid.own_masks:
                    raise Exception(
                        f"Pyramid {pyramid.descriptor} does not define a mask format but level {level.__id} define mask storage informations"
                    )
            else:
                if pyramid.own_masks:
                    raise Exception(
                        f"Pyramid {pyramid.descriptor} define a mask format but level {level.__id} does not define mask storage informations"
                    )

        except KeyError as e:
            raise MissingAttributeError(pyramid.descriptor, f"levels[].{e}")

        # Attributes for a vector level
        if level.__pyramid.type == PyramidType.VECTOR:
            try:
                level.__tables = data["tables"]

            except KeyError as e:
                raise MissingAttributeError(pyramid.descriptor, f"levels[].{e}")

        return level

    @classmethod
    def from_other(cls, other: "Level", pyramid: "Pyramid") -> "Level":
        """Create a pyramid's level from another one

        Args:
            other (Level): level to clone
            pyramid (Pyramid): new pyramid containing the new level

        Raises:
            Exception: different storage or masks presence between the level and the pyramid
            MissingAttributeError: Attribute is missing in the content

        Returns:
            Level: a Level instance
        """
        level = cls()

        # Common attributes
        level.__id = other.__id
        level.__pyramid = pyramid
        level.__tile_limits = other.__tile_limits
        level.__slab_size = other.__slab_size

        # Attributes for a vector level
        if level.__pyramid.type == PyramidType.VECTOR:
            level.__tables = other.__tables

        return level

    def __str__(self) -> str:
        return f"{self.__pyramid.type.name} pyramid's level '{self.__id}' ({self.__pyramid.storage_type.name} storage)"

    @property
    def serializable(self) -> Dict:
        """Get the dict version of the pyramid object, pyramid's descriptor compliant

        Returns:
            Dict: pyramid's descriptor structured object description
        """
        serialization = {
            "id": self.__id,
            "tiles_per_width": self.__slab_size[0],
            "tiles_per_height": self.__slab_size[1],
            "tile_limits": self.__tile_limits,
        }

        if self.__pyramid.type == PyramidType.VECTOR:
            serialization["tables"] = self.__tables

        if self.__pyramid.storage_type == StorageType.FILE:
            serialization["storage"] = {
                "type": "FILE",
                "image_directory": f"{self.__pyramid.name}/DATA/{self.__id}",
                "path_depth": self.__pyramid.storage_depth,
            }
            if self.__pyramid.own_masks:
                serialization["storage"][
                    "mask_directory"
                ] = f"{self.__pyramid.name}/MASK/{self.__id}"

        elif self.__pyramid.storage_type == StorageType.CEPH:
            serialization["storage"] = {
                "type": "CEPH",
                "image_prefix": f"{self.__pyramid.name}/DATA_{self.__id}",
                "pool_name": self.__pyramid.storage_root,
            }
            if self.__pyramid.own_masks:
                serialization["storage"]["mask_prefix"] = f"{self.__pyramid.name}/MASK_{self.__id}"

        elif self.__pyramid.storage_type == StorageType.S3:
            serialization["storage"] = {
                "type": "S3",
                "image_prefix": f"{self.__pyramid.name}/DATA_{self.__id}",
                "bucket_name": self.__pyramid.storage_root,
            }
            if self.__pyramid.own_masks:
                serialization["storage"]["mask_prefix"] = f"{self.__pyramid.name}/MASK_{self.__id}"

        return serialization

    @property
    def id(self) -> str:
        """Level's identifier"""
        return self.__id

    @property
    def bbox(self) -> Tuple[float, float, float, float]:
        """Return level extent, based on tile limits

        Returns:
            Tuple[float, float, float, float]: level terrain extent (xmin, ymin, xmax, ymax)
        """
        # xmin and ymin come from the tile (min_col, max_row), xmax and ymax from the tile (max_col, min_row)
        min_bbox = self.__pyramid.tms.get_level(self.__id).tile_to_bbox(
            self.__tile_limits["min_col"], self.__tile_limits["max_row"]
        )
        max_bbox = self.__pyramid.tms.get_level(self.__id).tile_to_bbox(
            self.__tile_limits["max_col"], self.__tile_limits["min_row"]
        )

        return (min_bbox[0], min_bbox[1], max_bbox[2], max_bbox[3])

    @property
    def resolution(self) -> str:
        """Resolution of the TMS level with the same identifier"""
        return self.__pyramid.tms.get_level(self.__id).resolution

    @property
    def tile_matrix(self) -> TileMatrix:
        """TMS level with the same identifier"""
        return self.__pyramid.tms.get_level(self.__id)

    @property
    def slab_width(self) -> int:
        """Number of tiles in a slab, widthwise"""
        return self.__slab_size[0]

    @property
    def slab_height(self) -> int:
        """Number of tiles in a slab, heightwise"""
        return self.__slab_size[1]

    def is_in_limits(self, column: int, row: int) -> bool:
        """Is the tile indices in limits ?

        Args:
            column (int): tile's column
            row (int): tile's row

        Returns:
            bool: True if tiles' limits contain the provided tile's indices
        """
        return (
            self.__tile_limits["min_row"] <= row
            and self.__tile_limits["max_row"] >= row
            and self.__tile_limits["min_col"] <= column
            and self.__tile_limits["max_col"] >= column
        )

    def set_limits_from_bbox(self, bbox: Tuple[float, float, float, float]) -> None:
        """Set tile limits, based on provided bounding box

        Args:
            bbox (Tuple[float, float, float, float]): terrain extent (xmin, ymin, xmax, ymax), in TMS coordinates system
        """
        col_min, row_min, col_max, row_max = self.__pyramid.tms.get_level(self.__id).bbox_to_tiles(
            bbox
        )
        self.__tile_limits = {
            "min_row": row_min,
            "max_col": col_max,
            "max_row": row_max,
            "min_col": col_min,
        }
Static methods
def from_descriptor(data: Dict[~KT, ~VT], pyramid: Pyramid) ‑> Level
-
Create a pyramid's level from the pyramid's descriptor levels element
Args
data
:Dict
- level's information from the pyramid's descriptor
pyramid
:Pyramid
- pyramid containing the level to create
Raises
Exception
- different storage or masks presence between the level and the pyramid
MissingAttributeError
- Attribute is missing in the content
Returns
Level
- a Level instance
Expand source code
@classmethod
def from_descriptor(cls, data: Dict, pyramid: "Pyramid") -> "Level":
    """Create a pyramid's level from the pyramid's descriptor levels element

    Args:
        data (Dict): level's information from the pyramid's descriptor
        pyramid (Pyramid): pyramid containing the level to create

    Raises:
        Exception: different storage or masks presence between the level and the pyramid
        MissingAttributeError: Attribute is missing in the content

    Returns:
        Level: a Level instance
    """
    level = cls()

    level.__pyramid = pyramid

    # Common attributes
    try:
        level.__id = data["id"]
        level.__tile_limits = data["tile_limits"]
        level.__slab_size = (
            data["tiles_per_width"],
            data["tiles_per_height"],
        )

        # Storage information : validated here and stored in the pyramid
        if pyramid.storage_type.name != data["storage"]["type"]:
            raise Exception(
                f"Pyramid {pyramid.descriptor} owns levels using different storage types ({data['storage']['type']}) than its one ({pyramid.storage_type.name})"
            )

        if pyramid.storage_type == StorageType.FILE:
            pyramid.storage_depth = data["storage"]["path_depth"]

        # The public 'descriptor' property is used in messages : inside the Level class,
        # 'pyramid.__descriptor' would be mangled to 'pyramid._Level__descriptor' and fail
        if "mask_directory" in data["storage"] or "mask_prefix" in data["storage"]:
            if not pyramid.own_masks:
                raise Exception(
                    f"Pyramid {pyramid.descriptor} does not define a mask format but level {level.__id} define mask storage informations"
                )
        else:
            if pyramid.own_masks:
                raise Exception(
                    f"Pyramid {pyramid.descriptor} define a mask format but level {level.__id} does not define mask storage informations"
                )

    except KeyError as e:
        raise MissingAttributeError(pyramid.descriptor, f"levels[].{e}")

    # Attributes for a vector level
    if level.__pyramid.type == PyramidType.VECTOR:
        try:
            level.__tables = data["tables"]

        except KeyError as e:
            raise MissingAttributeError(pyramid.descriptor, f"levels[].{e}")

    return level
def from_other(other: Level, pyramid: Pyramid) ‑> Level
-
Create a pyramid's level from another one
Args
other
:Level
- level to clone
pyramid
:Pyramid
- new pyramid containing the new level
Raises
Exception
- different storage or masks presence between the level and the pyramid
MissingAttributeError
- Attribute is missing in the content
Returns
Level
- a Level instance
Expand source code
@classmethod
def from_other(cls, other: "Level", pyramid: "Pyramid") -> "Level":
    """Clone a level to attach it to another pyramid

    Identifier, tile limits and slab size are taken from the source level (values are shared, not copied).
    Tables are taken too if the new pyramid is a VECTOR one.

    Args:
        other (Level): level to clone
        pyramid (Pyramid): new pyramid containing the new level

    Returns:
        Level: a Level instance
    """
    clone = cls()

    clone.__id = other.__id
    clone.__pyramid = pyramid
    clone.__tile_limits = other.__tile_limits
    clone.__slab_size = other.__slab_size

    # Only a vector level owns tables
    is_vector = clone.__pyramid.type == PyramidType.VECTOR
    if is_vector:
        clone.__tables = other.__tables

    return clone
Instance variables
var bbox : Tuple[float, float, float, float]
-
Return level extent, based on tile limits
Returns
Tuple[float, float, float, float]
- level terrain extent (xmin, ymin, xmax, ymax)
Expand source code
@property def bbox(self) -> Tuple[float, float, float, float]: """Return level extent, based on tile limits Returns: Tuple[float, float, float, float]: level terrain extent (xmin, ymin, xmax, ymax) """ min_bbox = self.__pyramid.tms.get_level(self.__id).tile_to_bbox( self.__tile_limits["min_col"], self.__tile_limits["max_row"] ) max_bbox = self.__pyramid.tms.get_level(self.__id).tile_to_bbox( self.__tile_limits["max_col"], self.__tile_limits["min_row"] ) return (min_bbox[0], min_bbox[1], max_bbox[2], max_bbox[3])
var id : str
-
Expand source code
@property def id(self) -> str: return self.__id
var resolution : str
-
Expand source code
@property def resolution(self) -> str: return self.__pyramid.tms.get_level(self.__id).resolution
var serializable : Dict[~KT, ~VT]
-
Get the dict version of the pyramid object, pyramid's descriptor compliant
Returns
Dict
- pyramid's descriptor structured object description
Expand source code
@property
def serializable(self) -> Dict:
    """Get the dict version of the level, as expected in the pyramid's descriptor

    Storage part depends on the pyramid's storage type (FILE, CEPH or S3). Mask location is added
    only if the pyramid owns masks.

    Returns:
        Dict: pyramid's descriptor structured object description
    """
    pyramid = self.__pyramid

    description = {
        "id": self.__id,
        "tiles_per_width": self.__slab_size[0],
        "tiles_per_height": self.__slab_size[1],
        "tile_limits": self.__tile_limits,
    }

    if pyramid.type == PyramidType.VECTOR:
        description["tables"] = self.__tables

    if pyramid.storage_type == StorageType.FILE:
        storage = {
            "type": "FILE",
            "image_directory": f"{pyramid.name}/DATA/{self.__id}",
            "path_depth": pyramid.storage_depth,
        }
        mask_key, separator = "mask_directory", "/"

    elif pyramid.storage_type == StorageType.CEPH:
        storage = {
            "type": "CEPH",
            "image_prefix": f"{pyramid.name}/DATA_{self.__id}",
            "pool_name": pyramid.storage_root,
        }
        mask_key, separator = "mask_prefix", "_"

    elif pyramid.storage_type == StorageType.S3:
        storage = {
            "type": "S3",
            "image_prefix": f"{pyramid.name}/DATA_{self.__id}",
            "bucket_name": pyramid.storage_root,
        }
        mask_key, separator = "mask_prefix", "_"

    else:
        # Other storage type : no storage part in the serialization
        return description

    if pyramid.own_masks:
        storage[mask_key] = f"{pyramid.name}/MASK{separator}{self.__id}"

    description["storage"] = storage
    return description
var slab_height : int
-
Expand source code
@property def slab_height(self) -> int: return self.__slab_size[1]
var slab_width : int
-
Expand source code
@property def slab_width(self) -> int: return self.__slab_size[0]
var tile_matrix : TileMatrix
-
Expand source code
@property def tile_matrix(self) -> TileMatrix: return self.__pyramid.tms.get_level(self.__id)
Methods
def is_in_limits(self, column: int, row: int) ‑> bool
-
Are the tile indices within the limits?
Args
column
:int
- tile's column
row
:int
- tile's row
Returns
bool
- True if tiles' limits contain the provided tile's indices
Expand source code
def is_in_limits(self, column: int, row: int) -> bool: """Is the tile indices in limits ? Args: column (int): tile's column row (int): tile's row Returns: bool: True if tiles' limits contain the provided tile's indices """ return ( self.__tile_limits["min_row"] <= row and self.__tile_limits["max_row"] >= row and self.__tile_limits["min_col"] <= column and self.__tile_limits["max_col"] >= column )
def set_limits_from_bbox(self, bbox: Tuple[float, float, float, float]) ‑> None
-
Set tile limits, based on provided bounding box
Args
bbox
:Tuple[float, float, float, float]
- terrain extent (xmin, ymin, xmax, ymax), in TMS coordinates system
Expand source code
def set_limits_from_bbox(self, bbox: Tuple[float, float, float, float]) -> None: """Set tile limits, based on provided bounding box Args: bbox (Tuple[float, float, float, float]): terrain extent (xmin, ymin, xmax, ymax), in TMS coordinates system """ col_min, row_min, col_max, row_max = self.__pyramid.tms.get_level(self.__id).bbox_to_tiles( bbox ) self.__tile_limits = { "min_row": row_min, "max_col": col_max, "max_row": row_max, "min_col": col_min, }
class Pyramid
-
A data pyramid, raster or vector
Attributes
__name
:str
- pyramid's name
__descriptor
:str
- pyramid's descriptor path
__list
:str
- pyramid's list path
__tms
:TileMatrixSet
- Used grid
__levels
:Dict[str, Level]
- Pyramid's levels
__format
:str
- Data format
__storage
:Dict[str, Union[StorageType,str,int]]
- Pyramid's storage informations (type, root and depth if FILE storage)
__raster_specifications
:Dict
- If raster pyramid, raster specifications
__content
:Dict
-
Loading status (loaded) and list content (cache).
Example (S3 storage):
{ 'cache': { (<SlabType.DATA: 'DATA'>, '18', 5424, 7526): { 'link': False, 'md5': None, 'root': 'pyramids@localhost:9000/LIMADM', 'slab': 'DATA_18_5424_7526' } }, 'loaded': True }
Expand source code
class Pyramid:
    """A data pyramid, raster or vector

    Attributes:
        __name (str): pyramid's name
        __descriptor (str): pyramid's descriptor path
        __list (str): pyramid's list path
        __tms (rok4.TileMatrixSet.TileMatrixSet): Used grid
        __levels (Dict[str, Level]): Pyramid's levels
        __format (str): Data format
        __storage (Dict[str, Union[rok4.Storage.StorageType,str,int]]): Pyramid's storage information (type, root and depth if FILE storage)
        __raster_specifications (Dict): If raster pyramid, raster specifications (None otherwise)
        __content (Dict): Loading status (loaded) and list content (cache).

            Example (S3 storage):

                {
                    'cache': {
                        (<SlabType.DATA: 'DATA'>, '18', 5424, 7526): {
                            'link': False,
                            'md5': None,
                            'root': 'pyramids@localhost:9000/LIMADM',
                            'slab': 'DATA_18_5424_7526'
                        }
                    },
                    'loaded': True
                }
    """

    @classmethod
    def from_descriptor(cls, descriptor: str) -> "Pyramid":
        """Create a pyramid from its descriptor

        Args:
            descriptor (str): pyramid's descriptor path

        Raises:
            FormatError: Provided path or the TMS is not a well formed JSON
            Exception: Level issue : no one in the pyramid or the used TMS, or level ID not defined in the TMS
            MissingAttributeError: Attribute is missing in the content
            StorageError: Storage read issue (pyramid descriptor or TMS)
            MissingEnvironmentError: Missing object storage information or TMS root directory

        Examples:

            S3 stored descriptor

                from rok4.Pyramid import Pyramid

                try:
                    pyramid = Pyramid.from_descriptor("s3://bucket_name/path/to/descriptor.json")
                except Exception as e:
                    print("Cannot load the pyramid from its descriptor")

        Returns:
            Pyramid: a Pyramid instance
        """
        try:
            data = json.loads(get_data_str(descriptor))
        except JSONDecodeError as e:
            raise FormatError("JSON", descriptor, e) from e

        pyramid = cls()

        pyramid.__storage["type"], _, pyramid.__storage["root"], base_name = get_infos_from_path(
            descriptor
        )
        # The 5 last characters are dropped: the descriptor is expected to end with ".json"
        pyramid.__name = base_name[:-5]
        pyramid.__descriptor = descriptor
        pyramid.__list = get_path_from_infos(
            pyramid.__storage["type"], pyramid.__storage["root"], f"{pyramid.__name}.list"
        )

        try:
            # Common attributes
            pyramid.__tms = TileMatrixSet(data["tile_matrix_set"])
            pyramid.__format = data["format"]

            # Raster pyramid attributes
            if pyramid.type == PyramidType.RASTER:
                pyramid.__raster_specifications = data["raster_specifications"]

            pyramid.__masks = "mask_format" in data

            # Levels
            for level_description in data["levels"]:
                lev = Level.from_descriptor(level_description, pyramid)
                pyramid.__levels[lev.id] = lev

                if pyramid.__tms.get_level(lev.id) is None:
                    raise Exception(
                        f"Pyramid {descriptor} owns a level with the ID '{lev.id}', not defined in the TMS '{pyramid.tms.name}'"
                    )

        except KeyError as e:
            raise MissingAttributeError(descriptor, e) from e

        if len(pyramid.__levels.keys()) == 0:
            raise Exception(f"Pyramid '{descriptor}' has no level")

        return pyramid

    @classmethod
    def from_other(cls, other: "Pyramid", name: str, storage: Dict) -> "Pyramid":
        """Create a pyramid from another one

        The provided storage dictionary is used (and completed) in place: its "type" is
        converted to the StorageType enumeration and, for a FILE storage, a default
        "depth" of 2 is added if missing.

        Args:
            other (Pyramid): pyramid to clone
            name (str): new pyramid's name
            storage (Dict[str, Union[str, int]]): new pyramid's storage information

        Raises:
            Exception: The name of a FILE stored pyramid contains '/'
            MissingAttributeError: Attribute is missing in the storage information (or storage type is unknown)

        Returns:
            Pyramid: a Pyramid instance
        """
        try:
            # Convert the storage type to the enumeration. An already converted value is kept,
            # so the same storage dictionary can be provided for several calls
            if not isinstance(storage["type"], StorageType):
                storage["type"] = StorageType[storage["type"]]

            if storage["type"] == StorageType.FILE and name.find("/") != -1:
                raise Exception(f"A FILE stored pyramid's name cannot contain '/' : '{name}'")

            if storage["type"] == StorageType.FILE and "depth" not in storage:
                storage["depth"] = 2

            pyramid = cls()

            # Common attributes
            pyramid.__name = name
            pyramid.__storage = storage
            pyramid.__masks = other.__masks

            pyramid.__descriptor = get_path_from_infos(
                pyramid.__storage["type"], pyramid.__storage["root"], f"{pyramid.__name}.json"
            )
            pyramid.__list = get_path_from_infos(
                pyramid.__storage["type"], pyramid.__storage["root"], f"{pyramid.__name}.list"
            )
            pyramid.__tms = other.__tms
            pyramid.__format = other.__format

            # Raster pyramid attributes
            if pyramid.type == PyramidType.RASTER:
                pyramid.__masks = bool(other.own_masks)
                pyramid.__raster_specifications = other.__raster_specifications

            # Levels
            for other_level in other.__levels.values():
                lev = Level.from_other(other_level, pyramid)
                pyramid.__levels[lev.id] = lev

        except KeyError as e:
            # No descriptor path may be known at this point (the storage information can be
            # the incomplete part), so the new pyramid's name identifies the faulty input
            raise MissingAttributeError(name, e) from e

        return pyramid

    def __init__(self) -> None:
        self.__storage = {}
        self.__levels = {}
        self.__masks = None
        # Stays None for a VECTOR pyramid, as documented by the raster_specifications property
        self.__raster_specifications = None
        self.__content = {"loaded": False, "cache": {}}

    def __str__(self) -> str:
        return f"{self.type.name} pyramid '{self.__name}' ({self.__storage['type'].name} storage)"

    @property
    def serializable(self) -> Dict:
        """Get the dict version of the pyramid object, descriptor compliant

        Levels are listed by decreasing resolution value.

        Returns:
            Dict: descriptor structured object description
        """
        serialization = {"tile_matrix_set": self.__tms.name, "format": self.__format}

        sorted_levels = sorted(
            self.__levels.values(), key=lambda lev: lev.resolution, reverse=True
        )
        serialization["levels"] = [lev.serializable for lev in sorted_levels]

        if self.type == PyramidType.RASTER:
            serialization["raster_specifications"] = self.__raster_specifications

        if self.__masks:
            serialization["mask_format"] = "TIFF_ZIP_UINT8"

        return serialization

    @property
    def list(self) -> str:
        """Get the pyramid's list path"""
        return self.__list

    @property
    def descriptor(self) -> str:
        """Get the pyramid's descriptor path"""
        return self.__descriptor

    @property
    def name(self) -> str:
        """Get the pyramid's name"""
        return self.__name

    @property
    def tms(self) -> TileMatrixSet:
        """Get the grid used by the pyramid"""
        return self.__tms

    @property
    def raster_specifications(self) -> Dict:
        """Get raster specifications for a RASTER pyramid

        Example:
            {
                "channels": 3,
                "nodata": "255,0,0",
                "photometric": "rgb",
                "interpolation": "bicubic"
            }

        Returns:
            Dict: Raster specifications, None if VECTOR pyramid
        """
        return self.__raster_specifications

    @property
    def storage_type(self) -> StorageType:
        """Get the storage type

        Returns:
            StorageType: FILE, S3 or CEPH
        """
        return self.__storage["type"]

    @property
    def storage_root(self) -> str:
        """Get the pyramid's storage root.

        If storage is S3, the used cluster is removed.

        Returns:
            str: Pyramid's storage root
        """
        # Remove the optional host specifying the S3 cluster
        return self.__storage["root"].split("@", 1)[0]

    @property
    def storage_depth(self) -> int:
        """Get the tree depth of a FILE storage, None if not defined"""
        return self.__storage.get("depth", None)

    @storage_depth.setter
    def storage_depth(self, d: int) -> None:
        """Set the tree depth for a FILE storage

        Args:
            d (int): file storage depth

        Raises:
            Exception: the depth is not equal to the already known depth
        """
        if "depth" in self.__storage and self.__storage["depth"] != d:
            raise Exception(
                f"Pyramid {self.__descriptor} owns levels with different path depths"
            )
        self.__storage["depth"] = d

    @property
    def storage_s3_cluster(self) -> str:
        """Get the pyramid's storage S3 cluster (host name)

        Returns:
            str: the host if known, None if the default one has to be used or if storage is not S3
        """
        if self.__storage["type"] == StorageType.S3:
            try:
                return self.__storage["root"].split("@")[1]
            except IndexError:
                return None
        else:
            return None

    @property
    def own_masks(self) -> bool:
        """Tell whether the pyramid owns masks"""
        return self.__masks

    @property
    def format(self) -> str:
        """Get the pyramid's data format"""
        return self.__format

    @property
    def tile_extension(self) -> str:
        """Get the file extension matching the format of the pyramid's tiles

        Raises:
            Exception: Unknown pyramid's format

        Returns:
            str: "tif", "jpg", "png" or "pbf"
        """
        if self.__format in [
            "TIFF_RAW_UINT8",
            "TIFF_LZW_UINT8",
            "TIFF_ZIP_UINT8",
            "TIFF_PKB_UINT8",
            "TIFF_RAW_FLOAT32",
            "TIFF_LZW_FLOAT32",
            "TIFF_ZIP_FLOAT32",
            "TIFF_PKB_FLOAT32",
        ]:
            return "tif"
        elif self.__format in ["TIFF_JPG_UINT8", "TIFF_JPG90_UINT8"]:
            return "jpg"
        elif self.__format == "TIFF_PNG_UINT8":
            return "png"
        elif self.__format == "TIFF_PBF_MVT":
            return "pbf"
        else:
            raise Exception(
                f"Unknown pyramid's format ({self.__format}), cannot return the tile extension"
            )

    @property
    def bottom_level(self) -> "Level":
        """Get the best resolution level in the pyramid

        Returns:
            Level: the bottom level
        """
        return sorted(self.__levels.values(), key=lambda lev: lev.resolution)[0]

    @property
    def top_level(self) -> "Level":
        """Get the low resolution level in the pyramid

        Returns:
            Level: the top level
        """
        return sorted(self.__levels.values(), key=lambda lev: lev.resolution)[-1]

    @property
    def type(self) -> PyramidType:
        """Get the pyramid's type (RASTER or VECTOR) from its format

        Returns:
            PyramidType: RASTER or VECTOR
        """
        if self.__format == "TIFF_PBF_MVT":
            return PyramidType.VECTOR
        else:
            return PyramidType.RASTER

    def load_list(self) -> None:
        """Load list content and cache it

        If list is already loaded, nothing done
        """
        if self.__content["loaded"]:
            return

        for slab, infos in self.list_generator():
            self.__content["cache"][slab] = infos

        self.__content["loaded"] = True

    def list_generator(self) -> Iterator[Tuple[Tuple[SlabType, str, int, int], Dict]]:
        """Get list content

        List is copied as temporary file, roots are read and information about each slab is returned.
        If list is already loaded, we yield the cached content.
        The temporary file is removed when the generator ends, fails or is closed.

        Examples:

            S3 stored descriptor

                from rok4.Pyramid import Pyramid

                try:
                    pyramid = Pyramid.from_descriptor("s3://bucket_name/path/to/descriptor.json")

                    for (slab_type, level, column, row), infos in pyramid.list_generator():
                        print(infos)

                except Exception as e:
                    print("Cannot load the pyramid from its descriptor and read the list")

        Yields:
            Iterator[Tuple[Tuple[SlabType,str,int,int], Dict]]: Slab indices and storage information

            Value example:

                (
                    (<SlabType.DATA: 'DATA'>, '18', 5424, 7526),
                    {
                        'link': False,
                        'md5': None,
                        'root': 'pyramids@localhost:9000/LIMADM',
                        'slab': 'DATA_18_5424_7526'
                    }
                )
        """
        if self.__content["loaded"]:
            for slab, infos in self.__content["cache"].items():
                yield slab, infos
            return

        # Local import: guarantees the name is available whatever the star imports provide
        import tempfile

        # Copy the list into a temporary file (the list can be an object)
        list_obj = tempfile.NamedTemporaryFile(mode="r", delete=False)
        list_file = list_obj.name
        list_obj.close()

        try:
            copy(self.__list, f"file://{list_file}")

            roots = {}
            s3_cluster = self.storage_s3_cluster

            with open(list_file, "r") as listin:
                # Roots reading, until the "#" separator line
                for line in listin:
                    line = line.rstrip()

                    if line == "#":
                        break

                    root_id, root_path = line.split("=", 1)

                    if s3_cluster is None:
                        roots[root_id] = root_path
                    else:
                        # An S3 cluster name is known: it is added to the bucket name in roots
                        root_bucket, root_path = root_path.split("/", 1)
                        roots[root_id] = f"{root_bucket}@{s3_cluster}/{root_path}"

                # Slabs reading (the same iterator goes on after the separator)
                for line in listin:
                    line = line.rstrip()

                    parts = line.split(" ", 1)
                    slab_path = parts[0]
                    slab_md5 = None
                    if len(parts) == 2:
                        slab_md5 = parts[1]

                    root_id, slab_path = slab_path.split("/", 1)

                    slab_type, level, column, row = self.get_infos_from_slab_path(slab_path)
                    infos = {
                        "root": roots[root_id],
                        "link": root_id != "0",
                        "slab": slab_path,
                        "md5": slab_md5,
                    }

                    yield ((slab_type, level, column, row), infos)
        finally:
            # Also executed on error or when the consumer stops iterating
            remove(f"file://{list_file}")

    def get_level(self, level_id: str) -> "Level":
        """Get one level according to its identifier

        Args:
            level_id: Level identifier

        Returns:
            The corresponding pyramid's level, None if not present
        """
        return self.__levels.get(level_id, None)

    def get_levels(self, bottom_id: str = None, top_id: str = None) -> List[Level]:
        """Get sorted levels in the provided range from bottom to top

        Args:
            bottom_id (str, optional): specific bottom level id. Defaults to None.
            top_id (str, optional): specific top level id. Defaults to None.

        Raises:
            Exception: Provided levels are not consistent (bottom > top or not in the pyramid)

        Examples:

            All levels

                from rok4.Pyramid import Pyramid

                try:
                    pyramid = Pyramid.from_descriptor("s3://bucket_name/path/to/descriptor.json")
                    levels = pyramid.get_levels()

                except Exception as e:
                    print("Cannot load the pyramid from its descriptor and get levels")

            From pyramid's bottom to provided top (level 5)

                from rok4.Pyramid import Pyramid

                try:
                    pyramid = Pyramid.from_descriptor("s3://bucket_name/path/to/descriptor.json")
                    levels = pyramid.get_levels(None, "5")

                except Exception as e:
                    print("Cannot load the pyramid from its descriptor and get levels")

        Returns:
            List[Level]: asked sorted levels
        """
        sorted_levels = sorted(self.__levels.values(), key=lambda lev: lev.resolution)

        levels = []

        begin = False
        if bottom_id is None:
            # No bottom level provided: start from the very bottom
            begin = True
        else:
            if self.get_level(bottom_id) is None:
                raise Exception(
                    f"Pyramid {self.name} does not contain the provided bottom level {bottom_id}"
                )

        if top_id is not None and self.get_level(top_id) is None:
            raise Exception(f"Pyramid {self.name} does not contain the provided top level {top_id}")

        end = False

        for lev in sorted_levels:
            if not begin and lev.id == bottom_id:
                begin = True

            if begin:
                levels.append(lev)
                if top_id is not None and lev.id == top_id:
                    end = True
                    break
                else:
                    continue

        if top_id is None:
            # No top level provided: reaching the top is the expected end
            end = True

        if not begin or not end:
            raise Exception(
                f"Provided levels ids are not consistent to extract levels from the pyramid {self.name}"
            )

        return levels

    def write_descriptor(self) -> None:
        """Write the pyramid's descriptor to the final location (in the pyramid's storage root)"""
        content = json.dumps(self.serializable)
        put_data_str(content, self.__descriptor)

    def get_infos_from_slab_path(self, path: str) -> Tuple[SlabType, str, int, int]:
        """Get the slab's indices from its storage path

        Args:
            path (str): Slab's storage path

        Examples:

            FILE stored pyramid

                from rok4.Pyramid import Pyramid

                try:
                    pyramid = Pyramid.from_descriptor("/path/to/descriptor.json")
                    slab_type, level, column, row = pyramid.get_infos_from_slab_path("DATA/12/00/4A/F7.tif")
                    # (SlabType.DATA, "12", 159, 367)
                except Exception as e:
                    print("Cannot load the pyramid from its descriptor and convert a slab path")

            S3 stored pyramid

                from rok4.Pyramid import Pyramid

                try:
                    pyramid = Pyramid.from_descriptor("s3://bucket_name/path/to/pyramid.json")
                    slab_type, level, column, row = pyramid.get_infos_from_slab_path("s3://bucket_name/path/to/pyramid/MASK_15_9164_5846")
                    # (SlabType.MASK, "15", 9164, 5846)
                except Exception as e:
                    print("Cannot load the pyramid from its descriptor and convert a slab path")

        Returns:
            Tuple[SlabType, str, int, int]: Slab's type (DATA or MASK), level identifier, slab's column and slab's row
        """
        if self.__storage["type"] == StorageType.FILE:
            parts = path.split("/")

            # The part of the path holding the slab's column and row is at the end,
            # its length depends on the storage depth
            # depth = 2 -> the 3 last parts have to be used for the conversion
            column, row = b36_path_decode("/".join(parts[-(self.__storage["depth"] + 1) :]))
            level = parts[-(self.__storage["depth"] + 2)]
            raw_slab_type = parts[-(self.__storage["depth"] + 3)]

            # Backward compatibility with the old naming
            if raw_slab_type == "IMAGE":
                raw_slab_type = "DATA"

            slab_type = SlabType[raw_slab_type]

            return slab_type, level, column, row
        else:
            parts = re.split(r"[/_]", path)
            column = parts[-2]
            row = parts[-1]
            level = parts[-3]
            raw_slab_type = parts[-4]

            # Backward compatibility with the old naming
            if raw_slab_type == "IMG":
                raw_slab_type = "DATA"
            elif raw_slab_type == "MSK":
                raw_slab_type = "MASK"

            slab_type = SlabType[raw_slab_type]

            return slab_type, level, int(column), int(row)

    def get_slab_path_from_infos(
        self, slab_type: SlabType, level: str, column: int, row: int, full: bool = True
    ) -> str:
        """Get slab's storage path from the indices

        Args:
            slab_type (SlabType): DATA or MASK
            level (str): Level identifier
            column (int): Slab's column
            row (int): Slab's row
            full (bool, optional): Full path or just relative path from pyramid storage root. Defaults to True.

        Returns:
            str: Absolute or relative slab's storage path
        """
        if self.__storage["type"] == StorageType.FILE:
            slab_path = os.path.join(
                slab_type.value, level, b36_path_encode(column, row, self.__storage["depth"])
            )
        else:
            slab_path = f"{slab_type.value}_{level}_{column}_{row}"

        if full:
            return get_path_from_infos(
                self.__storage["type"], self.__storage["root"], self.__name, slab_path
            )
        else:
            return slab_path

    def get_tile_data_binary(self, level: str, column: int, row: int) -> bytes:
        """Get a pyramid's tile as binary string

        To get a tile, 3 steps :
            * calculate slab path from tile index
            * read slab index to get offsets and sizes of slab's tiles
            * read the tile into the slab

        Args:
            level (str): Tile's level
            column (int): Tile's column
            row (int): Tile's row

        Limitations:
            Pyramids with one-tile slab are not handled

        Examples:

            FILE stored raster pyramid, to extract a tile containing a point and save it as independent image

                from rok4.Pyramid import Pyramid

                try:
                    pyramid = Pyramid.from_descriptor("/data/pyramids/SCAN1000.json")
                    level, col, row, pcol, prow = pyramid.get_tile_indices(992904.46, 6733643.15, "9", srs = "IGNF:LAMB93")
                    data = pyramid.get_tile_data_binary(level, col, row)

                    if data is None:
                        print("No data")
                    else:
                        tile_name = f"tile_{level}_{col}_{row}.{pyramid.tile_extension}"
                        with open(tile_name, "wb") as image:
                            image.write(data)
                        print (f"Tile written in {tile_name}")

                except Exception as e:
                    print(f"Cannot save a pyramid's tile : {e}")

        Raises:
            Exception: Level not found in the pyramid
            NotImplementedError: Pyramid owns one-tile slabs
            MissingEnvironmentError: Missing object storage information
            StorageError: Storage read issue

        Returns:
            bytes: data, as binary string, None if no data
        """
        level_object = self.get_level(level)

        if level_object is None:
            raise Exception(f"No level {level} in the pyramid")

        if level_object.slab_width == 1 and level_object.slab_height == 1:
            raise NotImplementedError("One-tile slab pyramid is not handled")

        if not level_object.is_in_limits(column, row):
            return None

        # Slab's indices
        slab_column = column // level_object.slab_width
        slab_row = row // level_object.slab_height

        # Tile's indices inside the slab
        relative_tile_column = column % level_object.slab_width
        relative_tile_row = row % level_object.slab_height

        # Tile's rank in the slab's index
        tile_index = relative_tile_row * level_object.slab_width + relative_tile_column

        # Path of the slab containing the wanted tile
        slab_path = self.get_slab_path_from_infos(SlabType.DATA, level, slab_column, slab_row)

        # Read tiles' offsets and sizes in the slab
        # A ROK4 slab has a fixed 2048 bytes header,
        # then offsets are stored (4 bytes each)
        # then sizes (4 bytes each)
        tiles_count = level_object.slab_width * level_object.slab_height
        try:
            binary_index = get_data_binary(
                slab_path,
                (ROK4_IMAGE_HEADER_SIZE, 2 * 4 * tiles_count),
            )
        except FileNotFoundError:
            # A missing slab is simply handled as missing data
            return None

        offsets = numpy.frombuffer(
            binary_index,
            dtype=numpy.dtype("uint32"),
            count=tiles_count,
        )
        sizes = numpy.frombuffer(
            binary_index,
            dtype=numpy.dtype("uint32"),
            offset=4 * tiles_count,
            count=tiles_count,
        )

        if sizes[tile_index] == 0:
            return None

        return get_data_binary(slab_path, (offsets[tile_index], sizes[tile_index]))

    def get_tile_data_raster(self, level: str, column: int, row: int) -> numpy.ndarray:
        """Get a raster pyramid's tile as 3-dimension numpy ndarray

        First dimension is the row, second one is column, third one is band.

        Args:
            level (str): Tile's level
            column (int): Tile's column
            row (int): Tile's row

        Limitations:
            Packbits (pyramid formats TIFF_PKB_FLOAT32 and TIFF_PKB_UINT8) and LZW (pyramid formats TIFF_LZW_FLOAT32 and TIFF_LZW_UINT8) compressions are not handled.

        Raises:
            Exception: Cannot get raster data for a vector pyramid
            Exception: Level not found in the pyramid
            NotImplementedError: Pyramid owns one-tile slabs
            NotImplementedError: Raster pyramid format not handled
            MissingEnvironmentError: Missing object storage information
            StorageError: Storage read issue
            FormatError: Cannot decode tile

        Examples:

            FILE stored DTM (raster) pyramid, to get the altitude value at a point in the best level

                from rok4.Pyramid import Pyramid

                try:
                    pyramid = Pyramid.from_descriptor("/data/pyramids/RGEALTI.json")
                    level, col, row, pcol, prow = pyramid.get_tile_indices(44, 5, srs = "EPSG:4326")
                    data = pyramid.get_tile_data_raster(level, col, row)

                    if data is None:
                        print("No data")
                    else:
                        print(data[prow][pcol])

                except Exception as e:
                    print(f"Cannot get a pyramid's pixel value : {e}")

        Returns:
            numpy.ndarray: data, as numpy array, None if no data
        """
        if self.type == PyramidType.VECTOR:
            raise Exception("Cannot get tile as raster data : it's a vector pyramid")

        binary_tile = self.get_tile_data_binary(level, column, row)

        if binary_tile is None:
            return None

        level_object = self.get_level(level)

        if self.__format == "TIFF_JPG_UINT8" or self.__format == "TIFF_JPG90_UINT8":
            try:
                img = Image.open(io.BytesIO(binary_tile))
            except Exception as e:
                raise FormatError("JPEG", "binary tile", e) from e

            data = numpy.asarray(img)

        elif self.__format == "TIFF_RAW_UINT8":
            data = numpy.frombuffer(binary_tile, dtype=numpy.dtype("uint8"))
            data.shape = (
                level_object.tile_matrix.tile_size[0],
                level_object.tile_matrix.tile_size[1],
                self.__raster_specifications["channels"],
            )

        elif self.__format == "TIFF_PNG_UINT8":
            try:
                img = Image.open(io.BytesIO(binary_tile))
            except Exception as e:
                raise FormatError("PNG", "binary tile", e) from e

            data = numpy.asarray(img)

        elif self.__format == "TIFF_ZIP_UINT8":
            try:
                data = numpy.frombuffer(zlib.decompress(binary_tile), dtype=numpy.dtype("uint8"))
            except Exception as e:
                raise FormatError("ZIP", "binary tile", e) from e

            data.shape = (
                level_object.tile_matrix.tile_size[0],
                level_object.tile_matrix.tile_size[1],
                self.__raster_specifications["channels"],
            )

        elif self.__format == "TIFF_ZIP_FLOAT32":
            try:
                data = numpy.frombuffer(zlib.decompress(binary_tile), dtype=numpy.dtype("float32"))
            except Exception as e:
                raise FormatError("ZIP", "binary tile", e) from e

            data.shape = (
                level_object.tile_matrix.tile_size[0],
                level_object.tile_matrix.tile_size[1],
                self.__raster_specifications["channels"],
            )

        elif self.__format == "TIFF_RAW_FLOAT32":
            data = numpy.frombuffer(binary_tile, dtype=numpy.dtype("float32"))
            data.shape = (
                level_object.tile_matrix.tile_size[0],
                level_object.tile_matrix.tile_size[1],
                self.__raster_specifications["channels"],
            )

        else:
            raise NotImplementedError(f"Cannot get tile as raster data for format {self.__format}")

        return data

    def get_tile_data_vector(self, level: str, column: int, row: int) -> Dict:
        """Get a vector pyramid's tile as GeoJSON dictionary

        Args:
            level (str): Tile's level
            column (int): Tile's column
            row (int): Tile's row

        Raises:
            Exception: Cannot get vector data for a raster pyramid
            Exception: Level not found in the pyramid
            NotImplementedError: Pyramid owns one-tile slabs
            NotImplementedError: Vector pyramid format not handled
            MissingEnvironmentError: Missing object storage information
            StorageError: Storage read issue
            FormatError: Cannot decode tile

        Examples:

            S3 stored vector pyramid, to print a tile as GeoJSON

                from rok4.Pyramid import Pyramid
                import json

                try:
                    pyramid = Pyramid.from_descriptor("s3://pyramids/vectors/BDTOPO.json")
                    level, col, row, pcol, prow = pyramid.get_tile_indices(40.325, 3.123, srs = "EPSG:4326")
                    data = pyramid.get_tile_data_vector(level, col, row)

                    if data is None:
                        print("No data")
                    else:
                        print(json.dumps(data))

                except Exception as e:
                    print(f"Cannot print a vector pyramid's tile as GeoJSON : {e}")

        Returns:
            Dict: data, as GeoJSON dictionary. None if no data
        """
        if self.type == PyramidType.RASTER:
            raise Exception("Cannot get tile as vector data : it's a raster pyramid")

        binary_tile = self.get_tile_data_binary(level, column, row)

        if binary_tile is None:
            return None

        if self.__format == "TIFF_PBF_MVT":
            try:
                data = mapbox_vector_tile.decode(binary_tile)
            except Exception as e:
                raise FormatError("PBF (MVT)", "binary tile", e) from e
        else:
            raise NotImplementedError(f"Cannot get tile as vector data for format {self.__format}")

        return data

    def get_tile_indices(
        self, x: float, y: float, level: str = None, **kwargs
    ) -> Tuple[str, int, int, int, int]:
        """Get pyramid's tile and pixel indices from point's coordinates

        Used coordinates system has to be the pyramid one. If EPSG:4326, x is latitude and y longitude.

        Args:
            x (float): point's x
            y (float): point's y
            level (str, optional): Pyramid's level to take into account, the bottom one if None. Defaults to None.
            **srs (string): spatial reference system of provided coordinates, with authority and code (same as the pyramid's one if not provided)

        Raises:
            Exception: Cannot find level to calculate indices
            RuntimeError: Provided SRS is invalid for OSR

        Examples:

            FILE stored DTM (raster) pyramid, to get the altitude value at a point in the best level

                from rok4.Pyramid import Pyramid

                try:
                    pyramid = Pyramid.from_descriptor("/data/pyramids/RGEALTI.json")
                    level, col, row, pcol, prow = pyramid.get_tile_indices(44, 5, srs = "EPSG:4326")
                    data = pyramid.get_tile_data_raster(level, col, row)

                    if data is None:
                        print("No data")
                    else:
                        print(data[prow][pcol])

                except Exception as e:
                    print(f"Cannot get a pyramid's pixel value : {e}")

        Returns:
            Tuple[str, int, int, int, int]: Level identifier, tile's column, tile's row, pixel's (in the tile) column, pixel's row
        """
        level_object = self.bottom_level
        if level is not None:
            level_object = self.get_level(level)

        if level_object is None:
            raise Exception("Cannot found the level to calculate indices")

        if (
            "srs" in kwargs
            and kwargs["srs"] is not None
            and kwargs["srs"].upper() != self.__tms.srs.upper()
        ):
            # Provided coordinates are not in the pyramid's SRS: reproject them first
            sr = srs_to_spatialreference(kwargs["srs"])
            x, y = reproject_point((x, y), sr, self.__tms.sr)

        return (level_object.id,) + level_object.tile_matrix.point_to_indices(x, y)

    @property
    def size(self) -> int:
        """Get the size of the pyramid

        The size is computed at the first access only, then kept on the instance.

        Examples:

                from rok4.Pyramid import Pyramid

                try:
                    pyramid = Pyramid.from_descriptor("s3://bucket_name/path/to/descriptor.json")
                    size = pyramid.size

                except Exception as e:
                    print("Cannot load the pyramid from its descriptor and get its size")

        Returns:
            int: size of the pyramid
        """
        if not hasattr(self, "_Pyramid__size"):
            self.__size = size_path(
                get_path_from_infos(self.__storage["type"], self.__storage["root"], self.__name)
            )
        return self.__size
Static methods
def from_descriptor(descriptor: str) ‑> Pyramid
-
Create a pyramid from its descriptor
Args
descriptor
:str
- pyramid's descriptor path
Raises
FormatError
- Provided path or the TMS is not a well formed JSON
Exception
- Level issue : no one in the pyramid or the used TMS, or level ID not defined in the TMS
MissingAttributeError
- Attribute is missing in the content
StorageError
- Storage read issue (pyramid descriptor or TMS)
MissingEnvironmentError
- Missing object storage information or TMS root directory
Examples
S3 stored descriptor
from rok4.Pyramid import Pyramid try: pyramid = Pyramid.from_descriptor("s3://bucket_name/path/to/descriptor.json") except Exception as e: print("Cannot load the pyramid from its descriptor")
Returns
Pyramid
- a Pyramid instance
Expand source code
@classmethod
def from_descriptor(cls, descriptor: str) -> "Pyramid":
    """Build a pyramid by reading its JSON descriptor

    Storage type, storage root and pyramid's name are deduced from the descriptor's
    path; grid, format, raster specifications, masks and levels come from its content.

    Args:
        descriptor (str): pyramid's descriptor path

    Raises:
        FormatError: Provided path or the TMS is not a well formed JSON
        Exception: Level issue : no one in the pyramid or the used TMS, or level ID not defined in the TMS
        MissingAttributeError: Attribute is missing in the content
        StorageError: Storage read issue (pyramid descriptor or TMS)
        MissingEnvironmentError: Missing object storage information or TMS root directory

    Examples:

        S3 stored descriptor

            from rok4.Pyramid import Pyramid

            try:
                pyramid = Pyramid.from_descriptor("s3://bucket_name/path/to/descriptor.json")
            except Exception as e:
                print("Cannot load the pyramid from its descriptor")

    Returns:
        Pyramid: a Pyramid instance
    """
    try:
        raw_content = get_data_str(descriptor)
        data = json.loads(raw_content)
    except JSONDecodeError as e:
        raise FormatError("JSON", descriptor, e)

    pyramid = cls()

    storage_type, _unused_path, storage_root, base_name = get_infos_from_path(descriptor)
    pyramid.__storage["type"] = storage_type
    pyramid.__storage["root"] = storage_root

    # The 5 last characters (expected to be the ".json" extension) are dropped
    pyramid.__name = base_name[:-5]
    pyramid.__descriptor = descriptor
    pyramid.__list = get_path_from_infos(
        pyramid.__storage["type"], pyramid.__storage["root"], f"{pyramid.__name}.list"
    )

    try:
        # Common attributes
        pyramid.__tms = TileMatrixSet(data["tile_matrix_set"])
        pyramid.__format = data["format"]

        # Raster pyramid attributes
        if pyramid.type == PyramidType.RASTER:
            pyramid.__raster_specifications = data["raster_specifications"]

        # Masks are owned as soon as a mask format is declared
        pyramid.__masks = "mask_format" in data

        # Levels: each one is registered, then checked against the TMS
        for level_description in data["levels"]:
            level = Level.from_descriptor(level_description, pyramid)
            pyramid.__levels[level.id] = level

            if pyramid.__tms.get_level(level.id) is not None:
                continue

            raise Exception(
                f"Pyramid {descriptor} owns a level with the ID '{level.id}', not defined in the TMS '{pyramid.tms.name}'"
            )

    except KeyError as e:
        raise MissingAttributeError(descriptor, e)

    if not pyramid.__levels:
        raise Exception(f"Pyramid '{descriptor}' has no level")

    return pyramid
def from_other(other: Pyramid, name: str, storage: Dict[~KT, ~VT]) ‑> Pyramid
-
Create a pyramid from another one
Args
other
:Pyramid
- pyramid to clone
name
:str
- new pyramid's name
storage
:Dict[str, Union[str, int]]
- new pyramid's storage information
Raises
FormatError
- Provided path or the TMS is not a well formed JSON
Exception
- Level issue : no one in the pyramid or the used TMS, or level ID not defined in the TMS
MissingAttributeError
- Attribute is missing in the content
Returns
Pyramid
- a Pyramid instance
Expand source code
@classmethod
def from_other(cls, other: "Pyramid", name: str, storage: Dict) -> "Pyramid":
    """Create a pyramid from another one

    The provided storage dictionary is used (and completed) in place: its "type" is
    converted to the StorageType enumeration and, for a FILE storage, a default
    "depth" of 2 is added if missing.

    Args:
        other (Pyramid): pyramid to clone
        name (str): new pyramid's name
        storage (Dict[str, Union[str, int]]): new pyramid's storage information

    Raises:
        Exception: The name of a FILE stored pyramid contains '/'
        MissingAttributeError: Attribute is missing in the storage information (or storage type is unknown)

    Returns:
        Pyramid: a Pyramid instance
    """
    try:
        # Convert the storage type to the enumeration. An already converted value is kept,
        # so the same storage dictionary can be provided for several calls
        if not isinstance(storage["type"], StorageType):
            storage["type"] = StorageType[storage["type"]]

        if storage["type"] == StorageType.FILE and name.find("/") != -1:
            raise Exception(f"A FILE stored pyramid's name cannot contain '/' : '{name}'")

        if storage["type"] == StorageType.FILE and "depth" not in storage:
            storage["depth"] = 2

        pyramid = cls()

        # Common attributes
        pyramid.__name = name
        pyramid.__storage = storage
        pyramid.__masks = other.__masks

        pyramid.__descriptor = get_path_from_infos(
            pyramid.__storage["type"], pyramid.__storage["root"], f"{pyramid.__name}.json"
        )
        pyramid.__list = get_path_from_infos(
            pyramid.__storage["type"], pyramid.__storage["root"], f"{pyramid.__name}.list"
        )
        pyramid.__tms = other.__tms
        pyramid.__format = other.__format

        # Raster pyramid attributes
        if pyramid.type == PyramidType.RASTER:
            pyramid.__masks = bool(other.own_masks)
            pyramid.__raster_specifications = other.__raster_specifications

        # Levels
        for other_level in other.__levels.values():
            lev = Level.from_other(other_level, pyramid)
            pyramid.__levels[lev.id] = lev

    except KeyError as e:
        # The original handler referenced an undefined name ('descriptor') and failed with a
        # NameError. No descriptor path may be known at this point (the storage information can
        # be the incomplete part), so the new pyramid's name identifies the faulty input
        raise MissingAttributeError(name, e) from e

    return pyramid
Instance variables
var bottom_level : Level
-
Expand source code
@property
def bottom_level(self) -> "Level":
    """Get the level of the pyramid with the smallest resolution value (best resolution)

    Returns:
        Level: the bottom level
    """

    def resolution_of(level):
        return level.resolution

    levels_by_resolution = sorted(self.__levels.values(), key=resolution_of)
    return levels_by_resolution[0]
var descriptor : str
-
Expand source code
@property
def descriptor(self) -> str:
    """Get the pyramid's descriptor path

    Returns:
        str: descriptor path
    """
    return self.__descriptor
var format : str
-
Expand source code
@property
def format(self) -> str:
    """Get the pyramid's data format

    Returns:
        str: data format
    """
    return self.__format
var list : str
-
Expand source code
@property
def list(self) -> str:
    """Get the pyramid's list path

    Returns:
        str: list path
    """
    return self.__list
var name : str
-
Expand source code
@property
def name(self) -> str:
    """Get the pyramid's name

    Returns:
        str: pyramid's name
    """
    return self.__name
var own_masks : bool
-
Expand source code
@property
def own_masks(self) -> bool:
    """Tell whether the pyramid owns masks

    Returns:
        bool: the stored masks status
    """
    return self.__masks
var raster_specifications : Dict[~KT, ~VT]
-
Get raster specifications for a RASTER pyramid
Example
{ "channels": 3, "nodata": "255,0,0", "photometric": "rgb", "interpolation": "bicubic" }
Returns
Dict
- Raster specifications, None if VECTOR pyramid
Expand source code
@property
def raster_specifications(self) -> Dict:
    """Get raster specifications for a RASTER pyramid

    Example:
        {
            "channels": 3,
            "nodata": "255,0,0",
            "photometric": "rgb",
            "interpolation": "bicubic"
        }

    Returns:
        Dict: Raster specifications, None if VECTOR pyramid
    """
    return self.__raster_specifications
var serializable : Dict[~KT, ~VT]
-
Get the dict version of the pyramid object, descriptor compliant
Returns
Dict
- descriptor structured object description
Expand source code
@property
def serializable(self) -> Dict:
    """Get the dict version of the pyramid object, descriptor compliant

    Levels are listed from the top one (highest resolution value) to the bottom one.

    Returns:
        Dict: descriptor structured object description
    """
    serialization = {
        "tile_matrix_set": self.__tms.name,
        "format": self.__format,
        "levels": [
            level.serializable
            for level in sorted(
                self.__levels.values(), key=lambda level: level.resolution, reverse=True
            )
        ],
    }

    if self.type == PyramidType.RASTER:
        serialization["raster_specifications"] = self.__raster_specifications

    if self.__masks:
        serialization["mask_format"] = "TIFF_ZIP_UINT8"

    return serialization
var size : int
-
Get the size of the pyramid
Examples
from rok4.Pyramid import Pyramid
try: pyramid = Pyramid.from_descriptor("s3://bucket_name/path/to/descriptor.json") size = pyramid.size
except Exception as e: print("Cannot load the pyramid from its descriptor and get its size")
Returns
int
- size of the pyramid
Expand source code
@property
def size(self) -> int:
    """Get the size of the pyramid

    The size is calculated at the first access, then the stored value is returned.

    Examples:

        from rok4.Pyramid import Pyramid

        try:
            pyramid = Pyramid.from_descriptor("s3://bucket_name/path/to/descriptor.json")
            size = pyramid.size

        except Exception as e:
            print("Cannot load the pyramid from its descriptor and get its size")

    Returns:
        int: size of the pyramid
    """
    if hasattr(self, "_Pyramid__size"):
        # Already calculated
        return self.__size

    pyramid_path = get_path_from_infos(
        self.__storage["type"], self.__storage["root"], self.__name
    )
    self.__size = size_path(pyramid_path)
    return self.__size
var storage_depth : int
-
Expand source code
@property
def storage_depth(self) -> int:
    """Get the pyramid's storage depth

    Returns:
        int: value of "depth" in the storage informations, None if not defined
    """
    storage_infos = self.__storage
    return storage_infos.get("depth")
var storage_root : str
-
Get the pyramid's storage root.
If storage is S3, the used cluster is removed.
Returns
str
- Pyramid's storage root
Expand source code
@property
def storage_root(self) -> str:
    """Get the pyramid's storage root.

    If storage is S3, the used cluster is removed.

    Returns:
        str: Pyramid's storage root
    """
    # Everything after the first '@' is the optional S3 cluster host
    root_without_cluster, _, _ = self.__storage["root"].partition("@")
    return root_without_cluster
var storage_s3_cluster : str
-
Get the pyramid's storage S3 cluster (host name)
Returns
str
- the host if known, None if the default one have to be used or if storage is not S3
Expand source code
@property
def storage_s3_cluster(self) -> str:
    """Get the pyramid's storage S3 cluster (host name)

    Returns:
        str: the host if known, None if the default one has to be used or if storage is not S3
    """
    if self.__storage["type"] != StorageType.S3:
        return None

    # Root format is 'bucket@host' when the cluster is specified
    root_parts = self.__storage["root"].split("@")
    if len(root_parts) < 2:
        return None
    return root_parts[1]
var storage_type : StorageType
-
Get the storage type
Returns
StorageType
- FILE, S3 or CEPH
Expand source code
@property
def storage_type(self) -> StorageType:
    """Get the storage type

    Returns:
        StorageType: FILE, S3 or CEPH
    """
    # Value of "type" in the storage informations
    return self.__storage["type"]
var tile_extension : str
-
Expand source code
@property
def tile_extension(self) -> str:
    """Get the file extension to use for a tile extracted from the pyramid

    Returns:
        str: "tif", "jpg", "png" or "pbf", according to the pyramid's format

    Raises:
        Exception: Unknown pyramid's format
    """
    pyramid_format = self.__format

    if pyramid_format in ("TIFF_JPG_UINT8", "TIFF_JPG90_UINT8"):
        return "jpg"
    if pyramid_format == "TIFF_PNG_UINT8":
        return "png"
    if pyramid_format == "TIFF_PBF_MVT":
        return "pbf"

    # Raw, LZW, deflate and packbits compressions, for both sample types
    tiff_formats = tuple(
        f"TIFF_{compression}_{sample}"
        for sample in ("UINT8", "FLOAT32")
        for compression in ("RAW", "LZW", "ZIP", "PKB")
    )
    if pyramid_format in tiff_formats:
        return "tif"

    raise Exception(
        f"Unknown pyramid's format ({self.__format}), cannot return the tile extension"
    )
var tms : TileMatrixSet
-
Expand source code
@property
def tms(self) -> TileMatrixSet:
    """Get the Tile Matrix Set used by the pyramid

    Returns:
        TileMatrixSet: pyramid's TMS
    """
    return self.__tms
var top_level : Level
-
Expand source code
@property
def top_level(self) -> "Level":
    """Get the low resolution level in the pyramid

    Returns:
        Level: the top level, the one with the highest resolution value
    """
    ascending_levels = sorted(self.__levels.values(), key=lambda level: level.resolution)
    return ascending_levels[-1]
var type : PyramidType
-
Expand source code
@property
def type(self) -> PyramidType:
    """Get the pyramid's type (RASTER or VECTOR) from its format

    Returns:
        PyramidType: VECTOR for the format "TIFF_PBF_MVT", RASTER for any other one
    """
    is_vector = self.__format == "TIFF_PBF_MVT"
    return PyramidType.VECTOR if is_vector else PyramidType.RASTER
Methods
def get_infos_from_slab_path(self, path: str) ‑> Tuple[SlabType, str, int, int]
-
Get the slab's indices from its storage path
Args
path
:str
- Slab's storage path
Examples
FILE stored pyramid
from rok4.Pyramid import Pyramid try: pyramid = Pyramid.from_descriptor("/path/to/descriptor.json") slab_type, level, column, row = pyramid.get_infos_from_slab_path("DATA/12/00/4A/F7.tif") # (SlabType.DATA, "12", 159, 367) except Exception as e: print("Cannot load the pyramid from its descriptor and convert a slab path")
S3 stored pyramid
from rok4.Pyramid import Pyramid try: pyramid = Pyramid.from_descriptor("s3://bucket_name/path/to/pyramid.json") slab_type, level, column, row = pyramid.get_infos_from_slab_path("s3://bucket_name/path/to/pyramid/MASK_15_9164_5846") # (SlabType.MASK, "15", 9164, 5846) except Exception as e: print("Cannot load the pyramid from its descriptor and convert a slab path")
Returns
Tuple[SlabType, str, int, int]
- Slab's type (DATA or MASK), level identifier, slab's column and slab's row
Expand source code
def get_infos_from_slab_path(self, path: str) -> Tuple[SlabType, str, int, int]:
    """Get the slab's indices from its storage path

    Args:
        path (str): Slab's storage path

    Examples:

        FILE stored pyramid

            from rok4.Pyramid import Pyramid

            try:
                pyramid = Pyramid.from_descriptor("/path/to/descriptor.json")
                slab_type, level, column, row = pyramid.get_infos_from_slab_path("DATA/12/00/4A/F7.tif")
                # (SlabType.DATA, "12", 159, 367)
            except Exception as e:
                print("Cannot load the pyramid from its descriptor and convert a slab path")

        S3 stored pyramid

            from rok4.Pyramid import Pyramid

            try:
                pyramid = Pyramid.from_descriptor("s3://bucket_name/path/to/pyramid.json")
                slab_type, level, column, row = pyramid.get_infos_from_slab_path("s3://bucket_name/path/to/pyramid/MASK_15_9164_5846")
                # (SlabType.MASK, "15", 9164, 5846)
            except Exception as e:
                print("Cannot load the pyramid from its descriptor and convert a slab path")

    Returns:
        Tuple[SlabType, str, int, int]: Slab's type (DATA or MASK), level identifier, slab's column and slab's row
    """
    if self.__storage["type"] == StorageType.FILE:
        depth = self.__storage["depth"]
        parts = path.split("/")

        # Column and row are encoded in the last path parts, according to the depth :
        # depth = 2 -> the last 3 parts are used for the conversion
        column, row = b36_path_decode("/".join(parts[-(depth + 1) :]))
        level = parts[-(depth + 2)]
        raw_slab_type = parts[-(depth + 3)]

        # Backward compatibility with the old naming
        raw_slab_type = {"IMAGE": "DATA"}.get(raw_slab_type, raw_slab_type)

        return SlabType[raw_slab_type], level, column, row

    # Object storage : <slab type>_<level>_<column>_<row>
    parts = re.split(r"[/_]", path)
    column = parts[-2]
    row = parts[-1]
    level = parts[-3]
    raw_slab_type = parts[-4]

    # Backward compatibility with the old naming
    raw_slab_type = {"IMG": "DATA", "MSK": "MASK"}.get(raw_slab_type, raw_slab_type)

    slab_type = SlabType[raw_slab_type]
    return slab_type, level, int(column), int(row)
def get_level(self, level_id: str) ‑> Level
-
Get one level according to its identifier
Args
level_id
- Level identifier
Returns
The corresponding pyramid's level, None if not present
Expand source code
def get_level(self, level_id: str) -> "Level":
    """Get one level according to its identifier

    Args:
        level_id: Level identifier

    Returns:
        The corresponding pyramid's level, None if not present
    """
    known_levels = self.__levels
    return known_levels.get(level_id)
def get_levels(self, bottom_id: str = None, top_id: str = None) ‑> List[Level]
-
Get sorted levels in the provided range from bottom to top
Args
bottom_id
:str, optionnal
- specific bottom level id. Defaults to None.
top_id
:str, optionnal
- specific top level id. Defaults to None.
Raises
Exception
- Provided levels are not consistent (bottom > top or not in the pyramid)
Examples
All levels
from rok4.Pyramid import Pyramid try: pyramid = Pyramid.from_descriptor("s3://bucket_name/path/to/descriptor.json") levels = pyramid.get_levels() except Exception as e: print("Cannot load the pyramid from its descriptor and get levels")
From pyramid's bottom to provided top (level 5)
from rok4.Pyramid import Pyramid try: pyramid = Pyramid.from_descriptor("s3://bucket_name/path/to/descriptor.json") levels = pyramid.get_levels(None, "5") except Exception as e: print("Cannot load the pyramid from its descriptor and get levels")
Returns
List[Level]
- asked sorted levels
Expand source code
def get_levels(self, bottom_id: str = None, top_id: str = None) -> List[Level]:
    """Get sorted levels in the provided range from bottom to top

    Args:
        bottom_id (str, optional): specific bottom level id. Defaults to None.
        top_id (str, optional): specific top level id. Defaults to None.

    Raises:
        Exception: Provided levels are not consistent (bottom > top or not in the pyramid)

    Examples:

        All levels

            from rok4.Pyramid import Pyramid

            try:
                pyramid = Pyramid.from_descriptor("s3://bucket_name/path/to/descriptor.json")
                levels = pyramid.get_levels()

            except Exception as e:
                print("Cannot load the pyramid from its descriptor and get levels")

        From pyramid's bottom to provided top (level 5)

            from rok4.Pyramid import Pyramid

            try:
                pyramid = Pyramid.from_descriptor("s3://bucket_name/path/to/descriptor.json")
                levels = pyramid.get_levels(None, "5")

            except Exception as e:
                print("Cannot load the pyramid from its descriptor and get levels")

    Returns:
        List[Level]: asked sorted levels
    """
    ascending_levels = sorted(self.__levels.values(), key=lambda level: level.resolution)

    if bottom_id is not None and self.get_level(bottom_id) is None:
        raise Exception(
            f"Pyramid {self.name} does not contain the provided bottom level {bottom_id}"
        )

    if top_id is not None and self.get_level(top_id) is None:
        raise Exception(f"Pyramid {self.name} does not contain the provided top level {top_id}")

    selection = []
    # Without bottom level, we start from the pyramid's bottom
    started = bottom_id is None
    # Without top level, going up to the pyramid's top is the expected end
    ended = top_id is None

    for level in ascending_levels:
        if not started:
            if level.id != bottom_id:
                continue
            started = True

        selection.append(level)

        if top_id is not None and level.id == top_id:
            ended = True
            break

    if not (started and ended):
        # The top level has been met before the bottom one
        raise Exception(
            f"Provided levels ids are not consistent to extract levels from the pyramid {self.name}"
        )

    return selection
def get_slab_path_from_infos(self, slab_type: SlabType, level: str, column: int, row: int, full: bool = True) ‑> str
-
Get slab's storage path from the indices
Args
slab_type
:SlabType
- DATA or MASK
level
:str
- Level identifier
column
:int
- Slab's column
row
:int
- Slab's row
full
:bool
, optional- Full path or just relative path from pyramid storage root. Defaults to True.
Returns
str
- Absolute or relative slab's storage path
Expand source code
def get_slab_path_from_infos(
    self, slab_type: SlabType, level: str, column: int, row: int, full: bool = True
) -> str:
    """Get slab's storage path from the indices

    Args:
        slab_type (SlabType): DATA or MASK
        level (str): Level identifier
        column (int): Slab's column
        row (int): Slab's row
        full (bool, optional): Full path or just relative path from pyramid storage root. Defaults to True.

    Returns:
        str: Absolute or relative slab's storage path
    """
    storage_type = self.__storage["type"]

    if storage_type == StorageType.FILE:
        # File storage : <slab type>/<level>/<base-36 encoded column and row>
        encoded_indices = b36_path_encode(column, row, self.__storage["depth"])
        slab_path = os.path.join(slab_type.value, level, encoded_indices)
    else:
        # Object storage : <slab type>_<level>_<column>_<row>
        slab_path = f"{slab_type.value}_{level}_{column}_{row}"

    if not full:
        return slab_path

    return get_path_from_infos(
        self.__storage["type"], self.__storage["root"], self.__name, slab_path
    )
def get_tile_data_binary(self, level: str, column: int, row: int) ‑> str
-
Get a pyramid's tile as binary string
To get a tile, 3 steps : * calculate slab path from tile index * read slab index to get offsets and sizes of slab's tiles * read the tile into the slab
Args
level
:str
- Tile's level
column
:int
- Tile's column
row
:int
- Tile's row
Limitations
Pyramids with one-tile slab are not handled
Examples
FILE stored raster pyramid, to extract a tile containing a point and save it as independent image
from rok4.Pyramid import Pyramid try: pyramid = Pyramid.from_descriptor("/data/pyramids/SCAN1000.json") level, col, row, pcol, prow = pyramid.get_tile_indices(992904.46, 6733643.15, "9", srs = "IGNF:LAMB93") data = pyramid.get_tile_data_binary(level, col, row) if data is None: print("No data") else: tile_name = f"tile_{level}_{col}_{row}.{pyramid.tile_extension}" with open(tile_name, "wb") as image: image.write(data) print (f"Tile written in {tile_name}") except Exception as e: print(f"Cannot save a pyramid's tile : {e}")
Raises
Exception
- Level not found in the pyramid
NotImplementedError
- Pyramid owns one-tile slabs
MissingEnvironmentError
- Missing object storage informations
StorageError
- Storage read issue
Returns
str
- data, as binary string, None if no data
Expand source code
def get_tile_data_binary(self, level: str, column: int, row: int) -> str:
    """Get a pyramid's tile as binary string

    To get a tile, 3 steps :
        * calculate slab path from tile index
        * read slab index to get offsets and sizes of slab's tiles
        * read the tile into the slab

    Args:
        level (str): Tile's level
        column (int): Tile's column
        row (int): Tile's row

    Limitations:
        Pyramids with one-tile slab are not handled

    Examples:

        FILE stored raster pyramid, to extract a tile containing a point and save it as independent image

            from rok4.Pyramid import Pyramid

            try:
                pyramid = Pyramid.from_descriptor("/data/pyramids/SCAN1000.json")
                level, col, row, pcol, prow = pyramid.get_tile_indices(992904.46, 6733643.15, "9", srs = "IGNF:LAMB93")
                data = pyramid.get_tile_data_binary(level, col, row)

                if data is None:
                    print("No data")
                else:
                    tile_name = f"tile_{level}_{col}_{row}.{pyramid.tile_extension}"
                    with open(tile_name, "wb") as image:
                        image.write(data)
                    print (f"Tile written in {tile_name}")

            except Exception as e:
                print(f"Cannot save a pyramid's tile : {e}")

    Raises:
        Exception: Level not found in the pyramid
        NotImplementedError: Pyramid owns one-tile slabs
        MissingEnvironmentError: Missing object storage informations
        StorageError: Storage read issue

    Returns:
        str: data, as binary string, None if no data (tile out of the level's limits, missing slab or empty tile)
    """
    level_object = self.get_level(level)

    if level_object is None:
        raise Exception(f"No level {level} in the pyramid")

    if level_object.slab_width == 1 and level_object.slab_height == 1:
        raise NotImplementedError(f"One-tile slab pyramid is not handled")

    if not level_object.is_in_limits(column, row):
        return None

    # Slab's indices
    slab_column = column // level_object.slab_width
    slab_row = row // level_object.slab_height

    # Tile's indices in the slab
    relative_tile_column = column % level_object.slab_width
    relative_tile_row = row % level_object.slab_height

    # Tile's rank in the slab's index (tiles are numbered row by row)
    tile_index = relative_tile_row * level_object.slab_width + relative_tile_column

    # Path of the slab containing the wanted tile
    slab_path = self.get_slab_path_from_infos(SlabType.DATA, level, slab_column, slab_row)

    # Get offsets and sizes of the slab's tiles
    # A ROK4 slab has a fixed 2048 bytes header,
    # then offsets are stored (4 bytes each)
    # then sizes (4 bytes each)
    # NOTE(review): the second argument looks like an (offset, size) byte range - confirm against get_data_binary
    try:
        binary_index = get_data_binary(
            slab_path,
            (
                ROK4_IMAGE_HEADER_SIZE,
                2 * 4 * level_object.slab_width * level_object.slab_height,
            ),
        )
    except FileNotFoundError as e:
        # A missing slab is simply handled as an absence of data
        return None

    # First half of the index : one uint32 offset per tile
    # NOTE(review): "uint32" is read with the native byte order - presumably slabs are little-endian, confirm
    offsets = numpy.frombuffer(
        binary_index,
        dtype=numpy.dtype("uint32"),
        count=level_object.slab_width * level_object.slab_height,
    )
    # Second half of the index : one uint32 size per tile
    sizes = numpy.frombuffer(
        binary_index,
        dtype=numpy.dtype("uint32"),
        offset=4 * level_object.slab_width * level_object.slab_height,
        count=level_object.slab_width * level_object.slab_height,
    )

    # A null size means no data for this tile
    if sizes[tile_index] == 0:
        return None

    return get_data_binary(slab_path, (offsets[tile_index], sizes[tile_index]))
def get_tile_data_raster(self, level: str, column: int, row: int) ‑> numpy.ndarray
-
Get a raster pyramid's tile as 3-dimension numpy ndarray
First dimension is the row, second one is column, third one is band.
Args
level
:str
- Tile's level
column
:int
- Tile's column
row
:int
- Tile's row
Limitations
Packbits (pyramid formats TIFF_PKB_FLOAT32 and TIFF_PKB_UINT8) and LZW (pyramid formats TIFF_LZW_FLOAT32 and TIFF_LZW_UINT8) compressions are not handled.
Raises
Exception
- Cannot get raster data for a vector pyramid
Exception
- Level not found in the pyramid
NotImplementedError
- Pyramid owns one-tile slabs
NotImplementedError
- Raster pyramid format not handled
MissingEnvironmentError
- Missing object storage informations
StorageError
- Storage read issue
FormatError
- Cannot decode tile
Examples
FILE stored DTM (raster) pyramid, to get the altitude value at a point in the best level
from rok4.Pyramid import Pyramid try: pyramid = Pyramid.from_descriptor("/data/pyramids/RGEALTI.json") level, col, row, pcol, prow = pyramid.get_tile_indices(44, 5, srs = "EPSG:4326") data = pyramid.get_tile_data_raster(level, col, row) if data is None: print("No data") else: print(data[prow][pcol]) except Exception as e: print(f"Cannot get a pyramid's pixel value : {e}")
Returns
numpy.ndarray
- data, as numpy array, None if no data
Expand source code
def get_tile_data_raster(self, level: str, column: int, row: int) -> numpy.ndarray:
    """Get a raster pyramid's tile as 3-dimension numpy ndarray

    First dimension is the row, second one is column, third one is band.

    Args:
        level (str): Tile's level
        column (int): Tile's column
        row (int): Tile's row

    Limitations:
        Packbits (pyramid formats TIFF_PKB_FLOAT32 and TIFF_PKB_UINT8) and LZW (pyramid formats TIFF_LZW_FLOAT32 and TIFF_LZW_UINT8) compressions are not handled.

    Raises:
        Exception: Cannot get raster data for a vector pyramid
        Exception: Level not found in the pyramid
        NotImplementedError: Pyramid owns one-tile slabs
        NotImplementedError: Raster pyramid format not handled
        MissingEnvironmentError: Missing object storage informations
        StorageError: Storage read issue
        FormatError: Cannot decode tile

    Examples:

        FILE stored DTM (raster) pyramid, to get the altitude value at a point in the best level

            from rok4.Pyramid import Pyramid

            try:
                pyramid = Pyramid.from_descriptor("/data/pyramids/RGEALTI.json")
                level, col, row, pcol, prow = pyramid.get_tile_indices(44, 5, srs = "EPSG:4326")
                data = pyramid.get_tile_data_raster(level, col, row)

                if data is None:
                    print("No data")
                else:
                    print(data[prow][pcol])

            except Exception as e:
                print(f"Cannot get a pyramid's pixel value : {e}")

    Returns:
        numpy.ndarray: data, as numpy array, None if no data
    """
    if self.type == PyramidType.VECTOR:
        raise Exception("Cannot get tile as raster data : it's a vector pyramid")

    binary_tile = self.get_tile_data_binary(level, column, row)
    if binary_tile is None:
        return None

    level_object = self.get_level(level)
    tile_format = self.__format

    # Formats whose tiles are complete images, decoded by Pillow : format -> label for errors
    image_codecs = {
        "TIFF_JPG_UINT8": "JPEG",
        "TIFF_JPG90_UINT8": "JPEG",
        "TIFF_PNG_UINT8": "PNG",
    }
    if tile_format in image_codecs:
        try:
            img = Image.open(io.BytesIO(binary_tile))
        except Exception as e:
            raise FormatError(image_codecs[tile_format], "binary tile", e)
        return numpy.asarray(img)

    # Formats whose tiles are plain sample buffers : format -> (zlib compressed, sample type)
    buffer_layouts = {
        "TIFF_RAW_UINT8": (False, "uint8"),
        "TIFF_ZIP_UINT8": (True, "uint8"),
        "TIFF_ZIP_FLOAT32": (True, "float32"),
        "TIFF_RAW_FLOAT32": (False, "float32"),
    }
    if tile_format not in buffer_layouts:
        raise NotImplementedError(f"Cannot get tile as raster data for format {self.__format}")

    compressed, sample_type = buffer_layouts[tile_format]
    if compressed:
        try:
            data = numpy.frombuffer(zlib.decompress(binary_tile), dtype=numpy.dtype(sample_type))
        except Exception as e:
            raise FormatError("ZIP", "binary tile", e)
    else:
        data = numpy.frombuffer(binary_tile, dtype=numpy.dtype(sample_type))

    # The flat buffer is reshaped as a 3-dimension array
    data.shape = (
        level_object.tile_matrix.tile_size[0],
        level_object.tile_matrix.tile_size[1],
        self.__raster_specifications["channels"],
    )
    return data
def get_tile_data_vector(self, level: str, column: int, row: int) ‑> Dict[~KT, ~VT]
-
Get a vector pyramid's tile as GeoJSON dictionnary
Args
level
:str
- Tile's level
column
:int
- Tile's column
row
:int
- Tile's row
Raises
Exception
- Cannot get vector data for a raster pyramid
Exception
- Level not found in the pyramid
NotImplementedError
- Pyramid owns one-tile slabs
NotImplementedError
- Vector pyramid format not handled
MissingEnvironmentError
- Missing object storage informations
StorageError
- Storage read issue
FormatError
- Cannot decode tile
Examples
S3 stored vector pyramid, to print a tile as GeoJSON
from rok4.Pyramid import Pyramid import json try: pyramid = Pyramid.from_descriptor("s3://pyramids/vectors/BDTOPO.json") level, col, row, pcol, prow = pyramid.get_tile_indices(40.325, 3.123, srs = "EPSG:4326") data = pyramid.get_tile_data_vector(level, col, row) if data is None: print("No data") else: print(json.dumps(data)) except Exception as e: print(f"Cannot print a vector pyramid's tile as GeoJSON : {e}")
Returns
Dict
- data, as GeoJSON dictionnary. None if no data
Expand source code
def get_tile_data_vector(self, level: str, column: int, row: int) -> Dict:
    """Get a vector pyramid's tile as GeoJSON dictionary

    Args:
        level (str): Tile's level
        column (int): Tile's column
        row (int): Tile's row

    Raises:
        Exception: Cannot get vector data for a raster pyramid
        Exception: Level not found in the pyramid
        NotImplementedError: Pyramid owns one-tile slabs
        NotImplementedError: Vector pyramid format not handled
        MissingEnvironmentError: Missing object storage informations
        StorageError: Storage read issue
        FormatError: Cannot decode tile

    Examples:

        S3 stored vector pyramid, to print a tile as GeoJSON

            from rok4.Pyramid import Pyramid
            import json

            try:
                pyramid = Pyramid.from_descriptor("s3://pyramids/vectors/BDTOPO.json")
                level, col, row, pcol, prow = pyramid.get_tile_indices(40.325, 3.123, srs = "EPSG:4326")
                data = pyramid.get_tile_data_vector(level, col, row)

                if data is None:
                    print("No data")
                else:
                    print(json.dumps(data))

            except Exception as e:
                print(f"Cannot print a vector pyramid's tile as GeoJSON : {e}")

    Returns:
        Dict: data, as GeoJSON dictionary. None if no data
    """
    if self.type == PyramidType.RASTER:
        raise Exception("Cannot get tile as vector data : it's a raster pyramid")

    binary_tile = self.get_tile_data_binary(level, column, row)
    if binary_tile is None:
        return None

    # The level object is not needed to decode a vector tile : the unused lookup has been removed

    if self.__format != "TIFF_PBF_MVT":
        raise NotImplementedError(f"Cannot get tile as vector data for format {self.__format}")

    try:
        data = mapbox_vector_tile.decode(binary_tile)
    except Exception as e:
        raise FormatError("PBF (MVT)", "binary tile", e)

    return data
def get_tile_indices(self, x: float, y: float, level: str = None, **kwargs) ‑> Tuple[str, int, int, int, int]
-
Get pyramid's tile and pixel indices from point's coordinates
Used coordinates system have to be the pyramid one. If EPSG:4326, x is latitude and y longitude.
Args
x
:float
- point's x
y
:float
- point's y
level
:str
, optional- Pyramid's level to take into account, the bottom one if None . Defaults to None.
**srs
:string
- spatial reference system of provided coordinates, with authority and code (same as the pyramid's one if not provided)
Raises
Exception
- Cannot find level to calculate indices
RuntimeError
- Provided SRS is invalid for OSR
Examples
FILE stored DTM (raster) pyramid, to get the altitude value at a point in the best level
from rok4.Pyramid import Pyramid try: pyramid = Pyramid.from_descriptor("/data/pyramids/RGEALTI.json") level, col, row, pcol, prow = pyramid.get_tile_indices(44, 5, srs = "EPSG:4326") data = pyramid.get_tile_data_raster(level, col, row) if data is None: print("No data") else: print(data[prow][pcol]) except Exception as e: print(f"Cannot get a pyramid's pixel value : {e}")
Returns
Tuple[str, int, int, int, int]
- Level identifier, tile's column, tile's row, pixel's (in the tile) column, pixel's row
Expand source code
def get_tile_indices(
    self, x: float, y: float, level: str = None, **kwargs
) -> Tuple[str, int, int, int, int]:
    """Get pyramid's tile and pixel indices from point's coordinates

    Used coordinates system have to be the pyramid one. If EPSG:4326, x is latitude and y longitude.

    Args:
        x (float): point's x
        y (float): point's y
        level (str, optional): Pyramid's level to take into account, the bottom one if None . Defaults to None.
        **srs (string): spatial reference system of provided coordinates, with authority and code (same as the pyramid's one if not provided)

    Raises:
        Exception: Cannot find level to calculate indices
        RuntimeError: Provided SRS is invalid for OSR

    Examples:

        FILE stored DTM (raster) pyramid, to get the altitude value at a point in the best level

            from rok4.Pyramid import Pyramid

            try:
                pyramid = Pyramid.from_descriptor("/data/pyramids/RGEALTI.json")
                level, col, row, pcol, prow = pyramid.get_tile_indices(44, 5, srs = "EPSG:4326")
                data = pyramid.get_tile_data_raster(level, col, row)

                if data is None:
                    print("No data")
                else:
                    print(data[prow][pcol])

            except Exception as e:
                print(f"Cannot get a pyramid's pixel value : {e}")

    Returns:
        Tuple[str, int, int, int, int]: Level identifier, tile's column, tile's row, pixel's (in the tile) column, pixel's row
    """
    # The bottom level is used unless a level is explicitly asked
    level_object = self.bottom_level
    if level is not None:
        level_object = self.get_level(level)

    if level_object is None:
        raise Exception(f"Cannot found the level to calculate indices")

    # Reprojection is only needed when a SRS different from the TMS one is provided
    provided_srs = kwargs.get("srs")
    if provided_srs is not None and provided_srs.upper() != self.__tms.srs.upper():
        sr = srs_to_spatialreference(provided_srs)
        x, y = reproject_point((x, y), sr, self.__tms.sr)

    level_id = level_object.id
    return (level_id,) + level_object.tile_matrix.point_to_indices(x, y)
def list_generator(self) ‑> Iterator[Tuple[Tuple[SlabType, str, int, int], Dict[~KT, ~VT]]]
-
Get list content
List is copied as temporary file, roots are read and informations about each slab is returned. If list is already loaded, we yield the cached content
Examples
S3 stored descriptor
from rok4.Pyramid import Pyramid try: pyramid = Pyramid.from_descriptor("s3://bucket_name/path/to/descriptor.json") for (slab_type, level, column, row), infos in pyramid.list_generator(): print(infos) except Exception as e: print("Cannot load the pyramid from its descriptor and read the list")
Yields
Iterator[Tuple[Tuple[SlabType,str,int,int], Dict]]
- Slab indices and storage informations
Value example:
( (<SlabType.DATA: 'DATA'>, '18', 5424, 7526), { 'link': False, 'md5': None, 'root': 'pyramids@localhost:9000/LIMADM', 'slab': 'DATA_18_5424_7526' } )
Expand source code
def list_generator(self) -> Iterator[Tuple[Tuple[SlabType, str, int, int], Dict]]:
    """Get list content

    List is copied as temporary file, roots are read and informations about each slab are returned. If list is already loaded, we yield the cached content.

    The temporary copy is removed when the generator ends, including when the
    iteration is stopped before the end or when the list cannot be read.

    Examples:

        S3 stored descriptor

            from rok4.Pyramid import Pyramid

            try:
                pyramid = Pyramid.from_descriptor("s3://bucket_name/path/to/descriptor.json")

                for (slab_type, level, column, row), infos in pyramid.list_generator():
                    print(infos)

            except Exception as e:
                print("Cannot load the pyramid from its descriptor and read the list")

    Yields:
        Iterator[Tuple[Tuple[SlabType,str,int,int], Dict]]: Slab indices and storage informations

        Value example:

            (
                (<SlabType.DATA: 'DATA'>, '18', 5424, 7526),
                {
                    'link': False,
                    'md5': None,
                    'root': 'pyramids@localhost:9000/LIMADM',
                    'slab': 'DATA_18_5424_7526'
                }
            )
    """
    if self.__content["loaded"]:
        yield from self.__content["cache"].items()
        return

    # Copy the list into a temporary file (the list may be an object)
    list_obj = tempfile.NamedTemporaryFile(mode="r", delete=False)
    list_file = list_obj.name

    try:
        copy(self.__list, f"file://{list_file}")
        list_obj.close()

        roots = {}
        s3_cluster = self.storage_s3_cluster

        with open(list_file, "r") as listin:
            # Roots reading : lines '<root id>=<root path>' until the '#' separator
            for line in listin:
                line = line.rstrip()

                if line == "#":
                    break

                root_id, root_path = line.split("=", 1)

                if s3_cluster is None:
                    roots[root_id] = root_path
                else:
                    # A S3 cluster name is known, it is added to the bucket name in the roots
                    root_bucket, root_path = root_path.split("/", 1)
                    roots[root_id] = f"{root_bucket}@{s3_cluster}/{root_path}"

            # Slabs reading : lines '<root id>/<slab path>[ <md5>]'
            for line in listin:
                line = line.rstrip()

                parts = line.split(" ", 1)
                slab_path = parts[0]
                slab_md5 = parts[1] if len(parts) == 2 else None

                root_id, slab_path = slab_path.split("/", 1)

                slab_type, level, column, row = self.get_infos_from_slab_path(slab_path)
                infos = {
                    "root": roots[root_id],
                    # Root '0' is the pyramid's own one, slabs in other roots are links
                    "link": root_id != "0",
                    "slab": slab_path,
                    "md5": slab_md5,
                }

                yield ((slab_type, level, column, row), infos)
    finally:
        # Also executed when the generator is closed before its end or on error :
        # the temporary copy is never left behind
        list_obj.close()
        remove(f"file://{list_file}")
def load_list(self) ‑> None
-
Load list content and cache it
If the list is already loaded, nothing is done
Expand source code
def load_list(self) -> None:
    """Load list content and cache it

    If list is already loaded, nothing done
    """
    content = self.__content
    if content["loaded"]:
        return

    cache = content["cache"]
    for slab, infos in self.list_generator():
        cache[slab] = infos

    # The cache is flagged as usable only once the whole list has been read
    content["loaded"] = True
def write_descriptor(self) ‑> None
-
Write the pyramid's descriptor to the final location (in the pyramid's storage root)
Expand source code
def write_descriptor(self) -> None:
    """Write the pyramid's descriptor to the final location (in the pyramid's storage root)

    The written content is the JSON version of the pyramid's serialization.
    """
    put_data_str(json.dumps(self.serializable), self.__descriptor)
class PyramidType (value, names=None, *, module=None, qualname=None, type=None, start=1)
-
Pyramid's data type
Expand source code
class PyramidType(Enum):
    """Pyramid's data type"""

    RASTER = "RASTER"  # Pyramid of images
    VECTOR = "VECTOR"  # Pyramid of vector tiles
Ancestors
- enum.Enum
Class variables
var RASTER
var VECTOR
class SlabType (value, names=None, *, module=None, qualname=None, type=None, start=1)
-
Slab's type
Expand source code
class SlabType(Enum):
    """Slab's type"""

    DATA = "DATA"  # Slab of data, raster or vector
    MASK = "MASK"  # Slab of mask, only for raster pyramid, image with one band : 0 is nodata, other values are data
Ancestors
- enum.Enum
Class variables
var DATA
var MASK