Skript check_container_image_reference_parity.py

Automaticky generovaná dokumentace skriptu scripts/check_container_image_reference_parity.py.

Přehled modulu

Kontrola a úprava shody pinů (tag/digest) u stejného container image v Dockerfileech a docker-compose.

  • V jednom souboru: stejný image repozitář nesmí mít různé tagy/digesty (FROM nebo image:).

  • docker-compose.yml je zdroj pravdy pro literály; spotřebitelské compose soubory se při –fix srovnají na prod.

Výstup pro CI: řádky na stderr s prefixem [image-ref-parity].

Třídy

class DockerfileScan

Výsledek průchodu Dockerfile: agregace pinů podle repozitáře a metadata řádků pro přepis FROM.

Funkce

log_msg(message)

Vypíše jeden řádek na stderr s prefixem pro grep v CI.

Parametry:
  • message – Text bez prefixu.

  • verbose_only – Pokud True, vypíše jen při verbose režimu.

  • verbose – Aktuální verbose příznak.

is_pre_commit()

Zjistí, zda skript běží jako pre-commit hook (pro implicitní verbose).

Vrací:

True, pokud je v prostředí nastavena proměnná PRE_COMMIT.

is_literal_image_ref(value)

Určí, zda hodnota image: v compose lze použít pro porovnání pinů (bez rozšíření proměnných).

Parametry:

value – Řetězec z pole image u služby.

Vrací:

False pro prázdný řetězec, obsah ${ / $$; jinak True.

_strip_platform_flags(rest)

Odstraní z řetězce za FROM všechny úvodní příznaky --platform=....

Parametry:

rest – Část řádku za FROM před případným AS.

Vrací:

Zbytek začínající odkazem na image, nebo prázdný řetězec při neplatném tvaru.

parse_from_instruction(line)

Parsuje řádek Dockerfile začínající FROM na odkaz na image a volitelné jméno stage.

Parametry:

line – Jeden řádek souboru (může obsahovat koncový komentář #).

Vrací:

(image_ref, as_name) nebo None, pokud řádek není platný FROM.

repository_and_pin(image_ref)

Rozdělí odkaz na container image na klíč repozitáře a normalizovaný pin (tag a/nebo digest).

Parametry:

image_ref – Řetězec image bez části AS (např. ghcr.io/foo:1.0).

Vrací:

(repo_key, pin_key) — obě části malými písmeny pro porovnání.

choose_canonical_pin(pins)

Vybere jeden kanonický pin z množiny normalizovaných pinů (malá písmena).

Parametry:

pins – Množina pin_key.

Vrací:

Vítězný pin_key.

pick_display_ref_for_pin(refs_by_pin, pin_key)

Vrátí reprezentaci image pro zápis (zachovává casing z jednoho výskytu).

Parametry:
  • refs_by_pin – Mapa pin_key -> původní řetězce image.

  • pin_key – Normalizovaný pin.

Vrací:

Původní řetězec image.

build_yaml()

Vytvoří nakonfigurovanou instanci ruamel.yaml.YAML pro čtení/zápis compose se zachováním uvozovek.

Vrací:

YAML loader/dumper vhodný pro úpravy compose souborů.

scan_dockerfile(text)

Projde text Dockerfile a sejme externí base image (mimo scratch a odkazy na již definované stage).

Parametry:

text – Celý obsah Dockerfile.

Vrací:

Struktura s množinami pinů, mapou původních řetězců a line_info pro FROM_IMG_RE.

fix_dockerfile_text(text, fix, path, _verbose)

Zkontroluje nebo opraví konflikty pinů u stejného repo_key v jednom Dockerfile.

Parametry:
  • text – Původní obsah souboru.

  • fix – Pokud True, přepíše FROM řádky na kanonický pin; pokud False, jen hlásí chyby.

  • path – Cesta pro hlášky v chybách.

  • _verbose – Rezervováno (aktuálně nepoužito).

Vrací:

Trojice (nový_text, seznam_chyb, byl_upraven).

iter_compose_service_images(data)

Projde services v načteném compose a vrátí služby s řetězcovým image.

Parametry:

data – Kořenový dict compose (např. z yaml.load), očekává se klíč services.

Yield:

(název_služby, hodnota_image, spec_dict)spec_dict umožní při --fix změnit image na místě.

Vrací:

Iterátor trojic (viz :yield:); při neplatném data nebo services nic nevygeneruje.

scan_compose_intra(data, _path)

Shromáždí literální image z jednoho compose souboru pro intra-file kontrolu shody pinů.

Parametry:
  • data – Parsovaný obsah compose.

  • _path – Cesta k souboru (pro rozhraní; aktuálně se nepoužívá).

Vrací:

(pins_by_repo, refs_by_repo_pin, počet_přeskočených_neliterálních_image).

apply_compose_intra_fix(data, path, pins_by_repo, refs_by_repo_pin, fix, _verbose)

U jednoho compose sjednotí literální image se stejným repo_key na jeden kanonický pin.

Parametry:
  • data – Parsovaný strom compose (mění se in-place při fix=True).

  • path – Cesta k souboru pro chybové hlášky.

  • pins_by_repo – Agregace pinů ze scan_compose_intra().

  • refs_by_repo_pin – Mapa původních řetězců image podle pinu.

  • fix – Zapnout zápis oprav do data.

  • _verbose – Rezervováno (nepoužito).

Vrací:

(seznam_chyb, byl_proveden_zápis_do_dat).

extract_prod_literal_map(data)

Z produkčního compose vytvoří mapu repo_key → literální řetězec image (první výskyt).

Parametry:

data – Parsovaný docker-compose.yml (nebo ekvivalent).

Vrací:

Slovník pro cross-file srovnání se spotřebitelskými compose.

apply_compose_cross_fix(consumer_data, consumer_path, prod_map, fix, verbose)

Porovná literální image ve spotřebitelském compose s mapou z produkce a případně je srovná.

Parametry:
  • consumer_data – Parsovaný compose spotřebitele (mění se in-place při fix=True).

  • consumer_path – Cesta k souboru spotřebitele (logy a chyby).

  • prod_map – Výstup extract_prod_literal_map() z docker-compose.yml.

  • fix – Pokud True, přepíše image na hodnotu z prod při nesouladu pinu.

  • verbose – Zapíná podrobné SKIP logy u neliterálních image (přes log_msg).

Vrací:

(seznam_chyb při fix=False, byl_proveden_zápis).

process_dockerfiles(root, fix, verbose)

Zpracuje všechny Dockerfile z konstanty DOCKERFILE_PATHS pod kořenem root.

Parametry:
  • root – Kořen repozitáře (aktuální pracovní adresář při běhu z pre-commit).

  • fix – Předáno do fix_dockerfile_text(); při True zapíše změny na disk.

  • verbose – Předáno dál pro případné budoucí logování (část volání zachována kvůli API).

Vrací:

(souhrnné_chyby, seznam_cest_upravených_souborů).

process_compose_files(root, fix, verbose)

Načte prod a spotřebitelské compose, provede intra-file a cross-file kontrolu nebo opravu.

Zapisuje pouze soubory, u kterých došlo ke změně dat (ne přeformátovává prod zbytečně).

Parametry:
  • root – Kořen repozitáře.

  • fix – Zapne zápis YAML a cross-file úpravy spotřebitelů.

  • verbose – Statistiky přeskočených neliterálních referencí a SKIP u cross-file.

Vrací:

(souhrnné_chyby, seznam_cest_upravených_souborů).

main(argv)

Vstupní bod CLI: spustí kontrolu Dockerfile a compose, volitelně s opravami.

Parametry:

argv – Argumenty příkazové řádky (bez sys.argv[0]); None znamená sys.argv[1:].

Vrací:

0 při úspěchu; 1 při nalezených chybách (bez --fix) nebo po úpravě souborů (s --fix, aby pre-commit znovu zstageoval).

Zdrojový kód

  1#!/usr/bin/env python3
  2"""
  3Kontrola a úprava shody pinů (tag/digest) u stejného container image v Dockerfileech a docker-compose.
  4
  5- V jednom souboru: stejný image repozitář nesmí mít různé tagy/digesty (FROM nebo image:).
  6- docker-compose.yml je zdroj pravdy pro literály; spotřebitelské compose soubory se při --fix srovnají na prod.
  7
  8Výstup pro CI: řádky na stderr s prefixem ``[image-ref-parity]``.
  9"""
 10
 11from __future__ import annotations
 12
 13import argparse
 14import io
 15import os
 16import re
 17import sys
 18from collections import defaultdict
 19from dataclasses import dataclass, field
 20from pathlib import Path
 21from typing import Any, DefaultDict, Dict, Iterable, List, Optional, Set, Tuple
 22
 23from packaging.version import InvalidVersion, Version
 24from ruamel.yaml import YAML
 25
 26LOG_PREFIX = "[image-ref-parity]"
 27
 28# Relativní cesty od kořene repozitáře (pre-commit spouští z rootu).
 29DOCKERFILE_PATHS = [
 30    "Dockerfile",
 31    "Dockerfile-DEV",
 32    "Dockerfile-DB",
 33    "fedora/Dockerfile",
 34    "redis/Dockerfile",
 35    "proxy/Dockerfile",
 36]
 37
 38COMPOSE_PRODUCTION = ["docker-compose.yml"]
 39COMPOSE_CONSUMERS = [
 40    "docker-compose-dev-local-db-all-containers.yml",
 41    "docker-compose-dev-local-db.yml",
 42    "docker-compose-test.yml",
 43    "git_docker-compose.yml",
 44]
 45
 46FROM_LINE = re.compile(r"^FROM\s+(?P<rest>.+?)\s*(?:#.*)?$", re.IGNORECASE)
 47AS_SUFFIX = re.compile(r"\s+AS\s+(\S+)\s*$", re.IGNORECASE)
 48# FROM s nahraditelným image tokenem (image bez komentáře na konci řádku zvlášť).
 49FROM_IMG_RE = re.compile(
 50    r"^(FROM\s+)((?:--platform=\S+\s+)+)?(\S+)((\s+AS\s+\S+)?)(\s*(?:#.*)?)$",
 51    re.IGNORECASE,
 52)
 53
 54
 55def log_msg(message: str, *, verbose_only: bool = False, verbose: bool = False) -> None:
 56    """
 57    Vypíše jeden řádek na stderr s prefixem pro grep v CI.
 58
 59    :param message: Text bez prefixu.
 60    :param verbose_only: Pokud True, vypíše jen při verbose režimu.
 61    :param verbose: Aktuální verbose příznak.
 62    """
 63    if verbose_only and not verbose:
 64        return
 65    print(f"{LOG_PREFIX} {message}", file=sys.stderr)
 66
 67
 68def is_pre_commit() -> bool:
 69    """
 70    Zjistí, zda skript běží jako pre-commit hook (pro implicitní verbose).
 71
 72    :return: True, pokud je v prostředí nastavena proměnná ``PRE_COMMIT``.
 73    """
 74    return bool(os.environ.get("PRE_COMMIT"))
 75
 76
 77def is_literal_image_ref(value: str) -> bool:
 78    """
 79    Určí, zda hodnota ``image:`` v compose lze použít pro porovnání pinů (bez rozšíření proměnných).
 80
 81    :param value: Řetězec z pole ``image`` u služby.
 82    :return: False pro prázdný řetězec, obsah ``${`` / ``$$``; jinak True.
 83    """
 84    s = value.strip()
 85    if not s:
 86        return False
 87    if "${" in s or "$$" in s:
 88        return False
 89    return True
 90
 91
 92def _strip_platform_flags(rest: str) -> str:
 93    """
 94    Odstraní z řetězce za ``FROM`` všechny úvodní příznaky ``--platform=...``.
 95
 96    :param rest: Část řádku za ``FROM`` před případným ``AS``.
 97    :return: Zbytek začínající odkazem na image, nebo prázdný řetězec při neplatném tvaru.
 98    """
 99    current = rest.strip()
100    while current.startswith("--"):
101        space_idx = current.find(" ")
102        if space_idx == -1:
103            return ""
104        current = current[space_idx + 1 :].strip()
105    return current
106
107
108def parse_from_instruction(line: str) -> Optional[Tuple[str, Optional[str]]]:
109    """
110    Parsuje řádek Dockerfile začínající ``FROM`` na odkaz na image a volitelné jméno stage.
111
112    :param line: Jeden řádek souboru (může obsahovat koncový komentář ``#``).
113    :return: ``(image_ref, as_name)`` nebo None, pokud řádek není platný ``FROM``.
114    """
115    stripped = line.strip()
116    m = FROM_LINE.match(stripped)
117    if not m:
118        return None
119    rest = m.group("rest").strip()
120    as_name: Optional[str] = None
121    as_m = AS_SUFFIX.search(rest)
122    if as_m:
123        as_name = as_m.group(1)
124        rest = rest[: as_m.start()].strip()
125    rest = _strip_platform_flags(rest)
126    if not rest:
127        return None
128    return rest, as_name
129
130
131def repository_and_pin(image_ref: str) -> Tuple[str, str]:
132    """
133    Rozdělí odkaz na container image na klíč repozitáře a normalizovaný pin (tag a/nebo digest).
134
135    :param image_ref: Řetězec image bez části ``AS`` (např. ``ghcr.io/foo:1.0``).
136    :return: ``(repo_key, pin_key)`` — obě části malými písmeny pro porovnání.
137    """
138    ref = image_ref.strip()
139    lower = ref.lower()
140    digest_idx = lower.find("@sha256:")
141    digest_suffix = ""
142    if digest_idx != -1:
143        digest_suffix = ref[digest_idx:]
144        ref = ref[:digest_idx]
145
146    last_slash = ref.rfind("/")
147    tail = ref[last_slash + 1 :]
148    if ":" in tail:
149        repo, _, tag = ref.rpartition(":")
150    else:
151        repo, tag = ref, ""
152
153    pin = (tag + digest_suffix).lower()
154    return repo.lower(), pin
155
156
157def choose_canonical_pin(pins: Set[str]) -> str:
158    """
159    Vybere jeden kanonický pin z množiny normalizovaných pinů (malá písmena).
160
161    :param pins: Množina pin_key.
162    :return: Vítězný pin_key.
163    """
164    if len(pins) == 1:
165        return next(iter(pins))
166    pins_list = list(pins)
167
168    def sort_key(pin: str) -> Tuple[int, Any]:
169        tag_part = pin.split("@sha256:", 1)[0]
170        try:
171            return (0, Version(tag_part))
172        except InvalidVersion:
173            return (1, pin)
174
175    if all(sort_key(p)[0] == 0 for p in pins_list):
176        return max(pins_list, key=lambda p: Version(p.split("@sha256:", 1)[0]))
177    return max(pins_list)
178
179
180def pick_display_ref_for_pin(refs_by_pin: Dict[str, List[str]], pin_key: str) -> str:
181    """
182    Vrátí reprezentaci image pro zápis (zachovává casing z jednoho výskytu).
183
184    :param refs_by_pin: Mapa pin_key -> původní řetězce image.
185    :param pin_key: Normalizovaný pin.
186    :return: Původní řetězec image.
187    """
188    candidates = refs_by_pin.get(pin_key, [])
189    if not candidates:
190        return pin_key
191    return candidates[0]
192
193
194def build_yaml() -> YAML:
195    """
196    Vytvoří nakonfigurovanou instanci ``ruamel.yaml.YAML`` pro čtení/zápis compose se zachováním uvozovek.
197
198    :return: YAML loader/dumper vhodný pro úpravy compose souborů.
199    """
200    y = YAML()
201    y.preserve_quotes = True
202    y.indent(mapping=2, sequence=4, offset=2)
203    return y
204
205
206@dataclass
207class DockerfileScan:
208    """
209    Výsledek průchodu Dockerfile: agregace pinů podle repozitáře a metadata řádků pro přepis ``FROM``.
210    """
211
212    pins_by_repo: DefaultDict[str, Set[str]] = field(default_factory=lambda: defaultdict(set))
213    refs_by_repo_pin: DefaultDict[str, DefaultDict[str, List[str]]] = field(
214        default_factory=lambda: defaultdict(lambda: defaultdict(list))
215    )
216    line_info: List[Dict[str, Any]] = field(default_factory=list)
217
218
219def scan_dockerfile(text: str) -> DockerfileScan:
220    """
221    Projde text Dockerfile a sejme externí base image (mimo ``scratch`` a odkazy na již definované stage).
222
223    :param text: Celý obsah Dockerfile.
224    :return: Struktura s množinami pinů, mapou původních řetězců a ``line_info`` pro ``FROM_IMG_RE``.
225    """
226    scan = DockerfileScan()
227    stages: Set[str] = set()
228    for lineno, line in enumerate(text.splitlines(), start=1):
229        parsed = parse_from_instruction(line)
230        if parsed is None:
231            continue
232        image_ref, as_name = parsed
233        if image_ref.lower() == "scratch":
234            if as_name:
235                stages.add(as_name)
236            continue
237        if image_ref in stages:
238            if as_name:
239                stages.add(as_name)
240            continue
241        repo_key, pin_key = repository_and_pin(image_ref)
242        scan.pins_by_repo[repo_key].add(pin_key)
243        scan.refs_by_repo_pin[repo_key][pin_key].append(image_ref)
244        m = FROM_IMG_RE.match(line.rstrip("\n"))
245        if m:
246            scan.line_info.append(
247                {
248                    "lineno": lineno,
249                    "line": line,
250                    "repo_key": repo_key,
251                    "pin_key": pin_key,
252                    "prefix": m.group(1) + (m.group(2) or ""),
253                    "as_part": m.group(4) or "",
254                    "suffix": m.group(6) or "",
255                }
256            )
257        if as_name:
258            stages.add(as_name)
259    return scan
260
261
262def fix_dockerfile_text(text: str, fix: bool, path: Path, _verbose: bool) -> Tuple[str, List[str], bool]:
263    """
264    Zkontroluje nebo opraví konflikty pinů u stejného ``repo_key`` v jednom Dockerfile.
265
266    :param text: Původní obsah souboru.
267    :param fix: Pokud True, přepíše ``FROM`` řádky na kanonický pin; pokud False, jen hlásí chyby.
268    :param path: Cesta pro hlášky v chybách.
269    :param _verbose: Rezervováno (aktuálně nepoužito).
270    :return: Trojice ``(nový_text, seznam_chyb, byl_upraven)``.
271    """
272    errors: List[str] = []
273    scan = scan_dockerfile(text)
274    for repo_key, pins in scan.pins_by_repo.items():
275        if len(pins) <= 1:
276            continue
277        if not fix:
278            detail_parts = []
279            for pin in sorted(pins):
280                detail_parts.append(f"  - {repr(pin) if pin else '(bez explicitního tagu)'}")
281            errors.append(
282                f"{path}: stejný image {repo_key!r} má v jednom Dockerfile různé tagy/digesty:\n"
283                + "\n".join(detail_parts)
284            )
285            log_msg(f"ERROR: {path}: konflikt pinů pro {repo_key!r}: {', '.join(sorted(pins))}")
286            continue
287        win = choose_canonical_pin(set(pins))
288        new_ref = pick_display_ref_for_pin({k: list(v) for k, v in scan.refs_by_repo_pin[repo_key].items()}, win)
289        log_msg(f"FIX: {path}: sjednocuji {repo_key!r} na pin {win!r} (→ {new_ref!r})")
290
291    if errors and not fix:
292        return text, errors, False
293
294    if not fix:
295        return text, [], False
296
297    lines = text.splitlines(keepends=True)
298    modified = False
299    for info in scan.line_info:
300        repo_key = info["repo_key"]
301        pins = scan.pins_by_repo[repo_key]
302        if len(pins) <= 1:
303            continue
304        win = choose_canonical_pin(set(pins))
305        if info["pin_key"] == win:
306            continue
307        new_ref = pick_display_ref_for_pin({k: list(v) for k, v in scan.refs_by_repo_pin[repo_key].items()}, win)
308        lineno = info["lineno"]
309        idx = lineno - 1
310        old = lines[idx]
311        m = FROM_IMG_RE.match(old.rstrip("\r\n"))
312        if not m:
313            continue
314        nl = "\n" if old.endswith("\n") else ""
315        if old.endswith("\r\n"):
316            nl = "\r\n"
317        new_line = m.group(1) + (m.group(2) or "") + new_ref + (m.group(4) or "") + (m.group(6) or "") + nl
318        if new_line != old:
319            lines[idx] = new_line
320            modified = True
321    return "".join(lines), [], modified
322
323
324def iter_compose_service_images(data: Any) -> Iterable[Tuple[str, str, Any]]:
325    """
326    Projde ``services`` v načteném compose a vrátí služby s řetězcovým ``image``.
327
328    :param data: Kořenový dict compose (např. z ``yaml.load``), očekává se klíč ``services``.
329    :yield: ``(název_služby, hodnota_image, spec_dict)`` — ``spec_dict`` umožní při ``--fix`` změnit ``image`` na místě.
330    :return: Iterátor trojic (viz ``:yield:``); při neplatném ``data`` nebo ``services`` nic nevygeneruje.
331    """
332    if not isinstance(data, dict):
333        return
334    services = data.get("services")
335    if not isinstance(services, dict):
336        return
337    for svc_name, spec in services.items():
338        if not isinstance(spec, dict):
339            continue
340        img = spec.get("image")
341        if isinstance(img, str):
342            yield svc_name, img, spec
343
344
345def scan_compose_intra(
346    data: Any, _path: Path
347) -> Tuple[DefaultDict[str, Set[str]], DefaultDict[str, DefaultDict[str, List[str]]], int]:
348    """
349    Shromáždí literální ``image`` z jednoho compose souboru pro intra-file kontrolu shody pinů.
350
351    :param data: Parsovaný obsah compose.
352    :param _path: Cesta k souboru (pro rozhraní; aktuálně se nepoužívá).
353    :return: ``(pins_by_repo, refs_by_repo_pin, počet_přeskočených_neliterálních_image)``.
354    """
355    pins_by_repo: DefaultDict[str, Set[str]] = defaultdict(set)
356    refs_by_repo_pin: DefaultDict[str, DefaultDict[str, List[str]]] = defaultdict(lambda: defaultdict(list))
357    skipped_nonliteral = 0
358    for _svc_name, img, _spec in iter_compose_service_images(data):
359        if not is_literal_image_ref(img):
360            skipped_nonliteral += 1
361            continue
362        repo_key, pin_key = repository_and_pin(img)
363        pins_by_repo[repo_key].add(pin_key)
364        refs_by_repo_pin[repo_key][pin_key].append(img)
365    return pins_by_repo, refs_by_repo_pin, skipped_nonliteral
366
367
368def apply_compose_intra_fix(
369    data: Any,
370    path: Path,
371    pins_by_repo: DefaultDict[str, Set[str]],
372    refs_by_repo_pin: DefaultDict[str, DefaultDict[str, List[str]]],
373    fix: bool,
374    _verbose: bool,
375) -> Tuple[List[str], bool]:
376    """
377    U jednoho compose sjednotí literální ``image`` se stejným ``repo_key`` na jeden kanonický pin.
378
379    :param data: Parsovaný strom compose (mění se in-place při ``fix=True``).
380    :param path: Cesta k souboru pro chybové hlášky.
381    :param pins_by_repo: Agregace pinů ze :func:`scan_compose_intra`.
382    :param refs_by_repo_pin: Mapa původních řetězců image podle pinu.
383    :param fix: Zapnout zápis oprav do ``data``.
384    :param _verbose: Rezervováno (nepoužito).
385    :return: ``(seznam_chyb, byl_proveden_zápis_do_dat)``.
386    """
387    errors: List[str] = []
388    modified = False
389    for repo_key, pins in pins_by_repo.items():
390        if len(pins) <= 1:
391            continue
392        if not fix:
393            errors.append(f"{path}: stejný image {repo_key!r} má v compose různé tagy/digesty: {sorted(pins)}")
394            log_msg(f"ERROR: {path}: konflikt pinů pro {repo_key!r}: {', '.join(sorted(pins))}")
395            continue
396        win = choose_canonical_pin(set(pins))
397        new_ref = pick_display_ref_for_pin({k: list(v) for k, v in refs_by_repo_pin[repo_key].items()}, win)
398        log_msg(f"FIX: {path}: sjednocuji {repo_key!r} na {new_ref!r}")
399        for _svc_name, img, spec in iter_compose_service_images(data):
400            if not is_literal_image_ref(img):
401                continue
402            rk, pk = repository_and_pin(img)
403            if rk != repo_key:
404                continue
405            if pk != win:
406                spec["image"] = new_ref
407                modified = True
408    return errors, modified
409
410
411def extract_prod_literal_map(data: Any) -> Dict[str, str]:
412    """
413    Z produkčního compose vytvoří mapu ``repo_key`` → literální řetězec ``image`` (první výskyt).
414
415    :param data: Parsovaný ``docker-compose.yml`` (nebo ekvivalent).
416    :return: Slovník pro cross-file srovnání se spotřebitelskými compose.
417    """
418    out: Dict[str, str] = {}
419    for _svc, img, _spec in iter_compose_service_images(data):
420        if not is_literal_image_ref(img):
421            continue
422        repo_key, _pin = repository_and_pin(img)
423        if repo_key not in out:
424            out[repo_key] = img.strip()
425    return out
426
427
428def apply_compose_cross_fix(
429    consumer_data: Any,
430    consumer_path: Path,
431    prod_map: Dict[str, str],
432    fix: bool,
433    verbose: bool,
434) -> Tuple[List[str], bool]:
435    """
436    Porovná literální image ve spotřebitelském compose s mapou z produkce a případně je srovná.
437
438    :param consumer_data: Parsovaný compose spotřebitele (mění se in-place při ``fix=True``).
439    :param consumer_path: Cesta k souboru spotřebitele (logy a chyby).
440    :param prod_map: Výstup :func:`extract_prod_literal_map` z ``docker-compose.yml``.
441    :param fix: Pokud True, přepíše ``image`` na hodnotu z prod při nesouladu pinu.
442    :param verbose: Zapíná podrobné SKIP logy u neliterálních image (přes ``log_msg``).
443    :return: ``(seznam_chyb při fix=False, byl_proveden_zápis)``.
444    """
445    errors: List[str] = []
446    modified = False
447    seen_deferred: Set[Tuple[str, str]] = set()
448
449    for svc_name, img, spec in iter_compose_service_images(consumer_data):
450        if not is_literal_image_ref(img):
451            if verbose:
452                log_msg(
453                    f"SKIP: {consumer_path} služba {svc_name!r}: neliterální image (přeskočeno)",
454                    verbose_only=True,
455                    verbose=verbose,
456                )
457            continue
458        repo_key, pin_key = repository_and_pin(img)
459        if repo_key in prod_map:
460            prod_ref = prod_map[repo_key]
461            _, prod_pin = repository_and_pin(prod_ref)
462            if pin_key != prod_pin:
463                msg = f"{consumer_path}: služba {svc_name!r}: {img!r} ≠ prod {prod_ref!r} " f"(repo {repo_key!r})"
464                if not fix:
465                    errors.append(msg)
466                    log_msg(f"ERROR: {msg}")
467                else:
468                    log_msg(f"FIX: {consumer_path} {svc_name!r}: {img!r}{prod_ref!r} (prod)")
469                    spec["image"] = prod_ref
470                    modified = True
471        else:
472            key = (repo_key, str(consumer_path))
473            if key not in seen_deferred:
474                seen_deferred.add(key)
475                log_msg(
476                    f"NOTE: {consumer_path}: repo {repo_key!r} má literál {img!r}, "
477                    f"ale {COMPOSE_PRODUCTION[0]} neobsahuje srovnatelný literál pro stejný repozitář — "
478                    f"cross-file srovnání přeskočeno (zkontroluj ručně / proměnné v prod)."
479                )
480    return errors, modified
481
482
483def process_dockerfiles(root: Path, fix: bool, verbose: bool) -> Tuple[List[str], List[Path]]:
484    """
485    Zpracuje všechny Dockerfile z konstanty ``DOCKERFILE_PATHS`` pod kořenem ``root``.
486
487    :param root: Kořen repozitáře (aktuální pracovní adresář při běhu z pre-commit).
488    :param fix: Předáno do :func:`fix_dockerfile_text`; při True zapíše změny na disk.
489    :param verbose: Předáno dál pro případné budoucí logování (část volání zachována kvůli API).
490    :return: ``(souhrnné_chyby, seznam_cest_upravených_souborů)``.
491    """
492    errors: List[str] = []
493    modified_paths: List[Path] = []
494    for rel in DOCKERFILE_PATHS:
495        path = root / rel
496        if not path.is_file():
497            continue
498        text = path.read_text(encoding="utf-8")
499        new_text, errs, mod = fix_dockerfile_text(text, fix, path, verbose)
500        errors.extend(errs)
501        if mod and fix:
502            path.write_text(new_text, encoding="utf-8")
503            modified_paths.append(path)
504            log_msg(f"Upraven soubor {path}")
505    return errors, modified_paths
506
507
508def process_compose_files(root: Path, fix: bool, verbose: bool) -> Tuple[List[str], List[Path]]:
509    """
510    Načte prod a spotřebitelské compose, provede intra-file a cross-file kontrolu nebo opravu.
511
512    Zapisuje pouze soubory, u kterých došlo ke změně dat (ne přeformátovává prod zbytečně).
513
514    :param root: Kořen repozitáře.
515    :param fix: Zapne zápis YAML a cross-file úpravy spotřebitelů.
516    :param verbose: Statistiky přeskočených neliterálních referencí a SKIP u cross-file.
517    :return: ``(souhrnné_chyby, seznam_cest_upravených_souborů)``.
518    """
519    yaml = build_yaml()
520    errors: List[str] = []
521    modified_paths: List[Path] = []
522    loaded: Dict[Path, Any] = {}
523    originals: Dict[Path, str] = {}
524
525    all_compose = [root / p for p in COMPOSE_PRODUCTION + COMPOSE_CONSUMERS if (root / p).is_file()]
526
527    for path in all_compose:
528        raw = path.read_text(encoding="utf-8")
529        originals[path] = raw
530        loaded[path] = yaml.load(raw) or {}
531
532    compose_dirty: Set[Path] = set()
533
534    for path in all_compose:
535        data = loaded[path]
536        pins_by_repo, refs_by_repo, skipped = scan_compose_intra(data, path)
537        if verbose and skipped:
538            log_msg(
539                f"INFO: {path}: přeskočeno {skipped} neliterálních image referencí",
540                verbose_only=True,
541                verbose=verbose,
542            )
543        errs, intra_mod = apply_compose_intra_fix(data, path, pins_by_repo, refs_by_repo, fix, verbose)
544        errors.extend(errs)
545        if intra_mod:
546            compose_dirty.add(path)
547
548    prod_path = root / COMPOSE_PRODUCTION[0]
549    prod_map: Dict[str, str] = {}
550    if prod_path in loaded:
551        prod_map = extract_prod_literal_map(loaded[prod_path])
552
553    for rel in COMPOSE_CONSUMERS:
554        path = root / rel
555        if path not in loaded:
556            continue
557        errs, cross_mod = apply_compose_cross_fix(loaded[path], path, prod_map, fix, verbose)
558        errors.extend(errs)
559        if cross_mod:
560            compose_dirty.add(path)
561
562    if fix:
563        for path in sorted(compose_dirty, key=str):
564            out = io.StringIO()
565            yaml.dump(loaded[path], out)
566            path.write_text(out.getvalue(), encoding="utf-8")
567            modified_paths.append(path)
568            log_msg(f"Upraven soubor {path}")
569
570    return errors, modified_paths
571
572
573def main(argv: Optional[List[str]] = None) -> int:
574    """
575    Vstupní bod CLI: spustí kontrolu Dockerfile a compose, volitelně s opravami.
576
577    :param argv: Argumenty příkazové řádky (bez ``sys.argv[0]``); None znamená ``sys.argv[1:]``.
578    :return: 0 při úspěchu; 1 při nalezených chybách (bez ``--fix``) nebo po úpravě souborů (s ``--fix``, aby pre-commit znovu zstageoval).
579    """
580    parser = argparse.ArgumentParser(description="Kontrola shody container image pinů (Dockerfile + compose).")
581    parser.add_argument("files", nargs="*", help="Ignorováno — vždy se kontrolují pevné cesty v repozitáři.")
582    parser.add_argument(
583        "--fix",
584        action="store_true",
585        help="Opravit soubory na místě (intra + cross na spotřebitele).",
586    )
587    parser.add_argument(
588        "--verbose",
589        action="store_true",
590        help="Podrobnější výstup (v pre-commit zapnuto implicitně).",
591    )
592    args = parser.parse_args(argv)
593
594    verbose = bool(args.verbose) or is_pre_commit()
595    root = Path.cwd()
596
597    all_errors: List[str] = []
598    modified: List[Path] = []
599
600    de, md = process_dockerfiles(root, args.fix, verbose)
601    all_errors.extend(de)
602    modified.extend(md)
603
604    ce, mc = process_compose_files(root, args.fix, verbose)
605    all_errors.extend(ce)
606    modified.extend(mc)
607
608    for err in all_errors:
609        if not err.startswith(LOG_PREFIX):
610            print(err, file=sys.stderr)
611
612    if all_errors:
613        return 1
614
615    if args.fix and modified:
616        log_msg(f"HOTOVÁ ÚPRAVA: {len(modified)} soubor(ů); znovu přidej do commitu.")
617        return 1
618
619    return 0
620
621
622if __name__ == "__main__":
623    sys.exit(main())