From 68d17d78981b2f78f3e3710bc7f99b230074f4e1 Mon Sep 17 00:00:00 2001 From: badra001 Date: Thu, 29 Jan 2026 15:05:50 -0500 Subject: [PATCH] restore capability --- .../tag-checker/analyze-tag-data.py | 149 +++++++++++------- 1 file changed, 94 insertions(+), 55 deletions(-) diff --git a/local-app/python-tools/cross-organization/tag-checker/analyze-tag-data.py b/local-app/python-tools/cross-organization/tag-checker/analyze-tag-data.py index fc2f4e8d..03ea852e 100755 --- a/local-app/python-tools/cross-organization/tag-checker/analyze-tag-data.py +++ b/local-app/python-tools/cross-organization/tag-checker/analyze-tag-data.py @@ -7,40 +7,73 @@ import os import resource import json +import re from collections import Counter, defaultdict from datetime import datetime -import re +from difflib import SequenceMatcher -__version__ = "1.0.5" +__version__ = "1.0.7" def get_args(): parser = argparse.ArgumentParser(description=f"AWS Tag Data Analyzer v{__version__}") - parser.add_argument("--tags-file", required=True, help="Inventory of all tags") - parser.add_argument("--required-tags-file", required=True, help="CSV with mandatory TagKey list") + parser.add_argument("--tags-file", required=True, help="Original CSV with TagKey, Status, etc.") + parser.add_argument("--required-tags-file", help="CSV with mandatory TagKey list") parser.add_argument("--findings-file", nargs='+', required=True, help="Scanner CSV findings") parser.add_argument("--legacy-map", help="JSON file with {'LegacyKey': 'NewKey'} mapping") + parser.add_argument("--service-map", help="Path to aws_service_map.json (optional)") parser.add_argument("--output", help="Output prefix") + parser.add_argument("--similarity-threshold", type=float, default=0.85) return parser.parse_args() +def load_service_map(map_path): + """Loads mapping from local JSON if available, else returns minimal defaults.""" + if map_path and os.path.exists(map_path): + with open(map_path, 'r') as f: return json.load(f) + return {"ec2": "Amazon EC2", "s3": "Amazon S3", "lambda": "AWS Lambda", "iam": "AWS IAM"} + +def get_similarity(a, b): + a_norm = re.sub(r'[:_\-\s]', '', a.lower()) + b_norm = re.sub(r'[:_\-\s]', '', b.lower()) + if a_norm == b_norm: return 1.0 + return SequenceMatcher(None, a, b).ratio() + def analyze(): args = get_args() + start_ts = datetime.now() - # 1. Load Mapping & Required Tags + # --- LOAD METADATA --- + input_tags = {} + stats = {"total": 0, "active": 0, "inactive": 0, "aws": 0, "cust": 0} + with open(args.tags_file, mode='r', encoding='utf-8-sig') as f: + for row in csv.DictReader(f): + k = row.get('TagKey', '').strip().replace('"', '') + if not k: continue + s = row.get('Status', 'Unknown').strip().lower() + input_tags[k] = s + stats["total"] += 1 + if s == 'active': stats["active"] += 1 + elif s == 'inactive': stats["inactive"] += 1 + if k.lower().startswith('aws:'): stats["aws"] += 1 + else: stats["cust"] += 1 + required_keys = set() - legacy_map = {} - if args.required_tags_file: - with open(args.required_tags_file, mode='r', encoding='utf-8-sig') as f: - reader = csv.DictReader(f) - required_keys = {row.get('TagKey', '').strip() for row in reader if row.get('TagKey')} - + with open(args.required_tags_file, 'r', encoding='utf-8-sig') as f: + required_keys = {row['TagKey'].strip() for row in csv.DictReader(f) if row.get('TagKey')} + + legacy_map = {} if args.legacy_map: - with open(args.legacy_map, 'r') as f: - legacy_map = json.load(f) + with open(args.legacy_map, 'r') as f: legacy_map = json.load(f) - # 2. Process Findings - resource_tags = defaultdict(dict) # ARN -> {Key: Value} - resource_info = {} # ARN -> {Account, Region} + service_map = load_service_map(args.service_map) + + # --- PROCESS FINDINGS --- + findings_count = Counter() + tag_values = defaultdict(Counter) + account_map = defaultdict(set) + resource_tags = defaultdict(dict) # ARN -> {Key: Val} + resource_info = {} # ARN -> {Acc, Reg} + service_counts = Counter() files = [] for p in args.findings_file: files.extend(glob.glob(p)) @@ -49,44 +82,50 @@ def analyze(): with open(file, mode='r', encoding='utf-8') as f: for row in csv.DictReader(f): arn, tag, val = row['arn'], row['tag_name'], row['tag_value'] + acc, reg = row['account_id'], row['region'] + + findings_count[tag] += 1 + tag_values[tag][val] += 1 + account_map[tag].add(acc) resource_tags[arn][tag] = val - resource_info[arn] = {"acc": row['account_id'], "reg": row['region']} + resource_info[arn] = {"acc": acc, "reg": reg} + + svc = arn.split(':')[2] if len(arn.split(':')) > 2 else "unknown" + service_counts[svc] = service_counts.get(svc, set()) + service_counts[svc].add(arn) - # --- SECTION 5: NORMALIZATION & MIGRATION --- - print(f"\n{'='*80}\nSECTION 5: NORMALIZATION (LEGACY MAPPING)\n{'='*80}") - migration_tasks = [] - - for arn, tags in resource_tags.items(): - for legacy_key, target_key in legacy_map.items(): - if legacy_key in tags: - has_target = target_key in tags - migration_tasks.append({ - "arn": arn, - "account": resource_info[arn]['acc'], - "legacy_key": legacy_key, - "legacy_value": tags[legacy_key], - "target_key": target_key, - "status": "In Sync" if has_target else "PENDING MIGRATION" - }) - - pending = [t for t in migration_tasks if t['status'] == "PENDING MIGRATION"] - print(f"Total Legacy Tags Found : {len(migration_tasks)}") - print(f"Migration Tasks Pending : {len(pending)}") - print(f"{'-'*80}") - - if pending: - print("Top Pending Migrations (Sample):") - for p in pending[:5]: - print(f" [!] {p['arn']}\n Move '{p['legacy_key']}' -> '{p['target_key']}' (Value: {p['legacy_value']})") - - # Export Logic - if args.output: - mig_file = f"{args.output}_migration_todo.csv" - with open(mig_file, 'w', newline='') as f: - writer = csv.DictWriter(f, fieldnames=["arn", "account", "legacy_key", "legacy_value", "target_key", "status"]) - writer.writeheader() - writer.writerows(migration_tasks) - print(f"\n[+] Migration report saved to: {mig_file}") - -if __name__ == "__main__": - analyze() + # Convert service sets to counts + svc_final_counts = {k: len(v) for k, v in service_counts.items()} + max_tag_len = max([len(t) for t in findings_count.keys()] + [20]) + col1 = max_tag_len + 2 + div = "=" * (col1 + 65) + + # --- SECTION 1 & 2: SUMMARY & VALUES --- + print(f"\n{div}\nSECTION 1: GLOBAL SUMMARY\n{div}") + print(f"Input Tags: {stats['total']} | Active: {stats['active']} | Found: {len(findings_count)}") + print(f"Total Resource Hits: {sum(findings_count.values())} | Accounts with Hits: {len(set().union(*account_map.values()))}") + + print(f"\n{div}\nSECTION 2: TAG VALUE DISTRIBUTION (TOP 5)\n{div}") + for tag in sorted(findings_count.keys()): + vals = ", ".join([f"{v}({c})" for v, c in tag_values[tag].most_common(5)]) + print(f"{tag.ljust(col1)} | {vals}") + + # --- SECTION 3: SIMILARITY --- + print(f"\n{div}\nSECTION 3: SUSPECTED DUPLICATES (TYPOS/CASE)\n{div}") + tags = sorted(findings_count.keys()) + for i in range(len(tags)): + for j in range(i + 1, len(tags)): + if get_similarity(tags[i], tags[j]) >= args.similarity_threshold: + print(f"[!] {tags[i]} <-> {tags[j]}") + + # --- SECTION 4: COVERAGE --- + if required_keys: + print(f"\n{div}\nSECTION 4: COVERAGE & COMPLIANCE\n{div}") + non_compliant = [] + for arn, tags in resource_tags.items(): + missing = required_keys - set(tags.keys()) + if missing: non_compliant.append((arn, missing)) + print(f"Total Resources: {len(resource_tags)} | Fully Compliant: {len(resource_tags)-len(non_compliant)}") + print(f"Compliance Rate: {((len(resource_tags)-len(non_compliant))/len(resource_tags)*100):.2f}%") + + # --- SECTION 5: LEGACY M