Coverage for src/rok4/vector.py: 88%

100 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2024-10-01 15:35 +0000

1"""Provide class to read informations on vector data from file path or object path 

2 

3The module contains the following class : 

4 

5- `Vector` - Data Vector 

6""" 

7 

8# -- IMPORTS -- 

9 

10# standard library 

11import os 

12import tempfile 

13 

14# 3rd party 

15from osgeo import ogr 

16 

17# package 

18from rok4.storage import copy, get_osgeo_path 

19 

20# -- GLOBALS -- 

21 

22# Enable GDAL/OGR exceptions 

23ogr.UseExceptions() 

24 

25 

26class Vector: 

27 """A data vector 

28 

29 Attributes: 

30 path (str): path to the file/object 

31 bbox (Tuple[float, float, float, float]): bounding rectange in the data projection 

32 layers (List[Tuple[str, int, List[Tuple[str, str]]]]) : Vector layers with their name, their number of objects and their attributes 

33 """ 

34 

35 @classmethod 

36 def from_file(cls, path: str, **kwargs) -> "Vector": 

37 """Constructor method of a Vector from a file (Shapefile, Geopackage, CSV and GeoJSON) 

38 

39 Args: 

40 path (str): path to the file/object 

41 **csv (Dict[str : str]) : dictionnary of CSV parameters : 

42 -srs (str) ("EPSG:2154" if not provided) : spatial reference system of the geometry 

43 -column_x (str) ("x" if not provided) : field of the x coordinate 

44 -column_y (str) ("y" if not provided) : field of the y coordinate 

45 -column_wkt (str) (None if not provided) : field of the WKT of the geometry if WKT use to define coordinate 

46 

47 Examples: 

48 

49 from rok4.vector import Vector 

50 

51 try: 

52 vector = Vector.from_file("file://tests/fixtures/ARRONDISSEMENT.shp") 

53 vector_csv1 = Vector.from_file("file://tests/fixtures/vector.csv" , csv={"delimiter":";", "column_x":"x", "column_y":"y"}) 

54 vector_csv2 = Vector.from_file("file://tests/fixtures/vector2.csv" , csv={"delimiter":";", "column_wkt":"WKT"}) 

55 

56 except Exception as e: 

57 print(f"Vector creation raises an exception: {exc}") 

58 

59 Raises: 

60 MissingEnvironmentError: Missing object storage informations 

61 StorageError: Storage read issue 

62 Exception: Wrong column 

63 Exception: Wrong data in column 

64 Exception: Wrong format of file 

65 Exception: Wrong data in the file 

66 

67 """ 

68 

69 self = cls() 

70 

71 self.path = path 

72 

73 path_split = path.split("/") 

74 

75 if path_split[0] == "ceph:" or path.endswith(".csv"): 

76 if path.endswith(".shp"): 

77 with tempfile.TemporaryDirectory() as tmp: 

78 tmp_path = tmp + "/" + path_split[-1][:-4] 

79 

80 copy(path, "file://" + tmp_path + ".shp") 

81 copy(path[:-4] + ".shx", "file://" + tmp_path + ".shx") 

82 copy(path[:-4] + ".cpg", "file://" + tmp_path + ".cpg") 

83 copy(path[:-4] + ".dbf", "file://" + tmp_path + ".dbf") 

84 copy(path[:-4] + ".prj", "file://" + tmp_path + ".prj") 

85 

86 dataSource = ogr.Open(tmp_path + ".shp", 0) 

87 

88 elif path.endswith(".gpkg"): 

89 with tempfile.TemporaryDirectory() as tmp: 

90 tmp_path = tmp + "/" + path_split[-1][:-5] 

91 

92 copy(path, "file://" + tmp_path + ".gpkg") 

93 

94 dataSource = ogr.Open(tmp_path + ".gpkg", 0) 

95 

96 elif path.endswith(".geojson"): 

97 with tempfile.TemporaryDirectory() as tmp: 

98 tmp_path = tmp + "/" + path_split[-1][:-8] 

99 

100 copy(path, "file://" + tmp_path + ".geojson") 

101 

102 dataSource = ogr.Open(tmp_path + ".geojson", 0) 

103 

104 elif path.endswith(".csv"): 

105 # Récupération des informations optionnelles 

106 if "csv" in kwargs: 

107 csv = kwargs["csv"] 

108 else: 

109 csv = {} 

110 

111 if "srs" in csv and csv["srs"] is not None: 

112 srs = csv["srs"] 

113 else: 

114 srs = "EPSG:2154" 

115 

116 if "column_x" in csv and csv["column_x"] is not None: 

117 column_x = csv["column_x"] 

118 else: 

119 column_x = "x" 

120 

121 if "column_y" in csv and csv["column_y"] is not None: 

122 column_y = csv["column_y"] 

123 else: 

124 column_y = "y" 

125 

126 if "column_wkt" in csv: 

127 column_wkt = csv["column_wkt"] 

128 else: 

129 column_wkt = None 

130 

131 with tempfile.TemporaryDirectory() as tmp: 

132 tmp_path = tmp + "/" + path_split[-1][:-4] 

133 name_fich = path_split[-1][:-4] 

134 

135 copy(path, "file://" + tmp_path + ".csv") 

136 

137 with tempfile.NamedTemporaryFile( 

138 mode="w", suffix=".vrt", dir=tmp, delete=False 

139 ) as tmp2: 

140 vrt_file = "<OGRVRTDataSource>\n" 

141 vrt_file += '<OGRVRTLayer name="' + name_fich + '">\n' 

142 vrt_file += "<SrcDataSource>" + tmp_path + ".csv</SrcDataSource>\n" 

143 vrt_file += "<SrcLayer>" + name_fich + "</SrcLayer>\n" 

144 vrt_file += "<LayerSRS>" + srs + "</LayerSRS>\n" 

145 if column_wkt is None: 

146 vrt_file += ( 

147 '<GeometryField encoding="PointFromColumns" x="' 

148 + column_x 

149 + '" y="' 

150 + column_y 

151 + '"/>\n' 

152 ) 

153 else: 

154 vrt_file += ( 

155 '<GeometryField encoding="WKT" field="' + column_wkt + '"/>\n' 

156 ) 

157 vrt_file += "</OGRVRTLayer>\n" 

158 vrt_file += "</OGRVRTDataSource>" 

159 tmp2.write(vrt_file) 

160 dataSourceVRT = ogr.Open(tmp2.name, 0) 

161 os.remove(tmp2.name) 

162 dataSource = ogr.GetDriverByName("ESRI Shapefile").CopyDataSource( 

163 dataSourceVRT, tmp_path + "shp" 

164 ) 

165 

166 else: 

167 raise Exception("This format of file cannot be loaded") 

168 

169 else: 

170 dataSource = ogr.Open(get_osgeo_path(path), 0) 

171 

172 multipolygon = ogr.Geometry(ogr.wkbGeometryCollection) 

173 try: 

174 layer = dataSource.GetLayer() 

175 except AttributeError: 

176 raise Exception(f"The content of {self.path} cannot be read") 

177 

178 layers = [] 

179 for i in range(dataSource.GetLayerCount()): 

180 layer = dataSource.GetLayer(i) 

181 name = layer.GetName() 

182 count = layer.GetFeatureCount() 

183 layerDefinition = layer.GetLayerDefn() 

184 attributes = [] 

185 for j in range(layerDefinition.GetFieldCount()): 

186 fieldName = layerDefinition.GetFieldDefn(j).GetName() 

187 fieldTypeCode = layerDefinition.GetFieldDefn(j).GetType() 

188 fieldType = layerDefinition.GetFieldDefn(j).GetFieldTypeName(fieldTypeCode) 

189 attributes += [(fieldName, fieldType)] 

190 for feature in layer: 

191 geom = feature.GetGeometryRef() 

192 if geom is not None: 

193 multipolygon.AddGeometry(geom) 

194 layers += [(name, count, attributes)] 

195 

196 self.layers = layers 

197 self.bbox = multipolygon.GetEnvelope() 

198 

199 return self 

200 

201 @classmethod 

202 def from_parameters(cls, path: str, bbox: tuple, layers: list) -> "Vector": 

203 """Constructor method of a Vector from a parameters 

204 

205 Args: 

206 path (str): path to the file/object 

207 bbox (Tuple[float, float, float, float]): bounding rectange in the data projection 

208 layers (List[Tuple[str, int, List[Tuple[str, str]]]]) : Vector layers with their name, their number of objects and their attributes 

209 

210 Examples: 

211 

212 try : 

213 vector = Vector.from_parameters("file://tests/fixtures/ARRONDISSEMENT.shp", (1,2,3,4), [('ARRONDISSEMENT', 14, [('ID', 'String'), ('NOM', 'String'), ('INSEE_ARR', 'String'), ('INSEE_DEP', 'String'), ('INSEE_REG', 'String'), ('ID_AUT_ADM', 'String'), ('DATE_CREAT', 'String'), ('DATE_MAJ', 'String'), ('DATE_APP', 'Date'), ('DATE_CONF', 'Date')])]) 

214 

215 except Exception as e: 

216 print(f"Vector creation raises an exception: {exc}") 

217 

218 """ 

219 

220 self = cls() 

221 

222 self.path = path 

223 self.bbox = bbox 

224 self.layers = layers 

225 

226 return self