Coverage for src/rok4/Utils.py: 97%

107 statements  

« prev     ^ index     » next       coverage.py v7.4.1, created at 2024-01-29 10:29 +0100

1"""Provide functions to manipulate OGR / OSR entities 

2""" 

3 

4import os 

5import re 

6 

7from typing import Dict, List, Tuple, Union 

8from osgeo import ogr, osr, gdal 

9from enum import Enum 

10 

11ogr.UseExceptions() 

12osr.UseExceptions() 

13gdal.UseExceptions() 

14 

15 

16class ColorFormat(Enum): 

17 """A color format enumeration. 

18 Except from "BIT", the member's name matches 

19 a common variable format name. The member's value is 

20 the allocated bit size associated to this format. 

21 """ 

22 

23 BIT = 1 

24 UINT8 = 8 

25 FLOAT32 = 32 

26 

27 

28__SR_BOOK = {} 

29 

30 

31def srs_to_spatialreference(srs: str) -> "osgeo.osr.SpatialReference": 

32 """Convert coordinates system as string to OSR spatial reference 

33 

34 Using a cache, to instanciate a Spatial Reference from a string only once. 

35 

36 Args: 

37 srs (str): coordinates system PROJ4 compliant, with authority and code, like EPSG:3857 or IGNF:LAMB93 

38 

39 Raises: 

40 RuntimeError: Provided SRS is invalid for OSR 

41 

42 Returns: 

43 osgeo.osr.SpatialReference: Corresponding OSR spatial reference 

44 """ 

45 

46 global __SR_BOOK 

47 

48 if srs.upper() not in __SR_BOOK: 

49 authority, code = srs.split(":", 1) 

50 

51 sr = osr.SpatialReference() 

52 if authority.upper() == "EPSG": 

53 sr.ImportFromEPSG(int(code)) 

54 else: 

55 sr.ImportFromProj4(f"+init={srs.upper()} +wktext") 

56 

57 __SR_BOOK[srs.upper()] = sr 

58 

59 return __SR_BOOK[srs.upper()] 

60 

61 

62def bbox_to_geometry( 

63 bbox: Tuple[float, float, float, float], densification: int = 0 

64) -> "osgeo.ogr.Geometry": 

65 """Convert bbox coordinates to OGR geometry 

66 

67 Args: 

68 bbox (Tuple[float, float, float, float]): bounding box (xmin, ymin, xmax, ymax) 

69 densification (int, optional): Number of point to add for each side of bounding box. Defaults to 0. 

70 

71 Raises: 

72 RuntimeError: Provided SRS is invalid for OSR 

73 

74 Returns: 

75 osgeo.ogr.Geometry: Corresponding OGR geometry, with spatial reference if provided 

76 """ 

77 

78 ring = ogr.Geometry(ogr.wkbLinearRing) 

79 

80 if densification > 0: 

81 step_x = (bbox[2] - bbox[0]) / (densification + 1) 

82 step_y = (bbox[3] - bbox[1]) / (densification + 1) 

83 

84 for i in range(densification + 1): 

85 ring.AddPoint(bbox[0] + step_x * i, bbox[1]) 

86 for i in range(densification + 1): 

87 ring.AddPoint(bbox[2], bbox[1] + step_y * i) 

88 for i in range(densification + 1): 

89 ring.AddPoint(bbox[2] - step_x * i, bbox[3]) 

90 for i in range(densification + 1): 

91 ring.AddPoint(bbox[0], bbox[3] - step_y * i) 

92 ring.AddPoint(bbox[0], bbox[1]) 

93 

94 else: 

95 ring.AddPoint(bbox[0], bbox[1]) 

96 ring.AddPoint(bbox[2], bbox[1]) 

97 ring.AddPoint(bbox[2], bbox[3]) 

98 ring.AddPoint(bbox[0], bbox[3]) 

99 ring.AddPoint(bbox[0], bbox[1]) 

100 

101 geom = ogr.Geometry(ogr.wkbPolygon) 

102 geom.AddGeometry(ring) 

103 geom.SetCoordinateDimension(2) 

104 

105 return geom 

106 

107 

108def reproject_bbox( 

109 bbox: Tuple[float, float, float, float], srs_src: str, srs_dst: str, densification: int = 5 

110) -> Tuple[float, float, float, float]: 

111 """Return bounding box in other coordinates system 

112 

113 Points are added to be sure output bounding box contains input bounding box 

114 

115 Args: 

116 bbox (Tuple[float, float, float, float]): bounding box (xmin, ymin, xmax, ymax) with source coordinates system 

117 srs_src (str): source coordinates system 

118 srs_dst (str): destination coordinates system 

119 densification (int, optional): Number of point to add for each side of bounding box. Defaults to 5. 

120 

121 Returns: 

122 Tuple[float, float, float, float]: bounding box (xmin, ymin, xmax, ymax) with destination coordinates system 

123 """ 

124 

125 sr_src = srs_to_spatialreference(srs_src) 

126 sr_src_inv = sr_src.EPSGTreatsAsLatLong() or sr_src.EPSGTreatsAsNorthingEasting() 

127 

128 sr_dst = srs_to_spatialreference(srs_dst) 

129 sr_dst_inv = sr_dst.EPSGTreatsAsLatLong() or sr_dst.EPSGTreatsAsNorthingEasting() 

130 

131 if sr_src.IsSame(sr_dst) and sr_src_inv == sr_dst_inv: 

132 # Les système sont vraiment les même, avec le même ordre des axes 

133 return bbox 

134 elif sr_src.IsSame(sr_dst) and sr_src_inv != sr_dst_inv: 

135 # Les système sont les même pour OSR, mais l'ordre des axes est différent 

136 return (bbox[1], bbox[0], bbox[3], bbox[2]) 

137 

138 # Systèmes différents 

139 

140 bbox_src = bbox_to_geometry(bbox, densification) 

141 bbox_src.AssignSpatialReference(sr_src) 

142 

143 bbox_dst = bbox_src.Clone() 

144 os.environ["OGR_ENABLE_PARTIAL_REPROJECTION"] = "YES" 

145 bbox_dst.TransformTo(sr_dst) 

146 

147 env = bbox_dst.GetEnvelope() 

148 return (env[0], env[2], env[1], env[3]) 

149 

150 

151def reproject_point( 

152 point: Tuple[float, float], 

153 sr_src: "osgeo.osr.SpatialReference", 

154 sr_dst: "osgeo.osr.SpatialReference", 

155) -> Tuple[float, float]: 

156 """Reproject a point 

157 

158 Args: 

159 point (Tuple[float, float]): source spatial reference point 

160 sr_src (osgeo.osr.SpatialReference): source spatial reference 

161 sr_dst (osgeo.osr.SpatialReference): destination spatial reference 

162 

163 Returns: 

164 Tuple[float, float]: X/Y in destination spatial reference 

165 """ 

166 

167 sr_src_inv = sr_src.EPSGTreatsAsLatLong() or sr_src.EPSGTreatsAsNorthingEasting() 

168 sr_dst_inv = sr_dst.EPSGTreatsAsLatLong() or sr_dst.EPSGTreatsAsNorthingEasting() 

169 

170 if sr_src.IsSame(sr_dst) and sr_src_inv == sr_dst_inv: 

171 # Les système sont vraiment les même, avec le même ordre des axes 

172 return (point[0], point[1]) 

173 elif sr_src.IsSame(sr_dst) and sr_src_inv != sr_dst_inv: 

174 # Les système sont les même pour OSR, mais l'ordre des axes est différent 

175 return (point[1], point[0]) 

176 

177 # Systèmes différents 

178 ct = osr.CreateCoordinateTransformation(sr_src, sr_dst) 

179 x_dst, y_dst, z_dst = ct.TransformPoint(point[0], point[1]) 

180 

181 return (x_dst, y_dst) 

182 

183 

184def compute_bbox(source_dataset: gdal.Dataset) -> Tuple: 

185 """Image boundingbox computing method 

186 

187 Args: 

188 source_dataset (gdal.Dataset): Dataset instanciated 

189 from the raster image 

190 

191 Limitations: 

192 Image's axis must be parallel to SRS' axis 

193 

194 Raises: 

195 AttributeError: source_dataset is not a gdal.Dataset instance. 

196 Exception: The dataset does not contain transform data. 

197 """ 

198 bbox = None 

199 transform_vector = source_dataset.GetGeoTransform() 

200 if transform_vector is None: 

201 raise Exception( 

202 f"No transform vector found in the dataset created from " 

203 + f"the following file : {source_dataset.GetFileList()[0]}" 

204 ) 

205 width = source_dataset.RasterXSize 

206 height = source_dataset.RasterYSize 

207 x_range = ( 

208 transform_vector[0], 

209 transform_vector[0] + width * transform_vector[1] + height * transform_vector[2], 

210 ) 

211 y_range = ( 

212 transform_vector[3], 

213 transform_vector[3] + width * transform_vector[4] + height * transform_vector[5], 

214 ) 

215 spatial_ref = source_dataset.GetSpatialRef() 

216 if spatial_ref is not None and spatial_ref.GetDataAxisToSRSAxisMapping() == [2, 1]: 

217 # Coordonnées terrain de type (latitude, longitude) 

218 # => on permute les coordonnées terrain par rapport à l'image 

219 bbox = (min(y_range), min(x_range), max(y_range), max(x_range)) 

220 else: 

221 # Coordonnées terrain de type (longitude, latitude) ou pas de SRS 

222 # => les coordonnées terrain sont dans le même ordre que celle de l'image 

223 bbox = (min(x_range), min(y_range), max(x_range), max(y_range)) 

224 return bbox 

225 

226 

227def compute_format(dataset: gdal.Dataset, path: str = None) -> ColorFormat: 

228 """Image color format computing method 

229 

230 Args: 

231 dataset (gdal.Dataset): Dataset instanciated from the image 

232 path (str, optionnal): path to the original file/object 

233 

234 Raises: 

235 AttributeError: source_dataset is not a gdal.Dataset instance. 

236 Exception: No color band found or unsupported color format. 

237 """ 

238 color_format = None 

239 if path is None: 

240 path = dataset.GetFileList()[0] 

241 if dataset.RasterCount < 1: 

242 raise Exception(f"Image {path} contains no color band.") 

243 

244 band_1_datatype = dataset.GetRasterBand(1).DataType 

245 data_type_name = gdal.GetDataTypeName(band_1_datatype) 

246 data_type_size = gdal.GetDataTypeSize(band_1_datatype) 

247 color_interpretation = dataset.GetRasterBand(1).GetRasterColorInterpretation() 

248 color_name = None 

249 if color_interpretation is not None: 

250 color_name = gdal.GetColorInterpretationName(color_interpretation) 

251 compression_regex_match = re.search(r"COMPRESSION\s*=\s*PACKBITS", gdal.Info(dataset)) 

252 

253 if ( 

254 data_type_name == "Byte" 

255 and data_type_size == 8 

256 and color_name == "Palette" 

257 and compression_regex_match 

258 ): 

259 # Compris par libTIFF comme du noir et blanc sur 1 bit 

260 color_format = ColorFormat.BIT 

261 elif data_type_name == "Byte" and data_type_size == 8: 

262 color_format = ColorFormat.UINT8 

263 elif data_type_name == "Float32" and data_type_size == 32: 

264 color_format = ColorFormat.FLOAT32 

265 else: 

266 raise Exception( 

267 f"Unsupported color format for image {path} : " 

268 + f"'{data_type_name}' ({data_type_size} bits)" 

269 ) 

270 return color_format