diff --git a/local-app/python-tools/cross-organization/tag-checker/analyze-tag-data.py b/local-app/python-tools/cross-organization/tag-checker/analyze-tag-data.py index 45d09ff2..c2cae1a1 100755 --- a/local-app/python-tools/cross-organization/tag-checker/analyze-tag-data.py +++ b/local-app/python-tools/cross-organization/tag-checker/analyze-tag-data.py @@ -12,7 +12,14 @@ from datetime import datetime from difflib import SequenceMatcher -__version__ = "1.0.8" +__version__ = "1.0.9" + +# Services that do NOT have a resource type in the 6th ARN field +# They use: arn:aws:service:region:account:resource-name +SERVICES_WITHOUT_TYPES = [ + "s3", "sns", "sqs", "codepipeline", "codebuild", + "cloudwatch", "events", "logs", "sns", "states" +] def get_args(): parser = argparse.ArgumentParser(description=f"AWS Tag Data Analyzer v{__version__}") @@ -27,21 +34,30 @@ def get_args(): def load_service_map(map_path): if map_path and os.path.exists(map_path): - with open(map_path, 'r') as f: return json.load(f) + try: + with open(map_path, 'r') as f: return json.load(f) + except: return None return None def parse_arn_details(arn): - """Extracts service and primary resource type (e.g., ec2 vpc).""" + """Extracts service and resource type, handling services with no type segment.""" try: parts = arn.split(':') - if len(parts) < 6: return "unknown", "unknown" + if len(parts) < 6: return "unknown", "" + service = parts[2] resource_part = parts[5] - # Break at first / or : to get type (e.g., volume/vol-123 -> volume) + + # If the service is known to skip the type field, just return the service + if service in SERVICES_WITHOUT_TYPES: + return service, "" + + # Otherwise, find the type (break at first / or :) + # Example: 'ec2' 'instance/i-123' -> 'ec2' 'instance' res_type = re.split(r'[:/]', resource_part)[0] return service, res_type except: - return "unknown", "unknown" + return "unknown", "" def get_similarity(a, b): a_norm = re.sub(r'[:_\-\s]', '', a.lower()) @@ -80,7 +96,6 @@ def analyze(): tag_values = defaultdict(Counter) account_map = defaultdict(set) resource_tags = defaultdict(dict) - resource_info = {} service_distribution = Counter() files = [] @@ -90,22 +105,22 @@ def analyze(): with open(file, mode='r', encoding='utf-8') as f: for row in csv.DictReader(f): arn, tag, val = row['arn'], row['tag_name'], row['tag_value'] - acc, reg = row['account_id'], row['region'] + acc = row['account_id'] findings_count[tag] += 1 tag_values[tag][val] += 1 account_map[tag].add(acc) resource_tags[arn][tag] = val - resource_info[arn] = {"acc": acc, "reg": reg} svc, r_type = parse_arn_details(arn) - service_distribution[f"{svc} {r_type}"] += 1 + display_name = f"{svc} {r_type}".strip() + service_distribution[display_name] += 1 max_tag_len = max([len(t) for t in findings_count.keys()] + [20]) col1 = max_tag_len + 2 div = "=" * (col1 + 65) - # --- SECTIONS 1-5 (Restored) --- + # --- SECTIONS 1-5 (Summary, Values, Similarity, Compliance, Legacy) --- print(f"\n{div}\nSECTION 1: GLOBAL SUMMARY\n{div}") print(f"Input Tags: {stats['total']} | Active: {stats['active']} | Found: {len(findings_count)}") print(f"Total Hits: {sum(findings_count.values())} | Accounts: {len(set().union(*account_map.values()))}") @@ -116,11 +131,11 @@ def analyze(): print(f"{tag.ljust(col1)} | {vals}") print(f"\n{div}\nSECTION 3: SUSPECTED DUPLICATES (TYPOS/CASE)\n{div}") - tags = sorted(findings_count.keys()) - for i in range(len(tags)): - for j in range(i + 1, len(tags)): - if get_similarity(tags[i], tags[j]) >= args.similarity_threshold: - print(f"[!] {tags[i]} <-> {tags[j]}") + tags_list = sorted(findings_count.keys()) + for i in range(len(tags_list)): + for j in range(i + 1, len(tags_list)): + if get_similarity(tags_list[i], tags_list[j]) >= args.similarity_threshold: + print(f"[!] {tags_list[i]} <-> {tags_list[j]}") if required_keys: print(f"\n{div}\nSECTION 4: COVERAGE & COMPLIANCE\n{div}")