Coverage for src/rok4_tools/tmsizer_utils/processors/io.py: 81%

78 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2024-11-06 17:15 +0000

1"""Provide processor to read or write data. 

2 

3The module contains the following classes: 

4 

5- `StdinProcessor` - Read data from standard input 

6- `PathinProcessor` - Read data from a file/object 

7- `StdoutProcessor` - Write data to standard output 

8- `PathoutProcessor` - Write data to file/object 

9""" 

10 

11import os 

12import sys 

13from typing import Dict, List, Tuple, Union, Iterator 

14import tempfile 

15from tqdm import tqdm 

16 

17from rok4.storage import put_data_str, copy 

18 

19from rok4_tools.tmsizer_utils.processors.processor import Processor 

20 

class StdinProcessor(Processor):
    """Processor reading data from the standard input

    Input is consumed one line at a time. The data format cannot be guessed, so the caller has to provide it.

    Attributes:
        __pbar (tqdm): Progress bar updated once per read line, None when disabled
    """

    def __init__(self, format: str, pbar: bool = False):
        """Constructor method

        Args:
            format (str): Format of the read data, forwarded as is to the parent Processor
            pbar (bool, optional): Display a read progress bar ? Defaults to False.
        """

        super().__init__(format)
        # The bar is only instantiated on demand, so nothing is displayed when it is disabled
        self.__pbar = tqdm() if pbar else None

    def process(self) -> Iterator[str]:
        """Yield the standard input line by line, trailing whitespace removed

        Examples:

            Read from standard input (GetTile urls), line by line, and print it

                from rok4_tools.tmsizer_utils.processors.io import StdinProcessor

                try:
                    reader_processor = StdinProcessor("GETTILE_PARAMS")
                    for line in reader_processor.process():
                        print(line)

                except Exception as e:
                    print(f"{e}")

        Yields:
            Iterator[str]: line from the standard input
        """

        progress = self.__pbar
        for raw_line in sys.stdin:
            # Count the line before yielding it, so the counter is up to date when the consumer handles it
            self._processed += 1
            if progress is not None:
                progress.update(1)
            yield raw_line.rstrip()

    def __str__(self) -> str:
        """Summarize how many lines have been read so far"""
        return f"StdinProcessor : {self._processed} lines read (format {self._format}) from standard input"

72 

73 

74 

class PathinProcessor(Processor):
    """Processor to read from file or object

    Data is read line by line. Data's format cannot be detected and has to be provided by the user.

    Attributes:
        __path (str): Path to file or object to read
        __pbar (tqdm): Progress bar updated once per read line, None when disabled
    """

    def __init__(self, format: str, path: str, pbar: bool = False):
        """Constructor method

        Args:
            format (str): Format of read and processor's output data
            path (str): Path to file or object to read
            pbar (bool, optional): Print a read progress bar ? Defaults to False.
        """

        super().__init__(format)
        self.__path = path
        self.__pbar = None
        if pbar:
            self.__pbar = tqdm()

    def process(self) -> Iterator[str]:
        """Read from the file or object line by line and yield it

        The input file or object is copied into a temporary file, which is then read line by line.
        The temporary file is removed when the reading ends, whatever the reason: input exhausted,
        error during the copy or the reading, or generator closed before the end by the consumer.

        Examples:

            Read from an S3 object (geometries), line by line, and print it

                from rok4_tools.tmsizer_utils.processors.io import PathinProcessor

                try:
                    reader_processor = PathinProcessor("GEOMETRY", "s3://bucket/data.txt")
                    for line in reader_processor.process():
                        print(line)

                except Exception as e:
                    print(f"{e}")

        Yields:
            Iterator[str]: line from the file or object, trailing whitespace removed
        """

        # The temporary file is only used to reserve a path: close the handle right away,
        # so no descriptor stays open on the file while copy() writes to it
        tmp = tempfile.NamedTemporaryFile(mode="r", delete=False)
        tmp.close()

        try:
            copy(self.__path, tmp.name)
            with open(tmp.name) as f:
                for line in f:
                    self._processed += 1
                    if self.__pbar is not None:
                        self.__pbar.update(1)
                    yield line.rstrip()
        finally:
            # delete=False was used, so the cleanup is up to us. The finally clause also runs
            # when the generator is closed early, which avoids leaving the copy behind
            os.remove(tmp.name)

    def __str__(self) -> str:
        """Summarize how many lines have been read so far and from where"""
        return f"PathinProcessor : {self._processed} lines read (format {self._format}) from {self.__path}"

137 

138 

class StdoutProcessor(Processor):
    """Processor writing data to the standard output

    Items are pulled from the input processor and printed as strings, one per line. Output format is "NONE"

    Attributes:
        __input (Processor): Processor from which data is read
    """

    def __init__(self, input: Processor):
        """Constructor method

        Every input format is accepted, "FILELIKE" excepted.

        Args:
            input (Processor): Processor from which data is read

        Raises:
            ValueError: Input format is not allowed
        """

        # Guard clause : the format is checked before any initialization is done
        if input.format == "FILELIKE":
            raise ValueError(f"Input format {input.format} is not handled for StdoutProcessor")

        super().__init__("NONE")
        self.__input = input

    def process(self) -> Iterator[bool]:
        """Print every item of the input processor to the standard output

        True is yielded only once, after the input processor is exhausted

        Examples:

            Write results into standard output

                from rok4_tools.tmsizer_utils.processors.io import StdoutProcessor

                try:
                    # Creation of Processor source_processor
                    writer_processor = StdoutProcessor(source_processor)
                    status = writer_processor.process().__next__()
                    print("Results successfully written to standard output")

                except Exception as e:
                    print(f"{e}")

        Yields:
            Iterator[bool]: True when work is done
        """

        source_items = self.__input.process()
        for element in source_items:
            self._processed += 1
            print(str(element))

        yield True

    def __str__(self) -> str:
        """Summarize how many items have been written so far"""
        return f"StdoutProcessor : {self._processed} {self._format} items write to standard output"

198 

199 

class PathoutProcessor(Processor):
    """Processor to write to file or object

    Data is read from the input processor and written item by item. Output format is "NONE"

    Attributes:
        __input (Processor): Processor from which data is read
        __path (str): Path to file or object to write
    """

    def __init__(self, input: Processor, path: str):
        """Constructor method

        All input formats are accepted.

        Args:
            input (Processor): Processor from which data is read
            path (str): Path to file or object to write
        """
        super().__init__("NONE")
        self.__input = input
        self.__path = path

    def process(self) -> Iterator[bool]:
        """Read items from the input processor and write them to the output file or object

        Items are written into a temporary file, then copied to the final location (file or object).
        With a "FILELIKE" input, only the first item is consumed and its whole content is written as bytes.
        Otherwise, every item is written as a string, one per line. The temporary file is always removed,
        even if the reading, the writing or the copy fails. Yield True only once at the end

        Examples:

            Write results into a S3 object

                from rok4_tools.tmsizer_utils.processors.io import PathoutProcessor

                try:
                    # Creation of Processor source_processor
                    writer_processor = PathoutProcessor(source_processor, "s3://bucket/results.txt")
                    status = writer_processor.process().__next__()
                    print("Results successfully written to S3 object")

                except Exception as e:
                    print(f"{e}")

        Yields:
            Iterator[bool]: True when work is done
        """

        binary = self.__input.format == "FILELIKE"
        tmp = tempfile.NamedTemporaryFile(mode="wb" if binary else "w", delete=False)

        try:
            # Leaving the with block closes (and so flushes) the temporary file before the copy.
            # As delete=False was used, closing does not remove it
            with tmp:
                if binary:
                    # NOTE(review): an input processor yielding nothing makes next() raise StopIteration,
                    # which surfaces as a RuntimeError for the caller of this generator
                    f = next(self.__input.process())
                    self._processed += 1
                    tmp.write(f.read())
                else:
                    for item in self.__input.process():
                        self._processed += 1
                        tmp.write(f"{str(item)}\n")

            copy(tmp.name, self.__path)
        finally:
            # Cleanup is done on success and on failure, so no temporary file is left behind
            os.remove(tmp.name)

        yield True

    def __str__(self) -> str:
        """Summarize how many items have been written so far and where"""
        return f"PathoutProcessor : {self._processed} {self._format} items write to {self.__path}"