Coverage for src/rok4_tools/tmsizer_utils/processors/io.py: 81%
78 statements
« prev ^ index » next coverage.py v7.6.1, created at 2024-11-06 17:15 +0000
« prev ^ index » next coverage.py v7.6.1, created at 2024-11-06 17:15 +0000
"""Provide processors to read or write data.

The module contains the following classes:

- `StdinProcessor` - Read data from standard input
- `PathinProcessor` - Read data from a file/object
- `StdoutProcessor` - Write data to standard output
- `PathoutProcessor` - Write data to file/object
"""
11import os
12import sys
13from typing import Dict, List, Tuple, Union, Iterator
14import tempfile
15from tqdm import tqdm
17from rok4.storage import put_data_str, copy
19from rok4_tools.tmsizer_utils.processors.processor import Processor
class StdinProcessor(Processor):
    """Processor reading its data from the standard input

    Lines are consumed one at a time. The format of the data cannot be guessed, so the caller has to provide it.

    Attributes:
        __pbar (tqdm): Progress bar counting the lines read, None when no progress bar was requested
    """

    def __init__(self, format: str, pbar: bool = False):
        """Constructor method

        Args:
            format (str): Format of the read data, forwarded as is to the parent constructor
            pbar (bool, optional): Display a read progress bar ? Defaults to False.
        """

        super().__init__(format)
        # The progress bar is only instantiated on request
        self.__pbar = tqdm() if pbar else None

    def process(self) -> Iterator[str]:
        """Yield the standard input content, one line at a time

        Every line increments the processed counter (and the progress bar when enabled) and is
        yielded without its trailing whitespaces (line break included).

        Examples:

            Read GetTile urls from the standard input and print them

                from rok4_tools.tmsizer_utils.processors.io import StdinProcessor

                try:
                    reader_processor = StdinProcessor("GETTILE_PARAMS")
                    for line in reader_processor.process():
                        print(line)

                except Exception as e:
                    print(f"{e}")

        Yields:
            Iterator[str]: line from the standard input
        """

        progress = self.__pbar
        for raw_line in sys.stdin:
            self._processed += 1
            if progress is not None:
                progress.update(1)
            yield raw_line.rstrip()

    def __str__(self) -> str:
        return f"StdinProcessor : {self._processed} lines read (format {self._format}) from standard input"
class PathinProcessor(Processor):
    """Processor to read from file or object

    Data is read line by line. Data's format cannot be detected and has to be provided by user.

    Attributes:
        __path (str): Path to file or object to read
        __pbar (tqdm): Progress bar counting the lines read, None when no progress bar was requested
    """

    def __init__(self, format: str, path: str, pbar: bool = False):
        """Constructor method

        Args:
            format (str): Format of the read data, forwarded as is to the parent constructor
            path (str): Path to file or object to read
            pbar (bool, optional): Print a read progress bar ? Defaults to False.
        """

        super().__init__(format)
        self.__path = path
        self.__pbar = None
        if pbar:
            self.__pbar = tqdm()

    def process(self) -> Iterator[str]:
        """Read from the file or object line by line and yield it

        The input file or object is copied in a temporary file to be read line by line. Each line is
        yielded without its trailing whitespaces (line break included). The temporary file is removed
        in every case: when the input is exhausted, when the copy or the read fails, and when the
        consumer stops iterating before the end.

        Examples:

            Read from an S3 object (geometries), line by line, and print it

                from rok4_tools.tmsizer_utils.processors.io import PathinProcessor

                try:
                    reader_processor = PathinProcessor("GEOMETRY", "s3://bucket/data.txt")
                    for line in reader_processor.process():
                        print(line)

                except Exception as e:
                    print(f"{e}")

        Yields:
            Iterator[str]: line from the file or object
        """

        # Only a unique local path is needed here: the handle is released right away, the content
        # is written by copy() and read back through a dedicated handle
        tmp = tempfile.NamedTemporaryFile(mode="r", delete=False)
        tmp.close()

        try:
            copy(self.__path, tmp.name)
            with open(tmp.name) as f:
                for line in f:
                    self._processed += 1
                    if self.__pbar is not None:
                        self.__pbar.update(1)
                    yield line.rstrip()
        finally:
            # Also reached on error or when the generator is closed before exhaustion,
            # so the temporary file never outlives the read
            os.remove(tmp.name)

    def __str__(self) -> str:
        return f"PathinProcessor : {self._processed} lines read (format {self._format}) from {self.__path}"
class StdoutProcessor(Processor):
    """Processor printing its input to the standard output

    Items are pulled from the input processor and printed as strings, one per line. Output format is "NONE"

    Attributes:
        __input (Processor): Processor providing the items to print
    """

    def __init__(self, input: Processor):
        """Constructor method

        Every input format is accepted, "FILELIKE" excepted.

        Args:
            input (Processor): Processor providing the items to print

        Raises:
            ValueError: Input format is "FILELIKE"
        """

        # Reject the unsupported format before any initialization
        if input.format == "FILELIKE":
            raise ValueError(f"Input format {input.format} is not handled for StdoutProcessor")

        super().__init__("NONE")
        self.__input = input

    def process(self) -> Iterator[bool]:
        """Print every item of the input processor to the standard output

        The processed counter is incremented for each item. True is yielded exactly once, after
        the input processor is exhausted.

        Examples:

            Write results into standard output

                from rok4_tools.tmsizer_utils.processors.io import StdoutProcessor

                try:
                    # Creation of Processor source_processor
                    writer_processor = StdoutProcessor(source_processor)
                    status = next(writer_processor.process())
                    print("Results successfully written to standard output")

                except Exception as e:
                    print(f"{e}")

        Yields:
            Iterator[bool]: True when work is done
        """

        for element in self.__input.process():
            self._processed += 1
            # print() applies str() on the element by itself
            print(element)

        yield True

    def __str__(self) -> str:
        return f"StdoutProcessor : {self._processed} {self._format} items write to standard output"
class PathoutProcessor(Processor):
    """Processor to write to file or object

    Data is read from the input processor and written to the final location. Output format is "NONE"

    Attributes:
        __input (Processor): Processor from which data is read
        __path (str): Path to file or object to write
    """

    def __init__(self, input: Processor, path: str):
        """Constructor method

        All input formats are accepted.

        Args:
            input (Processor): Processor from which data is read
            path (str): Path to file or object to write
        """
        super().__init__("NONE")
        self.__input = input
        self.__path = path

    def process(self) -> Iterator[bool]:
        """Read data from the input processor and write it to the file or object

        Data is written into a temporary file, then copied to the final location (file or object):

        - input format "FILELIKE": only the first item yielded by the input processor is consumed,
          its whole content is obtained with read() and written as binary
        - other input formats: every item is written as a string, one per line

        The temporary file is removed in every case, even if the input processor or the copy fails.
        Yield True only once at the end.

        Examples:

            Write results into a S3 object

                from rok4_tools.tmsizer_utils.processors.io import PathoutProcessor

                try:
                    # Creation of Processor source_processor
                    writer_processor = PathoutProcessor(source_processor, "s3://bucket/results.txt")
                    status = next(writer_processor.process())
                    print("Results successfully written to S3 object")

                except Exception as e:
                    print(f"{e}")

        Yields:
            Iterator[bool]: True when work is done
        """

        binary = self.__input.format == "FILELIKE"
        tmp = tempfile.NamedTemporaryFile(mode="wb" if binary else "w", delete=False)

        try:
            # The handle is closed when leaving the with block, so that the content is
            # flushed on disk before the copy
            with tmp:
                if binary:
                    f = next(self.__input.process())
                    self._processed += 1
                    tmp.write(f.read())
                else:
                    for item in self.__input.process():
                        self._processed += 1
                        tmp.write(f"{str(item)}\n")

            copy(tmp.name, self.__path)
        finally:
            # delete=False: the removal is on our side, whatever happened before
            os.remove(tmp.name)

        yield True

    def __str__(self) -> str:
        return f"PathoutProcessor : {self._processed} {self._format} items write to {self.__path}"