From 40baeeba67d50a9b4e0a1ed2998c140ea19e222e Mon Sep 17 00:00:00 2001 From: badra001 Date: Thu, 12 Mar 2026 11:16:45 -0400 Subject: [PATCH] fix --- .../ipam/export_ipam_discovery.py | 93 +++++++++++-------- 1 file changed, 53 insertions(+), 40 deletions(-) diff --git a/local-app/python-tools/ipam/export_ipam_discovery.py b/local-app/python-tools/ipam/export_ipam_discovery.py index 5ff6b1fa..3599efd2 100755 --- a/local-app/python-tools/ipam/export_ipam_discovery.py +++ b/local-app/python-tools/ipam/export_ipam_discovery.py @@ -7,12 +7,12 @@ from datetime import datetime from botocore.exceptions import ClientError -__version__ = "1.0.1" +__version__ = "1.0.2" def get_args(): - parser = argparse.ArgumentParser(description=f"Export AWS IPAM discovery to CSV/JSON (v{__version__})") + parser = argparse.ArgumentParser(description=f"Global AWS IPAM Export (v{__version__})") parser.add_argument("--profile", help="AWS CLI profile name", default=None) - parser.add_argument("--region", help="AWS region (e.g., us-gov-west-1)", required=True) + parser.add_argument("--region", help="IPAM Home Region (e.g., us-gov-west-1)", required=True) return parser.parse_args() def export_ipam_data(profile, region): @@ -24,66 +24,79 @@ def export_ipam_data(profile, region): json_file = f"ipam-export.{timestamp}.json" try: - # Get the first active Discovery ID + # 1. Identify the Resource Discovery and its Operating Regions discoveries = ec2.describe_ipam_resource_discoveries() if not discoveries["IpamResourceDiscoveries"]: print(f"Error: No IPAM Resource Discoveries found in {region}.") return - discovery_id = discoveries["IpamResourceDiscoveries"][0]["IpamResourceDiscoveryId"] + discovery = discoveries["IpamResourceDiscoveries"][0] + discovery_id = discovery["IpamResourceDiscoveryId"] + # Extract all regions this IPAM is actually watching + operating_regions = [r["RegionName"] for r in discovery.get("OperatingRegions", [])] + print(f"Using Discovery ID: {discovery_id}") + print(f"Scanning {len(operating_regions)} operating regions...") - resources = [] + all_resources = [] paginator = ec2.get_paginator("get_ipam_discovered_resource_cidrs") - # Fixed parameters: ResourceRegion is required, ResourceIdScope removed - for resource_type in ["vpc", "subnet"]: - print(f"Fetching {resource_type}s...") - page_iterator = paginator.paginate( - IpamResourceDiscoveryId=discovery_id, - ResourceRegion=region, - Filters=[{'Name': 'resource-type', 'Values': [resource_type]}] - ) + # 2. Loop through every operating region to get the full picture + for op_region in operating_regions: + print(f" > Querying region: {op_region}...", end="\r") + + for resource_type in ["vpc", "subnet"]: + try: + page_iterator = paginator.paginate( + IpamResourceDiscoveryId=discovery_id, + ResourceRegion=op_region, + Filters=[{'Name': 'resource-type', 'Values': [resource_type]}] + ) - for page in page_iterator: - for item in page["IpamDiscoveredResourceCidrs"]: - # Convert datetime to string for JSON serialization - sample_time = item.get("SampleTime").isoformat() if item.get("SampleTime") else None - - resources.append({ - "ResourceType": resource_type.upper(), - "ResourceId": item.get("ResourceId"), - "ResourceCidr": item.get("ResourceCidr"), - "ResourceRegion": item.get("ResourceRegion"), - "ResourceOwnerId": item.get("ResourceOwnerId"), - "IpUsage": item.get("IpUsage"), - "VpcId": item.get("VpcId", "N/A"), - "SampleTime": sample_time - }) + for page in page_iterator: + for item in page["IpamDiscoveredResourceCidrs"]: + sample_time = item.get("SampleTime").isoformat() if item.get("SampleTime") else None + + all_resources.append({ + "ResourceType": resource_type.upper(), + "ResourceId": item.get("ResourceId"), + "ResourceCidr": item.get("ResourceCidr"), + "ResourceRegion": item.get("ResourceRegion"), + "ResourceOwnerId": item.get("ResourceOwnerId"), + "IpUsage": item.get("IpUsage"), + "VpcId": item.get("VpcId", "N/A"), + "SampleTime": sample_time + }) + except ClientError as e: + # Handle cases where a region might be disabled or restricted + print(f"\n ! Skipping {op_region}: {e.response['Error']['Code']}") + continue - if not resources: + print(f"\nTotal resources discovered across all regions: {len(all_resources)}") + + if not all_resources: print("No resources found.") return - # 1. Export JSON + # 3. Export JSON with open(json_file, "w") as jf: - json.dump(resources, jf, indent=4) - print(f"JSON exported to: {json_file}") + json.dump(all_resources, jf, indent=4) + print(f"JSON exported: {json_file}") - # 2. Export CSV - keys = resources[0].keys() + # 4. Export CSV + keys = all_resources[0].keys() with open(csv_file, "w", newline="") as cf: writer = csv.DictWriter(cf, fieldnames=keys) writer.writeheader() - writer.writerows(resources) - print(f"CSV exported to: {csv_file}") + writer.writerows(all_resources) + print(f"CSV exported: {csv_file}") except ClientError as e: - print(f"AWS Error: {e.response['Error']['Message']}") + print(f"\nAWS Error: {e.response['Error']['Message']}") except Exception as e: - print(f"An error occurred: {e}") + print(f"\nAn error occurred: {e}") if __name__ == "__main__": args = get_args() - print(f"Running IPAM Export v{__version__}") + print(f"Running Global IPAM Export v{__version__}") export_ipam_data(args.profile, args.region)