Skip to content

Commit

Permalink
initil
Browse files Browse the repository at this point in the history
  • Loading branch information
badra001 committed Jan 14, 2026
1 parent 31585df commit 7f76822
Showing 1 changed file with 132 additions and 0 deletions.
132 changes: 132 additions & 0 deletions local-app/python-tools/cross-organization/tag-checker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
#!/bin/env python

import boto3
import csv
import json
import argparse
import sys
from botocore.exceptions import ClientError

def get_args():
parser = argparse.ArgumentParser(description="Scan AWS Org for specific tag keys.")
parser.add_argument("--role-name", required=True, help="Role to assume in member accounts")
parser.add_argument("--region", required=True, help="Region to use for the API calls")
parser.add_argument("--profile", required=True, help="AWS CLI profile for the Management Account")
parser.add_argument("--tags-file", required=True, help="File with one tag key per line")
return parser.parse_args()

def get_session_for_account(management_session, account_id, role_name, partition):
sts = management_session.client('sts')
role_arn = f"arn:{partition}:iam::{account_id}:role/{role_name}"
try:
response = sts.assume_role(
RoleArn=role_arn,
RoleSessionName="TagDiscoveryScanner"
)
return boto3.Session(
aws_access_key_id=response['Credentials']['AccessKeyId'],
aws_secret_access_key=response['Credentials']['AccessKeyId'],
aws_session_token=response['Credentials']['SessionToken']
)
except ClientError as e:
print(f" [!] Could not assume role for {account_id}: {e}")
return None

def get_account_alias(session):
"""Returns the IAM alias if it exists, otherwise None."""
try:
iam = session.client('iam')
aliases = iam.list_account_aliases()['AccountAliases']
return aliases[0] if aliases else None
except ClientError:
return None

def main():
args = get_args()

# Initialize Management Session
session = boto3.Session(profile_name=args.profile)
org_client = session.client('organizations')

# Get Partition and Account Info
sts_client = session.client('sts')
identity = sts_client.get_caller_identity()
partition = identity['Arn'].split(':')[1]

# Load Tag Keys
with open(args.tags_file, 'r') as f:
tag_keys = [line.strip().replace('"', '').replace("'", "") for line in f if line.strip()]

print(f"[*] Starting scan for {len(tag_keys)} tags across Organization...")

all_results = []

# Iterate Accounts
paginator = org_client.get_paginator('list_accounts')
for page in paginator.paginate():
for account in page['Accounts']:
if account['Status'] != 'ACTIVE':
continue

acc_id = account['Id']
acc_name = account['Name'] # This is the Org-level name
print(f"[*] Scanning Account: {acc_id} ({acc_name})")

m_session = get_session_for_account(session, acc_id, args.role_name, partition)
if not m_session:
continue

# Get IAM Alias (more useful for many teams than the Org Name)
alias = get_account_alias(m_session)
display_name = alias if alias else acc_name

# Scan all regions (Resource Groups Tagging API is regional)
ec2 = m_session.client('ec2', region_name=args.region)
regions = [r['RegionName'] for r in ec2.describe_regions()['Regions']]

for region in regions:
tag_client = m_session.client('resourcegroupstaggingapi', region_name=region)
tag_paginator = tag_client.get_paginator('get_resources')

# Filter for your specific tag keys
tag_filters = [{'Key': k} for k in tag_keys]

try:
# Tagging API allows up to 50 filters per call
for i in range(0, len(tag_filters), 50):
chunk = tag_filters[i:i+50]
for tag_page in tag_paginator.paginate(TagFilters=chunk):
for r_mapping in tag_page.get('ResourceTagMappingList', []):
arn = r_mapping['ResourceARN']
# Find which specific keys from our list were found on this resource
found_tags = [t for t in r_mapping['Tags'] if t['Key'] in tag_keys]

for tag in found_tags:
all_results.append({
"tag_name": tag['Key'],
"tag_value": tag['Value'],
"account_id": acc_id,
"account_alias": display_name,
"region": region,
"arn": arn
})
except Exception as e:
print(f" [!] Error in {region}: {e}")

# Output to JSON
with open('findings.json', 'w') as jf:
json.dump(all_results, jf, indent=4)

# Output to CSV
if all_results:
keys = all_results[0].keys()
with open('findings.csv', 'w', newline='') as cf:
writer = csv.DictWriter(cf, fieldnames=keys)
writer.writeheader()
writer.writerows(all_results)

print(f"\n[+] Scan Complete. Found {len(all_results)} instances.")
print("[+] Files generated: findings.json, findings.csv")

if __name__ == "__main__":
main()

0 comments on commit 7f76822

Please sign in to comment.