Skip to content

Commit

Permalink
add --list, lookup prefix list
Browse files Browse the repository at this point in the history
  • Loading branch information
badra001 committed Mar 19, 2026
1 parent b565750 commit d6c2a09
Showing 1 changed file with 47 additions and 40 deletions.
87 changes: 47 additions & 40 deletions local-app/python-tools/cross-organization/purge_sg_rules.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,88 +2,95 @@
import boto3
import argparse
import sys
import json
from botocore.exceptions import ClientError

# --- VERSIONING ---
__version__ = "1.1.0"
__version__ = "1.2.0"

def format_rule(rule, direction):
"""Formats a rule for clear terminal output."""
def get_prefix_list_info(ec2_client, pl_id):
"""Looks up Prefix List metadata."""
try:
resp = ec2_client.describe_managed_prefix_lists(PrefixListIds=[pl_id])
if resp['PrefixLists']:
pl = resp['PrefixLists'][0]
# MaxEntries is the 'weight' against SG limits
return f"{pl['PrefixListName']} ({pl['MaxEntries']} entries)"
except Exception:
pass
return "Unknown Prefix List"

def format_rule(ec2_client, rule, direction):
"""Formats a rule with enhanced Prefix List details."""
proto = rule.get('IpProtocol', 'all')
from_port = rule.get('FromPort', 'all')
to_port = rule.get('ToPort', 'all')
port_range = f"{from_port}-{to_port}" if from_port != to_port else from_port

targets = []
# Handle CIDR blocks
# 1. CIDR Blocks
for ip in rule.get('IpRanges', []):
desc = f" ({ip['Description']})" if 'Description' in ip else ""
targets.append(f"{ip['CidrIp']}{desc}")
# Handle Security Group references
targets.append(f"{ip['CidrIp']}{' ('+ip['Description']+')' if 'Description' in ip else ''}")

# 2. Security Groups
for pair in rule.get('UserIdGroupPairs', []):
desc = f" ({pair['Description']})" if 'Description' in pair else ""
targets.append(f"{pair['GroupId']}{desc}")
targets.append(f"{pair['GroupId']}{' ('+pair['Description']+')' if 'Description' in pair else ''}")

# 3. Managed Prefix Lists
for pl in rule.get('PrefixListIds', []):
pl_id = pl['PrefixListId']
pl_meta = get_prefix_list_info(ec2_client, pl_id)
targets.append(f"{pl_id} [{pl_meta}]{' ('+pl['Description']+')' if 'Description' in pl else ''}")

target_str = ", ".join(targets)
return f" [{direction.upper()}] Proto: {proto:<5} | Ports: {port_range:<10} | Targets: {target_str}"

def purge_rules(group_id, region, profile, dry_run=True):
def process_sg(group_id, region, profile, dry_run=True, list_only=False):
session = boto3.Session(profile_name=profile, region_name=region)
ec2 = session.client('ec2')

print("-" * 100)
print(f"SG RULE PURGER v{__version__} | {group_id} | {region} | Profile: {profile}")
mode = "LIST MODE" if list_only else "PURGE MODE"
print(f"SG RULE {mode} v{__version__} | {group_id} | {region} | Profile: {profile}")
print("-" * 100)

try:
response = ec2.describe_security_groups(GroupIds=[group_id])
sg = response['SecurityGroups'][0]

ingress = sg.get('IpPermissions', [])
egress = sg.get('IpPermissionsEgress', [])
ingress, egress = sg.get('IpPermissions', []), sg.get('IpPermissionsEgress', [])

if not ingress and not egress:
print(" No rules found. Nothing to purge.")
print(" No rules found.")
return

# --- PREVIEW RULES ---
print("\nIDENTIFIED RULES FOR REMOVAL:")
for rule in ingress:
print(format_rule(rule, "ingress"))
for rule in egress:
print(format_rule(rule, "egress"))
print("\nCURRENT RULES:")
for rule in ingress: print(format_rule(ec2, rule, "ingress"))
for rule in egress: print(format_rule(ec2, rule, "egress"))

if dry_run:
print("\n[DRY-RUN] Verification complete. No changes were made.")
if list_only:
return

# --- EXECUTE PURGE ---
confirm = input(f"\nCONFIRM: Purge all {len(ingress) + len(egress)} rule sets? (y/n): ")
if confirm.lower() != 'y':
print("Aborted.")
if dry_run:
print("\n[DRY-RUN] No changes will be made.")
return

if ingress:
ec2.revoke_security_group_ingress(GroupId=group_id, IpPermissions=ingress)
print(" Successfully revoked all ingress rules.")
if egress:
ec2.revoke_security_group_egress(GroupId=group_id, IpPermissions=egress)
print(" Successfully revoked all egress rules.")
confirm = input(f"\nCONFIRM: Purge all {len(ingress) + len(egress)} rule sets? (y/n): ")
if confirm.lower() == 'y':
if ingress: ec2.revoke_security_group_ingress(GroupId=group_id, IpPermissions=ingress)
if egress: ec2.revoke_security_group_egress(GroupId=group_id, IpPermissions=egress)
print(" Successfully revoked all rules.")

except ClientError as e:
print(f"Error: {e}")
sys.exit(1)
print(f"Error: {e}"); sys.exit(1)

def main():
parser = argparse.ArgumentParser(description=f"SG Rule Purger v{__version__}")
parser.add_argument("--group-id", required=True, help="SG ID (sg-xxxxxxxx)")
parser.add_argument("--group-id", required=True, help="SG ID")
parser.add_argument("--region", required=True, help="AWS Region")
parser.add_argument("--profile", required=True, help="AWS Profile")
parser.add_argument("--dry-run", action="store_true", help="Perform a dry run")
parser.add_argument("--list", action="store_true", help="Only list rules, do not purge")

args = parser.parse_args()
purge_rules(args.group_id, args.region, args.profile, dry_run=args.dry_run)
process_sg(args.group_id, args.region, args.profile, dry_run=args.dry_run, list_only=args.list)

if __name__ == "__main__":
main()
if __name__ == "__main__": main()

0 comments on commit d6c2a09

Please sign in to comment.