-
Notifications
You must be signed in to change notification settings - Fork 8
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing 1 changed file with 63 additions and 91 deletions.
There are no files selected for viewing
154 changes: 63 additions & 91 deletions
154
local-app/python-tools/cross-organization/tag-checker/analyze-tag-data.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,120 +1,92 @@ | ||
| #!/bin/env python | ||
|
|
||
| import csv | ||
| import argparse | ||
| import sys | ||
| import glob | ||
| import os | ||
| import resource | ||
| import json | ||
| from collections import Counter, defaultdict | ||
| from datetime import datetime | ||
| import re | ||
|
|
||
| __version__ = "1.0.1" | ||
| __version__ = "1.0.5" | ||
|
|
||
| def get_args(): | ||
| parser = argparse.ArgumentParser(description=f"AWS Tag Data Analyzer v{__version__}") | ||
| parser.add_argument("--tags-file", required=True, help="Original CSV with TagKey and Status") | ||
| parser.add_argument("--findings-file", nargs='+', required=True, help="One or more CSV findings files") | ||
| parser.add_argument("--output", help="Optional CSV file to save analysis results") | ||
| parser.add_argument("--tags-file", required=True, help="Inventory of all tags") | ||
| parser.add_argument("--required-tags-file", required=True, help="CSV with mandatory TagKey list") | ||
| parser.add_argument("--findings-file", nargs='+', required=True, help="Scanner CSV findings") | ||
| parser.add_argument("--legacy-map", help="JSON file with {'LegacyKey': 'NewKey'} mapping") | ||
| parser.add_argument("--output", help="Output prefix") | ||
| return parser.parse_args() | ||
|
|
||
| def analyze(): | ||
| args = get_args() | ||
| start_time = datetime.now() | ||
|
|
||
| # 1. Load Tag Metadata (Status) | ||
| tag_metadata = {} | ||
| try: | ||
| with open(args.tags_file, mode='r', encoding='utf-8-sig') as f: | ||
| # 1. Load Mapping & Required Tags | ||
| required_keys = set() | ||
| legacy_map = {} | ||
|
|
||
| if args.required_tags_file: | ||
| with open(args.required_tags_file, mode='r', encoding='utf-8-sig') as f: | ||
| reader = csv.DictReader(f) | ||
| for row in reader: | ||
| key = row.get('TagKey', '').strip() | ||
| status = row.get('Status', 'Unknown').strip() | ||
| if key: | ||
| tag_metadata[key] = status | ||
| except Exception as e: | ||
| print(f"[!] Error reading tags-file: {e}") | ||
| sys.exit(1) | ||
| required_keys = {row.get('TagKey', '').strip() for row in reader if row.get('TagKey')} | ||
|
|
||
| if args.legacy_map: | ||
| with open(args.legacy_map, 'r') as f: | ||
| legacy_map = json.load(f) | ||
|
|
||
| # 2. Process Findings | ||
| findings_count = Counter() | ||
| account_map = defaultdict(set) | ||
| all_seen_accounts = set() | ||
| total_hits = 0 | ||
| max_tag_len = 20 # Minimum starting width | ||
|
|
||
| files_to_process = [] | ||
| for pattern in args.findings_file: | ||
| files_to_process.extend(glob.glob(pattern)) | ||
|
|
||
| if not files_to_process: | ||
| print("[!] No findings files found.") | ||
| sys.exit(1) | ||
|
|
||
| print(f"[*] Analyzing {len(files_to_process)} findings files...") | ||
|
|
||
| for file in files_to_process: | ||
| try: | ||
| with open(file, mode='r', encoding='utf-8') as f: | ||
| reader = csv.DictReader(f) | ||
| for row in reader: | ||
| tag_name = row.get('tag_name', '').strip() | ||
| acc_id = row.get('account_id', '').strip() | ||
| acc_alias = row.get('account_alias', '').strip() | ||
|
|
||
| if tag_name: | ||
| findings_count[tag_name] += 1 | ||
| max_tag_len = max(max_tag_len, len(tag_name)) | ||
| if acc_id: | ||
| account_map[tag_name].add(acc_id) | ||
| all_seen_accounts.add(f"{acc_id} ({acc_alias})") | ||
| total_hits += 1 | ||
| except Exception as e: | ||
| print(f"[!] Error reading {file}: {e}") | ||
|
|
||
| # 3. Final Table Formatting | ||
| col1_width = max_tag_len + 2 | ||
| header = f"{'Tag Name'.ljust(col1_width)} | {'Status':<10} | {'Instances':<12} | {'Unique Accounts'}" | ||
| divider = "-" * (len(header) + 5) | ||
|
|
||
| print(f"\n{divider}") | ||
| print(header) | ||
| print(divider) | ||
|
|
||
| report_rows = [] | ||
| sorted_tags = sorted(findings_count.items(), key=lambda x: x[1], reverse=True) | ||
|
|
||
| for tag, count in sorted_tags: | ||
| status = tag_metadata.get(tag, "Not in List") | ||
| unique_accs = len(account_map[tag]) | ||
| print(f"{tag.ljust(col1_width)} | {status:<10} | {count:<12} | {unique_accs}") | ||
|
|
||
| report_rows.append({ | ||
| "TagKey": tag, | ||
| "Status": status, | ||
| "TotalInstances": count, | ||
| "UniqueAccounts": unique_accs | ||
| }) | ||
| resource_tags = defaultdict(dict) # ARN -> {Key: Value} | ||
| resource_info = {} # ARN -> {Account, Region} | ||
|
|
||
| files = [] | ||
| for p in args.findings_file: files.extend(glob.glob(p)) | ||
|
|
||
| for file in files: | ||
| with open(file, mode='r', encoding='utf-8') as f: | ||
| for row in csv.DictReader(f): | ||
| arn, tag, val = row['arn'], row['tag_name'], row['tag_value'] | ||
| resource_tags[arn][tag] = val | ||
| resource_info[arn] = {"acc": row['account_id'], "reg": row['region']} | ||
|
|
||
| print(divider) | ||
| # --- SECTION 5: NORMALIZATION & MIGRATION --- | ||
| print(f"\n{'='*80}\nSECTION 5: NORMALIZATION (LEGACY MAPPING)\n{'='*80}") | ||
| migration_tasks = [] | ||
|
|
||
| # 4. Accounts with NO Hits | ||
| # Note: This logic assumes we want to see which accounts appeared in the CSVs but had no data. | ||
| # To see accounts that never even made it to the CSV, you would need to cross-ref with --list-accounts. | ||
| print(f"\n[SUMMARY STATS]") | ||
| print(f"Total Unique Tags Found : {len(findings_count)}") | ||
| print(f"Total Resource Tag Hits : {total_hits}") | ||
| print(f"Accounts with Hits : {len(all_seen_accounts)}") | ||
| for arn, tags in resource_tags.items(): | ||
| for legacy_key, target_key in legacy_map.items(): | ||
| if legacy_key in tags: | ||
| has_target = target_key in tags | ||
| migration_tasks.append({ | ||
| "arn": arn, | ||
| "account": resource_info[arn]['acc'], | ||
| "legacy_key": legacy_key, | ||
| "legacy_value": tags[legacy_key], | ||
| "target_key": target_key, | ||
| "status": "In Sync" if has_target else "PENDING MIGRATION" | ||
| }) | ||
|
|
||
| pending = [t for t in migration_tasks if t['status'] == "PENDING MIGRATION"] | ||
| print(f"Total Legacy Tags Found : {len(migration_tasks)}") | ||
| print(f"Migration Tasks Pending : {len(pending)}") | ||
| print(f"{'-'*80}") | ||
|
|
||
| # Peak Memory | ||
| mem_mb = round(resource.getrusage(resource.RUSAGE_SELF).ru_maxrss / 1024, 2) | ||
| print(f"Analysis Memory Usage : {mem_mb} MB") | ||
| if pending: | ||
| print("Top Pending Migrations (Sample):") | ||
| for p in pending[:5]: | ||
| print(f" [!] {p['arn']}\n Move '{p['legacy_key']}' -> '{p['target_key']}' (Value: {p['legacy_value']})") | ||
|
|
||
| # Export Logic | ||
| if args.output: | ||
| with open(args.output, 'w', newline='') as f: | ||
| writer = csv.DictWriter(f, fieldnames=["TagKey", "Status", "TotalInstances", "UniqueAccounts"]) | ||
| mig_file = f"{args.output}_migration_todo.csv" | ||
| with open(mig_file, 'w', newline='') as f: | ||
| writer = csv.DictWriter(f, fieldnames=["arn", "account", "legacy_key", "legacy_value", "target_key", "status"]) | ||
| writer.writeheader() | ||
| writer.writerows(report_rows) | ||
| print(f"\n[+] Full analysis exported to: {args.output}") | ||
| writer.writerows(migration_tasks) | ||
| print(f"\n[+] Migration report saved to: {mig_file}") | ||
|
|
||
| if __name__ == "__main__": | ||
| from datetime import datetime | ||
| analyze() |