Coverage for src/rok4_tools/joincache_utils/finisher.py: 69%

71 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2024-11-06 17:15 +0000

1import logging 

2import os 

3import tempfile 

4from typing import Dict, List, Tuple, Union 

5 

6from rok4 import storage 

7from rok4.pyramid import Pyramid 

8 

9from rok4_tools.global_utils.source import SourcePyramids 

10 

11 

def _copy_to_local_tempfile(remote_path: str) -> str:
    """Copy a stored object to a new local temporary file and return its path.

    The temporary file is created and closed right away, so no file handle is left
    open, and it is not deleted automatically: the caller has to remove it.

    Args:
        remote_path (str): path of the object to copy, passed as is to storage.copy

    Returns:
        str: local path of the temporary copy
    """

    with tempfile.NamedTemporaryFile(mode="r", delete=False) as tmp_obj:
        local_path = tmp_obj.name

    try:
        storage.copy(remote_path, f"file://{local_path}")
    except Exception:
        # Do not leave an orphan temporary file behind when the copy fails
        try:
            os.remove(local_path)
        except OSError:
            pass
        raise

    return local_path


def work(config: Dict) -> None:
    """Finisher steps : finalize the pyramid's processing

    Expects the configuration and all todo lists. Write the output pyramid's descriptor to the final location,
    write the output pyramid's list to the final location (from the todo lists) and remove the todo lists

    Args:
        config (Dict): JOINCACHE configuration

    Raises:
        Exception: Cannot load the input or output pyramid
        Exception: Cannot write output pyramid's descriptor
        Exception: Cannot concatenate splits' done lists and write the final output pyramid's list to the final location
    """

    datasources = [
        SourcePyramids(
            datasource["bottom"],
            datasource["top"],
            datasource["source"]["descriptors"],
        )
        for datasource in config["datasources"]
    ]

    # Load the pyramid to write, cloned from the first pyramid of the first datasource
    storage_pyramid = {
        "type": datasources[0].pyramids[0].storage_type,
        "root": config["pyramid"]["root"],
    }
    try:
        to_pyramid = Pyramid.from_other(
            datasources[0].pyramids[0],
            config["pyramid"]["name"],
            storage_pyramid,
            mask=config["pyramid"]["mask"],
        )
    except Exception as e:
        raise Exception(
            f"Cannot create the destination pyramid descriptor from the source one: {e}"
        ) from e

    # Replace, in the output pyramid, every level covered by a datasource
    for sources in datasources:
        levels = sources.pyramids[0].get_levels(sources.bottom, sources.top)
        for level in levels:
            try:
                to_pyramid.delete_level(level.id)
            except Exception:
                # Best effort: the level may simply not be present in the cloned pyramid
                pass
            info = sources.info_level(level.id)
            to_pyramid.add_level(level.id, info[0], info[1], info[2])

    try:
        to_pyramid.write_descriptor()
    except Exception as e:
        raise Exception(f"Cannot write output pyramid's descriptor to final location: {e}") from e

    process_directory = config["process"]["directory"]

    try:
        with tempfile.NamedTemporaryFile(mode="w", delete=False) as list_file_obj:
            list_file_tmp = list_file_obj.name

            # Write the list file header, with the roots of all used pyramids
            to_root = os.path.join(to_pyramid.storage_root, to_pyramid.name)
            list_file_obj.write(f"0={to_root}\n")

            used_pyramids_roots = {}

            finisher_list = os.path.join(process_directory, "todo.finisher.list")
            local_copy = _copy_to_local_tempfile(finisher_list)
            try:
                with open(local_copy) as todo_list_obj:
                    for line in todo_list_obj:
                        line = line.rstrip()
                        list_file_obj.write(f"{line}\n")
                        # Split on the first '=' only, so a root containing '=' stays whole
                        index, root = line.split("=", 1)
                        used_pyramids_roots[index] = root
            finally:
                storage.remove(f"file://{local_copy}")
            storage.remove(finisher_list)

            list_file_obj.write("#\n")

            for split in range(1, config["process"]["parallelization"] + 1):
                split_list = os.path.join(process_directory, f"todo.{split}.list")
                local_copy = _copy_to_local_tempfile(split_list)
                try:
                    with open(local_copy) as todo_list_obj:
                        for line in todo_list_obj:
                            parts = line.rstrip().split(" ")

                            if parts[0] == "w2c":
                                _, path, _, _ = storage.get_infos_from_path(parts[1])
                                # The slab has been recomputed, so it belongs to the output pyramid
                                path = path.replace(to_root, "0")
                                list_file_obj.write(f"{path}\n")

                            elif parts[0] == "link":
                                _, path, _, _ = storage.get_infos_from_path(parts[2])
                                # A link has been made, so the list references the source pyramid's root
                                path = path.replace(used_pyramids_roots[parts[3]], parts[3])
                                list_file_obj.write(f"{path}\n")
                finally:
                    storage.remove(f"file://{local_copy}")
                storage.remove(split_list)

        # The list file is closed (hence flushed) at this point, it can be copied safely
        storage.copy(f"file://{list_file_tmp}", to_pyramid.list)
        storage.remove(f"file://{list_file_tmp}")

    except Exception as e:
        raise Exception(
            f"Cannot concatenate splits' done lists and write the final output pyramid's list to the final location: {e}"
        ) from e