From 7f7682287aa943ac7ffa9d12b7dd16d3a374c2ac Mon Sep 17 00:00:00 2001 From: badra001 Date: Wed, 14 Jan 2026 12:28:05 -0500 Subject: [PATCH] initil --- .../cross-organization/tag-checker.py | 132 ++++++++++++++++++ 1 file changed, 132 insertions(+) create mode 100755 local-app/python-tools/cross-organization/tag-checker.py diff --git a/local-app/python-tools/cross-organization/tag-checker.py b/local-app/python-tools/cross-organization/tag-checker.py new file mode 100755 index 00000000..32029956 --- /dev/null +++ b/local-app/python-tools/cross-organization/tag-checker.py @@ -0,0 +1,132 @@ +#!/bin/env python + +import boto3 +import csv +import json +import argparse +import sys +from botocore.exceptions import ClientError + +def get_args(): + parser = argparse.ArgumentParser(description="Scan AWS Org for specific tag keys.") + parser.add_argument("--role-name", required=True, help="Role to assume in member accounts") + parser.add_argument("--region", required=True, help="Region to use for the API calls") + parser.add_argument("--profile", required=True, help="AWS CLI profile for the Management Account") + parser.add_argument("--tags-file", required=True, help="File with one tag key per line") + return parser.parse_args() + +def get_session_for_account(management_session, account_id, role_name, partition): + sts = management_session.client('sts') + role_arn = f"arn:{partition}:iam::{account_id}:role/{role_name}" + try: + response = sts.assume_role( + RoleArn=role_arn, + RoleSessionName="TagDiscoveryScanner" + ) + return boto3.Session( + aws_access_key_id=response['Credentials']['AccessKeyId'], + aws_secret_access_key=response['Credentials']['AccessKeyId'], + aws_session_token=response['Credentials']['SessionToken'] + ) + except ClientError as e: + print(f" [!] Could not assume role for {account_id}: {e}") + return None + +def get_account_alias(session): + """Returns the IAM alias if it exists, otherwise None.""" + try: + iam = session.client('iam') + aliases = iam.list_account_aliases()['AccountAliases'] + return aliases[0] if aliases else None + except ClientError: + return None + +def main(): + args = get_args() + + # Initialize Management Session + session = boto3.Session(profile_name=args.profile) + org_client = session.client('organizations') + + # Get Partition and Account Info + sts_client = session.client('sts') + identity = sts_client.get_caller_identity() + partition = identity['Arn'].split(':')[1] + + # Load Tag Keys + with open(args.tags_file, 'r') as f: + tag_keys = [line.strip().replace('"', '').replace("'", "") for line in f if line.strip()] + + print(f"[*] Starting scan for {len(tag_keys)} tags across Organization...") + + all_results = [] + + # Iterate Accounts + paginator = org_client.get_paginator('list_accounts') + for page in paginator.paginate(): + for account in page['Accounts']: + if account['Status'] != 'ACTIVE': + continue + + acc_id = account['Id'] + acc_name = account['Name'] # This is the Org-level name + print(f"[*] Scanning Account: {acc_id} ({acc_name})") + + m_session = get_session_for_account(session, acc_id, args.role_name, partition) + if not m_session: + continue + + # Get IAM Alias (more useful for many teams than the Org Name) + alias = get_account_alias(m_session) + display_name = alias if alias else acc_name + + # Scan all regions (Resource Groups Tagging API is regional) + ec2 = m_session.client('ec2', region_name=args.region) + regions = [r['RegionName'] for r in ec2.describe_regions()['Regions']] + + for region in regions: + tag_client = m_session.client('resourcegroupstaggingapi', region_name=region) + tag_paginator = tag_client.get_paginator('get_resources') + + # Filter for your specific tag keys + tag_filters = [{'Key': k} for k in tag_keys] + + try: + # Tagging API allows up to 50 filters per call + for i in range(0, len(tag_filters), 50): + chunk = tag_filters[i:i+50] + for tag_page in tag_paginator.paginate(TagFilters=chunk): + for r_mapping in tag_page.get('ResourceTagMappingList', []): + arn = r_mapping['ResourceARN'] + # Find which specific keys from our list were found on this resource + found_tags = [t for t in r_mapping['Tags'] if t['Key'] in tag_keys] + + for tag in found_tags: + all_results.append({ + "tag_name": tag['Key'], + "tag_value": tag['Value'], + "account_id": acc_id, + "account_alias": display_name, + "region": region, + "arn": arn + }) + except Exception as e: + print(f" [!] Error in {region}: {e}") + + # Output to JSON + with open('findings.json', 'w') as jf: + json.dump(all_results, jf, indent=4) + + # Output to CSV + if all_results: + keys = all_results[0].keys() + with open('findings.csv', 'w', newline='') as cf: + writer = csv.DictWriter(cf, fieldnames=keys) + writer.writeheader() + writer.writerows(all_results) + + print(f"\n[+] Scan Complete. Found {len(all_results)} instances.") + print("[+] Files generated: findings.json, findings.csv") + +if __name__ == "__main__": + main()