Coverage for src/rok4_tools/joincache_utils/agent.py: 64%

137 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2024-11-06 17:15 +0000

1import os 

2import tempfile 

3from typing import Dict, List, Tuple, Union 

4 

5from rok4 import storage 

6from rok4.enums import SlabType 

7from rok4.pyramid import Level, Pyramid 

8 

9 

10def work(config: Dict, split: int) -> None: 

11 """Agent steps : make links or merge images 

12 

13 Expects the configuration, the todo list and the optionnal last done slab name : if exists, work 

14 does not start from the beginning, but after the last copied slab. This file contains only the 

15 destination path of the last processed slab. 

16 

17 A line in the todo list is either a slab's copy from pyramid format to work format, or a merge of 

18 stacking slabs or a slab's copy from work format to pyramid format. 

19 

20 Args: 

21 config (Dict): JOINCACHE configuration 

22 split (int): Split number 

23 

24 Raises: 

25 Exception: Cannot get todo list 

26 Exception: Cannot load the input or output pyramid 

27 Exception: Cannot process todo lists 

28 Exception: System command raises an error 

29 """ 

30 

31 # On récupère la todo list sous forme de fichier temporaire 

32 try: 

33 todo_list_obj = tempfile.NamedTemporaryFile(mode="r", delete=False) 

34 storage.copy( 

35 os.path.join(config["process"]["directory"], f"todo.{split}.list"), 

36 f"file://{todo_list_obj.name}", 

37 ) 

38 

39 except Exception as e: 

40 raise Exception(f"Cannot copy todo lists to final location: {e}") 

41 

42 try: 

43 pyramid = Pyramid.from_descriptor(config["datasources"][0]["source"]["descriptors"][0]) 

44 except Exception as e: 

45 raise Exception( 

46 f"Cannot load source pyramid descriptor : {config['datasources'][0]['source']['descriptors'][0]} : {e}" 

47 ) 

48 

49 last_done_slab = None 

50 last_done_fo = os.path.join(config["process"]["directory"], f"slab.{split}.last") 

51 have_to_work = True 

52 

53 try: 

54 if storage.exists(last_done_fo): 

55 last_done_slab = storage.get_data_str(last_done_fo) 

56 logging.info( 

57 f"The last done slab file exists, last slab to have been copied is {last_done_slab}" 

58 ) 

59 have_to_work = False 

60 

61 raster_specifications = pyramid.raster_specifications 

62 format = pyramid.format 

63 compression = format.split("_")[1].lower() 

64 if "UINT" in format: 

65 format_channel = "uint" 

66 elif "FLOAT" in format: 

67 format_channel = "float" 

68 if "8" in format: 

69 bits_channel = "8" 

70 elif "32" in format: 

71 bits_channel = "32" 

72 

73 multiple_slabs = False 

74 with open(todo_list_obj.name) as file: 

75 for line in file: 

76 line = line.rstrip() 

77 parts = line.split(" ") 

78 

79 if parts[0] == "link": 

80 if not have_to_work: 

81 if parts[1] == last_done_slab: 

82 # On est retombé sur la dernière dalles traitées, on passe à la suivante mais on arrête de passer 

83 logging.info(f"Last copied slab reached, copies can start again") 

84 have_to_work = True 

85 

86 continue 

87 

88 storage.link(parts[2], parts[1]) 

89 

90 if parts[0] == "c2w": 

91 if not have_to_work: 

92 continue 

93 

94 if not multiple_slabs: 

95 i = 0 

96 data_file = tempfile.NamedTemporaryFile( 

97 mode="r", delete=False, suffix=".tif" 

98 ) 

99 result_value = os.system(f"cache2work -c zip {parts[1]} {data_file.name}") 

100 if result_value != 0: 

101 raise Exception(f"cache2work raises an error") 

102 data = [data_file] 

103 mask = [""] 

104 multiple_slabs = True 

105 else: 

106 slab_type = pyramid.get_infos_from_slab_path(parts[1])[0] 

107 if slab_type == SlabType.MASK: 

108 mask_file = tempfile.NamedTemporaryFile( 

109 mode="r", delete=False, suffix=".tif" 

110 ) 

111 result_value = os.system( 

112 f"cache2work -c zip {parts[1]} {mask_file.name}" 

113 ) 

114 if result_value != 0: 

115 raise Exception(f"cache2work raises an error") 

116 mask[i] = mask_file 

117 else: 

118 i += 1 

119 data_file = tempfile.NamedTemporaryFile( 

120 mode="r", delete=False, suffix=".tif" 

121 ) 

122 result_value = os.system( 

123 f"cache2work -c zip {parts[1]} {data_file.name}" 

124 ) 

125 if result_value != 0: 

126 raise Exception(f"cache2work raises an error") 

127 data += [data_file] 

128 mask += [""] 

129 

130 if parts[0] == "oNt": 

131 if not have_to_work: 

132 continue 

133 

134 result = tempfile.NamedTemporaryFile(mode="r", delete=False, suffix=".tif") 

135 file_tiff = f"{result.name}" 

136 if config["pyramid"]["mask"]: 

137 result_mask = tempfile.NamedTemporaryFile( 

138 mode="r", delete=False, suffix=".tif" 

139 ) 

140 file_tiff += f" {result_mask.name}\n" 

141 else: 

142 file_tiff += "\n" 

143 for i in range(len(data) - 1, -1, -1): 

144 file_tiff += f"{data[i].name}" 

145 if mask[i] != "": 

146 if i == 0: 

147 file_tiff += f" {mask[i].name}" 

148 else: 

149 file_tiff += f" {mask[i].name}\n" 

150 else: 

151 if i != 0: 

152 file_tiff += "\n" 

153 fichier = tempfile.NamedTemporaryFile(mode="r", delete=False, suffix=".txt") 

154 with open(fichier.name, "w") as f: 

155 f.write(file_tiff) 

156 result_value = os.system( 

157 f"overlayNtiff -f {fichier.name} -m TOP -b {raster_specifications['nodata']} -c zip -s {raster_specifications['channels']} -p {raster_specifications['photometric']}" 

158 ) 

159 if result_value != 0: 

160 raise Exception(f"overlayNtiff raises an error") 

161 storage.remove(f"file://{fichier.name}") 

162 for i in range(len(data)): 

163 storage.remove(f"file://{data[i].name}") 

164 pass 

165 for i in range(len(mask)): 

166 if mask[i] != "": 

167 pass 

168 storage.remove(f"file://{mask[i].name}") 

169 multiple_slabs = False 

170 

171 if parts[0] == "w2c": 

172 if not have_to_work: 

173 if parts[1] == last_done_slab: 

174 # On est retombé sur la dernière dalles traitées, on passe à la suivante mais on arrête de passer 

175 logging.info(f"Last copied slab reached, copies can start again") 

176 have_to_work = True 

177 

178 continue 

179 

180 to_tray = storage.get_infos_from_path(parts[1])[2] 

181 if to_tray != "": 

182 os.makedirs(to_tray, exist_ok=True) 

183 

184 slab_type = pyramid.get_infos_from_slab_path(parts[1])[0] 

185 if slab_type == SlabType.DATA: 

186 level = pyramid.get_infos_from_slab_path(parts[1])[1] 

187 tile_width = pyramid.tms.get_level(level).tile_width 

188 tile_heigth = pyramid.tms.get_level(level).tile_heigth 

189 result_value = os.system( 

190 f"work2cache -c {compression} -t {tile_width} {tile_heigth} -a {format_channel} -b {bits_channel} -s {raster_specifications['channels']} {result.name} {parts[1]}" 

191 ) 

192 if result_value != 0: 

193 raise Exception(f"work2cache raises an error") 

194 storage.remove(f"file://{result.name}") 

195 elif slab_type == SlabType.MASK: 

196 level = pyramid.get_infos_from_slab_path(parts[1])[1] 

197 tile_width = pyramid.tms.get_level(level).tile_width 

198 tile_heigth = pyramid.tms.get_level(level).tile_heigth 

199 result_value = os.system( 

200 f"work2cache -c zip -t {tile_width} {tile_heigth} -a {format_channel} -b {bits_channel} -s {raster_specifications['channels']} {result_mask.name} {parts[1]}" 

201 ) 

202 if result_value != 0: 

203 raise Exception(f"work2cache raises an error") 

204 storage.remove(f"file://{result_mask.name}") 

205 

206 # On nettoie les fichiers locaux et comme tout s'est bien passé, on peut supprimer aussi le fichier local du travail fait 

207 todo_list_obj.close() 

208 storage.remove(f"file://{todo_list_obj.name}") 

209 storage.remove(last_done_fo) 

210 

211 except Exception as e: 

212 if last_done_slab is not None: 

213 storage.put_data_str(last_done_slab, last_done_fo) 

214 raise Exception(f"Cannot process the todo list: {e}")