Skip to content

Commit

Permalink
use all regions
Browse files Browse the repository at this point in the history
  • Loading branch information
badra001 committed Jan 15, 2026
1 parent 6836315 commit 7b92dbd
Showing 1 changed file with 51 additions and 33 deletions.
84 changes: 51 additions & 33 deletions local-app/python-tools/cross-organization/tag-checker.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from botocore.exceptions import ClientError
from tqdm import tqdm

__version__ = "1.1.5"
__version__ = "1.1.6"

def get_args():
parser = argparse.ArgumentParser(description=f"AWS Org Tag Scanner v{__version__}")
Expand Down Expand Up @@ -49,19 +49,17 @@ def scan_account(account, management_session, role_name, partition, tag_keys, re
m_session = get_session(management_session, acc_id, role_name, partition, region_name, verbose)

if not m_session:
return [], acc_id, "N/A", {"hits": 0, "res": 0, "found": 0, "miss": len(tag_keys), "time": 0, "regions": []}, "Auth Fail"
return [], acc_id, "N/A", {}, "Auth Fail"

# Get Definitive Alias
try:
alias_resp = m_session.client('iam', region_name=region_name).list_account_aliases()
alias = alias_resp.get('AccountAliases', ["N/A"])[0]
except Exception:
alias = "N/A"

if account_regex and not re.search(account_regex, alias, re.IGNORECASE):
return [], acc_id, alias, {"hits": 0, "res": 0, "found": 0, "miss": len(tag_keys), "time": 0, "regions": []}, f"Regex Skip ({alias})"
return [], acc_id, alias, {}, f"Regex Skip ({alias})"

# Discover Enabled Regions
try:
ec2 = m_session.client('ec2', region_name=region_name)
active_regions = [r['RegionName'] for r in ec2.describe_regions()['Regions']]
Expand All @@ -70,16 +68,20 @@ def scan_account(account, management_session, role_name, partition, tag_keys, re

acc_start = time.time()
findings = []
unique_resources = set()
tags_found_keys = set()

# Progress Bar Alignment
label = f"{acc_id} ({alias})".ljust(bar_width)
pbar = tqdm(total=len(tag_keys), desc=f"Lane {lane_id} | {label}",
global_resources = set()
global_tags_found = set()
regional_data = []

# Fixed-width header alignment: "Lane 01 | {acc_id} {alias}"
label = f"{acc_id} {alias}".ljust(bar_width)
pbar = tqdm(total=len(tag_keys), desc=f"Lane {lane_id:02d} | {label}",
position=lane_id, leave=False, bar_format='{l_bar}{bar}| {n_fmt}/{total_fmt}')

for key in tag_keys:
tag_key_found_globally = False
for r in active_regions:
reg_hits = 0
reg_resources = set()
client = m_session.client('resourcegroupstaggingapi', region_name=r)
try:
paginator = client.get_paginator('get_resources')
Expand All @@ -91,20 +93,39 @@ def scan_account(account, management_session, role_name, partition, tag_keys, re
"tag_name": key, "tag_value": val, "account_id": acc_id,
"account_alias": alias, "region": r, "arn": arn
})
unique_resources.add(arn)
tags_found_keys.add(key)
global_resources.add(arn)
global_tags_found.add(key)
reg_resources.add(arn)
reg_hits += 1
tag_key_found_globally = True
except ClientError as e:
if "Throttling" in str(e): time.sleep(1)
pbar.update(1)

pbar.close()

# Regional Metrics Breakdown
for r in active_regions:
r_findings = [f for f in findings if f['region'] == r]
r_tags = set([f['tag_name'] for f in r_findings])
r_res = set([f['arn'] for f in r_findings])
regional_data.append({
"region": r,
"hits": len(r_findings),
"unique_resources": len(r_res),
"tags_found": len(r_tags),
"tags_not_found": len(tag_keys) - len(r_tags)
})

metrics = {
"hits": len(findings),
"unique_resources": len(unique_resources),
"tags_found_count": len(tags_found_keys),
"tags_not_found_count": len(tag_keys) - len(tags_found_keys),
"elapsed": round(time.time() - acc_start, 2),
"regions_scanned": active_regions
"global": {
"hits": len(findings),
"unique_resources": len(global_resources),
"tags_found": len(global_tags_found),
"tags_not_found": len(tag_keys) - len(global_tags_found),
"elapsed_sec": round(time.time() - acc_start, 2)
},
"regions": regional_data
}
return findings, acc_id, alias, metrics, "Success"

Expand Down Expand Up @@ -134,9 +155,9 @@ def main():

if args.limit > 0: all_accs = all_accs[:args.limit]

# Calculate Max Alignment Width (Account ID + max potential alias length)
# We estimate alias length from Org Name as a fallback for alignment
max_label_len = max([len(a['Name']) for a in all_accs]) + 15 if all_accs else 40
# Calculate bar width based on ID (12) + Name (alias fallback) + padding
# We add 1 for the space between ID and Alias as requested
max_label_len = max([12 + 1 + len(a['Name']) for a in all_accs]) if all_accs else 40

print(f"\n{'='*80}\nAWS TAG CHECKER v{__version__}\n{'='*80}")
print(f"Workers: {args.max_workers} | Tags: {len(tag_keys)} | Targeted Accounts: {len(all_accs)}")
Expand All @@ -151,14 +172,13 @@ def main():
args.account_regex, args.verbose, max_label_len): acc for i, acc in enumerate(all_accs)}

for future in as_completed(futures):
res, acc_id, alias, metrics, status = future.result()
res, acc_id, alias, m, status = future.result()
if status == "Success":
all_findings.extend(res)
summary_data.append({
"account_id": acc_id, "alias": alias,
"hits": metrics["hits"], "unique_resources": metrics["unique_resources"],
"tags_found": metrics["tags_found_count"], "tags_not_found": metrics["tags_not_found_count"],
"elapsed_sec": metrics["elapsed"], "regions": metrics["regions_scanned"]
"global_metrics": m["global"],
"regional_metrics": m["regions"]
})
else:
overall_pbar.write(f"[-] {acc_id}: {status}")
Expand All @@ -167,19 +187,17 @@ def main():
overall_pbar.close()
print("\n" * (args.max_workers + 1))

# Synchronized Output Files
findings_csv = f"{args.output}_findings_{ts}.csv"
summary_json = f"{args.output}_summary_{ts}.json"
sum_file = f"{args.output}_summary_{ts}.json"
fin_file = f"{args.output}_findings_{ts}.csv"

with open(summary_json, 'w') as f: json.dump(summary_data, f, indent=4)
with open(sum_file, 'w') as f: json.dump(summary_data, f, indent=4)
if all_findings:
with open(findings_csv, 'w', newline='') as f:
with open(fin_file, 'w', newline='') as f:
writer = csv.DictWriter(f, fieldnames=all_findings[0].keys())
writer.writeheader(); writer.writerows(all_findings)

print(f"[+] Scan Complete in {round(time.time()-start_overall, 2)}s")
print(f"[+] Summary: {summary_json}")
print(f"[+] Findings: {findings_csv}")
print(f"[+] Summary: {sum_file} | Findings: {fin_file}")

if __name__ == "__main__":
main()

0 comments on commit 7b92dbd

Please sign in to comment.