Skip to content

Commit

Permalink
fix alias by using region for sts/iam
Browse files Browse the repository at this point in the history
  • Loading branch information
badra001 committed Jan 15, 2026
1 parent bfb2e36 commit cbeefe0
Showing 1 changed file with 31 additions and 28 deletions.
59 changes: 31 additions & 28 deletions local-app/python-tools/cross-organization/tag-checker.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,61 +12,64 @@
from botocore.exceptions import ClientError
from tqdm import tqdm

__version__ = "1.1.2"
__version__ = "1.1.3"

def get_args():
parser = argparse.ArgumentParser(description=f"AWS Org Tag Scanner v{__version__}")
parser.add_argument("--role-name", required=True, help="Role to assume in member accounts")
parser.add_argument("--region", required=True, help="Primary region for API initialization")
parser.add_argument("--region", required=True, help="Primary region (e.g., us-gov-east-1)")
parser.add_argument("--profile", required=True, help="AWS CLI profile for Management Account")
parser.add_argument("--tags-file", required=True, help="CSV file with Tag Key in the first column")
parser.add_argument("--max-workers", type=int, default=8, help="Max concurrent account scans (default: 8)")
parser.add_argument("--account-regex", help="Regex to filter accounts by alias (case-insensitive)")
parser.add_argument("--accounts-from", help="File of Account IDs to process (one per line)")
parser.add_argument("--max-workers", type=int, default=8, help="Max concurrent account scans")
parser.add_argument("--account-regex", help="Regex to filter accounts by alias")
parser.add_argument("--accounts-from", help="File of Account IDs to process")
parser.add_argument("--output", default="tag_checker_findings", help="Prefix for output files")
parser.add_argument("--limit", type=int, default=0, help="Limit total accounts processed")
parser.add_argument("--verbose", action="store_true", help="Enable detailed logging and error reporting")
parser.add_argument("--verbose", action="store_true", help="Enable detailed logging")
return parser.parse_args()

def get_session(management_session, account_id, role_name, partition, verbose):
sts = management_session.client('sts')
def get_session(management_session, account_id, role_name, partition, region_name, verbose):
"""Creates a session in the member account with explicit region/partition context."""
sts = management_session.client('sts', region_name=region_name)
role_arn = f"arn:{partition}:iam::{account_id}:role/{role_name}"
try:
response = sts.assume_role(RoleArn=role_arn, RoleSessionName="TagDiscoveryScanner")
c = response['Credentials']
return boto3.Session(aws_access_key_id=c['AccessKeyId'],
aws_secret_access_key=c['SecretAccessKey'],
aws_session_token=c['SessionToken'])
# Explicitly passing region_name here is critical for GovCloud/China partitions
return boto3.Session(
aws_access_key_id=c['AccessKeyId'],
aws_secret_access_key=c['SecretAccessKey'],
aws_session_token=c['SessionToken'],
region_name=region_name
)
except Exception as e:
if verbose:
print(f"\n[!] Auth Error for {account_id}: {str(e)}")
tqdm.write(f"\n[!] Auth Error for {account_id} in {region_name}: {str(e)}")
return None

def scan_account(account, management_session, role_name, partition, tag_keys, region_name, lane_id, account_regex, verbose):
acc_id = account['Id']
m_session = get_session(management_session, acc_id, role_name, partition, verbose)
m_session = get_session(management_session, acc_id, role_name, partition, region_name, verbose)

if not m_session:
return [], acc_id, "N/A", "Skipped: Auth/Session Failure"

# Precise Alias Retrieval with Verbose Error Tracking
alias = "N/A"
try:
iam_client = m_session.client('iam')
# Explicitly setting region_name on the IAM client for partition consistency
iam_client = m_session.client('iam', region_name=region_name)
alias_resp = iam_client.list_account_aliases()
alias_list = alias_resp.get('AccountAliases', [])

if verbose:
# Report the raw list of aliases returned
tqdm.write(f"[DEBUG] {acc_id} | Raw Aliases Found: {alias_list}")
tqdm.write(f"[DEBUG] {acc_id} | Raw Aliases: {alias_list} | Region: {region_name}")

alias = alias_list[0] if alias_list else "N/A"
except Exception as e:
alias = f"ERROR: {type(e).__name__}"
if verbose:
tqdm.write(f"[ERROR] {acc_id} | Failed to pull alias: {str(e)}")
tqdm.write(f"[ERROR] {acc_id} | Alias Fetch Error: {str(e)}")

# Check Regex on the actual alias
if account_regex and not re.search(account_regex, alias, re.IGNORECASE):
return [], acc_id, alias, f"Skipped: Regex mismatch ({alias})"

Expand All @@ -78,7 +81,6 @@ def scan_account(account, management_session, role_name, partition, tag_keys, re
ec2 = m_session.client('ec2', region_name=region_name)
regions = [r['RegionName'] for r in ec2.describe_regions()['Regions']]
except Exception as e:
if verbose: tqdm.write(f"[DEBUG] {acc_id} | Region fetch failed, using default: {str(e)}")
regions = [region_name]

for key in tag_keys:
Expand All @@ -95,7 +97,6 @@ def scan_account(account, management_session, role_name, partition, tag_keys, re
})
except ClientError as e:
if "Throttling" in str(e): time.sleep(1)
elif verbose: tqdm.write(f"[DEBUG] {acc_id} | TaggingAPI Error in {r}: {str(e)}")
pbar.update(1)

pbar.close()
Expand All @@ -106,11 +107,14 @@ def main():
ts = datetime.now().strftime("%Y%m%d_%H%M%S")
start_overall = time.time()

session = boto3.Session(profile_name=args.profile)
org = session.client('organizations')
partition = session.client('sts').get_caller_identity()['Arn'].split(':')[1]
# Init Management Session with the targeted region
session = boto3.Session(profile_name=args.profile, region_name=args.region)
org = session.client('organizations', region_name=args.region)

# Detect Partition (aws, aws-us-gov, aws-cn)
sts_client = session.client('sts', region_name=args.region)
partition = sts_client.get_caller_identity()['Arn'].split(':')[1]

# Load Input Data
with open(args.tags_file, mode='r', encoding='utf-8-sig') as f:
tag_keys = [row[0].strip() for row in list(csv.reader(f))[1:] if row]

Expand All @@ -120,8 +124,7 @@ def main():
target_ids = [l.strip() for l in f if l.strip()]

print(f"\n{'='*70}\nAWS TAG CHECKER v{__version__}\n{'='*70}")
print(f"Profile: {args.profile} | Region: {args.region} | Workers: {args.max_workers}")
if args.verbose: print("[!] Verbose Logging Enabled")
print(f"Profile: {args.profile} | Region: {args.region} | Partition: {partition}")

all_accs = []
paginator = org.get_paginator('list_accounts')
Expand All @@ -145,7 +148,7 @@ def main():

for future in as_completed(futures):
res, acc_id, alias, status = future.result()
if "Skipped" in status:
if "Skipped" in status or "Skipping" in status:
overall_pbar.write(f"[-] {acc_id} ({alias}): {status}")
else:
all_findings.extend(res)
Expand Down

0 comments on commit cbeefe0

Please sign in to comment.