Add keep_by_hours option

To be more adaptive to user needs, keep_by_hours option works as the
inverse of the "delete_by_hours" option. "keep_by_hours" will keep
any tag newer than the specified number of hours.
This commit is contained in:
Eric Ball
2018-06-05 19:21:07 -07:00
parent 42848d911d
commit c6fe2d592e

View File

@@ -58,7 +58,7 @@ class Requests:
print('[debug][registry][request]: {0} {1}'.format(method, url))
if 'Authorization' in kwargs['headers']:
print('[debug][registry][request]: Authorization header:')
token_parsed = kwargs['headers']['Authorization'].split('.')
pprint.pprint(ast.literal_eval(decode_base64(token_parsed[0])))
pprint.pprint(ast.literal_eval(decode_base64(token_parsed[1])))
@@ -67,12 +67,12 @@ class Requests:
if str(res.status_code)[0] == '2':
if DEBUG: print("[debug][registry] accepted")
return (res, kwargs['headers']['Authorization'])
if res.status_code == 401:
if DEBUG: print("[debug][registry] Access denied. Refreshing token...")
oauth = www_authenticate.parse(res.headers['Www-Authenticate'])
if DEBUG:
if DEBUG:
print('[debug][auth][answer] Auth header:')
pprint.pprint(oauth['bearer'])
@@ -81,11 +81,11 @@ class Requests:
oauth['bearer']['service'],
oauth['bearer']['scope'])
if DEBUG:
if DEBUG:
print('[debug][auth][request] Refreshing auth token: POST {0}'.format(request_url))
try_oauth = requests.post(request_url, auth=auth, **kwargs)
try:
token = ast.literal_eval(try_oauth._content)['token']
except SyntaxError:
@@ -101,7 +101,7 @@ class Requests:
kwargs['headers']['Authorization'] = 'Bearer {0}'.format(token)
else:
return (res, kwargs['headers']['Authorization'])
res = requests.request(method, url, **kwargs)
return (res, kwargs['headers']['Authorization'])
@@ -151,11 +151,11 @@ def get_auth_schemes(r,path):
- www-authenticate: basic
- www-authenticate: bearer
"""
if DEBUG: print("[debug][funcname]: get_auth_schemes()")
try_oauth = requests.head('{0}{1}'.format(r.hostname,path), verify=not r.no_validate_ssl)
if 'Www-Authenticate' in try_oauth.headers:
oauth = www_authenticate.parse(try_oauth.headers['Www-Authenticate'])
if DEBUG:
@@ -501,7 +501,14 @@ for more detail on garbage collection read here:
parser.add_argument(
'--delete-by-hours',
help=('Will delete all tags that older than specified hours. Be careful!'),
help=('Will delete all tags that are older than specified hours. Be careful!'),
default=False,
nargs='?',
metavar='Hours')
parser.add_argument(
'--keep-by-hours',
help=('Will keep all tags that are newer than specified hours.'),
default=False,
nargs='?',
metavar='Hours')
@@ -605,11 +612,35 @@ def delete_tags_by_age(registry, image_name, dry_run, hours, tags_to_keep):
delete_tags(registry, image_name, dry_run, tags_to_delete, tags_to_keep)
def get_newer_tags(registry, image_name, hours, tags_list):
newer_tags = []
print('---------------------------------')
for tag in tags_list:
image_config = registry.get_tag_config(image_name, tag)
if image_config == []:
print("tag not found")
continue
image_age = registry.get_image_age(image_name, image_config)
if image_age == []:
print("timestamp not found")
continue
if dt.strptime(image_age[:-4], "%Y-%m-%dT%H:%M:%S.%f") >= dt.now() - timedelta(hours=int(hours)):
print("Keeping tag: {0} timestamp: {1}".format(
tag, image_age))
newer_tags.append(tag)
return newer_tags
def main_loop(args):
global DEBUG
DEBUG = True if args.debug else False
keep_last_versions = int(args.num)
if args.no_validate_ssl:
@@ -644,9 +675,9 @@ def main_loop(args):
registry = Registry.create(args.host, args.login, args.no_validate_ssl,
args.digest_method)
registry.auth_schemes = get_auth_schemes(registry,'/v2/_catalog')
if args.delete:
print("Will delete all but {0} last tags".format(keep_last_versions))
@@ -682,10 +713,14 @@ def main_loop(args):
layer['blobSum']))
# add tags to "tags_to_keep" list, if we have regexp "tags_to_keep"
# entries:
# entries or a number of hours for "keep_by_hours":
keep_tags = []
if args.keep_tags_like:
keep_tags.extend(get_tags_like(args.keep_tags_like, tags_list))
if args.keep_by_hours:
keep_tags.extend(get_newer_tags(registry, image_name,
args.keep_by_hours, tags_list))
keep_tags = list(set(keep_tags)) # Eliminate duplicates
# delete tags if told so
if args.delete or args.delete_all: