diff --git a/local-app/python-tools/cross-organization/tag-checker.py b/local-app/python-tools/cross-organization/tag-checker.py index 191c214e..3dd15601 100755 --- a/local-app/python-tools/cross-organization/tag-checker.py +++ b/local-app/python-tools/cross-organization/tag-checker.py @@ -12,7 +12,7 @@ from botocore.exceptions import ClientError from tqdm import tqdm -__version__ = "1.1.5" +__version__ = "1.1.6" def get_args(): parser = argparse.ArgumentParser(description=f"AWS Org Tag Scanner v{__version__}") @@ -49,9 +49,8 @@ def scan_account(account, management_session, role_name, partition, tag_keys, re m_session = get_session(management_session, acc_id, role_name, partition, region_name, verbose) if not m_session: - return [], acc_id, "N/A", {"hits": 0, "res": 0, "found": 0, "miss": len(tag_keys), "time": 0, "regions": []}, "Auth Fail" + return [], acc_id, "N/A", {}, "Auth Fail" - # Get Definitive Alias try: alias_resp = m_session.client('iam', region_name=region_name).list_account_aliases() alias = alias_resp.get('AccountAliases', ["N/A"])[0] @@ -59,9 +58,8 @@ def scan_account(account, management_session, role_name, partition, tag_keys, re alias = "N/A" if account_regex and not re.search(account_regex, alias, re.IGNORECASE): - return [], acc_id, alias, {"hits": 0, "res": 0, "found": 0, "miss": len(tag_keys), "time": 0, "regions": []}, f"Regex Skip ({alias})" + return [], acc_id, alias, {}, f"Regex Skip ({alias})" - # Discover Enabled Regions try: ec2 = m_session.client('ec2', region_name=region_name) active_regions = [r['RegionName'] for r in ec2.describe_regions()['Regions']] @@ -70,16 +68,20 @@ def scan_account(account, management_session, role_name, partition, tag_keys, re acc_start = time.time() findings = [] - unique_resources = set() - tags_found_keys = set() - - # Progress Bar Alignment - label = f"{acc_id} ({alias})".ljust(bar_width) - pbar = tqdm(total=len(tag_keys), desc=f"Lane {lane_id} | {label}", + global_resources = set() + global_tags_found = set() + regional_data = [] + + # Fixed-width header alignment: "Lane 01 | {acc_id} {alias}" + label = f"{acc_id} {alias}".ljust(bar_width) + pbar = tqdm(total=len(tag_keys), desc=f"Lane {lane_id:02d} | {label}", position=lane_id, leave=False, bar_format='{l_bar}{bar}| {n_fmt}/{total_fmt}') for key in tag_keys: + tag_key_found_globally = False for r in active_regions: + reg_hits = 0 + reg_resources = set() client = m_session.client('resourcegroupstaggingapi', region_name=r) try: paginator = client.get_paginator('get_resources') @@ -91,20 +93,39 @@ def scan_account(account, management_session, role_name, partition, tag_keys, re "tag_name": key, "tag_value": val, "account_id": acc_id, "account_alias": alias, "region": r, "arn": arn }) - unique_resources.add(arn) - tags_found_keys.add(key) + global_resources.add(arn) + global_tags_found.add(key) + reg_resources.add(arn) + reg_hits += 1 + tag_key_found_globally = True except ClientError as e: if "Throttling" in str(e): time.sleep(1) pbar.update(1) pbar.close() + + # Regional Metrics Breakdown + for r in active_regions: + r_findings = [f for f in findings if f['region'] == r] + r_tags = set([f['tag_name'] for f in r_findings]) + r_res = set([f['arn'] for f in r_findings]) + regional_data.append({ + "region": r, + "hits": len(r_findings), + "unique_resources": len(r_res), + "tags_found": len(r_tags), + "tags_not_found": len(tag_keys) - len(r_tags) + }) + metrics = { - "hits": len(findings), - "unique_resources": len(unique_resources), - "tags_found_count": len(tags_found_keys), - "tags_not_found_count": len(tag_keys) - len(tags_found_keys), - "elapsed": round(time.time() - acc_start, 2), - "regions_scanned": active_regions + "global": { + "hits": len(findings), + "unique_resources": len(global_resources), + "tags_found": len(global_tags_found), + "tags_not_found": len(tag_keys) - len(global_tags_found), + "elapsed_sec": round(time.time() - acc_start, 2) + }, + "regions": regional_data } return findings, acc_id, alias, metrics, "Success" @@ -134,9 +155,9 @@ def main(): if args.limit > 0: all_accs = all_accs[:args.limit] - # Calculate Max Alignment Width (Account ID + max potential alias length) - # We estimate alias length from Org Name as a fallback for alignment - max_label_len = max([len(a['Name']) for a in all_accs]) + 15 if all_accs else 40 + # Calculate bar width based on ID (12) + Name (alias fallback) + padding + # We add 1 for the space between ID and Alias as requested + max_label_len = max([12 + 1 + len(a['Name']) for a in all_accs]) if all_accs else 40 print(f"\n{'='*80}\nAWS TAG CHECKER v{__version__}\n{'='*80}") print(f"Workers: {args.max_workers} | Tags: {len(tag_keys)} | Targeted Accounts: {len(all_accs)}") @@ -151,14 +172,13 @@ def main(): args.account_regex, args.verbose, max_label_len): acc for i, acc in enumerate(all_accs)} for future in as_completed(futures): - res, acc_id, alias, metrics, status = future.result() + res, acc_id, alias, m, status = future.result() if status == "Success": all_findings.extend(res) summary_data.append({ "account_id": acc_id, "alias": alias, - "hits": metrics["hits"], "unique_resources": metrics["unique_resources"], - "tags_found": metrics["tags_found_count"], "tags_not_found": metrics["tags_not_found_count"], - "elapsed_sec": metrics["elapsed"], "regions": metrics["regions_scanned"] + "global_metrics": m["global"], + "regional_metrics": m["regions"] }) else: overall_pbar.write(f"[-] {acc_id}: {status}") @@ -167,19 +187,17 @@ def main(): overall_pbar.close() print("\n" * (args.max_workers + 1)) - # Synchronized Output Files - findings_csv = f"{args.output}_findings_{ts}.csv" - summary_json = f"{args.output}_summary_{ts}.json" + sum_file = f"{args.output}_summary_{ts}.json" + fin_file = f"{args.output}_findings_{ts}.csv" - with open(summary_json, 'w') as f: json.dump(summary_data, f, indent=4) + with open(sum_file, 'w') as f: json.dump(summary_data, f, indent=4) if all_findings: - with open(findings_csv, 'w', newline='') as f: + with open(fin_file, 'w', newline='') as f: writer = csv.DictWriter(f, fieldnames=all_findings[0].keys()) writer.writeheader(); writer.writerows(all_findings) print(f"[+] Scan Complete in {round(time.time()-start_overall, 2)}s") - print(f"[+] Summary: {summary_json}") - print(f"[+] Findings: {findings_csv}") + print(f"[+] Summary: {sum_file} | Findings: {fin_file}") if __name__ == "__main__": main()