Skip to content

Commit

Permalink
restore capability
Browse files Browse the repository at this point in the history
  • Loading branch information
badra001 committed Jan 29, 2026
1 parent 7fe6af8 commit 68d17d7
Showing 1 changed file with 94 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,40 +7,73 @@
import os
import resource
import json
import re
from collections import Counter, defaultdict
from datetime import datetime
import re
from difflib import SequenceMatcher

__version__ = "1.0.5"
__version__ = "1.0.7"

def get_args():
parser = argparse.ArgumentParser(description=f"AWS Tag Data Analyzer v{__version__}")
parser.add_argument("--tags-file", required=True, help="Inventory of all tags")
parser.add_argument("--required-tags-file", required=True, help="CSV with mandatory TagKey list")
parser.add_argument("--tags-file", required=True, help="Original CSV with TagKey, Status, etc.")
parser.add_argument("--required-tags-file", help="CSV with mandatory TagKey list")
parser.add_argument("--findings-file", nargs='+', required=True, help="Scanner CSV findings")
parser.add_argument("--legacy-map", help="JSON file with {'LegacyKey': 'NewKey'} mapping")
parser.add_argument("--service-map", help="Path to aws_service_map.json (optional)")
parser.add_argument("--output", help="Output prefix")
parser.add_argument("--similarity-threshold", type=float, default=0.85)
return parser.parse_args()

def load_service_map(map_path):
"""Loads mapping from local JSON if available, else returns minimal defaults."""
if map_path and os.path.exists(map_path):
with open(map_path, 'r') as f: return json.load(f)
return {"ec2": "Amazon EC2", "s3": "Amazon S3", "lambda": "AWS Lambda", "iam": "AWS IAM"}

def get_similarity(a, b):
a_norm = re.sub(r'[:_\-\s]', '', a.lower())
b_norm = re.sub(r'[:_\-\s]', '', b.lower())
if a_norm == b_norm: return 1.0
return SequenceMatcher(None, a, b).ratio()

def analyze():
args = get_args()
start_ts = datetime.now()

# 1. Load Mapping & Required Tags
# --- LOAD METADATA ---
input_tags = {}
stats = {"total": 0, "active": 0, "inactive": 0, "aws": 0, "cust": 0}
with open(args.tags_file, mode='r', encoding='utf-8-sig') as f:
for row in csv.DictReader(f):
k = row.get('TagKey', '').strip().replace('"', '')
if not k: continue
s = row.get('Status', 'Unknown').strip().lower()
input_tags[k] = s
stats["total"] += 1
if s == 'active': stats["active"] += 1
elif s == 'inactive': stats["inactive"] += 1
if k.lower().startswith('aws:'): stats["aws"] += 1
else: stats["cust"] += 1

required_keys = set()
legacy_map = {}

if args.required_tags_file:
with open(args.required_tags_file, mode='r', encoding='utf-8-sig') as f:
reader = csv.DictReader(f)
required_keys = {row.get('TagKey', '').strip() for row in reader if row.get('TagKey')}
with open(args.required_tags_file, 'r', encoding='utf-8-sig') as f:
required_keys = {row['TagKey'].strip() for row in csv.DictReader(f) if row.get('TagKey')}

legacy_map = {}
if args.legacy_map:
with open(args.legacy_map, 'r') as f:
legacy_map = json.load(f)
with open(args.legacy_map, 'r') as f: legacy_map = json.load(f)

# 2. Process Findings
resource_tags = defaultdict(dict) # ARN -> {Key: Value}
resource_info = {} # ARN -> {Account, Region}
service_map = load_service_map(args.service_map)

# --- PROCESS FINDINGS ---
findings_count = Counter()
tag_values = defaultdict(Counter)
account_map = defaultdict(set)
resource_tags = defaultdict(dict) # ARN -> {Key: Val}
resource_info = {} # ARN -> {Acc, Reg}
service_counts = Counter()

files = []
for p in args.findings_file: files.extend(glob.glob(p))
Expand All @@ -49,44 +82,50 @@ def analyze():
with open(file, mode='r', encoding='utf-8') as f:
for row in csv.DictReader(f):
arn, tag, val = row['arn'], row['tag_name'], row['tag_value']
acc, reg = row['account_id'], row['region']

findings_count[tag] += 1
tag_values[tag][val] += 1
account_map[tag].add(acc)
resource_tags[arn][tag] = val
resource_info[arn] = {"acc": row['account_id'], "reg": row['region']}
resource_info[arn] = {"acc": acc, "reg": reg}

svc = arn.split(':')[2] if len(arn.split(':')) > 2 else "unknown"
service_counts[svc] = service_counts.get(svc, set())
service_counts[svc].add(arn)

# --- SECTION 5: NORMALIZATION & MIGRATION ---
print(f"\n{'='*80}\nSECTION 5: NORMALIZATION (LEGACY MAPPING)\n{'='*80}")
migration_tasks = []

for arn, tags in resource_tags.items():
for legacy_key, target_key in legacy_map.items():
if legacy_key in tags:
has_target = target_key in tags
migration_tasks.append({
"arn": arn,
"account": resource_info[arn]['acc'],
"legacy_key": legacy_key,
"legacy_value": tags[legacy_key],
"target_key": target_key,
"status": "In Sync" if has_target else "PENDING MIGRATION"
})

pending = [t for t in migration_tasks if t['status'] == "PENDING MIGRATION"]
print(f"Total Legacy Tags Found : {len(migration_tasks)}")
print(f"Migration Tasks Pending : {len(pending)}")
print(f"{'-'*80}")

if pending:
print("Top Pending Migrations (Sample):")
for p in pending[:5]:
print(f" [!] {p['arn']}\n Move '{p['legacy_key']}' -> '{p['target_key']}' (Value: {p['legacy_value']})")

# Export Logic
if args.output:
mig_file = f"{args.output}_migration_todo.csv"
with open(mig_file, 'w', newline='') as f:
writer = csv.DictWriter(f, fieldnames=["arn", "account", "legacy_key", "legacy_value", "target_key", "status"])
writer.writeheader()
writer.writerows(migration_tasks)
print(f"\n[+] Migration report saved to: {mig_file}")

if __name__ == "__main__":
analyze()
# Convert service sets to counts
svc_final_counts = {k: len(v) for k, v in service_counts.items()}
max_tag_len = max([len(t) for t in findings_count.keys()] + [20])
col1 = max_tag_len + 2
div = "=" * (col1 + 65)

# --- SECTION 1 & 2: SUMMARY & VALUES ---
print(f"\n{div}\nSECTION 1: GLOBAL SUMMARY\n{div}")
print(f"Input Tags: {stats['total']} | Active: {stats['active']} | Found: {len(findings_count)}")
print(f"Total Resource Hits: {sum(findings_count.values())} | Accounts with Hits: {len(set().union(*account_map.values()))}")

print(f"\n{div}\nSECTION 2: TAG VALUE DISTRIBUTION (TOP 5)\n{div}")
for tag in sorted(findings_count.keys()):
vals = ", ".join([f"{v}({c})" for v, c in tag_values[tag].most_common(5)])
print(f"{tag.ljust(col1)} | {vals}")

# --- SECTION 3: SIMILARITY ---
print(f"\n{div}\nSECTION 3: SUSPECTED DUPLICATES (TYPOS/CASE)\n{div}")
tags = sorted(findings_count.keys())
for i in range(len(tags)):
for j in range(i + 1, len(tags)):
if get_similarity(tags[i], tags[j]) >= args.similarity_threshold:
print(f"[!] {tags[i]} <-> {tags[j]}")

# --- SECTION 4: COVERAGE ---
if required_keys:
print(f"\n{div}\nSECTION 4: COVERAGE & COMPLIANCE\n{div}")
non_compliant = []
for arn, tags in resource_tags.items():
missing = required_keys - set(tags.keys())
if missing: non_compliant.append((arn, missing))
print(f"Total Resources: {len(resource_tags)} | Fully Compliant: {len(resource_tags)-len(non_compliant)}")
print(f"Compliance Rate: {((len(resource_tags)-len(non_compliant))/len(resource_tags)*100):.2f}%")

# --- SECTION 5: LEGACY M

0 comments on commit 68d17d7

Please sign in to comment.