Skip to content

Commit

Permalink
rename
Browse files Browse the repository at this point in the history
  • Loading branch information
badra001 committed Jan 16, 2026
1 parent dba0a12 commit 9610bb7
Show file tree
Hide file tree
Showing 2 changed files with 325 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
#!/bin/env python
import csv
import argparse
import sys
import glob
import os
import resource
from collections import Counter, defaultdict

# Analyzer version; interpolated into the argparse description built by get_args().
__version__ = "1.0.1"

def get_args():
    """Parse the analyzer's command-line options.

    Returns:
        argparse.Namespace: ``tags_file`` (str), ``findings_file`` (list of
        one or more path/glob strings) and ``output`` (str or None).
    """
    cli = argparse.ArgumentParser(description=f"AWS Tag Data Analyzer v{__version__}")

    # (flag, add_argument keyword options), registered in display order.
    option_table = (
        ("--tags-file", {"required": True, "help": "Original CSV with TagKey and Status"}),
        ("--findings-file", {"nargs": '+', "required": True, "help": "One or more CSV findings files"}),
        ("--output", {"help": "Optional CSV file to save analysis results"}),
    )
    for flag, options in option_table:
        cli.add_argument(flag, **options)

    return cli.parse_args()

def _csv_cell(row, column, default=''):
    """Return the stripped text of ``row[column]``.

    ``csv.DictReader`` fills cells that are missing from a short row with
    ``None``; those (and absent columns) are replaced by ``default`` so the
    caller never calls ``.strip()`` on ``None``.
    """
    value = row.get(column, default)
    if value is None:
        value = default
    return value.strip()


def analyze():
    """Aggregate tag findings CSVs into a per-tag report.

    Reads the tag list named by ``--tags-file`` (columns ``TagKey`` and
    ``Status``), then every findings CSV matched by ``--findings-file``
    (columns ``tag_name`` and ``account_id``). Prints a table of hit counts
    and unique-account counts per tag, followed by summary statistics, and
    writes the table to ``--output`` as CSV when that option is given.

    Exits with status 1 when the tags file cannot be read or when no
    findings file matches. A findings file that fails part-way is reported
    and skipped; rows already counted from it are kept.
    """
    args = get_args()

    # 1. Load Tag Metadata (Status)
    tag_metadata = {}
    try:
        with open(args.tags_file, mode='r', encoding='utf-8-sig', newline='') as f:
            for row in csv.DictReader(f):
                key = _csv_cell(row, 'TagKey')
                if key:
                    tag_metadata[key] = _csv_cell(row, 'Status', 'Unknown')
    except (OSError, UnicodeError, csv.Error) as e:
        print(f"[!] Error reading tags-file: {e}")
        sys.exit(1)

    # 2. Process Findings
    findings_count = Counter()
    account_map = defaultdict(set)
    all_seen_accounts = set()
    total_hits = 0
    max_tag_len = 20  # Minimum starting width

    matched = []
    for pattern in args.findings_file:
        matched.extend(glob.glob(pattern))
    # Overlapping patterns can match the same path twice; count each file once.
    files_to_process = list(dict.fromkeys(matched))

    if not files_to_process:
        print("[!] No findings files found.")
        sys.exit(1)

    print(f"[*] Analyzing {len(files_to_process)} findings files...")

    for file in files_to_process:
        try:
            # utf-8-sig reads plain UTF-8 unchanged and drops a BOM if present,
            # so the first header is never read as '\ufefftag_name'.
            with open(file, mode='r', encoding='utf-8-sig', newline='') as f:
                for row in csv.DictReader(f):
                    tag_name = _csv_cell(row, 'tag_name')
                    if not tag_name:
                        continue
                    acc_id = _csv_cell(row, 'account_id')

                    findings_count[tag_name] += 1
                    max_tag_len = max(max_tag_len, len(tag_name))
                    if acc_id:
                        account_map[tag_name].add(acc_id)
                        # Keyed on the ID alone so an account whose alias
                        # differs between files is still counted once.
                        all_seen_accounts.add(acc_id)
                    total_hits += 1
        except (OSError, UnicodeError, csv.Error) as e:
            print(f"[!] Error reading {file}: {e}")

    # 3. Final Table Formatting
    col1_width = max_tag_len + 2
    header = f"{'Tag Name'.ljust(col1_width)} | {'Status':<10} | {'Instances':<12} | {'Unique Accounts'}"
    divider = "-" * (len(header) + 5)

    print(f"\n{divider}")
    print(header)
    print(divider)

    report_rows = []
    # most_common() orders by count, highest first.
    for tag, count in findings_count.most_common():
        status = tag_metadata.get(tag, "Not in List")
        unique_accs = len(account_map[tag])
        print(f"{tag.ljust(col1_width)} | {status:<10} | {count:<12} | {unique_accs}")

        report_rows.append({
            "TagKey": tag,
            "Status": status,
            "TotalInstances": count,
            "UniqueAccounts": unique_accs
        })

    print(divider)

    # 4. Summary statistics
    # Only accounts that appear in at least one findings row are counted here;
    # accounts with no findings are not present in the CSVs at all.
    print("\n[SUMMARY STATS]")
    print(f"Total Unique Tags Found : {len(findings_count)}")
    print(f"Total Resource Tag Hits : {total_hits}")
    print(f"Accounts with Hits : {len(all_seen_accounts)}")

    # Peak Memory: ru_maxrss is reported in kilobytes on Linux but in bytes on macOS.
    rss_units_per_mb = 1024 * 1024 if sys.platform == "darwin" else 1024
    mem_mb = round(resource.getrusage(resource.RUSAGE_SELF).ru_maxrss / rss_units_per_mb, 2)
    print(f"Analysis Memory Usage : {mem_mb} MB")

    if args.output:
        with open(args.output, 'w', newline='', encoding='utf-8') as f:
            writer = csv.DictWriter(f, fieldnames=["TagKey", "Status", "TotalInstances", "UniqueAccounts"])
            writer.writeheader()
            writer.writerows(report_rows)
        print(f"\n[+] Full analysis exported to: {args.output}")


if __name__ == "__main__":
    analyze()
205 changes: 205 additions & 0 deletions local-app/python-tools/cross-organization/tag-checker/tag-checker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
#!/bin/env python

import boto3
import csv
import json
import argparse
import sys
import time
import re
import os
import resource
import threading
from datetime import datetime, timezone, timedelta
from concurrent.futures import ThreadPoolExecutor, as_completed
from botocore.exceptions import ClientError
from botocore.credentials import RefreshableCredentials
from botocore.session import get_session as get_botocore_session
from tqdm import tqdm

# Scanner version; shown in the --help description and the run banner, and
# written to the summary JSON.
__version__ = "1.1.17"

# Sequence number of the most recently started account scan. scan_account()
# increments it while holding COUNTER_LOCK and puts the new value in its
# progress-bar label.
ACCOUNT_COUNTER = 0
COUNTER_LOCK = threading.Lock()

def get_args():
    """Parse the scanner's command-line options.

    Returns:
        argparse.Namespace: one attribute per option below (dashes become
        underscores, e.g. ``--role-name`` -> ``role_name``).
    """
    cli = argparse.ArgumentParser(description=f"AWS Org Tag Scanner v{__version__}")

    # (flag, add_argument keyword options), registered in display order.
    option_table = (
        ("--role-name", {"required": False, "help": "Role to assume in member accounts"}),
        ("--region", {"required": True, "help": "Management account region (e.g., us-gov-east-1)"}),
        ("--profile", {"required": True, "help": "AWS CLI profile for Management Account"}),
        ("--tags-file", {"required": False, "help": "CSV file with TagKey, Type, Status, etc."}),
        ("--max-workers", {"type": int, "default": 8, "help": "Max concurrent account scans"}),
        ("--account-regex", {"help": "Regex to filter accounts by alias"}),
        ("--region-regex", {"help": "Regex to filter regions (e.g., '^us-')"}),
        ("--accounts-from", {"help": "File of Account IDs to process"}),
        ("--output", {"default": "tag_checker", "help": "Prefix for output files"}),
        ("--limit", {"type": int, "default": 0, "help": "Limit total accounts processed"}),
        ("--verbose", {"action": "store_true", "help": "Enable detailed logging"}),
        ("--list-accounts", {"action": "store_true", "help": "List Account IDs and exit"}),
    )
    for flag, options in option_table:
        cli.add_argument(flag, **options)

    return cli.parse_args()

def create_refreshable_session(profile_name, region_name):
    """Build a boto3 session whose credentials are re-read from a CLI profile on expiry.

    Args:
        profile_name: AWS CLI profile to load credentials from.
        region_name: Default region configured on the returned session.

    Returns:
        boto3.Session backed by botocore ``RefreshableCredentials`` that call
        back into the profile whenever the current credentials near expiry.

    Raises:
        RuntimeError: if the profile resolves to no credentials.
    """
    def refresh_credentials():
        # A fresh Session re-runs the profile's credential resolution.
        profile_session = boto3.Session(profile_name=profile_name, region_name=region_name)
        creds = profile_session.get_credentials()
        if creds is None:
            raise RuntimeError(f"No AWS credentials could be resolved for profile '{profile_name}'")

        # Snapshot key/secret/token together so a refresh between attribute
        # reads cannot yield a mismatched set.
        frozen = creds.get_frozen_credentials()

        # Only refreshable credentials carry an expiry; static profile keys do
        # not, so they are given a one-hour lifetime and simply re-read then.
        expiry = getattr(creds, "_expiry_time", None)
        if expiry is None:
            expiry = datetime.now(timezone.utc) + timedelta(hours=1)

        return {
            "access_key": frozen.access_key,
            "secret_key": frozen.secret_key,
            "token": frozen.token,
            "expiry_time": expiry.isoformat(),
        }

    session_creds = RefreshableCredentials.create_from_metadata(
        metadata=refresh_credentials(), refresh_using=refresh_credentials, method="sts-assume-role"
    )
    bc_session = get_botocore_session()
    bc_session._credentials = session_creds
    bc_session.set_config_variable("region", region_name)
    return boto3.Session(botocore_session=bc_session)

def get_member_session(management_session, account_id, role_name, partition, region_name, verbose):
    """Assume ``role_name`` in a member account and return a session for it.

    Args:
        management_session: Session used to call STS ``assume_role``.
        account_id: Target account ID.
        role_name: Name of the role to assume in the target account.
        partition: ARN partition used to build the role ARN.
        region_name: Region for the STS client and the returned session.
        verbose: When true, an assume-role failure is written via ``tqdm.write``.

    Returns:
        boto3.Session using the temporary credentials, or ``None`` when the
        role could not be assumed.
    """
    sts_client = management_session.client('sts', region_name=region_name)
    target_role = f"arn:{partition}:iam::{account_id}:role/{role_name}"

    try:
        assumed = sts_client.assume_role(
            RoleArn=target_role,
            RoleSessionName="TagDiscoveryScanner",
            DurationSeconds=3600,
        )
        temp_keys = assumed['Credentials']
        return boto3.Session(
            aws_access_key_id=temp_keys['AccessKeyId'],
            aws_secret_access_key=temp_keys['SecretAccessKey'],
            aws_session_token=temp_keys['SessionToken'],
            region_name=region_name,
        )
    except Exception as err:
        if verbose:
            tqdm.write(f"[!] Auth Error for {account_id}: {str(err)}")
        return None

def _fetch_tag_hits(client, key, context, verbose, max_attempts=3):
    """Return ``(arn, value)`` pairs for every resource carrying tag ``key``.

    A throttled request is retried from the first page (up to ``max_attempts``
    tries, sleeping 1s, 2s, ... between them) so a retry never duplicates
    results. Any other ``ClientError``, or throttling on the last attempt,
    ends the lookup and returns whatever that attempt had collected.
    """
    pairs = []
    for attempt in range(1, max_attempts + 1):
        pairs = []
        try:
            paginator = client.get_paginator('get_resources')
            for page in paginator.paginate(TagFilters=[{'Key': key}]):
                for mapping in page.get('ResourceTagMappingList', []):
                    val = next((t['Value'] for t in mapping['Tags'] if t['Key'] == key), "N/A")
                    pairs.append((mapping['ResourceARN'], val))
            return pairs
        except ClientError as e:
            throttled = "Throttling" in str(e)
            if verbose:
                tqdm.write(f"[!] {context} tag '{key}' attempt {attempt}/{max_attempts}: {e}")
            if not throttled or attempt == max_attempts:
                break
            time.sleep(attempt)
    return pairs


def scan_account(account, management_session, role_name, partition, tag_keys, active_tag_keys, region_name, lane_id, account_regex, region_regex_str, verbose, bar_width):
    """Scan one member account for resources carrying any of ``tag_keys``.

    Args:
        account: Account record; only ``account['Id']`` is read.
        management_session: Session used to assume the member role.
        role_name: Role to assume in the member account.
        partition: ARN partition for the role ARN.
        tag_keys: Tag keys to look up, one Resource Groups Tagging API query
            per key per region.
        active_tag_keys: Keys considered active; used for the
            ``tags_found_list_active`` metrics.
        region_name: Home region for the STS/IAM/EC2 calls and the fallback
            scan region when regions cannot be listed.
        lane_id: Progress-bar row for this worker.
        account_regex: Optional regex; accounts whose alias does not match
            (case-insensitive) are skipped.
        region_regex_str: Optional regex restricting which regions are scanned.
        verbose: When true, recoverable errors are written via ``tqdm.write``.
        bar_width: Width the progress-bar label is padded to.

    Returns:
        Tuple ``(findings, account_id, alias, metrics, status)``. ``status``
        is ``"Success"``, ``"Auth Fail"`` or ``"Regex Skip (<alias>)"``; for
        the latter two ``findings`` is ``[]`` and ``metrics`` is ``{}``.
    """
    global ACCOUNT_COUNTER
    with COUNTER_LOCK:
        ACCOUNT_COUNTER += 1
        current_index = ACCOUNT_COUNTER

    acc_id = account['Id']
    m_session = get_member_session(management_session, acc_id, role_name, partition, region_name, verbose)
    if not m_session:
        return [], acc_id, "N/A", {}, "Auth Fail"

    try:
        alias_resp = m_session.client('iam', region_name=region_name).list_account_aliases()
        aliases = alias_resp.get('AccountAliases') or []
        # Accounts without an alias return an empty list.
        alias = aliases[0] if aliases else "N/A"
    except Exception:
        alias = "N/A"

    if account_regex and not re.search(account_regex, alias, re.IGNORECASE):
        return [], acc_id, alias, {}, f"Regex Skip ({alias})"

    try:
        ec2 = m_session.client('ec2', region_name=region_name)
        all_regions = [r['RegionName'] for r in ec2.describe_regions()['Regions']]
        if region_regex_str:
            region_pattern = re.compile(region_regex_str, re.IGNORECASE)
            active_regions = [r for r in all_regions if region_pattern.search(r)]
        else:
            active_regions = all_regions
    except Exception as e:
        # Best effort: fall back to the home region when regions cannot be
        # listed or the region regex is invalid.
        if verbose:
            tqdm.write(f"[!] Region discovery failed for {acc_id}, using {region_name}: {e}")
        active_regions = [region_name]

    acc_start = time.time()
    findings = []
    global_resources, global_tags_found = set(), set()
    region_stats = {}  # region -> running totals, in first-seen order
    tagging_clients = {}  # region -> client, created once per region

    # FORMAT: {lane_id} | {index} | {acc_id} {alias}
    label = f"{lane_id:02d} | {current_index:03d} | {acc_id} {alias}".ljust(bar_width)
    pbar = tqdm(total=len(tag_keys), desc=label, position=lane_id, leave=False, bar_format='{l_bar}{bar}| {n_fmt}/{total_fmt}')

    try:
        for key in tag_keys:
            for r in active_regions:
                r_start = time.perf_counter()
                client = tagging_clients.get(r)
                if client is None:
                    client = tagging_clients[r] = m_session.client('resourcegroupstaggingapi', region_name=r)

                hits = _fetch_tag_hits(client, key, f"{acc_id}/{r}", verbose)

                stats = region_stats.setdefault(r, {"hits": 0, "arns": set(), "tags": set(), "elapsed": 0.0})
                for arn, val in hits:
                    findings.append({"tag_name": key, "tag_value": val, "account_id": acc_id, "account_alias": alias, "region": r, "arn": arn})
                    global_resources.add(arn)
                    stats["arns"].add(arn)
                if hits:
                    global_tags_found.add(key)
                    stats["tags"].add(key)
                stats["hits"] += len(hits)

                r_elapsed = round(time.perf_counter() - r_start, 4)
                stats["elapsed"] = round(stats["elapsed"] + r_elapsed, 4)
            pbar.update(1)
    finally:
        # Release the progress-bar row even if a scan step raises.
        pbar.close()

    # unique_resources counts ARNs across every tag key seen in the region.
    regional_metrics = [
        {
            "region": r,
            "hits": stats["hits"],
            "unique_resources": len(stats["arns"]),
            "tags_found_count": len(stats["tags"]),
            "tags_found_list": sorted(stats["tags"]),
            "tags_found_list_active": sorted(stats["tags"].intersection(active_tag_keys)),
            "tags_not_found_count": len(tag_keys) - len(stats["tags"]),
            "elapsed_sec": stats["elapsed"],
        }
        for r, stats in region_stats.items()
    ]

    metrics = {
        "global": {
            "hits": len(findings),
            "unique_resources": len(global_resources),
            "tags_found_count": len(global_tags_found),
            "tags_found_list": sorted(global_tags_found),
            "tags_found_list_active": sorted(global_tags_found.intersection(active_tag_keys)),
            "tags_not_found_count": len(tag_keys) - len(global_tags_found),
            "elapsed_sec": round(time.time() - acc_start, 2),
        },
        "regions": regional_metrics,
    }
    return findings, acc_id, alias, metrics, "Success"

def main():
    """Scan the organization's active accounts for tagged resources.

    Lists ACTIVE accounts through the Organizations API, optionally narrows
    them by ``--accounts-from`` and ``--limit``, scans them concurrently with
    ``scan_account`` and writes ``<output>_summary_<ts>.json`` plus, when
    anything was found, ``<output>_findings_<ts>.csv``.

    With ``--list-accounts`` the active account IDs are printed and the
    program exits with status 0 without scanning. Exits with status 1 when a
    scan is requested without ``--role-name``/``--tags-file`` or with
    ``--max-workers`` below 1, and with status 130 on Ctrl-C.
    """
    args = get_args()

    # These options are optional for the parser so that --list-accounts works
    # without them, but a scan cannot run without them.
    if not args.list_accounts:
        missing = [flag for flag, value in (("--role-name", args.role_name), ("--tags-file", args.tags_file)) if not value]
        if missing:
            sys.exit(f"[!] Missing option(s) required for a scan: {', '.join(missing)}")
        if args.max_workers < 1:
            sys.exit("[!] --max-workers must be at least 1")

    cmd_line = " ".join(sys.argv)
    ts = datetime.now().strftime("%Y%m%d_%H%M%S")
    start_iso, start_ts = datetime.now().isoformat(), time.time()
    try:
        session = create_refreshable_session(args.profile, args.region)
        org = session.client('organizations', region_name=args.region)
        partition = session.client('sts', region_name=args.region).get_caller_identity()['Arn'].split(':')[1]

        unique_accounts = {}
        paginator = org.get_paginator('list_accounts')
        for page in paginator.paginate():
            for a in page['Accounts']:
                if a['Status'] == 'ACTIVE':
                    unique_accounts[a['Id']] = a

        if args.list_accounts:
            for aid in sorted(unique_accounts):
                print(aid)
            sys.exit(0)

        tag_keys, active_tag_keys = [], set()
        seen_keys = set()
        with open(args.tags_file, mode='r', encoding='utf-8-sig', newline='') as f:
            reader = csv.DictReader(f, skipinitialspace=True)
            for row in reader:
                # DictReader yields None for cells missing from a short row.
                key = (row.get('TagKey') or '').strip()
                if not key:
                    continue
                # A key listed twice would be queried twice and double its findings.
                if key not in seen_keys:
                    seen_keys.add(key)
                    tag_keys.append(key)
                if (row.get('Status') or '').strip().lower() == 'active':
                    active_tag_keys.add(key)

        target_ids = set()
        if args.accounts_from:
            with open(args.accounts_from, 'r') as f:
                target_ids = {line.strip() for line in f if line.strip()}

        to_process = [v for k, v in unique_accounts.items() if not target_ids or k in target_ids]
        if args.limit > 0:
            to_process = to_process[:args.limit]

        # UI Width Calculation: "01 | 001 | 123456789012 MyAlias"
        max_label_len = max(3 + 3 + 3 + 12 + 1 + len(a['Name']) for a in to_process) + 2 if to_process else 50

        print(f"\n{'='*85}\nAWS TAG CHECKER v{__version__}\n{'='*85}")
        print(f"Profile: {args.profile} | Region: {args.region} | Role: {args.role_name}")
        print(f"Tags Read: {len(tag_keys)} ({len(active_tag_keys)} active)")
        print(f"Accounts Targeted: {len(to_process)} (Unique Total: {len(unique_accounts)})")
        print(f"Thread Count: {args.max_workers}\n{'='*85}\n")

        all_findings, account_results = [], []
        overall_pbar = tqdm(total=len(to_process), desc="Overall Progress", position=0)

        with ThreadPoolExecutor(max_workers=args.max_workers) as executor:
            try:
                futures = {
                    executor.submit(
                        scan_account, acc, session, args.role_name, partition, tag_keys, active_tag_keys,
                        args.region, (i % args.max_workers) + 1, args.account_regex, args.region_regex,
                        args.verbose, max_label_len,
                    ): acc
                    for i, acc in enumerate(to_process)
                }
                for future in as_completed(futures):
                    try:
                        res, acc_id, alias, m, status = future.result()
                    except Exception as e:
                        # One failing account must not discard every other account's results.
                        tqdm.write(f"[!] Scan failed for {futures[future]['Id']}: {e}")
                    else:
                        if status == "Success":
                            all_findings.extend(res)
                            account_results.append({"account_id": acc_id, "alias": alias, "global_metrics": m["global"], "regional_metrics": m["regions"]})
                        elif args.verbose:
                            tqdm.write(f"[-] {acc_id}: {status}")
                    overall_pbar.update(1)
            except KeyboardInterrupt:
                executor.shutdown(wait=False, cancel_futures=True)
                sys.exit(130)

        overall_pbar.close()
        print("\n" * (args.max_workers + 1))

        # ru_maxrss is reported in kilobytes on Linux but in bytes on macOS.
        rss_units_per_mb = 1024 * 1024 if sys.platform == "darwin" else 1024
        mem_mb = round(resource.getrusage(resource.RUSAGE_SELF).ru_maxrss / rss_units_per_mb, 2)
        total_unique_res = len({f['arn'] for f in all_findings})
        all_found_keys = {f['tag_name'] for f in all_findings}

        output_summary = {
            "summary": {
                "version": __version__,
                "command_line": cmd_line,
                "aws_accounts_scanned": len(account_results),
                "tags_read_count": len(tag_keys),
                "execution_start": start_iso,
                "execution_end": datetime.now().isoformat(),
                "elapsed_sec_total": round(time.time() - start_ts, 2),
                "max_memory_mb": mem_mb,
                "total_hits": sum(a['global_metrics']['hits'] for a in account_results),
                "total_unique_resources": total_unique_res,
                "total_tags_found_count": len(all_found_keys),
            },
            "accounts": account_results,
        }

        sum_f, fin_f = f"{args.output}_summary_{ts}.json", f"{args.output}_findings_{ts}.csv"
        with open(sum_f, 'w', encoding='utf-8') as f:
            json.dump(output_summary, f, indent=4)
        print(f"[+] Summary: {sum_f}")

        # The findings CSV is only created when there is at least one row,
        # so only name it when it exists.
        if all_findings:
            with open(fin_f, 'w', newline='', encoding='utf-8') as f:
                writer = csv.DictWriter(f, fieldnames=all_findings[0].keys())
                writer.writeheader()
                writer.writerows(all_findings)
            print(f"[+] Findings: {fin_f}")
        else:
            print("[+] Findings: none (no findings file written)")
    except KeyboardInterrupt:
        sys.exit(130)


if __name__ == "__main__":
    main()

0 comments on commit 9610bb7

Please sign in to comment.