-
Notifications
You must be signed in to change notification settings - Fork 8
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
2 changed files
with
129 additions
and
0 deletions.
There are no files selected for viewing
54 changes: 54 additions & 0 deletions
54
local-app/python-tools/cross-organization/assess_check_ecr.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,54 @@ | ||
| #!/usr/bin/env python | ||
|
|
||
| import json, argparse, sys, os, glob | ||
| from datetime import datetime | ||
|
|
||
| # --- VERSIONING --- | ||
| __version__ = "1.0.0" | ||
|
|
||
| def find_latest_file(pattern): | ||
| files = glob.glob(pattern) | ||
| return max(files, key=os.path.getctime) if files else None | ||
|
|
||
| def main(): | ||
| parser = argparse.ArgumentParser(description="AWS ECR Audit Assessor") | ||
| parser.add_argument("--input", help="JSON audit file") | ||
| args = parser.parse_args() | ||
|
|
||
| input_file = args.input or find_latest_file("audit_results.check_ecr.*.json") | ||
| if not input_file: print("Error: No file found."); sys.exit(1) | ||
|
|
||
| with open(input_file, 'r') as f: data = json.load(f) | ||
|
|
||
| report_width = 220 | ||
| print("-" * report_width) | ||
| print(f"ECR REPOSITORY ASSESSMENT | Total Accounts: {len(data)}") | ||
| print("-" * report_width) | ||
| print(f"{'Idx':<5} | {'Account ID':<15} | {'Region':<12} | {'Repo Name':<40} | {'Mutability':<12} | {'Lifecycle':<10} | {'Img Count'}") | ||
| print("-" * report_width) | ||
|
|
||
| stats = {"total_repos": 0, "total_images": 0, "no_lifecycle": 0} | ||
|
|
||
| for idx, account in enumerate(data, 1): | ||
| checks = account.get("data", {}) | ||
| for key, val in checks.items(): | ||
| if key == "account_summary" or ":" not in key: continue | ||
|
|
||
| region = key.split(":")[0] | ||
| stats["total_repos"] += 1 | ||
| img_count = len(val.get("images", [])) | ||
| stats["total_images"] += img_count | ||
|
|
||
| lifecycle_status = "YES" if val.get("has_lifecycle") == "True" else "NO" | ||
| if lifecycle_status == "NO": stats["no_lifecycle"] += 1 | ||
|
|
||
| print(f"{idx:<5} | {account['account_id']:<15} | {region:<12} | {val['repo_name']:<40} | {val['mutability']:<12} | {lifecycle_status:<10} | {img_count}") | ||
|
|
||
| print("-" * report_width) | ||
| print(f"ORGANIZATION ECR FOOTPRINT SUMMARY") | ||
| print(f" Total Repositories: {stats['total_repos']}") | ||
| print(f" Total Images: {stats['total_images']}") | ||
| print(f" Repos w/o Lifecycle Policies: {stats['no_lifecycle']} (RISK)") | ||
| print("-" * report_width) | ||
|
|
||
| if __name__ == "__main__": main() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,75 @@ | ||
| import boto3 | ||
| import time | ||
| import json | ||
| from datetime import datetime | ||
|
|
||
| # --- VERSIONING --- | ||
| __version__ = "1.0.0" | ||
|
|
||
| def get_repo_images(ecr_client, repo_name): | ||
| """Fetches details for all images in a repository.""" | ||
| images = [] | ||
| try: | ||
| paginator = ecr_client.get_paginator('describe_images') | ||
| for page in paginator.paginate(repositoryName=repo_name): | ||
| for img in page['imageDetails']: | ||
| images.append({ | ||
| "image_tags": img.get('imageTags', []), | ||
| "pushed_at": img['imagePushedAt'].isoformat() if 'imagePushedAt' in img else "N/A", | ||
| "last_pulled_at": img['lastRecordedPullTime'].isoformat() if 'lastRecordedPullTime' in img else "N/A", | ||
| "status": img.get('imageStatus', 'ACTIVE'), | ||
| "size_bytes": img.get('imageSizeInBytes', 0) | ||
| }) | ||
| except: pass | ||
| return images | ||
|
|
||
| def get_lifecycle_policy(ecr_client, repo_name): | ||
| """Checks for lifecycle policy and counts rules.""" | ||
| try: | ||
| resp = ecr_client.get_lifecycle_policy(repositoryName=repo_name) | ||
| policy_text = json.loads(resp.get('lifecyclePolicyText', '{}')) | ||
| rules = policy_text.get('rules', []) | ||
| return True, len(rules) | ||
| except: | ||
| return False, 0 | ||
|
|
||
| def account_task(account_session, account_id, account_name, region): | ||
| results = {"alias": "N/A", "data": {}} | ||
| try: | ||
| results["alias"] = account_session.client('iam').list_account_aliases().get('AccountAliases', ["N/A"])[0] | ||
| ec2 = account_session.client('ec2', region_name=region) | ||
| regions = [r['RegionName'] for r in ec2.describe_regions()['Regions']] | ||
|
|
||
| sts = account_session.client('sts') | ||
| partition = sts.get_caller_identity()['Arn'].split(':')[1] | ||
|
|
||
| for reg in regions: | ||
| ecr = account_session.client('ecr', region_name=reg) | ||
| try: | ||
| repos = ecr.describe_repositories().get('repositories', []) | ||
| for repo in repos: | ||
| name = repo['repositoryName'] | ||
| arn = repo['repositoryArn'] | ||
|
|
||
| has_lifecycle, rule_count = get_lifecycle_policy(ecr, name) | ||
| tags_resp = ecr.list_tags_for_resource(resourceArn=arn) | ||
|
|
||
| repo_data = { | ||
| "resource": arn, | ||
| "partition": partition, | ||
| "repo_name": name, | ||
| "repo_arn": arn, | ||
| "created_at": repo['createdAt'].isoformat(), | ||
| "mutability": repo.get('imageTagMutability', 'MUTABLE'), | ||
| "tags": tags_resp.get('tags', []), | ||
| "has_lifecycle": str(has_lifecycle), | ||
| "lifecycle_rule_count": rule_count, | ||
| "images": get_repo_images(ecr, name) | ||
| } | ||
|
|
||
| results["data"][f"{reg}:{name}"] = repo_data | ||
| except: continue | ||
|
|
||
| results["data"]["account_summary"] = {"_summary": "PROCESSED"} | ||
| except Exception as e: results["error"] = str(e) | ||
| return results |