Skip to content

Commit

Permalink
add header, count index of rules
Browse files Browse the repository at this point in the history
  • Loading branch information
badra001 committed Mar 19, 2026
1 parent d6c2a09 commit 7188ff6
Showing 1 changed file with 37 additions and 20 deletions.
57 changes: 37 additions & 20 deletions local-app/python-tools/cross-organization/purge_sg_rules.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,75 +5,92 @@
from botocore.exceptions import ClientError

# --- VERSIONING ---
__version__ = "1.2.0"
__version__ = "1.2.2"

def get_prefix_list_info(ec2_client, pl_id):
"""Looks up Prefix List metadata."""
try:
resp = ec2_client.describe_managed_prefix_lists(PrefixListIds=[pl_id])
if resp['PrefixLists']:
pl = resp['PrefixLists'][0]
# MaxEntries is the 'weight' against SG limits
return f"{pl['PrefixListName']} ({pl['MaxEntries']} entries)"
except Exception:
pass
return "Unknown Prefix List"

def format_rule(ec2_client, rule, direction):
"""Formats a rule with enhanced Prefix List details."""
def format_rule(ec2_client, rule, direction, index):
"""Formats a rule with an index and enhanced Prefix List details."""
proto = rule.get('IpProtocol', 'all')
from_port = rule.get('FromPort', 'all')
to_port = rule.get('ToPort', 'all')
port_range = f"{from_port}-{to_port}" if from_port != to_port else from_port

targets = []
# 1. CIDR Blocks
for ip in rule.get('IpRanges', []):
targets.append(f"{ip['CidrIp']}{' ('+ip['Description']+')' if 'Description' in ip else ''}")

# 2. Security Groups
for pair in rule.get('UserIdGroupPairs', []):
targets.append(f"{pair['GroupId']}{' ('+pair['Description']+')' if 'Description' in pair else ''}")

# 3. Managed Prefix Lists
for pl in rule.get('PrefixListIds', []):
pl_id = pl['PrefixListId']
pl_meta = get_prefix_list_info(ec2_client, pl_id)
targets.append(f"{pl_id} [{pl_meta}]{' ('+pl['Description']+')' if 'Description' in pl else ''}")

target_str = ", ".join(targets)
return f" [{direction.upper()}] Proto: {proto:<5} | Ports: {port_range:<10} | Targets: {target_str}"
return f" {index:>2}. [{direction.upper()}] Proto: {proto:<5} | Ports: {port_range:<10} | Targets: {target_str}"

def process_sg(group_id, region, profile, dry_run=True, list_only=False):
session = boto3.Session(profile_name=profile, region_name=region)
ec2 = session.client('ec2')

print("-" * 100)
mode = "LIST MODE" if list_only else "PURGE MODE"
print(f"SG RULE {mode} v{__version__} | {group_id} | {region} | Profile: {profile}")
print("-" * 100)

try:
response = ec2.describe_security_groups(GroupIds=[group_id])
sg = response['SecurityGroups'][0]

# Extract Metadata for Header
tags = {t['Key']: t['Value'] for t in sg.get('Tags', [])}
sg_name = tags.get('Name', 'N/A')

print("-" * 110)
mode = "LIST MODE" if list_only else "PURGE MODE"
print(f"SG RULE {mode} v{__version__} | {group_id} | {region} | Profile: {profile}")
print(f"NAME: {sg_name}")
if tags:
tag_str = ", ".join([f"{k}:{v}" for k,v in tags.items() if k != 'Name'])
print(f"TAGS: {tag_str}")
print("-" * 110)

ingress, egress = sg.get('IpPermissions', []), sg.get('IpPermissionsEgress', [])

if not ingress and not egress:
print(" No rules found.")
return

print("\nCURRENT RULES:")
for rule in ingress: print(format_rule(ec2, rule, "ingress"))
for rule in egress: print(format_rule(ec2, rule, "egress"))
if ingress:
print("\nINGRESS RULES:")
for i, rule in enumerate(ingress, 1):
print(format_rule(ec2, rule, "ingress", i))

if egress:
print("\nEGRESS RULES:")
for i, rule in enumerate(egress, 1):
print(format_rule(ec2, rule, "egress", i))

# Footer Summary
print("\n" + "-" * 40)
print(f"SUMMARY OF RULES FOUND:")
print(f" Total Ingress: {len(ingress)}")
print(f" Total Egress: {len(egress)}")
print("-" * 40)

if list_only:
return

if dry_run:
print("\n[DRY-RUN] No changes will be made.")
print("\n[DRY-RUN] Verification complete. No changes were made.")
return

confirm = input(f"\nCONFIRM: Purge all {len(ingress) + len(egress)} rule sets? (y/n): ")
total = len(ingress) + len(egress)
confirm = input(f"\nCONFIRM: Purge all {total} rule sets? (y/n): ")
if confirm.lower() == 'y':
if ingress: ec2.revoke_security_group_ingress(GroupId=group_id, IpPermissions=ingress)
if egress: ec2.revoke_security_group_egress(GroupId=group_id, IpPermissions=egress)
Expand Down

0 comments on commit 7188ff6

Please sign in to comment.