local-app/python-tools/cross-organization/assess_security_groups.py (new file, 87 additions, 0 deletions)
```python
#!/usr/bin/env python
import json
import os
import glob
import hashlib
from collections import defaultdict

# --- VERSIONING ---
__version__ = "1.0.0"


def generate_rule_hash(sg_data):
    """
    Creates a unique hash based on the ingress and egress rules.
    Sorts rules to ensure consistent hashing regardless of order.
    """
    # Helper to serialize rules for hashing. Each rule is serialized with
    # sorted keys, and the serialized rules are themselves sorted so that
    # the order of rules in the list does not affect the hash.
    def serialize(rules):
        return json.dumps(sorted(json.dumps(rule, sort_keys=True) for rule in rules))

    ingress = serialize(sg_data.get('ip_permissions', []))
    egress = serialize(sg_data.get('ip_permissions_egress', []))

    combined = f"INGRESS:{ingress}|EGRESS:{egress}"
    return hashlib.sha256(combined.encode()).hexdigest()


def main():
    print("-" * 100)
    print(f"SECURITY GROUP DUPLICATION ASSESSOR - v{__version__}")
    print("-" * 100)

    # hash -> list of group metadata
    clusters = defaultdict(list)
    total_files = 0

    # Crawl the security_groups directory structure
    search_path = os.path.join("security_groups", "**", "*.json")
    for file_path in glob.iglob(search_path, recursive=True):
        total_files += 1
        with open(file_path, 'r') as f:
            try:
                data = json.load(f)
                rule_hash = generate_rule_hash(data)
                clusters[rule_hash].append({
                    "account": data['account_id'],
                    "region": data['region'],
                    "name": data['group_name'],
                    "id": data['group_id'],
                    "vpc": data['vpc_id']
                })
            except Exception as e:
                print(f"  Error processing {file_path}: {e}")

    # Reporting Logic
    duplicates = {k: v for k, v in clusters.items() if len(v) > 1}

    print(f"Total Security Groups Scanned: {total_files}")
    print(f"Unique Rule Sets Found: {len(clusters)}")
    print(f"Identified Clusters (Dupes): {len(duplicates)}")
    print("-" * 100)

    # Sort clusters by size (most frequent first)
    sorted_clusters = sorted(duplicates.items(), key=lambda x: len(x[1]), reverse=True)

    for i, (r_hash, instances) in enumerate(sorted_clusters[:20], 1):
        print(f"\nCLUSTER #{i} | Unique Rule Hash: {r_hash[:12]}")
        print(f"  Found {len(instances)} identical instances across accounts.")

        # Group by account for the summary
        acc_counts = defaultdict(int)
        for inst in instances:
            acc_counts[inst['account']] += 1

        print("  Account Distribution:")
        for acc, count in acc_counts.items():
            print(f"    - {acc}: {count} instances")

    # Save detailed report to JSON
    report_file = "sg_duplicate_report.json"
    with open(report_file, 'w') as rf:
        json.dump(clusters, rf, indent=2)

    print("\n" + "=" * 100)
    print(f"DETAILED CLUSTER DATA SAVED TO: {report_file}")
    print("=" * 100)


if __name__ == "__main__":
    main()
```
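For context, the script crawls `security_groups/**/*.json` and reads the snake_cased fields `account_id`, `region`, `group_name`, `group_id`, `vpc_id`, `ip_permissions`, and `ip_permissions_egress` from each file. The exact directory layout and field values below are illustrative assumptions, not part of the commit; this is a minimal sketch that seeds two groups with identical rules in two accounts so the assessor should report them as one duplicate cluster:

```python
# Assumed input layout: one JSON file per security group, e.g.
# security_groups/<account_id>/<region>/<group_id>.json, with fields
# mirroring the EC2 DescribeSecurityGroups response in snake_case.
import json
import os

rule = {
    "IpProtocol": "tcp",
    "FromPort": 443,
    "ToPort": 443,
    "IpRanges": [{"CidrIp": "0.0.0.0/0"}],
}

for account, group_id in [("111111111111", "sg-0aaa"), ("222222222222", "sg-0bbb")]:
    path = os.path.join("security_groups", account, "us-east-1")
    os.makedirs(path, exist_ok=True)
    with open(os.path.join(path, f"{group_id}.json"), "w") as f:
        json.dump({
            "account_id": account,
            "region": "us-east-1",
            "group_id": group_id,
            "group_name": "web-tier",
            "vpc_id": f"vpc-{account[:8]}",  # hypothetical VPC ID
            "ip_permissions": [rule],
            "ip_permissions_egress": [],
        }, f, indent=2)

# Running assess_security_groups.py from this directory should now report
# one cluster of 2 identical instances spanning both accounts.
```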