-
Notifications
You must be signed in to change notification settings - Fork 8
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
2 changed files
with
109 additions
and
204 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
107 changes: 51 additions & 56 deletions
107
local-app/python-tools/cross-organization/remediate_tgw_dns.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,84 +1,79 @@ | ||
| #!/bin/env python3 | ||
|
|
||
| import boto3 | ||
| import sys | ||
| import os | ||
| import argparse | ||
| from datetime import datetime | ||
|
|
||
| # --- VERSIONING --- | ||
| __version__ = "1.1.0" | ||
| __version__ = "1.2.1" | ||
|
|
||
| def get_session(account_id, role_name="OrganizationAccountAccessRole"): | ||
| """Assumes a role in the target account to return a boto3 session.""" | ||
| """ | ||
| Internal helper to assume the cross-account role. | ||
| Defaults to OrganizationAccountAccessRole. | ||
| """ | ||
| sts = boto3.client('sts') | ||
| role_arn = f"arn:aws:iam::{account_id}:role/{role_name}" | ||
| try: | ||
| response = sts.assume_role( | ||
| RoleArn=role_arn, | ||
| RoleSessionName="TGW_Remediation_Session" | ||
| RoleSessionName="TGW_Remediation_Execution" | ||
| ) | ||
| creds = response['Credentials'] | ||
| return boto3.Session( | ||
| aws_access_key_id=creds['AccessKeyId'], | ||
| aws_secret_access_key=creds['SecretAccessKey'], | ||
| aws_session_token=creds['SessionToken'] | ||
| ) | ||
| except Exception as e: | ||
| print(f"Error: Could not assume role for {account_id}: {e}") | ||
| except Exception: | ||
| return None | ||
|
|
||
| def main(): | ||
| parser = argparse.ArgumentParser(description="TGW VPC Attachment DNS Remediator") | ||
| parser.add_argument("--input", default="remediate_tgw_dns.txt", help="Target list file") | ||
| parser.add_argument("--rollback", action="store_true", help="Re-enable DNS Support instead of disabling it") | ||
| args = parser.parse_args() | ||
|
|
||
| if not os.path.exists(args.input): | ||
| print(f"Error: {args.input} not found. Run the assessment script first.") | ||
| sys.exit(1) | ||
|
|
||
| # Determine action based on flag | ||
| desired_state = "enable" if args.rollback else "disable" | ||
| action_label = "ROLLBACK (Enabling)" if args.rollback else "REMEDIATION (Disabling)" | ||
|
|
||
| print("-" * 100) | ||
| print(f"TGW DNS SUPPORT {action_label} | Version {__version__}") | ||
| print("-" * 100) | ||
|
|
||
| with open(args.input, 'r') as f: | ||
| lines = f.readlines() | ||
| def remediate_task(instruction_line, dry_run=True): | ||
| """ | ||
| Core remediation logic called by org_runner.py. | ||
| Parses the instruction line and modifies the TGW attachment. | ||
| """ | ||
| if not instruction_line.startswith("MODIFY_TGW_ATTACHMENT:"): | ||
| return None | ||
|
|
||
| for line in lines: | ||
| if not line.startswith("MODIFY_TGW_ATTACHMENT:"): | ||
| continue | ||
|
|
||
| # Parse: MODIFY_TGW_ATTACHMENT: {acc_id} | {region} | {attach_id} | DnsSupport=disable | ||
| parts = line.split(":")[-1].strip().split("|") | ||
| # Parse: MODIFY_TGW_ATTACHMENT: {acc_id} | {region} | {attach_id} | DnsSupport=disable | ||
| try: | ||
| parts = instruction_line.split(":")[-1].strip().split("|") | ||
| acc_id = parts[0].strip() | ||
| region = parts[1].strip() | ||
| attach_id = parts[2].strip() | ||
| except Exception as e: | ||
| return {"error": f"Failed to parse line: {str(e)}", "line": instruction_line} | ||
|
|
||
| print(f"Target: Account {acc_id} | Region {region} | Attachment {attach_id}") | ||
| log_entry = { | ||
| "account_id": acc_id, | ||
| "region": region, | ||
| "resource": attach_id, | ||
| "action": "DnsSupport=disable", | ||
| "status": "PENDING", | ||
| "timestamp": datetime.now().isoformat() | ||
| } | ||
|
|
||
| session = get_session(acc_id) | ||
| if not session: | ||
| print(f" SKIPPING: Unable to access account {acc_id}") | ||
| continue | ||
| # Handle Dry Run | ||
| if dry_run: | ||
| print(f"[DRY-RUN] Would disable DNS Support for {attach_id} in account {acc_id} ({region})") | ||
| log_entry["status"] = "DRY_RUN_SKIPPED" | ||
| return log_entry | ||
|
|
||
| ec2 = session.client('ec2', region_name=region) | ||
| try: | ||
| # Perform the modification based on the desired state | ||
| response = ec2.modify_transit_gateway_vpc_attachment( | ||
| TransitGatewayAttachmentId=attach_id, | ||
| Options={'DnsSupport': desired_state} | ||
| ) | ||
| state = response['TransitGatewayVpcAttachment']['State'] | ||
| print(f" SUCCESS: DNS Support set to '{desired_state}'. Current state: {state}") | ||
| except Exception as e: | ||
| print(f" FAILED: {e}") | ||
| # Execute Remediation | ||
| session = get_session(acc_id) | ||
| if not session: | ||
| log_entry["status"] = "ERROR: Unable to assume role" | ||
| return log_entry | ||
|
|
||
| print("-" * 100) | ||
| print(f"{action_label} Complete.") | ||
| try: | ||
| ec2 = session.client('ec2', region_name=region) | ||
| ec2.modify_transit_gateway_vpc_attachment( | ||
| TransitGatewayAttachmentId=attach_id, | ||
| Options={'DnsSupport': 'disable'} | ||
| ) | ||
| print(f"SUCCESS: Disabled DNS Support for {attach_id} in {acc_id}") | ||
| log_entry["status"] = "SUCCESS" | ||
| except Exception as e: | ||
| error_msg = str(e) | ||
| print(f"FAILED: {attach_id} in {acc_id} - {error_msg}") | ||
| log_entry["status"] = f"ERROR: {error_msg}" | ||
|
|
||
| if __name__ == "__main__": | ||
| main() | ||
| return log_entry |