#!/usr/bin/env python3
"""Report legacy AWS tag keys that still need migrating to their new names.

The script reads scanner findings (CSV rows with ``arn``, ``tag_name``,
``tag_value``, ``account_id`` and ``region`` columns), groups the tags per
resource ARN, and checks every resource against a legacy-key -> new-key
mapping. A summary is printed and, when ``--output`` is given, the full task
list is written to ``<output>_migration_todo.csv``.
"""

import csv
import argparse
import sys
import glob
import os
import resource
import json
from collections import Counter, defaultdict
from datetime import datetime
import re

__version__ = "1.0.5"

# Column order of the exported migration report.
_MIGRATION_FIELDS = ["arn", "account", "legacy_key", "legacy_value", "target_key", "status"]
_STATUS_IN_SYNC = "In Sync"
_STATUS_PENDING = "PENDING MIGRATION"


def get_args():
    """Parse and return the command-line arguments.

    ``--tags-file`` is accepted (and required) for interface compatibility;
    nothing in this script reads it.
    """
    parser = argparse.ArgumentParser(description=f"AWS Tag Data Analyzer v{__version__}")
    parser.add_argument("--tags-file", required=True, help="Inventory of all tags")
    parser.add_argument("--required-tags-file", required=True, help="CSV with mandatory TagKey list")
    parser.add_argument("--findings-file", nargs='+', required=True, help="Scanner CSV findings")
    parser.add_argument("--legacy-map", help="JSON file with {'LegacyKey': 'NewKey'} mapping")
    parser.add_argument("--output", help="Output prefix")
    return parser.parse_args()


def _fail(message):
    """Print *message* to stderr and terminate with exit status 1."""
    print(f"[!] {message}", file=sys.stderr)
    sys.exit(1)


def _load_required_keys(path):
    """Return the set of non-empty, stripped ``TagKey`` values from the CSV at *path*."""
    keys = set()
    try:
        # utf-8-sig transparently drops a BOM if the CSV was exported with one.
        with open(path, mode="r", encoding="utf-8-sig", newline="") as handle:
            for row in csv.DictReader(handle):
                # DictReader yields None for cells missing from short rows.
                key = (row.get("TagKey") or "").strip()
                if key:
                    keys.add(key)
    except (OSError, UnicodeDecodeError, csv.Error) as exc:
        _fail(f"Error reading required-tags-file: {exc}")
    return keys


def _load_legacy_map(path):
    """Return the legacy-key -> target-key dict stored as JSON at *path*.

    An unset *path* yields an empty mapping. Exits with status 1 when the file
    cannot be read or parsed, or when its root is not a JSON object.
    """
    if not path:
        return {}
    try:
        with open(path, mode="r", encoding="utf-8-sig") as handle:
            mapping = json.load(handle)
    except (OSError, ValueError) as exc:
        # json.JSONDecodeError and UnicodeDecodeError are both ValueErrors.
        _fail(f"Error reading legacy-map: {exc}")
    if not isinstance(mapping, dict):
        _fail("legacy-map must contain a JSON object of {'LegacyKey': 'NewKey'} pairs")
    return mapping


def _expand_patterns(patterns):
    """Expand glob *patterns* into a de-duplicated list of paths.

    Matches of each pattern are sorted so the processing order (and therefore
    which duplicate row wins) does not depend on directory listing order.
    """
    ordered = {}
    for pattern in patterns:
        for path in sorted(glob.glob(pattern)):
            ordered.setdefault(path, None)
    return list(ordered)


def _collect_resources(files):
    """Read the findings CSVs and group their tags per resource.

    Returns ``(resource_tags, resource_info)`` where ``resource_tags`` maps
    ARN -> {tag key: tag value} and ``resource_info`` maps
    ARN -> {"acc": account id, "reg": region}. Rows without an ARN or tag name
    are skipped. A file that cannot be read is reported on stderr and the
    remaining files are still processed.
    """
    resource_tags = defaultdict(dict)  # ARN -> {Key: Value}
    resource_info = {}  # ARN -> {Account, Region}
    for path in files:
        try:
            with open(path, mode="r", encoding="utf-8-sig", newline="") as handle:
                for row in csv.DictReader(handle):
                    arn = (row.get("arn") or "").strip()
                    tag = (row.get("tag_name") or "").strip()
                    if not arn or not tag:
                        continue
                    # Later rows for the same (arn, tag) overwrite earlier ones.
                    resource_tags[arn][tag] = row.get("tag_value") or ""
                    resource_info[arn] = {
                        "acc": row.get("account_id") or "",
                        "reg": row.get("region") or "",
                    }
        except (OSError, UnicodeDecodeError, csv.Error) as exc:
            print(f"[!] Error reading {path}: {exc}", file=sys.stderr)
    return resource_tags, resource_info


def _build_migration_tasks(resource_tags, resource_info, legacy_map):
    """Return one task dict per (resource, legacy key) pair found in the findings."""
    tasks = []
    for arn, tags in resource_tags.items():
        for legacy_key, target_key in legacy_map.items():
            if legacy_key not in tags:
                continue
            # NOTE(review): "In Sync" only means the target key exists on the
            # resource; the legacy and target values are not compared.
            status = _STATUS_IN_SYNC if target_key in tags else _STATUS_PENDING
            tasks.append({
                "arn": arn,
                "account": resource_info[arn]["acc"],
                "legacy_key": legacy_key,
                "legacy_value": tags[legacy_key],
                "target_key": target_key,
                "status": status,
            })
    return tasks


def analyze():
    """Run the legacy-tag migration analysis driven by the command line.

    Prints the "SECTION 5" summary to stdout and, when ``--output`` is set,
    writes every migration task to ``<output>_migration_todo.csv``.

    Raises:
        SystemExit: with status 1 when the required-tags file or legacy map
            cannot be loaded, when no findings file matches, or when the
            report cannot be written.
    """
    args = get_args()

    # 1. Load Mapping & Required Tags
    # The required keys are loaded so a bad file is reported up front; the
    # migration section below does not consume them.
    required_keys = _load_required_keys(args.required_tags_file)
    legacy_map = _load_legacy_map(args.legacy_map)

    # 2. Process Findings
    files = _expand_patterns(args.findings_file)
    if not files:
        # Without this guard a mistyped pattern would report "0 legacy tags".
        _fail("No findings files found.")
    resource_tags, resource_info = _collect_resources(files)

    # --- SECTION 5: NORMALIZATION & MIGRATION ---
    print(f"\n{'='*80}\nSECTION 5: NORMALIZATION (LEGACY MAPPING)\n{'='*80}")
    migration_tasks = _build_migration_tasks(resource_tags, resource_info, legacy_map)

    pending = [t for t in migration_tasks if t['status'] == _STATUS_PENDING]
    print(f"Total Legacy Tags Found : {len(migration_tasks)}")
    print(f"Migration Tasks Pending : {len(pending)}")
    print(f"{'-'*80}")

    if pending:
        print("Top Pending Migrations (Sample):")
        for p in pending[:5]:
            print(f" [!] {p['arn']}\n Move '{p['legacy_key']}' -> '{p['target_key']}' (Value: {p['legacy_value']})")

    # Export Logic
    if args.output:
        mig_file = f"{args.output}_migration_todo.csv"
        try:
            with open(mig_file, 'w', newline='', encoding='utf-8') as f:
                writer = csv.DictWriter(f, fieldnames=_MIGRATION_FIELDS)
                writer.writeheader()
                writer.writerows(migration_tasks)
        except OSError as exc:
            _fail(f"Error writing {mig_file}: {exc}")
        print(f"\n[+] Migration report saved to: {mig_file}")


if __name__ == "__main__":
    analyze()