diff --git a/local-app/python-tools/cross-organization/purge_sg_rules.py b/local-app/python-tools/cross-organization/purge_sg_rules.py index 718c4c6d..c63ca657 100755 --- a/local-app/python-tools/cross-organization/purge_sg_rules.py +++ b/local-app/python-tools/cross-organization/purge_sg_rules.py @@ -5,7 +5,7 @@ from botocore.exceptions import ClientError # --- VERSIONING --- -__version__ = "1.2.0" +__version__ = "1.2.2" def get_prefix_list_info(ec2_client, pl_id): """Looks up Prefix List metadata.""" @@ -13,67 +13,84 @@ def get_prefix_list_info(ec2_client, pl_id): resp = ec2_client.describe_managed_prefix_lists(PrefixListIds=[pl_id]) if resp['PrefixLists']: pl = resp['PrefixLists'][0] - # MaxEntries is the 'weight' against SG limits return f"{pl['PrefixListName']} ({pl['MaxEntries']} entries)" except Exception: pass return "Unknown Prefix List" -def format_rule(ec2_client, rule, direction): - """Formats a rule with enhanced Prefix List details.""" +def format_rule(ec2_client, rule, direction, index): + """Formats a rule with an index and enhanced Prefix List details.""" proto = rule.get('IpProtocol', 'all') from_port = rule.get('FromPort', 'all') to_port = rule.get('ToPort', 'all') port_range = f"{from_port}-{to_port}" if from_port != to_port else from_port targets = [] - # 1. CIDR Blocks for ip in rule.get('IpRanges', []): targets.append(f"{ip['CidrIp']}{' ('+ip['Description']+')' if 'Description' in ip else ''}") - - # 2. Security Groups for pair in rule.get('UserIdGroupPairs', []): targets.append(f"{pair['GroupId']}{' ('+pair['Description']+')' if 'Description' in pair else ''}") - - # 3. Managed Prefix Lists for pl in rule.get('PrefixListIds', []): pl_id = pl['PrefixListId'] pl_meta = get_prefix_list_info(ec2_client, pl_id) targets.append(f"{pl_id} [{pl_meta}]{' ('+pl['Description']+')' if 'Description' in pl else ''}") target_str = ", ".join(targets) - return f" [{direction.upper()}] Proto: {proto:<5} | Ports: {port_range:<10} | Targets: {target_str}" + return f" {index:>2}. [{direction.upper()}] Proto: {proto:<5} | Ports: {port_range:<10} | Targets: {target_str}" def process_sg(group_id, region, profile, dry_run=True, list_only=False): session = boto3.Session(profile_name=profile, region_name=region) ec2 = session.client('ec2') - print("-" * 100) - mode = "LIST MODE" if list_only else "PURGE MODE" - print(f"SG RULE {mode} v{__version__} | {group_id} | {region} | Profile: {profile}") - print("-" * 100) - try: response = ec2.describe_security_groups(GroupIds=[group_id]) sg = response['SecurityGroups'][0] + + # Extract Metadata for Header + tags = {t['Key']: t['Value'] for t in sg.get('Tags', [])} + sg_name = tags.get('Name', 'N/A') + + print("-" * 110) + mode = "LIST MODE" if list_only else "PURGE MODE" + print(f"SG RULE {mode} v{__version__} | {group_id} | {region} | Profile: {profile}") + print(f"NAME: {sg_name}") + if tags: + tag_str = ", ".join([f"{k}:{v}" for k,v in tags.items() if k != 'Name']) + print(f"TAGS: {tag_str}") + print("-" * 110) + ingress, egress = sg.get('IpPermissions', []), sg.get('IpPermissionsEgress', []) if not ingress and not egress: print(" No rules found.") return - print("\nCURRENT RULES:") - for rule in ingress: print(format_rule(ec2, rule, "ingress")) - for rule in egress: print(format_rule(ec2, rule, "egress")) + if ingress: + print("\nINGRESS RULES:") + for i, rule in enumerate(ingress, 1): + print(format_rule(ec2, rule, "ingress", i)) + + if egress: + print("\nEGRESS RULES:") + for i, rule in enumerate(egress, 1): + print(format_rule(ec2, rule, "egress", i)) + + # Footer Summary + print("\n" + "-" * 40) + print(f"SUMMARY OF RULES FOUND:") + print(f" Total Ingress: {len(ingress)}") + print(f" Total Egress: {len(egress)}") + print("-" * 40) if list_only: return if dry_run: - print("\n[DRY-RUN] No changes will be made.") + print("\n[DRY-RUN] Verification complete. No changes were made.") return - confirm = input(f"\nCONFIRM: Purge all {len(ingress) + len(egress)} rule sets? (y/n): ") + total = len(ingress) + len(egress) + confirm = input(f"\nCONFIRM: Purge all {total} rule sets? (y/n): ") if confirm.lower() == 'y': if ingress: ec2.revoke_security_group_ingress(GroupId=group_id, IpPermissions=ingress) if egress: ec2.revoke_security_group_egress(GroupId=group_id, IpPermissions=egress)