Coverage for src/rok4_tools/tmsizer_utils/processors/reduce.py: 81%

62 statements  

« prev     ^ index     » next       coverage.py v7.5.0, created at 2024-04-30 07:30 +0000

1"""Provide processor combining data. Input data is transformed and accumulated and the result is yielded juste once, at the end. 

2 

3The module contains the following classes: 

4 

5- `CountProcessor` - Count the number of item read from the input processor 

6- `HeatmapProcessor` - Generate an heat map with all point coordinate read from the input processor 

7""" 

8import sys 

9import numpy as np 

10from typing import Dict, List, Tuple, Union, Iterator 

11from math import floor 

12 

13import rasterio 

14from rasterio.transform import from_origin 

15from rasterio.io import MemoryFile 

16from rasterio import logging 

17logging.getLogger().setLevel(logging.ERROR) 

18 

19from rok4_tools.tmsizer_utils.processors.processor import Processor 

20 

21class CountProcessor(Processor): 

22 """Processor counting the number of item read from the input processor 

23 

24 All input formats are allowed and output format is "COUNT" 

25 

26 Attributes: 

27 __input (Processor): Processor from which data is read 

28 """ 

29 

30 def __init__(self, input: Processor): 

31 """Constructor method 

32 

33 Args: 

34 input (Processor): Processor from which data is read 

35 """ 

36 

37 super().__init__("COUNT") 

38 

39 self.__input = input 

40 

41 def process(self) -> Iterator[int]: 

42 """Count number of input item 

43 

44 Yield count only once at the end 

45 

46 Examples: 

47 

48 Get input items count 

49 

50 from rok4_tools.tmsizer_utils.processors.reduce import CountProcessor 

51 

52 try: 

53 # Creation of Processor source_processor 

54 processor = CountProcessor(source_processor, level="15", format="GeoJSON" ) 

55 count = processor.process().__next__() 

56 print(f"{count} items in source_processor") 

57 

58 except Exception as e: 

59 print("{e}") 

60 

61 Yields: 

62 Iterator[int]: the count of input items 

63 """ 

64 

65 for item in self.__input.process(): 

66 self._processed += 1 

67 

68 yield self._processed 

69 

70 def __str__(self) -> str: 

71 return f"CountProcessor : {self._processed} {self.__input.format} items counted" 

72 

73 

74class HeatmapProcessor(Processor): 

75 """Processor counting the number of item read from the input processor 

76 

77 Accepted input format is "POINT" and output format is "FILELIKE". Output file-like object is an in-memory GeoTIFF 

78 

79 Attributes: 

80 __input (Processor): Processor from which data is read 

81 __bbox (Tuple[float, float, float, float]): Bounding box of the heat map (xmin,ymin,xmax,ymax) 

82 __dimensions (Tuple[int, int]): Pixel dimensions of the heat map (width, height) 

83 __resolutions (Tuple[float, float]): Pixel resolution (x resolution, y resolution) 

84 """ 

85 

86 input_formats_allowed = ["POINT"] 

87 

88 def __init__(self, input: Processor, **options): 

89 """Constructor method 

90 

91 Args: 

92 input (Processor): Processor from which data is read 

93 **bbox (str): Bounding box of the heat map. Format "<xmin>,<ymin>,<xmax>,<ymax>". Coordinates system have to be the pivot TMS' one 

94 **dimensions (str): Pixel dimensions of the heat map.Format "<width>x<height>" 

95 

96 Raises: 

97 ValueError: Input format is not allowed 

98 KeyError: A mandatory option is missing 

99 ValueError: A mandatory option is not valid 

100 ValueError: Provided level is not in the pivot TMS 

101 """ 

102 

103 if input.format not in self.input_formats_allowed: 

104 raise Exception(f"Input format {input.format} is not handled for HeatmapProcessor : allowed formats are {self.input_formats_allowed}") 

105 

106 super().__init__("FILELIKE") 

107 

108 self.__input = input 

109 

110 try: 

111 try: 

112 self.__bbox = [float(c) for c in options["bbox"].split(",")] 

113 self.__bbox = tuple(self.__bbox) 

114 except ValueError as e: 

115 raise ValueError(f"Option 'bbox' contains non float values : {e}") 

116 

117 if len(self.__bbox) != 4 or self.__bbox[0] >= self.__bbox[2] or self.__bbox[1] >= self.__bbox[3]: 

118 raise ValueError(f"Option 'bbox' have to be provided with format <xmin>,<ymin>,<xmax>,<ymax> (floats, min < max)") 

119 

120 try: 

121 self.__dimensions = [int(d) for d in options["dimensions"].split("x")] 

122 self.__dimensions = tuple(self.__dimensions) 

123 except ValueError as e: 

124 raise ValueError(f"Option 'dimensions' contains non integer values : {e}") 

125 

126 if len(self.__dimensions) != 2 or self.__dimensions[0] <= 0 or self.__dimensions[1] <= 0: 

127 raise ValueError(f"Option 'dimensions' have to be provided with format <width>x<height> (positive integers)") 

128 

129 self.__resolutions = ( 

130 (self.__bbox[2] - self.__bbox[0]) / self.__dimensions[0], 

131 (self.__bbox[3] - self.__bbox[1]) / self.__dimensions[1] 

132 ) 

133 

134 except KeyError as e: 

135 raise KeyError(f"Option {e} is required for a heatmap processing") 

136 

137 def process(self) -> Iterator[MemoryFile]: 

138 """Read point coordinates from the input processor and accumule them as a heat map 

139 

140 Points outsides the provided bounding box are ignored. 

141 

142 Examples: 

143 

144 Get intersecting tiles' indices 

145 

146 from rok4_tools.tmsizer_utils.processors.reduce import HeatmapProcessor 

147 

148 try: 

149 # Creation of Processor source_processor with format POINT 

150  

151 processor = HeatmapProcessor(source_processor, bbox="65000,6100000,665000,6500000", dimensions="600x400" ) 

152 f = processor.process().__next__() 

153 

154 with open("hello.txt", "w") as my_file: 

155 my_file.write(f.read()) 

156  

157 except Exception as e: 

158 print("{e}") 

159 

160 Yields: 

161 Iterator[rasterio.io.MemoryFile]: In-memory GeoTIFF 

162 """ 

163 

164 data = np.zeros((self.__dimensions[1], self.__dimensions[0]), dtype=np.uint32) 

165 

166 if self.__input.format == "POINT": 

167 

168 for item in self.__input.process(): 

169 self._processed += 1 

170 

171 (x_center, y_center) = item 

172 

173 if x_center > self.__bbox[2] or y_center > self.__bbox[3] or x_center < self.__bbox[0] or y_center < self.__bbox[1]: 

174 continue 

175 

176 pcol = floor((x_center - self.__bbox[0]) / self.__resolutions[0]) 

177 prow = floor((self.__bbox[3] - y_center) / self.__resolutions[1]) 

178 

179 data[prow][pcol] += 1 

180 

181 memfile = MemoryFile() 

182 with memfile.open( 

183 driver='GTiff', 

184 height=data.shape[0], 

185 width=data.shape[1], 

186 count=1, 

187 dtype=data.dtype, 

188 crs=rasterio.CRS.from_string(self.tms.srs), 

189 nodata=0, 

190 transform=from_origin(self.__bbox[0], self.__bbox[3], self.__resolutions[0], self.__resolutions[1]), 

191 ) as dataset: 

192 dataset.write(data, indexes=1) 

193 

194 yield memfile 

195 

196 def __str__(self) -> str: 

197 return f"HeatmapProcessor : {self._processed} hits on image with dimensions {self.__dimensions} and bbox {self.__bbox}"