diff --git a/local-app/python-tools/ipam/summarize_ipam.py b/local-app/python-tools/ipam/summarize_ipam.py new file mode 100755 index 00000000..3cac88dc --- /dev/null +++ b/local-app/python-tools/ipam/summarize_ipam.py @@ -0,0 +1,63 @@ +#!/bin/env python3 + +import json +import argparse +import ipaddress +import sys +from datetime import datetime + +__version__ = "1.0.0" + +def get_args(): + parser = argparse.ArgumentParser(description="Aggregate IPAM CIDRs into supernets.") + parser.add_argument("input_file", help="The JSON file generated by the IPAM exporter") + return parser.parse_args() + +def summarize_cidrs(input_file): + try: + with open(input_file, 'r') as f: + data = json.load(f) + except Exception as e: + print(f"Error reading file: {e}") + sys.exit(1) + + # Extract CIDRs specifically from VPCs (to avoid subnet overlap issues) + # Using a set to handle any potential duplicate CIDR entries + vpc_cidrs = {item['ResourceCidr'] for item in data if item['ResourceType'] == 'VPC'} + + if not vpc_cidrs: + print("No VPC CIDRs found in the input file.") + return + + # Convert strings to IPv4Network objects + networks = [] + for cidr in vpc_cidrs: + try: + networks.append(ipaddress.ip_network(cidr)) + except ValueError as e: + print(f"Skipping invalid CIDR {cidr}: {e}") + + # The 'collapse_addresses' function does the magic + # It returns an iterator of the most collapsed (summarized) networks + summarized = list(ipaddress.collapse_addresses(networks)) + + # Print results + timestamp = datetime.now().strftime("%Y%m%dT%H%M%S") + output_filename = f"ipam-summary.{timestamp}.txt" + + with open(output_filename, 'w') as f: + f.write(f"# IPAM Aggregation Summary - Generated {timestamp}\n") + f.write(f"# Input: {input_file}\n") + f.write(f"# Original VPC Count: {len(vpc_cidrs)}\n") + f.write(f"# Summarized Block Count: {len(summarized)}\n") + f.write("-" * 40 + "\n") + for net in summarized: + print(net) + f.write(f"{net}\n") + + print(f"\nSuccessfully summarized {len(vpc_cidrs)} VPCs into {len(summarized)} blocks.") + print(f"Results saved to: {output_filename}") + +if __name__ == "__main__": + args = get_args() + summarize_cidrs(args.input_file)