From 549b8bf2d43919cdc3305af1485e09ed8e4c1117 Mon Sep 17 00:00:00 2001 From: badra001 Date: Thu, 12 Mar 2026 11:15:03 -0400 Subject: [PATCH] add version, fix --- .../ipam/export_ipam_discovery.py | 60 ++++++++++++------- 1 file changed, 37 insertions(+), 23 deletions(-) diff --git a/local-app/python-tools/ipam/export_ipam_discovery.py b/local-app/python-tools/ipam/export_ipam_discovery.py index 6ae18fc2..5ff6b1fa 100755 --- a/local-app/python-tools/ipam/export_ipam_discovery.py +++ b/local-app/python-tools/ipam/export_ipam_discovery.py @@ -2,25 +2,29 @@ import boto3 import csv +import json import argparse -import sys +from datetime import datetime from botocore.exceptions import ClientError +__version__ = "1.0.1" + def get_args(): - parser = argparse.ArgumentParser(description="Export AWS IPAM discovered resources to CSV.") + parser = argparse.ArgumentParser(description=f"Export AWS IPAM discovery to CSV/JSON (v{__version__})") parser.add_argument("--profile", help="AWS CLI profile name", default=None) parser.add_argument("--region", help="AWS region (e.g., us-gov-west-1)", required=True) - parser.add_argument("--output", help="Output CSV filename", default="ipam_discovery.csv") return parser.parse_args() -def export_ipam_data(profile, region, output_file): - # Initialize session +def export_ipam_data(profile, region): session = boto3.Session(profile_name=profile, region_name=region) ec2 = session.client("ec2") + + timestamp = datetime.now().strftime("%Y%m%d-%H%M%S") + csv_file = f"ipam-export.{timestamp}.csv" + json_file = f"ipam-export.{timestamp}.json" try: - # First, we need the IPAM Resource Discovery ID - # Most environments have one, but we'll fetch the first active one found + # Get the first active Discovery ID discoveries = ec2.describe_ipam_resource_discoveries() if not discoveries["IpamResourceDiscoveries"]: print(f"Error: No IPAM Resource Discoveries found in {region}.") @@ -32,17 +36,20 @@ def export_ipam_data(profile, region, output_file): resources = [] paginator = ec2.get_paginator("get_ipam_discovered_resource_cidrs") - # Fetching VPCs and Subnets + # Fixed parameters: ResourceRegion is required, ResourceIdScope removed for resource_type in ["vpc", "subnet"]: - print(f"Fetching discovered {resource_type}s...") + print(f"Fetching {resource_type}s...") page_iterator = paginator.paginate( IpamResourceDiscoveryId=discovery_id, - ResourceIdScope=region, + ResourceRegion=region, Filters=[{'Name': 'resource-type', 'Values': [resource_type]}] ) for page in page_iterator: for item in page["IpamDiscoveredResourceCidrs"]: + # Convert datetime to string for JSON serialization + sample_time = item.get("SampleTime").isoformat() if item.get("SampleTime") else None + resources.append({ "ResourceType": resource_type.upper(), "ResourceId": item.get("ResourceId"), @@ -50,20 +57,26 @@ def export_ipam_data(profile, region, output_file): "ResourceRegion": item.get("ResourceRegion"), "ResourceOwnerId": item.get("ResourceOwnerId"), "IpUsage": item.get("IpUsage"), - "VpcId": item.get("VpcId", "N/A"), # Only populates for subnets - "SampleTime": item.get("SampleTime").strftime("%Y-%m-%d %H:%M:%S") + "VpcId": item.get("VpcId", "N/A"), + "SampleTime": sample_time }) - # Write to CSV - if resources: - keys = resources[0].keys() - with open(output_file, "w", newline="") as f: - dict_writer = csv.DictWriter(f, fieldnames=keys) - dict_writer.writeheader() - dict_writer.writerows(resources) - print(f"Successfully exported {len(resources)} resources to {output_file}") - else: - print("No resources found to export.") + if not resources: + print("No resources found.") + return + + # 1. Export JSON + with open(json_file, "w") as jf: + json.dump(resources, jf, indent=4) + print(f"JSON exported to: {json_file}") + + # 2. Export CSV + keys = resources[0].keys() + with open(csv_file, "w", newline="") as cf: + writer = csv.DictWriter(cf, fieldnames=keys) + writer.writeheader() + writer.writerows(resources) + print(f"CSV exported to: {csv_file}") except ClientError as e: print(f"AWS Error: {e.response['Error']['Message']}") @@ -72,4 +85,5 @@ def export_ipam_data(profile, region, output_file): if __name__ == "__main__": args = get_args() - export_ipam_data(args.profile, args.region, args.output) + print(f"Running IPAM Export v{__version__}") + export_ipam_data(args.profile, args.region)