From 692b2cf3d32f6b90ae7a6011b3e311af08ac5cbe Mon Sep 17 00:00:00 2001
From: badra001
Date: Wed, 18 Mar 2026 13:09:22 -0400
Subject: [PATCH] initial

---
 .../assess_security_groups.py | 92 +++++++++++++++++++++
 1 file changed, 92 insertions(+)
 create mode 100755 local-app/python-tools/cross-organization/assess_security_groups.py

diff --git a/local-app/python-tools/cross-organization/assess_security_groups.py b/local-app/python-tools/cross-organization/assess_security_groups.py
new file mode 100755
index 00000000..279195d4
--- /dev/null
+++ b/local-app/python-tools/cross-organization/assess_security_groups.py
@@ -0,0 +1,92 @@
+#!/usr/bin/env python3
+import json
+import os
+import glob
+import hashlib
+from collections import defaultdict
+
+# --- VERSIONING ---
+__version__ = "1.0.0"
+
+def generate_rule_hash(sg_data):
+    """
+    Creates a unique hash based on the ingress and egress rules.
+    Sorts keys within each rule and the rules themselves, so the
+    hash is stable regardless of key order or rule order.
+    """
+    # Helper to serialize rules for hashing: canonicalize each
+    # rule's keys, then sort the rule list by its serialized form.
+    def serialize(rules):
+        return json.dumps(
+            sorted(rules, key=lambda r: json.dumps(r, sort_keys=True)),
+            sort_keys=True)
+
+    ingress = serialize(sg_data.get('ip_permissions', []))
+    egress = serialize(sg_data.get('ip_permissions_egress', []))
+
+    combined = f"INGRESS:{ingress}|EGRESS:{egress}"
+    return hashlib.sha256(combined.encode()).hexdigest()
+
+def main():
+    print("-" * 100)
+    print(f"SECURITY GROUP DUPLICATION ASSESSOR - v{__version__}")
+    print("-" * 100)
+
+    # hash -> list of group metadata
+    clusters = defaultdict(list)
+    total_files = 0
+
+    # Crawl the security_groups directory structure
+    search_path = os.path.join("security_groups", "**", "*.json")
+    for file_path in glob.iglob(search_path, recursive=True):
+        total_files += 1
+        with open(file_path, 'r') as f:
+            try:
+                data = json.load(f)
+                rule_hash = generate_rule_hash(data)
+                clusters[rule_hash].append({
+                    "account": data['account_id'],
+                    "region": data['region'],
+                    "name": data['group_name'],
+                    "id": data['group_id'],
+                    "vpc": data['vpc_id']
+                })
+            except (json.JSONDecodeError, KeyError) as e:
+                print(f"  Error processing {file_path}: {e}")
+
+    # Reporting logic: a duplicate cluster is a rule hash shared
+    # by more than one group
+    duplicates = {k: v for k, v in clusters.items() if len(v) > 1}
+
+    print(f"Total Security Groups Scanned: {total_files}")
+    print(f"Unique Rule Sets Found: {len(clusters)}")
+    print(f"Identified Clusters (Dupes): {len(duplicates)}")
+    print("-" * 100)
+
+    # Sort clusters by size (most frequent first); report the top 20
+    sorted_clusters = sorted(duplicates.items(), key=lambda x: len(x[1]), reverse=True)
+
+    for i, (r_hash, instances) in enumerate(sorted_clusters[:20], 1):
+        print(f"\nCLUSTER #{i} | Rule Hash: {r_hash[:12]}")
+        print(f"  Found {len(instances)} groups with identical rules.")
+
+        # Group by account for the summary
+        acc_counts = defaultdict(int)
+        for inst in instances:
+            acc_counts[inst['account']] += 1
+
+        print("  Account Distribution:")
+        for acc, count in sorted(acc_counts.items()):
+            print(f"    - {acc}: {count} instances")
+
+    # Save only the duplicated clusters to the detailed JSON report
+    report_file = "sg_duplicate_report.json"
+    with open(report_file, 'w') as rf:
+        json.dump(duplicates, rf, indent=2)
+
+    print("\n" + "=" * 100)
+    print(f"DETAILED CLUSTER DATA SAVED TO: {report_file}")
+    print("=" * 100)
+
+if __name__ == "__main__":
+    main()
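
Note for reviewers: the script expects one JSON file per security group somewhere under security_groups/ (the recursive glob tolerates any nesting depth), with snake_case keys matching the fields it reads. Below is a minimal sketch of a fixture generator; the directory layout, account id, ports, and CIDRs are illustrative placeholders, not data from any real environment:

    import json
    import os

    # Hypothetical sample input; the field names match exactly what
    # assess_security_groups.py reads. All values are placeholders.
    sample = {
        "account_id": "111111111111",
        "region": "us-east-1",
        "group_name": "web-sg",
        "group_id": "sg-0123456789abcdef0",
        "vpc_id": "vpc-0123456789abcdef0",
        "ip_permissions": [
            {"ip_protocol": "tcp", "from_port": 443, "to_port": 443,
             "ip_ranges": [{"cidr_ip": "0.0.0.0/0"}]},
        ],
        "ip_permissions_egress": [],
    }

    # Assumed layout: security_groups/<account>/<region>/<group_id>.json
    out_dir = os.path.join("security_groups", sample["account_id"], sample["region"])
    os.makedirs(out_dir, exist_ok=True)
    with open(os.path.join(out_dir, f"{sample['group_id']}.json"), "w") as f:
        json.dump(sample, f, indent=2)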
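
The canonicalization in generate_rule_hash can be sanity-checked in isolation: the same rule set should hash identically no matter how the collector ordered the rules. A quick illustrative check, assuming the script is importable from the working directory and using made-up rule values:

    from assess_security_groups import generate_rule_hash

    # Two orderings of the same ingress rules; values are placeholders.
    rules = [
        {"ip_protocol": "tcp", "from_port": 80, "to_port": 80},
        {"ip_protocol": "udp", "from_port": 53, "to_port": 53},
    ]
    a = {"ip_permissions": rules, "ip_permissions_egress": []}
    b = {"ip_permissions": list(reversed(rules)), "ip_permissions_egress": []}

    # Rule order differs between a and b, but the hashes agree.
    assert generate_rule_hash(a) == generate_rule_hash(b)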