Coverage for src/rok4_tools/joincache_utils/finisher.py: 69%
71 statements
« prev ^ index » next coverage.py v7.6.1, created at 2024-11-06 17:15 +0000
« prev ^ index » next coverage.py v7.6.1, created at 2024-11-06 17:15 +0000
1import logging
2import os
3import tempfile
4from typing import Dict, List, Tuple, Union
6from rok4 import storage
7from rok4.pyramid import Pyramid
9from rok4_tools.global_utils.source import SourcePyramids
def _fetch_todo_list(remote_path: str) -> str:
    """Copy a stored todo list to a new local temporary file.

    Args:
        remote_path (str): Storage path of the todo list to fetch

    Returns:
        str: Path of the local temporary copy (the caller has to remove it)
    """

    # Only the file name is needed: close the handle right away instead of leaking it
    with tempfile.NamedTemporaryFile(mode="r", delete=False) as tmp_obj:
        local_path = tmp_obj.name

    storage.copy(remote_path, f"file://{local_path}")
    return local_path


def work(config: Dict) -> None:
    """Finisher step: finalize the pyramid's processing

    Expects the configuration and all todo lists. Writes the output pyramid's descriptor to the
    final location, writes the output pyramid's list to the final location (built from the todo
    lists) and removes the todo lists.

    Args:
        config (Dict): JOINCACHE configuration

    Raises:
        Exception: Cannot create the destination pyramid from the source one
        Exception: Cannot write output pyramid's descriptor
        Exception: Cannot concatenate splits' done lists and write the final output pyramid's list to the final location
    """

    datasources = [
        SourcePyramids(
            datasource["bottom"],
            datasource["top"],
            datasource["source"]["descriptors"],
        )
        for datasource in config["datasources"]
    ]

    # Load the pyramid to write, modelled on the first pyramid of the first datasource
    storage_pyramid = {
        "type": datasources[0].pyramids[0].storage_type,
        "root": config["pyramid"]["root"],
    }
    try:
        to_pyramid = Pyramid.from_other(
            datasources[0].pyramids[0],
            config["pyramid"]["name"],
            storage_pyramid,
            mask=config["pyramid"]["mask"],
        )
    except Exception as e:
        raise Exception(
            f"Cannot create the destination pyramid descriptor from the source one: {e}"
        ) from e

    for sources in datasources:
        from_pyramids = sources.pyramids
        levels = from_pyramids[0].get_levels(sources.bottom, sources.top)
        for level in levels:
            # Best effort: the level is re-added just below whether or not the deletion worked
            try:
                to_pyramid.delete_level(level.id)
            except Exception as e:
                logging.debug(
                    "Could not delete level %s from the output pyramid before adding it: %s",
                    level.id,
                    e,
                )
            info = sources.info_level(level.id)
            to_pyramid.add_level(level.id, info[0], info[1], info[2])

    try:
        to_pyramid.write_descriptor()
    except Exception as e:
        raise Exception(f"Cannot write output pyramid's descriptor to final location: {e}") from e

    try:
        process_directory = config["process"]["directory"]

        with tempfile.NamedTemporaryFile(mode="w", delete=False) as list_file_obj:
            list_file_tmp = list_file_obj.name

            # Write the list file header, with the roots of all the used pyramids
            to_root = os.path.join(to_pyramid.storage_root, to_pyramid.name)
            list_file_obj.write(f"0={to_root}\n")

            finisher_list = os.path.join(process_directory, "todo.finisher.list")
            todo_list_tmp = _fetch_todo_list(finisher_list)

            used_pyramids_roots = {}

            # Read the local copy: each line is "<index>=<root>"
            with open(todo_list_tmp) as todo_list_obj:
                for line in todo_list_obj:
                    line = line.rstrip()
                    list_file_obj.write(f"{line}\n")
                    # Split on the first "=" only, so a root containing "=" stays whole
                    index, root = line.split("=", 1)
                    used_pyramids_roots[index] = root

            storage.remove(f"file://{todo_list_tmp}")
            storage.remove(finisher_list)

            # Separator between the header (roots) and the slabs
            list_file_obj.write("#\n")

            # Split todo lists are numbered from 1 to the parallelization level
            for split in range(1, config["process"]["parallelization"] + 1):
                split_list = os.path.join(process_directory, f"todo.{split}.list")
                todo_list_tmp = _fetch_todo_list(split_list)

                with open(todo_list_tmp) as todo_list_obj:
                    for line in todo_list_obj:
                        parts = line.rstrip().split(" ")

                        if parts[0] == "w2c":
                            _, path, _, _ = storage.get_infos_from_path(parts[1])
                            # The slab has been recomputed, so it belongs to the output pyramid
                            path = path.replace(to_root, "0")
                            list_file_obj.write(f"{path}\n")

                        elif parts[0] == "link":
                            _, path, _, _ = storage.get_infos_from_path(parts[2])
                            # A link has been made, so the list references the source pyramid's root
                            path = path.replace(used_pyramids_roots[parts[3]], parts[3])
                            list_file_obj.write(f"{path}\n")

                storage.remove(f"file://{todo_list_tmp}")
                storage.remove(split_list)

        # The list file is closed (hence flushed) at this point: it can be copied as a whole
        storage.copy(f"file://{list_file_tmp}", to_pyramid.list)
        storage.remove(f"file://{list_file_tmp}")

    except Exception as e:
        raise Exception(
            f"Cannot concatenate splits' done lists and write the final output pyramid's list to the final location: {e}"
        ) from e