Coverage for src/rok4/Vector.py: 88%

101 statements  

« prev     ^ index     » next       coverage.py v7.4.1, created at 2024-01-29 10:29 +0100

1"""Provide class to read informations on vector data from file path or object path 

2 

3The module contains the following class : 

4 

5 - 'Vector' - Data Vector 

6 

7""" 

8 

9from osgeo import ogr 

10from rok4.Storage import get_osgeo_path, copy 

11from rok4.Exceptions import * 

12import os 

13import tempfile 

14 

15# Enable GDAL/OGR exceptions 

16ogr.UseExceptions() 

17 

18 

19class Vector: 

20 """A data vector 

21 

22 Attributes: 

23 path (str): path to the file/object 

24 bbox (Tuple[float, float, float, float]): bounding rectange in the data projection 

25 layers (List[Tuple[str, int, List[Tuple[str, str]]]]) : Vector layers with their name, their number of objects and their attributes 

26 """ 

27 

28 @classmethod 

29 def from_file(cls, path: str, **kwargs) -> "Vector": 

30 """Constructor method of a Vector from a file (Shapefile, Geopackage, CSV and GeoJSON) 

31 

32 Args: 

33 path (str): path to the file/object 

34 **csv (Dict[str : str]) : dictionnary of CSV parameters : 

35 -srs (str) ("EPSG:2154" if not provided) : spatial reference system of the geometry 

36 -column_x (str) ("x" if not provided) : field of the x coordinate 

37 -column_y (str) ("y" if not provided) : field of the y coordinate 

38 -column_wkt (str) (None if not provided) : field of the WKT of the geometry if WKT use to define coordinate 

39 

40 Examples: 

41 

42 from rok4.Vector import Vector 

43 

44 try: 

45 vector = Vector.from_file("file://tests/fixtures/ARRONDISSEMENT.shp") 

46 vector_csv1 = Vector.from_file("file://tests/fixtures/vector.csv" , csv={"delimiter":";", "column_x":"x", "column_y":"y"}) 

47 vector_csv2 = Vector.from_file("file://tests/fixtures/vector2.csv" , csv={"delimiter":";", "column_wkt":"WKT"}) 

48 

49 except Exception as e: 

50 print(f"Vector creation raises an exception: {exc}") 

51 

52 Raises: 

53 MissingEnvironmentError: Missing object storage informations 

54 StorageError: Storage read issue 

55 Exception: Wrong column 

56 Exception: Wrong data in column 

57 Exception: Wrong format of file 

58 Exception: Wrong data in the file 

59 

60 """ 

61 

62 self = cls() 

63 

64 self.path = path 

65 

66 path_split = path.split("/") 

67 

68 if path_split[0] == "ceph:" or path.endswith(".csv"): 

69 if path.endswith(".shp"): 

70 with tempfile.TemporaryDirectory() as tmp: 

71 tmp_path = tmp + "/" + path_split[-1][:-4] 

72 

73 copy(path, "file://" + tmp_path + ".shp") 

74 copy(path[:-4] + ".shx", "file://" + tmp_path + ".shx") 

75 copy(path[:-4] + ".cpg", "file://" + tmp_path + ".cpg") 

76 copy(path[:-4] + ".dbf", "file://" + tmp_path + ".dbf") 

77 copy(path[:-4] + ".prj", "file://" + tmp_path + ".prj") 

78 

79 dataSource = ogr.Open(tmp_path + ".shp", 0) 

80 

81 elif path.endswith(".gpkg"): 

82 with tempfile.TemporaryDirectory() as tmp: 

83 tmp_path = tmp + "/" + path_split[-1][:-5] 

84 

85 copy(path, "file://" + tmp_path + ".gpkg") 

86 

87 dataSource = ogr.Open(tmp_path + ".gpkg", 0) 

88 

89 elif path.endswith(".geojson"): 

90 with tempfile.TemporaryDirectory() as tmp: 

91 tmp_path = tmp + "/" + path_split[-1][:-8] 

92 

93 copy(path, "file://" + tmp_path + ".geojson") 

94 

95 dataSource = ogr.Open(tmp_path + ".geojson", 0) 

96 

97 elif path.endswith(".csv"): 

98 # Récupération des informations optionnelles 

99 if "csv" in kwargs: 

100 csv = kwargs["csv"] 

101 else: 

102 csv = {} 

103 

104 if "srs" in csv and csv["srs"] is not None: 

105 srs = csv["srs"] 

106 else: 

107 srs = "EPSG:2154" 

108 

109 if "column_x" in csv and csv["column_x"] is not None: 

110 column_x = csv["column_x"] 

111 else: 

112 column_x = "x" 

113 

114 if "column_y" in csv and csv["column_y"] is not None: 

115 column_y = csv["column_y"] 

116 else: 

117 column_y = "y" 

118 

119 if "column_wkt" in csv: 

120 column_wkt = csv["column_wkt"] 

121 else: 

122 column_wkt = None 

123 

124 with tempfile.TemporaryDirectory() as tmp: 

125 tmp_path = tmp + "/" + path_split[-1][:-4] 

126 name_fich = path_split[-1][:-4] 

127 

128 copy(path, "file://" + tmp_path + ".csv") 

129 

130 with tempfile.NamedTemporaryFile( 

131 mode="w", suffix=".vrt", dir=tmp, delete=False 

132 ) as tmp2: 

133 vrt_file = "<OGRVRTDataSource>\n" 

134 vrt_file += '<OGRVRTLayer name="' + name_fich + '">\n' 

135 vrt_file += "<SrcDataSource>" + tmp_path + ".csv</SrcDataSource>\n" 

136 vrt_file += "<SrcLayer>" + name_fich + "</SrcLayer>\n" 

137 vrt_file += "<LayerSRS>" + srs + "</LayerSRS>\n" 

138 if column_wkt == None: 

139 vrt_file += ( 

140 '<GeometryField encoding="PointFromColumns" x="' 

141 + column_x 

142 + '" y="' 

143 + column_y 

144 + '"/>\n' 

145 ) 

146 else: 

147 vrt_file += ( 

148 '<GeometryField encoding="WKT" field="' + column_wkt + '"/>\n' 

149 ) 

150 vrt_file += "</OGRVRTLayer>\n" 

151 vrt_file += "</OGRVRTDataSource>" 

152 tmp2.write(vrt_file) 

153 dataSourceVRT = ogr.Open(tmp2.name, 0) 

154 os.remove(tmp2.name) 

155 dataSource = ogr.GetDriverByName("ESRI Shapefile").CopyDataSource( 

156 dataSourceVRT, tmp_path + "shp" 

157 ) 

158 

159 else: 

160 raise Exception("This format of file cannot be loaded") 

161 

162 else: 

163 dataSource = ogr.Open(get_osgeo_path(path), 0) 

164 

165 multipolygon = ogr.Geometry(ogr.wkbGeometryCollection) 

166 try: 

167 layer = dataSource.GetLayer() 

168 except AttributeError: 

169 raise Exception(f"The content of {self.path} cannot be read") 

170 

171 layers = [] 

172 for i in range(dataSource.GetLayerCount()): 

173 layer = dataSource.GetLayer(i) 

174 name = layer.GetName() 

175 count = layer.GetFeatureCount() 

176 layerDefinition = layer.GetLayerDefn() 

177 attributes = [] 

178 for j in range(layerDefinition.GetFieldCount()): 

179 fieldName = layerDefinition.GetFieldDefn(j).GetName() 

180 fieldTypeCode = layerDefinition.GetFieldDefn(j).GetType() 

181 fieldType = layerDefinition.GetFieldDefn(j).GetFieldTypeName(fieldTypeCode) 

182 attributes += [(fieldName, fieldType)] 

183 for feature in layer: 

184 geom = feature.GetGeometryRef() 

185 if geom != None: 

186 multipolygon.AddGeometry(geom) 

187 layers += [(name, count, attributes)] 

188 

189 self.layers = layers 

190 self.bbox = multipolygon.GetEnvelope() 

191 

192 return self 

193 

194 @classmethod 

195 def from_parameters(cls, path: str, bbox: tuple, layers: list) -> "Vector": 

196 """Constructor method of a Vector from a parameters 

197 

198 Args: 

199 path (str): path to the file/object 

200 bbox (Tuple[float, float, float, float]): bounding rectange in the data projection 

201 layers (List[Tuple[str, int, List[Tuple[str, str]]]]) : Vector layers with their name, their number of objects and their attributes 

202 

203 Examples: 

204 

205 try : 

206 vector = Vector.from_parameters("file://tests/fixtures/ARRONDISSEMENT.shp", (1,2,3,4), [('ARRONDISSEMENT', 14, [('ID', 'String'), ('NOM', 'String'), ('INSEE_ARR', 'String'), ('INSEE_DEP', 'String'), ('INSEE_REG', 'String'), ('ID_AUT_ADM', 'String'), ('DATE_CREAT', 'String'), ('DATE_MAJ', 'String'), ('DATE_APP', 'Date'), ('DATE_CONF', 'Date')])]) 

207 

208 except Exception as e: 

209 print(f"Vector creation raises an exception: {exc}") 

210 

211 """ 

212 

213 self = cls() 

214 

215 self.path = path 

216 self.bbox = bbox 

217 self.layers = layers 

218 

219 return self