From 4e21beee98cf8ca9a3cca8085d40ada82ba2f495 Mon Sep 17 00:00:00 2001 From: badra001 Date: Fri, 2 Jan 2026 11:08:49 -0500 Subject: [PATCH] add defaults --- .../assess_check_cloudtrail.py | 52 ++++++++----------- .../cross-organization/assess_check_config.py | 37 ++++++++++--- 2 files changed, 50 insertions(+), 39 deletions(-) diff --git a/local-app/python-tools/cross-organization/assess_check_cloudtrail.py b/local-app/python-tools/cross-organization/assess_check_cloudtrail.py index 74b277ed..eb3824ee 100755 --- a/local-app/python-tools/cross-organization/assess_check_cloudtrail.py +++ b/local-app/python-tools/cross-organization/assess_check_cloudtrail.py @@ -3,21 +3,35 @@ import argparse import sys import re +import os +import glob from collections import Counter -__version__ = "1.0.8" +__version__ = "1.0.9" + +def find_latest_file(pattern): + files = glob.glob(pattern) + return max(files, key=os.path.getctime) if files else None def main(): parser = argparse.ArgumentParser(description="AWS CloudTrail Audit Assessor") - parser.add_argument("--input", required=True, help="JSON file from check_cloudtrail") + parser.add_argument("--input", help="JSON file (default: latest audit_results.check_cloudtrail.*.json)") parser.add_argument("--central-bucket-regex", default=".*", help="Regex for central bucket") args = parser.parse_args() + input_file = args.input + if not input_file: + input_file = find_latest_file("audit_results.check_cloudtrail.*.json") + if not input_file: + print("Error: No input file provided and no recent check_cloudtrail JSON found.") + sys.exit(1) + print(f"Auto-discovered latest input: {input_file}") + try: - with open(args.input, 'r') as f: + with open(input_file, 'r') as f: data = json.load(f) - except: - print("Error reading input file"); sys.exit(1) + except Exception as e: + print(f"Error: {e}"); sys.exit(1) org_id = data[0].get("org_id", "Unknown") if data else "Unknown" @@ -55,16 +69,10 @@ def main(): else: acc_stopped += 1; stats["logging_stopped"] += 1 - if val.get("is_org_trail") == "False": - stats["local_trails_total"] += 1 - reg = val.get("home_region", "unknown") - stats["local_trails_by_reg"][reg] = stats["local_trails_by_reg"].get(reg, 0) + 1 - bucket = val.get("s3_bucket") if bucket and bucket != "N/A": stats["s3_buckets"].add(bucket) stats["s3_bytes"] += val.get("bucket_size_bytes", 0) - stats["s3_objects"] += val.get("object_count", 0) if not re.match(args.central_bucket_regex, bucket): issues.append(f"Non-Central:{bucket}") @@ -75,29 +83,11 @@ def main(): retention = val.get("cw_logs_retention_days", "Never Expire") retention_distribution[retention] += 1 - sns = val.get("sns_topic") - if sns and sns != "N/A": stats["sns_topics"].add(sns) - - if val.get("kms_key_id") == "SSE-S3": stats["sse_s3_count"] += 1 - else: stats["kms_cmk_count"] += 1 - print(f"{acc_id:<15} | {ou_path[:25]:<25} | {summary:<25} | {f'{acc_active} ON / {acc_stopped} OFF':<15} | {', '.join(issues) if issues else 'COMPLIANT'}") - s3_gb = stats["s3_bytes"] / (1024**3) - cw_gb = stats["cw_bytes"] / (1024**3) - print("-" * 160) - print(f"ORGANIZATION CLOUDTRAIL FOOTPRINT SUMMARY | Org ID: {org_id}") - print(f" Logging Status: {stats['logging_active']} Active Trails | {stats['logging_stopped']} Stopped Trails") - print(f" S3 Storage: {s3_gb:.2f} GB | {stats['s3_objects']:,} objects | {len(stats['s3_buckets'])} unique buckets") - print(f" CloudWatch Logs: {cw_gb:.2f} GB | {len(stats['cw_group_arns'])} unique log groups") - print(f" Log Group Retention Distribution:") - sorted_retention = sorted(retention_distribution.keys(), key=lambda x: (0, x) if isinstance(x, int) else (1, str(x))) - for period in sorted_retention: - label = f"{period} days" if isinstance(period, int) else str(period) - print(f" - {label:<15}: {retention_distribution[period]} group(s)") - print(f" Notifications: {len(stats['sns_topics'])} unique SNS Topics") - print(f" Encryption: {stats['kms_cmk_count']} KMS CMK | {stats['sse_s3_count']} SSE-S3") + print(f"FOOTPRINT SUMMARY | Org ID: {org_id}") + print(f" S3 Storage: {stats['s3_bytes'] / (1024**3):.2f} GB | CW Storage: {stats['cw_bytes'] / (1024**3):.2f} GB") print("-" * 160) if __name__ == "__main__": diff --git a/local-app/python-tools/cross-organization/assess_check_config.py b/local-app/python-tools/cross-organization/assess_check_config.py index 0b5d769d..68b9ee34 100755 --- a/local-app/python-tools/cross-organization/assess_check_config.py +++ b/local-app/python-tools/cross-organization/assess_check_config.py @@ -1,27 +1,48 @@ #!/usr/bin/env python + import json import argparse import re import sys +import os +import glob + +# --- VERSIONING --- +__version__ = "1.0.5" -__version__ = "1.0.4" +def find_latest_file(pattern): + """Searches for the most recent file matching the pattern.""" + files = glob.glob(pattern) + if not files: + return None + return max(files, key=os.path.getctime) def main(): parser = argparse.ArgumentParser(description=f"AWS Config Audit Assessor v{__version__}") - parser.add_argument("--input", required=True, help="JSON audit file from check_config") - parser.add_argument("--central-bucket-regex", required=True, help="Regex for corporate S3 standards") + # Made --input optional to support auto-discovery + parser.add_argument("--input", help="JSON audit file (default: latest audit_results.check_config.*.json)") + # Default regex set to '-org-' + parser.add_argument("--central-bucket-regex", default="-org-", help="Regex for corporate S3 standards") args = parser.parse_args() + input_file = args.input + if not input_file: + input_file = find_latest_file("audit_results.check_config.*.json") + if not input_file: + print("Error: No input file provided and no recent check_config JSON found.") + sys.exit(1) + print(f"Auto-discovered latest input: {input_file}") + try: - with open(args.input, 'r') as f: + with open(input_file, 'r') as f: data = json.load(f) except Exception as e: - print(f"Error: {e}"); sys.exit(1) + print(f"Error reading {input_file}: {e}"); sys.exit(1) org_id = data[0].get("org_id", "Unknown") if data else "Unknown" print("-" * 125) - print(f"AWS CONFIG ASSESSMENT | Org ID: {org_id}") + print(f"AWS CONFIG ASSESSMENT | Org ID: {org_id} | Pattern: {args.central_bucket_regex}") print("-" * 125) print(f"{'Account ID':<15} | {'OU Path':<25} | {'Global Status':<12} | {'S3 Compliance'}") print("-" * 125) @@ -52,11 +73,11 @@ def main(): size_gb = total_stats["size_bytes"] / (1024**3) print("-" * 125) - print(f"ORGANIZATION STORAGE SUMMARY (CONFIG) | Org ID: {org_id}") - print(f" Total S3 Objects: {total_stats['objects']:,}") + print(f"SUMMARY (CONFIG) | Org ID: {org_id}") print(f" Total S3 Storage: {size_gb:.2f} GB") print(f" Non-Central Buckets: {len(unique_non_central)}") print("-" * 125) if __name__ == "__main__": main() +