#!/usr/bin/env python3
"""AWS Tag Data Analyzer.

Joins a tag catalogue CSV (columns ``TagKey`` and ``Status``) with one or more
findings CSVs (columns ``tag_name`` and ``account_id``), prints a per-tag
summary table and optionally writes the same summary to a CSV file.
"""

import argparse
import csv
import glob
import os
import sys
from collections import Counter, defaultdict
from typing import Dict, List, Set, Tuple

# Column order of the optional CSV export.
REPORT_FIELDS = ["TagKey", "Status", "TotalInstances", "UniqueAccounts"]

# Failures that are reported to the user instead of aborting with a traceback.
_IO_ERRORS = (OSError, UnicodeError, csv.Error)


def get_args():
    """Parse and return the command-line arguments."""
    parser = argparse.ArgumentParser(description="AWS Tag Data Analyzer v1.0.0")
    parser.add_argument("--tags-file", required=True, help="Original CSV with TagKey and Status")
    parser.add_argument("--findings-file", nargs='+', required=True, help="One or more CSV findings files (supports wildcards)")
    parser.add_argument("--output", help="Optional CSV file to save analysis results")
    return parser.parse_args()


def _load_tag_metadata(path: str) -> Dict[str, str]:
    """Return a ``TagKey -> Status`` mapping read from the tags CSV at *path*."""
    metadata: Dict[str, str] = {}
    # utf-8-sig transparently drops a leading BOM if the file has one.
    with open(path, mode="r", encoding="utf-8-sig", newline="") as handle:
        for row in csv.DictReader(handle):
            # DictReader fills cells missing from a short row with None.
            key = (row.get("TagKey") or "").strip()
            if not key:
                continue
            status = row.get("Status")
            metadata[key] = "Unknown" if status is None else status.strip()
    return metadata


def _expand_patterns(patterns: List[str]) -> List[str]:
    """Expand wildcard patterns into a de-duplicated list of file paths."""
    resolved: List[str] = []
    seen: Set[str] = set()
    for pattern in patterns:
        # Sorted so the processing order does not depend on directory order.
        matches = sorted(glob.glob(pattern))
        if not matches and os.path.isfile(pattern):
            # A literal filename containing glob metacharacters such as "[".
            matches = [pattern]
        if not matches:
            print(f"[!] No files matched pattern: {pattern}")
            continue
        for match in matches:
            # Overlapping patterns must not make a file count twice.
            identity = os.path.normcase(os.path.abspath(match))
            if identity not in seen:
                seen.add(identity)
                resolved.append(match)
    return resolved


def _collect_findings(paths: List[str]) -> Tuple[Counter, Dict[str, Set[str]], int]:
    """Tally findings rows per tag across *paths*.

    Returns ``(hits per tag, account ids per tag, total rows with a tag)``.
    A file that cannot be read is reported and skipped; rows already counted
    from it before the failure are kept.
    """
    counts: Counter = Counter()
    accounts: Dict[str, Set[str]] = defaultdict(set)
    total = 0
    for path in paths:
        try:
            # utf-8-sig keeps a BOM from corrupting the first column name,
            # matching how the tags file is read.
            with open(path, mode="r", encoding="utf-8-sig", newline="") as handle:
                reader = csv.DictReader(handle)
                if "tag_name" not in (reader.fieldnames or ()):
                    print(f"[!] Skipping {path}: no 'tag_name' column found")
                    continue
                for row in reader:
                    # Stripped so values compare equal to the stripped TagKey.
                    tag = (row.get("tag_name") or "").strip()
                    if not tag:
                        continue
                    counts[tag] += 1
                    total += 1
                    account = (row.get("account_id") or "").strip()
                    if account:
                        accounts[tag].add(account)
        except _IO_ERRORS as exc:
            print(f"[!] Error reading {path}: {exc}")
    return counts, accounts, total


def _build_report(counts: Counter, accounts: Dict[str, Set[str]],
                  metadata: Dict[str, str]) -> List[dict]:
    """Return one report row per tag, ordered by instance count descending."""
    return [
        {
            "TagKey": tag,
            "Status": metadata.get(tag, "Not in List"),
            "TotalInstances": hits,
            "UniqueAccounts": len(accounts.get(tag, ())),
        }
        # most_common() keeps first-seen order among tags with equal counts.
        for tag, hits in counts.most_common()
    ]


def _print_report(rows: List[dict], total_rows: int) -> None:
    """Print the summary table followed by the total number of counted rows."""
    print(f"\n{'='*85}")
    print(f"{'Tag Name':<35} | {'Status':<10} | {'Instances':<12} | {'Unique Accounts'}")
    print(f"{'-'*85}")
    for row in rows:
        print(f"{row['TagKey']:<35} | {row['Status']:<10} | {row['TotalInstances']:<12} | {row['UniqueAccounts']}")
    print(f"{'='*85}")
    print(f"Total Resources Scanned with Hits: {total_rows}\n")


def _write_report(path: str, rows: List[dict]) -> None:
    """Write the report rows to *path* as a UTF-8 CSV file with a header."""
    with open(path, "w", newline="", encoding="utf-8") as handle:
        writer = csv.DictWriter(handle, fieldnames=REPORT_FIELDS)
        writer.writeheader()
        writer.writerows(rows)


def analyze():
    """Run the analysis described by the command-line arguments.

    Exits with status 1 when the tags file cannot be read or when no findings
    file matches. A failed export is reported but does not change the exit
    status.
    """
    args = get_args()

    # 1. Load Tag Metadata (Status)
    try:
        tag_metadata = _load_tag_metadata(args.tags_file)
    except _IO_ERRORS as exc:
        print(f"[!] Error reading tags-file: {exc}")
        sys.exit(1)

    # 2. Load Findings (wildcards are expanded here because some shells,
    #    notably on Windows, pass them through unexpanded)
    files_to_process = _expand_patterns(args.findings_file)
    if not files_to_process:
        print("[!] No findings files found matching the criteria.")
        sys.exit(1)

    print(f"[*] Processing {len(files_to_process)} findings files...")
    findings_count, account_map, total_rows = _collect_findings(files_to_process)

    # 3. Generate Report
    report_rows = _build_report(findings_count, account_map, tag_metadata)
    _print_report(report_rows, total_rows)

    # 4. Optional Export
    if args.output:
        try:
            _write_report(args.output, report_rows)
        except _IO_ERRORS as exc:
            print(f"[!] Export failed: {exc}")
        else:
            print(f"[+] Analysis saved to: {args.output}")


if __name__ == "__main__":
    analyze()