Skript check_container_image_reference_parity.py
Automaticky generovaná dokumentace skriptu scripts/check_container_image_reference_parity.py.
Přehled modulu
Kontrola a úprava shody pinů (tag/digest) u stejného container image v Dockerfileech a docker-compose.
V jednom souboru: stejný image repozitář nesmí mít různé tagy/digesty (FROM nebo image:).
docker-compose.yml je zdroj pravdy pro literály; spotřebitelské compose soubory se při –fix srovnají na prod.
Výstup pro CI: řádky na stderr s prefixem [image-ref-parity].
Třídy
- class DockerfileScan
Výsledek průchodu Dockerfile: agregace pinů podle repozitáře a metadata řádků pro přepis
FROM.
Funkce
- log_msg(message)
Vypíše jeden řádek na stderr s prefixem pro grep v CI.
- Parametry:
message – Text bez prefixu.
verbose_only – Pokud True, vypíše jen při verbose režimu.
verbose – Aktuální verbose příznak.
- is_literal_image_ref(value)
Určí, zda hodnota
image:v compose lze použít pro porovnání pinů (bez rozšíření proměnných).- Parametry:
value – Řetězec z pole
imageu služby.- Vrací:
False pro prázdný řetězec, obsah
${/$$; jinak True.
- _strip_platform_flags(rest)
Odstraní z řetězce za
FROMvšechny úvodní příznaky--platform=....- Parametry:
rest – Část řádku za
FROMpřed případnýmAS.- Vrací:
Zbytek začínající odkazem na image, nebo prázdný řetězec při neplatném tvaru.
- parse_from_instruction(line)
Parsuje řádek Dockerfile začínající
FROMna odkaz na image a volitelné jméno stage.- Parametry:
line – Jeden řádek souboru (může obsahovat koncový komentář
#).- Vrací:
(image_ref, as_name)nebo None, pokud řádek není platnýFROM.
- repository_and_pin(image_ref)
Rozdělí odkaz na container image na klíč repozitáře a normalizovaný pin (tag a/nebo digest).
- Parametry:
image_ref – Řetězec image bez části
AS(např.ghcr.io/foo:1.0).- Vrací:
(repo_key, pin_key)— obě části malými písmeny pro porovnání.
- choose_canonical_pin(pins)
Vybere jeden kanonický pin z množiny normalizovaných pinů (malá písmena).
- Parametry:
pins – Množina pin_key.
- Vrací:
Vítězný pin_key.
- pick_display_ref_for_pin(refs_by_pin, pin_key)
Vrátí reprezentaci image pro zápis (zachovává casing z jednoho výskytu).
- Parametry:
refs_by_pin – Mapa pin_key -> původní řetězce image.
pin_key – Normalizovaný pin.
- Vrací:
Původní řetězec image.
- build_yaml()
Vytvoří nakonfigurovanou instanci
ruamel.yaml.YAMLpro čtení/zápis compose se zachováním uvozovek.- Vrací:
YAML loader/dumper vhodný pro úpravy compose souborů.
- scan_dockerfile(text)
Projde text Dockerfile a sejme externí base image (mimo
scratcha odkazy na již definované stage).- Parametry:
text – Celý obsah Dockerfile.
- Vrací:
Struktura s množinami pinů, mapou původních řetězců a
line_infoproFROM_IMG_RE.
- fix_dockerfile_text(text, fix, path, _verbose)
Zkontroluje nebo opraví konflikty pinů u stejného
repo_keyv jednom Dockerfile.- Parametry:
text – Původní obsah souboru.
fix – Pokud True, přepíše
FROMřádky na kanonický pin; pokud False, jen hlásí chyby.path – Cesta pro hlášky v chybách.
_verbose – Rezervováno (aktuálně nepoužito).
- Vrací:
Trojice
(nový_text, seznam_chyb, byl_upraven).
- iter_compose_service_images(data)
Projde
servicesv načteném compose a vrátí služby s řetězcovýmimage.- Parametry:
data – Kořenový dict compose (např. z
yaml.load), očekává se klíčservices.- Yield:
(název_služby, hodnota_image, spec_dict)—spec_dictumožní při--fixzměnitimagena místě.- Vrací:
Iterátor trojic (viz
:yield:); při neplatnémdataneboservicesnic nevygeneruje.
- scan_compose_intra(data, _path)
Shromáždí literální
imagez jednoho compose souboru pro intra-file kontrolu shody pinů.- Parametry:
data – Parsovaný obsah compose.
_path – Cesta k souboru (pro rozhraní; aktuálně se nepoužívá).
- Vrací:
(pins_by_repo, refs_by_repo_pin, počet_přeskočených_neliterálních_image).
- apply_compose_intra_fix(data, path, pins_by_repo, refs_by_repo_pin, fix, _verbose)
U jednoho compose sjednotí literální
imagese stejnýmrepo_keyna jeden kanonický pin.- Parametry:
data – Parsovaný strom compose (mění se in-place při
fix=True).path – Cesta k souboru pro chybové hlášky.
pins_by_repo – Agregace pinů ze
scan_compose_intra().refs_by_repo_pin – Mapa původních řetězců image podle pinu.
fix – Zapnout zápis oprav do
data._verbose – Rezervováno (nepoužito).
- Vrací:
(seznam_chyb, byl_proveden_zápis_do_dat).
- extract_prod_literal_map(data)
Z produkčního compose vytvoří mapu
repo_key→ literální řetězecimage(první výskyt).- Parametry:
data – Parsovaný
docker-compose.yml(nebo ekvivalent).- Vrací:
Slovník pro cross-file srovnání se spotřebitelskými compose.
- extract_dockerfile_repo_map(root)
Vytvoří mapu
repo_key-> display reference ze zdrojových lokálních Dockerfile.Používá se pro případy, kdy konkrétní compose repo nemá být porovnáváno proti produkčnímu compose literálu, ale proti základnímu
FROMv lokálním Dockerfile.- Parametry:
root – Kořen repozitáře.
- Vrací:
Mapa
repo_key-> referenční image string.
- apply_compose_cross_fix(consumer_data, consumer_path, prod_map, dockerfile_source_map, fix, verbose)
Porovná literální image ve spotřebitelském compose s mapou z produkce a případně je srovná.
Repozitáře bez produkčního literálu jsou povolené jen tehdy, když jsou explicitně uvedené v
COMPOSE_CROSS_FILE_WHITELISTpro daný consumer compose, nebo když mají explicitní zdroj pravdy vCOMPOSE_DOCKERFILE_SOURCES. Ostatní případy jsou chyba i bez--verbose.- Parametry:
consumer_data – Parsovaný compose spotřebitele (mění se in-place při
fix=True).consumer_path – Cesta k souboru spotřebitele (logy a chyby).
prod_map – Výstup
extract_prod_literal_map()zdocker-compose.yml.dockerfile_source_map – Mapa
repo_key-> referenční image z lokálního Dockerfile.fix – Pokud True, přepíše
imagena hodnotu z prod při nesouladu pinu.verbose – Zapíná podrobné SKIP logy u neliterálních image (přes
log_msg).
- Vrací:
(seznam_chyb při fix=False, byl_proveden_zápis).
- process_dockerfiles(root, fix, verbose)
Zpracuje všechny Dockerfile z konstanty
DOCKERFILE_PATHSpod kořenemroot.- Parametry:
root – Kořen repozitáře (aktuální pracovní adresář při běhu z pre-commit).
fix – Předáno do
fix_dockerfile_text(); při True zapíše změny na disk.verbose – Předáno dál pro případné budoucí logování (část volání zachována kvůli API).
- Vrací:
(souhrnné_chyby, seznam_cest_upravených_souborů).
- process_compose_files(root, fix, verbose)
Načte prod a spotřebitelské compose, provede intra-file a cross-file kontrolu nebo opravu.
Zapisuje pouze soubory, u kterých došlo ke změně dat (ne přeformátovává prod zbytečně).
- Parametry:
root – Kořen repozitáře.
fix – Zapne zápis YAML a cross-file úpravy spotřebitelů.
verbose – Statistiky přeskočených neliterálních referencí a SKIP u cross-file.
- Vrací:
(souhrnné_chyby, seznam_cest_upravených_souborů).
- main(argv)
Vstupní bod CLI: spustí kontrolu Dockerfile a compose, volitelně s opravami.
- Parametry:
argv – Argumenty příkazové řádky (bez
sys.argv[0]); None znamenásys.argv[1:].- Vrací:
0 při úspěchu; 1 při nalezených chybách (bez
--fix) nebo po úpravě souborů (s--fix, aby pre-commit znovu zstageoval).
Zdrojový kód
1#!/usr/bin/env python3
2"""
3Kontrola a úprava shody pinů (tag/digest) u stejného container image v Dockerfileech a docker-compose.
4
5- V jednom souboru: stejný image repozitář nesmí mít různé tagy/digesty (FROM nebo image:).
6- docker-compose.yml je zdroj pravdy pro literály; spotřebitelské compose soubory se při --fix srovnají na prod.
7
8Výstup pro CI: řádky na stderr s prefixem ``[image-ref-parity]``.
9"""
10
11from __future__ import annotations
12
13import argparse
14import io
15import re
16import sys
17from collections import defaultdict
18from dataclasses import dataclass, field
19from pathlib import Path
20from typing import Any, DefaultDict, Dict, Iterable, List, Optional, Set, Tuple
21
22from packaging.version import InvalidVersion, Version
23from ruamel.yaml import YAML
24
25LOG_PREFIX = "[image-ref-parity]"
26
27# Relativní cesty od kořene repozitáře (pre-commit spouští z rootu).
28DOCKERFILE_PATHS = [
29 "Dockerfile",
30 "Dockerfile-DEV",
31 "Dockerfile-DB",
32 "fedora/Dockerfile",
33 "redis/Dockerfile",
34 "proxy/Dockerfile",
35]
36
37COMPOSE_PRODUCTION = ["docker-compose.yml"]
38COMPOSE_CONSUMERS = [
39 "docker-compose-dev-local-db-all-containers.yml",
40 "docker-compose-dev-local-db.yml",
41 "docker-compose-test.yml",
42 "git_docker-compose.yml",
43]
44
45# Repozitáře image, které jsou záměrně jen v konkrétních spotřebitelských compose
46# a nemají odpovídající literál v produkčním compose.
47COMPOSE_CROSS_FILE_WHITELIST: Dict[str, Set[str]] = {
48 "docker-compose-dev-local-db-all-containers.yml": {
49 "dpage/pgadmin4",
50 "memcached",
51 "fcrepo/fcrepo",
52 "postgres",
53 },
54 "docker-compose-test.yml": {
55 "docker.io/library/test_prod",
56 "docker.io/library/test_proxy",
57 "docker.io/library/test_redis",
58 },
59 "git_docker-compose.yml": {
60 "postgis/postgis",
61 },
62}
63
64COMPOSE_DOCKERFILE_SOURCES: Dict[str, str] = {
65 "redis": "redis/Dockerfile",
66}
67
68FROM_LINE = re.compile(r"^FROM\s+(?P<rest>.+?)\s*(?:#.*)?$", re.IGNORECASE)
69AS_SUFFIX = re.compile(r"\s+AS\s+(\S+)\s*$", re.IGNORECASE)
70# FROM s nahraditelným image tokenem (image bez komentáře na konci řádku zvlášť).
71FROM_IMG_RE = re.compile(
72 r"^(FROM\s+)((?:--platform=\S+\s+)+)?(\S+)((\s+AS\s+\S+)?)(\s*(?:#.*)?)$",
73 re.IGNORECASE,
74)
75
76
77def log_msg(message: str, *, verbose_only: bool = False, verbose: bool = False) -> None:
78 """
79 Vypíše jeden řádek na stderr s prefixem pro grep v CI.
80
81 :param message: Text bez prefixu.
82 :param verbose_only: Pokud True, vypíše jen při verbose režimu.
83 :param verbose: Aktuální verbose příznak.
84 """
85 if verbose_only and not verbose:
86 return
87 print(f"{LOG_PREFIX} {message}", file=sys.stderr)
88
89
90def is_literal_image_ref(value: str) -> bool:
91 """
92 Určí, zda hodnota ``image:`` v compose lze použít pro porovnání pinů (bez rozšíření proměnných).
93
94 :param value: Řetězec z pole ``image`` u služby.
95 :return: False pro prázdný řetězec, obsah ``${`` / ``$$``; jinak True.
96 """
97 s = value.strip()
98 if not s:
99 return False
100 if "${" in s or "$$" in s:
101 return False
102 return True
103
104
105def _strip_platform_flags(rest: str) -> str:
106 """
107 Odstraní z řetězce za ``FROM`` všechny úvodní příznaky ``--platform=...``.
108
109 :param rest: Část řádku za ``FROM`` před případným ``AS``.
110 :return: Zbytek začínající odkazem na image, nebo prázdný řetězec při neplatném tvaru.
111 """
112 current = rest.strip()
113 while current.startswith("--"):
114 space_idx = current.find(" ")
115 if space_idx == -1:
116 return ""
117 current = current[space_idx + 1 :].strip()
118 return current
119
120
121def parse_from_instruction(line: str) -> Optional[Tuple[str, Optional[str]]]:
122 """
123 Parsuje řádek Dockerfile začínající ``FROM`` na odkaz na image a volitelné jméno stage.
124
125 :param line: Jeden řádek souboru (může obsahovat koncový komentář ``#``).
126 :return: ``(image_ref, as_name)`` nebo None, pokud řádek není platný ``FROM``.
127 """
128 stripped = line.strip()
129 m = FROM_LINE.match(stripped)
130 if not m:
131 return None
132 rest = m.group("rest").strip()
133 as_name: Optional[str] = None
134 as_m = AS_SUFFIX.search(rest)
135 if as_m:
136 as_name = as_m.group(1)
137 rest = rest[: as_m.start()].strip()
138 rest = _strip_platform_flags(rest)
139 if not rest:
140 return None
141 return rest, as_name
142
143
144def repository_and_pin(image_ref: str) -> Tuple[str, str]:
145 """
146 Rozdělí odkaz na container image na klíč repozitáře a normalizovaný pin (tag a/nebo digest).
147
148 :param image_ref: Řetězec image bez části ``AS`` (např. ``ghcr.io/foo:1.0``).
149 :return: ``(repo_key, pin_key)`` — obě části malými písmeny pro porovnání.
150 """
151 ref = image_ref.strip()
152 lower = ref.lower()
153 digest_idx = lower.find("@sha256:")
154 digest_suffix = ""
155 if digest_idx != -1:
156 digest_suffix = ref[digest_idx:]
157 ref = ref[:digest_idx]
158
159 last_slash = ref.rfind("/")
160 tail = ref[last_slash + 1 :]
161 if ":" in tail:
162 repo, _, tag = ref.rpartition(":")
163 else:
164 repo, tag = ref, ""
165
166 pin = (tag + digest_suffix).lower()
167 return repo.lower(), pin
168
169
170def choose_canonical_pin(pins: Set[str]) -> str:
171 """
172 Vybere jeden kanonický pin z množiny normalizovaných pinů (malá písmena).
173
174 :param pins: Množina pin_key.
175 :return: Vítězný pin_key.
176 """
177 if len(pins) == 1:
178 return next(iter(pins))
179 pins_list = list(pins)
180
181 def sort_key(pin: str) -> Tuple[int, Any]:
182 tag_part = pin.split("@sha256:", 1)[0]
183 try:
184 return (0, Version(tag_part))
185 except InvalidVersion:
186 return (1, pin)
187
188 if all(sort_key(p)[0] == 0 for p in pins_list):
189 return max(pins_list, key=lambda p: Version(p.split("@sha256:", 1)[0]))
190 return max(pins_list)
191
192
193def pick_display_ref_for_pin(refs_by_pin: Dict[str, List[str]], pin_key: str) -> str:
194 """
195 Vrátí reprezentaci image pro zápis (zachovává casing z jednoho výskytu).
196
197 :param refs_by_pin: Mapa pin_key -> původní řetězce image.
198 :param pin_key: Normalizovaný pin.
199 :return: Původní řetězec image.
200 """
201 candidates = refs_by_pin.get(pin_key, [])
202 if not candidates:
203 return pin_key
204 return candidates[0]
205
206
207def build_yaml() -> YAML:
208 """
209 Vytvoří nakonfigurovanou instanci ``ruamel.yaml.YAML`` pro čtení/zápis compose se zachováním uvozovek.
210
211 :return: YAML loader/dumper vhodný pro úpravy compose souborů.
212 """
213 y = YAML()
214 y.preserve_quotes = True
215 y.indent(mapping=2, sequence=4, offset=2)
216 return y
217
218
219@dataclass
220class DockerfileScan:
221 """
222 Výsledek průchodu Dockerfile: agregace pinů podle repozitáře a metadata řádků pro přepis ``FROM``.
223 """
224
225 pins_by_repo: DefaultDict[str, Set[str]] = field(default_factory=lambda: defaultdict(set))
226 refs_by_repo_pin: DefaultDict[str, DefaultDict[str, List[str]]] = field(
227 default_factory=lambda: defaultdict(lambda: defaultdict(list))
228 )
229 line_info: List[Dict[str, Any]] = field(default_factory=list)
230
231
232def scan_dockerfile(text: str) -> DockerfileScan:
233 """
234 Projde text Dockerfile a sejme externí base image (mimo ``scratch`` a odkazy na již definované stage).
235
236 :param text: Celý obsah Dockerfile.
237 :return: Struktura s množinami pinů, mapou původních řetězců a ``line_info`` pro ``FROM_IMG_RE``.
238 """
239 scan = DockerfileScan()
240 stages: Set[str] = set()
241 for lineno, line in enumerate(text.splitlines(), start=1):
242 parsed = parse_from_instruction(line)
243 if parsed is None:
244 continue
245 image_ref, as_name = parsed
246 if image_ref.lower() == "scratch":
247 if as_name:
248 stages.add(as_name)
249 continue
250 if image_ref in stages:
251 if as_name:
252 stages.add(as_name)
253 continue
254 repo_key, pin_key = repository_and_pin(image_ref)
255 scan.pins_by_repo[repo_key].add(pin_key)
256 scan.refs_by_repo_pin[repo_key][pin_key].append(image_ref)
257 m = FROM_IMG_RE.match(line.rstrip("\n"))
258 if m:
259 scan.line_info.append(
260 {
261 "lineno": lineno,
262 "line": line,
263 "repo_key": repo_key,
264 "pin_key": pin_key,
265 "prefix": m.group(1) + (m.group(2) or ""),
266 "as_part": m.group(4) or "",
267 "suffix": m.group(6) or "",
268 }
269 )
270 if as_name:
271 stages.add(as_name)
272 return scan
273
274
275def fix_dockerfile_text(text: str, fix: bool, path: Path, _verbose: bool) -> Tuple[str, List[str], bool]:
276 """
277 Zkontroluje nebo opraví konflikty pinů u stejného ``repo_key`` v jednom Dockerfile.
278
279 :param text: Původní obsah souboru.
280 :param fix: Pokud True, přepíše ``FROM`` řádky na kanonický pin; pokud False, jen hlásí chyby.
281 :param path: Cesta pro hlášky v chybách.
282 :param _verbose: Rezervováno (aktuálně nepoužito).
283 :return: Trojice ``(nový_text, seznam_chyb, byl_upraven)``.
284 """
285 errors: List[str] = []
286 scan = scan_dockerfile(text)
287 for repo_key, pins in scan.pins_by_repo.items():
288 if len(pins) <= 1:
289 continue
290 if not fix:
291 detail_parts = []
292 for pin in sorted(pins):
293 detail_parts.append(f" - {repr(pin) if pin else '(bez explicitního tagu)'}")
294 errors.append(
295 f"{path}: stejný image {repo_key!r} má v jednom Dockerfile různé tagy/digesty:\n"
296 + "\n".join(detail_parts)
297 )
298 log_msg(f"ERROR: {path}: konflikt pinů pro {repo_key!r}: {', '.join(sorted(pins))}")
299 continue
300 win = choose_canonical_pin(set(pins))
301 new_ref = pick_display_ref_for_pin({k: list(v) for k, v in scan.refs_by_repo_pin[repo_key].items()}, win)
302 log_msg(f"FIX: {path}: sjednocuji {repo_key!r} na pin {win!r} (→ {new_ref!r})")
303
304 if errors and not fix:
305 return text, errors, False
306
307 if not fix:
308 return text, [], False
309
310 lines = text.splitlines(keepends=True)
311 modified = False
312 for info in scan.line_info:
313 repo_key = info["repo_key"]
314 pins = scan.pins_by_repo[repo_key]
315 if len(pins) <= 1:
316 continue
317 win = choose_canonical_pin(set(pins))
318 if info["pin_key"] == win:
319 continue
320 new_ref = pick_display_ref_for_pin({k: list(v) for k, v in scan.refs_by_repo_pin[repo_key].items()}, win)
321 lineno = info["lineno"]
322 idx = lineno - 1
323 old = lines[idx]
324 m = FROM_IMG_RE.match(old.rstrip("\r\n"))
325 if not m:
326 continue
327 nl = "\n" if old.endswith("\n") else ""
328 if old.endswith("\r\n"):
329 nl = "\r\n"
330 new_line = m.group(1) + (m.group(2) or "") + new_ref + (m.group(4) or "") + (m.group(6) or "") + nl
331 if new_line != old:
332 lines[idx] = new_line
333 modified = True
334 return "".join(lines), [], modified
335
336
337def iter_compose_service_images(data: Any) -> Iterable[Tuple[str, str, Any]]:
338 """
339 Projde ``services`` v načteném compose a vrátí služby s řetězcovým ``image``.
340
341 :param data: Kořenový dict compose (např. z ``yaml.load``), očekává se klíč ``services``.
342 :yield: ``(název_služby, hodnota_image, spec_dict)`` — ``spec_dict`` umožní při ``--fix`` změnit ``image`` na místě.
343 :return: Iterátor trojic (viz ``:yield:``); při neplatném ``data`` nebo ``services`` nic nevygeneruje.
344 """
345 if not isinstance(data, dict):
346 return
347 services = data.get("services")
348 if not isinstance(services, dict):
349 return
350 for svc_name, spec in services.items():
351 if not isinstance(spec, dict):
352 continue
353 img = spec.get("image")
354 if isinstance(img, str):
355 yield svc_name, img, spec
356
357
358def scan_compose_intra(
359 data: Any, _path: Path
360) -> Tuple[DefaultDict[str, Set[str]], DefaultDict[str, DefaultDict[str, List[str]]], int]:
361 """
362 Shromáždí literální ``image`` z jednoho compose souboru pro intra-file kontrolu shody pinů.
363
364 :param data: Parsovaný obsah compose.
365 :param _path: Cesta k souboru (pro rozhraní; aktuálně se nepoužívá).
366 :return: ``(pins_by_repo, refs_by_repo_pin, počet_přeskočených_neliterálních_image)``.
367 """
368 pins_by_repo: DefaultDict[str, Set[str]] = defaultdict(set)
369 refs_by_repo_pin: DefaultDict[str, DefaultDict[str, List[str]]] = defaultdict(lambda: defaultdict(list))
370 skipped_nonliteral = 0
371 for _svc_name, img, _spec in iter_compose_service_images(data):
372 if not is_literal_image_ref(img):
373 skipped_nonliteral += 1
374 continue
375 repo_key, pin_key = repository_and_pin(img)
376 pins_by_repo[repo_key].add(pin_key)
377 refs_by_repo_pin[repo_key][pin_key].append(img)
378 return pins_by_repo, refs_by_repo_pin, skipped_nonliteral
379
380
381def apply_compose_intra_fix(
382 data: Any,
383 path: Path,
384 pins_by_repo: DefaultDict[str, Set[str]],
385 refs_by_repo_pin: DefaultDict[str, DefaultDict[str, List[str]]],
386 fix: bool,
387 _verbose: bool,
388) -> Tuple[List[str], bool]:
389 """
390 U jednoho compose sjednotí literální ``image`` se stejným ``repo_key`` na jeden kanonický pin.
391
392 :param data: Parsovaný strom compose (mění se in-place při ``fix=True``).
393 :param path: Cesta k souboru pro chybové hlášky.
394 :param pins_by_repo: Agregace pinů ze :func:`scan_compose_intra`.
395 :param refs_by_repo_pin: Mapa původních řetězců image podle pinu.
396 :param fix: Zapnout zápis oprav do ``data``.
397 :param _verbose: Rezervováno (nepoužito).
398 :return: ``(seznam_chyb, byl_proveden_zápis_do_dat)``.
399 """
400 errors: List[str] = []
401 modified = False
402 for repo_key, pins in pins_by_repo.items():
403 if len(pins) <= 1:
404 continue
405 if not fix:
406 errors.append(f"{path}: stejný image {repo_key!r} má v compose různé tagy/digesty: {sorted(pins)}")
407 log_msg(f"ERROR: {path}: konflikt pinů pro {repo_key!r}: {', '.join(sorted(pins))}")
408 continue
409 win = choose_canonical_pin(set(pins))
410 new_ref = pick_display_ref_for_pin({k: list(v) for k, v in refs_by_repo_pin[repo_key].items()}, win)
411 log_msg(f"FIX: {path}: sjednocuji {repo_key!r} na {new_ref!r}")
412 for _svc_name, img, spec in iter_compose_service_images(data):
413 if not is_literal_image_ref(img):
414 continue
415 rk, pk = repository_and_pin(img)
416 if rk != repo_key:
417 continue
418 if pk != win:
419 spec["image"] = new_ref
420 modified = True
421 return errors, modified
422
423
424def extract_prod_literal_map(data: Any) -> Dict[str, str]:
425 """
426 Z produkčního compose vytvoří mapu ``repo_key`` → literální řetězec ``image`` (první výskyt).
427
428 :param data: Parsovaný ``docker-compose.yml`` (nebo ekvivalent).
429 :return: Slovník pro cross-file srovnání se spotřebitelskými compose.
430 """
431 out: Dict[str, str] = {}
432 for _svc, img, _spec in iter_compose_service_images(data):
433 if not is_literal_image_ref(img):
434 continue
435 repo_key, _pin = repository_and_pin(img)
436 if repo_key not in out:
437 out[repo_key] = img.strip()
438 return out
439
440
441def extract_dockerfile_repo_map(root: Path) -> Dict[str, str]:
442 """
443 Vytvoří mapu ``repo_key`` -> display reference ze zdrojových lokálních Dockerfile.
444
445 Používá se pro případy, kdy konkrétní compose repo nemá být porovnáváno proti
446 produkčnímu compose literálu, ale proti základnímu ``FROM`` v lokálním Dockerfile.
447
448 :param root: Kořen repozitáře.
449 :return: Mapa ``repo_key`` -> referenční image string.
450 """
451 out: Dict[str, str] = {}
452 for repo_key, rel_path in COMPOSE_DOCKERFILE_SOURCES.items():
453 path = root / rel_path
454 if not path.is_file():
455 continue
456 scan = scan_dockerfile(path.read_text(encoding="utf-8"))
457 pins = scan.pins_by_repo.get(repo_key, set())
458 if not pins:
459 continue
460 win = choose_canonical_pin(set(pins))
461 out[repo_key] = pick_display_ref_for_pin(
462 {k: list(v) for k, v in scan.refs_by_repo_pin[repo_key].items()},
463 win,
464 )
465 return out
466
467
468def apply_compose_cross_fix(
469 consumer_data: Any,
470 consumer_path: Path,
471 prod_map: Dict[str, str],
472 dockerfile_source_map: Dict[str, str],
473 fix: bool,
474 verbose: bool,
475) -> Tuple[List[str], bool]:
476 """
477 Porovná literální image ve spotřebitelském compose s mapou z produkce a případně je srovná.
478
479 Repozitáře bez produkčního literálu jsou povolené jen tehdy, když jsou
480 explicitně uvedené v ``COMPOSE_CROSS_FILE_WHITELIST`` pro daný consumer
481 compose, nebo když mají explicitní zdroj pravdy v ``COMPOSE_DOCKERFILE_SOURCES``.
482 Ostatní případy jsou chyba i bez ``--verbose``.
483
484 :param consumer_data: Parsovaný compose spotřebitele (mění se in-place při ``fix=True``).
485 :param consumer_path: Cesta k souboru spotřebitele (logy a chyby).
486 :param prod_map: Výstup :func:`extract_prod_literal_map` z ``docker-compose.yml``.
487 :param dockerfile_source_map: Mapa ``repo_key`` -> referenční image z lokálního Dockerfile.
488 :param fix: Pokud True, přepíše ``image`` na hodnotu z prod při nesouladu pinu.
489 :param verbose: Zapíná podrobné SKIP logy u neliterálních image (přes ``log_msg``).
490 :return: ``(seznam_chyb při fix=False, byl_proveden_zápis)``.
491 """
492 errors: List[str] = []
493 modified = False
494 seen_missing_prod: Set[Tuple[str, str]] = set()
495 whitelist = COMPOSE_CROSS_FILE_WHITELIST.get(consumer_path.name, set())
496
497 for svc_name, img, spec in iter_compose_service_images(consumer_data):
498 if not is_literal_image_ref(img):
499 if verbose:
500 log_msg(
501 f"SKIP: {consumer_path} služba {svc_name!r}: neliterální image (přeskočeno)",
502 verbose_only=True,
503 verbose=verbose,
504 )
505 continue
506 repo_key, pin_key = repository_and_pin(img)
507 if repo_key in prod_map:
508 prod_ref = prod_map[repo_key]
509 _, prod_pin = repository_and_pin(prod_ref)
510 if pin_key != prod_pin:
511 msg = f"{consumer_path}: služba {svc_name!r}: {img!r} ≠ prod {prod_ref!r} " f"(repo {repo_key!r})"
512 if not fix:
513 errors.append(msg)
514 log_msg(f"ERROR: {msg}")
515 else:
516 log_msg(f"FIX: {consumer_path} {svc_name!r}: {img!r} → {prod_ref!r} (prod)")
517 spec["image"] = prod_ref
518 modified = True
519 elif repo_key in dockerfile_source_map:
520 source_ref = dockerfile_source_map[repo_key]
521 _, source_pin = repository_and_pin(source_ref)
522 if pin_key != source_pin:
523 msg = (
524 f"{consumer_path}: služba {svc_name!r}: {img!r} != Dockerfile source {source_ref!r} "
525 f"(repo {repo_key!r})"
526 )
527 if not fix:
528 errors.append(msg)
529 log_msg(f"ERROR: {msg}")
530 else:
531 log_msg(f"FIX: {consumer_path} {svc_name!r}: {img!r} → {source_ref!r} (Dockerfile)")
532 spec["image"] = source_ref
533 modified = True
534 else:
535 key = (repo_key, str(consumer_path))
536 if key in seen_missing_prod:
537 continue
538 seen_missing_prod.add(key)
539 if repo_key in whitelist:
540 log_msg(
541 f"NOTE: {consumer_path}: repo {repo_key!r} má literál {img!r}, "
542 "ale je explicitně povolený jako dev-only výjimka mimo produkční compose."
543 )
544 continue
545 msg = (
546 f"{consumer_path}: repo {repo_key!r} má literál {img!r}, "
547 f"ale {COMPOSE_PRODUCTION[0]} neobsahuje srovnatelný literál pro stejný repozitář "
548 "(cross-file srovnání vyžaduje whitelist nebo produkční literál)"
549 )
550 errors.append(msg)
551 log_msg(f"ERROR: {msg}")
552 return errors, modified
553
554
555def process_dockerfiles(root: Path, fix: bool, verbose: bool) -> Tuple[List[str], List[Path]]:
556 """
557 Zpracuje všechny Dockerfile z konstanty ``DOCKERFILE_PATHS`` pod kořenem ``root``.
558
559 :param root: Kořen repozitáře (aktuální pracovní adresář při běhu z pre-commit).
560 :param fix: Předáno do :func:`fix_dockerfile_text`; při True zapíše změny na disk.
561 :param verbose: Předáno dál pro případné budoucí logování (část volání zachována kvůli API).
562 :return: ``(souhrnné_chyby, seznam_cest_upravených_souborů)``.
563 """
564 errors: List[str] = []
565 modified_paths: List[Path] = []
566 for rel in DOCKERFILE_PATHS:
567 path = root / rel
568 if not path.is_file():
569 continue
570 text = path.read_text(encoding="utf-8")
571 new_text, errs, mod = fix_dockerfile_text(text, fix, path, verbose)
572 errors.extend(errs)
573 if mod and fix:
574 path.write_text(new_text, encoding="utf-8")
575 modified_paths.append(path)
576 log_msg(f"Upraven soubor {path}")
577 return errors, modified_paths
578
579
580def process_compose_files(root: Path, fix: bool, verbose: bool) -> Tuple[List[str], List[Path]]:
581 """
582 Načte prod a spotřebitelské compose, provede intra-file a cross-file kontrolu nebo opravu.
583
584 Zapisuje pouze soubory, u kterých došlo ke změně dat (ne přeformátovává prod zbytečně).
585
586 :param root: Kořen repozitáře.
587 :param fix: Zapne zápis YAML a cross-file úpravy spotřebitelů.
588 :param verbose: Statistiky přeskočených neliterálních referencí a SKIP u cross-file.
589 :return: ``(souhrnné_chyby, seznam_cest_upravených_souborů)``.
590 """
591 yaml = build_yaml()
592 errors: List[str] = []
593 modified_paths: List[Path] = []
594 loaded: Dict[Path, Any] = {}
595 originals: Dict[Path, str] = {}
596
597 all_compose = [root / p for p in COMPOSE_PRODUCTION + COMPOSE_CONSUMERS if (root / p).is_file()]
598
599 for path in all_compose:
600 raw = path.read_text(encoding="utf-8")
601 originals[path] = raw
602 loaded[path] = yaml.load(raw) or {}
603
604 compose_dirty: Set[Path] = set()
605
606 for path in all_compose:
607 data = loaded[path]
608 pins_by_repo, refs_by_repo, skipped = scan_compose_intra(data, path)
609 if verbose and skipped:
610 log_msg(
611 f"INFO: {path}: přeskočeno {skipped} neliterálních image referencí",
612 verbose_only=True,
613 verbose=verbose,
614 )
615 errs, intra_mod = apply_compose_intra_fix(data, path, pins_by_repo, refs_by_repo, fix, verbose)
616 errors.extend(errs)
617 if intra_mod:
618 compose_dirty.add(path)
619
620 prod_path = root / COMPOSE_PRODUCTION[0]
621 prod_map: Dict[str, str] = {}
622 dockerfile_source_map = extract_dockerfile_repo_map(root)
623 if prod_path in loaded:
624 prod_map = extract_prod_literal_map(loaded[prod_path])
625
626 for rel in COMPOSE_CONSUMERS:
627 path = root / rel
628 if path not in loaded:
629 continue
630 errs, cross_mod = apply_compose_cross_fix(loaded[path], path, prod_map, dockerfile_source_map, fix, verbose)
631 errors.extend(errs)
632 if cross_mod:
633 compose_dirty.add(path)
634
635 if fix:
636 for path in sorted(compose_dirty, key=str):
637 out = io.StringIO()
638 yaml.dump(loaded[path], out)
639 path.write_text(out.getvalue(), encoding="utf-8")
640 modified_paths.append(path)
641 log_msg(f"Upraven soubor {path}")
642
643 return errors, modified_paths
644
645
646def main(argv: Optional[List[str]] = None) -> int:
647 """
648 Vstupní bod CLI: spustí kontrolu Dockerfile a compose, volitelně s opravami.
649
650 :param argv: Argumenty příkazové řádky (bez ``sys.argv[0]``); None znamená ``sys.argv[1:]``.
651 :return: 0 při úspěchu; 1 při nalezených chybách (bez ``--fix``) nebo po úpravě souborů (s ``--fix``, aby pre-commit znovu zstageoval).
652 """
653 parser = argparse.ArgumentParser(description="Kontrola shody container image pinů (Dockerfile + compose).")
654 parser.add_argument("files", nargs="*", help="Ignorováno — vždy se kontrolují pevné cesty v repozitáři.")
655 parser.add_argument(
656 "--fix",
657 action="store_true",
658 help="Opravit soubory na místě (intra + cross na spotřebitele).",
659 )
660 parser.add_argument(
661 "--verbose",
662 action="store_true",
663 help="Podrobnější výstup.",
664 )
665 args = parser.parse_args(argv)
666
667 verbose = bool(args.verbose)
668 root = Path.cwd()
669
670 all_errors: List[str] = []
671 modified: List[Path] = []
672
673 de, md = process_dockerfiles(root, args.fix, verbose)
674 all_errors.extend(de)
675 modified.extend(md)
676
677 ce, mc = process_compose_files(root, args.fix, verbose)
678 all_errors.extend(ce)
679 modified.extend(mc)
680
681 for err in all_errors:
682 if not err.startswith(LOG_PREFIX):
683 print(err, file=sys.stderr)
684
685 if all_errors:
686 return 1
687
688 if args.fix and modified:
689 log_msg(f"HOTOVÁ ÚPRAVA: {len(modified)} soubor(ů); znovu přidej do commitu.")
690 return 1
691
692 return 0
693
694
695if __name__ == "__main__":
696 sys.exit(main())