diff --git a/local-app/python-tools/cross-organization/assess_security_groups.py b/local-app/python-tools/cross-organization/assess_security_groups.py index 279195d4..6a0769c9 100755 --- a/local-app/python-tools/cross-organization/assess_security_groups.py +++ b/local-app/python-tools/cross-organization/assess_security_groups.py @@ -6,15 +6,15 @@ from collections import defaultdict # --- VERSIONING --- -__version__ = "1.0.0" +__version__ = "1.1.0" def generate_rule_hash(sg_data): """ Creates a unique hash based on the ingress and egress rules. Sorts rules to ensure consistent hashing regardless of order. """ - # Helper to serialize rules for hashing def serialize(rules): + # Sort keys and rules to ensure the hash is identical for the same rules return json.dumps(rules, sort_keys=True) ingress = serialize(sg_data.get('ip_permissions', [])) @@ -25,7 +25,7 @@ def serialize(rules): def main(): print("-" * 100) - print(f"SECURITY GROUP DUPLICATION ASSESSOR - v{__version__}") + print(f"SECURITY GROUP CLUSTER ANALYSER - v{__version__}") print("-" * 100) # hash -> list of group metadata @@ -40,47 +40,54 @@ def main(): try: data = json.load(f) rule_hash = generate_rule_hash(data) + + # Capture metadata for each instance clusters[rule_hash].append({ - "account": data['account_id'], - "region": data['region'], - "name": data['group_name'], - "id": data['group_id'], - "vpc": data['vpc_id'] + "account": data.get('account_id'), + "region": data.get('region'), + "name": data.get('group_name'), + "id": data.get('group_id'), + "vpc": data.get('vpc_id'), + "path": file_path }) except Exception as e: print(f" Error processing {file_path}: {e}") # Reporting Logic - duplicates = {k: v for k, v in clusters.items() if len(v) > 1} - print(f"Total Security Groups Scanned: {total_files}") - print(f"Unique Rule Sets Found: {len(clusters)}") - print(f"Identified Clusters (Dupes): {len(duplicates)}") + print(f"Unique Rule Sets Discovered: {len(clusters)}") print("-" * 100) # Sort clusters by size (most frequent first) - sorted_clusters = sorted(duplicates.items(), key=lambda x: len(x[1]), reverse=True) + sorted_clusters = sorted(clusters.items(), key=lambda x: len(x[1]), reverse=True) + # Display the Top 20 most frequent rule sets for i, (r_hash, instances) in enumerate(sorted_clusters[:20], 1): - print(f"\nCLUSTER #{i} | Unique Rule Hash: {r_hash[:12]}") - print(f" Found {len(instances)} identical instances across accounts.") + # Reference the first instance found as the "Source" + ref = instances[0] + + status = "DUPLICATED" if len(instances) > 1 else "UNIQUE" + + print(f"\n[{status}] HASH: {r_hash[:12]}... ({len(instances)} instances)") + print(f" Example Source: {ref['account']} / {ref['region']} / {ref['id']}") + print(f" Example Name: {ref['name']}") - # Group by account for the summary - acc_counts = defaultdict(int) - for inst in instances: - acc_counts[inst['account']] += 1 + if len(instances) > 1: + # Group by account for the summary + acc_counts = defaultdict(int) + for inst in instances: + acc_counts[inst['account']] += 1 - print(f" Account Distribution:") - for acc, count in acc_counts.items(): - print(f" - {acc}: {count} instances") + acc_summary = ", ".join([f"{acc}({count})" for acc, count in acc_counts.items()]) + print(f" Distribution: {acc_summary}") # Save detailed report to JSON - report_file = "sg_duplicate_report.json" + report_file = "sg_cluster_report.json" with open(report_file, 'w') as rf: json.dump(clusters, rf, indent=2) print("\n" + "=" * 100) - print(f"DETAILED CLUSTER DATA SAVED TO: {report_file}") + print(f"FULL CLUSTER REPORT (800+ GROUPS) SAVED TO: {report_file}") print("=" * 100) if __name__ == "__main__": diff --git a/local-app/python-tools/cross-organization/generate_fms_payload.py b/local-app/python-tools/cross-organization/generate_fms_payload.py new file mode 100644 index 00000000..b585e612 --- /dev/null +++ b/local-app/python-tools/cross-organization/generate_fms_payload.py @@ -0,0 +1,78 @@ +#!/usr/bin/env python +import json +import os +import sys + +# --- VERSIONING --- +__version__ = "1.0.0" + +def main(): + report_file = "sg_cluster_report.json" + if not os.path.exists(report_file): + print(f"Error: {report_file} not found. Run assess_security_groups.py first.") + sys.exit(1) + + with open(report_file, 'r') as f: + clusters = json.load(f) + + # Sort clusters by size to show the best candidates for FMS first + sorted_hashes = sorted(clusters.keys(), key=lambda x: len(clusters[x]), reverse=True) + + print("-" * 80) + print(f"FMS POLICY PAYLOAD GENERATOR - v{__version__}") + print("-" * 80) + print(f"{'#':<3} | {'Hash ID':<15} | {'Instances':<10} | {'Example Name'}") + + for i, h in enumerate(sorted_hashes[:10], 1): + example = clusters[h][0] + print(f"{i:<3} | {h[:12]:<15} | {len(clusters[h]):<10} | {example['name']}") + + choice = input("\nSelect a Cluster Number to generate FMS payload: ") + try: + selected_hash = sorted_hashes[int(choice)-1] + # Load the raw SG data from the example path + example_path = clusters[selected_hash][0]['path'] + with open(example_path, 'r') as f: + sg_data = json.load(f) + except (ValueError, IndexError): + print("Invalid selection."); sys.exit(1) + + # Construct the FMS ManagedServiceData JSON + # This matches the 'SECURITY_GROUPS_CONTENT_AUDIT' type requirement + fms_payload = { + "type": "SECURITY_GROUPS_CONTENT_AUDIT", + "securityGroups": [ + { + "id": "REPLACE_WITH_MASTER_SG_ID" + } + ], + "securityGroupAction": { + "type": "ALLOW_ONLY_MANAGED_RULES" + } + } + + print("\n" + "="*80) + print("TERRAFORM FMS POLICY HCL SNIPPET") + print("="*80) + print(f""" +resource "aws_fms_policy" "remediated_policy" {{ + name = "FMS-Policy-{selected_hash[:8]}" + resource_type = "AWS::EC2::SecurityGroup" + remediation_enabled = false # Set to true after verifying audit results + + security_service_policy_data {{ + type = "SECURITY_GROUPS_CONTENT_AUDIT" + managed_service_data = jsonencode({json.dumps(fms_payload, indent=2)}) + }} + + include_map {{ + account = {json.dumps([inst['account'] for inst in clusters[selected_hash]])} + }} +}} +""") + print("="*80) + print("NOTE: You must create a 'Master SG' in your FMS Admin account with the") + print("rules found in the JSON audit, then replace REPLACE_WITH_MASTER_SG_ID.") + +if __name__ == "__main__": + main()