Skip to content

Commit

Permalink
Refactor GitHubClient: remove clone_repository_contents and commit_re…
Browse files Browse the repository at this point in the history
…pository_contents methods; add update_repository_topics method to manage repository topics; update operate_github to handle topics and specific commit SHA.
  • Loading branch information
Dave Arnold committed Apr 23, 2025
1 parent 263a142 commit 2b9b098
Showing 1 changed file with 51 additions and 184 deletions.
235 changes: 51 additions & 184 deletions eks_automation/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -402,195 +402,29 @@ def create_reference(self, repo_name, ref, sha):
logger.error(error_message)
raise Exception(error_message)

def clone_repository_contents(self, source_repo, target_dir, branch=None):
"""Clone a repository's contents to a local directory using GitHub API
Args:
source_repo (str): Name of the source repository
target_dir (str): Target directory to download files to
branch (str, optional): Branch to clone from. If None, uses default branch.
Returns:
str: The branch name that was cloned
"""
# Create the target directory if it doesn't exist
os.makedirs(target_dir, exist_ok=True)

try:
if branch:
target_branch = branch
# Try to get the branch's reference directly
tree_sha = self.get_reference_sha(source_repo, f"heads/{target_branch}")
else:
# If no branch specified, use default branch
target_branch = self.get_default_branch(source_repo)
tree_sha = self.get_reference_sha(source_repo, f"heads/{target_branch}")
except Exception as e:
logger.warning(f"Failed to get reference for {branch or 'default branch'}: {str(e)}")
target_branch = branch or "main"
# If we can't get the reference, the branch might not exist yet
tree = {"tree": []}
self.download_repository_files(source_repo, tree, target_dir)
return target_branch

# Get the full tree for the branch
logger.info(f"Getting file tree from {source_repo} for branch {target_branch}")
tree = self.get_tree(source_repo, tree_sha, recursive=True)

# Download all files
logger.info(f"Downloading all files from {source_repo} using ref: heads/{target_branch}")
self.download_repository_files(source_repo, tree, target_dir)

return target_branch
def update_repository_topics(self, repo_name, topics):
"""Update the topics for a repository
def commit_repository_contents(self, repo_name, work_dir, commit_message, branch=None):
"""Commit all files from a directory to a repository
Args:
repo_name (str): Name of the repository
work_dir (str): Directory containing the files to commit
commit_message (str): Commit message
branch (str, optional): Branch to commit to. If None, uses default branch.
Returns:
str: The branch name that was committed to
topics (list): List of topic strings to set
"""
# First, get the current state of the target repository
try:
target_branch = branch or self.get_default_branch(repo_name)
except Exception:
# If we can't get the default branch, it might be a new repo
target_branch = branch or "main"
api_url = f"{self.api_base_url}/repos/{self.org_name}/{repo_name}/topics"

# Upload all files to the repository
tree_items = []
# GitHub requires a special header for the topics API
headers = self.headers.copy()
headers["Accept"] = "application/vnd.github.mercy-preview+json"

# Add all files from the work directory to the repository
for root, _, files in os.walk(work_dir):
for file in files:
file_path = os.path.join(root, file)
repo_path = os.path.relpath(file_path, work_dir)

# Skip .git directory if it exists
if ".git" in repo_path.split(os.path.sep):
continue

# Read file content
with open(file_path, "rb") as f:
file_content = f.read()

# Create blob for the file
blob_sha = self.create_blob(repo_name, file_content)

# Add to tree items
tree_items.append({
"path": repo_path,
"mode": "100644", # Regular file
"type": "blob",
"sha": blob_sha
})

# Try to get the latest commit SHA from the base branch
base_branch = "main" # Always use main as base when creating new branches
try:
base_commit_sha = self.get_reference_sha(repo_name, f"heads/{base_branch}")
base_commit = self.get_commit(repo_name, base_commit_sha)
base_tree_sha = base_commit["tree"]["sha"]
except Exception:
# If we can't get the reference, assume it's a new repo with no commits
base_tree_sha = None

# Create a new tree with all the files
new_tree_sha = self.create_tree(repo_name, tree_items, base_tree_sha)
data = {
"names": topics
}

# Create a commit with the new tree
if base_tree_sha:
# If we have a base tree, include the parent commit
new_commit_sha = self.create_commit(
repo_name,
commit_message,
new_tree_sha,
[base_commit_sha]
)
else:
# If it's a new repo, create the first commit
new_commit_sha = self.create_commit(
repo_name,
commit_message,
new_tree_sha,
[]
)

# Update or create the reference to point to the new commit
try:
# Try to update existing branch
self.update_reference(
repo_name,
f"heads/{target_branch}",
new_commit_sha
)
except Exception:
# If the branch doesn't exist, create it
try:
self.create_reference(
repo_name,
f"refs/heads/{target_branch}",
new_commit_sha
)
except Exception as e:
# If we still can't create the branch, something is wrong
error_message = f"Failed to create or update branch {target_branch} for {repo_name}: {str(e)}"
logger.error(error_message)
raise Exception(error_message)
response = requests.put(api_url, headers=headers, json=data, verify=False)

return target_branch


# pylint: disable=unused-argument
def lambda_handler(event, context):
"""Main Lambda handler function
Args:
event (dict): Dict containing the Lambda function event data
context (dict): Lambda runtime context
Returns:
dict: Dict containing status message
"""
logger.info(f"Lambda function invoked with RequestId: {context.aws_request_id}")
logger.info(f"Remaining time in milliseconds: {context.get_remaining_time_in_millis()}")
logger.info(f"Received event: {json.dumps(event, indent=2)}")

input_data = event["body"]
logger.info(f"Extracted input data from event body: {json.dumps(input_data, indent=2)}")

project_name = input_data["project_name"]
eks_settings = input_data["eks_settings"]
logger.info(f"Project name: {project_name}")
logger.info(f"EKS settings to be applied: {json.dumps(eks_settings, indent=2)}")

if not project_name:
logger.error("Missing project name in input")
return {
"statusCode": 400,
"body": json.dumps({"error": "Missing project name"})
}

try:
logger.info(f"Starting GitHub operations for project: {project_name}")
operate_github(project_name, eks_settings)
logger.info("GitHub operations completed successfully")
except Exception as e: # pylint: disable=broad-exception-caught
logger.error(f"Error in operate_github: {str(e)}")
logger.error(f"Stack trace: {traceback.format_exc()}")
return {"statusCode": 400, "body": json.dumps({"error": str(e)})}

logger.info("Lambda execution completed successfully")
return {
"statusCode": 200,
"headers": {"Access-Control-Allow-Origin": "*"},
"body": json.dumps({"result": "Success"})
}
if response.status_code not in (200, 201):
error_message = f"Failed to update topics for {repo_name}: {response.status_code} - {response.text}"
logger.error(error_message)
raise Exception(error_message)

def operate_github(new_repo_name, eks_settings):
"""Write EKS settings to config.json and create/update repository using GitHub API
Expand Down Expand Up @@ -637,16 +471,39 @@ def operate_github(new_repo_name, eks_settings):
config_file_name
)

# Get info about original repo
# Get info about original repo and determine correct commit SHA
logger.info(f"Fetching original repository information: {template_repo_name}")
orig_repo = github.get_repository(template_repo_name)

# Determine source commit SHA based on source_version
if source_version:
try:
# Try as a tag/release first
source_commit_sha = github.get_reference_sha(template_repo_name, f"tags/{source_version}")
except Exception:
try:
# Try as direct SHA
source_commit = github.get_commit(template_repo_name, source_version)
source_commit_sha = source_version
except Exception:
# If both fail, log error and use default branch
logger.warning(f"Could not find version {source_version}, using default branch")
default_branch = orig_repo["default_branch"]
source_commit_sha = github.get_reference_sha(template_repo_name, f"heads/{default_branch}")
else:
# No version specified, use default branch
default_branch = orig_repo["default_branch"]
source_commit_sha = github.get_reference_sha(template_repo_name, f"heads/{default_branch}")

logger.info(f"Using source commit SHA: {source_commit_sha}")

# Get or create the new repository
logger.info(f"Getting or creating repository: {new_repo_name}")
new_repo = github.get_repository(new_repo_name, create=True)

# Clone the original repository contents
github.clone_repository_contents(template_repo_name, work_dir)
# Clone the original repository contents from specific commit
tree = github.get_tree(template_repo_name, source_commit_sha, recursive=True)
github.download_repository_files(template_repo_name, tree, work_dir)

# Write EKS settings directly to config.json
output_file_path = os.path.join(work_dir, config_file_name)
Expand All @@ -657,6 +514,16 @@ def operate_github(new_repo_name, eks_settings):
# Commit all files to the new repository's main branch explicitly
commit_message = "Add the EKS configuration file by the Lambda function"
github.commit_repository_contents(new_repo_name, work_dir, commit_message, branch="main")

# Add relevant topics to the repository including EKS commit SHA
topics = [
"eks",
"kubernetes",
"infrastructure",
f"eks:{source_commit_sha[:7]}" # Use first 7 chars of SHA
]
github.update_repository_topics(new_repo_name, topics)

logger.info(f"Successfully updated {new_repo_name} repository")

def github_token():
Expand Down

0 comments on commit 2b9b098

Please sign in to comment.