Skip to content

Commit

Permalink
add new tgw checks
Browse files Browse the repository at this point in the history
  • Loading branch information
badra001 committed Mar 10, 2026
1 parent 8c05cec commit b2f54d7
Show file tree
Hide file tree
Showing 4 changed files with 190 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
#!/usr/bin/env python
import json, argparse, sys, os, glob

# --- VERSIONING ---
__version__ = "1.0.0"

def find_latest_file(pattern):
files = glob.glob(pattern)
return max(files, key=os.path.getctime) if files else None

def main():
parser = argparse.ArgumentParser(description="TGW VPC Attachment Assessor")
parser.add_argument("--input", help="JSON audit file")
args = parser.parse_args()

input_file = args.input or find_latest_file("audit_results.check_tgw_attachments.*.json")
if not input_file: print("Error: No file found."); sys.exit(1)

with open(input_file, 'r') as f: data = json.load(f)

report_width = 200
print("-" * report_width)
print(f"TGW VPC ATTACHMENT DNS SUPPORT AUDIT | Accounts: {len(data)}")
print("-" * report_width)
print(f"{'Idx':<4} | {'Account ID':<15} | {'Region':<12} | {'Attachment ID':<25} | {'VPC ID':<15} | {'DNS Support':<12} | {'Name Tag'}")
print("-" * report_width)

remediation_targets = []
stats = {"total": 0, "enabled": 0, "disabled": 0}

for idx, account in enumerate(data, 1):
acc_id = account.get("account_id")
checks = account.get("data", {})
for key, val in checks.items():
if key == "account_summary" or ":" not in key: continue

stats["total"] += 1
dns = val.get("dns_support", "disabled")

if dns == "enable":
stats["enabled"] += 1
remediation_targets.append((acc_id, val['region'], val['resource']))
else:
stats["disabled"] += 1

print(f"{idx:<4} | {acc_id:<15} | {val['region']:<12} | {val['resource']:<25} | {val['vpc_id']:<15} | {dns:<12} | {val['tag_name']}")

print("-" * report_width)
print(f"SUMMARY: {stats['total']} Total Attachments | {stats['enabled']} with DNS Enabled (NON-COMPLIANT) | {stats['disabled']} Disabled")

if remediation_targets:
print(f"\nACTION REQUIRED: Found {len(remediation_targets)} attachments to modify.")
# Generates a pseudo-code action list for the crawler
with open("remediate_tgw_dns.txt", "w") as f:
for target in remediation_targets:
f.write(f"MODIFY_TGW_ATTACHMENT: {target[0]} | {target[1]} | {target[2]} | DnsSupport=disable\n")
print("Remediation target list saved to remediate_tgw_dns.txt")

if __name__ == "__main__":
main()
48 changes: 48 additions & 0 deletions local-app/python-tools/cross-organization/check_tgw_attachments.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import boto3
from datetime import datetime

# --- VERSIONING ---
__version__ = "1.0.0"

def account_task(account_session, account_id, account_name, region):
"""
Identifies Transit Gateway VPC attachments and their DNS support status.
"""
results = {"alias": "N/A", "data": {}}
try:
ec2_global = account_session.client('ec2', region_name=region)
regions = [r['RegionName'] for r in ec2_global.describe_regions()['Regions']]

for reg in regions:
ec2 = account_session.client('ec2', region_name=reg)
try:
# Describe only VPC attachments
paginator = ec2.get_paginator('describe_transit_gateway_vpc_attachments')
for page in paginator.paginate():
for attach in page['TransitGatewayVpcAttachments']:
attach_id = attach['TransitGatewayAttachmentId']

# Extract requested Tags
tags = {t['Key']: t['Value'] for t in attach.get('Tags', [])}

results["data"][f"{reg}:{attach_id}"] = {
"resource": attach_id,
"region": reg,
"tgw_id": attach['TransitGatewayId'],
"vpc_id": attach['VpcId'],
"vpc_owner_id": attach['VpcOwnerId'],
"state": attach['State'],
"dns_support": attach.get('Options', {}).get('DnsSupport', 'disabled'),
"ipv6_support": attach.get('Options', {}).get('Ipv6Support', 'disabled'),
"tag_name": tags.get('Name', 'N/A'),
"tag_tgw_label": tags.get('boc:tgw_label', 'N/A'),
"tag_tgw_env": tags.get('boc_tgw_environment', 'N/A')
}
except Exception:
continue

results["data"]["account_summary"] = {"_summary": f"ATTACHMENTS:{len(results['data'])}"}
except Exception as e:
results["error"] = str(e)

return results
10 changes: 10 additions & 0 deletions local-app/python-tools/cross-organization/remediate_tgw.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
def modify_attachment_dns(account_session, region, attachment_id):
"""
Action Plugin: Disables DNS support for a specific attachment.
"""
ec2 = account_session.client('ec2', region_name=region)
response = ec2.modify_transit_gateway_vpc_attachment(
TransitGatewayAttachmentId=attachment_id,
Options={'DnsSupport': 'disable'}
)
return response['TransitGatewayVpcAttachment']['State']
72 changes: 72 additions & 0 deletions local-app/python-tools/cross-organization/remediate_tgw_dns.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
import boto3
import sys
import os

# --- VERSIONING ---
__version__ = "1.0.0"

def get_session(account_id, role_name="OrganizationAccountAccessRole"):
"""Assumes a role in the target account to return a boto3 session."""
sts = boto3.client('sts')
role_arn = f"arn:aws:iam::{account_id}:role/{role_name}"
try:
response = sts.assume_role(
RoleArn=role_arn,
RoleSessionName="TGW_Remediation_Session"
)
creds = response['Credentials']
return boto3.Session(
aws_access_key_id=creds['AccessKeyId'],
aws_secret_access_key=creds['SecretAccessKey'],
aws_session_token=creds['SessionToken']
)
except Exception as e:
print(f"Error: Could not assume role for {account_id}: {e}")
return None

def main():
input_file = "remediate_tgw_dns.txt"
if not os.path.exists(input_file):
print(f"Error: {input_file} not found. Run assessment script first.")
sys.exit(1)

print("-" * 100)
print(f"TGW DNS SUPPORT REMEDIATION SCRIPT | Version {__version__}")
print("-" * 100)

with open(input_file, 'r') as f:
lines = f.readlines()

for line in lines:
if not line.startswith("MODIFY_TGW_ATTACHMENT:"):
continue

# Parse: MODIFY_TGW_ATTACHMENT: {acc_id} | {region} | {attach_id} | DnsSupport=disable
parts = line.split(":")[-1].strip().split("|")
acc_id = parts[0].strip()
region = parts[1].strip()
attach_id = parts[2].strip()

print(f"Processing: Account {acc_id} | Region {region} | Attachment {attach_id}...")

session = get_session(acc_id)
if not session:
continue

ec2 = session.client('ec2', region_name=region)
try:
# Perform the modification
response = ec2.modify_transit_gateway_vpc_attachment(
TransitGatewayAttachmentId=attach_id,
Options={'DnsSupport': 'disable'}
)
state = response['TransitGatewayVpcAttachment']['State']
print(f" SUCCESS: Status is now '{state}'")
except Exception as e:
print(f" FAILED: {e}")

print("-" * 100)
print("Remediation Complete.")

if __name__ == "__main__":
main()

0 comments on commit b2f54d7

Please sign in to comment.