From 34b3d664d8bcd76f60a8f4a1dd8a88e428000e4f Mon Sep 17 00:00:00 2001 From: Your Name Date: Fri, 20 Feb 2026 15:37:44 -0500 Subject: [PATCH] Add Service Catalog end-to-end test script MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Tests the full SC product → CloudFormation → Lambda → GitHub stack: 1. AWS credentials check 2. SC product + artifact lookup 3. Provision product (launch CFN stack via SC) 4. Poll CFN stack events to terminal state 5. GitHub repo/PR verification 6. Auto-terminate provisioned product (--no-cleanup to skip) Usage: python scripts/test_service_catalog.py [REPO_NAME] python scripts/test_service_catalog.py --no-cleanup [REPO_NAME] python scripts/test_service_catalog.py --terminate PRODUCT_NAME python scripts/test_service_catalog.py --timeout 900 [REPO_NAME] --- scripts/test_service_catalog.py | 662 ++++++++++++++++++++++++++++++++ 1 file changed, 662 insertions(+) create mode 100755 scripts/test_service_catalog.py diff --git a/scripts/test_service_catalog.py b/scripts/test_service_catalog.py new file mode 100755 index 0000000..c990d56 --- /dev/null +++ b/scripts/test_service_catalog.py @@ -0,0 +1,662 @@ +#!/usr/bin/env python3 +""" +End-to-end Service Catalog product tester for the EKS Terragrunt Repo Generator. + +Tests the full Service Catalog → CloudFormation → Lambda → GitHub stack: + 1. AWS connectivity (credentials, region) + 2. Service Catalog – product and portfolio exist, provisioning artifact is findable + 3. Provision product – launches a CloudFormation stack via Service Catalog + 4. CloudFormation – polls stack events until CREATE_COMPLETE or CREATE_FAILED + 5. GitHub – verifies the repository and PR were created + 6. Cleanup – terminates the provisioned product (optional, default ON) + +Usage +----- + python scripts/test_service_catalog.py [REPO_NAME] + python scripts/test_service_catalog.py --no-cleanup [REPO_NAME] + python scripts/test_service_catalog.py --terminate PROVISIONED_PRODUCT_NAME + + REPO_NAME Name of the test GitHub repo to create. + Defaults to "sc-test-" + --no-cleanup Leave the provisioned product running after the test. + --terminate Terminate an existing provisioned product by name and exit. + --timeout Max seconds to wait for provisioning (default: 600) +""" + +import argparse +import json +import sys +import time +from datetime import datetime, timezone +from typing import Any, Optional + +import boto3 +import requests +from botocore.exceptions import ClientError, NoCredentialsError +from pydantic import BaseModel +from rich.console import Console +from rich.panel import Panel +from rich.progress import Progress, SpinnerColumn, TextColumn, TimeElapsedColumn +from rich.rule import Rule +from rich.table import Table + +# --------------------------------------------------------------------------- +# Constants – derived from deployed infrastructure / terraform.tfvars +# --------------------------------------------------------------------------- + +REGION = "us-gov-west-1" +ACCOUNT_ID = "229685449397" + +# Service Catalog product details +SC_PRODUCT_NAME = "eks-terragrunt-repo-creator" +SC_ARTIFACT_NAME = "v2.0" + +# GitHub / EKS defaults used as provisioning parameters +GITHUB_API = "https://github.e.it.census.gov" +GITHUB_ORG = "SCT-Engineering" +SECRET_NAME = "/eks-cluster-deployment/github_token" + +# Default EKS test parameters (match the lambda test defaults) +DEFAULT_PARAMS: dict[str, str] = { + "OwningTeam": "tf-module-admins", + "Environment": "dev", + "AwsRegion": REGION, + "AccountName": "csvd-dev-ew", + "AWSAccountId": ACCOUNT_ID, + "EnvironmentAbbr": "dev", + "VpcName": "csvd-dev-vpc", + "VpcDomainName": "dev.inf.csp1.census.gov", + "ClusterMailingList": "sct-engineering@census.gov", + "OrganizationPath": "census:ocio:csvd", + "FinOpsProjectName": "EKS Platform Services", + "FinOpsProjectNumber": "12345", +} + +console = Console() + + +# --------------------------------------------------------------------------- +# Pydantic models +# --------------------------------------------------------------------------- + +class StepResult(BaseModel): + name: str + passed: bool + detail: str = "" + warning: str = "" + +class SCProductInfo(BaseModel): + product_id: str + artifact_id: str + product_name: str + artifact_name: str + +class ProvisionResult(BaseModel): + accepted: bool + provisioned_name: str = "" + provisioned_id: str = "" + stack_id: str = "" + stack_name: str = "" + status: str = "UNKNOWN" # SUCCESS | FAILED | TIMEOUT | TERMINATED + repo_url: str = "" + pr_url: str = "" + error_message: str = "" + duration_s: float = 0.0 + + +# --------------------------------------------------------------------------- +# AWS helpers +# --------------------------------------------------------------------------- + +def _session() -> boto3.Session: + return boto3.Session(region_name=REGION) + + +def _check_aws(session: boto3.Session) -> StepResult: + try: + identity = session.client("sts").get_caller_identity() + return StepResult( + name="AWS Credentials", + passed=True, + detail=f"Account: {identity['Account']}\nIdentity: {identity['Arn']}", + ) + except NoCredentialsError: + return StepResult( + name="AWS Credentials", + passed=False, + detail="No AWS credentials found. Run `source ~/aws-creds` or configure ~/.aws/credentials.", + ) + except ClientError as exc: + return StepResult(name="AWS Credentials", passed=False, detail=str(exc)) + + +def _find_sc_product(session: boto3.Session) -> tuple[StepResult, Optional[SCProductInfo]]: + """Find the Service Catalog product ID and the provisioning artifact ID.""" + sc = session.client("servicecatalog", region_name=REGION) + + # Search for the product by name + try: + pages = sc.get_paginator("search_products_as_admin").paginate( + Filters={"FullTextSearch": [SC_PRODUCT_NAME]} + ) + products = [] + for page in pages: + products.extend(page.get("ProductViewDetails", [])) + except ClientError as exc: + return StepResult(name="Service Catalog Product", passed=False, detail=str(exc)), None + + match = next( + (p for p in products + if p["ProductViewSummary"]["Name"] == SC_PRODUCT_NAME), + None, + ) + if not match: + names = [p["ProductViewSummary"]["Name"] for p in products] + return StepResult( + name="Service Catalog Product", + passed=False, + detail=( + f"Product '{SC_PRODUCT_NAME}' not found.\n" + f"Available products matching search: {names}" + ), + ), None + + product_id = match["ProductViewSummary"]["ProductId"] + + # Find the provisioning artifact (version) + try: + artifacts = sc.list_provisioning_artifacts(ProductId=product_id)["ProvisioningArtifactDetails"] + except ClientError as exc: + return StepResult(name="Service Catalog Product", passed=False, detail=str(exc)), None + + artifact = next((a for a in artifacts if a["Name"] == SC_ARTIFACT_NAME), None) + if not artifact: + names = [a["Name"] for a in artifacts] + return StepResult( + name="Service Catalog Product", + passed=False, + detail=( + f"Artifact version '{SC_ARTIFACT_NAME}' not found for product '{SC_PRODUCT_NAME}'.\n" + f"Available versions: {names}" + ), + ), None + + info = SCProductInfo( + product_id=product_id, + artifact_id=artifact["Id"], + product_name=SC_PRODUCT_NAME, + artifact_name=SC_ARTIFACT_NAME, + ) + return StepResult( + name="Service Catalog Product", + passed=True, + detail=( + f"Product: {SC_PRODUCT_NAME} ({product_id})\n" + f"Artifact: {SC_ARTIFACT_NAME} ({artifact['Id']})" + ), + ), info + + +def _provision_product( + session: boto3.Session, + info: SCProductInfo, + repo_name: str, +) -> tuple[StepResult, ProvisionResult]: + """Launch a new provisioned product.""" + sc = session.client("servicecatalog", region_name=REGION) + + # Build the full parameter list: project-specific + defaults + params = [ + {"Key": "ProjectName", "Value": repo_name}, + {"Key": "ClusterName", "Value": repo_name}, + ] + for key, value in DEFAULT_PARAMS.items(): + params.append({"Key": key, "Value": value}) + + provisioned_name = repo_name # re-use repo_name as the provisioned product name + + try: + resp = sc.provision_product( + ProductId = info.product_id, + ProvisioningArtifactId = info.artifact_id, + ProvisionedProductName = provisioned_name, + ProvisioningParameters = params, + Tags=[ + {"Key": "ManagedBy", "Value": "test-service-catalog"}, + {"Key": "TestRun", "Value": datetime.now(timezone.utc).isoformat()}, + {"Key": "CleanupAfter", "Value": "yes"}, + ], + ) + except ClientError as exc: + return StepResult( + name="Provision Product", + passed=False, + detail=str(exc), + ), ProvisionResult(accepted=False) + + detail = resp.get("RecordDetail", {}) + prov_id = detail.get("ProvisionedProductId", "") + record_id = detail.get("RecordId", "") + status = detail.get("Status", "") + + result = ProvisionResult( + accepted=True, + provisioned_name=provisioned_name, + provisioned_id=prov_id, + ) + + return StepResult( + name="Provision Product", + passed=True, + detail=( + f"Provisioned product name: {provisioned_name}\n" + f"Provisioned product ID: {prov_id}\n" + f"Record ID: {record_id}\n" + f"Initial status: {status}" + ), + ), result + + +def _poll_provisioning( + session: boto3.Session, + result: ProvisionResult, + timeout_s: int = 600, +) -> ProvisionResult: + """ + Poll Service Catalog until the product reaches a terminal status, + then pull CFN stack outputs for the repo/PR URLs. + """ + sc = session.client("servicecatalog", region_name=REGION) + cfn = session.client("cloudformation", region_name=REGION) + + deadline = time.time() + timeout_s + start_time = time.time() + last_event_time: Optional[int] = None + + console.print( + f"\n[dim]Polling provisioned product [bold]{result.provisioned_name}[/bold] " + f"(max {timeout_s}s)…[/dim]" + ) + + with Progress( + SpinnerColumn(), + TextColumn("[progress.description]{task.description}"), + TimeElapsedColumn(), + console=console, + transient=True, + ) as progress: + task = progress.add_task("Waiting for provisioning to complete…", total=None) + + while time.time() < deadline: + # ── Check provisioned product status ───────────────────────────── + try: + desc = sc.describe_provisioned_product( + Name=result.provisioned_name + )["ProvisionedProductDetail"] + except ClientError as exc: + progress.update(task, description=f"describe error: {exc}") + time.sleep(10) + continue + + pp_status = desc.get("Status", "UNKNOWN") + pp_status_msg = desc.get("StatusMessage", "") + + # Grab the underlying CFN stack name from the provisioned product + if not result.stack_name: + cfn_name = desc.get("PhysicalId", "") # e.g. SC-229685449397-pp-xxxxx + if cfn_name: + result.stack_name = cfn_name + + # ── Stream CloudFormation events ────────────────────────────────── + if result.stack_name: + try: + events = cfn.describe_stack_events( + StackName=result.stack_name + )["StackEvents"] + + # Show only new events (after last_event_time) + for ev in reversed(events): + ev_ts = int(ev["Timestamp"].timestamp() * 1000) + if last_event_time is None or ev_ts > last_event_time: + last_event_time = ev_ts + resource = ev.get("LogicalResourceId", "") + ev_status = ev.get("ResourceStatus", "") + ev_reason = ev.get("ResourceStatusReason", "") + msg = f"{resource} → {ev_status}" + if ev_reason and "complete" not in ev_reason.lower(): + msg += f" ({ev_reason[:80]})" + progress.update(task, description=msg[:100]) + + except ClientError: + pass # stack may not be visible yet + + # ── Terminal states ─────────────────────────────────────────────── + if pp_status in ("AVAILABLE",): + result.status = "SUCCESS" + result.duration_s = time.time() - start_time + + # Pull stack outputs for repo/PR URLs + if result.stack_name: + try: + stack_info = cfn.describe_stacks( + StackName=result.stack_name + )["Stacks"][0] + for out in stack_info.get("Outputs", []): + k = out["OutputKey"] + v = out["OutputValue"] + if "Repository" in k or "Repo" in k: + result.repo_url = v + if "PullRequest" in k or "PR" in k or "Merge" in k: + result.pr_url = v + except ClientError: + pass + + break + + if pp_status in ("ERROR", "TAINTED"): + result.status = "FAILED" + result.error_message = pp_status_msg or pp_status + result.duration_s = time.time() - start_time + break + + if pp_status == "UNDER_CHANGE": + progress.update(task, description=f"Status: {pp_status} {pp_status_msg or ''}") + + time.sleep(10) + + if result.status == "UNKNOWN": + result.status = "TIMEOUT" + result.error_message = f"Provisioning did not complete within {timeout_s}s." + result.duration_s = time.time() - start_time + + return result + + +def _verify_github( + session: boto3.Session, + repo_name: str, + repo_url: str, +) -> StepResult: + """Verify the GitHub repository exists and has a PR open.""" + # Retrieve token from Secrets Manager (same path as the Lambda uses) + sm = session.client("secretsmanager", region_name=REGION) + try: + token = sm.get_secret_value(SecretId=SECRET_NAME)["SecretString"] + except ClientError as exc: + return StepResult( + name="GitHub Verification", + passed=False, + detail=f"Could not retrieve token: {exc}", + ) + + api_base = GITHUB_API.rstrip("/") + "/api/v3" + headers = { + "Authorization": f"token {token}", + "Accept": "application/vnd.github.v3+json", + } + + # Check repo exists + repo_endpoint = f"{api_base}/repos/{GITHUB_ORG}/{repo_name}" + try: + resp = requests.get(repo_endpoint, headers=headers, verify=False, timeout=10) + except requests.exceptions.RequestException as exc: + return StepResult( + name="GitHub Verification", + passed=False, + detail=f"Request failed: {exc}", + ) + + if resp.status_code != 200: + return StepResult( + name="GitHub Verification", + passed=False, + detail=f"Repo not found: HTTP {resp.status_code} from {repo_endpoint}", + ) + + repo_data = resp.json() + default_branch = repo_data.get("default_branch", "main") + visibility = repo_data.get("visibility", "unknown") + + # Check for open pull requests + pr_endpoint = f"{api_base}/repos/{GITHUB_ORG}/{repo_name}/pulls?state=open" + pr_resp = requests.get(pr_endpoint, headers=headers, verify=False, timeout=10) + pr_count = len(pr_resp.json()) if pr_resp.status_code == 200 else "?" + + return StepResult( + name="GitHub Verification", + passed=True, + detail=( + f"Repository: {GITHUB_ORG}/{repo_name}\n" + f"Visibility: {visibility}\n" + f"Default branch: {default_branch}\n" + f"Open PRs: {pr_count}" + ), + ) + + +def _terminate_product(session: boto3.Session, provisioned_name: str) -> StepResult: + """Terminate a provisioned product by name.""" + sc = session.client("servicecatalog", region_name=REGION) + + try: + resp = sc.terminate_provisioned_product( + ProvisionedProductName=provisioned_name, + IgnoreErrors=True, + ) + record_id = resp.get("RecordDetail", {}).get("RecordId", "unknown") + return StepResult( + name="Terminate Product", + passed=True, + detail=f"Termination initiated for '{provisioned_name}' (record: {record_id})", + ) + except ClientError as exc: + return StepResult( + name="Terminate Product", + passed=False, + detail=str(exc), + ) + + +# --------------------------------------------------------------------------- +# Rendering helpers +# --------------------------------------------------------------------------- + +STATUS_ICONS = {True: "[green]✔[/green]", False: "[red]✗[/red]"} +INVOKE_ICONS = { + "SUCCESS": "[green]✔ SUCCESS[/green]", + "FAILED": "[red]✗ FAILED[/red]", + "TIMEOUT": "[yellow]⏱ TIMEOUT[/yellow]", + "TERMINATED": "[dim]◼ TERMINATED[/dim]", + "UNKNOWN": "[dim]? UNKNOWN[/dim]", +} + + +def render_step(step: StepResult) -> None: + icon = STATUS_ICONS[step.passed] + color = "green" if step.passed else "red" + console.print(f"{icon} [bold {color}]{step.name}[/bold {color}]") + for line in step.detail.splitlines(): + console.print(f" [dim]{line}[/dim]") + if step.warning: + console.print(f" [yellow]⚠ {step.warning}[/yellow]") + + +def render_summary(steps: list[StepResult], prov: Optional[ProvisionResult]) -> None: + console.print(Rule("[bold]Summary[/bold]")) + + table = Table(show_header=True, header_style="bold cyan", expand=True) + table.add_column("Step", style="bold") + table.add_column("Result", justify="center") + table.add_column("Notes", overflow="fold") + + for s in steps: + icon = STATUS_ICONS[s.passed] + notes = (s.detail.splitlines()[0] if s.detail else "") or s.warning + table.add_row(s.name, icon, notes[:80]) + + if prov: + icon = INVOKE_ICONS.get(prov.status, "?") + notes = prov.repo_url or prov.error_message or "" + table.add_row("Provisioning", icon, notes[:80]) + + console.print(table) + + if prov and prov.status == "SUCCESS": + console.print(Panel( + f"[bold green]Service Catalog provisioning completed![/bold green]\n\n" + f"[bold]Repository URL:[/bold] {prov.repo_url or '(see CFN outputs)'}\n" + f"[bold]Pull Request URL:[/bold] {prov.pr_url or '(see CFN outputs)'}\n" + f"[bold]Duration:[/bold] {prov.duration_s:.0f}s\n" + f"[bold]CFN Stack:[/bold] {prov.stack_name}", + title="[bold green]✔ PASS[/bold green]", + border_style="green", + )) + elif prov and prov.status == "FAILED": + console.print(Panel( + f"[bold red]Provisioning FAILED.[/bold red]\n\n" + f"[bold]Error:[/bold] {prov.error_message}\n" + f"[bold]CFN Stack:[/bold] {prov.stack_name}", + title="[bold red]✗ FAIL[/bold red]", + border_style="red", + )) + elif prov and prov.status == "TIMEOUT": + console.print(Panel( + f"[yellow]Provisioning did not complete within the wait window.[/yellow]\n\n" + f"[bold]Provisioned Product:[/bold] {prov.provisioned_name}\n" + f"[bold]CFN Stack:[/bold] {prov.stack_name or 'not yet known'}\n\n" + "Check the AWS Console → Service Catalog → Provisioned products", + title="[yellow]⏱ TIMEOUT[/yellow]", + border_style="yellow", + )) + + +# --------------------------------------------------------------------------- +# Main +# --------------------------------------------------------------------------- + +def main() -> None: + parser = argparse.ArgumentParser( + description="End-to-end Service Catalog product tester", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=__doc__, + ) + parser.add_argument( + "repo_name", + nargs="?", + default=None, + help="Repository / provisioned-product name (default: sc-test-)", + ) + parser.add_argument( + "--no-cleanup", + action="store_true", + help="Do NOT terminate the provisioned product after the test.", + ) + parser.add_argument( + "--terminate", + metavar="PROVISIONED_PRODUCT_NAME", + default=None, + help="Terminate an existing provisioned product by name and exit.", + ) + parser.add_argument( + "--timeout", + type=int, + default=600, + help="Max seconds to wait for provisioning (default: 600)", + ) + args = parser.parse_args() + + session = _session() + + # ── --terminate shortcut ────────────────────────────────────────────────── + if args.terminate: + console.print(Rule(f"[bold cyan]Terminating: {args.terminate}[/bold cyan]")) + step = _terminate_product(session, args.terminate) + render_step(step) + sys.exit(0 if step.passed else 1) + + repo_name = args.repo_name or f"sc-test-{int(time.time())}" + cleanup = not args.no_cleanup + + console.print(Rule("[bold cyan]EKS Terragrunt Repo Generator – Service Catalog Test[/bold cyan]")) + console.print(f"[dim]Timestamp : {datetime.now(timezone.utc).isoformat()}[/dim]") + console.print(f"[dim]Test repo : [bold]{repo_name}[/bold][/dim]") + console.print(f"[dim]Product : {SC_PRODUCT_NAME} v{SC_ARTIFACT_NAME}[/dim]") + console.print(f"[dim]Region : {REGION}[/dim]") + console.print(f"[dim]Cleanup : {'yes (terminate after test)' if cleanup else 'no (--no-cleanup)'}[/dim]\n") + + steps: list[StepResult] = [] + prov_result: Optional[ProvisionResult] = None + + # ── Step 1: AWS credentials ─────────────────────────────────────────────── + console.print(Rule("Step 1 · AWS Credentials")) + creds = _check_aws(session) + steps.append(creds) + render_step(creds) + if not creds.passed: + render_summary(steps, None) + sys.exit(1) + + # ── Step 2: Find the SC product ─────────────────────────────────────────── + console.print(Rule("Step 2 · Service Catalog Product")) + sc_step, sc_info = _find_sc_product(session) + steps.append(sc_step) + render_step(sc_step) + if not sc_step.passed or sc_info is None: + render_summary(steps, None) + sys.exit(1) + + # ── Step 3: Launch provisioned product ─────────────────────────────────── + console.print(Rule("Step 3 · Launch Provisioned Product")) + prov_step, prov_result = _provision_product(session, sc_info, repo_name) + steps.append(prov_step) + render_step(prov_step) + if not prov_step.passed: + render_summary(steps, prov_result) + sys.exit(1) + + # ── Step 4: Poll until complete ─────────────────────────────────────────── + console.print(Rule("Step 4 · Polling CloudFormation Stack")) + prov_result = _poll_provisioning(session, prov_result, timeout_s=args.timeout) + prov_status_icon = { + "SUCCESS": "[green]✔ SUCCESS[/green]", + "FAILED": "[red]✗ FAILED[/red]", + "TIMEOUT": "[yellow]⏱ TIMEOUT[/yellow]", + }.get(prov_result.status, prov_result.status) + console.print(f"\n{prov_status_icon} Duration: {prov_result.duration_s:.0f}s") + if prov_result.stack_name: + console.print(f" [dim]CFN Stack: {prov_result.stack_name}[/dim]") + + # ── Step 5: Verify GitHub repository ───────────────────────────────────── + if prov_result.status == "SUCCESS": + console.print(Rule("Step 5 · GitHub Repository Verification")) + gh_step = _verify_github(session, repo_name, prov_result.repo_url) + steps.append(gh_step) + render_step(gh_step) + + # ── Step 6: Cleanup ─────────────────────────────────────────────────────── + if cleanup and prov_result.provisioned_name: + console.print(Rule("Step 6 · Cleanup (Terminate Provisioned Product)")) + term_step = _terminate_product(session, prov_result.provisioned_name) + steps.append(term_step) + render_step(term_step) + if term_step.passed: + console.print( + " [dim]Note: GitHub repo created during the test is NOT deleted automatically.\n" + " Delete it manually if no longer needed:[/dim]\n" + f" [dim] {GITHUB_API}/{GITHUB_ORG}/{repo_name}[/dim]" + ) + elif not cleanup: + console.print( + f"\n[yellow]⚠ Skipping cleanup (--no-cleanup). " + f"Provisioned product: [bold]{prov_result.provisioned_name}[/bold][/yellow]\n" + f" To clean up later, run:\n" + f" [dim]python scripts/test_service_catalog.py --terminate {prov_result.provisioned_name}[/dim]" + ) + + # ── Final summary ───────────────────────────────────────────────────────── + render_summary(steps, prov_result) + sys.exit(0 if prov_result.status == "SUCCESS" else 1) + + +if __name__ == "__main__": + main()