Skript check_container_image_reference_parity.py

Automaticky generovaná dokumentace skriptu scripts/check_container_image_reference_parity.py.

Přehled modulu

Kontrola a úprava shody pinů (tag/digest) u stejného container image v Dockerfileech a docker-compose.

  • V jednom souboru: stejný image repozitář nesmí mít různé tagy/digesty (FROM nebo image:).

  • docker-compose.yml je zdroj pravdy pro literály; spotřebitelské compose soubory se při –fix srovnají na prod.

Výstup pro CI: řádky na stderr s prefixem [image-ref-parity].

Třídy

class DockerfileScan

Výsledek průchodu Dockerfile: agregace pinů podle repozitáře a metadata řádků pro přepis FROM.

Funkce

log_msg(message)

Vypíše jeden řádek na stderr s prefixem pro grep v CI.

Parametry:
  • message – Text bez prefixu.

  • verbose_only – Pokud True, vypíše jen při verbose režimu.

  • verbose – Aktuální verbose příznak.

is_literal_image_ref(value)

Určí, zda hodnota image: v compose lze použít pro porovnání pinů (bez rozšíření proměnných).

Parametry:

value – Řetězec z pole image u služby.

Vrací:

False pro prázdný řetězec, obsah ${ / $$; jinak True.

_strip_platform_flags(rest)

Odstraní z řetězce za FROM všechny úvodní příznaky --platform=....

Parametry:

rest – Část řádku za FROM před případným AS.

Vrací:

Zbytek začínající odkazem na image, nebo prázdný řetězec při neplatném tvaru.

parse_from_instruction(line)

Parsuje řádek Dockerfile začínající FROM na odkaz na image a volitelné jméno stage.

Parametry:

line – Jeden řádek souboru (může obsahovat koncový komentář #).

Vrací:

(image_ref, as_name) nebo None, pokud řádek není platný FROM.

repository_and_pin(image_ref)

Rozdělí odkaz na container image na klíč repozitáře a normalizovaný pin (tag a/nebo digest).

Parametry:

image_ref – Řetězec image bez části AS (např. ghcr.io/foo:1.0).

Vrací:

(repo_key, pin_key) — obě části malými písmeny pro porovnání.

choose_canonical_pin(pins)

Vybere jeden kanonický pin z množiny normalizovaných pinů (malá písmena).

Parametry:

pins – Množina pin_key.

Vrací:

Vítězný pin_key.

pick_display_ref_for_pin(refs_by_pin, pin_key)

Vrátí reprezentaci image pro zápis (zachovává casing z jednoho výskytu).

Parametry:
  • refs_by_pin – Mapa pin_key -> původní řetězce image.

  • pin_key – Normalizovaný pin.

Vrací:

Původní řetězec image.

build_yaml()

Vytvoří nakonfigurovanou instanci ruamel.yaml.YAML pro čtení/zápis compose se zachováním uvozovek.

Vrací:

YAML loader/dumper vhodný pro úpravy compose souborů.

scan_dockerfile(text)

Projde text Dockerfile a sejme externí base image (mimo scratch a odkazy na již definované stage).

Parametry:

text – Celý obsah Dockerfile.

Vrací:

Struktura s množinami pinů, mapou původních řetězců a line_info pro FROM_IMG_RE.

fix_dockerfile_text(text, fix, path, _verbose)

Zkontroluje nebo opraví konflikty pinů u stejného repo_key v jednom Dockerfile.

Parametry:
  • text – Původní obsah souboru.

  • fix – Pokud True, přepíše FROM řádky na kanonický pin; pokud False, jen hlásí chyby.

  • path – Cesta pro hlášky v chybách.

  • _verbose – Rezervováno (aktuálně nepoužito).

Vrací:

Trojice (nový_text, seznam_chyb, byl_upraven).

iter_compose_service_images(data)

Projde services v načteném compose a vrátí služby s řetězcovým image.

Parametry:

data – Kořenový dict compose (např. z yaml.load), očekává se klíč services.

Yield:

(název_služby, hodnota_image, spec_dict)spec_dict umožní při --fix změnit image na místě.

Vrací:

Iterátor trojic (viz :yield:); při neplatném data nebo services nic nevygeneruje.

scan_compose_intra(data, _path)

Shromáždí literální image z jednoho compose souboru pro intra-file kontrolu shody pinů.

Parametry:
  • data – Parsovaný obsah compose.

  • _path – Cesta k souboru (pro rozhraní; aktuálně se nepoužívá).

Vrací:

(pins_by_repo, refs_by_repo_pin, počet_přeskočených_neliterálních_image).

apply_compose_intra_fix(data, path, pins_by_repo, refs_by_repo_pin, fix, _verbose)

U jednoho compose sjednotí literální image se stejným repo_key na jeden kanonický pin.

Parametry:
  • data – Parsovaný strom compose (mění se in-place při fix=True).

  • path – Cesta k souboru pro chybové hlášky.

  • pins_by_repo – Agregace pinů ze scan_compose_intra().

  • refs_by_repo_pin – Mapa původních řetězců image podle pinu.

  • fix – Zapnout zápis oprav do data.

  • _verbose – Rezervováno (nepoužito).

Vrací:

(seznam_chyb, byl_proveden_zápis_do_dat).

extract_prod_literal_map(data)

Z produkčního compose vytvoří mapu repo_key → literální řetězec image (první výskyt).

Parametry:

data – Parsovaný docker-compose.yml (nebo ekvivalent).

Vrací:

Slovník pro cross-file srovnání se spotřebitelskými compose.

extract_dockerfile_repo_map(root)

Vytvoří mapu repo_key -> display reference ze zdrojových lokálních Dockerfile.

Používá se pro případy, kdy konkrétní compose repo nemá být porovnáváno proti produkčnímu compose literálu, ale proti základnímu FROM v lokálním Dockerfile.

Parametry:

root – Kořen repozitáře.

Vrací:

Mapa repo_key -> referenční image string.

apply_compose_cross_fix(consumer_data, consumer_path, prod_map, dockerfile_source_map, fix, verbose)

Porovná literální image ve spotřebitelském compose s mapou z produkce a případně je srovná.

Repozitáře bez produkčního literálu jsou povolené jen tehdy, když jsou explicitně uvedené v COMPOSE_CROSS_FILE_WHITELIST pro daný consumer compose, nebo když mají explicitní zdroj pravdy v COMPOSE_DOCKERFILE_SOURCES. Ostatní případy jsou chyba i bez --verbose.

Parametry:
  • consumer_data – Parsovaný compose spotřebitele (mění se in-place při fix=True).

  • consumer_path – Cesta k souboru spotřebitele (logy a chyby).

  • prod_map – Výstup extract_prod_literal_map() z docker-compose.yml.

  • dockerfile_source_map – Mapa repo_key -> referenční image z lokálního Dockerfile.

  • fix – Pokud True, přepíše image na hodnotu z prod při nesouladu pinu.

  • verbose – Zapíná podrobné SKIP logy u neliterálních image (přes log_msg).

Vrací:

(seznam_chyb při fix=False, byl_proveden_zápis).

process_dockerfiles(root, fix, verbose)

Zpracuje všechny Dockerfile z konstanty DOCKERFILE_PATHS pod kořenem root.

Parametry:
  • root – Kořen repozitáře (aktuální pracovní adresář při běhu z pre-commit).

  • fix – Předáno do fix_dockerfile_text(); při True zapíše změny na disk.

  • verbose – Předáno dál pro případné budoucí logování (část volání zachována kvůli API).

Vrací:

(souhrnné_chyby, seznam_cest_upravených_souborů).

process_compose_files(root, fix, verbose)

Načte prod a spotřebitelské compose, provede intra-file a cross-file kontrolu nebo opravu.

Zapisuje pouze soubory, u kterých došlo ke změně dat (ne přeformátovává prod zbytečně).

Parametry:
  • root – Kořen repozitáře.

  • fix – Zapne zápis YAML a cross-file úpravy spotřebitelů.

  • verbose – Statistiky přeskočených neliterálních referencí a SKIP u cross-file.

Vrací:

(souhrnné_chyby, seznam_cest_upravených_souborů).

main(argv)

Vstupní bod CLI: spustí kontrolu Dockerfile a compose, volitelně s opravami.

Parametry:

argv – Argumenty příkazové řádky (bez sys.argv[0]); None znamená sys.argv[1:].

Vrací:

0 při úspěchu; 1 při nalezených chybách (bez --fix) nebo po úpravě souborů (s --fix, aby pre-commit znovu zstageoval).

Zdrojový kód

  1#!/usr/bin/env python3
  2"""
  3Kontrola a úprava shody pinů (tag/digest) u stejného container image v Dockerfileech a docker-compose.
  4
  5- V jednom souboru: stejný image repozitář nesmí mít různé tagy/digesty (FROM nebo image:).
  6- docker-compose.yml je zdroj pravdy pro literály; spotřebitelské compose soubory se při --fix srovnají na prod.
  7
  8Výstup pro CI: řádky na stderr s prefixem ``[image-ref-parity]``.
  9"""
 10
 11from __future__ import annotations
 12
 13import argparse
 14import io
 15import re
 16import sys
 17from collections import defaultdict
 18from dataclasses import dataclass, field
 19from pathlib import Path
 20from typing import Any, DefaultDict, Dict, Iterable, List, Optional, Set, Tuple
 21
 22from packaging.version import InvalidVersion, Version
 23from ruamel.yaml import YAML
 24
 25LOG_PREFIX = "[image-ref-parity]"
 26
 27# Relativní cesty od kořene repozitáře (pre-commit spouští z rootu).
 28DOCKERFILE_PATHS = [
 29    "Dockerfile",
 30    "Dockerfile-DEV",
 31    "Dockerfile-DB",
 32    "fedora/Dockerfile",
 33    "redis/Dockerfile",
 34    "proxy/Dockerfile",
 35]
 36
 37COMPOSE_PRODUCTION = ["docker-compose.yml"]
 38COMPOSE_CONSUMERS = [
 39    "docker-compose-dev-local-db-all-containers.yml",
 40    "docker-compose-dev-local-db.yml",
 41    "docker-compose-test.yml",
 42    "git_docker-compose.yml",
 43]
 44
 45# Repozitáře image, které jsou záměrně jen v konkrétních spotřebitelských compose
 46# a nemají odpovídající literál v produkčním compose.
 47COMPOSE_CROSS_FILE_WHITELIST: Dict[str, Set[str]] = {
 48    "docker-compose-dev-local-db-all-containers.yml": {
 49        "dpage/pgadmin4",
 50        "memcached",
 51        "fcrepo/fcrepo",
 52        "postgres",
 53    },
 54    "docker-compose-test.yml": {
 55        "docker.io/library/test_prod",
 56        "docker.io/library/test_proxy",
 57        "docker.io/library/test_redis",
 58    },
 59    "git_docker-compose.yml": {
 60        "postgis/postgis",
 61    },
 62}
 63
 64COMPOSE_DOCKERFILE_SOURCES: Dict[str, str] = {
 65    "redis": "redis/Dockerfile",
 66}
 67
 68FROM_LINE = re.compile(r"^FROM\s+(?P<rest>.+?)\s*(?:#.*)?$", re.IGNORECASE)
 69AS_SUFFIX = re.compile(r"\s+AS\s+(\S+)\s*$", re.IGNORECASE)
 70# FROM s nahraditelným image tokenem (image bez komentáře na konci řádku zvlášť).
 71FROM_IMG_RE = re.compile(
 72    r"^(FROM\s+)((?:--platform=\S+\s+)+)?(\S+)((\s+AS\s+\S+)?)(\s*(?:#.*)?)$",
 73    re.IGNORECASE,
 74)
 75
 76
 77def log_msg(message: str, *, verbose_only: bool = False, verbose: bool = False) -> None:
 78    """
 79    Vypíše jeden řádek na stderr s prefixem pro grep v CI.
 80
 81    :param message: Text bez prefixu.
 82    :param verbose_only: Pokud True, vypíše jen při verbose režimu.
 83    :param verbose: Aktuální verbose příznak.
 84    """
 85    if verbose_only and not verbose:
 86        return
 87    print(f"{LOG_PREFIX} {message}", file=sys.stderr)
 88
 89
 90def is_literal_image_ref(value: str) -> bool:
 91    """
 92    Určí, zda hodnota ``image:`` v compose lze použít pro porovnání pinů (bez rozšíření proměnných).
 93
 94    :param value: Řetězec z pole ``image`` u služby.
 95    :return: False pro prázdný řetězec, obsah ``${`` / ``$$``; jinak True.
 96    """
 97    s = value.strip()
 98    if not s:
 99        return False
100    if "${" in s or "$$" in s:
101        return False
102    return True
103
104
105def _strip_platform_flags(rest: str) -> str:
106    """
107    Odstraní z řetězce za ``FROM`` všechny úvodní příznaky ``--platform=...``.
108
109    :param rest: Část řádku za ``FROM`` před případným ``AS``.
110    :return: Zbytek začínající odkazem na image, nebo prázdný řetězec při neplatném tvaru.
111    """
112    current = rest.strip()
113    while current.startswith("--"):
114        space_idx = current.find(" ")
115        if space_idx == -1:
116            return ""
117        current = current[space_idx + 1 :].strip()
118    return current
119
120
121def parse_from_instruction(line: str) -> Optional[Tuple[str, Optional[str]]]:
122    """
123    Parsuje řádek Dockerfile začínající ``FROM`` na odkaz na image a volitelné jméno stage.
124
125    :param line: Jeden řádek souboru (může obsahovat koncový komentář ``#``).
126    :return: ``(image_ref, as_name)`` nebo None, pokud řádek není platný ``FROM``.
127    """
128    stripped = line.strip()
129    m = FROM_LINE.match(stripped)
130    if not m:
131        return None
132    rest = m.group("rest").strip()
133    as_name: Optional[str] = None
134    as_m = AS_SUFFIX.search(rest)
135    if as_m:
136        as_name = as_m.group(1)
137        rest = rest[: as_m.start()].strip()
138    rest = _strip_platform_flags(rest)
139    if not rest:
140        return None
141    return rest, as_name
142
143
144def repository_and_pin(image_ref: str) -> Tuple[str, str]:
145    """
146    Rozdělí odkaz na container image na klíč repozitáře a normalizovaný pin (tag a/nebo digest).
147
148    :param image_ref: Řetězec image bez části ``AS`` (např. ``ghcr.io/foo:1.0``).
149    :return: ``(repo_key, pin_key)`` — obě části malými písmeny pro porovnání.
150    """
151    ref = image_ref.strip()
152    lower = ref.lower()
153    digest_idx = lower.find("@sha256:")
154    digest_suffix = ""
155    if digest_idx != -1:
156        digest_suffix = ref[digest_idx:]
157        ref = ref[:digest_idx]
158
159    last_slash = ref.rfind("/")
160    tail = ref[last_slash + 1 :]
161    if ":" in tail:
162        repo, _, tag = ref.rpartition(":")
163    else:
164        repo, tag = ref, ""
165
166    pin = (tag + digest_suffix).lower()
167    return repo.lower(), pin
168
169
170def choose_canonical_pin(pins: Set[str]) -> str:
171    """
172    Vybere jeden kanonický pin z množiny normalizovaných pinů (malá písmena).
173
174    :param pins: Množina pin_key.
175    :return: Vítězný pin_key.
176    """
177    if len(pins) == 1:
178        return next(iter(pins))
179    pins_list = list(pins)
180
181    def sort_key(pin: str) -> Tuple[int, Any]:
182        tag_part = pin.split("@sha256:", 1)[0]
183        try:
184            return (0, Version(tag_part))
185        except InvalidVersion:
186            return (1, pin)
187
188    if all(sort_key(p)[0] == 0 for p in pins_list):
189        return max(pins_list, key=lambda p: Version(p.split("@sha256:", 1)[0]))
190    return max(pins_list)
191
192
193def pick_display_ref_for_pin(refs_by_pin: Dict[str, List[str]], pin_key: str) -> str:
194    """
195    Vrátí reprezentaci image pro zápis (zachovává casing z jednoho výskytu).
196
197    :param refs_by_pin: Mapa pin_key -> původní řetězce image.
198    :param pin_key: Normalizovaný pin.
199    :return: Původní řetězec image.
200    """
201    candidates = refs_by_pin.get(pin_key, [])
202    if not candidates:
203        return pin_key
204    return candidates[0]
205
206
207def build_yaml() -> YAML:
208    """
209    Vytvoří nakonfigurovanou instanci ``ruamel.yaml.YAML`` pro čtení/zápis compose se zachováním uvozovek.
210
211    :return: YAML loader/dumper vhodný pro úpravy compose souborů.
212    """
213    y = YAML()
214    y.preserve_quotes = True
215    y.indent(mapping=2, sequence=4, offset=2)
216    return y
217
218
219@dataclass
220class DockerfileScan:
221    """
222    Výsledek průchodu Dockerfile: agregace pinů podle repozitáře a metadata řádků pro přepis ``FROM``.
223    """
224
225    pins_by_repo: DefaultDict[str, Set[str]] = field(default_factory=lambda: defaultdict(set))
226    refs_by_repo_pin: DefaultDict[str, DefaultDict[str, List[str]]] = field(
227        default_factory=lambda: defaultdict(lambda: defaultdict(list))
228    )
229    line_info: List[Dict[str, Any]] = field(default_factory=list)
230
231
232def scan_dockerfile(text: str) -> DockerfileScan:
233    """
234    Projde text Dockerfile a sejme externí base image (mimo ``scratch`` a odkazy na již definované stage).
235
236    :param text: Celý obsah Dockerfile.
237    :return: Struktura s množinami pinů, mapou původních řetězců a ``line_info`` pro ``FROM_IMG_RE``.
238    """
239    scan = DockerfileScan()
240    stages: Set[str] = set()
241    for lineno, line in enumerate(text.splitlines(), start=1):
242        parsed = parse_from_instruction(line)
243        if parsed is None:
244            continue
245        image_ref, as_name = parsed
246        if image_ref.lower() == "scratch":
247            if as_name:
248                stages.add(as_name)
249            continue
250        if image_ref in stages:
251            if as_name:
252                stages.add(as_name)
253            continue
254        repo_key, pin_key = repository_and_pin(image_ref)
255        scan.pins_by_repo[repo_key].add(pin_key)
256        scan.refs_by_repo_pin[repo_key][pin_key].append(image_ref)
257        m = FROM_IMG_RE.match(line.rstrip("\n"))
258        if m:
259            scan.line_info.append(
260                {
261                    "lineno": lineno,
262                    "line": line,
263                    "repo_key": repo_key,
264                    "pin_key": pin_key,
265                    "prefix": m.group(1) + (m.group(2) or ""),
266                    "as_part": m.group(4) or "",
267                    "suffix": m.group(6) or "",
268                }
269            )
270        if as_name:
271            stages.add(as_name)
272    return scan
273
274
275def fix_dockerfile_text(text: str, fix: bool, path: Path, _verbose: bool) -> Tuple[str, List[str], bool]:
276    """
277    Zkontroluje nebo opraví konflikty pinů u stejného ``repo_key`` v jednom Dockerfile.
278
279    :param text: Původní obsah souboru.
280    :param fix: Pokud True, přepíše ``FROM`` řádky na kanonický pin; pokud False, jen hlásí chyby.
281    :param path: Cesta pro hlášky v chybách.
282    :param _verbose: Rezervováno (aktuálně nepoužito).
283    :return: Trojice ``(nový_text, seznam_chyb, byl_upraven)``.
284    """
285    errors: List[str] = []
286    scan = scan_dockerfile(text)
287    for repo_key, pins in scan.pins_by_repo.items():
288        if len(pins) <= 1:
289            continue
290        if not fix:
291            detail_parts = []
292            for pin in sorted(pins):
293                detail_parts.append(f"  - {repr(pin) if pin else '(bez explicitního tagu)'}")
294            errors.append(
295                f"{path}: stejný image {repo_key!r} má v jednom Dockerfile různé tagy/digesty:\n"
296                + "\n".join(detail_parts)
297            )
298            log_msg(f"ERROR: {path}: konflikt pinů pro {repo_key!r}: {', '.join(sorted(pins))}")
299            continue
300        win = choose_canonical_pin(set(pins))
301        new_ref = pick_display_ref_for_pin({k: list(v) for k, v in scan.refs_by_repo_pin[repo_key].items()}, win)
302        log_msg(f"FIX: {path}: sjednocuji {repo_key!r} na pin {win!r} (→ {new_ref!r})")
303
304    if errors and not fix:
305        return text, errors, False
306
307    if not fix:
308        return text, [], False
309
310    lines = text.splitlines(keepends=True)
311    modified = False
312    for info in scan.line_info:
313        repo_key = info["repo_key"]
314        pins = scan.pins_by_repo[repo_key]
315        if len(pins) <= 1:
316            continue
317        win = choose_canonical_pin(set(pins))
318        if info["pin_key"] == win:
319            continue
320        new_ref = pick_display_ref_for_pin({k: list(v) for k, v in scan.refs_by_repo_pin[repo_key].items()}, win)
321        lineno = info["lineno"]
322        idx = lineno - 1
323        old = lines[idx]
324        m = FROM_IMG_RE.match(old.rstrip("\r\n"))
325        if not m:
326            continue
327        nl = "\n" if old.endswith("\n") else ""
328        if old.endswith("\r\n"):
329            nl = "\r\n"
330        new_line = m.group(1) + (m.group(2) or "") + new_ref + (m.group(4) or "") + (m.group(6) or "") + nl
331        if new_line != old:
332            lines[idx] = new_line
333            modified = True
334    return "".join(lines), [], modified
335
336
337def iter_compose_service_images(data: Any) -> Iterable[Tuple[str, str, Any]]:
338    """
339    Projde ``services`` v načteném compose a vrátí služby s řetězcovým ``image``.
340
341    :param data: Kořenový dict compose (např. z ``yaml.load``), očekává se klíč ``services``.
342    :yield: ``(název_služby, hodnota_image, spec_dict)`` — ``spec_dict`` umožní při ``--fix`` změnit ``image`` na místě.
343    :return: Iterátor trojic (viz ``:yield:``); při neplatném ``data`` nebo ``services`` nic nevygeneruje.
344    """
345    if not isinstance(data, dict):
346        return
347    services = data.get("services")
348    if not isinstance(services, dict):
349        return
350    for svc_name, spec in services.items():
351        if not isinstance(spec, dict):
352            continue
353        img = spec.get("image")
354        if isinstance(img, str):
355            yield svc_name, img, spec
356
357
358def scan_compose_intra(
359    data: Any, _path: Path
360) -> Tuple[DefaultDict[str, Set[str]], DefaultDict[str, DefaultDict[str, List[str]]], int]:
361    """
362    Shromáždí literální ``image`` z jednoho compose souboru pro intra-file kontrolu shody pinů.
363
364    :param data: Parsovaný obsah compose.
365    :param _path: Cesta k souboru (pro rozhraní; aktuálně se nepoužívá).
366    :return: ``(pins_by_repo, refs_by_repo_pin, počet_přeskočených_neliterálních_image)``.
367    """
368    pins_by_repo: DefaultDict[str, Set[str]] = defaultdict(set)
369    refs_by_repo_pin: DefaultDict[str, DefaultDict[str, List[str]]] = defaultdict(lambda: defaultdict(list))
370    skipped_nonliteral = 0
371    for _svc_name, img, _spec in iter_compose_service_images(data):
372        if not is_literal_image_ref(img):
373            skipped_nonliteral += 1
374            continue
375        repo_key, pin_key = repository_and_pin(img)
376        pins_by_repo[repo_key].add(pin_key)
377        refs_by_repo_pin[repo_key][pin_key].append(img)
378    return pins_by_repo, refs_by_repo_pin, skipped_nonliteral
379
380
381def apply_compose_intra_fix(
382    data: Any,
383    path: Path,
384    pins_by_repo: DefaultDict[str, Set[str]],
385    refs_by_repo_pin: DefaultDict[str, DefaultDict[str, List[str]]],
386    fix: bool,
387    _verbose: bool,
388) -> Tuple[List[str], bool]:
389    """
390    U jednoho compose sjednotí literální ``image`` se stejným ``repo_key`` na jeden kanonický pin.
391
392    :param data: Parsovaný strom compose (mění se in-place při ``fix=True``).
393    :param path: Cesta k souboru pro chybové hlášky.
394    :param pins_by_repo: Agregace pinů ze :func:`scan_compose_intra`.
395    :param refs_by_repo_pin: Mapa původních řetězců image podle pinu.
396    :param fix: Zapnout zápis oprav do ``data``.
397    :param _verbose: Rezervováno (nepoužito).
398    :return: ``(seznam_chyb, byl_proveden_zápis_do_dat)``.
399    """
400    errors: List[str] = []
401    modified = False
402    for repo_key, pins in pins_by_repo.items():
403        if len(pins) <= 1:
404            continue
405        if not fix:
406            errors.append(f"{path}: stejný image {repo_key!r} má v compose různé tagy/digesty: {sorted(pins)}")
407            log_msg(f"ERROR: {path}: konflikt pinů pro {repo_key!r}: {', '.join(sorted(pins))}")
408            continue
409        win = choose_canonical_pin(set(pins))
410        new_ref = pick_display_ref_for_pin({k: list(v) for k, v in refs_by_repo_pin[repo_key].items()}, win)
411        log_msg(f"FIX: {path}: sjednocuji {repo_key!r} na {new_ref!r}")
412        for _svc_name, img, spec in iter_compose_service_images(data):
413            if not is_literal_image_ref(img):
414                continue
415            rk, pk = repository_and_pin(img)
416            if rk != repo_key:
417                continue
418            if pk != win:
419                spec["image"] = new_ref
420                modified = True
421    return errors, modified
422
423
424def extract_prod_literal_map(data: Any) -> Dict[str, str]:
425    """
426    Z produkčního compose vytvoří mapu ``repo_key`` → literální řetězec ``image`` (první výskyt).
427
428    :param data: Parsovaný ``docker-compose.yml`` (nebo ekvivalent).
429    :return: Slovník pro cross-file srovnání se spotřebitelskými compose.
430    """
431    out: Dict[str, str] = {}
432    for _svc, img, _spec in iter_compose_service_images(data):
433        if not is_literal_image_ref(img):
434            continue
435        repo_key, _pin = repository_and_pin(img)
436        if repo_key not in out:
437            out[repo_key] = img.strip()
438    return out
439
440
441def extract_dockerfile_repo_map(root: Path) -> Dict[str, str]:
442    """
443    Vytvoří mapu ``repo_key`` -> display reference ze zdrojových lokálních Dockerfile.
444
445    Používá se pro případy, kdy konkrétní compose repo nemá být porovnáváno proti
446    produkčnímu compose literálu, ale proti základnímu ``FROM`` v lokálním Dockerfile.
447
448    :param root: Kořen repozitáře.
449    :return: Mapa ``repo_key`` -> referenční image string.
450    """
451    out: Dict[str, str] = {}
452    for repo_key, rel_path in COMPOSE_DOCKERFILE_SOURCES.items():
453        path = root / rel_path
454        if not path.is_file():
455            continue
456        scan = scan_dockerfile(path.read_text(encoding="utf-8"))
457        pins = scan.pins_by_repo.get(repo_key, set())
458        if not pins:
459            continue
460        win = choose_canonical_pin(set(pins))
461        out[repo_key] = pick_display_ref_for_pin(
462            {k: list(v) for k, v in scan.refs_by_repo_pin[repo_key].items()},
463            win,
464        )
465    return out
466
467
468def apply_compose_cross_fix(
469    consumer_data: Any,
470    consumer_path: Path,
471    prod_map: Dict[str, str],
472    dockerfile_source_map: Dict[str, str],
473    fix: bool,
474    verbose: bool,
475) -> Tuple[List[str], bool]:
476    """
477    Porovná literální image ve spotřebitelském compose s mapou z produkce a případně je srovná.
478
479    Repozitáře bez produkčního literálu jsou povolené jen tehdy, když jsou
480    explicitně uvedené v ``COMPOSE_CROSS_FILE_WHITELIST`` pro daný consumer
481    compose, nebo když mají explicitní zdroj pravdy v ``COMPOSE_DOCKERFILE_SOURCES``.
482    Ostatní případy jsou chyba i bez ``--verbose``.
483
484    :param consumer_data: Parsovaný compose spotřebitele (mění se in-place při ``fix=True``).
485    :param consumer_path: Cesta k souboru spotřebitele (logy a chyby).
486    :param prod_map: Výstup :func:`extract_prod_literal_map` z ``docker-compose.yml``.
487    :param dockerfile_source_map: Mapa ``repo_key`` -> referenční image z lokálního Dockerfile.
488    :param fix: Pokud True, přepíše ``image`` na hodnotu z prod při nesouladu pinu.
489    :param verbose: Zapíná podrobné SKIP logy u neliterálních image (přes ``log_msg``).
490    :return: ``(seznam_chyb při fix=False, byl_proveden_zápis)``.
491    """
492    errors: List[str] = []
493    modified = False
494    seen_missing_prod: Set[Tuple[str, str]] = set()
495    whitelist = COMPOSE_CROSS_FILE_WHITELIST.get(consumer_path.name, set())
496
497    for svc_name, img, spec in iter_compose_service_images(consumer_data):
498        if not is_literal_image_ref(img):
499            if verbose:
500                log_msg(
501                    f"SKIP: {consumer_path} služba {svc_name!r}: neliterální image (přeskočeno)",
502                    verbose_only=True,
503                    verbose=verbose,
504                )
505            continue
506        repo_key, pin_key = repository_and_pin(img)
507        if repo_key in prod_map:
508            prod_ref = prod_map[repo_key]
509            _, prod_pin = repository_and_pin(prod_ref)
510            if pin_key != prod_pin:
511                msg = f"{consumer_path}: služba {svc_name!r}: {img!r} ≠ prod {prod_ref!r} " f"(repo {repo_key!r})"
512                if not fix:
513                    errors.append(msg)
514                    log_msg(f"ERROR: {msg}")
515                else:
516                    log_msg(f"FIX: {consumer_path} {svc_name!r}: {img!r}{prod_ref!r} (prod)")
517                    spec["image"] = prod_ref
518                    modified = True
519        elif repo_key in dockerfile_source_map:
520            source_ref = dockerfile_source_map[repo_key]
521            _, source_pin = repository_and_pin(source_ref)
522            if pin_key != source_pin:
523                msg = (
524                    f"{consumer_path}: služba {svc_name!r}: {img!r} != Dockerfile source {source_ref!r} "
525                    f"(repo {repo_key!r})"
526                )
527                if not fix:
528                    errors.append(msg)
529                    log_msg(f"ERROR: {msg}")
530                else:
531                    log_msg(f"FIX: {consumer_path} {svc_name!r}: {img!r}{source_ref!r} (Dockerfile)")
532                    spec["image"] = source_ref
533                    modified = True
534        else:
535            key = (repo_key, str(consumer_path))
536            if key in seen_missing_prod:
537                continue
538            seen_missing_prod.add(key)
539            if repo_key in whitelist:
540                log_msg(
541                    f"NOTE: {consumer_path}: repo {repo_key!r} má literál {img!r}, "
542                    "ale je explicitně povolený jako dev-only výjimka mimo produkční compose."
543                )
544                continue
545            msg = (
546                f"{consumer_path}: repo {repo_key!r} má literál {img!r}, "
547                f"ale {COMPOSE_PRODUCTION[0]} neobsahuje srovnatelný literál pro stejný repozitář "
548                "(cross-file srovnání vyžaduje whitelist nebo produkční literál)"
549            )
550            errors.append(msg)
551            log_msg(f"ERROR: {msg}")
552    return errors, modified
553
554
555def process_dockerfiles(root: Path, fix: bool, verbose: bool) -> Tuple[List[str], List[Path]]:
556    """
557    Zpracuje všechny Dockerfile z konstanty ``DOCKERFILE_PATHS`` pod kořenem ``root``.
558
559    :param root: Kořen repozitáře (aktuální pracovní adresář při běhu z pre-commit).
560    :param fix: Předáno do :func:`fix_dockerfile_text`; při True zapíše změny na disk.
561    :param verbose: Předáno dál pro případné budoucí logování (část volání zachována kvůli API).
562    :return: ``(souhrnné_chyby, seznam_cest_upravených_souborů)``.
563    """
564    errors: List[str] = []
565    modified_paths: List[Path] = []
566    for rel in DOCKERFILE_PATHS:
567        path = root / rel
568        if not path.is_file():
569            continue
570        text = path.read_text(encoding="utf-8")
571        new_text, errs, mod = fix_dockerfile_text(text, fix, path, verbose)
572        errors.extend(errs)
573        if mod and fix:
574            path.write_text(new_text, encoding="utf-8")
575            modified_paths.append(path)
576            log_msg(f"Upraven soubor {path}")
577    return errors, modified_paths
578
579
580def process_compose_files(root: Path, fix: bool, verbose: bool) -> Tuple[List[str], List[Path]]:
581    """
582    Načte prod a spotřebitelské compose, provede intra-file a cross-file kontrolu nebo opravu.
583
584    Zapisuje pouze soubory, u kterých došlo ke změně dat (ne přeformátovává prod zbytečně).
585
586    :param root: Kořen repozitáře.
587    :param fix: Zapne zápis YAML a cross-file úpravy spotřebitelů.
588    :param verbose: Statistiky přeskočených neliterálních referencí a SKIP u cross-file.
589    :return: ``(souhrnné_chyby, seznam_cest_upravených_souborů)``.
590    """
591    yaml = build_yaml()
592    errors: List[str] = []
593    modified_paths: List[Path] = []
594    loaded: Dict[Path, Any] = {}
595    originals: Dict[Path, str] = {}
596
597    all_compose = [root / p for p in COMPOSE_PRODUCTION + COMPOSE_CONSUMERS if (root / p).is_file()]
598
599    for path in all_compose:
600        raw = path.read_text(encoding="utf-8")
601        originals[path] = raw
602        loaded[path] = yaml.load(raw) or {}
603
604    compose_dirty: Set[Path] = set()
605
606    for path in all_compose:
607        data = loaded[path]
608        pins_by_repo, refs_by_repo, skipped = scan_compose_intra(data, path)
609        if verbose and skipped:
610            log_msg(
611                f"INFO: {path}: přeskočeno {skipped} neliterálních image referencí",
612                verbose_only=True,
613                verbose=verbose,
614            )
615        errs, intra_mod = apply_compose_intra_fix(data, path, pins_by_repo, refs_by_repo, fix, verbose)
616        errors.extend(errs)
617        if intra_mod:
618            compose_dirty.add(path)
619
620    prod_path = root / COMPOSE_PRODUCTION[0]
621    prod_map: Dict[str, str] = {}
622    dockerfile_source_map = extract_dockerfile_repo_map(root)
623    if prod_path in loaded:
624        prod_map = extract_prod_literal_map(loaded[prod_path])
625
626    for rel in COMPOSE_CONSUMERS:
627        path = root / rel
628        if path not in loaded:
629            continue
630        errs, cross_mod = apply_compose_cross_fix(loaded[path], path, prod_map, dockerfile_source_map, fix, verbose)
631        errors.extend(errs)
632        if cross_mod:
633            compose_dirty.add(path)
634
635    if fix:
636        for path in sorted(compose_dirty, key=str):
637            out = io.StringIO()
638            yaml.dump(loaded[path], out)
639            path.write_text(out.getvalue(), encoding="utf-8")
640            modified_paths.append(path)
641            log_msg(f"Upraven soubor {path}")
642
643    return errors, modified_paths
644
645
646def main(argv: Optional[List[str]] = None) -> int:
647    """
648    Vstupní bod CLI: spustí kontrolu Dockerfile a compose, volitelně s opravami.
649
650    :param argv: Argumenty příkazové řádky (bez ``sys.argv[0]``); None znamená ``sys.argv[1:]``.
651    :return: 0 při úspěchu; 1 při nalezených chybách (bez ``--fix``) nebo po úpravě souborů (s ``--fix``, aby pre-commit znovu zstageoval).
652    """
653    parser = argparse.ArgumentParser(description="Kontrola shody container image pinů (Dockerfile + compose).")
654    parser.add_argument("files", nargs="*", help="Ignorováno — vždy se kontrolují pevné cesty v repozitáři.")
655    parser.add_argument(
656        "--fix",
657        action="store_true",
658        help="Opravit soubory na místě (intra + cross na spotřebitele).",
659    )
660    parser.add_argument(
661        "--verbose",
662        action="store_true",
663        help="Podrobnější výstup.",
664    )
665    args = parser.parse_args(argv)
666
667    verbose = bool(args.verbose)
668    root = Path.cwd()
669
670    all_errors: List[str] = []
671    modified: List[Path] = []
672
673    de, md = process_dockerfiles(root, args.fix, verbose)
674    all_errors.extend(de)
675    modified.extend(md)
676
677    ce, mc = process_compose_files(root, args.fix, verbose)
678    all_errors.extend(ce)
679    modified.extend(mc)
680
681    for err in all_errors:
682        if not err.startswith(LOG_PREFIX):
683            print(err, file=sys.stderr)
684
685    if all_errors:
686        return 1
687
688    if args.fix and modified:
689        log_msg(f"HOTOVÁ ÚPRAVA: {len(modified)} soubor(ů); znovu přidej do commitu.")
690        return 1
691
692    return 0
693
694
695if __name__ == "__main__":
696    sys.exit(main())