Tool for listing the bucket lifecycle policies of S3 buckets. #86

Open · wants to merge 1 commit into master
@@ -0,0 +1,6 @@
[roles]
ma8=None
ma10=None
[regions]
west=us-gov-west-1
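
The [roles] section maps profile names to role ARNs (None as a placeholder when roles are not assumed), and [regions] maps aliases to region names; the script reads both sections via configparser. For an --assume run, each role entry would carry an ARN instead of None, along these hypothetical lines (account IDs and role names are illustrative):

[roles]
ma8=arn:aws-us-gov:iam::123456789012:role/BucketAudit
ma10=arn:aws-us-gov:iam::210987654321:role/BucketAudit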

@@ -0,0 +1,257 @@
import argparse
import configparser
import json
import os
import threading
from datetime import datetime, date

import boto3
import pandas as pd

def json_serial(obj):
    """JSON serializer for objects not serializable by default json code."""
    if isinstance(obj, (datetime, date)):
        return obj.isoformat()
    raise TypeError(f"Type {type(obj)} not serializable")

def main():
    parser = argparse.ArgumentParser(description='Connect to AWS accounts')
    parser.add_argument('--profile', help='AWS profile')
    parser.add_argument('--config', help='Config file')
    parser.add_argument('--me', help='Run using available credentials',
                        action='store_true')
    parser.add_argument('--assume', help='Run using assumed roles',
                        action='store_true')
    parser.add_argument('--region', help='A region to run against')
    parser.add_argument('--read', help='Read API calls from cached results',
                        action='store_true')
    parser.add_argument('--write', help='Write API calls to cached results',
                        action='store_true')
    parser.add_argument('--run_token', help='ID for cached results to read and/or write')
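    # Example invocations (the script filename is illustrative; the flags are
    # the ones defined above):
    #   python bucket_lifecycle.py --config accounts.cfg --me --write --run_token run1
    #   python bucket_lifecycle.py --config accounts.cfg --assume --read --run_token run1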

    args = parser.parse_args()

    # Cache reads and writes share a single run token.
    read_from_cache = args.read
    write_to_cache = args.write
    run_token = args.run_token if (read_from_cache or write_to_cache) else None

    if args.config:
        configParser = configparser.RawConfigParser(allow_no_value=True)
        with open(args.config) as file:
            configParser.read_file(file)

    if args.profile:
        profiles = [args.profile]
    elif args.config:
        profiles = [item[0] for item in configParser.items("roles")]
        profile_roles = {item[0]: item[1] for item in configParser.items("roles")}
    else:
        profiles = ['dev']

    if args.region:
        regions = [args.region]
    elif args.config:
        regions = [item[1] for item in configParser.items("regions")]
    else:
        regions = ['us-gov-west-1']

    if args.me:
        use_roles = False
    elif args.assume:
        use_roles = True
    else:
        print('Inconsistent configuration. Exiting. Use one of "--me" or "--assume".')
        exit()

    session = {}

    headers = ['account', 'region', 'bucket_name', 'owner_display_name', 'owner_id',
               'id', 'prefix', 'status', 'expiration_date',
               'expiration_days', 'noncurrent_days', 'final_filter']

    master_list_of_lists = []

    thread_list = []

    # Build one boto3 session per (profile, region) pair, assuming a role from
    # the config when --assume is used.
    for profile in profiles:
        for region in regions:
            profile_region = f'{profile}_{region}'
            if not use_roles:
                session[profile_region] = boto3.Session(profile_name=profile, region_name=region)
            else:
                role = profile_roles[profile]
                stsClient = boto3.client('sts')
                response = stsClient.assume_role(RoleArn=role, RoleSessionName='AWSConnect',
                                                 ExternalId='ti-jbr-2010')
                accessKeyId = response['Credentials']['AccessKeyId']
                secretAccessKey = response['Credentials']['SecretAccessKey']
                sessionToken = response['Credentials']['SessionToken']
                session[profile_region] = boto3.Session(aws_access_key_id=accessKeyId,
                                                        aws_secret_access_key=secretAccessKey,
                                                        aws_session_token=sessionToken)
    # Fan out one worker thread per (profile, region) pair.
    for profile in profiles:
        account = profile.split('-')[0]
        print(f'account [{account}]')
        for region in regions:
            profile_region = f'{profile}_{region}'
            boto_client_s3 = session[profile_region].client('s3')
            method_token = 'list_buckets'  # used only to name the thread
            thread = threading.Thread(name=f'{profile_region}_{method_token}',
                                      target=tabulate_buckets,
                                      args=(account, region, boto_client_s3, profile_region,
                                            master_list_of_lists, read_from_cache,
                                            write_to_cache, run_token))
            thread.start()
            thread_list.append(thread)

    for t in thread_list:
        t.join()
    df = data_frame(headers, master_list_of_lists)
    print(df)
    os.makedirs('./excel', exist_ok=True)  # ensure the output directory exists
    excel_name = ''.join(['./excel/bucket_lifecycle_', datetime.now().strftime('%Y%m%d%H%M%S'), '.xlsx'])
    with pd.ExcelWriter(excel_name, engine='xlsxwriter') as writer_keys:
        df.to_excel(writer_keys, sheet_name='bucket_lifecycle_policies')
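
# Worker threads append to the shared master_list_of_lists without a lock; this
# relies on CPython's GIL making list.extend effectively atomic.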

def tabulate_buckets(account, region, boto_client, profile_region, master_list_of_lists, read, write, user_token):
    response = paginate_wrapper(client=boto_client, client_token='s3_' + profile_region,
                                method_token='list_buckets', read=read, write=write, run_token=user_token)
    lifecycle_result_by_bucket_name = {}
    print(f'profile_region[{profile_region}]')
    for page in response:
        for bucket in page['Buckets']:
            bucket_name = bucket['Name']
            bucket_owner_id = page['Owner']['ID']
            bucket_owner_display_name = page['Owner']['DisplayName']
            print(f'{bucket_name=}')
            print(f'{bucket_owner_id=}')
            print(f'{bucket_owner_display_name=}')
            try:
                lifecycle_result = paginate_wrapper(client=boto_client, client_token='s3_' + profile_region,
                                                    method_token='get_bucket_lifecycle_configuration',
                                                    read=read, write=write, run_token=user_token,
                                                    # parameter_dict={'Bucket': bucket_name, 'ExpectedBucketOwner': bucket_owner_id}, parameter_token=bucket_name)
                                                    parameter_dict={'Bucket': bucket_name}, parameter_token=bucket_name)
                print(f'Successfully got a lifecycle_result for [{bucket_name=}]')
            except Exception:  # typically NoSuchLifecycleConfiguration for buckets with no policy
                lifecycle_result = None
            lifecycle_result_by_bucket_name[bucket_name] = lifecycle_result
    list_of_lists = flatten(account, region, response, lifecycle_result_by_bucket_name)
    master_list_of_lists.extend(list_of_lists)

def flatten(account, region, response, lifecycle_result_by_bucket_name):
    list_of_lists = []
    for page in response:
        for bucket in page['Buckets']:
            name = bucket['Name']
            owner_display_name = page['Owner']['DisplayName']
            owner_id = page['Owner']['ID']

            lifecycle_result = lifecycle_result_by_bucket_name[name]
            if lifecycle_result is not None:
                for rule in lifecycle_result[0]['Rules']:
                    rule_id = rule['ID']
                    print(f'The keys in the rule for bucket [{name}] and id [{rule_id}] are [{rule.keys()}]')
                    prefix = rule.get('Prefix', '')
                    status = rule['Status']
                    if 'Expiration' in rule:
                        expiration_date = rule['Expiration'].get('Date', '')
                        expiration_days = rule['Expiration'].get('Days', '')
                    else:
                        expiration_date = ''
                        expiration_days = ''

                    if 'NoncurrentVersionExpiration' in rule:
                        noncurrent_days = rule['NoncurrentVersionExpiration'].get('NoncurrentDays', '')
                        # NewerNoncurrentVersions is collected but not yet part of the output row.
                        newer_noncurrent_versions = rule['NoncurrentVersionExpiration'].get('NewerNoncurrentVersions', '')
                    else:
                        noncurrent_days = ''
                        newer_noncurrent_versions = ''

                    if 'Filter' in rule:
                        print(f'The bucket named [{name}] has a filter.')
                        raw_filter = rule['Filter']
                        final_filter = json.dumps(raw_filter)
                    else:
                        final_filter = ''
                    row = [account, region, name, owner_display_name, owner_id, rule_id, prefix,
                           status, expiration_date, expiration_days, noncurrent_days, final_filter]
                    list_of_lists.append(row)
            else:
                # Buckets with no lifecycle configuration still get a row; pad to the
                # full twelve columns so the DataFrame constructor does not fail.
                row = [account, region, name, owner_display_name, owner_id,
                       '', '', '', '', '', '', '']
                list_of_lists.append(row)
    return list_of_lists

def data_frame(headers, list_of_lists):
    return pd.DataFrame(columns=headers, data=list_of_lists)

def serialize(obj, token_list):
    serialized = json.dumps(obj, default=json_serial)
    filename = build_json_filename(token_list)
    # Create the cache directory on first use so the write does not fail.
    os.makedirs(os.path.dirname(filename), exist_ok=True)
    with open(filename, 'w') as file:
        file.write(serialized)

def deserialize(token_list):
    filename = build_json_filename(token_list)
    with open(filename) as file:
        serialized = file.read()
    return json.loads(serialized)

def serialization_exists(token_list):
    filename = build_json_filename(token_list)
    return os.path.isfile(filename)
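
# Cache filenames join the client, method, run, and (sanitized) parameter
# tokens, e.g. c:/cache/json/s3_dev_us-gov-west-1-list_buckets-run1-my-bucket.json.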

def paginate_wrapper(read=None, write=None, client=None, client_token=None, run_token=None,
                     method_token='default', parameter_dict=None, parameter_token=''):
    if parameter_dict is None:
        parameter_dict = {}
    normalized_parameter_token = parameter_token.replace(':', '-')
    # When both read and write are requested, read an existing serialization if
    # there is one; otherwise fall through to the API call and write the result.
    if read and write:
        if serialization_exists([client_token, method_token, run_token, normalized_parameter_token]):
            read_action = True
            write_action = False
        else:
            write_action = True
            read_action = False
    else:
        read_action = read
        write_action = write
    if read_action:
        result = deserialize([client_token, method_token, run_token, normalized_parameter_token])
    else:
        if client.can_paginate(method_token):
            paginator = client.get_paginator(method_token)
            page_iterator = paginator.paginate(**parameter_dict)
            result = [page for page in page_iterator]
        else:
            # Methods that cannot be paginated are called directly; the single
            # response is wrapped in a list so both paths return a list of pages.
            func = getattr(client, method_token)
            result = [func(**parameter_dict)]

    if write_action:
        serialize(result, [client_token, method_token, run_token, normalized_parameter_token])
    return result

def build_json_filename(string_list):
    # Tokens may be None (e.g. no --run_token was given); treat those as empty.
    joined_list = '-'.join('' if token is None else token for token in string_list)
    return f'c:/cache/json/{joined_list}.json'

if __name__ == "__main__":
    main()
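
The run prints the assembled DataFrame and writes it to ./excel/bucket_lifecycle_<timestamp>.xlsx; cached API responses, when --read/--write are used, live under c:/cache/json/.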