From 74cc3eafad3811f8114a8ac9c99160862f4ae523 Mon Sep 17 00:00:00 2001 From: badra001 Date: Fri, 9 Jan 2026 14:42:31 -0500 Subject: [PATCH] add new ecr check and assess --- .../cross-organization/assess_check_ecr.py | 54 +++++++++++++ .../cross-organization/check_ecr.py | 75 +++++++++++++++++++ 2 files changed, 129 insertions(+) create mode 100755 local-app/python-tools/cross-organization/assess_check_ecr.py create mode 100644 local-app/python-tools/cross-organization/check_ecr.py diff --git a/local-app/python-tools/cross-organization/assess_check_ecr.py b/local-app/python-tools/cross-organization/assess_check_ecr.py new file mode 100755 index 00000000..09ad867f --- /dev/null +++ b/local-app/python-tools/cross-organization/assess_check_ecr.py @@ -0,0 +1,54 @@ +#!/usr/bin/env python + +import json, argparse, sys, os, glob +from datetime import datetime + +# --- VERSIONING --- +__version__ = "1.0.0" + +def find_latest_file(pattern): + files = glob.glob(pattern) + return max(files, key=os.path.getctime) if files else None + +def main(): + parser = argparse.ArgumentParser(description="AWS ECR Audit Assessor") + parser.add_argument("--input", help="JSON audit file") + args = parser.parse_args() + + input_file = args.input or find_latest_file("audit_results.check_ecr.*.json") + if not input_file: print("Error: No file found."); sys.exit(1) + + with open(input_file, 'r') as f: data = json.load(f) + + report_width = 220 + print("-" * report_width) + print(f"ECR REPOSITORY ASSESSMENT | Total Accounts: {len(data)}") + print("-" * report_width) + print(f"{'Idx':<5} | {'Account ID':<15} | {'Region':<12} | {'Repo Name':<40} | {'Mutability':<12} | {'Lifecycle':<10} | {'Img Count'}") + print("-" * report_width) + + stats = {"total_repos": 0, "total_images": 0, "no_lifecycle": 0} + + for idx, account in enumerate(data, 1): + checks = account.get("data", {}) + for key, val in checks.items(): + if key == "account_summary" or ":" not in key: continue + + region = key.split(":")[0] + stats["total_repos"] += 1 + img_count = len(val.get("images", [])) + stats["total_images"] += img_count + + lifecycle_status = "YES" if val.get("has_lifecycle") == "True" else "NO" + if lifecycle_status == "NO": stats["no_lifecycle"] += 1 + + print(f"{idx:<5} | {account['account_id']:<15} | {region:<12} | {val['repo_name']:<40} | {val['mutability']:<12} | {lifecycle_status:<10} | {img_count}") + + print("-" * report_width) + print(f"ORGANIZATION ECR FOOTPRINT SUMMARY") + print(f" Total Repositories: {stats['total_repos']}") + print(f" Total Images: {stats['total_images']}") + print(f" Repos w/o Lifecycle Policies: {stats['no_lifecycle']} (RISK)") + print("-" * report_width) + +if __name__ == "__main__": main() diff --git a/local-app/python-tools/cross-organization/check_ecr.py b/local-app/python-tools/cross-organization/check_ecr.py new file mode 100644 index 00000000..2f8f6100 --- /dev/null +++ b/local-app/python-tools/cross-organization/check_ecr.py @@ -0,0 +1,75 @@ +import boto3 +import time +import json +from datetime import datetime + +# --- VERSIONING --- +__version__ = "1.0.0" + +def get_repo_images(ecr_client, repo_name): + """Fetches details for all images in a repository.""" + images = [] + try: + paginator = ecr_client.get_paginator('describe_images') + for page in paginator.paginate(repositoryName=repo_name): + for img in page['imageDetails']: + images.append({ + "image_tags": img.get('imageTags', []), + "pushed_at": img['imagePushedAt'].isoformat() if 'imagePushedAt' in img else "N/A", + "last_pulled_at": img['lastRecordedPullTime'].isoformat() if 'lastRecordedPullTime' in img else "N/A", + "status": img.get('imageStatus', 'ACTIVE'), + "size_bytes": img.get('imageSizeInBytes', 0) + }) + except: pass + return images + +def get_lifecycle_policy(ecr_client, repo_name): + """Checks for lifecycle policy and counts rules.""" + try: + resp = ecr_client.get_lifecycle_policy(repositoryName=repo_name) + policy_text = json.loads(resp.get('lifecyclePolicyText', '{}')) + rules = policy_text.get('rules', []) + return True, len(rules) + except: + return False, 0 + +def account_task(account_session, account_id, account_name, region): + results = {"alias": "N/A", "data": {}} + try: + results["alias"] = account_session.client('iam').list_account_aliases().get('AccountAliases', ["N/A"])[0] + ec2 = account_session.client('ec2', region_name=region) + regions = [r['RegionName'] for r in ec2.describe_regions()['Regions']] + + sts = account_session.client('sts') + partition = sts.get_caller_identity()['Arn'].split(':')[1] + + for reg in regions: + ecr = account_session.client('ecr', region_name=reg) + try: + repos = ecr.describe_repositories().get('repositories', []) + for repo in repos: + name = repo['repositoryName'] + arn = repo['repositoryArn'] + + has_lifecycle, rule_count = get_lifecycle_policy(ecr, name) + tags_resp = ecr.list_tags_for_resource(resourceArn=arn) + + repo_data = { + "resource": arn, + "partition": partition, + "repo_name": name, + "repo_arn": arn, + "created_at": repo['createdAt'].isoformat(), + "mutability": repo.get('imageTagMutability', 'MUTABLE'), + "tags": tags_resp.get('tags', []), + "has_lifecycle": str(has_lifecycle), + "lifecycle_rule_count": rule_count, + "images": get_repo_images(ecr, name) + } + + results["data"][f"{reg}:{name}"] = repo_data + except: continue + + results["data"]["account_summary"] = {"_summary": "PROCESSED"} + except Exception as e: results["error"] = str(e) + return results