From 7cf8ce92d34ba077901d2b8cdcb2860b404b56c4 Mon Sep 17 00:00:00 2001 From: badra001 Date: Thu, 12 Mar 2026 11:17:23 -0400 Subject: [PATCH] sort --- .../ipam/export_ipam_discovery.py | 42 +++++++++++-------- 1 file changed, 25 insertions(+), 17 deletions(-) diff --git a/local-app/python-tools/ipam/export_ipam_discovery.py b/local-app/python-tools/ipam/export_ipam_discovery.py index 3599efd2..7efe6413 100755 --- a/local-app/python-tools/ipam/export_ipam_discovery.py +++ b/local-app/python-tools/ipam/export_ipam_discovery.py @@ -4,17 +4,30 @@ import csv import json import argparse +import ipaddress from datetime import datetime from botocore.exceptions import ClientError -__version__ = "1.0.2" +__version__ = "1.0.3" def get_args(): parser = argparse.ArgumentParser(description=f"Global AWS IPAM Export (v{__version__})") parser.add_argument("--profile", help="AWS CLI profile name", default=None) - parser.add_argument("--region", help="IPAM Home Region (e.g., us-gov-west-1)", required=True) + parser.add_argument("--region", help="IPAM Home Region", required=True) return parser.parse_args() +def cidr_sort_key(resource): + """ + Helper to convert CIDR string into a sortable tuple: + (Version, Network Address as Integer, Prefix Length) + """ + try: + net = ipaddress.ip_network(resource["ResourceCidr"]) + return (net.version, int(net.network_address), net.prefixlen) + except ValueError: + # Fallback for invalid/empty CIDRs + return (0, 0, 0) + def export_ipam_data(profile, region): session = boto3.Session(profile_name=profile, region_name=region) ec2 = session.client("ec2") @@ -24,7 +37,6 @@ def export_ipam_data(profile, region): json_file = f"ipam-export.{timestamp}.json" try: - # 1. Identify the Resource Discovery and its Operating Regions discoveries = ec2.describe_ipam_resource_discoveries() if not discoveries["IpamResourceDiscoveries"]: print(f"Error: No IPAM Resource Discoveries found in {region}.") @@ -32,7 +44,6 @@ def export_ipam_data(profile, region): discovery = discoveries["IpamResourceDiscoveries"][0] discovery_id = discovery["IpamResourceDiscoveryId"] - # Extract all regions this IPAM is actually watching operating_regions = [r["RegionName"] for r in discovery.get("OperatingRegions", [])] print(f"Using Discovery ID: {discovery_id}") @@ -41,7 +52,6 @@ def export_ipam_data(profile, region): all_resources = [] paginator = ec2.get_paginator("get_ipam_discovered_resource_cidrs") - # 2. Loop through every operating region to get the full picture for op_region in operating_regions: print(f" > Querying region: {op_region}...", end="\r") @@ -55,8 +65,6 @@ def export_ipam_data(profile, region): for page in page_iterator: for item in page["IpamDiscoveredResourceCidrs"]: - sample_time = item.get("SampleTime").isoformat() if item.get("SampleTime") else None - all_resources.append({ "ResourceType": resource_type.upper(), "ResourceId": item.get("ResourceId"), @@ -65,38 +73,38 @@ def export_ipam_data(profile, region): "ResourceOwnerId": item.get("ResourceOwnerId"), "IpUsage": item.get("IpUsage"), "VpcId": item.get("VpcId", "N/A"), - "SampleTime": sample_time + "SampleTime": item.get("SampleTime").isoformat() if item.get("SampleTime") else None }) except ClientError as e: - # Handle cases where a region might be disabled or restricted print(f"\n ! Skipping {op_region}: {e.response['Error']['Code']}") continue - print(f"\nTotal resources discovered across all regions: {len(all_resources)}") - if not all_resources: - print("No resources found.") + print("\nNo resources found.") return - # 3. Export JSON + # --- SORTING LOGIC --- + # Sort by ResourceType first, then by CIDR numerically + print("\nSorting records by type and CIDR...") + all_resources.sort(key=lambda x: (x["ResourceType"], cidr_sort_key(x))) + + # Export JSON with open(json_file, "w") as jf: json.dump(all_resources, jf, indent=4) print(f"JSON exported: {json_file}") - # 4. Export CSV + # Export CSV keys = all_resources[0].keys() with open(csv_file, "w", newline="") as cf: writer = csv.DictWriter(cf, fieldnames=keys) writer.writeheader() writer.writerows(all_resources) print(f"CSV exported: {csv_file}") + print(f"Total records processed: {len(all_resources)}") - except ClientError as e: - print(f"\nAWS Error: {e.response['Error']['Message']}") except Exception as e: print(f"\nAn error occurred: {e}") if __name__ == "__main__": args = get_args() - print(f"Running Global IPAM Export v{__version__}") export_ipam_data(args.profile, args.region)