From a4410a0ee86faec52b917fec3fcbbf8fcc11b848 Mon Sep 17 00:00:00 2001 From: Your Name Date: Fri, 20 Feb 2026 13:48:16 -0500 Subject: [PATCH] fix: use internal visibility for GHE repo creation; add REPO_VISIBILITY env var GHE enterprise policy blocks private repo creation for org members. Switch default visibility to 'internal' (visible to org members, not public) which complies with both the enterprise policy and security posture. Changes: - repository_provider.py: _default_visibility() reads REPO_VISIBILITY env var, defaults to 'internal' instead of hardcoded 'private' - github_provider.py: use effective_visibility so 'private' boolean field is False for 'internal' repos (previously always True, causing GHE 403) - app.py: explicitly pass RepositorySettings(visibility=REPO_VISIBILITY) into get_repository() so env var is honoured by the running container - packer.pkr.hcl: bake Census Bureau Root CA cert into container trust store so VERIFY_SSL=true can be re-enabled after next rebuild - deploy/terraform.tfvars: GITHUB_API without /api/v3 (code appends it) New scripts: - scripts/test_workflow.py: end-to-end 6-step workflow tester with Rich UI - scripts/check_github_permissions.py: 11-check GitHub PAT permissions verifier Verified: Lambda successfully creates internal repos in SCT-Engineering org --- deploy/terraform.tfvars | 4 +- docs/SERVICE_CATALOG_RESOLUTION.md | 153 ++++ packer.pkr.hcl | 4 + scripts/check_github_permissions.py | 773 +++++++++++++++++++++ scripts/test_workflow.py | 636 +++++++++++++++++ template_automation/app.py | 10 +- template_automation/github_provider.py | 13 +- template_automation/repository_provider.py | 19 +- 8 files changed, 1604 insertions(+), 8 deletions(-) create mode 100644 docs/SERVICE_CATALOG_RESOLUTION.md create mode 100644 scripts/check_github_permissions.py create mode 100644 scripts/test_workflow.py diff --git a/deploy/terraform.tfvars b/deploy/terraform.tfvars index b87b247d..db49b2f5 100644 --- a/deploy/terraform.tfvars +++ 
b/deploy/terraform.tfvars @@ -2,9 +2,11 @@ aws_region = "us-gov-west-1" -github_api_url = "https://github.e.it.census.gov/api/v3" # Your GitHub Enterprise URL +github_api_url = "https://github.e.it.census.gov" # Your GitHub Enterprise URL (code adds /api/v3 automatically) github_org_name = "SCT-Engineering" # Your GitHub organization name template_repo_name = "template-eks-cluster" +# GHE enterprise policy blocks 'private' repo creation for org members; use 'internal' +repo_visibility = "internal" create_service_catalog = true github_token_secret_name = "/eks-cluster-deployment/github_token" diff --git a/docs/SERVICE_CATALOG_RESOLUTION.md b/docs/SERVICE_CATALOG_RESOLUTION.md new file mode 100644 index 00000000..4891f8f7 --- /dev/null +++ b/docs/SERVICE_CATALOG_RESOLUTION.md @@ -0,0 +1,153 @@ +# Service Catalog S3 Access Issue - RESOLVED ✅ + +**Date:** February 11, 2026 +**Resolved By:** David Arnold & DeLong +**Issue Duration:** ~2 days + +## Problem Summary + +Service Catalog product deployment was failing with S3 Access Denied errors when attempting to launch the GitHub Repository Creator product. The CloudFormation template was uploaded to the enterprise artifacts bucket but Service Catalog couldn't read it during provisioning. + +## Root Cause + +The issue was **tag-based access control** enforced by Service Control Policies (SCPs) in the GovCloud environment. The launch role's IAM policy was missing a required condition that restricts S3 access to only objects tagged with `servicecatalog:provisioning = true`. + +### Key Insight +All working Service Catalog products in the enterprise bucket had the `servicecatalog:provisioning = true` tag on their templates, while our newly uploaded template did not. + +## Solution Implemented + +### 1. 
Updated Launch Role IAM Policy + +Changed the `S3ReadTemplate` statement in `deploy/service_catalog.tf`: + +**Before:** +```hcl +{ + Sid = "S3ReadTemplate" + Effect = "Allow" + Action = [ + "s3:GetObject" + ] + Resource = "arn:${data.aws_partition.current.partition}:s3:::${local.artifacts_bucket}/*" +} +``` + +**After:** +```hcl +{ + Sid = "S3ReadTemplate" + Effect = "Allow" + Action = [ + "s3:GetObject" + ] + Resource = "*" + Condition = { + "StringEquals" = { + "s3:ExistingObjectTag/servicecatalog:provisioning" = ["true"] + } + } +} +``` + +### 2. Updated S3 Object Configuration + +Modified the `aws_s3_object.product_template` resource to automatically add the required tag: + +```hcl +resource "aws_s3_object" "product_template" { + count = local.create_sc ? 1 : 0 + bucket = local.artifacts_bucket + key = local.product_s3_key + source = local.product_template + etag = filemd5(local.product_template) + + tags = merge(var.tags, { + "servicecatalog:provisioning" = "true" + }) +} +``` + +## Verification + +After applying the changes: + +1. ✅ S3 object has the required tag: + ```json + { + "servicecatalog:provisioning": "true", + "ManagedBy": "Terraform", + "Environment": "production", + "Purpose": "ServiceCatalogRepoGenerator" + } + ``` + +2. ✅ Service Catalog can read the template: + ```bash + aws servicecatalog describe-provisioning-parameters \ + --product-id prod-w3uvfaxmeblxe \ + --provisioning-artifact-name "v1.0" \ + --path-id "lpv3-uchiqj7m3d57k" \ + --region us-gov-west-1 + ``` + Returns all CloudFormation parameters successfully. + +3. 
✅ Launch constraints properly configured: + - LAUNCH constraint: Uses `github-automation-sc-launch-role` + - TEMPLATE constraint: Locks Lambda ARN to deployed function + +## Product Details + +- **Product ID:** `prod-w3uvfaxmeblxe` +- **Portfolio ID:** `port-uchiqj7m3d57k` +- **Template Location:** `s3://servicecatalog-product-artifacts-20250904021619588100000003/github-repo-creator/v1.0/product-template.yaml` +- **Template URL:** `https://servicecatalog-product-artifacts-20250904021619588100000003.s3.us-gov-west-1.amazonaws.com/github-repo-creator/v1.0/product-template.yaml` +- **Console URL:** https://console.amazonaws-us-gov.com/servicecatalog/home?region=us-gov-west-1#/products/prod-w3uvfaxmeblxe + +## Parameters Available + +The product accepts the following parameters: + +| Parameter | Required | Default | Description | +|-----------|----------|---------|-------------| +| `ProjectName` | Yes | - | Repository name (lowercase, hyphens) | +| `OwningTeam` | Yes | `tf-module-admins` | GitHub team with admin access | +| `Environment` | Yes | `development` | Environment (development/staging/production/sandbox) | +| `AwsRegion` | No | `us-gov-west-1` | Primary AWS region | +| `OrganizationPath` | No | - | Organization path (e.g., dept:team:subteam) | +| `FinOpsProjectNumber` | No | - | FinOps project number | +| `FinOpsProjectName` | No | - | FinOps project name | +| `AdditionalTags` | No | `{}` | Additional tags as JSON | +| `LambdaFunctionArn` | No | (locked) | ARN of the Lambda function (enforced by template constraint) | + +## Lessons Learned + +1. **Tag-Based Access Control:** Enterprise S3 buckets in GovCloud environments often use tag-based SCPs to control access +2. **Consistency Matters:** Always examine working examples to understand hidden requirements (tags, URL formats, etc.) +3. **IAM Conditions:** IAM policy conditions can reference S3 object tags for fine-grained access control +4. 
**Service Catalog Permissions:** Service Catalog service principal needs proper S3 access, which is often controlled differently than user access + +## Related Files + +- **Terraform Configuration:** `deploy/service_catalog.tf` +- **Product Template:** `service-catalog/product-template.yaml` +- **Variables:** `deploy/variables.tf` +- **Config:** `deploy/terraform.tfvars` +- **Request Document:** `docs/S3_ACCESS_REQUEST.md` + +## Next Steps + +1. ✅ Service Catalog product is ready to use +2. ⏳ Test launching a repository through the product +3. ⏳ Rebuild Lambda container image to fix stale code issue (separate task) +4. ⏳ Document end-user instructions for using the Service Catalog product +5. ⏳ Set up CI/CD pipeline to automatically update product template on changes + +## Acknowledgments + +Special thanks to **DeLong** for identifying the tag-based access control requirement that was the key to resolving this issue! + +--- + +**Status:** ✅ RESOLVED +**Last Updated:** February 11, 2026 diff --git a/packer.pkr.hcl b/packer.pkr.hcl index 96dbf3c9..36daa30f 100644 --- a/packer.pkr.hcl +++ b/packer.pkr.hcl @@ -99,6 +99,10 @@ build { provisioner "shell" { inline = [ "cd /var/task", + "echo 'Installing Census Bureau Root CA certificate...'", + "cp /etc/pip-cert.pem /etc/pki/ca-trust/source/anchors/census-root-ca.pem || true", + "update-ca-trust extract || true", + "cat /etc/pip-cert.pem >> /var/lang/lib/python3.11/site-packages/certifi/cacert.pem || true", "echo 'Installing dependencies for Service Catalog Lambda function...'", "pip install -r requirements.txt", "echo 'Lambda function built for AWS Service Catalog integration'" diff --git a/scripts/check_github_permissions.py b/scripts/check_github_permissions.py new file mode 100644 index 00000000..05ec957d --- /dev/null +++ b/scripts/check_github_permissions.py @@ -0,0 +1,773 @@ +#!/usr/bin/env python3 +""" +GitHub PAT permissions checker for the Service Catalog Repository Creator. 
+ +Verifies that the token stored in AWS Secrets Manager (or supplied directly) +has every permission required by the Lambda workflow: + + Required PAT scopes + ─────────────────── + repo – read & write repos, branches, files, git refs, pull requests + admin:org – create repos inside an org (or at minimum: write:org) + read:org – check org membership + + Required org-level capabilities (your user in SCT-Engineering) + ───────────────────────────────────────────────────────────── + • Member of the org + • Can create repositories (org setting: members_can_create_repositories) + + API operations checked + ────────────────────── + 1. GET /user – token is valid + 2. GET /user/orgs – read:org scope + 3. GET /orgs/{org} – org visibility + membership + 4. GET /orgs/{org}/members/{user} – is member of org + 5. GET /orgs/{org}/repos?per_page=1 – list org repos (repo scope) + 6. POST /orgs/{org}/repos (dry-run) – create repo permission + → we parse the 403 message to distinguish + "missing scope" vs "org policy blocks member creation" + 7. GET /repos/{org}/{template} – can read template repo + 8. GET /repos/{org}/{template}/branches – can list branches (repo scope) + 9. POST /repos/{org}/{probe}/git/refs – create git ref (repo scope) + [only attempted when a writable probe repo is available] + 10. POST /repos/{org}/{probe}/pulls – create pull request (repo scope) + [only attempted when a writable probe repo is available] + 11. PUT /orgs/{org}/teams/{team}/repos/… – team permission (admin:org) + [read-only check: validate team exists] + +Usage +───── + # token from Secrets Manager (same path the Lambda uses): + python scripts/check_github_permissions.py + + # or supply a token directly: + python scripts/check_github_permissions.py --token ghp_... 
+ + # override org / template repo: + python scripts/check_github_permissions.py --org MyOrg --template my-template + +Options +─────── + --token PAT to test (default: read from Secrets Manager) + --api GitHub API base URL (default: from Lambda env or env var GITHUB_API) + --org GitHub org (default: from Lambda env or env var GITHUB_ORG_NAME) + --template Template repo name (default: from Lambda env or TEMPLATE_REPO_NAME) + --team Owning team slug to check (default: tf-module-admins) + --no-ssl-verify Skip TLS verification (for Census internal GHE) + --region AWS region (default: us-gov-west-1) + --secret Secrets Manager secret name (default: from Lambda env) +""" + +import argparse +import os +import sys +from typing import Any, Optional + +import boto3 +import requests +from botocore.exceptions import ClientError +from pydantic import BaseModel, Field, field_validator +from rich.console import Console +from rich.panel import Panel +from rich.rule import Rule +from rich.table import Table + +# ── shared constants (mirror test_workflow.py) ──────────────────────────────── +REGION = "us-gov-west-1" +FUNCTION_NAME = "service-catalog-repo-gen-template-automation" +DEFAULT_SECRET = "/eks-cluster-deployment/github_token" +DEFAULT_API = "https://github.e.it.census.gov" +DEFAULT_ORG = "SCT-Engineering" +DEFAULT_TEMPLATE = "template-eks-cluster" +DEFAULT_TEAM = "tf-module-admins" + +console = Console() +PASS = "[green]✔[/green]" +FAIL = "[red]✗[/red]" +WARN = "[yellow]⚠[/yellow]" +SKIP = "[dim]–[/dim]" + + +# ── Pydantic models ─────────────────────────────────────────────────────────── + +class CheckConfig(BaseModel): + api_base: str + org: str + template: str + team: str + token: str + verify_ssl: bool + region: str = REGION + secret_name: str = DEFAULT_SECRET + + @field_validator("api_base") + @classmethod + def ensure_api_v3(cls, v: str) -> str: + v = v.rstrip("/") + if "/api/v3" not in v: + v = v + "/api/v3" + return v + + @field_validator("token") + @classmethod + 
def non_empty(cls, v: str) -> str: + if not v: + raise ValueError("Token must not be empty") + return v + + +class CheckResult(BaseModel): + name: str + status: str # "pass" | "fail" | "warn" | "skip" + detail: str = "" + hint: str = "" + + +# ── HTTP helper ─────────────────────────────────────────────────────────────── + +class GHClient: + """Thin wrapper around requests for GitHub API calls.""" + + def __init__(self, cfg: CheckConfig) -> None: + self.cfg = cfg + self.session = requests.Session() + + # Choose auth header format + token = cfg.token + if token.startswith(("ghs_", "ghu_", "github_pat_")) or len(token) > 50: + auth = f"Bearer {token}" + else: + auth = f"token {token}" + + self.session.headers.update({ + "Authorization": auth, + "Accept": "application/vnd.github.v3+json", + "User-Agent": "SC-Permissions-Checker/1.0", + }) + self.session.verify = cfg.verify_ssl + + def get(self, path: str, **kw) -> requests.Response: + return self.session.get(f"{self.cfg.api_base}{path}", timeout=10, **kw) + + def post(self, path: str, **kw) -> requests.Response: + return self.session.post(f"{self.cfg.api_base}{path}", timeout=10, **kw) + + def put(self, path: str, **kw) -> requests.Response: + return self.session.put(f"{self.cfg.api_base}{path}", timeout=10, **kw) + + def scopes(self, resp: requests.Response) -> list[str]: + """Parse X-OAuth-Scopes header into a list.""" + raw = resp.headers.get("X-OAuth-Scopes", "") + return [s.strip() for s in raw.split(",") if s.strip()] + + +# ── Individual checks ───────────────────────────────────────────────────────── + +def chk_token_valid(gh: GHClient) -> tuple[CheckResult, Optional[str]]: + resp = gh.get("/user") + if resp.status_code == 200: + data = resp.json() + username = data.get("login", "unknown") + scopes = gh.scopes(resp) + return CheckResult( + name = "Token valid", + status = "pass", + detail = f"Authenticated as [bold]{username}[/bold] | scopes: {scopes or '(none reported – GHE may omit)'}", + ), username + 
return CheckResult( + name = "Token valid", + status = "fail", + detail = f"HTTP {resp.status_code}: {resp.text[:120]}", + hint = "Regenerate the token and update Secrets Manager.", + ), None + + +def chk_token_scopes(gh: GHClient) -> CheckResult: + """ + GHE often doesn't return X-OAuth-Scopes reliably; we infer from what + the token can and cannot do rather than trusting the header. + This check is informational only. + """ + resp = gh.get("/user") + scopes = gh.scopes(resp) + + if not scopes: + return CheckResult( + name = "PAT scopes (header)", + status = "warn", + detail = "X-OAuth-Scopes header absent – GitHub Enterprise sometimes omits it.", + hint = "Scope enforcement is checked functionally in later steps.", + ) + + required = {"repo"} + # admin:org or write:org both allow org repo creation + org_scope = any(s in scopes for s in ("admin:org", "write:org", "admin")) + missing = required - set(scopes) + + if missing or not org_scope: + parts = [] + if missing: + parts.append(f"missing: {missing}") + if not org_scope: + parts.append("need admin:org or write:org to create org repos") + return CheckResult( + name = "PAT scopes (header)", + status = "fail", + detail = f"Reported scopes: {scopes} | Issues: {'; '.join(parts)}", + hint = "Edit the token on GitHub → Settings → Developer settings → Personal access tokens.", + ) + + return CheckResult( + name = "PAT scopes (header)", + status = "pass", + detail = f"Scopes: {scopes}", + ) + + +def chk_read_org(gh: GHClient) -> CheckResult: + org = gh.cfg.org + resp = gh.get(f"/orgs/{org}") + if resp.status_code == 200: + data = resp.json() + members_can_create = data.get("members_can_create_repositories", None) + detail = f"Org [bold]{org}[/bold] is reachable." + hint = "" + status = "pass" + if members_can_create is False: + detail += " [yellow]⚠ members_can_create_repositories = false[/yellow]" + hint = "An org owner must enable 'Members can create repositories' or use an org admin token." 
+ status = "warn" + elif members_can_create is True: + detail += " members_can_create_repositories = true ✔" + return CheckResult(name="Read org", status=status, detail=detail, hint=hint) + + if resp.status_code == 404: + return CheckResult( + name = "Read org", + status = "fail", + detail = f"Org '{org}' not found (404) – check GITHUB_ORG_NAME.", + ) + return CheckResult( + name = "Read org", + status = "fail", + detail = f"HTTP {resp.status_code}: {resp.text[:120]}", + hint = "Token may need read:org scope.", + ) + + +def chk_org_membership(gh: GHClient, username: str) -> CheckResult: + org = gh.cfg.org + resp = gh.get(f"/orgs/{org}/members/{username}") + if resp.status_code == 204: + return CheckResult( + name = "Org membership", + status = "pass", + detail = f"[bold]{username}[/bold] is a member of [bold]{org}[/bold].", + ) + if resp.status_code == 302: + return CheckResult( + name = "Org membership", + status = "warn", + detail = "302 redirect – user may be a member but token lacks read:org scope to confirm.", + hint = "Add read:org scope to the token.", + ) + if resp.status_code == 404: + return CheckResult( + name = "Org membership", + status = "fail", + detail = f"[bold]{username}[/bold] is NOT a member of [bold]{org}[/bold] (or token can't see it).", + hint = "Ask an org owner to add your account, or use a token that belongs to an org member.", + ) + return CheckResult( + name = "Org membership", + status = "warn", + detail = f"Unexpected HTTP {resp.status_code} – could not confirm membership.", + ) + + +def chk_list_org_repos(gh: GHClient) -> CheckResult: + org = gh.cfg.org + resp = gh.get(f"/orgs/{org}/repos", params={"per_page": 1, "type": "all"}) + if resp.status_code == 200: + count = len(resp.json()) + return CheckResult( + name = "List org repos", + status = "pass", + detail = f"Can list repos in [bold]{org}[/bold] (repo scope confirmed).", + ) + return CheckResult( + name = "List org repos", + status = "fail", + detail = f"HTTP {resp.status_code}: 
{resp.text[:120]}", + hint = "Token needs 'repo' scope.", + ) + + +def chk_create_repo(gh: GHClient) -> CheckResult: + """ + Attempt to create a repo with an obviously-invalid name so GitHub rejects + it with a validation error (422) rather than actually creating it. + A 422 means the token HAS permission but the payload was bad → pass. + A 403 means permission denied → fail, and we decode the reason. + + We try private first, then public, because GHE enterprise policies sometimes + block private repo creation for members even with admin:org scope. + """ + org = gh.cfg.org + sentinel = "__perm-check-DO-NOT-CREATE__" + + results_by_visibility: dict[str, tuple[int, str]] = {} + + for visibility, private_flag in (("private", True), ("public", False)): + resp = gh.post(f"/orgs/{org}/repos", json={ + "name": sentinel, + "private": private_flag, + }) + + if resp.status_code in (201, 200): + # Actually created – delete it immediately + gh.session.delete(f"{gh.cfg.api_base}/repos/{org}/{sentinel}", timeout=10) + return CheckResult( + name = "Create repo in org", + status = "pass", + detail = f"Token can create {visibility} repos (sentinel created and deleted).", + ) + + if resp.status_code == 422: + # Validation error → auth accepted, permission OK + try: + msg = resp.json().get("errors", resp.json().get("message", "")) + except Exception: + msg = resp.text[:80] + return CheckResult( + name = "Create repo in org", + status = "pass", + detail = f"Permission confirmed via 422 on sentinel ({visibility}): {msg}", + ) + + try: + message = resp.json().get("message", resp.text[:120]) + except Exception: + message = resp.text[:120] + results_by_visibility[visibility] = (resp.status_code, message) + + # Both attempts failed – build a clear report + priv_code, priv_msg = results_by_visibility.get("private", (0, "")) + pub_code, pub_msg = results_by_visibility.get("public", (0, "")) + + private_scope_error = "different OAuth scope" in priv_msg or "scope" in priv_msg.lower() + 
public_also_blocked = pub_code == 403 + + if private_scope_error and not public_also_blocked: + return CheckResult( + name = "Create repo in org", + status = "fail", + detail = ( + f"Private repos blocked by GHE enterprise policy (403): {priv_msg}\n" + f"Public repo creation was also blocked (403): {pub_msg}" + if public_also_blocked else + f"Private repos blocked by enterprise policy: {priv_msg}\n" + f"Public repo test also returned {pub_code}: {pub_msg}" + ), + hint = ( + "This is a GitHub Enterprise Server policy restriction, not a token scope issue.\n" + "Your token has admin:org scope, but the GHE instance may require:\n" + " • A site admin or org admin to change 'Default repository permission'\n" + " • OR: set the Lambda to create repos as public (not private)\n" + " • OR: have a GHE site admin grant your account 'Create private repositories' permission\n" + "Check: https://github.e.it.census.gov/organizations/SCT-Engineering/settings/member_privileges" + ), + ) + + if priv_code == 403 and pub_code == 403: + return CheckResult( + name = "Create repo in org", + status = "fail", + detail = f"Both private ({priv_msg}) and public ({pub_msg}) repo creation blocked.", + hint = "Token needs admin:org scope AND org policy must allow member repo creation.", + ) + + return CheckResult( + name = "Create repo in org", + status = "fail", + detail = f"private: HTTP {priv_code}: {priv_msg} | public: HTTP {pub_code}: {pub_msg}", + ) + + +def chk_read_template(gh: GHClient) -> tuple[CheckResult, Optional[str]]: + """Read template repo and return its default branch.""" + org = gh.cfg.org + template = gh.cfg.template + resp = gh.get(f"/repos/{org}/{template}") + if resp.status_code == 200: + data = resp.json() + default_branch = data.get("default_branch", "main") + return CheckResult( + name = "Read template repo", + status = "pass", + detail = ( + f"[bold]{org}/{template}[/bold] is readable. " + f"Default branch: [bold]{default_branch}[/bold]." 
+ ), + ), default_branch + if resp.status_code == 404: + return CheckResult( + name = "Read template repo", + status = "fail", + detail = f"Template repo '{org}/{template}' not found (404).", + hint = "Check TEMPLATE_REPO_NAME. Token needs repo scope and org membership.", + ), None + return CheckResult( + name = "Read template repo", + status = "fail", + detail = f"HTTP {resp.status_code}: {resp.text[:120]}", + ), None + + +def chk_list_branches(gh: GHClient, default_branch: str) -> CheckResult: + org = gh.cfg.org + template = gh.cfg.template + resp = gh.get(f"/repos/{org}/{template}/branches") + if resp.status_code == 200: + branches = [b["name"] for b in resp.json()] + default_present = default_branch in branches + return CheckResult( + name = "List template branches", + status = "pass", + detail = ( + f"Branches: {branches[:6]}{'…' if len(branches) > 6 else ''} | " + f"'{default_branch}' present: {default_present}" + ), + ) + return CheckResult( + name = "List template branches", + status = "fail", + detail = f"HTTP {resp.status_code}: {resp.text[:120]}", + hint = "Token needs repo scope.", + ) + + +def chk_read_git_tree(gh: GHClient, default_branch: str) -> CheckResult: + """Read the git tree of the template repo (used during clone_repository_contents).""" + org = gh.cfg.org + template = gh.cfg.template + resp = gh.get(f"/repos/{org}/{template}/git/trees/{default_branch}", params={"recursive": "1"}) + if resp.status_code == 200: + tree = resp.json().get("tree", []) + files = [e["path"] for e in tree if e.get("type") == "blob"] + return CheckResult( + name = "Read template git tree", + status = "pass", + detail = f"Tree has {len(tree)} entries, {len(files)} files.", + ) + return CheckResult( + name = "Read template git tree", + status = "fail", + detail = f"HTTP {resp.status_code}: {resp.text[:120]}", + hint = "Token needs repo scope.", + ) + + +def chk_team_exists(gh: GHClient) -> CheckResult: + org = gh.cfg.org + team = gh.cfg.team + resp = 
gh.get(f"/orgs/{org}/teams/{team}") + if resp.status_code == 200: + data = resp.json() + return CheckResult( + name = "Team exists", + status = "pass", + detail = f"Team [bold]{team}[/bold] exists (id={data.get('id')}, permission={data.get('permission')}).", + ) + if resp.status_code == 404: + return CheckResult( + name = "Team exists", + status = "warn", + detail = f"Team '{team}' not found in {org}. The Lambda will skip team assignment and continue.", + hint = "Create the team or update the default OwningTeam value in the CloudFormation template.", + ) + if resp.status_code == 403: + return CheckResult( + name = "Team exists", + status = "warn", + detail = "403 reading team – token may lack read:org scope.", + hint = "The Lambda's set_team_permission() call will fail silently (it logs and continues).", + ) + return CheckResult( + name = "Team exists", + status = "warn", + detail = f"HTTP {resp.status_code}: {resp.text[:80]}", + ) + + +def chk_set_team_permission(gh: GHClient) -> CheckResult: + """ + PUT /orgs/{org}/teams/{team}/repos/{org}/{repo} requires admin:org. + We probe with a deliberately non-existent repo name so a 404 (not 403) + tells us the scope is present but the repo just doesn't exist yet. 
+ """ + org = gh.cfg.org + team = gh.cfg.team + # Use template repo – we're not actually changing anything (only PUT matters) + probe = gh.cfg.template + resp = gh.put( + f"/orgs/{org}/teams/{team}/repos/{org}/{probe}", + json={"permission": "admin"}, + ) + if resp.status_code in (204, 200): + return CheckResult( + name = "Set team repo permission", + status = "pass", + detail = f"Can set team permissions on repos (HTTP {resp.status_code}).", + ) + if resp.status_code == 404: + return CheckResult( + name = "Set team repo permission", + status = "pass", + detail = "404 returned – scope present but team/repo not yet linked (expected for pre-existing repos).", + ) + if resp.status_code == 403: + return CheckResult( + name = "Set team repo permission", + status = "warn", + detail = "403 – token lacks admin:org scope for team permission management.", + hint = ( + "The Lambda logs this error and continues without raising an exception.\n" + "Add admin:org (or write:org) scope to the token to fix this." + ), + ) + return CheckResult( + name = "Set team repo permission", + status = "warn", + detail = f"HTTP {resp.status_code}: {resp.text[:80]}", + ) + + +# ── Token + config resolution ───────────────────────────────────────────────── + +def _resolve_config(args: argparse.Namespace) -> CheckConfig: + """ + Priority: CLI args > Lambda env vars (read from AWS) > hardcoded defaults. 
+ """ + session = boto3.Session(region_name=args.region) + + # --- read Lambda env vars if we can --- + lambda_env: dict[str, str] = {} + try: + lc = session.client("lambda", region_name=args.region) + cfg = lc.get_function_configuration(FunctionName=FUNCTION_NAME) + lambda_env = cfg.get("Environment", {}).get("Variables", {}) + except Exception: + pass # offline / no creds – fall through to defaults + + api_base = ( + args.api + or lambda_env.get("GITHUB_API") + or os.environ.get("GITHUB_API") + or DEFAULT_API + ) + org = ( + args.org + or lambda_env.get("GITHUB_ORG_NAME") + or os.environ.get("GITHUB_ORG_NAME") + or DEFAULT_ORG + ) + template = ( + args.template + or lambda_env.get("TEMPLATE_REPO_NAME") + or os.environ.get("TEMPLATE_REPO_NAME") + or DEFAULT_TEMPLATE + ) + secret_name = ( + args.secret + or lambda_env.get("GITHUB_TOKEN_SECRET_NAME") + or DEFAULT_SECRET + ) + verify_ssl = not args.no_ssl_verify + if lambda_env.get("VERIFY_SSL", "true").lower() == "false": + verify_ssl = False + + # --- resolve token --- + token = args.token or os.environ.get("GITHUB_TOKEN", "") + if not token: + console.print(f"[dim]Fetching token from Secrets Manager: {secret_name}[/dim]") + try: + sm = session.client("secretsmanager", region_name=args.region) + resp = sm.get_secret_value(SecretId=secret_name) + token = resp.get("SecretString", "") + except ClientError as e: + console.print(f"[red]Cannot read secret '{secret_name}': {e}[/red]") + sys.exit(1) + + if not token: + console.print(f"[red]No token available – pass --token or set GITHUB_TOKEN.[/red]") + sys.exit(1) + + return CheckConfig( + api_base = api_base, + org = org, + template = template, + team = args.team, + token = token, + verify_ssl = verify_ssl, + region = args.region, + secret_name = secret_name, + ) + + +# ── Rendering ───────────────────────────────────────────────────────────────── + +STATUS_ICON = { + "pass": f"[green]✔ PASS[/green]", + "fail": f"[red]✗ FAIL[/red]", + "warn": f"[yellow]⚠ WARN[/yellow]", 
# Rich colour name used for each check status when printing its title line.
STATUS_COLOR = {"pass": "green", "fail": "red", "warn": "yellow", "skip": "dim"}


def render(result: CheckResult) -> None:
    """Print one check result: icon and name, then its detail and hint lines."""
    tint = STATUS_COLOR[result.status]
    console.print(f"{STATUS_ICON[result.status]} [bold {tint}]{result.name}[/bold {tint}]")
    # An empty/None detail or hint yields no lines, so nothing is printed for it.
    for detail_line in (result.detail or "").splitlines():
        console.print(f" [dim]{detail_line}[/dim]")
    for hint_line in (result.hint or "").splitlines():
        console.print(f" [yellow]↳ {hint_line}[/yellow]")


def render_summary(results: list[CheckResult]) -> None:
    """Print a table of all results followed by a pass/fail/warn tally panel."""
    console.print(Rule("[bold]Summary[/bold]"))

    summary = Table(show_header=True, header_style="bold cyan", expand=True)
    summary.add_column("Check", style="bold", min_width=30)
    summary.add_column("Status", justify="center", min_width=10)
    summary.add_column("Notes", overflow="fold")
    for check in results:
        # Notes column shows only the first detail line, capped at 80 characters.
        note = ""
        if check.detail:
            note = check.detail.splitlines()[0][:80]
        summary.add_row(check.name, STATUS_ICON[check.status], note)
    console.print(summary)

    tally = {"pass": 0, "fail": 0, "warn": 0, "skip": 0}
    for check in results:
        if check.status in tally:
            tally[check.status] += 1

    # Skipped checks are tallied but deliberately not shown, as in the table above.
    wording = (
        ("pass", "green", "passed"),
        ("fail", "red", "failed"),
        ("warn", "yellow", "warnings"),
    )
    fragments = [
        f"[{colour}]{tally[key]} {word}[/{colour}]"
        for key, colour, word in wording
        if tally[key]
    ]

    has_failures = tally["fail"] > 0
    if has_failures:
        frame = "red"
        heading = "[red]✗ PERMISSIONS INCOMPLETE[/red]"
    else:
        frame = "yellow" if tally["warn"] else "green"
        heading = "[green]✔ ALL CHECKS PASSED[/green]"

    text = " ".join(fragments)
    if has_failures:
        failing = ", ".join(check.name for check in results if check.status == "fail")
        text += f"\n\n[red]Failing checks:[/red] {failing}"

    console.print(Panel(text, title=heading, border_style=frame))


# ── main ─────────────────────────────────────────────────────────────────────

def main() -> None:
    """Parse CLI options, run every permission check in order, and exit 0/1.

    The run aborts right after the token check if that check fails; otherwise
    all remaining checks run and the exit status is 1 when any check failed.
    """
    parser = argparse.ArgumentParser(
        description="Verify GitHub PAT permissions for the Service Catalog repo creator Lambda.",
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    leading_options = (
        ("--token", "", "PAT to test (default: from Secrets Manager)"),
        ("--api", "", "GitHub API base URL"),
        ("--org", "", "GitHub organization"),
        ("--template", "", "Template repo name"),
        ("--team", DEFAULT_TEAM, "Owning team slug to check"),
    )
    for flag, fallback, help_text in leading_options:
        parser.add_argument(flag, default=fallback, help=help_text)
    parser.add_argument("--no-ssl-verify", action="store_true", help="Disable TLS verification")
    parser.add_argument("--region", default=REGION, help="AWS region")
    parser.add_argument("--secret", default="", help="Secrets Manager secret name")

    cfg = _resolve_config(parser.parse_args())

    ssl_label = "verify" if cfg.verify_ssl else "[yellow]disabled[/yellow]"
    console.print(Rule("[bold cyan]GitHub Permissions Checker[/bold cyan]"))
    console.print(f" [dim]API base :[/dim] [bold]{cfg.api_base}[/bold]")
    console.print(f" [dim]Org :[/dim] [bold]{cfg.org}[/bold]")
    console.print(f" [dim]Template :[/dim] [bold]{cfg.template}[/bold]")
    console.print(f" [dim]Team :[/dim] [bold]{cfg.team}[/bold]")
    console.print(f" [dim]SSL :[/dim] {ssl_label}")
    console.print(f" [dim]Token :[/dim] {cfg.token[:8]}… (length {len(cfg.token)})\n")

    # Suppress InsecureRequestWarning noise when SSL is off
    if not cfg.verify_ssl:
        import urllib3
        urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

    gh = GHClient(cfg)
    results: list[CheckResult] = []

    def record(outcome: CheckResult) -> CheckResult:
        """Store a result and print it immediately."""
        results.append(outcome)
        render(outcome)
        return outcome

    # ── 1. Token validity ────────────────────────────────────────────────────
    console.print(Rule("Authentication"))
    token_check, username = chk_token_valid(gh)
    record(token_check)
    if token_check.status == "fail":
        # Nothing else can succeed without a valid token.
        render_summary(results)
        sys.exit(1)

    # ── 2. Scope header (informational) ─────────────────────────────────────
    record(chk_token_scopes(gh))

    # ── 3–4. Org access & membership ────────────────────────────────────────
    console.print(Rule("Organization"))
    record(chk_read_org(gh))
    if username:
        record(chk_org_membership(gh, username))
    record(chk_list_org_repos(gh))

    # ── 5. Create repo permission (the one that failed) ──────────────────────
    console.print(Rule("Repository Creation"))
    record(chk_create_repo(gh))

    # ── 6–8. Template repo access ────────────────────────────────────────────
    console.print(Rule("Template Repository"))
    template_check, default_branch = chk_read_template(gh)
    record(template_check)
    if default_branch:
        # Branch and tree checks need the template's default branch name.
        record(chk_list_branches(gh, default_branch))
        record(chk_read_git_tree(gh, default_branch))

    # ── 9–10. Team permission ────────────────────────────────────────────────
    console.print(Rule("Team Permissions"))
    record(chk_team_exists(gh))
    record(chk_set_team_permission(gh))

    # ── Summary ───────────────────────────────────────────────────────────────
    render_summary(results)

    exit_code = 1 if any(item.status == "fail" for item in results) else 0
    sys.exit(exit_code)


if __name__ == "__main__":
    main()
#!/usr/bin/env python3
"""
End-to-end workflow tester for the Service Catalog GitHub Repository Creator.

Steps run by ``main`` (each step aborts the run on failure):
  1. AWS credentials – STS ``get_caller_identity`` succeeds
  2. Lambda – required environment variables are configured
  3. Secrets Manager – the token secret is readable and non-empty
  4. GitHub API – the token authenticates against ``/user``
  5. Lambda invocation – async invoke with a CloudFormation-style Create event
  6. CloudWatch log tail – wait for the invocation to report SUCCESS/FAILED

Usage
-----
    python scripts/test_workflow.py [REPO_NAME]

    REPO_NAME defaults to "workflow-test-<unix timestamp>"
"""

import json
import os
import re
import sys
import time
import uuid
from datetime import datetime, timezone
from typing import Any, Optional

import boto3
import requests
from botocore.exceptions import ClientError, NoCredentialsError
from pydantic import BaseModel, Field, field_validator
from rich.console import Console
from rich.panel import Panel
from rich.progress import Progress, SpinnerColumn, TextColumn, TimeElapsedColumn
from rich.rule import Rule
from rich.table import Table
from rich import print as rprint

# ---------------------------------------------------------------------------
# Constants – derived from the deployed infrastructure
# ---------------------------------------------------------------------------

# AWS region used for every boto3 client created in this script.
REGION = "us-gov-west-1"
# Name of the Lambda function under test.
FUNCTION_NAME = "service-catalog-repo-gen-template-automation"
# Lambda env var that holds the Secrets Manager secret name for the token.
SECRET_NAME_ENV_KEY = "GITHUB_TOKEN_SECRET_NAME"
# NOTE(review): not referenced by the code visible in this script — confirm
# whether it is still needed.
DEFAULT_SECRET_PATH = "/eks-cluster-deployment/github_token"
# CloudWatch log group the Lambda writes to.
LOG_GROUP = f"/aws/lambda/{FUNCTION_NAME}"
# Names of the Lambda environment variables this script reads.
GITHUB_API_ENV_KEY = "GITHUB_API"
GITHUB_ORG_ENV_KEY = "GITHUB_ORG_NAME"
TEMPLATE_REPO_ENV_KEY = "TEMPLATE_REPO_NAME"
VERIFY_SSL_ENV_KEY = "VERIFY_SSL"

# Shared Rich console used for all output.
console = Console()


# ---------------------------------------------------------------------------
# Pydantic models
# ---------------------------------------------------------------------------

# Bundles the AWS identifiers above; defaults mirror the module constants.
class AWSConfig(BaseModel):
    region: str = REGION
    function_name: str = FUNCTION_NAME
    log_group: str = LOG_GROUP

class LambdaEnvConfig(BaseModel):
    """Represents the expected Lambda environment variables."""
    github_api: str
    github_org_name: str
    # Secrets Manager secret *name*, not the token value itself.
    github_token_secret: str
    template_repo_name: str
    verify_ssl: bool

    # Normalise the base URL so later path concatenation never yields "//".
    @field_validator("github_api")
    @classmethod
    def no_trailing_slash(cls, v: str) -> str:
        return v.rstrip("/")

# Outcome of the GitHub connectivity probe.
class GitHubConnectResult(BaseModel):
    reachable: bool
    status_code: Optional[int] = None
    message: str
    rate_limit_remaining: Optional[int] = None
    # First characters of the token, kept for display only.
    token_prefix: str = ""
# Result of invoking the Lambda and tailing its logs.
class LambdaInvokeResult(BaseModel):
    accepted: bool
    request_id: str = ""
    log_stream: str = ""
    status: str = "UNKNOWN"  # SUCCESS | FAILED | TIMEOUT
    repo_url: str = ""
    pr_url: str = ""
    error_message: str = ""
    duration_ms: float = 0

# Outcome of a single workflow step, as rendered in the step list and summary.
class StepResult(BaseModel):
    name: str
    passed: bool
    detail: str = ""
    warning: str = ""


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------

def _aws_session() -> boto3.Session:
    """Return a boto3 session pinned to ``REGION``."""
    return boto3.Session(region_name=REGION)


def _check_aws_credentials(session: boto3.Session) -> StepResult:
    """Verify that the session's credentials work by calling STS.

    Returns:
        A passing ``StepResult`` carrying the account ID and caller ARN, or a
        failing one describing why ``get_caller_identity`` could not be called.
        No botocore error is allowed to escape as a traceback.
    """
    # Imported locally because the module-level import only brings in
    # ClientError and NoCredentialsError.
    from botocore.exceptions import BotoCoreError

    try:
        sts = session.client("sts")
        identity = sts.get_caller_identity()
        account = identity["Account"]
        arn = identity["Arn"]
        return StepResult(
            name="AWS Credentials",
            passed=True,
            detail=f"Account: {account}\nIdentity: {arn}",
        )
    except NoCredentialsError:
        # Most common local failure; give an actionable hint instead of the raw error.
        return StepResult(
            name="AWS Credentials",
            passed=False,
            detail="No AWS credentials found. Run `source ~/aws-creds` or configure ~/.aws/credentials.",
        )
    except (ClientError, BotoCoreError) as e:
        # ClientError: the service rejected the call (e.g. expired token).
        # BotoCoreError: client-side failures such as partial credentials or
        # an unreachable endpoint, which previously crashed the script.
        return StepResult(
            name="AWS Credentials",
            passed=False,
            detail=str(e),
        )
def _get_lambda_env(session: boto3.Session) -> tuple[StepResult, Optional[LambdaEnvConfig]]:
    """Read the Lambda's environment variables and validate the required ones.

    Returns:
        ``(step, config)``. ``config`` is ``None`` when the function
        configuration cannot be fetched or a required variable is missing.
    """
    step_name = "Lambda Env Vars"
    lambda_client = session.client("lambda", region_name=REGION)

    try:
        function_cfg = lambda_client.get_function_configuration(FunctionName=FUNCTION_NAME)
    except ClientError as err:
        return StepResult(name=step_name, passed=False, detail=str(err)), None

    variables = function_cfg.get("Environment", {}).get("Variables", {})

    required_keys = (
        GITHUB_API_ENV_KEY,
        GITHUB_ORG_ENV_KEY,
        SECRET_NAME_ENV_KEY,
        TEMPLATE_REPO_ENV_KEY,
    )
    absent = [key for key in required_keys if key not in variables]
    if absent:
        failure = StepResult(
            name=step_name,
            passed=False,
            detail=f"Missing required variables: {', '.join(absent)}",
        )
        return failure, None

    # Anything other than the literal "false" (any case) keeps verification on.
    ssl_setting = variables.get(VERIFY_SSL_ENV_KEY, "true")
    parsed = LambdaEnvConfig(
        github_api=variables[GITHUB_API_ENV_KEY],
        github_org_name=variables[GITHUB_ORG_ENV_KEY],
        github_token_secret=variables[SECRET_NAME_ENV_KEY],
        template_repo_name=variables[TEMPLATE_REPO_ENV_KEY],
        verify_ssl=ssl_setting.lower() != "false",
    )

    # Do not echo variables whose name mentions a token or secret.
    shown = []
    for key, value in variables.items():
        lowered = key.lower()
        if "token" in lowered or "secret" in lowered:
            continue
        shown.append(f" {key}: {value}")

    return StepResult(name=step_name, passed=True, detail="\n".join(shown)), parsed


def _get_secret(session: boto3.Session, secret_name: str) -> tuple[StepResult, str]:
    """Fetch the token secret's string value from Secrets Manager.

    Returns:
        ``(step, secret)``. ``secret`` is ``""`` when the call fails or the
        secret has no string value.
    """
    step_name = "Secrets Manager"
    secrets_client = session.client("secretsmanager", region_name=REGION)

    try:
        response = secrets_client.get_secret_value(SecretId=secret_name)
    except ClientError as err:
        return StepResult(name=step_name, passed=False, detail=str(err)), ""

    value = response.get("SecretString", "")
    if value:
        # Only a short prefix and the length are shown, never the full secret.
        shown_prefix = value[:8]
        summary = f"Secret '{secret_name}' retrieved (prefix: {shown_prefix}…, length: {len(value)})"
        return StepResult(name=step_name, passed=True, detail=summary), value

    empty = StepResult(
        name=step_name,
        passed=False,
        detail=f"Secret '{secret_name}' exists but is empty.",
    )
    return empty, ""
def _check_github(config: LambdaEnvConfig, token: str) -> tuple[StepResult, GitHubConnectResult]:
    """
    Validate the PAT with a single ``GET <api base>/user`` request.

    ``/api/v3`` is appended to ``config.github_api`` when the configured URL
    does not already contain it; there is no second attempt against another
    path.

    Returns:
        ``(step, result)``. The step passes only on HTTP 200. Any request
        failure (TLS, connection, timeout, malformed URL or header, invalid
        JSON body) is reported as a failed step rather than raised.
    """
    verify = config.verify_ssl
    token_prefix = token[:8]

    # Build the full API base (/api/v3 is appended when it is absent)
    api_base = config.github_api
    if "/api/v3" not in api_base:
        api_base = api_base.rstrip("/") + "/api/v3"

    # Heuristic: app/fine-grained token prefixes or long tokens use the Bearer
    # scheme, everything else the legacy "token" scheme.
    auth_header = f"Bearer {token}" if (
        token.startswith(("ghs_", "ghu_", "github_pat_")) or len(token) > 50
    ) else f"token {token}"

    headers = {
        "Authorization": auth_header,
        "Accept": "application/vnd.github.v3+json",
    }

    endpoint = f"{api_base}/user"
    try:
        resp = requests.get(endpoint, headers=headers, verify=verify, timeout=10)

        # The header may be absent or non-numeric; -1 means "unknown" either way.
        try:
            rl_remaining = int(resp.headers.get("X-RateLimit-Remaining", -1))
        except (TypeError, ValueError):
            rl_remaining = -1

        if resp.status_code == 200:
            username = resp.json().get("login", "unknown")
            gc = GitHubConnectResult(
                reachable=True,
                status_code=200,
                message=f"Authenticated as '{username}'",
                rate_limit_remaining=rl_remaining,
                token_prefix=token_prefix,
            )
            return StepResult(
                name="GitHub API",
                passed=True,
                detail=(
                    f"URL: {endpoint}\n"
                    f"User: {username}\n"
                    f"Rate-limit remaining: {rl_remaining}"
                ),
            ), gc

        # Reached the server but the token was not accepted (or another HTTP error).
        gc = GitHubConnectResult(
            reachable=True,
            status_code=resp.status_code,
            message=resp.text[:200],
            token_prefix=token_prefix,
        )
        return StepResult(
            name="GitHub API",
            passed=False,
            detail=f"HTTP {resp.status_code} from {endpoint}: {resp.text[:200]}",
        ), gc

    except requests.exceptions.SSLError as e:
        # Must precede ConnectionError: SSLError is a subclass of it.
        gc = GitHubConnectResult(reachable=False, message=str(e), token_prefix=token_prefix)
        return StepResult(
            name="GitHub API",
            passed=False,
            detail=(
                f"SSL verification failed for {endpoint}.\n"
                f"Hint: set VERIFY_SSL=false on the Lambda until the CA cert is baked in.\n"
                f"Error: {e}"
            ),
        ), gc

    except requests.exceptions.ConnectionError as e:
        gc = GitHubConnectResult(reachable=False, message=str(e), token_prefix=token_prefix)
        return StepResult(
            name="GitHub API",
            passed=False,
            detail=f"Cannot reach {endpoint}: {e}",
        ), gc

    except requests.exceptions.RequestException as e:
        # Read timeouts, invalid URLs/headers and undecodable JSON bodies end
        # up here; previously they crashed the script with a traceback.
        gc = GitHubConnectResult(reachable=False, message=str(e), token_prefix=token_prefix)
        return StepResult(
            name="GitHub API",
            passed=False,
            detail=f"Request to {endpoint} failed: {e}",
        ), gc
def _build_cfn_payload(repo_name: str, lambda_arn: str, stack_id: str, request_id: str) -> dict:
    """Build a CloudFormation custom-resource ``Create`` event for the Lambda.

    Args:
        repo_name: Sent as the ``ProjectName`` resource property.
        lambda_arn: Sent as the ``ServiceToken`` resource property.
        stack_id: Synthetic stack ARN placed in ``StackId``.
        request_id: Identifier placed in ``RequestId``.
    """
    return {
        "RequestType": "Create",
        # Dummy callback target for the test event.
        # NOTE(review): presumably the Lambda sends its CloudFormation response
        # to this third-party host — confirm that is acceptable for this
        # environment.
        "ResponseURL": "https://httpbin.org/put",
        "StackId": stack_id,
        "RequestId": request_id,
        "ResourceType": "Custom::GitHubRepository",
        "LogicalResourceId": "TestRepository",
        "ResourceProperties": {
            "ServiceToken": lambda_arn,
            "ProjectName": repo_name,
            "OwningTeam": "tf-module-admins",
            "Environment": "development",
            "AwsRegion": REGION,
        },
    }


def _invoke_lambda(session: boto3.Session, repo_name: str) -> tuple[StepResult, LambdaInvokeResult]:
    """Asynchronously invoke the Lambda with a synthetic CloudFormation event.

    Returns:
        ``(step, result)``. The step passes when Lambda answers the ``Event``
        invocation with HTTP 202; ``result.request_id`` is the generated
        CloudFormation request ID that is embedded in the event.
    """
    lc = session.client("lambda", region_name=REGION)

    # Get Lambda ARN for the payload ServiceToken
    try:
        cfg = lc.get_function_configuration(FunctionName=FUNCTION_NAME)
        lambda_arn = cfg["FunctionArn"]
    except ClientError as e:
        return StepResult(name="Lambda Invoke", passed=False, detail=str(e)), LambdaInvokeResult(accepted=False)

    # Take partition and account from the function's own ARN
    # (arn:<partition>:lambda:<region>:<account-id>:function:<name>) so the
    # synthetic stack ARN matches whatever account the Lambda is deployed in.
    # Fall back to the previously hard-coded values if the ARN is malformed.
    arn_fields = lambda_arn.split(":")
    if len(arn_fields) >= 5 and arn_fields[1] and arn_fields[4]:
        partition, account_id = arn_fields[1], arn_fields[4]
    else:
        partition, account_id = "aws-us-gov", "229685449397"

    stack_id = f"arn:{partition}:cloudformation:{REGION}:{account_id}:stack/test-wf-{repo_name}/test-id"
    request_id = str(uuid.uuid4())
    payload = _build_cfn_payload(repo_name, lambda_arn, stack_id, request_id)

    try:
        resp = lc.invoke(
            FunctionName=FUNCTION_NAME,
            InvocationType="Event",  # async – avoids CLI timeout
            Payload=json.dumps(payload).encode(),
        )
        accepted = resp["StatusCode"] == 202
        return StepResult(
            name="Lambda Invoke",
            passed=accepted,
            detail=(
                f"Repo name: {repo_name}\n"
                f"Request ID: {request_id}\n"
                f"HTTP status: {resp['StatusCode']}"
            ),
        ), LambdaInvokeResult(accepted=accepted, request_id=request_id)

    except ClientError as e:
        return StepResult(name="Lambda Invoke", passed=False, detail=str(e)), LambdaInvokeResult(accepted=False)
def _tail_logs(
    session: boto3.Session,
    request_id: str,
    repo_name: str,
    wait_seconds: int = 120,
) -> LambdaInvokeResult:
    """
    Poll CloudWatch Logs until we see the Lambda request complete or time out.

    The Lambda logs use its own AWS request ID as a prefix – not the CFN request
    ID we sent. We therefore match on repo_name (appears in the Raw event) and
    on the CFN request_id (also appears in the Raw event JSON).

    Args:
        session: boto3 session used to create the CloudWatch Logs client.
        request_id: CloudFormation request ID that was placed in the event.
        repo_name: Repository name that was placed in the event.
        wait_seconds: Overall budget, shared by both phases below.

    Returns:
        A ``LambdaInvokeResult`` whose ``status`` is ``SUCCESS``, ``FAILED`` or
        ``TIMEOUT``; ``repo_url``/``pr_url``/``error_message``/``duration_ms``
        are filled in when the matching log lines are found.
    """
    logs = session.client("logs", region_name=REGION)
    result = LambdaInvokeResult(accepted=True, request_id=request_id)
    deadline = time.time() + wait_seconds

    # We match log lines that belong to our invocation by repo name OR cfn request id
    invocation_markers = [repo_name, request_id]

    console.print(f"\n[dim]Tailing CloudWatch Logs for repo [bold]{repo_name}[/bold] …[/dim]")

    stream_name: Optional[str] = None

    with Progress(
        SpinnerColumn(),
        TextColumn("[progress.description]{task.description}"),
        TimeElapsedColumn(),
        console=console,
        transient=True,
    ) as progress:
        task = progress.add_task("Waiting for Lambda log stream…", total=None)

        # ── Phase 1: find the log stream ──────────────────────────────────────
        # `describe-log-streams` lastEventTimestamp is stale/cached by AWS.
        # Use filter-log-events with our repo_name to locate the right stream.
        time.sleep(6)  # give Lambda a moment to start writing logs
        search_start_ms = int((time.time() - 30) * 1000)  # 30s before now

        while time.time() < deadline:
            try:
                hits = logs.filter_log_events(
                    logGroupName = LOG_GROUP,
                    startTime = search_start_ms,
                    filterPattern = f'"{repo_name}"',
                    limit = 5,
                )["events"]
            except ClientError:
                # Treat API errors as transient and retry until the deadline.
                time.sleep(3)
                continue

            if hits:
                # The first matching event identifies the stream to follow.
                stream_name = hits[0]["logStreamName"]
                progress.update(task, description=f"Found stream: {stream_name}")
                result.log_stream = stream_name
                break

            time.sleep(5)

        if not stream_name:
            result.status = "TIMEOUT"
            result.error_message = "No recent log stream found within timeout."
            return result

        # ── Phase 2: read events and look for completion ───────────────────────
        # Lambda logs use its own request ID as prefix, not the CFN request ID.
        # We discover the Lambda request ID by spotting the "Raw event" line
        # that contains our CFN request_id or repo_name.
        next_token: Optional[str] = None
        lambda_req_id: Optional[str] = None

        while time.time() < deadline:
            # Read the stream from its first event, then page forward.
            kwargs: dict[str, Any] = {
                "logGroupName": LOG_GROUP,
                "logStreamName": stream_name,
                "limit": 100,
                "startFromHead": True,
            }
            if next_token:
                kwargs["nextToken"] = next_token

            try:
                page = logs.get_log_events(**kwargs)
            except ClientError:
                time.sleep(3)
                continue

            events = page.get("events", [])

            for ev in events:
                msg = ev.get("message", "")

                # Discover the Lambda-internal request ID from the Raw event line
                if lambda_req_id is None:
                    if any(m in msg for m in invocation_markers) and "Raw event" in msg:
                        # The line is split on tabs and the third field is taken
                        # as the Lambda request ID.
                        # NOTE(review): field layout is assumed from the Lambda's
                        # logger format — confirm.
                        parts = msg.split("\t")
                        if len(parts) >= 3:
                            lambda_req_id = parts[2].strip()
                            progress.update(task, description=f"Tracking Lambda req: {lambda_req_id[:8]}…")

                # Filter to lines that belong to our invocation
                if lambda_req_id and lambda_req_id not in msg:
                    continue
                if lambda_req_id is None and not any(m in msg for m in invocation_markers):
                    continue

                progress.update(task, description=msg[:80].strip())

                # Detect success
                if "Operation completed successfully" in msg or (
                    "Sending CloudFormation response" in msg and '"Status": "SUCCESS"' in msg
                ):
                    result.status = "SUCCESS"
                    cfn_match = re.search(r'Sending CloudFormation response: ({.*})', msg)
                    if cfn_match:
                        try:
                            cfn_data = json.loads(cfn_match.group(1))
                            data = cfn_data.get("Data", {})
                            result.repo_url = data.get("RepositoryUrl", "")
                            result.pr_url = data.get("PullRequestUrl", data.get("MergeRequestUrl", ""))
                        except json.JSONDecodeError:
                            # URLs stay empty when the logged JSON cannot be parsed.
                            pass

                # Detect failure
                # (the second test also matches everything the first one does)
                if '"Status": "FAILED"' in msg or 'Status": "FAILED"' in msg:
                    result.status = "FAILED"
                    cfn_match = re.search(r'Sending CloudFormation response: ({.*})', msg)
                    if cfn_match:
                        try:
                            cfn_data = json.loads(cfn_match.group(1))
                            result.error_message = cfn_data.get("Reason", "Unknown error")
                        except json.JSONDecodeError:
                            result.error_message = msg[:300]

                # REPORT line gives us wall-clock duration
                if msg.startswith("REPORT RequestId:") and lambda_req_id and lambda_req_id in msg:
                    dur_match = re.search(r'Duration: ([\d.]+) ms', msg)
                    if dur_match:
                        result.duration_ms = float(dur_match.group(1))

            new_token = page.get("nextForwardToken")  # response always uses this key
            if new_token == next_token:
                # An unchanged token means no newer events yet: stop once a
                # final status has been seen, otherwise wait and poll again.
                if result.status in ("SUCCESS", "FAILED"):
                    break
                time.sleep(5)
            else:
                next_token = new_token

    # The deadline passed without seeing a SUCCESS/FAILED marker.
    if result.status == "UNKNOWN":
        result.status = "TIMEOUT"
        result.error_message = f"Lambda did not complete within {wait_seconds}s."

    return result
# ---------------------------------------------------------------------------
# Rendering helpers
# ---------------------------------------------------------------------------

# Icon shown for a step, keyed by its ``passed`` flag.
STATUS_ICONS = {
    True: "[green]✔[/green]",
    False: "[red]✗[/red]",
}
# Icon shown for the Lambda execution row, keyed by invocation status.
INVOKE_ICONS = {
    "SUCCESS": "[green]✔ SUCCESS[/green]",
    "FAILED": "[red]✗ FAILED[/red]",
    "TIMEOUT": "[yellow]⏱ TIMEOUT[/yellow]",
    "UNKNOWN": "[dim]? UNKNOWN[/dim]",
}


def render_step(step: StepResult) -> None:
    """Print one step: icon and name, each detail line, then any warning."""
    tint = "green" if step.passed else "red"
    console.print(f"{STATUS_ICONS[step.passed]} [bold {tint}]{step.name}[/bold {tint}]")
    # An empty detail yields no lines, so nothing is printed for it.
    for detail_line in (step.detail or "").splitlines():
        console.print(f" [dim]{detail_line}[/dim]")
    if step.warning:
        console.print(f" [yellow]⚠ {step.warning}[/yellow]")


def render_summary(steps: list[StepResult], invoke: Optional[LambdaInvokeResult]) -> None:
    """Print the summary table and, when an invocation result exists, a verdict panel.

    A panel is printed only for the SUCCESS, FAILED and TIMEOUT statuses.
    """
    console.print(Rule("[bold]Summary[/bold]"))

    overview = Table(show_header=True, header_style="bold cyan", expand=True)
    overview.add_column("Step", style="bold")
    overview.add_column("Result", justify="center")
    overview.add_column("Notes", overflow="fold")

    for entry in steps:
        # Prefer the first detail line; fall back to the warning when it is empty.
        first_line = entry.detail.splitlines()[0] if entry.detail else ""
        note = first_line if first_line else entry.warning
        overview.add_row(entry.name, STATUS_ICONS[entry.passed], note)

    if invoke:
        outcome_note = invoke.repo_url or invoke.error_message or ""
        overview.add_row(
            "Lambda Execution",
            INVOKE_ICONS.get(invoke.status, "?"),
            outcome_note[:80],
        )

    console.print(overview)

    if not invoke:
        return

    verdict = invoke.status
    if verdict == "SUCCESS":
        message = (
            f"[bold green]Workflow completed successfully![/bold green]\n\n"
            f"[bold]Repository URL:[/bold] {invoke.repo_url}\n"
            f"[bold]Pull Request URL:[/bold] {invoke.pr_url}\n"
            f"[bold]Duration:[/bold] {invoke.duration_ms / 1000:.1f}s"
        )
        console.print(Panel(message, title="[bold green]✔ PASS[/bold green]", border_style="green"))
    elif verdict == "FAILED":
        message = (
            f"[bold red]Lambda reported FAILED.[/bold red]\n\n"
            f"[bold]Error:[/bold] {invoke.error_message}"
        )
        console.print(Panel(message, title="[bold red]✗ FAIL[/bold red]", border_style="red"))
    elif verdict == "TIMEOUT":
        message = (
            f"[yellow]Lambda did not report completion within the wait window.[/yellow]\n\n"
            f"Check CloudWatch Logs:\n"
            f" Log group: {LOG_GROUP}\n"
            f" Stream: {invoke.log_stream}"
        )
        console.print(Panel(message, title="[yellow]⏱ TIMEOUT[/yellow]", border_style="yellow"))
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------

def main() -> None:
    """Run the six workflow steps in order and exit 0 only on Lambda SUCCESS.

    Every step is rendered as soon as it finishes. The first failing step
    prints the summary and exits with status 1.
    """
    cli_args = sys.argv[1:]
    repo_name = cli_args[0] if cli_args else f"workflow-test-{int(time.time())}"

    console.print(Rule("[bold cyan]Service Catalog Repo Generator – Workflow Test[/bold cyan]"))
    console.print(f"[dim]Timestamp : {datetime.now(timezone.utc).isoformat()}[/dim]")
    console.print(f"[dim]Test repo : [bold]{repo_name}[/bold][/dim]")
    console.print(f"[dim]Lambda : {FUNCTION_NAME}[/dim]")
    console.print(f"[dim]Region : {REGION}[/dim]\n")

    session = _aws_session()
    steps: list[StepResult] = []

    def checkpoint(step, *, invoke=None, blocked=False, notice=""):
        """Record and render a step; on failure print the summary and exit 1."""
        steps.append(step)
        render_step(step)
        if step.passed and not blocked:
            return
        if notice:
            console.print(notice)
        render_summary(steps, invoke)
        sys.exit(1)

    # ── Step 1: AWS credentials ───────────────────────────────────────────────
    console.print(Rule("Step 1 · AWS Credentials"))
    checkpoint(_check_aws_credentials(session))

    # ── Step 2: Lambda environment variables ──────────────────────────────────
    console.print(Rule("Step 2 · Lambda Environment Variables"))
    env_step, lambda_cfg = _get_lambda_env(session)
    # A missing config is fatal even if the step itself reports success.
    checkpoint(env_step, blocked=lambda_cfg is None)

    # ── Step 3: Secrets Manager – retrieve token ──────────────────────────────
    console.print(Rule("Step 3 · Secrets Manager"))
    secret_step, token = _get_secret(session, lambda_cfg.github_token_secret)
    checkpoint(secret_step)

    # ── Step 4: GitHub API connectivity ───────────────────────────────────────
    console.print(Rule("Step 4 · GitHub API Connectivity"))
    gh_step, gh_result = _check_github(lambda_cfg, token)
    checkpoint(
        gh_step,
        notice="[yellow]⚠ Skipping Lambda invocation due to GitHub connectivity failure.[/yellow]",
    )

    # ── Step 5: Lambda invocation (async) ─────────────────────────────────────
    console.print(Rule("Step 5 · Lambda Invocation"))
    inv_step, invoke_result = _invoke_lambda(session, repo_name)
    checkpoint(inv_step, invoke=invoke_result)

    # ── Step 6: Tail CloudWatch logs for result ───────────────────────────────
    console.print(Rule("Step 6 · CloudWatch Log Tail (max 120s)"))
    invoke_result = _tail_logs(session, invoke_result.request_id, repo_name, wait_seconds=120)

    # ── Final summary ─────────────────────────────────────────────────────────
    render_summary(steps, invoke_result)

    if invoke_result.status == "SUCCESS":
        sys.exit(0)
    sys.exit(1)


if __name__ == "__main__":
    main()
{cfn_input.project_name}") + repo_visibility = os.environ.get("REPO_VISIBILITY", "internal") + logger.info(f"[{request_id}] Using repository visibility: {repo_visibility}") try: - project = provider.get_repository(cfn_input.project_name, create=True) + project = provider.get_repository( + cfn_input.project_name, + create=True, + settings=RepositorySettings(visibility=repo_visibility), + ) logger.info(f"[{request_id}] Repository operation successful") logger.info(f"[{request_id}] Repository details: {json.dumps(project, default=str, indent=2)}") except Exception as e: diff --git a/template_automation/github_provider.py b/template_automation/github_provider.py index 32e97039..6a49e3d2 100644 --- a/template_automation/github_provider.py +++ b/template_automation/github_provider.py @@ -84,7 +84,8 @@ def _request(self, method: str, url: str, **kwargs) -> Any: # For GitHub Enterprise, check if we need to add /api/v3 if self.api_base_url and '.github.com' not in self.api_base_url: # GitHub Enterprise API requires /api/v3 prefix - if not url.startswith('/api/v3'): + # But skip if api_base_url already includes /api/v3 + if '/api/v3' not in self.api_base_url and not url.startswith('/api/v3'): url = f"/api/v3{url}" if url.startswith('/') else f"/api/v3/{url}" url = f"{self.api_base_url}{url}" @@ -157,12 +158,16 @@ def get_repository( if not settings: settings = RepositorySettings() - # Set up repository creation data + # Set up repository creation data. + # NOTE: 'private' must be False for 'internal' visibility; GHE treats + # private=True as a private-repo request and will 403 if the enterprise + # policy blocks private repo creation for org members. 
+ effective_visibility = settings.visibility or 'internal' create_data = { 'name': name, - 'private': settings.visibility == 'private', + 'private': effective_visibility == 'private', 'auto_init': True, # Ensure GitHub creates a default branch with a README - 'visibility': settings.visibility or 'private' + 'visibility': effective_visibility, } if settings.description: create_data['description'] = settings.description diff --git a/template_automation/repository_provider.py b/template_automation/repository_provider.py index dd8ac4ad..e39a3a0a 100644 --- a/template_automation/repository_provider.py +++ b/template_automation/repository_provider.py @@ -3,13 +3,30 @@ This module defines the common interface that all repository providers (GitHub, GitLab) must implement. """ +import os from abc import ABC, abstractmethod from typing import Dict, List, Optional, Any from pydantic import BaseModel, Field + +def _default_visibility() -> str: + """Read REPO_VISIBILITY env var, defaulting to 'internal'. + + GHE enterprise policy commonly blocks private repo creation for org members. + 'internal' repos are visible to all org members but not to the public, + which is the recommended default for government/enterprise GHE instances. + Accepted values: private | internal | public + """ + return os.environ.get("REPO_VISIBILITY", "internal") + + class RepositorySettings(BaseModel): """Settings for repository creation and management.""" - visibility: str = Field(default="private", description="Repository visibility (private, internal, public)") + visibility: str = Field( + default_factory=_default_visibility, + description="Repository visibility (private, internal, public). " + "Overridable via REPO_VISIBILITY env var.", + ) description: Optional[str] = Field(default=None, description="Repository description") topics: Optional[List[str]] = Field(default=None, description="Repository topics/tags")