Coverage for src/rok4_tools/joincache_utils/master.py: 92%

133 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2024-11-06 17:15 +0000

1import itertools 

2import os 

3import tempfile 

4from typing import Dict, List, Tuple, Union 

5 

6from rok4 import storage 

7from rok4.enums import PyramidType, SlabType 

8from rok4.pyramid import Pyramid 

9 

10from rok4_tools.global_utils.source import SourcePyramids 

11 

12"""Todo list instructions 

13 

14* c2w <source slab> - Convert a source pyramid slab (MASK or DATA) to work format 

15* oNt - Overlay previous converted data slabs (considering possible masks) 

16* w2c <destination slab> - Convert the output of last overlayNtiff to slab format, into the output pyramid 

17* link <destination slab> <source slab> <source index> - Make a symbolic slab to source slab. Source index will be used to generate the pyramid's list 

18 

19""" 

20 

21 

def work(config: Dict) -> None:
    """Master steps : prepare and split copies and merge to do

    Load and check input pyramids from the descriptors and write the todo lists, splitting the copies or merge to do

    Args:
        config (Dict): JOINCACHE configuration

    Raises:
        Exception: Cannot load the input or the output pyramid
        Exception: S3 cluster host have not to be provided into bucket names (output or inputs)
        Exception: Sources pyramid have different features
        Exception: Cannot open stream to write todo lists
        Exception: Cannot copy todo lists
    """

    datasources = []
    tms_reference = None
    format_reference = None
    channels_reference = None
    for datasource in config["datasources"]:
        sources = SourcePyramids(
            datasource["bottom"],
            datasource["top"],
            datasource["source"]["descriptors"],
        )
        # Check TMS uniqueness: every source pyramid must use the same TMS
        if not tms_reference:
            tms_reference = sources.tms
        elif tms_reference.name != sources.tms.name:
            raise Exception(
                f"Sources pyramids cannot have two different TMS : {tms_reference} and {sources.tms}"
            )

        # Check format uniqueness: every source pyramid must use the same slab format
        if not format_reference:
            format_reference = sources.format
        elif format_reference != sources.format:
            raise Exception(
                f"Sources pyramids cannot have two different format : {format_reference} and {sources.format}"
            )

        # Check channels-count uniqueness across all source pyramids
        if not channels_reference:
            channels_reference = sources.channels
        elif channels_reference != sources.channels:
            raise Exception(
                f"Sources pyramids cannot have two different numbers of channels : {channels_reference} and {sources.channels}"
            )

        # Only raster pyramids can be joined
        if sources.type != PyramidType.RASTER:
            raise Exception(f"Some sources pyramids are not a raster")

        datasources += [sources]

    # Build the output pyramid descriptor, cloned from the first source pyramid
    storage_pyramid = {
        "type": datasources[0].pyramids[0].storage_type,
        "root": config["pyramid"]["root"],
    }
    try:
        to_pyramid = Pyramid.from_other(
            datasources[0].pyramids[0],
            config["pyramid"]["name"],
            storage_pyramid,
            mask=config["pyramid"]["mask"],
        )
    except Exception as e:
        raise Exception(
            f"Cannot create the destination pyramid descriptor from the source one: {e}"
        )

    if to_pyramid.storage_s3_cluster is not None:
        # Only a single S3 cluster is supported, so it must not appear in the paths
        raise Exception(
            f"Do not set S3 cluster host into output bucket name ({config['pyramid']['root']}) : only one cluster can be used with JOINCACHE"
        )

    # Open the streams to the todo lists: one per parallelized agent plus one for the finisher
    temp_agent_todos = []
    temp_finisher_todo = None
    try:
        for i in range(0, config["process"]["parallelization"]):
            tmp = tempfile.NamedTemporaryFile(mode="w", delete=False)
            temp_agent_todos.append(tmp)

        temp_finisher_todo = tempfile.NamedTemporaryFile(mode="w", delete=False)

    except Exception as e:
        raise Exception(f"Cannot open stream to write todo lists: {e}")

    # Work is spread over the agents slab by slab, round robin
    round_robin = itertools.cycle(temp_agent_todos)

    slab_finish = []  # slab keys already processed (avoids duplicates between source pyramids)
    level_finish = []  # level ids already defined by a previous datasource
    used_pyramids_roots = {}  # source root -> 1-based index, dumped into the finisher todo list
    used_pyramids_count = 0

    for sources in datasources:
        from_pyramids = sources.pyramids
        levels = from_pyramids[0].get_levels(sources.bottom, sources.top)
        for level in levels:
            # Check that two datasources do not define the same level
            if level.id not in level_finish:
                try:
                    to_pyramid.delete_level(level.id)
                except:
                    # The level may simply not exist yet in the output pyramid
                    pass
                info = sources.info_level(level.id)
                to_pyramid.add_level(level.id, info[0], info[1], info[2])
                level_finish += [level.id]
            else:
                raise Exception(f"Different datasources cannot define the same level : {level.id}")

            for i in range(len(from_pyramids)):
                # Get the pyramid's slabs.
                # NOTE(review): two independent generators over the same listing. slabs_mask
                # is consumed progressively by the `in` tests below, which implicitly assumes
                # the listing is ordered consistently — confirm list_generator's ordering.
                slabs = from_pyramids[i].list_generator(level.id)
                slabs_mask = from_pyramids[i].list_generator(level.id)
                # Skip slabs already handled through a previous source pyramid
                for slab in slabs:
                    if slab[0] in slab_finish:
                        continue
                    if slab[0][0].name == "MASK":
                        continue
                    from_slab_path = storage.get_path_from_infos(
                        from_pyramids[i].storage_type, slab[1]["root"], slab[1]["slab"]
                    )
                    process = [from_slab_path]
                    slab_finish += [slab[0]]

                    # Look for the mask matching this data slab
                    if config["process"]["mask"]:
                        # NOTE(review): slab_mask shares slab[1] (same dict object), so the
                        # assignment below also overwrites slab[1]["slab"]. Harmless today
                        # because from_slab_path was computed above — keep in mind if editing.
                        slab_mask = (
                            (SlabType["MASK"], slab[0][1], slab[0][2], slab[0][3]),
                            slab[1],
                        )
                        slab_mask[1]["slab"] = from_pyramids[i].get_slab_path_from_infos(
                            SlabType["MASK"], slab[0][1], slab[0][2], slab[0][3], False
                        )
                        if slab_mask in slabs_mask:
                            mask = [
                                storage.get_path_from_infos(
                                    from_pyramids[i].storage_type,
                                    slab_mask[1]["root"],
                                    slab_mask[1]["slab"],
                                )
                            ]
                        else:
                            mask = [""]

                    # Look for the same slab in the other source pyramids (merge candidates)
                    if not config["process"]["only_links"]:
                        for j in range(i + 1, len(from_pyramids)):
                            slabs_other = from_pyramids[j].list_generator(level.id)
                            slabs_other_mask = from_pyramids[j].list_generator(level.id)
                            for slab_other in slabs_other:
                                if slab_other[0] == slab[0]:
                                    from_slab_path_other = storage.get_path_from_infos(
                                        from_pyramids[j].storage_type,
                                        slab_other[1]["root"],
                                        slab_other[1]["slab"],
                                    )
                                    process += [from_slab_path_other]
                                    if config["process"]["mask"]:
                                        slab_mask_other = (
                                            (
                                                SlabType["MASK"],
                                                slab_other[0][1],
                                                slab_other[0][2],
                                                slab_other[0][3],
                                            ),
                                            slab_other[1],
                                        )
                                        slab_mask_other[1]["slab"] = from_pyramids[
                                            j
                                        ].get_slab_path_from_infos(
                                            SlabType["MASK"],
                                            slab_other[0][1],
                                            slab_other[0][2],
                                            slab_other[0][3],
                                            False,
                                        )
                                        if slab_mask_other in slabs_other_mask:
                                            # NOTE(review): uses from_pyramids[i].storage_type
                                            # although the mask belongs to pyramid j — likely
                                            # intentional (all sources share one storage type?),
                                            # but confirm; from_pyramids[j] would be safer.
                                            mask += [
                                                storage.get_path_from_infos(
                                                    from_pyramids[i].storage_type,
                                                    slab_mask_other[1]["root"],
                                                    slab_mask_other[1]["slab"],
                                                )
                                            ]
                                        else:
                                            mask += [""]
                                    continue

                    to_slab_path = to_pyramid.get_slab_path_from_infos(
                        slab[0][0], slab[0][1], slab[0][2], slab[0][3]
                    )
                    if config["pyramid"]["mask"]:
                        to_slab_path_mask = to_pyramid.get_slab_path_from_infos(
                            SlabType["MASK"], slab[0][1], slab[0][2], slab[0][3]
                        )

                    # Write the commands into the todo lists
                    if len(process) == 1:
                        # Only one source pyramid owns the slab: a symbolic link is enough.
                        # Each distinct source root gets a stable 1-based index, used by the
                        # finisher to rebuild the output pyramid's list.
                        if slab[1]["root"] in used_pyramids_roots:
                            root_index = used_pyramids_roots[slab[1]["root"]]
                        else:
                            used_pyramids_count += 1
                            root_index = used_pyramids_count
                            used_pyramids_roots[slab[1]["root"]] = used_pyramids_count

                        next(round_robin).write(
                            f"link {to_slab_path} {from_slab_path} {root_index}\n"
                        )
                        # NOTE(review): mask is only bound when config["process"]["mask"] is
                        # true — if pyramid.mask can be set without process.mask, this raises
                        # NameError; presumably config validation forbids that combination.
                        if config["pyramid"]["mask"]:
                            if mask[0] != "":
                                next(round_robin).write(
                                    f"link {to_slab_path_mask} {mask[0]} {root_index}\n"
                                )
                    else:
                        # Several source pyramids own the slab: convert, overlay, reconvert
                        command = ""
                        for j in range(len(process)):
                            command += f"c2w {process[j]}\n"
                            if config["process"]["mask"]:
                                if mask[j] != "":
                                    command += f"c2w {mask[j]}\n"
                        command += "oNt\n"
                        command += f"w2c {to_slab_path}\n"
                        if config["pyramid"]["mask"]:
                            command += f"w2c {to_slab_path_mask}\n"
                        next(round_robin).write(command)

    # The finisher todo list maps each used source root to its index
    for root in used_pyramids_roots:
        temp_finisher_todo.write(f"{used_pyramids_roots[root]}={root}\n")

    # Copy the todo lists to the shared location (may be object storage)
    try:
        for i in range(0, config["process"]["parallelization"]):
            tmp = temp_agent_todos[i]
            tmp.close()
            storage.copy(
                f"file://{tmp.name}",
                os.path.join(config["process"]["directory"], f"todo.{i+1}.list"),
            )
            storage.remove(f"file://{tmp.name}")

        temp_finisher_todo.close()
        storage.copy(
            f"file://{temp_finisher_todo.name}",
            os.path.join(config["process"]["directory"], f"todo.finisher.list"),
        )
        storage.remove(f"file://{temp_finisher_todo.name}")

    except Exception as e:
        raise Exception(f"Cannot copy todo lists to final location and clean: {e}")