diff --git a/local-app/python-tools/cross-organization/purge_sg_rules.py b/local-app/python-tools/cross-organization/purge_sg_rules.py index c63ca657..f49d7728 100755 --- a/local-app/python-tools/cross-organization/purge_sg_rules.py +++ b/local-app/python-tools/cross-organization/purge_sg_rules.py @@ -5,7 +5,7 @@ from botocore.exceptions import ClientError # --- VERSIONING --- -__version__ = "1.2.2" +__version__ = "1.2.4" def get_prefix_list_info(ec2_client, pl_id): """Looks up Prefix List metadata.""" @@ -18,12 +18,18 @@ def get_prefix_list_info(ec2_client, pl_id): pass return "Unknown Prefix List" +def sort_rules(rules): + """Sorts rules by 'FromPort'. Rules with no port (-1) are treated as 0.""" + return sorted(rules, key=lambda x: x.get('FromPort', 0)) + def format_rule(ec2_client, rule, direction, index): - """Formats a rule with an index and enhanced Prefix List details.""" + """Formats a rule with explicit From/To ports and enhanced Prefix List details.""" proto = rule.get('IpProtocol', 'all') from_port = rule.get('FromPort', 'all') to_port = rule.get('ToPort', 'all') - port_range = f"{from_port}-{to_port}" if from_port != to_port else from_port + + # Format the port display specifically as From -> To + port_display = f"{from_port} -> {to_port}" if from_port != "all" else "ALL" targets = [] for ip in rule.get('IpRanges', []): @@ -36,7 +42,7 @@ def format_rule(ec2_client, rule, direction, index): targets.append(f"{pl_id} [{pl_meta}]{' ('+pl['Description']+')' if 'Description' in pl else ''}") target_str = ", ".join(targets) - return f" {index:>2}. [{direction.upper()}] Proto: {proto:<5} | Ports: {port_range:<10} | Targets: {target_str}" + return f" {index:>2}. [{direction.upper()}] Proto: {proto:<5} | Ports: {port_display:<15} | Targets: {target_str}" def process_sg(group_id, region, profile, dry_run=True, list_only=False): session = boto3.Session(profile_name=profile, region_name=region) @@ -46,36 +52,42 @@ def process_sg(group_id, region, profile, dry_run=True, list_only=False): response = ec2.describe_security_groups(GroupIds=[group_id]) sg = response['SecurityGroups'][0] - # Extract Metadata for Header tags = {t['Key']: t['Value'] for t in sg.get('Tags', [])} sg_name = tags.get('Name', 'N/A') - print("-" * 110) + print("-" * 115) mode = "LIST MODE" if list_only else "PURGE MODE" print(f"SG RULE {mode} v{__version__} | {group_id} | {region} | Profile: {profile}") print(f"NAME: {sg_name}") + if tags: - tag_str = ", ".join([f"{k}:{v}" for k,v in tags.items() if k != 'Name']) - print(f"TAGS: {tag_str}") - print("-" * 110) + print("TAGS:") + for k, v in sorted(tags.items()): + if k == 'Name': continue + print(f" {k:<20}: {v}") + print("-" * 115) - ingress, egress = sg.get('IpPermissions', []), sg.get('IpPermissionsEgress', []) + ingress_raw = sg.get('IpPermissions', []) + egress_raw = sg.get('IpPermissionsEgress', []) - if not ingress and not egress: + if not ingress_raw and not egress_raw: print(" No rules found.") return + # Sort the rules before display + ingress = sort_rules(ingress_raw) + egress = sort_rules(egress_raw) + if ingress: - print("\nINGRESS RULES:") + print("\nINGRESS RULES (Sorted by Port):") for i, rule in enumerate(ingress, 1): print(format_rule(ec2, rule, "ingress", i)) if egress: - print("\nEGRESS RULES:") + print("\nEGRESS RULES (Sorted by Port):") for i, rule in enumerate(egress, 1): print(format_rule(ec2, rule, "egress", i)) - # Footer Summary print("\n" + "-" * 40) print(f"SUMMARY OF RULES FOUND:") print(f" Total Ingress: {len(ingress)}") @@ -110,4 +122,5 @@ def main(): args = parser.parse_args() process_sg(args.group_id, args.region, args.profile, dry_run=args.dry_run, list_only=args.list) -if __name__ == "__main__": main() +if __name__ == "__main__": + main()