diff --git a/local-app/python-tools/cross-organization/check_ecr.py b/local-app/python-tools/cross-organization/check_ecr.py index e95e9b9e..d16bfcae 100644 --- a/local-app/python-tools/cross-organization/check_ecr.py +++ b/local-app/python-tools/cross-organization/check_ecr.py @@ -4,10 +4,10 @@ from datetime import datetime # --- VERSIONING --- -__version__ = "1.2.4" +__version__ = "1.2.5" def get_repo_images(ecr_client, repo_name): - """Fetches a flat list of image dictionaries with reliable scan lookups.""" + """Fetches images and forces discovery of Enhanced Scanning findings.""" images = [] try: paginator = ecr_client.get_paginator('describe_images') @@ -15,18 +15,32 @@ def get_repo_images(ecr_client, repo_name): for img in page['imageDetails']: digest = img.get('imageDigest') - # Use scan summary if available - scan_summary = img.get('imageScanFindingsSummary', {}) - severity_counts = scan_summary.get('findingSeverityCounts', {}) + # Enhanced Scanning often marks status as 'COMPLETE' or 'ACTIVE' + # but might leave the summary empty in describe_images. + scan_status_obj = img.get('imageScanStatus', {}) + status = scan_status_obj.get('status', 'NO_SCAN') - # FIXED: Force scan finding lookup for reliable CVE reporting - if not severity_counts and img.get('imageScanStatus', {}).get('status') == 'COMPLETE': + # Check for legacy summary first + severity_counts = img.get('imageScanFindingsSummary', {}).get('findingSeverityCounts', {}) + + # FIX: If summary is empty but scanning is enabled/complete, query findings API directly. + # This is the required path for Enhanced Scanning (Inspector) results. + if not severity_counts: try: findings = ecr_client.describe_image_scan_findings( repositoryName=repo_name, imageId={'imageDigest': digest} ) - severity_counts = findings.get('imageScanFindings', {}).get('findingSeverityCounts', {}) + # Extract counts from the findings report + f_summary = findings.get('imageScanFindings', {}) + severity_counts = f_summary.get('findingSeverityCounts', {}) + + # If we found findings via this API, update status to reflect it + if severity_counts: + status = 'COMPLETE' + except ecr_client.exceptions.ScanNotFoundException: + # No scan exists for this specific image digest + status = 'NO_SCAN' except: pass @@ -35,14 +49,13 @@ def get_repo_images(ecr_client, repo_name): "image_digest": digest, "pushed_at": img['imagePushedAt'].isoformat() if 'imagePushedAt' in img else "N/A", "last_pulled_at": img['lastRecordedPullTime'].isoformat() if 'lastRecordedPullTime' in img else "N/A", - "scan_status": img.get('imageScanStatus', {}).get('status', 'NO_SCAN'), + "scan_status": status, "severity_counts": severity_counts, "size_bytes": img.get('imageSizeInBytes', 0) }) - except: + except Exception: pass - # RESTORED: Now returns ONLY the flat images list to avoid JSON corruption return images def account_task(account_session, account_id, account_name, region): @@ -63,21 +76,29 @@ def account_task(account_session, account_id, account_name, region): name = repo['repositoryName'] arn = repo['repositoryArn'] - # Correctly assigned to the flat list + # Lifecycle logic placeholder or restoration point + has_lc = False + try: + ecr.get_lifecycle_policy(repositoryName=name) + has_lc = True + except: pass + img_list = get_repo_images(ecr, name) - repo_data = { + results["data"][f"{reg}:{name}"] = { "resource": arn, "partition": partition, "repo_name": name, "repo_arn": arn, "created_at": repo['createdAt'].isoformat(), "mutability": repo.get('imageTagMutability', 'MUTABLE'), - "has_lifecycle": str(True), # Placeholder for logic - "images": img_list # CLEAN FLAT LIST + "has_lifecycle": str(has_lc), + "images": img_list } - results["data"][f"{reg}:{name}"] = repo_data except: continue + results["data"]["account_summary"] = {"_summary": "PROCESSED"} - except Exception as e: results["error"] = str(e) + except Exception as e: + results["error"] = str(e) + return results