Skip to content

Commit

Permalink
refine header
Browse files Browse the repository at this point in the history
  • Loading branch information
badra001 committed Mar 19, 2026
1 parent 7188ff6 commit ba9ca4d
Showing 1 changed file with 28 additions and 15 deletions.
43 changes: 28 additions & 15 deletions local-app/python-tools/cross-organization/purge_sg_rules.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from botocore.exceptions import ClientError

# --- VERSIONING ---
__version__ = "1.2.2"
__version__ = "1.2.4"

def get_prefix_list_info(ec2_client, pl_id):
"""Looks up Prefix List metadata."""
Expand All @@ -18,12 +18,18 @@ def get_prefix_list_info(ec2_client, pl_id):
pass
return "Unknown Prefix List"

def sort_rules(rules):
"""Sorts rules by 'FromPort'. Rules with no port (-1) are treated as 0."""
return sorted(rules, key=lambda x: x.get('FromPort', 0))

def format_rule(ec2_client, rule, direction, index):
"""Formats a rule with an index and enhanced Prefix List details."""
"""Formats a rule with explicit From/To ports and enhanced Prefix List details."""
proto = rule.get('IpProtocol', 'all')
from_port = rule.get('FromPort', 'all')
to_port = rule.get('ToPort', 'all')
port_range = f"{from_port}-{to_port}" if from_port != to_port else from_port

# Format the port display specifically as From -> To
port_display = f"{from_port} -> {to_port}" if from_port != "all" else "ALL"

targets = []
for ip in rule.get('IpRanges', []):
Expand All @@ -36,7 +42,7 @@ def format_rule(ec2_client, rule, direction, index):
targets.append(f"{pl_id} [{pl_meta}]{' ('+pl['Description']+')' if 'Description' in pl else ''}")

target_str = ", ".join(targets)
return f" {index:>2}. [{direction.upper()}] Proto: {proto:<5} | Ports: {port_range:<10} | Targets: {target_str}"
return f" {index:>2}. [{direction.upper()}] Proto: {proto:<5} | Ports: {port_display:<15} | Targets: {target_str}"

def process_sg(group_id, region, profile, dry_run=True, list_only=False):
session = boto3.Session(profile_name=profile, region_name=region)
Expand All @@ -46,36 +52,42 @@ def process_sg(group_id, region, profile, dry_run=True, list_only=False):
response = ec2.describe_security_groups(GroupIds=[group_id])
sg = response['SecurityGroups'][0]

# Extract Metadata for Header
tags = {t['Key']: t['Value'] for t in sg.get('Tags', [])}
sg_name = tags.get('Name', 'N/A')

print("-" * 110)
print("-" * 115)
mode = "LIST MODE" if list_only else "PURGE MODE"
print(f"SG RULE {mode} v{__version__} | {group_id} | {region} | Profile: {profile}")
print(f"NAME: {sg_name}")

if tags:
tag_str = ", ".join([f"{k}:{v}" for k,v in tags.items() if k != 'Name'])
print(f"TAGS: {tag_str}")
print("-" * 110)
print("TAGS:")
for k, v in sorted(tags.items()):
if k == 'Name': continue
print(f" {k:<20}: {v}")
print("-" * 115)

ingress, egress = sg.get('IpPermissions', []), sg.get('IpPermissionsEgress', [])
ingress_raw = sg.get('IpPermissions', [])
egress_raw = sg.get('IpPermissionsEgress', [])

if not ingress and not egress:
if not ingress_raw and not egress_raw:
print(" No rules found.")
return

# Sort the rules before display
ingress = sort_rules(ingress_raw)
egress = sort_rules(egress_raw)

if ingress:
print("\nINGRESS RULES:")
print("\nINGRESS RULES (Sorted by Port):")
for i, rule in enumerate(ingress, 1):
print(format_rule(ec2, rule, "ingress", i))

if egress:
print("\nEGRESS RULES:")
print("\nEGRESS RULES (Sorted by Port):")
for i, rule in enumerate(egress, 1):
print(format_rule(ec2, rule, "egress", i))

# Footer Summary
print("\n" + "-" * 40)
print(f"SUMMARY OF RULES FOUND:")
print(f" Total Ingress: {len(ingress)}")
Expand Down Expand Up @@ -110,4 +122,5 @@ def main():
args = parser.parse_args()
process_sg(args.group_id, args.region, args.profile, dry_run=args.dry_run, list_only=args.list)

if __name__ == "__main__": main()
if __name__ == "__main__":
main()

0 comments on commit ba9ca4d

Please sign in to comment.