diff --git a/local-app/python-tools/cross-organization/tag-checker.py b/local-app/python-tools/cross-organization/tag-checker.py index 84cbf4eb..191c214e 100755 --- a/local-app/python-tools/cross-organization/tag-checker.py +++ b/local-app/python-tools/cross-organization/tag-checker.py @@ -12,12 +12,12 @@ from botocore.exceptions import ClientError from tqdm import tqdm -__version__ = "1.1.4" +__version__ = "1.1.5" def get_args(): parser = argparse.ArgumentParser(description=f"AWS Org Tag Scanner v{__version__}") parser.add_argument("--role-name", required=True, help="Role to assume in member accounts") - parser.add_argument("--region", required=True, help="Primary region (e.g., us-gov-east-1)") + parser.add_argument("--region", required=True, help="Management account region (e.g., us-gov-east-1)") parser.add_argument("--profile", required=True, help="AWS CLI profile for Management Account") parser.add_argument("--tags-file", required=True, help="CSV file with Tag Key in the first column") parser.add_argument("--max-workers", type=int, default=8, help="Max concurrent account scans") @@ -49,9 +49,9 @@ def scan_account(account, management_session, role_name, partition, tag_keys, re m_session = get_session(management_session, acc_id, role_name, partition, region_name, verbose) if not m_session: - return [], acc_id, "N/A", 0, 0, len(tag_keys), 0, "Auth Failure" + return [], acc_id, "N/A", {"hits": 0, "res": 0, "found": 0, "miss": len(tag_keys), "time": 0, "regions": []}, "Auth Fail" - # Precise Alias Retrieval + # Get Definitive Alias try: alias_resp = m_session.client('iam', region_name=region_name).list_account_aliases() alias = alias_resp.get('AccountAliases', ["N/A"])[0] @@ -59,9 +59,9 @@ def scan_account(account, management_session, role_name, partition, tag_keys, re alias = "N/A" if account_regex and not re.search(account_regex, alias, re.IGNORECASE): - return [], acc_id, alias, 0, 0, len(tag_keys), 0, f"Regex Mismatch ({alias})" + return [], acc_id, alias, {"hits": 0, "res": 0, "found": 0, "miss": len(tag_keys), "time": 0, "regions": []}, f"Regex Skip ({alias})" - # Multi-Region Discovery + # Discover Enabled Regions try: ec2 = m_session.client('ec2', region_name=region_name) active_regions = [r['RegionName'] for r in ec2.describe_regions()['Regions']] @@ -73,7 +73,7 @@ def scan_account(account, management_session, role_name, partition, tag_keys, re unique_resources = set() tags_found_keys = set() - # Aligned Progress Bar + # Progress Bar Alignment label = f"{acc_id} ({alias})".ljust(bar_width) pbar = tqdm(total=len(tag_keys), desc=f"Lane {lane_id} | {label}", position=lane_id, leave=False, bar_format='{l_bar}{bar}| {n_fmt}/{total_fmt}') @@ -98,8 +98,15 @@ def scan_account(account, management_session, role_name, partition, tag_keys, re pbar.update(1) pbar.close() - elapsed = round(time.time() - acc_start, 2) - return findings, acc_id, alias, len(findings), len(unique_resources), (len(tag_keys) - len(tags_found_keys)), elapsed, "Success" + metrics = { + "hits": len(findings), + "unique_resources": len(unique_resources), + "tags_found_count": len(tags_found_keys), + "tags_not_found_count": len(tag_keys) - len(tags_found_keys), + "elapsed": round(time.time() - acc_start, 2), + "regions_scanned": active_regions + } + return findings, acc_id, alias, metrics, "Success" def main(): args = get_args() @@ -118,8 +125,6 @@ def main(): with open(args.accounts_from, 'r') as f: target_ids = [l.strip() for l in f if l.strip()] - print(f"\n{'='*75}\nAWS TAG CHECKER v{__version__}\n{'='*75}") - all_accs = [] paginator = org.get_paginator('list_accounts') for page in paginator.paginate(): @@ -129,14 +134,15 @@ def main(): if args.limit > 0: all_accs = all_accs[:args.limit] - # Calculate bar width for alignment (12 chars for ID + max alias length + padding) - max_label_len = max([len(a['Name']) for a in all_accs]) + 15 if all_accs else 30 + # Calculate Max Alignment Width (Account ID + max potential alias length) + # We estimate alias length from Org Name as a fallback for alignment + max_label_len = max([len(a['Name']) for a in all_accs]) + 15 if all_accs else 40 + print(f"\n{'='*80}\nAWS TAG CHECKER v{__version__}\n{'='*80}") print(f"Workers: {args.max_workers} | Tags: {len(tag_keys)} | Targeted Accounts: {len(all_accs)}") - print(f"{'='*75}\n") - + all_findings = [] - summary_list = [] + summary_data = [] overall_pbar = tqdm(total=len(all_accs), desc="Total Org Progress", position=0) with ThreadPoolExecutor(max_workers=args.max_workers) as executor: @@ -145,12 +151,14 @@ def main(): args.account_regex, args.verbose, max_label_len): acc for i, acc in enumerate(all_accs)} for future in as_completed(futures): - res, acc_id, alias, hits, res_count, miss_count, elapsed, status = future.result() - if "Success" in status: + res, acc_id, alias, metrics, status = future.result() + if status == "Success": all_findings.extend(res) - summary_list.append({ - "account_id": acc_id, "alias": alias, "hits": hits, - "unique_resources": res_count, "tags_not_found": miss_count, "elapsed": elapsed + summary_data.append({ + "account_id": acc_id, "alias": alias, + "hits": metrics["hits"], "unique_resources": metrics["unique_resources"], + "tags_found": metrics["tags_found_count"], "tags_not_found": metrics["tags_not_found_count"], + "elapsed_sec": metrics["elapsed"], "regions": metrics["regions_scanned"] }) else: overall_pbar.write(f"[-] {acc_id}: {status}") @@ -159,18 +167,19 @@ def main(): overall_pbar.close() print("\n" * (args.max_workers + 1)) - # Final Summary and Findings - sum_file = f"{args.output}_summary_{ts}.json" - fin_file = f"{args.output}_findings_{ts}.csv" + # Synchronized Output Files + findings_csv = f"{args.output}_findings_{ts}.csv" + summary_json = f"{args.output}_summary_{ts}.json" - with open(sum_file, 'w') as f: json.dump(summary_list, f, indent=4) + with open(summary_json, 'w') as f: json.dump(summary_data, f, indent=4) if all_findings: - with open(fin_file, 'w', newline='') as f: + with open(findings_csv, 'w', newline='') as f: writer = csv.DictWriter(f, fieldnames=all_findings[0].keys()) writer.writeheader(); writer.writerows(all_findings) print(f"[+] Scan Complete in {round(time.time()-start_overall, 2)}s") - print(f"[+] Summary: {sum_file} | Findings: {fin_file}") + print(f"[+] Summary: {summary_json}") + print(f"[+] Findings: {findings_csv}") if __name__ == "__main__": main()