From e683af7b835b828ec25fbc4f07932b1e36439a80 Mon Sep 17 00:00:00 2001 From: badra001 Date: Fri, 9 Jan 2026 16:46:06 -0500 Subject: [PATCH] revert --- .../cross-organization/assess_check_ecr.py | 120 +++++------------- 1 file changed, 32 insertions(+), 88 deletions(-) diff --git a/local-app/python-tools/cross-organization/assess_check_ecr.py b/local-app/python-tools/cross-organization/assess_check_ecr.py index ea192d50..f52a407e 100755 --- a/local-app/python-tools/cross-organization/assess_check_ecr.py +++ b/local-app/python-tools/cross-organization/assess_check_ecr.py @@ -1,36 +1,24 @@ #!/usr/bin/env python - -import json -import argparse -import sys -import os -import glob +import json, argparse, sys, os, glob from datetime import datetime, timezone from collections import Counter, defaultdict # --- VERSIONING --- -__version__ = "1.1.3" +__version__ = "1.3.1" def find_latest_file(pattern): - """Locates the most recent ECR audit JSON file.""" - files = glob.glob("audit_results.check_ecr.*.json") + files = glob.glob(pattern) return max(files, key=os.path.getctime) if files else None def get_days_ago(iso_str): - """Calculates days between now and an ISO date string, handling timezones.""" - if not iso_str or iso_str == "N/A": - return None + if not iso_str or iso_str == "N/A": return None try: dt = datetime.fromisoformat(iso_str) - if dt.tzinfo is None: - dt = dt.replace(tzinfo=timezone.utc) - now = datetime.now(timezone.utc) - return (now - dt).days - except: - return None + if dt.tzinfo is None: dt = dt.replace(tzinfo=timezone.utc) + return (datetime.now(timezone.utc) - dt).days + except: return None def bucket_age(days, counters): - """Increments the appropriate age bucket counter for distribution reporting.""" if days is None: return if days >= 365: counters['365+'] += 1 elif days >= 180: counters['180-364'] += 1 @@ -40,32 +28,24 @@ def bucket_age(days, counters): else: counters['<30'] += 1 def main(): - parser = argparse.ArgumentParser(description=f"AWS ECR Comprehensive Assessor v{__version__}") - parser.add_argument("--input", help="JSON audit file path") + parser = argparse.ArgumentParser(description="ECR Full Spectrum Assessor - v1.3.1") + parser.add_argument("--input", help="JSON audit file") args = parser.parse_args() input_file = args.input or find_latest_file("audit_results.check_ecr.*.json") - if not input_file: - print("Error: No ECR audit file found."); sys.exit(1) + if not input_file: print("Error: No file found."); sys.exit(1) - try: - with open(input_file, 'r') as f: - data = json.load(f) - except Exception as e: - print(f"Error reading file: {e}"); sys.exit(1) + with open(input_file, 'r') as f: data = json.load(f) report_width = 240 print("-" * report_width) - print(f"ECR COMPREHENSIVE ASSESSMENT | Accounts: {len(data)} | Input: {os.path.basename(input_file)}") + print(f"ECR COMPREHENSIVE AUDIT | Accounts: {len(data)} | Input: {os.path.basename(input_file)}") print("-" * report_width) - - # Main Table Header - print(f"{'Idx':<5} | {'Account ID':<15} | {'Region':<12} | {'Repo Name':<40} | {'Size (GB)':<10} | {'Mutability':<11} | {'L/Cycle':<7} | {'CRITICAL':<8} | {'HIGH':<8}") + print(f"{'Idx':<4} | {'Account ID':<15} | {'Region':<12} | {'Repo Name':<40} | {'Size (GB)':<10} | {'Mutability':<11} | {'L/Cycle':<7} | {'CRITICAL':<8} | {'HIGH':<8}") print("-" * report_width) - # Global Statistics Tracking stats = { - "repos": 0, "total_images": 0, "total_bytes": 0, "no_lc": 0, + "total_repos": 0, "total_images": 0, "total_bytes": 0, "no_lc": 0, "mut": {"IMMUTABLE": 0, "MUTABLE": 0}, "region_bytes": defaultdict(int), "push_ages": Counter(), "pull_ages": Counter(), @@ -73,24 +53,16 @@ def main(): "org_vulns": Counter(), "scanned_imgs": 0 } - # Data for the Stale Image Audit table - stale_images_list = [] - for idx, account in enumerate(data, 1): - acc_id = account.get("account_id") checks = account.get("data", {}) - for key, val in checks.items(): if key == "account_summary" or ":" not in key: continue - region = key.split(":")[0] - stats["repos"] += 1 + stats["total_repos"] += 1 - # Repository Config repo_mut = val.get("mutability", "MUTABLE") stats["mut"][repo_mut] += 1 - has_lc = val.get("has_lifecycle") == "True" - if not has_lc: stats["no_lc"] += 1 + if val.get("has_lifecycle") == "False": stats["no_lc"] += 1 repo_size = val.get("repo_size_bytes", 0) stats["total_bytes"] += repo_size @@ -101,75 +73,47 @@ def main(): stats["total_images"] += len(images) for img in images: - # Security (CVE) Processing + # Security + # FIX: Ensure img is a dictionary before calling .get() + # If img is a single-element list containing a dict, use img[0] + if isinstance(img, list) and len(img) > 0: + img = img[0] + counts = img.get("severity_counts", {}) if counts: stats["scanned_imgs"] += 1 for sev, count in counts.items(): repo_vulns[sev] += count stats["org_vulns"][sev] += count - - # Aging Analysis + # Aging p_days = get_days_ago(img.get("pushed_at")) - l_days = get_days_ago(img.get("last_pulled_at")) - if p_days is not None: bucket_age(p_days, stats["push_ages"]) stats["total_push_days"] += p_days stats["push_count"] += 1 - - # Capture Stale Image Data (Pushed > 365 Days) - if p_days > 365: - stale_images_list.append({ - "account": acc_id, "region": region, "repo": val.get("repo_name"), - "tag": ", ".join(img.get("image_tags", []))[:25] or "untagged", - "p_days": p_days, "l_days": l_days - }) - - bucket_age(l_days, stats["pull_ages"]) + bucket_age(get_days_ago(img.get("last_pulled_at")), stats["pull_ages"]) - # Print Row for Main Assessment Table - print(f"{idx:<5} | {acc_id:<15} | {region:<12} | {val.get('repo_name', 'N/A')[:40]:<40} | {repo_size/(1024**3):<10.2f} | {repo_mut:<11} | {'YES' if has_lc else 'NO':<7} | {repo_vulns['CRITICAL']:<8} | {repo_vulns['HIGH']:<8}") + print(f"{idx:<4} | {account['account_id']:<15} | {region:<12} | {val['repo_name']:<40} | {repo_size/(1024**3):<10.2f} | {repo_mut:<11} | {'YES' if val.get('has_lifecycle')=='True' else 'NO':<7} | {repo_vulns['CRITICAL']:<8} | {repo_vulns['HIGH']:<8}") - # Primary Footers + # Footers avg_img_mb = (stats["total_bytes"] / stats["total_images"]) / (1024**2) if stats["total_images"] > 0 else 0 avg_push_age = stats["total_push_days"] / stats["push_count"] if stats["push_count"] > 0 else 0 print("-" * report_width) - print(f"ORGANIZATION ECR SUMMARY") + print(f"ORGANIZATION ECR SUMMARY\n") print(f" --- Config & Storage ---") - print(f" Repos: {stats['repos']} ({stats['mut']['IMMUTABLE']} Immutable / {stats['mut']['MUTABLE']} Mutable) | Missing Lifecycle: {stats['no_lc']}") - print(f" Total Data: {stats['total_bytes']/(1024**3):.2f} GB | Average Image Size: {avg_img_mb:.2f} MB") + print(f" Repos: {stats['total_repos']} ({stats['mut']['IMMUTABLE']} Immutable / {stats['mut']['MUTABLE']} Mutable) | Missing Lifecycle: {stats['no_lc']}") + print(f" Total Data: {stats['total_bytes']/(1024**3):.2f} GB | Average Image Size: {avg_img_mb:.2f} MB\n") - print(f"\n --- Security & Vulnerabilities ---") + print(f" --- Security & Vulnerabilities ---") print(f" Scan Coverage: {stats['scanned_imgs']} of {stats['total_images']} images scanned") print(f" Org Totals: " + " | ".join([f"{s}: {stats['org_vulns'][s]:,}" for s in ["CRITICAL", "HIGH", "MEDIUM", "LOW"]])) - print(f"\n --- Age Distribution (Push vs Pull) ---") + print(f"\n --- Age Distribution (Days since Push vs Pull) ---") print(f" Average Image Age: {avg_push_age:.1f} days") print(f" Bucket | Pushed Count | Pulled Count") for b in ['<30', '30-59', '60-89', '90-179', '180-364', '365+']: print(f" {b:<11} | {stats['push_ages'][b]:<12} | {stats['pull_ages'][b]}") - - # --- NEW STALE IMAGE AUDIT TABLE --- - print("\n" + "=" * report_width) - print(f"STALE IMAGE AUDIT (PUSHED > 365 DAYS AGO)") - print("=" * report_width) - print(f"{'Idx':<5} | {'Account ID':<15} | {'Region':<12} | {'Repo Name':<45} | {'Image Tag':<25} | {'Pushed Days':<12} | {'Last Pull Days'}") - print("-" * report_width) - - for s_idx, s_img in enumerate(stale_images_list, 1): - pull_str = f"{s_img['l_days']}" if s_img['l_days'] is not None else "NEVER" - print(f"{s_idx:<5} | {s_img['account']:<15} | {s_img['region']:<12} | {s_img['repo']:<45} | {s_img['tag']:<25} | {s_img['p_days']:<12} | {pull_str}") - - if not stale_images_list: - print(f"{' ' * 100} NO STALE IMAGES FOUND") - - print("-" * report_width) - print(f"STALE AUDIT SUMMARY") - print(f" Total Images > 365 Days Old: {len(stale_images_list)}") - print(f" Recommendation: Review images with 'NEVER' or high 'Last Pull Days' for cleanup.") print("-" * report_width) -if __name__ == "__main__": - main() +if __name__ == "__main__": main()