From d4e2fdc950b525ac9dec0035ed2517208aac0496 Mon Sep 17 00:00:00 2001 From: Dave Arnold Date: Wed, 23 Apr 2025 11:44:38 -0700 Subject: [PATCH] Update .gitignore to include additional Python, IDE, and project-specific files; refactor app.py for improved error handling and logging; clear requirements.txt for a fresh start. --- .gitignore | 177 ++++++++++++- eks_automation/app.py | 571 +++++++++++++++++++++--------------------- requirements.txt | 3 + 3 files changed, 470 insertions(+), 281 deletions(-) create mode 100644 requirements.txt diff --git a/.gitignore b/.gitignore index 8af78e5..e4b6362 100644 --- a/.gitignore +++ b/.gitignore @@ -23,4 +23,179 @@ htmlcov/ .coverage .coverage.* coverage.xml -*.cover \ No newline at end of file +*.cover + +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +share/python-wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +*.py,cover +.hypothesis/ +.pytest_cache/ +cover/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py +db.sqlite3 +db.sqlite3-journal + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +.pybuilder/ +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# IPython +profile_default/ +ipython_config.py + +# pyenv +# For a library or package, you might want to ignore these files since the code is +# intended to run in multiple environments; otherwise, check them in: +# .python-version + +# pipenv +# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. +# However, in case of collaboration, if having platform-specific dependencies or dependencies +# having no cross-platform support, pipenv may install dependencies that don't work, or not +# install all needed dependencies. +#Pipfile.lock + +# UV +# Similar to Pipfile.lock, it is generally recommended to include uv.lock in version control. +# This is especially recommended for binary packages to ensure reproducibility, and is more +# commonly ignored for libraries. +#uv.lock + +# poetry +# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control. +# This is especially recommended for binary packages to ensure reproducibility, and is more +# commonly ignored for libraries. +# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control +#poetry.lock + +# pdm +# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control. +#pdm.lock +# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it +# in version control. +# https://pdm.fming.dev/latest/usage/project/#working-with-version-control +.pdm.toml +.pdm-python +.pdm-build/ + +# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm +__pypackages__/ + +# Celery stuff +celerybeat-schedule +celerybeat.pid + +# SageMath parsed files +*.sage.py + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ +.dmypy.json +dmypy.json + +# Pyre type checker +.pyre/ + +# pytype static type analyzer +.pytype/ + +# Cython debug symbols +cython_debug/ + +# PyCharm +# JetBrains specific template is maintained in a separate JetBrains.gitignore that can +# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore +# and can be added to the global gitignore or merged into this file. For a more nuclear +# option (not recommended) you can uncomment the following to ignore the entire idea folder. +#.idea/ + +# Ruff stuff: +.ruff_cache/ + +# PyPI configuration file +.pypirc \ No newline at end of file diff --git a/eks_automation/app.py b/eks_automation/app.py index ce1e876..9f49bd6 100644 --- a/eks_automation/app.py +++ b/eks_automation/app.py @@ -116,531 +116,544 @@ def get_repository(self, repo_name, create=False): self.get_reference_sha(repo_name, "heads/main") return repo except Exception: - # If reference doesn't exist yet, wait and retryexist yet, wait and retry - time.sleep(retry_delay)ep(retry_delay) + # If reference doesn't exist yet, wait and retry + time.sleep(retry_delay) continue # If we got here, initialization failed - raise Exception(f"Repository {repo_name} initialization timed out")aise Exception(f"Repository {repo_name} initialization timed out") + raise Exception(f"Repository {repo_name} initialization timed out") else: - error_message = f"Failed to create repository: {create_response.status_code} - {create_response.text}" create repository: {create_response.status_code} - {create_response.text}" + error_message = f"Failed to create repository: {create_response.status_code} - {create_response.text}" logger.error(error_message) - raise Exception(error_message) raise Exception(error_message) + raise Exception(error_message) else: - error_message = f"Repository {repo_name} not found and create=False"y {repo_name} not found and create=False" + error_message = f"Repository {repo_name} not found and create=False" logger.error(error_message) raise Exception(error_message) except requests.exceptions.RequestException as e: - error_message = f"Error accessing GitHub API: {str(e)}"essing GitHub API: {str(e)}" + error_message = f"Error accessing GitHub API: {str(e)}" logger.error(error_message) - raise Exception(error_message) raise Exception(error_message) + raise Exception(error_message) def get_default_branch(self, repo_name): - """Get the default branch of a repository """Get the default branch of a repository + """Get the default branch of a repository Args: - repo_name (str): Name of the repository repo_name (str): Name of the repository - + repo_name (str): Name of the repository + Returns: - str: Default branch name str: Default branch name + str: Default branch name """ repo_api_url = f"{self.api_base_url}/repos/{self.org_name}/{repo_name}" - response = requests.get(repo_api_url, headers=self.headers, verify=False)response = requests.get(repo_api_url, headers=self.headers, verify=False) + response = requests.get(repo_api_url, headers=self.headers, verify=False) if response.status_code == 200: repo_info = response.json() - return repo_info["default_branch"]eturn repo_info["default_branch"] + return repo_info["default_branch"] else: - error_message = f"Failed to get default branch for {repo_name}: {response.status_code} - {response.text}" get default branch for {repo_name}: {response.status_code} - {response.text}" + error_message = f"Failed to get default branch for {repo_name}: {response.status_code} - {response.text}" logger.error(error_message) - raise Exception(error_message) raise Exception(error_message) - + raise Exception(error_message) + def get_reference_sha(self, repo_name, ref): - """Get the SHA for a reference (branch, tag, etc) """Get the SHA for a reference (branch, tag, etc) + """Get the SHA for a reference (branch, tag, etc) Args: repo_name (str): Name of the repository - ref (str): Reference name (e.g., 'heads/main') ref (str): Reference name (e.g., 'heads/main') + ref (str): Reference name (e.g., 'heads/main') Returns: - str: SHA of the reference str: SHA of the reference + str: SHA of the reference """ - api_url = f"{self.api_base_url}/repos/{self.org_name}/{repo_name}/git/refs/{ref}"t/refs/{ref}" - response = requests.get(api_url, headers=self.headers, verify=False)response = requests.get(api_url, headers=self.headers, verify=False) + api_url = f"{self.api_base_url}/repos/{self.org_name}/{repo_name}/git/refs/{ref}" + response = requests.get(api_url, headers=self.headers, verify=False) - if response.status_code == 200:: + if response.status_code == 200: ref_info = response.json() - return ref_info["object"]["sha"]eturn ref_info["object"]["sha"] + return ref_info["object"]["sha"] else: - error_message = f"Failed to get reference {ref} for {repo_name}: {response.status_code} - {response.text}" get reference {ref} for {repo_name}: {response.status_code} - {response.text}" + error_message = f"Failed to get reference {ref} for {repo_name}: {response.status_code} - {response.text}" logger.error(error_message) - raise Exception(error_message) raise Exception(error_message) - - def get_commit(self, repo_name, commit_sha):name, commit_sha): - """Get a commit by SHA """Get a commit by SHA + raise Exception(error_message) + + def get_commit(self, repo_name, commit_sha): + """Get a commit by SHA Args: - repo_name (str): Name of the repository repository - commit_sha (str): Commit SHA commit_sha (str): Commit SHA + repo_name (str): Name of the repository + commit_sha (str): Commit SHA Returns: - dict: Commit information dict: Commit information + dict: Commit information """ - api_url = f"{self.api_base_url}/repos/{self.org_name}/{repo_name}/git/commits/{commit_sha}"t/commits/{commit_sha}" - response = requests.get(api_url, headers=self.headers, verify=False)response = requests.get(api_url, headers=self.headers, verify=False) + api_url = f"{self.api_base_url}/repos/{self.org_name}/{repo_name}/git/commits/{commit_sha}" + response = requests.get(api_url, headers=self.headers, verify=False) - if response.status_code == 200: 200: - return response.json()urn response.json() + if response.status_code == 200: + return response.json() else: - error_message = f"Failed to get commit {commit_sha} for {repo_name}: {response.status_code} - {response.text}" get commit {commit_sha} for {repo_name}: {response.status_code} - {response.text}" + error_message = f"Failed to get commit {commit_sha} for {repo_name}: {response.status_code} - {response.text}" logger.error(error_message) - raise Exception(error_message) raise Exception(error_message) + raise Exception(error_message) - def get_tree(self, repo_name, tree_sha, recursive=False):name, tree_sha, recursive=False): - """Get a tree by SHA """Get a tree by SHA + def get_tree(self, repo_name, tree_sha, recursive=False): + """Get a tree by SHA Args: - repo_name (str): Name of the repository the repository + repo_name (str): Name of the repository tree_sha (str): Tree SHA - recursive (bool): Whether to get the tree recursively recursive (bool): Whether to get the tree recursively + recursive (bool): Whether to get the tree recursively Returns: - dict: Tree information dict: Tree information + dict: Tree information """ - api_url = f"{self.api_base_url}/repos/{self.org_name}/{repo_name}/git/trees/{tree_sha}"self.api_base_url}/repos/{self.org_name}/{repo_name}/git/trees/{tree_sha}" + api_url = f"{self.api_base_url}/repos/{self.org_name}/{repo_name}/git/trees/{tree_sha}" if recursive: - api_url += "?recursive=1"api_url += "?recursive=1" + api_url += "?recursive=1" - response = requests.get(api_url, headers=self.headers, verify=False)response = requests.get(api_url, headers=self.headers, verify=False) + response = requests.get(api_url, headers=self.headers, verify=False) - if response.status_code == 200: 200: - return response.json()urn response.json() + if response.status_code == 200: + return response.json() else: - error_message = f"Failed to get tree {tree_sha} for {repo_name}: {response.status_code} - {response.text}" get tree {tree_sha} for {repo_name}: {response.status_code} - {response.text}" + error_message = f"Failed to get tree {tree_sha} for {repo_name}: {response.status_code} - {response.text}" logger.error(error_message) - raise Exception(error_message) raise Exception(error_message) + raise Exception(error_message) def download_repository_files(self, repo_name, tree, target_dir): - """Download all files from a repository tree to a local directory """Download all files from a repository tree to a local directory + """Download all files from a repository tree to a local directory Args: repo_name (str): Name of the repository tree (dict): Tree information from get_tree() - target_dir (str): Directory to download files to target_dir (str): Directory to download files to + target_dir (str): Directory to download files to """ - # Ensure target directory exists even if there are no filesif there are no files - os.makedirs(target_dir, exist_ok=True)os.makedirs(target_dir, exist_ok=True) + # Ensure target directory exists even if there are no files + os.makedirs(target_dir, exist_ok=True) - for item in tree.get("tree", []):]): + for item in tree.get("tree", []): if item["type"] == "blob": # Get the blob contents - blob_url = f"{self.api_base_url}/repos/{self.org_name}/{repo_name}/git/blobs/{item['sha']}"bs/{item['sha']}" - blob_response = requests.get(blob_url, headers=self.headers, verify=False)blob_response = requests.get(blob_url, headers=self.headers, verify=False) + blob_url = f"{self.api_base_url}/repos/{self.org_name}/{repo_name}/git/blobs/{item['sha']}" + blob_response = requests.get(blob_url, headers=self.headers, verify=False) if blob_response.status_code == 200: - blob_data = blob_response.json()ob_response.json() - content = Nonecontent = None + blob_data = blob_response.json() + content = None # Ensure the target directory exists - file_path = os.path.join(target_dir, item["path"])item["path"]) - dir_path = os.path.dirname(file_path)) - os.makedirs(dir_path, exist_ok=True)os.makedirs(dir_path, exist_ok=True) + file_path = os.path.join(target_dir, item["path"]) + dir_path = os.path.dirname(file_path) + os.makedirs(dir_path, exist_ok=True) - # GitHub API returns base64 encoded contentnt + # GitHub API returns base64 encoded content if blob_data.get("encoding") == "base64": - content = base64.b64decode(blob_data.get("content", ""))ontent = base64.b64decode(blob_data.get("content", "")) + content = base64.b64decode(blob_data.get("content", "")) else: # Handle non-base64 content if needed - logger.warning(f"Unexpected encoding for blob {item['sha']}: {blob_data.get('encoding')}")logger.warning(f"Unexpected encoding for blob {item['sha']}: {blob_data.get('encoding')}") + logger.warning(f"Unexpected encoding for blob {item['sha']}: {blob_data.get('encoding')}") if content is not None: - logger.info(f"Writing file to {file_path}")ile_path}") - with open(file_path, "wb") as f: "wb") as f: - f.write(content) f.write(content) + logger.info(f"Writing file to {file_path}") + with open(file_path, "wb") as f: + f.write(content) - def create_blob(self, repo_name, content):nt): - """Create a blob in the repository """Create a blob in the repository + def create_blob(self, repo_name, content): + """Create a blob in the repository Args: - repo_name (str): Name of the repositoryory - content (bytes): Content of the blob content (bytes): Content of the blob + repo_name (str): Name of the repository + content (bytes): Content of the blob Returns: - str: SHA of the created blob str: SHA of the created blob + str: SHA of the created blob """ - api_url = f"{self.api_base_url}/repos/{self.org_name}/{repo_name}/git/blobs"api_url = f"{self.api_base_url}/repos/{self.org_name}/{repo_name}/git/blobs" + api_url = f"{self.api_base_url}/repos/{self.org_name}/{repo_name}/git/blobs" # Base64 encode the content - content_b64 = base64.b64encode(content).decode('utf-8')content_b64 = base64.b64encode(content).decode('utf-8') + content_b64 = base64.b64encode(content).decode('utf-8') data = { - "content": content_b64,64, - "encoding": "base64" "encoding": "base64" - }} + "content": content_b64, + "encoding": "base64" + } - response = requests.post(api_url, headers=self.headers, json=data, verify=False)response = requests.post(api_url, headers=self.headers, json=data, verify=False) + response = requests.post(api_url, headers=self.headers, json=data, verify=False) - if response.status_code in (201, 200):200): - return response.json()["sha"]urn response.json()["sha"] + if response.status_code in (201, 200): + return response.json()["sha"] else: - error_message = f"Failed to create blob for {repo_name}: {response.status_code} - {response.text}" create blob for {repo_name}: {response.status_code} - {response.text}" + error_message = f"Failed to create blob for {repo_name}: {response.status_code} - {response.text}" logger.error(error_message) - raise Exception(error_message) raise Exception(error_message) + raise Exception(error_message) - def create_tree(self, repo_name, tree_items, base_tree_sha=None):items, base_tree_sha=None): - """Create a tree in the repository """Create a tree in the repository + def create_tree(self, repo_name, tree_items, base_tree_sha=None): + """Create a tree in the repository Args: repo_name (str): Name of the repository - tree_items (list): List of tree items (path, mode, type, sha)ype, sha) - base_tree_sha (str): SHA of the base tree (optional) base_tree_sha (str): SHA of the base tree (optional) + tree_items (list): List of tree items (path, mode, type, sha) + base_tree_sha (str): SHA of the base tree (optional) Returns: - str: SHA of the created tree str: SHA of the created tree + str: SHA of the created tree """ - api_url = f"{self.api_base_url}/repos/{self.org_name}/{repo_name}/git/trees"api_url = f"{self.api_base_url}/repos/{self.org_name}/{repo_name}/git/trees" + api_url = f"{self.api_base_url}/repos/{self.org_name}/{repo_name}/git/trees" data = { - "tree": tree_items "tree": tree_items - }} + "tree": tree_items + } if base_tree_sha: - data["base_tree"] = base_tree_sha data["base_tree"] = base_tree_sha + data["base_tree"] = base_tree_sha - response = requests.post(api_url, headers=self.headers, json=data, verify=False)response = requests.post(api_url, headers=self.headers, json=data, verify=False) + response = requests.post(api_url, headers=self.headers, json=data, verify=False) - if response.status_code in (201, 200):200): - return response.json()["sha"]urn response.json()["sha"] + if response.status_code in (201, 200): + return response.json()["sha"] else: - error_message = f"Failed to create tree for {repo_name}: {response.status_code} - {response.text}" create tree for {repo_name}: {response.status_code} - {response.text}" + error_message = f"Failed to create tree for {repo_name}: {response.status_code} - {response.text}" logger.error(error_message) - raise Exception(error_message) raise Exception(error_message) + raise Exception(error_message) - def create_commit(self, repo_name, message, tree_sha, parent_shas):ge, tree_sha, parent_shas): - """Create a commit in the repository """Create a commit in the repository + def create_commit(self, repo_name, message, tree_sha, parent_shas): + """Create a commit in the repository Args: - repo_name (str): Name of the repositoryrepository + repo_name (str): Name of the repository message (str): Commit message tree_sha (str): SHA of the tree - parent_shas (list): List of parent commit SHAs parent_shas (list): List of parent commit SHAs + parent_shas (list): List of parent commit SHAs Returns: - str: SHA of the created commit str: SHA of the created commit + str: SHA of the created commit """ - api_url = f"{self.api_base_url}/repos/{self.org_name}/{repo_name}/git/commits"api_url = f"{self.api_base_url}/repos/{self.org_name}/{repo_name}/git/commits" + api_url = f"{self.api_base_url}/repos/{self.org_name}/{repo_name}/git/commits" # Add committer/author information - current_time = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ")datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ") + current_time = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ") author_info = { "name": self.commit_author_name, - "email": self.commit_author_email,_author_email, - "date": current_time "date": current_time - }} + "email": self.commit_author_email, + "date": current_time + } data = { - "message": message,e, + "message": message, "tree": tree_sha, - "parents": parent_shas,, + "parents": parent_shas, "author": author_info, - "committer": author_info "committer": author_info - }} + "committer": author_info + } - response = requests.post(api_url, headers=self.headers, json=data, verify=False)response = requests.post(api_url, headers=self.headers, json=data, verify=False) + response = requests.post(api_url, headers=self.headers, json=data, verify=False) - if response.status_code in (201, 200):200): - return response.json()["sha"]urn response.json()["sha"] + if response.status_code in (201, 200): + return response.json()["sha"] else: - error_message = f"Failed to create commit for {repo_name}: {response.status_code} - {response.text}" create commit for {repo_name}: {response.status_code} - {response.text}" + error_message = f"Failed to create commit for {repo_name}: {response.status_code} - {response.text}" logger.error(error_message) - raise Exception(error_message) raise Exception(error_message) + raise Exception(error_message) - def update_reference(self, repo_name, ref, sha):sha): - """Update a reference in the repository """Update a reference in the repository + def update_reference(self, repo_name, ref, sha): + """Update a reference in the repository Args: repo_name (str): Name of the repository - ref (str): Reference name (e.g., 'heads/main')ain') - sha (str): SHA to update the reference to sha (str): SHA to update the reference to + ref (str): Reference name (e.g., 'heads/main') + sha (str): SHA to update the reference to """ - api_url = f"{self.api_base_url}/repos/{self.org_name}/{repo_name}/git/refs/{ref}"api_url = f"{self.api_base_url}/repos/{self.org_name}/{repo_name}/git/refs/{ref}" + api_url = f"{self.api_base_url}/repos/{self.org_name}/{repo_name}/git/refs/{ref}" data = { "sha": sha, - "force": True "force": True - }} + "force": True + } - response = requests.patch(api_url, headers=self.headers, json=data, verify=False)response = requests.patch(api_url, headers=self.headers, json=data, verify=False) + response = requests.patch(api_url, headers=self.headers, json=data, verify=False) if response.status_code not in (200, 201): - error_message = f"Failed to update reference {ref} for {repo_name}: {response.status_code} - {response.text}" update reference {ref} for {repo_name}: {response.status_code} - {response.text}" + error_message = f"Failed to update reference {ref} for {repo_name}: {response.status_code} - {response.text}" logger.error(error_message) - raise Exception(error_message) raise Exception(error_message) + raise Exception(error_message) - def create_reference(self, repo_name, ref, sha):sha): - """Create a reference in the repository """Create a reference in the repository + def create_reference(self, repo_name, ref, sha): + """Create a reference in the repository Args: - repo_name (str): Full reference name (e.g., 'refs/heads/main')., 'refs/heads/main') - sha (str): SHA to create the reference at sha (str): SHA to create the reference at + repo_name (str): Name of the repository + ref (str): Full reference name (e.g., 'refs/heads/main') + sha (str): SHA to create the reference at """ - api_url = f"{self.api_base_url}/repos/{self.org_name}/{repo_name}/git/refs"api_url = f"{self.api_base_url}/repos/{self.org_name}/{repo_name}/git/refs" + api_url = f"{self.api_base_url}/repos/{self.org_name}/{repo_name}/git/refs" data = { - "ref": ref,, - "sha": sha "sha": sha - }} + "ref": ref, + "sha": sha + } - response = requests.post(api_url, headers=self.headers, json=data, verify=False)response = requests.post(api_url, headers=self.headers, json=data, verify=False) + response = requests.post(api_url, headers=self.headers, json=data, verify=False) if response.status_code not in (201, 200): - error_message = f"Failed to create reference {ref} for {repo_name}: {response.status_code} - {response.text}" create reference {ref} for {repo_name}: {response.status_code} - {response.text}" + error_message = f"Failed to create reference {ref} for {repo_name}: {response.status_code} - {response.text}" logger.error(error_message) - raise Exception(error_message)raise Exception(error_message) + raise Exception(error_message) def clone_repository_contents(self, source_repo, target_dir, branch=None): - """Clone a repository's contents to a local directory using GitHub API"""Clone a repository's contents to a local directory using GitHub API - + """Clone a repository's contents to a local directory using GitHub API + Args: source_repo (str): Name of the source repository target_dir (str): Target directory to download files to - branch (str, optional): Branch to clone from. If None, uses default branch.branch (str, optional): Branch to clone from. If None, uses default branch. - + branch (str, optional): Branch to clone from. If None, uses default branch. + Returns: - str: The branch name that was cloned str: The branch name that was cloned + str: The branch name that was cloned """ - # Create the target directory if it doesn't existesn't exist - os.makedirs(target_dir, exist_ok=True)os.makedirs(target_dir, exist_ok=True) + # Create the target directory if it doesn't exist + os.makedirs(target_dir, exist_ok=True) try: if branch: target_branch = branch # Try to get the branch's reference directly - tree_sha = self.get_reference_sha(source_repo, f"heads/{target_branch}")ree_sha = self.get_reference_sha(source_repo, f"heads/{target_branch}") + tree_sha = self.get_reference_sha(source_repo, f"heads/{target_branch}") else: # If no branch specified, use default branch target_branch = self.get_default_branch(source_repo) - tree_sha = self.get_reference_sha(source_repo, f"heads/{target_branch}")f.get_reference_sha(source_repo, f"heads/{target_branch}") + tree_sha = self.get_reference_sha(source_repo, f"heads/{target_branch}") except Exception as e: - logger.warning(f"Failed to get reference for {branch or 'default branch'}: {str(e)}")eference for {branch or 'default branch'}: {str(e)}") + logger.warning(f"Failed to get reference for {branch or 'default branch'}: {str(e)}") target_branch = branch or "main" - # If we can't get the reference, the branch might not exist yethe reference, the branch might not exist yet + # If we can't get the reference, the branch might not exist yet tree = {"tree": []} - self.download_repository_files(source_repo, tree, target_dir)tory_files(source_repo, tree, target_dir) - return target_branch return target_branch + self.download_repository_files(source_repo, tree, target_dir) + return target_branch # Get the full tree for the branch - logger.info(f"Getting file tree from {source_repo} for branch {target_branch}")ch {target_branch}") - tree = self.get_tree(source_repo, tree_sha, recursive=True) tree = self.get_tree(source_repo, tree_sha, recursive=True) + logger.info(f"Getting file tree from {source_repo} for branch {target_branch}") + tree = self.get_tree(source_repo, tree_sha, recursive=True) # Download all files - logger.info(f"Downloading all files from {source_repo} using ref: heads/{target_branch}")ref: heads/{target_branch}") - self.download_repository_files(source_repo, tree, target_dir) self.download_repository_files(source_repo, tree, target_dir) + logger.info(f"Downloading all files from {source_repo} using ref: heads/{target_branch}") + self.download_repository_files(source_repo, tree, target_dir) - return target_branch return target_branch + return target_branch - def commit_repository_contents(self, repo_name, work_dir, commit_message, branch=None):, commit_message, branch=None): - """Commit all files from a directory to a repository"""Commit all files from a directory to a repository - + def commit_repository_contents(self, repo_name, work_dir, commit_message, branch=None): + """Commit all files from a directory to a repository + Args: repo_name (str): Name of the repository - work_dir (str): Directory containing the files to commit the files to commit + work_dir (str): Directory containing the files to commit commit_message (str): Commit message - branch (str, optional): Branch to commit to. If None, uses default branch.branch (str, optional): Branch to commit to. If None, uses default branch. - + branch (str, optional): Branch to commit to. If None, uses default branch. + Returns: - str: The branch name that was committed to str: The branch name that was committed to + str: The branch name that was committed to """ - # First, get the current state of the target repositoryrst, get the current state of the target repository + # First, get the current state of the target repository try: - target_branch = branch or self.get_default_branch(repo_name) = branch or self.get_default_branch(repo_name) + target_branch = branch or self.get_default_branch(repo_name) except Exception: - # If we can't get the default branch, it might be a new repoanch, it might be a new repo - target_branch = branch or "main" target_branch = branch or "main" + # If we can't get the default branch, it might be a new repo + target_branch = branch or "main" - # Upload all files to the repositoryles to the repository - tree_items = []tree_items = [] + # Upload all files to the repository + tree_items = [] - # Add all files from the work directory to the repositoryto the repository - for root, _, files in os.walk(work_dir):os.walk(work_dir): + # Add all files from the work directory to the repository + for root, _, files in os.walk(work_dir): for file in files: file_path = os.path.join(root, file) - repo_path = os.path.relpath(file_path, work_dir)repo_path = os.path.relpath(file_path, work_dir) + repo_path = os.path.relpath(file_path, work_dir) # Skip .git directory if it exists - if ".git" in repo_path.split(os.path.sep): repo_path.split(os.path.sep): - continuecontinue + if ".git" in repo_path.split(os.path.sep): + continue # Read file content - with open(file_path, "rb") as f:as f: - file_content = f.read()file_content = f.read() + with open(file_path, "rb") as f: + file_content = f.read() # Create blob for the file - blob_sha = self.create_blob(repo_name, file_content)blob_sha = self.create_blob(repo_name, file_content) + blob_sha = self.create_blob(repo_name, file_content) # Add to tree items tree_items.append({ "path": repo_path, - "mode": "100644", # Regular file", # Regular file + "mode": "100644", # Regular file "type": "blob", - "sha": blob_sha "sha": blob_sha - }) }) - - # Try to get the latest commit SHA from the base branch - base_branch = "main" # Always use main as base when creating new branches_branch = "main" # Always use main as base when creating new branches - try: - base_commit_sha = self.get_reference_sha(repo_name, f"heads/{base_branch}")ds/{base_branch}") - base_commit = self.get_commit(repo_name, base_commit_sha)ase_commit_sha) - base_tree_sha = base_commit["tree"]["sha"] = base_commit["tree"]["sha"] - except Exception: - # If we can't get the reference, assume it's a new repo with no commitse reference, assume it's a new repo with no commits - base_tree_sha = None base_tree_sha = None - + "sha": blob_sha + }) + + # Try to get the latest commit SHA from the base branch + base_branch = "main" # Always use main as base when creating new branches + try: + base_commit_sha = self.get_reference_sha(repo_name, f"heads/{base_branch}") + base_commit = self.get_commit(repo_name, base_commit_sha) + base_tree_sha = base_commit["tree"]["sha"] + except Exception: + # If we can't get the reference, assume it's a new repo with no commits + base_tree_sha = None + # Create a new tree with all the files - new_tree_sha = self.create_tree(repo_name, tree_items, base_tree_sha)new_tree_sha = self.create_tree(repo_name, tree_items, base_tree_sha) + new_tree_sha = self.create_tree(repo_name, tree_items, base_tree_sha) - # Create a commit with the new tree with the new tree + # Create a commit with the new tree if base_tree_sha: - # If we have a base tree, include the parent commite parent commit - new_commit_sha = self.create_commit(= self.create_commit( + # If we have a base tree, include the parent commit + new_commit_sha = self.create_commit( repo_name, - commit_message, , - new_tree_sha, - [base_commit_sha] [base_commit_sha] + commit_message, + new_tree_sha, + [base_commit_sha] ) else: - # If it's a new repo, create the first commitst commit - new_commit_sha = self.create_commit(= self.create_commit( - repo_name, - commit_message, , - new_tree_sha, w_tree_sha, - [] [] - ) ) + # If it's a new repo, create the first commit + new_commit_sha = self.create_commit( + repo_name, + commit_message, + new_tree_sha, + [] + ) - # Update or create the reference to point to the new commitdate or create the reference to point to the new commit + # Update or create the reference to point to the new commit try: - # Try to update existing branchng branch - self.update_reference(erence( - repo_name, - f"heads/{target_branch}", t_branch}", - new_commit_sha new_commit_sha + # Try to update existing branch + self.update_reference( + repo_name, + f"heads/{target_branch}", + new_commit_sha ) except Exception: - # If the branch doesn't exist, create it the branch doesn't exist, create it + # If the branch doesn't exist, create it try: - self.create_reference(erence( - repo_name, - f"refs/heads/{target_branch}", target_branch}", - new_commit_sha new_commit_sha + self.create_reference( + repo_name, + f"refs/heads/{target_branch}", + new_commit_sha ) except Exception as e: # If we still can't create the branch, something is wrong - error_message = f"Failed to create or update branch {target_branch} for {repo_name}: {str(e)}" create or update branch {target_branch} for {repo_name}: {str(e)}" + error_message = f"Failed to create or update branch {target_branch} for {repo_name}: {str(e)}" logger.error(error_message) - raise Exception(error_message) raise Exception(error_message) + raise Exception(error_message) - return target_branch return target_branch + return target_branch # pylint: disable=unused-argument -def lambda_handler(event, context):bda_handler(event, context): - """ - Main Lambda handler function Main Lambda handler function +def lambda_handler(event, context): + """Main Lambda handler function Args: - event (dict): Dict containing the Lambda function event data.a function event data. - context (dict): Lambda runtime context. context (dict): Lambda runtime context. + event (dict): Dict containing the Lambda function event data + context (dict): Lambda runtime context Returns: - dict: Dict containing status message. dict: Dict containing status message. + dict: Dict containing status message """ - input_data = event["body"] logger.info(f"Lambda function invoked with RequestId: {context.aws_request_id}") -nds: {context.get_remaining_time_in_millis()}") - project_name = input_data["project_name"](event, indent=2)}") + logger.info(f"Lambda function invoked with RequestId: {context.aws_request_id}") + logger.info(f"Remaining time in milliseconds: {context.get_remaining_time_in_millis()}") + logger.info(f"Received event: {json.dumps(event, indent=2)}") + + input_data = event["body"] + logger.info(f"Extracted input data from event body: {json.dumps(input_data, indent=2)}") + + project_name = input_data["project_name"] eks_settings = input_data["eks_settings"] - if not project_name: event["body"] - return {put data from event body: {json.dumps(input_data, indent=2)}") + logger.info(f"Project name: {project_name}") + logger.info(f"EKS settings to be applied: {json.dumps(eks_settings, indent=2)}") + + if not project_name: + logger.error("Missing project name in input") + return { "statusCode": 400, - "body": json.dumps({"error": "Missing project name"}),ct_name = input_data["project_name"] - } eks_settings = input_data["eks_settings"] -er.info(f"Project name: {project_name}") - try:n.dumps(eks_settings, indent=2)}") + "body": json.dumps({"error": "Missing project name"}) + } + + try: + logger.info(f"Starting GitHub operations for project: {project_name}") operate_github(project_name, eks_settings) + logger.info("GitHub operations completed successfully") except Exception as e: # pylint: disable=broad-exception-caught logger.error(f"Error in operate_github: {str(e)}") - logger.error(traceback.format_exc()) # Log the stack trace return { - return {"statusCode": 400, "body": json.dumps({"error": str(e)})}"statusCode": 400, -umps({"error": "Missing project name"}), + logger.error(f"Stack trace: {traceback.format_exc()}") + return {"statusCode": 400, "body": json.dumps({"error": str(e)})} + + logger.info("Lambda execution completed successfully") return { "statusCode": 200, - "headers": {"Access-Control-Allow-Origin": "*"},ry: - "body": json.dumps({"result": "Success"}), logger.info(f"Starting GitHub operations for project: {project_name}") - } operate_github(project_name, eks_settings) -Hub operations for project: {project_name}") + "headers": {"Access-Control-Allow-Origin": "*"}, + "body": json.dumps({"result": "Success"}) + } -def operate_github(new_repo_name, eks_settings): logger.error(f"Error in operate_github: {str(e)}") +def operate_github(new_repo_name, eks_settings): """Write EKS settings to config.json and create/update repository using GitHub API - 400, "body": json.dumps({"error": str(e)})} + This implementation uses only the requests library and does not rely on git CLI - or any external binaries.r.info("Lambda execution completed successfully") + or any external binaries. Args: - new_repo_name (str): Name of the new GitHub repo. "headers": {"Access-Control-Allow-Origin": "*"}, - eks_settings (json): Input JSON data with all the EKS parameter values.y": json.dumps({"result": "Success"}), + new_repo_name (str): Name of the new GitHub repo + eks_settings (json): Input JSON data with all the EKS parameter values Returns: None - """o_name, eks_settings): - # Get GitHub access token and environment variablesub API + """ + logger.info("Starting GitHub repository operation") + logger.info(f"Target repository name: {new_repo_name}") + token = github_token() + logger.info("Successfully retrieved GitHub token from Secrets Manager") + github_api = os.environ.get("GITHUB_API") # No default - must be configured org_name = os.environ.get("GITHUB_ORG_NAME") # No default - must be configured commit_author_email = os.environ.get("GITHUB_COMMIT_AUTHOR_EMAIL", "eks-automation@example.com") commit_author_name = os.environ.get("GITHUB_COMMIT_AUTHOR_NAME", "EKS Automation Lambda") - source_version = os.environ.get("TEMPLATE_SOURCE_VERSION") # Optional the new GitHub repo. - template_repo_name = os.environ.get("TEMPLATE_REPO_NAME", "template-eks-cluster") eks_settings (json): Input JSON data with all the EKS parameter values. + source_version = os.environ.get("TEMPLATE_SOURCE_VERSION") # Optional + template_repo_name = os.environ.get("TEMPLATE_REPO_NAME", "template-eks-cluster") config_file_name = "config.json" # Create work directory if it doesn't exist work_dir = f"/tmp/{new_repo_name}" - if os.path.exists(work_dir):ory operation") - shutil.rmtree(work_dir, ignore_errors=False, onerror=remove_readonly)logger.info(f"Target repository name: {new_repo_name}") + if os.path.exists(work_dir): + shutil.rmtree(work_dir, ignore_errors=False, onerror=remove_readonly) os.makedirs(work_dir, exist_ok=True) - ken and environment variables - # Initialize GitHub client with all required parametersrieving GitHub access token and environment variables") - github = GitHubClient(hub_token() - github_api, Successfully retrieved GitHub token") - token, - org_name,get("GITHUB_API") - commit_author_name,on.get("GITHUB_ORG_NAME") - commit_author_email,s.environ.get("GITHUB_COMMIT_AUTHOR_EMAIL", "eks-automation@example.com") - source_version, os.environ.get("GITHUB_COMMIT_AUTHOR_NAME", "EKS Automation Lambda") - template_repo_name,ource_version = os.environ.get("TEMPLATE_SOURCE_VERSION") - config_file_nametemplate_repo_name = os.environ.get("TEMPLATE_REPO_NAME", "template-eks-cluster") - )n" - - # Get info about original repo") - logger.info(f"Fetching original repository information: {template_repo_name}")logger.info(f"Target organization: {org_name}") - orig_repo = github.get_repository(template_repo_name)it_author_name} <{commit_author_email}>") - - # Get or create the new repository) + + # Initialize GitHub client with all required parameters + github = GitHubClient( + github_api, + token, + org_name, + commit_author_name, + commit_author_email, + source_version, + template_repo_name, + config_file_name + ) + + # Get info about original repo + logger.info(f"Fetching original repository information: {template_repo_name}") + orig_repo = github.get_repository(template_repo_name) + + # Get or create the new repository logger.info(f"Getting or creating repository: {new_repo_name}") new_repo = github.get_repository(new_repo_name, create=True) - # Clone the original repository contentslogger.info(f"Setting up work directory: {work_dir}") + # Clone the original repository contents github.clone_repository_contents(template_repo_name, work_dir) - ir}") - # Write EKS settings directly to config.jsonemove_readonly) + + # Write EKS settings directly to config.json output_file_path = os.path.join(work_dir, config_file_name) - logger.info(f"Writing EKS settings to {output_file_path}")_dir}") + logger.info(f"Writing EKS settings to {output_file_path}") with open(output_file_path, "w") as file: json.dump(eks_settings, file, indent=2) # Commit all files to the new repository's main branch explicitly - commit_message = "Add the EKS configuration file by the Lambda function" token, + commit_message = "Add the EKS configuration file by the Lambda function" github.commit_repository_contents(new_repo_name, work_dir, commit_message, branch="main") - commit_author_name, - logger.info(f"Successfully updated {new_repo_name} repository") commit_author_email, -ion, + logger.info(f"Successfully updated {new_repo_name} repository") def github_token(): """Retrieve GitHub access token from AWS Secrets Manager @@ -656,10 +669,8 @@ def github_token(): logger.error(f"Error occurred when retrieving GitHub token from Secrets Manager: {str(e)}") raise - def remove_readonly(func, path, _): - """ - Clear the readonly bit and reattempt the removal. + """Clear the readonly bit and reattempt the removal. This function is used by `shutil.rmtree` function. """ os.chmod(path, stat.S_IWRITE) diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..0a7dd9f --- /dev/null +++ b/requirements.txt @@ -0,0 +1,3 @@ +boto3>=1.34.0 +botocore>=1.34.0 +requests>=2.31.0