Skip to content

Commit

Permalink
output rules as they are being removed
Browse files Browse the repository at this point in the history
  • Loading branch information
badra001 committed Mar 19, 2026
1 parent 77e2865 commit b565750
Showing 1 changed file with 56 additions and 40 deletions.
96 changes: 56 additions & 40 deletions local-app/python-tools/cross-organization/purge_sg_rules.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,72 +2,88 @@
import boto3
import argparse
import sys
import json
from botocore.exceptions import ClientError

# --- VERSIONING ---
__version__ = "1.0.0"
__version__ = "1.1.0"

def format_rule(rule, direction):
"""Formats a rule for clear terminal output."""
proto = rule.get('IpProtocol', 'all')
from_port = rule.get('FromPort', 'all')
to_port = rule.get('ToPort', 'all')
port_range = f"{from_port}-{to_port}" if from_port != to_port else from_port

targets = []
# Handle CIDR blocks
for ip in rule.get('IpRanges', []):
desc = f" ({ip['Description']})" if 'Description' in ip else ""
targets.append(f"{ip['CidrIp']}{desc}")
# Handle Security Group references
for pair in rule.get('UserIdGroupPairs', []):
desc = f" ({pair['Description']})" if 'Description' in pair else ""
targets.append(f"{pair['GroupId']}{desc}")

target_str = ", ".join(targets)
return f" [{direction.upper()}] Proto: {proto:<5} | Ports: {port_range:<10} | Targets: {target_str}"

def purge_rules(group_id, region, profile, dry_run=True):
"""
Removes all ingress and egress rules from the specified Security Group.
"""
session = boto3.Session(profile_name=profile, region_name=region)
ec2 = session.client('ec2')

print(f"\n--- Purging Rules for {group_id} in {region} (Profile: {profile}) ---")
if dry_run:
print("[DRY-RUN] No changes will be made.")
print("-" * 100)
print(f"SG RULE PURGER v{__version__} | {group_id} | {region} | Profile: {profile}")
print("-" * 100)

try:
# 1. Describe the group to get current rules
response = ec2.describe_security_groups(GroupIds=[group_id])
sg = response['SecurityGroups'][0]

ingress = sg.get('IpPermissions', [])
egress = sg.get('IpPermissionsEgress', [])

# 2. Revoke Ingress
if ingress:
print(f" Found {len(ingress)} ingress rule sets. Revoking...")
if not dry_run:
ec2.revoke_security_group_ingress(GroupId=group_id, IpPermissions=ingress)
print(" Successfully revoked all ingress rules.")
else:
print(" No ingress rules found.")
if not ingress and not egress:
print(" No rules found. Nothing to purge.")
return

# --- PREVIEW RULES ---
print("\nIDENTIFIED RULES FOR REMOVAL:")
for rule in ingress:
print(format_rule(rule, "ingress"))
for rule in egress:
print(format_rule(rule, "egress"))

# 3. Revoke Egress
if dry_run:
print("\n[DRY-RUN] Verification complete. No changes were made.")
return

# --- EXECUTE PURGE ---
confirm = input(f"\nCONFIRM: Purge all {len(ingress) + len(egress)} rule sets? (y/n): ")
if confirm.lower() != 'y':
print("Aborted.")
return

if ingress:
ec2.revoke_security_group_ingress(GroupId=group_id, IpPermissions=ingress)
print(" Successfully revoked all ingress rules.")
if egress:
print(f" Found {len(egress)} egress rule sets. Revoking...")
if not dry_run:
ec2.revoke_security_group_egress(GroupId=group_id, IpPermissions=egress)
print(" Successfully revoked all egress rules.")
else:
print(" No egress rules found.")
ec2.revoke_security_group_egress(GroupId=group_id, IpPermissions=egress)
print(" Successfully revoked all egress rules.")

except ClientError as e:
if e.response['Error']['Code'] == 'DryRunOperation':
print("[DRY-RUN] Permission check successful.")
else:
print(f"Error: {e}")
sys.exit(1)
print(f"Error: {e}")
sys.exit(1)

def main():
parser = argparse.ArgumentParser(description=f"SG Rule Purger v{__version__}")
parser.add_argument("--group-id", required=True, help="The ID of the security group (sg-xxxxxxxx)")
parser.add_argument("--region", required=True, help="AWS Region (e.g., us-east-1)")
parser.add_argument("--profile", required=True, help="AWS CLI Profile name")
parser.add_argument("--dry-run", action="store_true", default=False, help="Perform a dry run")
parser.add_argument("--group-id", required=True, help="SG ID (sg-xxxxxxxx)")
parser.add_argument("--region", required=True, help="AWS Region")
parser.add_argument("--profile", required=True, help="AWS Profile")
parser.add_argument("--dry-run", action="store_true", help="Perform a dry run")

args = parser.parse_args()

# If --dry-run is not passed, it defaults to False.
# But for safety, I've coded the function to default to True unless explicitly told otherwise.
purge_rules(args.group_id, args.region, args.profile, dry_run=args.dry_run)

if not args.dry_run:
print("\nPurge complete.")
else:
print("\nDry run finished. Use the same command without --dry-run to execute.")

if __name__ == "__main__":
main()

0 comments on commit b565750

Please sign in to comment.