From b565750c335c08c28a279ef3f2dfdc4c04ceae0b Mon Sep 17 00:00:00 2001 From: badra001 Date: Thu, 19 Mar 2026 16:07:50 -0400 Subject: [PATCH] output rules as they are being removed --- .../cross-organization/purge_sg_rules.py | 96 +++++++++++-------- 1 file changed, 56 insertions(+), 40 deletions(-) diff --git a/local-app/python-tools/cross-organization/purge_sg_rules.py b/local-app/python-tools/cross-organization/purge_sg_rules.py index 527fefdf..1f8d96e4 100755 --- a/local-app/python-tools/cross-organization/purge_sg_rules.py +++ b/local-app/python-tools/cross-organization/purge_sg_rules.py @@ -2,72 +2,88 @@ import boto3 import argparse import sys +import json from botocore.exceptions import ClientError # --- VERSIONING --- -__version__ = "1.0.0" +__version__ = "1.1.0" + +def format_rule(rule, direction): + """Formats a rule for clear terminal output.""" + proto = rule.get('IpProtocol', 'all') + from_port = rule.get('FromPort', 'all') + to_port = rule.get('ToPort', 'all') + port_range = f"{from_port}-{to_port}" if from_port != to_port else from_port + + targets = [] + # Handle CIDR blocks + for ip in rule.get('IpRanges', []): + desc = f" ({ip['Description']})" if 'Description' in ip else "" + targets.append(f"{ip['CidrIp']}{desc}") + # Handle Security Group references + for pair in rule.get('UserIdGroupPairs', []): + desc = f" ({pair['Description']})" if 'Description' in pair else "" + targets.append(f"{pair['GroupId']}{desc}") + + target_str = ", ".join(targets) + return f" [{direction.upper()}] Proto: {proto:<5} | Ports: {port_range:<10} | Targets: {target_str}" def purge_rules(group_id, region, profile, dry_run=True): - """ - Removes all ingress and egress rules from the specified Security Group. - """ session = boto3.Session(profile_name=profile, region_name=region) ec2 = session.client('ec2') - print(f"\n--- Purging Rules for {group_id} in {region} (Profile: {profile}) ---") - if dry_run: - print("[DRY-RUN] No changes will be made.") + print("-" * 100) + print(f"SG RULE PURGER v{__version__} | {group_id} | {region} | Profile: {profile}") + print("-" * 100) try: - # 1. Describe the group to get current rules response = ec2.describe_security_groups(GroupIds=[group_id]) sg = response['SecurityGroups'][0] ingress = sg.get('IpPermissions', []) egress = sg.get('IpPermissionsEgress', []) - # 2. Revoke Ingress - if ingress: - print(f" Found {len(ingress)} ingress rule sets. Revoking...") - if not dry_run: - ec2.revoke_security_group_ingress(GroupId=group_id, IpPermissions=ingress) - print(" Successfully revoked all ingress rules.") - else: - print(" No ingress rules found.") + if not ingress and not egress: + print(" No rules found. Nothing to purge.") + return + + # --- PREVIEW RULES --- + print("\nIDENTIFIED RULES FOR REMOVAL:") + for rule in ingress: + print(format_rule(rule, "ingress")) + for rule in egress: + print(format_rule(rule, "egress")) - # 3. Revoke Egress + if dry_run: + print("\n[DRY-RUN] Verification complete. No changes were made.") + return + + # --- EXECUTE PURGE --- + confirm = input(f"\nCONFIRM: Purge all {len(ingress) + len(egress)} rule sets? (y/n): ") + if confirm.lower() != 'y': + print("Aborted.") + return + + if ingress: + ec2.revoke_security_group_ingress(GroupId=group_id, IpPermissions=ingress) + print(" Successfully revoked all ingress rules.") if egress: - print(f" Found {len(egress)} egress rule sets. Revoking...") - if not dry_run: - ec2.revoke_security_group_egress(GroupId=group_id, IpPermissions=egress) - print(" Successfully revoked all egress rules.") - else: - print(" No egress rules found.") + ec2.revoke_security_group_egress(GroupId=group_id, IpPermissions=egress) + print(" Successfully revoked all egress rules.") except ClientError as e: - if e.response['Error']['Code'] == 'DryRunOperation': - print("[DRY-RUN] Permission check successful.") - else: - print(f"Error: {e}") - sys.exit(1) + print(f"Error: {e}") + sys.exit(1) def main(): parser = argparse.ArgumentParser(description=f"SG Rule Purger v{__version__}") - parser.add_argument("--group-id", required=True, help="The ID of the security group (sg-xxxxxxxx)") - parser.add_argument("--region", required=True, help="AWS Region (e.g., us-east-1)") - parser.add_argument("--profile", required=True, help="AWS CLI Profile name") - parser.add_argument("--dry-run", action="store_true", default=False, help="Perform a dry run") + parser.add_argument("--group-id", required=True, help="SG ID (sg-xxxxxxxx)") + parser.add_argument("--region", required=True, help="AWS Region") + parser.add_argument("--profile", required=True, help="AWS Profile") + parser.add_argument("--dry-run", action="store_true", help="Perform a dry run") args = parser.parse_args() - - # If --dry-run is not passed, it defaults to False. - # But for safety, I've coded the function to default to True unless explicitly told otherwise. purge_rules(args.group_id, args.region, args.profile, dry_run=args.dry_run) - - if not args.dry_run: - print("\nPurge complete.") - else: - print("\nDry run finished. Use the same command without --dry-run to execute.") if __name__ == "__main__": main()