Skip to content

Commit

Permalink
add features
Browse files Browse the repository at this point in the history
  • Loading branch information
badra001 committed Jan 14, 2026
1 parent 7f76822 commit a05f901
Showing 1 changed file with 49 additions and 31 deletions.
80 changes: 49 additions & 31 deletions local-app/python-tools/cross-organization/tag-checker.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,14 @@
import sys
from botocore.exceptions import ClientError

__version__ = "1.0.2"

def get_args():
parser = argparse.ArgumentParser(description="Scan AWS Org for specific tag keys.")
parser = argparse.ArgumentParser(description=f"AWS Org Tag Scanner v{__version__}")
parser.add_argument("--role-name", required=True, help="Role to assume in member accounts")
parser.add_argument("--region", required=True, help="Region to use for the API calls")
parser.add_argument("--profile", required=True, help="AWS CLI profile for the Management Account")
parser.add_argument("--tags-file", required=True, help="File with one tag key per line")
parser.add_argument("--region", required=True, help="Primary region for API initialization")
parser.add_argument("--profile", required=True, help="AWS CLI profile for Management Account")
parser.add_argument("--tags-file", required=True, help="CSV file with Tag Key in the first column")
return parser.parse_args()

def get_session_for_account(management_session, account_id, role_name, partition):
Expand All @@ -25,39 +27,55 @@ def get_session_for_account(management_session, account_id, role_name, partition
)
return boto3.Session(
aws_access_key_id=response['Credentials']['AccessKeyId'],
aws_secret_access_key=response['Credentials']['AccessKeyId'],
aws_secret_access_key=response['Credentials']['SecretAccessKey'],
aws_session_token=response['Credentials']['SessionToken']
)
except ClientError as e:
print(f" [!] Could not assume role for {account_id}: {e}")
return None

def get_account_alias(session):
"""Returns the IAM alias if it exists, otherwise None."""
try:
iam = session.client('iam')
aliases = iam.list_account_aliases()['AccountAliases']
return aliases[0] if aliases else None
return aliases[0] if aliases else "No Alias"
except ClientError:
return None
return "Unknown/Unauthorized"

def main():
args = get_args()
print(f"[*] Initializing Tag Scanner v{__version__}")

# Initialize Management Session
session = boto3.Session(profile_name=args.profile)
org_client = session.client('organizations')

# Get Partition and Account Info
sts_client = session.client('sts')
identity = sts_client.get_caller_identity()
partition = identity['Arn'].split(':')[1]
try:
session = boto3.Session(profile_name=args.profile)
org_client = session.client('organizations')
sts_client = session.client('sts')
identity = sts_client.get_caller_identity()
partition = identity['Arn'].split(':')[1]
except Exception as e:
print(f"[!] Initialization failed: {e}")
sys.exit(1)

# Load Tag Keys
with open(args.tags_file, 'r') as f:
tag_keys = [line.strip().replace('"', '').replace("'", "") for line in f if line.strip()]
# Load Tag Keys from CSV (First Column)
tag_keys = []
try:
with open(args.tags_file, mode='r', encoding='utf-8-sig') as f:
reader = csv.reader(f)
next(reader) # Skip Header: Tag Key,Status,Type,Last updated date,Last used month
for row in reader:
if row:
# Strip quotes and whitespace
tag_keys.append(row[0].strip().replace('"', '').replace("'", ""))
except Exception as e:
print(f"[!] Error reading tags file: {e}")
sys.exit(1)

if not tag_keys:
print("[!] No tag keys found in the input file.")
sys.exit(0)

print(f"[*] Starting scan for {len(tag_keys)} tags across Organization...")
print(f"[*] Loaded {len(tag_keys)} keys. Starting cross-account scan...")

all_results = []

Expand All @@ -69,44 +87,44 @@ def main():
continue

acc_id = account['Id']
acc_name = account['Name'] # This is the Org-level name
acc_name = account['Name']
print(f"[*] Scanning Account: {acc_id} ({acc_name})")

m_session = get_session_for_account(session, acc_id, args.role_name, partition)
if not m_session:
continue

# Get IAM Alias (more useful for many teams than the Org Name)
alias = get_account_alias(m_session)
display_name = alias if alias else acc_name

# Scan all regions (Resource Groups Tagging API is regional)
ec2 = m_session.client('ec2', region_name=args.region)
regions = [r['RegionName'] for r in ec2.describe_regions()['Regions']]
# Get all enabled regions for this account
try:
ec2 = m_session.client('ec2', region_name=args.region)
regions = [r['RegionName'] for r in ec2.describe_regions()['Regions']]
except ClientError:
regions = [args.region] # Fallback to provided region

for region in regions:
tag_client = m_session.client('resourcegroupstaggingapi', region_name=region)
tag_paginator = tag_client.get_paginator('get_resources')

# Filter for your specific tag keys
# Tagging API allows up to 50 filters per call
tag_filters = [{'Key': k} for k in tag_keys]

try:
# Tagging API allows up to 50 filters per call
for i in range(0, len(tag_filters), 50):
chunk = tag_filters[i:i+50]
for tag_page in tag_paginator.paginate(TagFilters=chunk):
for r_mapping in tag_page.get('ResourceTagMappingList', []):
arn = r_mapping['ResourceARN']
# Find which specific keys from our list were found on this resource
# Find matching keys
found_tags = [t for t in r_mapping['Tags'] if t['Key'] in tag_keys]

for tag in found_tags:
all_results.append({
"tag_name": tag['Key'],
"tag_value": tag['Value'],
"account_id": acc_id,
"account_alias": display_name,
"account_alias": alias,
"region": region,
"arn": arn
})
Expand All @@ -119,9 +137,9 @@ def main():

# Output to CSV
if all_results:
keys = all_results[0].keys()
headers = ["tag_name", "tag_value", "account_id", "account_alias", "region", "arn"]
with open('findings.csv', 'w', newline='') as cf:
writer = csv.DictWriter(cf, fieldnames=keys)
writer = csv.DictWriter(cf, fieldnames=headers)
writer.writeheader()
writer.writerows(all_results)

Expand Down

0 comments on commit a05f901

Please sign in to comment.