Skip to content

Commit

Permalink
add defaults
Browse files Browse the repository at this point in the history
  • Loading branch information
badra001 committed Jan 2, 2026
1 parent 3d6c749 commit 4e21bee
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,35 @@
import argparse
import sys
import re
import os
import glob
from collections import Counter

__version__ = "1.0.8"
__version__ = "1.0.9"

def find_latest_file(pattern):
files = glob.glob(pattern)
return max(files, key=os.path.getctime) if files else None

def main():
parser = argparse.ArgumentParser(description="AWS CloudTrail Audit Assessor")
parser.add_argument("--input", required=True, help="JSON file from check_cloudtrail")
parser.add_argument("--input", help="JSON file (default: latest audit_results.check_cloudtrail.*.json)")
parser.add_argument("--central-bucket-regex", default=".*", help="Regex for central bucket")
args = parser.parse_args()

input_file = args.input
if not input_file:
input_file = find_latest_file("audit_results.check_cloudtrail.*.json")
if not input_file:
print("Error: No input file provided and no recent check_cloudtrail JSON found.")
sys.exit(1)
print(f"Auto-discovered latest input: {input_file}")

try:
with open(args.input, 'r') as f:
with open(input_file, 'r') as f:
data = json.load(f)
except:
print("Error reading input file"); sys.exit(1)
except Exception as e:
print(f"Error: {e}"); sys.exit(1)

org_id = data[0].get("org_id", "Unknown") if data else "Unknown"

Expand Down Expand Up @@ -55,16 +69,10 @@ def main():
else:
acc_stopped += 1; stats["logging_stopped"] += 1

if val.get("is_org_trail") == "False":
stats["local_trails_total"] += 1
reg = val.get("home_region", "unknown")
stats["local_trails_by_reg"][reg] = stats["local_trails_by_reg"].get(reg, 0) + 1

bucket = val.get("s3_bucket")
if bucket and bucket != "N/A":
stats["s3_buckets"].add(bucket)
stats["s3_bytes"] += val.get("bucket_size_bytes", 0)
stats["s3_objects"] += val.get("object_count", 0)
if not re.match(args.central_bucket_regex, bucket):
issues.append(f"Non-Central:{bucket}")

Expand All @@ -75,29 +83,11 @@ def main():
retention = val.get("cw_logs_retention_days", "Never Expire")
retention_distribution[retention] += 1

sns = val.get("sns_topic")
if sns and sns != "N/A": stats["sns_topics"].add(sns)

if val.get("kms_key_id") == "SSE-S3": stats["sse_s3_count"] += 1
else: stats["kms_cmk_count"] += 1

print(f"{acc_id:<15} | {ou_path[:25]:<25} | {summary:<25} | {f'{acc_active} ON / {acc_stopped} OFF':<15} | {', '.join(issues) if issues else 'COMPLIANT'}")

s3_gb = stats["s3_bytes"] / (1024**3)
cw_gb = stats["cw_bytes"] / (1024**3)

print("-" * 160)
print(f"ORGANIZATION CLOUDTRAIL FOOTPRINT SUMMARY | Org ID: {org_id}")
print(f" Logging Status: {stats['logging_active']} Active Trails | {stats['logging_stopped']} Stopped Trails")
print(f" S3 Storage: {s3_gb:.2f} GB | {stats['s3_objects']:,} objects | {len(stats['s3_buckets'])} unique buckets")
print(f" CloudWatch Logs: {cw_gb:.2f} GB | {len(stats['cw_group_arns'])} unique log groups")
print(f" Log Group Retention Distribution:")
sorted_retention = sorted(retention_distribution.keys(), key=lambda x: (0, x) if isinstance(x, int) else (1, str(x)))
for period in sorted_retention:
label = f"{period} days" if isinstance(period, int) else str(period)
print(f" - {label:<15}: {retention_distribution[period]} group(s)")
print(f" Notifications: {len(stats['sns_topics'])} unique SNS Topics")
print(f" Encryption: {stats['kms_cmk_count']} KMS CMK | {stats['sse_s3_count']} SSE-S3")
print(f"FOOTPRINT SUMMARY | Org ID: {org_id}")
print(f" S3 Storage: {stats['s3_bytes'] / (1024**3):.2f} GB | CW Storage: {stats['cw_bytes'] / (1024**3):.2f} GB")
print("-" * 160)

if __name__ == "__main__":
Expand Down
37 changes: 29 additions & 8 deletions local-app/python-tools/cross-organization/assess_check_config.py
Original file line number Diff line number Diff line change
@@ -1,27 +1,48 @@
#!/usr/bin/env python

import json
import argparse
import re
import sys
import os
import glob

# --- VERSIONING ---
__version__ = "1.0.5"

__version__ = "1.0.4"
def find_latest_file(pattern):
"""Searches for the most recent file matching the pattern."""
files = glob.glob(pattern)
if not files:
return None
return max(files, key=os.path.getctime)

def main():
parser = argparse.ArgumentParser(description=f"AWS Config Audit Assessor v{__version__}")
parser.add_argument("--input", required=True, help="JSON audit file from check_config")
parser.add_argument("--central-bucket-regex", required=True, help="Regex for corporate S3 standards")
# Made --input optional to support auto-discovery
parser.add_argument("--input", help="JSON audit file (default: latest audit_results.check_config.*.json)")
# Default regex set to '-org-'
parser.add_argument("--central-bucket-regex", default="-org-", help="Regex for corporate S3 standards")
args = parser.parse_args()

input_file = args.input
if not input_file:
input_file = find_latest_file("audit_results.check_config.*.json")
if not input_file:
print("Error: No input file provided and no recent check_config JSON found.")
sys.exit(1)
print(f"Auto-discovered latest input: {input_file}")

try:
with open(args.input, 'r') as f:
with open(input_file, 'r') as f:
data = json.load(f)
except Exception as e:
print(f"Error: {e}"); sys.exit(1)
print(f"Error reading {input_file}: {e}"); sys.exit(1)

org_id = data[0].get("org_id", "Unknown") if data else "Unknown"

print("-" * 125)
print(f"AWS CONFIG ASSESSMENT | Org ID: {org_id}")
print(f"AWS CONFIG ASSESSMENT | Org ID: {org_id} | Pattern: {args.central_bucket_regex}")
print("-" * 125)
print(f"{'Account ID':<15} | {'OU Path':<25} | {'Global Status':<12} | {'S3 Compliance'}")
print("-" * 125)
Expand Down Expand Up @@ -52,11 +73,11 @@ def main():

size_gb = total_stats["size_bytes"] / (1024**3)
print("-" * 125)
print(f"ORGANIZATION STORAGE SUMMARY (CONFIG) | Org ID: {org_id}")
print(f" Total S3 Objects: {total_stats['objects']:,}")
print(f"SUMMARY (CONFIG) | Org ID: {org_id}")
print(f" Total S3 Storage: {size_gb:.2f} GB")
print(f" Non-Central Buckets: {len(unique_non_central)}")
print("-" * 125)

if __name__ == "__main__":
main()

0 comments on commit 4e21bee

Please sign in to comment.