From d6c2a09594b3a7dbcba3d704f5e3396b9373ee26 Mon Sep 17 00:00:00 2001 From: badra001 Date: Thu, 19 Mar 2026 16:11:05 -0400 Subject: [PATCH] add --list, lookup prefix list --- .../cross-organization/purge_sg_rules.py | 87 ++++++++++--------- 1 file changed, 47 insertions(+), 40 deletions(-) diff --git a/local-app/python-tools/cross-organization/purge_sg_rules.py b/local-app/python-tools/cross-organization/purge_sg_rules.py index 1f8d96e4..718c4c6d 100755 --- a/local-app/python-tools/cross-organization/purge_sg_rules.py +++ b/local-app/python-tools/cross-organization/purge_sg_rules.py @@ -2,88 +2,95 @@ import boto3 import argparse import sys -import json from botocore.exceptions import ClientError # --- VERSIONING --- -__version__ = "1.1.0" +__version__ = "1.2.0" -def format_rule(rule, direction): - """Formats a rule for clear terminal output.""" +def get_prefix_list_info(ec2_client, pl_id): + """Looks up Prefix List metadata.""" + try: + resp = ec2_client.describe_managed_prefix_lists(PrefixListIds=[pl_id]) + if resp['PrefixLists']: + pl = resp['PrefixLists'][0] + # MaxEntries is the 'weight' against SG limits + return f"{pl['PrefixListName']} ({pl['MaxEntries']} entries)" + except Exception: + pass + return "Unknown Prefix List" + +def format_rule(ec2_client, rule, direction): + """Formats a rule with enhanced Prefix List details.""" proto = rule.get('IpProtocol', 'all') from_port = rule.get('FromPort', 'all') to_port = rule.get('ToPort', 'all') port_range = f"{from_port}-{to_port}" if from_port != to_port else from_port targets = [] - # Handle CIDR blocks + # 1. CIDR Blocks for ip in rule.get('IpRanges', []): - desc = f" ({ip['Description']})" if 'Description' in ip else "" - targets.append(f"{ip['CidrIp']}{desc}") - # Handle Security Group references + targets.append(f"{ip['CidrIp']}{' ('+ip['Description']+')' if 'Description' in ip else ''}") + + # 2. Security Groups for pair in rule.get('UserIdGroupPairs', []): - desc = f" ({pair['Description']})" if 'Description' in pair else "" - targets.append(f"{pair['GroupId']}{desc}") + targets.append(f"{pair['GroupId']}{' ('+pair['Description']+')' if 'Description' in pair else ''}") + + # 3. Managed Prefix Lists + for pl in rule.get('PrefixListIds', []): + pl_id = pl['PrefixListId'] + pl_meta = get_prefix_list_info(ec2_client, pl_id) + targets.append(f"{pl_id} [{pl_meta}]{' ('+pl['Description']+')' if 'Description' in pl else ''}") target_str = ", ".join(targets) return f" [{direction.upper()}] Proto: {proto:<5} | Ports: {port_range:<10} | Targets: {target_str}" -def purge_rules(group_id, region, profile, dry_run=True): +def process_sg(group_id, region, profile, dry_run=True, list_only=False): session = boto3.Session(profile_name=profile, region_name=region) ec2 = session.client('ec2') print("-" * 100) - print(f"SG RULE PURGER v{__version__} | {group_id} | {region} | Profile: {profile}") + mode = "LIST MODE" if list_only else "PURGE MODE" + print(f"SG RULE {mode} v{__version__} | {group_id} | {region} | Profile: {profile}") print("-" * 100) try: response = ec2.describe_security_groups(GroupIds=[group_id]) sg = response['SecurityGroups'][0] - - ingress = sg.get('IpPermissions', []) - egress = sg.get('IpPermissionsEgress', []) + ingress, egress = sg.get('IpPermissions', []), sg.get('IpPermissionsEgress', []) if not ingress and not egress: - print(" No rules found. Nothing to purge.") + print(" No rules found.") return - # --- PREVIEW RULES --- - print("\nIDENTIFIED RULES FOR REMOVAL:") - for rule in ingress: - print(format_rule(rule, "ingress")) - for rule in egress: - print(format_rule(rule, "egress")) + print("\nCURRENT RULES:") + for rule in ingress: print(format_rule(ec2, rule, "ingress")) + for rule in egress: print(format_rule(ec2, rule, "egress")) - if dry_run: - print("\n[DRY-RUN] Verification complete. No changes were made.") + if list_only: return - # --- EXECUTE PURGE --- - confirm = input(f"\nCONFIRM: Purge all {len(ingress) + len(egress)} rule sets? (y/n): ") - if confirm.lower() != 'y': - print("Aborted.") + if dry_run: + print("\n[DRY-RUN] No changes will be made.") return - if ingress: - ec2.revoke_security_group_ingress(GroupId=group_id, IpPermissions=ingress) - print(" Successfully revoked all ingress rules.") - if egress: - ec2.revoke_security_group_egress(GroupId=group_id, IpPermissions=egress) - print(" Successfully revoked all egress rules.") + confirm = input(f"\nCONFIRM: Purge all {len(ingress) + len(egress)} rule sets? (y/n): ") + if confirm.lower() == 'y': + if ingress: ec2.revoke_security_group_ingress(GroupId=group_id, IpPermissions=ingress) + if egress: ec2.revoke_security_group_egress(GroupId=group_id, IpPermissions=egress) + print(" Successfully revoked all rules.") except ClientError as e: - print(f"Error: {e}") - sys.exit(1) + print(f"Error: {e}"); sys.exit(1) def main(): parser = argparse.ArgumentParser(description=f"SG Rule Purger v{__version__}") - parser.add_argument("--group-id", required=True, help="SG ID (sg-xxxxxxxx)") + parser.add_argument("--group-id", required=True, help="SG ID") parser.add_argument("--region", required=True, help="AWS Region") parser.add_argument("--profile", required=True, help="AWS Profile") parser.add_argument("--dry-run", action="store_true", help="Perform a dry run") + parser.add_argument("--list", action="store_true", help="Only list rules, do not purge") args = parser.parse_args() - purge_rules(args.group_id, args.region, args.profile, dry_run=args.dry_run) + process_sg(args.group_id, args.region, args.profile, dry_run=args.dry_run, list_only=args.list) -if __name__ == "__main__": - main() +if __name__ == "__main__": main()