diff --git a/local-app/python-tools/cross-organization/tag-checker.py b/local-app/python-tools/cross-organization/tag-checker.py index d55993fb..4468df81 100755 --- a/local-app/python-tools/cross-organization/tag-checker.py +++ b/local-app/python-tools/cross-organization/tag-checker.py @@ -12,61 +12,64 @@ from botocore.exceptions import ClientError from tqdm import tqdm -__version__ = "1.1.2" +__version__ = "1.1.3" def get_args(): parser = argparse.ArgumentParser(description=f"AWS Org Tag Scanner v{__version__}") parser.add_argument("--role-name", required=True, help="Role to assume in member accounts") - parser.add_argument("--region", required=True, help="Primary region for API initialization") + parser.add_argument("--region", required=True, help="Primary region (e.g., us-gov-east-1)") parser.add_argument("--profile", required=True, help="AWS CLI profile for Management Account") parser.add_argument("--tags-file", required=True, help="CSV file with Tag Key in the first column") - parser.add_argument("--max-workers", type=int, default=8, help="Max concurrent account scans (default: 8)") - parser.add_argument("--account-regex", help="Regex to filter accounts by alias (case-insensitive)") - parser.add_argument("--accounts-from", help="File of Account IDs to process (one per line)") + parser.add_argument("--max-workers", type=int, default=8, help="Max concurrent account scans") + parser.add_argument("--account-regex", help="Regex to filter accounts by alias") + parser.add_argument("--accounts-from", help="File of Account IDs to process") parser.add_argument("--output", default="tag_checker_findings", help="Prefix for output files") parser.add_argument("--limit", type=int, default=0, help="Limit total accounts processed") - parser.add_argument("--verbose", action="store_true", help="Enable detailed logging and error reporting") + parser.add_argument("--verbose", action="store_true", help="Enable detailed logging") return parser.parse_args() -def get_session(management_session, account_id, role_name, partition, verbose): - sts = management_session.client('sts') +def get_session(management_session, account_id, role_name, partition, region_name, verbose): + """Creates a session in the member account with explicit region/partition context.""" + sts = management_session.client('sts', region_name=region_name) role_arn = f"arn:{partition}:iam::{account_id}:role/{role_name}" try: response = sts.assume_role(RoleArn=role_arn, RoleSessionName="TagDiscoveryScanner") c = response['Credentials'] - return boto3.Session(aws_access_key_id=c['AccessKeyId'], - aws_secret_access_key=c['SecretAccessKey'], - aws_session_token=c['SessionToken']) + # Explicitly passing region_name here is critical for GovCloud/China partitions + return boto3.Session( + aws_access_key_id=c['AccessKeyId'], + aws_secret_access_key=c['SecretAccessKey'], + aws_session_token=c['SessionToken'], + region_name=region_name + ) except Exception as e: if verbose: - print(f"\n[!] Auth Error for {account_id}: {str(e)}") + tqdm.write(f"\n[!] Auth Error for {account_id} in {region_name}: {str(e)}") return None def scan_account(account, management_session, role_name, partition, tag_keys, region_name, lane_id, account_regex, verbose): acc_id = account['Id'] - m_session = get_session(management_session, acc_id, role_name, partition, verbose) + m_session = get_session(management_session, acc_id, role_name, partition, region_name, verbose) if not m_session: return [], acc_id, "N/A", "Skipped: Auth/Session Failure" - # Precise Alias Retrieval with Verbose Error Tracking alias = "N/A" try: - iam_client = m_session.client('iam') + # Explicitly setting region_name on the IAM client for partition consistency + iam_client = m_session.client('iam', region_name=region_name) alias_resp = iam_client.list_account_aliases() alias_list = alias_resp.get('AccountAliases', []) if verbose: - # Report the raw list of aliases returned - tqdm.write(f"[DEBUG] {acc_id} | Raw Aliases Found: {alias_list}") + tqdm.write(f"[DEBUG] {acc_id} | Raw Aliases: {alias_list} | Region: {region_name}") alias = alias_list[0] if alias_list else "N/A" except Exception as e: alias = f"ERROR: {type(e).__name__}" if verbose: - tqdm.write(f"[ERROR] {acc_id} | Failed to pull alias: {str(e)}") + tqdm.write(f"[ERROR] {acc_id} | Alias Fetch Error: {str(e)}") - # Check Regex on the actual alias if account_regex and not re.search(account_regex, alias, re.IGNORECASE): return [], acc_id, alias, f"Skipped: Regex mismatch ({alias})" @@ -78,7 +81,6 @@ def scan_account(account, management_session, role_name, partition, tag_keys, re ec2 = m_session.client('ec2', region_name=region_name) regions = [r['RegionName'] for r in ec2.describe_regions()['Regions']] except Exception as e: - if verbose: tqdm.write(f"[DEBUG] {acc_id} | Region fetch failed, using default: {str(e)}") regions = [region_name] for key in tag_keys: @@ -95,7 +97,6 @@ def scan_account(account, management_session, role_name, partition, tag_keys, re }) except ClientError as e: if "Throttling" in str(e): time.sleep(1) - elif verbose: tqdm.write(f"[DEBUG] {acc_id} | TaggingAPI Error in {r}: {str(e)}") pbar.update(1) pbar.close() @@ -106,11 +107,14 @@ def main(): ts = datetime.now().strftime("%Y%m%d_%H%M%S") start_overall = time.time() - session = boto3.Session(profile_name=args.profile) - org = session.client('organizations') - partition = session.client('sts').get_caller_identity()['Arn'].split(':')[1] + # Init Management Session with the targeted region + session = boto3.Session(profile_name=args.profile, region_name=args.region) + org = session.client('organizations', region_name=args.region) + + # Detect Partition (aws, aws-us-gov, aws-cn) + sts_client = session.client('sts', region_name=args.region) + partition = sts_client.get_caller_identity()['Arn'].split(':')[1] - # Load Input Data with open(args.tags_file, mode='r', encoding='utf-8-sig') as f: tag_keys = [row[0].strip() for row in list(csv.reader(f))[1:] if row] @@ -120,8 +124,7 @@ def main(): target_ids = [l.strip() for l in f if l.strip()] print(f"\n{'='*70}\nAWS TAG CHECKER v{__version__}\n{'='*70}") - print(f"Profile: {args.profile} | Region: {args.region} | Workers: {args.max_workers}") - if args.verbose: print("[!] Verbose Logging Enabled") + print(f"Profile: {args.profile} | Region: {args.region} | Partition: {partition}") all_accs = [] paginator = org.get_paginator('list_accounts') @@ -145,7 +148,7 @@ def main(): for future in as_completed(futures): res, acc_id, alias, status = future.result() - if "Skipped" in status: + if "Skipped" in status or "Skipping" in status: overall_pbar.write(f"[-] {acc_id} ({alias}): {status}") else: all_findings.extend(res)