Skript check_container_image_reference_parity.py
Automaticky generovaná dokumentace skriptu scripts/check_container_image_reference_parity.py.
Přehled modulu
Kontrola a úprava shody pinů (tag/digest) u stejného container image v Dockerfileech a docker-compose.
V jednom souboru: stejný image repozitář nesmí mít různé tagy/digesty (FROM nebo image:).
docker-compose.yml je zdroj pravdy pro literály; spotřebitelské compose soubory se při –fix srovnají na prod.
Výstup pro CI: řádky na stderr s prefixem [image-ref-parity].
Třídy
- class DockerfileScan
Výsledek průchodu Dockerfile: agregace pinů podle repozitáře a metadata řádků pro přepis
FROM.
Funkce
- log_msg(message)
Vypíše jeden řádek na stderr s prefixem pro grep v CI.
- Parametry:
message – Text bez prefixu.
verbose_only – Pokud True, vypíše jen při verbose režimu.
verbose – Aktuální verbose příznak.
- is_pre_commit()
Zjistí, zda skript běží jako pre-commit hook (pro implicitní verbose).
- Vrací:
True, pokud je v prostředí nastavena proměnná
PRE_COMMIT.
- is_literal_image_ref(value)
Určí, zda hodnota
image:v compose lze použít pro porovnání pinů (bez rozšíření proměnných).- Parametry:
value – Řetězec z pole
imageu služby.- Vrací:
False pro prázdný řetězec, obsah
${/$$; jinak True.
- _strip_platform_flags(rest)
Odstraní z řetězce za
FROMvšechny úvodní příznaky--platform=....- Parametry:
rest – Část řádku za
FROMpřed případnýmAS.- Vrací:
Zbytek začínající odkazem na image, nebo prázdný řetězec při neplatném tvaru.
- parse_from_instruction(line)
Parsuje řádek Dockerfile začínající
FROMna odkaz na image a volitelné jméno stage.- Parametry:
line – Jeden řádek souboru (může obsahovat koncový komentář
#).- Vrací:
(image_ref, as_name)nebo None, pokud řádek není platnýFROM.
- repository_and_pin(image_ref)
Rozdělí odkaz na container image na klíč repozitáře a normalizovaný pin (tag a/nebo digest).
- Parametry:
image_ref – Řetězec image bez části
AS(např.ghcr.io/foo:1.0).- Vrací:
(repo_key, pin_key)— obě části malými písmeny pro porovnání.
- choose_canonical_pin(pins)
Vybere jeden kanonický pin z množiny normalizovaných pinů (malá písmena).
- Parametry:
pins – Množina pin_key.
- Vrací:
Vítězný pin_key.
- pick_display_ref_for_pin(refs_by_pin, pin_key)
Vrátí reprezentaci image pro zápis (zachovává casing z jednoho výskytu).
- Parametry:
refs_by_pin – Mapa pin_key -> původní řetězce image.
pin_key – Normalizovaný pin.
- Vrací:
Původní řetězec image.
- build_yaml()
Vytvoří nakonfigurovanou instanci
ruamel.yaml.YAMLpro čtení/zápis compose se zachováním uvozovek.- Vrací:
YAML loader/dumper vhodný pro úpravy compose souborů.
- scan_dockerfile(text)
Projde text Dockerfile a sejme externí base image (mimo
scratcha odkazy na již definované stage).- Parametry:
text – Celý obsah Dockerfile.
- Vrací:
Struktura s množinami pinů, mapou původních řetězců a
line_infoproFROM_IMG_RE.
- fix_dockerfile_text(text, fix, path, _verbose)
Zkontroluje nebo opraví konflikty pinů u stejného
repo_keyv jednom Dockerfile.- Parametry:
text – Původní obsah souboru.
fix – Pokud True, přepíše
FROMřádky na kanonický pin; pokud False, jen hlásí chyby.path – Cesta pro hlášky v chybách.
_verbose – Rezervováno (aktuálně nepoužito).
- Vrací:
Trojice
(nový_text, seznam_chyb, byl_upraven).
- iter_compose_service_images(data)
Projde
servicesv načteném compose a vrátí služby s řetězcovýmimage.- Parametry:
data – Kořenový dict compose (např. z
yaml.load), očekává se klíčservices.- Yield:
(název_služby, hodnota_image, spec_dict)—spec_dictumožní při--fixzměnitimagena místě.- Vrací:
Iterátor trojic (viz
:yield:); při neplatnémdataneboservicesnic nevygeneruje.
- scan_compose_intra(data, _path)
Shromáždí literální
imagez jednoho compose souboru pro intra-file kontrolu shody pinů.- Parametry:
data – Parsovaný obsah compose.
_path – Cesta k souboru (pro rozhraní; aktuálně se nepoužívá).
- Vrací:
(pins_by_repo, refs_by_repo_pin, počet_přeskočených_neliterálních_image).
- apply_compose_intra_fix(data, path, pins_by_repo, refs_by_repo_pin, fix, _verbose)
U jednoho compose sjednotí literální
imagese stejnýmrepo_keyna jeden kanonický pin.- Parametry:
data – Parsovaný strom compose (mění se in-place při
fix=True).path – Cesta k souboru pro chybové hlášky.
pins_by_repo – Agregace pinů ze
scan_compose_intra().refs_by_repo_pin – Mapa původních řetězců image podle pinu.
fix – Zapnout zápis oprav do
data._verbose – Rezervováno (nepoužito).
- Vrací:
(seznam_chyb, byl_proveden_zápis_do_dat).
- extract_prod_literal_map(data)
Z produkčního compose vytvoří mapu
repo_key→ literální řetězecimage(první výskyt).- Parametry:
data – Parsovaný
docker-compose.yml(nebo ekvivalent).- Vrací:
Slovník pro cross-file srovnání se spotřebitelskými compose.
- apply_compose_cross_fix(consumer_data, consumer_path, prod_map, fix, verbose)
Porovná literální image ve spotřebitelském compose s mapou z produkce a případně je srovná.
- Parametry:
consumer_data – Parsovaný compose spotřebitele (mění se in-place při
fix=True).consumer_path – Cesta k souboru spotřebitele (logy a chyby).
prod_map – Výstup
extract_prod_literal_map()zdocker-compose.yml.fix – Pokud True, přepíše
imagena hodnotu z prod při nesouladu pinu.verbose – Zapíná podrobné SKIP logy u neliterálních image (přes
log_msg).
- Vrací:
(seznam_chyb při fix=False, byl_proveden_zápis).
- process_dockerfiles(root, fix, verbose)
Zpracuje všechny Dockerfile z konstanty
DOCKERFILE_PATHSpod kořenemroot.- Parametry:
root – Kořen repozitáře (aktuální pracovní adresář při běhu z pre-commit).
fix – Předáno do
fix_dockerfile_text(); při True zapíše změny na disk.verbose – Předáno dál pro případné budoucí logování (část volání zachována kvůli API).
- Vrací:
(souhrnné_chyby, seznam_cest_upravených_souborů).
- process_compose_files(root, fix, verbose)
Načte prod a spotřebitelské compose, provede intra-file a cross-file kontrolu nebo opravu.
Zapisuje pouze soubory, u kterých došlo ke změně dat (ne přeformátovává prod zbytečně).
- Parametry:
root – Kořen repozitáře.
fix – Zapne zápis YAML a cross-file úpravy spotřebitelů.
verbose – Statistiky přeskočených neliterálních referencí a SKIP u cross-file.
- Vrací:
(souhrnné_chyby, seznam_cest_upravených_souborů).
- main(argv)
Vstupní bod CLI: spustí kontrolu Dockerfile a compose, volitelně s opravami.
- Parametry:
argv – Argumenty příkazové řádky (bez
sys.argv[0]); None znamenásys.argv[1:].- Vrací:
0 při úspěchu; 1 při nalezených chybách (bez
--fix) nebo po úpravě souborů (s--fix, aby pre-commit znovu zstageoval).
Zdrojový kód
1#!/usr/bin/env python3
2"""
3Kontrola a úprava shody pinů (tag/digest) u stejného container image v Dockerfileech a docker-compose.
4
5- V jednom souboru: stejný image repozitář nesmí mít různé tagy/digesty (FROM nebo image:).
6- docker-compose.yml je zdroj pravdy pro literály; spotřebitelské compose soubory se při --fix srovnají na prod.
7
8Výstup pro CI: řádky na stderr s prefixem ``[image-ref-parity]``.
9"""
10
11from __future__ import annotations
12
13import argparse
14import io
15import os
16import re
17import sys
18from collections import defaultdict
19from dataclasses import dataclass, field
20from pathlib import Path
21from typing import Any, DefaultDict, Dict, Iterable, List, Optional, Set, Tuple
22
23from packaging.version import InvalidVersion, Version
24from ruamel.yaml import YAML
25
26LOG_PREFIX = "[image-ref-parity]"
27
28# Relativní cesty od kořene repozitáře (pre-commit spouští z rootu).
29DOCKERFILE_PATHS = [
30 "Dockerfile",
31 "Dockerfile-DEV",
32 "Dockerfile-DB",
33 "fedora/Dockerfile",
34 "redis/Dockerfile",
35 "proxy/Dockerfile",
36]
37
38COMPOSE_PRODUCTION = ["docker-compose.yml"]
39COMPOSE_CONSUMERS = [
40 "docker-compose-dev-local-db-all-containers.yml",
41 "docker-compose-dev-local-db.yml",
42 "docker-compose-test.yml",
43 "git_docker-compose.yml",
44]
45
46FROM_LINE = re.compile(r"^FROM\s+(?P<rest>.+?)\s*(?:#.*)?$", re.IGNORECASE)
47AS_SUFFIX = re.compile(r"\s+AS\s+(\S+)\s*$", re.IGNORECASE)
48# FROM s nahraditelným image tokenem (image bez komentáře na konci řádku zvlášť).
49FROM_IMG_RE = re.compile(
50 r"^(FROM\s+)((?:--platform=\S+\s+)+)?(\S+)((\s+AS\s+\S+)?)(\s*(?:#.*)?)$",
51 re.IGNORECASE,
52)
53
54
55def log_msg(message: str, *, verbose_only: bool = False, verbose: bool = False) -> None:
56 """
57 Vypíše jeden řádek na stderr s prefixem pro grep v CI.
58
59 :param message: Text bez prefixu.
60 :param verbose_only: Pokud True, vypíše jen při verbose režimu.
61 :param verbose: Aktuální verbose příznak.
62 """
63 if verbose_only and not verbose:
64 return
65 print(f"{LOG_PREFIX} {message}", file=sys.stderr)
66
67
68def is_pre_commit() -> bool:
69 """
70 Zjistí, zda skript běží jako pre-commit hook (pro implicitní verbose).
71
72 :return: True, pokud je v prostředí nastavena proměnná ``PRE_COMMIT``.
73 """
74 return bool(os.environ.get("PRE_COMMIT"))
75
76
77def is_literal_image_ref(value: str) -> bool:
78 """
79 Určí, zda hodnota ``image:`` v compose lze použít pro porovnání pinů (bez rozšíření proměnných).
80
81 :param value: Řetězec z pole ``image`` u služby.
82 :return: False pro prázdný řetězec, obsah ``${`` / ``$$``; jinak True.
83 """
84 s = value.strip()
85 if not s:
86 return False
87 if "${" in s or "$$" in s:
88 return False
89 return True
90
91
92def _strip_platform_flags(rest: str) -> str:
93 """
94 Odstraní z řetězce za ``FROM`` všechny úvodní příznaky ``--platform=...``.
95
96 :param rest: Část řádku za ``FROM`` před případným ``AS``.
97 :return: Zbytek začínající odkazem na image, nebo prázdný řetězec při neplatném tvaru.
98 """
99 current = rest.strip()
100 while current.startswith("--"):
101 space_idx = current.find(" ")
102 if space_idx == -1:
103 return ""
104 current = current[space_idx + 1 :].strip()
105 return current
106
107
108def parse_from_instruction(line: str) -> Optional[Tuple[str, Optional[str]]]:
109 """
110 Parsuje řádek Dockerfile začínající ``FROM`` na odkaz na image a volitelné jméno stage.
111
112 :param line: Jeden řádek souboru (může obsahovat koncový komentář ``#``).
113 :return: ``(image_ref, as_name)`` nebo None, pokud řádek není platný ``FROM``.
114 """
115 stripped = line.strip()
116 m = FROM_LINE.match(stripped)
117 if not m:
118 return None
119 rest = m.group("rest").strip()
120 as_name: Optional[str] = None
121 as_m = AS_SUFFIX.search(rest)
122 if as_m:
123 as_name = as_m.group(1)
124 rest = rest[: as_m.start()].strip()
125 rest = _strip_platform_flags(rest)
126 if not rest:
127 return None
128 return rest, as_name
129
130
131def repository_and_pin(image_ref: str) -> Tuple[str, str]:
132 """
133 Rozdělí odkaz na container image na klíč repozitáře a normalizovaný pin (tag a/nebo digest).
134
135 :param image_ref: Řetězec image bez části ``AS`` (např. ``ghcr.io/foo:1.0``).
136 :return: ``(repo_key, pin_key)`` — obě části malými písmeny pro porovnání.
137 """
138 ref = image_ref.strip()
139 lower = ref.lower()
140 digest_idx = lower.find("@sha256:")
141 digest_suffix = ""
142 if digest_idx != -1:
143 digest_suffix = ref[digest_idx:]
144 ref = ref[:digest_idx]
145
146 last_slash = ref.rfind("/")
147 tail = ref[last_slash + 1 :]
148 if ":" in tail:
149 repo, _, tag = ref.rpartition(":")
150 else:
151 repo, tag = ref, ""
152
153 pin = (tag + digest_suffix).lower()
154 return repo.lower(), pin
155
156
157def choose_canonical_pin(pins: Set[str]) -> str:
158 """
159 Vybere jeden kanonický pin z množiny normalizovaných pinů (malá písmena).
160
161 :param pins: Množina pin_key.
162 :return: Vítězný pin_key.
163 """
164 if len(pins) == 1:
165 return next(iter(pins))
166 pins_list = list(pins)
167
168 def sort_key(pin: str) -> Tuple[int, Any]:
169 tag_part = pin.split("@sha256:", 1)[0]
170 try:
171 return (0, Version(tag_part))
172 except InvalidVersion:
173 return (1, pin)
174
175 if all(sort_key(p)[0] == 0 for p in pins_list):
176 return max(pins_list, key=lambda p: Version(p.split("@sha256:", 1)[0]))
177 return max(pins_list)
178
179
180def pick_display_ref_for_pin(refs_by_pin: Dict[str, List[str]], pin_key: str) -> str:
181 """
182 Vrátí reprezentaci image pro zápis (zachovává casing z jednoho výskytu).
183
184 :param refs_by_pin: Mapa pin_key -> původní řetězce image.
185 :param pin_key: Normalizovaný pin.
186 :return: Původní řetězec image.
187 """
188 candidates = refs_by_pin.get(pin_key, [])
189 if not candidates:
190 return pin_key
191 return candidates[0]
192
193
194def build_yaml() -> YAML:
195 """
196 Vytvoří nakonfigurovanou instanci ``ruamel.yaml.YAML`` pro čtení/zápis compose se zachováním uvozovek.
197
198 :return: YAML loader/dumper vhodný pro úpravy compose souborů.
199 """
200 y = YAML()
201 y.preserve_quotes = True
202 y.indent(mapping=2, sequence=4, offset=2)
203 return y
204
205
206@dataclass
207class DockerfileScan:
208 """
209 Výsledek průchodu Dockerfile: agregace pinů podle repozitáře a metadata řádků pro přepis ``FROM``.
210 """
211
212 pins_by_repo: DefaultDict[str, Set[str]] = field(default_factory=lambda: defaultdict(set))
213 refs_by_repo_pin: DefaultDict[str, DefaultDict[str, List[str]]] = field(
214 default_factory=lambda: defaultdict(lambda: defaultdict(list))
215 )
216 line_info: List[Dict[str, Any]] = field(default_factory=list)
217
218
219def scan_dockerfile(text: str) -> DockerfileScan:
220 """
221 Projde text Dockerfile a sejme externí base image (mimo ``scratch`` a odkazy na již definované stage).
222
223 :param text: Celý obsah Dockerfile.
224 :return: Struktura s množinami pinů, mapou původních řetězců a ``line_info`` pro ``FROM_IMG_RE``.
225 """
226 scan = DockerfileScan()
227 stages: Set[str] = set()
228 for lineno, line in enumerate(text.splitlines(), start=1):
229 parsed = parse_from_instruction(line)
230 if parsed is None:
231 continue
232 image_ref, as_name = parsed
233 if image_ref.lower() == "scratch":
234 if as_name:
235 stages.add(as_name)
236 continue
237 if image_ref in stages:
238 if as_name:
239 stages.add(as_name)
240 continue
241 repo_key, pin_key = repository_and_pin(image_ref)
242 scan.pins_by_repo[repo_key].add(pin_key)
243 scan.refs_by_repo_pin[repo_key][pin_key].append(image_ref)
244 m = FROM_IMG_RE.match(line.rstrip("\n"))
245 if m:
246 scan.line_info.append(
247 {
248 "lineno": lineno,
249 "line": line,
250 "repo_key": repo_key,
251 "pin_key": pin_key,
252 "prefix": m.group(1) + (m.group(2) or ""),
253 "as_part": m.group(4) or "",
254 "suffix": m.group(6) or "",
255 }
256 )
257 if as_name:
258 stages.add(as_name)
259 return scan
260
261
262def fix_dockerfile_text(text: str, fix: bool, path: Path, _verbose: bool) -> Tuple[str, List[str], bool]:
263 """
264 Zkontroluje nebo opraví konflikty pinů u stejného ``repo_key`` v jednom Dockerfile.
265
266 :param text: Původní obsah souboru.
267 :param fix: Pokud True, přepíše ``FROM`` řádky na kanonický pin; pokud False, jen hlásí chyby.
268 :param path: Cesta pro hlášky v chybách.
269 :param _verbose: Rezervováno (aktuálně nepoužito).
270 :return: Trojice ``(nový_text, seznam_chyb, byl_upraven)``.
271 """
272 errors: List[str] = []
273 scan = scan_dockerfile(text)
274 for repo_key, pins in scan.pins_by_repo.items():
275 if len(pins) <= 1:
276 continue
277 if not fix:
278 detail_parts = []
279 for pin in sorted(pins):
280 detail_parts.append(f" - {repr(pin) if pin else '(bez explicitního tagu)'}")
281 errors.append(
282 f"{path}: stejný image {repo_key!r} má v jednom Dockerfile různé tagy/digesty:\n"
283 + "\n".join(detail_parts)
284 )
285 log_msg(f"ERROR: {path}: konflikt pinů pro {repo_key!r}: {', '.join(sorted(pins))}")
286 continue
287 win = choose_canonical_pin(set(pins))
288 new_ref = pick_display_ref_for_pin({k: list(v) for k, v in scan.refs_by_repo_pin[repo_key].items()}, win)
289 log_msg(f"FIX: {path}: sjednocuji {repo_key!r} na pin {win!r} (→ {new_ref!r})")
290
291 if errors and not fix:
292 return text, errors, False
293
294 if not fix:
295 return text, [], False
296
297 lines = text.splitlines(keepends=True)
298 modified = False
299 for info in scan.line_info:
300 repo_key = info["repo_key"]
301 pins = scan.pins_by_repo[repo_key]
302 if len(pins) <= 1:
303 continue
304 win = choose_canonical_pin(set(pins))
305 if info["pin_key"] == win:
306 continue
307 new_ref = pick_display_ref_for_pin({k: list(v) for k, v in scan.refs_by_repo_pin[repo_key].items()}, win)
308 lineno = info["lineno"]
309 idx = lineno - 1
310 old = lines[idx]
311 m = FROM_IMG_RE.match(old.rstrip("\r\n"))
312 if not m:
313 continue
314 nl = "\n" if old.endswith("\n") else ""
315 if old.endswith("\r\n"):
316 nl = "\r\n"
317 new_line = m.group(1) + (m.group(2) or "") + new_ref + (m.group(4) or "") + (m.group(6) or "") + nl
318 if new_line != old:
319 lines[idx] = new_line
320 modified = True
321 return "".join(lines), [], modified
322
323
324def iter_compose_service_images(data: Any) -> Iterable[Tuple[str, str, Any]]:
325 """
326 Projde ``services`` v načteném compose a vrátí služby s řetězcovým ``image``.
327
328 :param data: Kořenový dict compose (např. z ``yaml.load``), očekává se klíč ``services``.
329 :yield: ``(název_služby, hodnota_image, spec_dict)`` — ``spec_dict`` umožní při ``--fix`` změnit ``image`` na místě.
330 :return: Iterátor trojic (viz ``:yield:``); při neplatném ``data`` nebo ``services`` nic nevygeneruje.
331 """
332 if not isinstance(data, dict):
333 return
334 services = data.get("services")
335 if not isinstance(services, dict):
336 return
337 for svc_name, spec in services.items():
338 if not isinstance(spec, dict):
339 continue
340 img = spec.get("image")
341 if isinstance(img, str):
342 yield svc_name, img, spec
343
344
345def scan_compose_intra(
346 data: Any, _path: Path
347) -> Tuple[DefaultDict[str, Set[str]], DefaultDict[str, DefaultDict[str, List[str]]], int]:
348 """
349 Shromáždí literální ``image`` z jednoho compose souboru pro intra-file kontrolu shody pinů.
350
351 :param data: Parsovaný obsah compose.
352 :param _path: Cesta k souboru (pro rozhraní; aktuálně se nepoužívá).
353 :return: ``(pins_by_repo, refs_by_repo_pin, počet_přeskočených_neliterálních_image)``.
354 """
355 pins_by_repo: DefaultDict[str, Set[str]] = defaultdict(set)
356 refs_by_repo_pin: DefaultDict[str, DefaultDict[str, List[str]]] = defaultdict(lambda: defaultdict(list))
357 skipped_nonliteral = 0
358 for _svc_name, img, _spec in iter_compose_service_images(data):
359 if not is_literal_image_ref(img):
360 skipped_nonliteral += 1
361 continue
362 repo_key, pin_key = repository_and_pin(img)
363 pins_by_repo[repo_key].add(pin_key)
364 refs_by_repo_pin[repo_key][pin_key].append(img)
365 return pins_by_repo, refs_by_repo_pin, skipped_nonliteral
366
367
368def apply_compose_intra_fix(
369 data: Any,
370 path: Path,
371 pins_by_repo: DefaultDict[str, Set[str]],
372 refs_by_repo_pin: DefaultDict[str, DefaultDict[str, List[str]]],
373 fix: bool,
374 _verbose: bool,
375) -> Tuple[List[str], bool]:
376 """
377 U jednoho compose sjednotí literální ``image`` se stejným ``repo_key`` na jeden kanonický pin.
378
379 :param data: Parsovaný strom compose (mění se in-place při ``fix=True``).
380 :param path: Cesta k souboru pro chybové hlášky.
381 :param pins_by_repo: Agregace pinů ze :func:`scan_compose_intra`.
382 :param refs_by_repo_pin: Mapa původních řetězců image podle pinu.
383 :param fix: Zapnout zápis oprav do ``data``.
384 :param _verbose: Rezervováno (nepoužito).
385 :return: ``(seznam_chyb, byl_proveden_zápis_do_dat)``.
386 """
387 errors: List[str] = []
388 modified = False
389 for repo_key, pins in pins_by_repo.items():
390 if len(pins) <= 1:
391 continue
392 if not fix:
393 errors.append(f"{path}: stejný image {repo_key!r} má v compose různé tagy/digesty: {sorted(pins)}")
394 log_msg(f"ERROR: {path}: konflikt pinů pro {repo_key!r}: {', '.join(sorted(pins))}")
395 continue
396 win = choose_canonical_pin(set(pins))
397 new_ref = pick_display_ref_for_pin({k: list(v) for k, v in refs_by_repo_pin[repo_key].items()}, win)
398 log_msg(f"FIX: {path}: sjednocuji {repo_key!r} na {new_ref!r}")
399 for _svc_name, img, spec in iter_compose_service_images(data):
400 if not is_literal_image_ref(img):
401 continue
402 rk, pk = repository_and_pin(img)
403 if rk != repo_key:
404 continue
405 if pk != win:
406 spec["image"] = new_ref
407 modified = True
408 return errors, modified
409
410
411def extract_prod_literal_map(data: Any) -> Dict[str, str]:
412 """
413 Z produkčního compose vytvoří mapu ``repo_key`` → literální řetězec ``image`` (první výskyt).
414
415 :param data: Parsovaný ``docker-compose.yml`` (nebo ekvivalent).
416 :return: Slovník pro cross-file srovnání se spotřebitelskými compose.
417 """
418 out: Dict[str, str] = {}
419 for _svc, img, _spec in iter_compose_service_images(data):
420 if not is_literal_image_ref(img):
421 continue
422 repo_key, _pin = repository_and_pin(img)
423 if repo_key not in out:
424 out[repo_key] = img.strip()
425 return out
426
427
428def apply_compose_cross_fix(
429 consumer_data: Any,
430 consumer_path: Path,
431 prod_map: Dict[str, str],
432 fix: bool,
433 verbose: bool,
434) -> Tuple[List[str], bool]:
435 """
436 Porovná literální image ve spotřebitelském compose s mapou z produkce a případně je srovná.
437
438 :param consumer_data: Parsovaný compose spotřebitele (mění se in-place při ``fix=True``).
439 :param consumer_path: Cesta k souboru spotřebitele (logy a chyby).
440 :param prod_map: Výstup :func:`extract_prod_literal_map` z ``docker-compose.yml``.
441 :param fix: Pokud True, přepíše ``image`` na hodnotu z prod při nesouladu pinu.
442 :param verbose: Zapíná podrobné SKIP logy u neliterálních image (přes ``log_msg``).
443 :return: ``(seznam_chyb při fix=False, byl_proveden_zápis)``.
444 """
445 errors: List[str] = []
446 modified = False
447 seen_deferred: Set[Tuple[str, str]] = set()
448
449 for svc_name, img, spec in iter_compose_service_images(consumer_data):
450 if not is_literal_image_ref(img):
451 if verbose:
452 log_msg(
453 f"SKIP: {consumer_path} služba {svc_name!r}: neliterální image (přeskočeno)",
454 verbose_only=True,
455 verbose=verbose,
456 )
457 continue
458 repo_key, pin_key = repository_and_pin(img)
459 if repo_key in prod_map:
460 prod_ref = prod_map[repo_key]
461 _, prod_pin = repository_and_pin(prod_ref)
462 if pin_key != prod_pin:
463 msg = f"{consumer_path}: služba {svc_name!r}: {img!r} ≠ prod {prod_ref!r} " f"(repo {repo_key!r})"
464 if not fix:
465 errors.append(msg)
466 log_msg(f"ERROR: {msg}")
467 else:
468 log_msg(f"FIX: {consumer_path} {svc_name!r}: {img!r} → {prod_ref!r} (prod)")
469 spec["image"] = prod_ref
470 modified = True
471 else:
472 key = (repo_key, str(consumer_path))
473 if key not in seen_deferred:
474 seen_deferred.add(key)
475 log_msg(
476 f"NOTE: {consumer_path}: repo {repo_key!r} má literál {img!r}, "
477 f"ale {COMPOSE_PRODUCTION[0]} neobsahuje srovnatelný literál pro stejný repozitář — "
478 f"cross-file srovnání přeskočeno (zkontroluj ručně / proměnné v prod)."
479 )
480 return errors, modified
481
482
483def process_dockerfiles(root: Path, fix: bool, verbose: bool) -> Tuple[List[str], List[Path]]:
484 """
485 Zpracuje všechny Dockerfile z konstanty ``DOCKERFILE_PATHS`` pod kořenem ``root``.
486
487 :param root: Kořen repozitáře (aktuální pracovní adresář při běhu z pre-commit).
488 :param fix: Předáno do :func:`fix_dockerfile_text`; při True zapíše změny na disk.
489 :param verbose: Předáno dál pro případné budoucí logování (část volání zachována kvůli API).
490 :return: ``(souhrnné_chyby, seznam_cest_upravených_souborů)``.
491 """
492 errors: List[str] = []
493 modified_paths: List[Path] = []
494 for rel in DOCKERFILE_PATHS:
495 path = root / rel
496 if not path.is_file():
497 continue
498 text = path.read_text(encoding="utf-8")
499 new_text, errs, mod = fix_dockerfile_text(text, fix, path, verbose)
500 errors.extend(errs)
501 if mod and fix:
502 path.write_text(new_text, encoding="utf-8")
503 modified_paths.append(path)
504 log_msg(f"Upraven soubor {path}")
505 return errors, modified_paths
506
507
508def process_compose_files(root: Path, fix: bool, verbose: bool) -> Tuple[List[str], List[Path]]:
509 """
510 Načte prod a spotřebitelské compose, provede intra-file a cross-file kontrolu nebo opravu.
511
512 Zapisuje pouze soubory, u kterých došlo ke změně dat (ne přeformátovává prod zbytečně).
513
514 :param root: Kořen repozitáře.
515 :param fix: Zapne zápis YAML a cross-file úpravy spotřebitelů.
516 :param verbose: Statistiky přeskočených neliterálních referencí a SKIP u cross-file.
517 :return: ``(souhrnné_chyby, seznam_cest_upravených_souborů)``.
518 """
519 yaml = build_yaml()
520 errors: List[str] = []
521 modified_paths: List[Path] = []
522 loaded: Dict[Path, Any] = {}
523 originals: Dict[Path, str] = {}
524
525 all_compose = [root / p for p in COMPOSE_PRODUCTION + COMPOSE_CONSUMERS if (root / p).is_file()]
526
527 for path in all_compose:
528 raw = path.read_text(encoding="utf-8")
529 originals[path] = raw
530 loaded[path] = yaml.load(raw) or {}
531
532 compose_dirty: Set[Path] = set()
533
534 for path in all_compose:
535 data = loaded[path]
536 pins_by_repo, refs_by_repo, skipped = scan_compose_intra(data, path)
537 if verbose and skipped:
538 log_msg(
539 f"INFO: {path}: přeskočeno {skipped} neliterálních image referencí",
540 verbose_only=True,
541 verbose=verbose,
542 )
543 errs, intra_mod = apply_compose_intra_fix(data, path, pins_by_repo, refs_by_repo, fix, verbose)
544 errors.extend(errs)
545 if intra_mod:
546 compose_dirty.add(path)
547
548 prod_path = root / COMPOSE_PRODUCTION[0]
549 prod_map: Dict[str, str] = {}
550 if prod_path in loaded:
551 prod_map = extract_prod_literal_map(loaded[prod_path])
552
553 for rel in COMPOSE_CONSUMERS:
554 path = root / rel
555 if path not in loaded:
556 continue
557 errs, cross_mod = apply_compose_cross_fix(loaded[path], path, prod_map, fix, verbose)
558 errors.extend(errs)
559 if cross_mod:
560 compose_dirty.add(path)
561
562 if fix:
563 for path in sorted(compose_dirty, key=str):
564 out = io.StringIO()
565 yaml.dump(loaded[path], out)
566 path.write_text(out.getvalue(), encoding="utf-8")
567 modified_paths.append(path)
568 log_msg(f"Upraven soubor {path}")
569
570 return errors, modified_paths
571
572
573def main(argv: Optional[List[str]] = None) -> int:
574 """
575 Vstupní bod CLI: spustí kontrolu Dockerfile a compose, volitelně s opravami.
576
577 :param argv: Argumenty příkazové řádky (bez ``sys.argv[0]``); None znamená ``sys.argv[1:]``.
578 :return: 0 při úspěchu; 1 při nalezených chybách (bez ``--fix``) nebo po úpravě souborů (s ``--fix``, aby pre-commit znovu zstageoval).
579 """
580 parser = argparse.ArgumentParser(description="Kontrola shody container image pinů (Dockerfile + compose).")
581 parser.add_argument("files", nargs="*", help="Ignorováno — vždy se kontrolují pevné cesty v repozitáři.")
582 parser.add_argument(
583 "--fix",
584 action="store_true",
585 help="Opravit soubory na místě (intra + cross na spotřebitele).",
586 )
587 parser.add_argument(
588 "--verbose",
589 action="store_true",
590 help="Podrobnější výstup (v pre-commit zapnuto implicitně).",
591 )
592 args = parser.parse_args(argv)
593
594 verbose = bool(args.verbose) or is_pre_commit()
595 root = Path.cwd()
596
597 all_errors: List[str] = []
598 modified: List[Path] = []
599
600 de, md = process_dockerfiles(root, args.fix, verbose)
601 all_errors.extend(de)
602 modified.extend(md)
603
604 ce, mc = process_compose_files(root, args.fix, verbose)
605 all_errors.extend(ce)
606 modified.extend(mc)
607
608 for err in all_errors:
609 if not err.startswith(LOG_PREFIX):
610 print(err, file=sys.stderr)
611
612 if all_errors:
613 return 1
614
615 if args.fix and modified:
616 log_msg(f"HOTOVÁ ÚPRAVA: {len(modified)} soubor(ů); znovu přidej do commitu.")
617 return 1
618
619 return 0
620
621
622if __name__ == "__main__":
623 sys.exit(main())