Module rok4_tools.tmsizer_utils.processors.reduce

Provide processor combining data. Input data is transformed and accumulated and the result is yielded juste once, at the end.

The module contains the following classes:

  • CountProcessor - Count the number of item read from the input processor
  • HeatmapProcessor - Generate an heat map with all point coordinate read from the input processor

Classes

class CountProcessor (input: Processor)

Processor counting the number of item read from the input processor

All input formats are allowed and output format is "COUNT"

Attributes

__input : Processor
Processor from which data is read

Constructor method

Args

input : Processor
Processor from which data is read
Expand source code
class CountProcessor(Processor):
    """Processor counting the number of item read from the input processor

    All input formats are allowed and output format is "COUNT"

    Attributes:
        __input (Processor): Processor from which data is read
    """

    def __init__(self, input: Processor):
        """Constructor method

        Args:
            input (Processor): Processor from which data is read
        """  

        super().__init__("COUNT")

        self.__input = input

    def process(self) -> Iterator[int]:
        """Count number of input item

        Yield count only once at the end

        Examples:

            Get input items count

                from rok4_tools.tmsizer_utils.processors.reduce import CountProcessor

                try:
                    # Creation of Processor source_processor
                    processor = CountProcessor(source_processor, level="15", format="GeoJSON" )
                    count = processor.process().__next__()
                    print(f"{count} items in source_processor")

                except Exception as e:
                    print("{e}")

        Yields:
            Iterator[int]: the count of input items
        """  

        for item in self.__input.process():
            self._processed += 1

        yield self._processed

    def __str__(self) -> str:
        return f"CountProcessor : {self._processed} {self.__input.format} items counted"

Ancestors

Methods

def process(self) ‑> Iterator[int]

Count number of input item

Yield count only once at the end

Examples

Get input items count

from rok4_tools.tmsizer_utils.processors.reduce import CountProcessor

try:
    # Creation of Processor source_processor
    processor = CountProcessor(source_processor, level="15", format="GeoJSON" )
    count = processor.process().__next__()
    print(f"{count} items in source_processor")

except Exception as e:
    print("{e}")

Yields

Iterator[int]
the count of input items

Inherited members

class HeatmapProcessor (input: Processor, **options)

Processor counting the number of item read from the input processor

Accepted input format is "POINT" and output format is "FILELIKE". Output file-like object is an in-memory GeoTIFF

Attributes

__input : Processor
Processor from which data is read
__bbox : Tuple[float, float, float, float]
Bounding box of the heat map (xmin,ymin,xmax,ymax)
__dimensions : Tuple[int, int]
Pixel dimensions of the heat map (width, height)
__resolutions : Tuple[float, float]
Pixel resolution (x resolution, y resolution)

Constructor method

Args

input : Processor
Processor from which data is read
**bbox : str
Bounding box of the heat map. Format ",,,". Coordinates system have to be the pivot TMS' one
**dimensions : str
Pixel dimensions of the heat map.Format "x"

Raises

ValueError
Input format is not allowed
KeyError
A mandatory option is missing
ValueError
A mandatory option is not valid
ValueError
Provided level is not in the pivot TMS
Expand source code
class HeatmapProcessor(Processor):
    """Processor counting the number of item read from the input processor

    Accepted input format is "POINT" and output format is "FILELIKE". Output file-like object is an in-memory GeoTIFF

    Attributes:
        __input (Processor): Processor from which data is read
        __bbox (Tuple[float, float, float, float]): Bounding box of the heat map (xmin,ymin,xmax,ymax)
        __dimensions (Tuple[int, int]): Pixel dimensions of the heat map (width, height)
        __resolutions (Tuple[float, float]): Pixel resolution (x resolution, y resolution)
    """

    input_formats_allowed = ["POINT"]

    areas = {
        "EPSG:3857": {
            "FXX": [-649498, 5048729, 1173394, 6661417]
        }
    }

    def __init__(self, input: Processor, **options):
        """Constructor method

        Args:
            input (Processor): Processor from which data is read
            **bbox (str): Bounding box of the heat map. Format "<xmin>,<ymin>,<xmax>,<ymax>". Coordinates system have to be the pivot TMS' one
            **dimensions (str): Pixel dimensions of the heat map.Format "<width>x<height>"

        Raises:
            ValueError: Input format is not allowed
            KeyError: A mandatory option is missing
            ValueError: A mandatory option is not valid
            ValueError: Provided level is not in the pivot TMS
        """  

        if input.format not in self.input_formats_allowed:
            raise Exception(f"Input format {input.format} is not handled for HeatmapProcessor : allowed formats are {self.input_formats_allowed}")

        super().__init__("FILELIKE")

        self.__input = input

        if "bbox" in options:
            try:
                self.__bbox = [float(c) for c in options["bbox"].split(",")]
                self.__bbox = tuple(self.__bbox)
            except ValueError as e:
                raise ValueError(f"Option 'bbox' contains non float values : {e}")

            if len(self.__bbox) != 4 or self.__bbox[0] >= self.__bbox[2] or self.__bbox[1] >= self.__bbox[3]:
                raise ValueError(f"Option 'bbox' have to be provided with format <xmin>,<ymin>,<xmax>,<ymax> (floats, min < max)")

        elif "area" in options:
            try:
                self.__bbox = self.areas[self.tms.srs][options["area"]]
            except KeyError as e:
                if self.tms.srs in self.areas:
                    raise ValueError(f"Area '{options['area']}' is not available for the TMS coordinates system ({self.tms.srs}): available areas are {', '.join(self.areas[self.tms.srs].keys())}")
                else :
                    raise ValueError(f"No defined areas for the TMS coordinates system ({self.tms.srs})")
        else:
            raise KeyError(f"Option 'bbox' or 'area' is required for a heatmap processing")

        if "dimensions" in options:
            try:
                self.__dimensions = [int(d) for d in options["dimensions"].split("x")]
                self.__dimensions = tuple(self.__dimensions)
            except ValueError as e:
                raise ValueError(f"Option 'dimensions' contains non integer values : {e}")

            if len(self.__dimensions) != 2 or self.__dimensions[0] <= 0 or self.__dimensions[1] <= 0:
                raise ValueError(f"Option 'dimensions' have to be provided with format <width>x<height> (positive integers)")

            self.__resolutions = (
                (self.__bbox[2] - self.__bbox[0]) / self.__dimensions[0],
                (self.__bbox[3] - self.__bbox[1]) / self.__dimensions[1]
            )
        elif "level" in options:
            level = self.tms.get_level(options["level"])
            if level is None:
                raise ValueError(f"The provided level '{options['dimensions']}' (to have one pixel per tile) is not in the TMS")

            # On va caler la bbox pour qu'elle coïncide avec les limites de tuiles du niveau demandé
            (col_min, row_min, col_max, row_max) = level.bbox_to_tiles(self.__bbox)

            # Calage du coin en bas à gauche
            (xmin, ymin, xmax, ymax) = level.tile_to_bbox(col_min, row_max)
            self.__bbox[0] = xmin
            self.__bbox[1] = ymin

            # Calage du coin en haut à droite
            (xmin, ymin, xmax, ymax) = level.tile_to_bbox(col_max, row_min)
            self.__bbox[2] = xmax
            self.__bbox[3] = ymax

            self.__resolutions = (
                xmax - xmin,
                ymax - ymin
            )

            self.__dimensions = (
                int((self.__bbox[2] - self.__bbox[0]) / self.__resolutions[0]),
                int((self.__bbox[3] - self.__bbox[1]) / self.__resolutions[1])
            )

        else:
            raise KeyError(f"Option 'dimensions' or 'level' is required for a heatmap processing")

        if self.__dimensions[0] > 10000 or self.__dimensions[1] > 10000:
            raise ValueError(f"Heatmap dimensions have to be less than 10 000 x 10 000: here it's {self.__dimensions}")

    def process(self) -> Iterator[MemoryFile]:
        """Read point coordinates from the input processor and accumule them as a heat map

        Points outsides the provided bounding box are ignored.

        Examples:

            Get intersecting tiles' indices

                from rok4_tools.tmsizer_utils.processors.reduce import HeatmapProcessor

                try:
                    # Creation of Processor source_processor with format POINT
                    
                    processor = HeatmapProcessor(source_processor, bbox="65000,6100000,665000,6500000", dimensions="600x400" )
                    f = processor.process().__next__()

                    with open("hello.txt", "w") as my_file:
                        my_file.write(f.read())
                    
                except Exception as e:
                    print("{e}")

        Yields:
            Iterator[rasterio.io.MemoryFile]: In-memory GeoTIFF
        """  

        data = np.zeros((self.__dimensions[1], self.__dimensions[0]), dtype=np.uint32)

        if self.__input.format == "POINT":

            for item in self.__input.process():
                self._processed += 1

                (x_center, y_center) = item

                if x_center > self.__bbox[2] or y_center > self.__bbox[3] or x_center < self.__bbox[0] or y_center < self.__bbox[1]:
                    continue

                pcol = floor((x_center - self.__bbox[0]) / self.__resolutions[0])
                prow = floor((self.__bbox[3] - y_center) / self.__resolutions[1])

                data[prow][pcol] += 1

        memfile = MemoryFile()
        with memfile.open(
            driver='GTiff',
            height=data.shape[0],
            width=data.shape[1],
            count=1,
            dtype=data.dtype,
            crs=rasterio.CRS.from_string(self.tms.srs),
            nodata=0,
            transform=from_origin(self.__bbox[0], self.__bbox[3], self.__resolutions[0], self.__resolutions[1]),
        ) as dataset:
            dataset.write(data, indexes=1)

        yield memfile

    def __str__(self) -> str:
        return f"HeatmapProcessor : {self._processed} hits on image with dimensions {self.__dimensions} and bbox {self.__bbox} (resolutions {self.__resolutions})"

Ancestors

Class variables

var areas
var input_formats_allowed

Methods

def process(self) ‑> Iterator[rasterio.io.MemoryFile]

Read point coordinates from the input processor and accumule them as a heat map

Points outsides the provided bounding box are ignored.

Examples

Get intersecting tiles' indices

from rok4_tools.tmsizer_utils.processors.reduce import HeatmapProcessor

try:
    # Creation of Processor source_processor with format POINT

    processor = HeatmapProcessor(source_processor, bbox="65000,6100000,665000,6500000", dimensions="600x400" )
    f = processor.process().__next__()

    with open("hello.txt", "w") as my_file:
        my_file.write(f.read())

except Exception as e:
    print("{e}")

Yields

Iterator[rasterio.io.MemoryFile]
In-memory GeoTIFF

Inherited members