#!/usr/bin/env python ###### # github repository: https://github.com/andrey-pohilko/registry-cli # # please read more details about the script, usage options and license info there ###### import requests import ast from requests.packages.urllib3.exceptions import InsecureRequestWarning import json import pprint import base64 import re import sys import os import argparse import www_authenticate from datetime import timedelta, datetime as dt from getpass import getpass from multiprocessing.pool import ThreadPool from dateutil.parser import parse from dateutil.tz import tzutc # this is a registry manipulator, can do following: # - list all images (including layers) # - delete images # - all except last N images # - all images and/or tags # # run # registry.py -h # to get more help # # important: after removing the tags, run the garbage collector # on your registry host: # docker-compose -f [path_to_your_docker_compose_file] run \ # registry bin/registry garbage-collect \ # /etc/docker/registry/config.yml # # or if you are not using docker-compose: # docker run registry:2 bin/registry garbage-collect \ # /etc/docker/registry/config.yml # # for more detail on garbage collection read here: # https://docs.docker.com/registry/garbage-collection/ # number of image versions to keep CONST_KEEP_LAST_VERSIONS = 10 # print debug messages DEBUG = False # this class is created for testing class Requests: def request(self, method, url, **kwargs): return requests.request(method, url, **kwargs) def bearer_request(self, method, url, auth, **kwargs): global DEBUG if DEBUG: print("[debug][funcname]: bearer_request()") if DEBUG: print('[debug][registry][request]: {0} {1}'.format(method, url)) if 'Authorization' in kwargs['headers']: print('[debug][registry][request]: Authorization header:') token_parsed = kwargs['headers']['Authorization'].split('.') pprint.pprint(ast.literal_eval(decode_base64(token_parsed[0]))) pprint.pprint(ast.literal_eval(decode_base64(token_parsed[1]))) res = requests.request(method, url, **kwargs) if str(res.status_code)[0] == '2': if DEBUG: print("[debug][registry] accepted") return (res, kwargs['headers']['Authorization']) if res.status_code == 401: if DEBUG: print("[debug][registry] Access denied. Refreshing token...") oauth = www_authenticate.parse(res.headers['Www-Authenticate']) if DEBUG: print('[debug][auth][answer] Auth header:') pprint.pprint(oauth['bearer']) # print('[info] retreiving bearer token for {0}'.format(oauth['bearer']['scope'])) request_url = '{0}'.format(oauth['bearer']['realm']) query_separator = '?' if 'service' in oauth['bearer']: request_url += '{0}service={1}'.format(query_separator, oauth['bearer']['service']) query_separator = '&' if 'scope' in oauth['bearer']: request_url += '{0}scope={1}'.format(query_separator, oauth['bearer']['scope']) if DEBUG: print('[debug][auth][request] Refreshing auth token: POST {0}'.format(request_url)) if args.auth_method == 'GET': try_oauth = requests.get(request_url, auth=auth, **kwargs) else: try_oauth = requests.post(request_url, auth=auth, **kwargs) try: oauth_response = ast.literal_eval(try_oauth._content.decode('utf-8')) token = oauth_response['access_token'] if 'access_token' in oauth_response else oauth_response['token'] except SyntaxError: print('\n\n[ERROR] couldnt accure token: {0}'.format(try_oauth._content)) sys.exit(1) if DEBUG: print('[debug][auth] token issued: ') token_parsed=token.split('.') pprint.pprint(ast.literal_eval(decode_base64(token_parsed[0]))) pprint.pprint(ast.literal_eval(decode_base64(token_parsed[1]))) kwargs['headers']['Authorization'] = 'Bearer {0}'.format(token) else: return (res, kwargs['headers']['Authorization']) res = requests.request(method, url, **kwargs) return (res, kwargs['headers']['Authorization']) def natural_keys(text): """ alist.sort(key=natural_keys) sorts in human order http://nedbatchelder.com/blog/200712/human_sorting.html (See Toothy's implementation in the comments) """ def __atoi(text): return int(text) if text.isdigit() else text return [__atoi(c) for c in re.split('(\d+)', text)] def decode_base64(data): """Decode base64, padding being optional. :param data: Base64 data as an ASCII byte string :returns: The decoded byte string. """ data = data.replace('Bearer ','') # print('[debug] base64 string to decode:\n{0}'.format(data)) missing_padding = len(data) % 4 if missing_padding != 0: data += b'='* (4 - missing_padding) return base64.decodestring(data) def get_error_explanation(context, error_code): error_list = {"delete_tag_405": 'You might want to set REGISTRY_STORAGE_DELETE_ENABLED: "true" in your registry', "get_tag_digest_404": "Try adding flag --digest-method=GET"} key = "%s_%s" % (context, error_code) if key in error_list.keys(): return(error_list[key]) return '' def get_auth_schemes(r,path): """ Returns list of auth schemes(lowcased) if www-authenticate: header exists returns None if no header found - www-authenticate: basic - www-authenticate: bearer """ if DEBUG: print("[debug][funcname]: get_auth_schemes()") try_oauth = requests.head('{0}{1}'.format(r.hostname,path), verify=not r.no_validate_ssl) if 'Www-Authenticate' in try_oauth.headers: oauth = www_authenticate.parse(try_oauth.headers['Www-Authenticate']) if DEBUG: print('[debug][docker] Auth schemes found:{0}'.format([m for m in oauth])) return [m.lower() for m in oauth] else: if DEBUG: print('[debug][docker] No Auth schemes found') return [] # class to manipulate registry class Registry: # this is required for proper digest processing HEADERS = {"Accept": "application/vnd.docker.distribution.manifest.v2+json"} def __init__(self): self.username = None self.password = None self.auth_schemes = [] self.hostname = None self.no_validate_ssl = False self.http = None self.last_error = None self.digest_method = "HEAD" def parse_login(self, login): if login is not None: if ':' not in login: self.last_error = "Please provide -l in the form USER:PASSWORD" return (None, None) self.last_error = None (username, password) = login.split(':', 1) username = username.strip('"').strip("'") password = password.strip('"').strip("'") return (username, password) return (None, None) @staticmethod def _create(host, login, no_validate_ssl, digest_method = "HEAD"): r = Registry() (r.username, r.password) = r.parse_login(login) if r.last_error is not None: print(r.last_error) sys.exit(1) r.hostname = host r.no_validate_ssl = no_validate_ssl r.http = Requests() r.digest_method = digest_method return r @staticmethod def create(*args, **kw): return Registry._create(*args, **kw) def send(self, path, method="GET"): if 'bearer' in self.auth_schemes: (result, self.HEADERS['Authorization']) = self.http.bearer_request( method, "{0}{1}".format(self.hostname, path), auth=(('', '') if self.username in ["", None] else (self.username, self.password)), headers=self.HEADERS, verify=not self.no_validate_ssl) else: result = self.http.request( method, "{0}{1}".format(self.hostname, path), headers=self.HEADERS, auth=(None if self.username == "" else (self.username, self.password)), verify=not self.no_validate_ssl) # except Exception as error: # print("cannot connect to {0}\nerror {1}".format( # self.hostname, # error)) # sys.exit(1) if str(result.status_code)[0] == '2': self.last_error = None return result self.last_error = result.status_code return None def list_images(self): result = self.send('/v2/_catalog?n=10000') if result is None: return [] return json.loads(result.text)['repositories'] def list_tags(self, image_name): result = self.send("/v2/{0}/tags/list".format(image_name)) if result is None: return [] try: tags_list = json.loads(result.text)['tags'] except ValueError: self.last_error = "list_tags: invalid json response" return [] if tags_list is not None: tags_list.sort(key=natural_keys) return tags_list # def list_tags_like(self, tag_like, args_tags_like): # for tag_like in args_tags_like: # print("tag like: {0}".format(tag_like)) # for tag in all_tags_list: # if re.search(tag_like, tag): # print("Adding {0} to tags list".format(tag)) def get_tag_digest(self, image_name, tag): image_headers = self.send("/v2/{0}/manifests/{1}".format( image_name, tag), method=self.digest_method) if image_headers is None: print(" tag digest not found: {0}.".format(self.last_error)) print(get_error_explanation("get_tag_digest", self.last_error)) return None tag_digest = image_headers.headers['Docker-Content-Digest'] return tag_digest def delete_tag(self, image_name, tag, dry_run, tag_digests_to_ignore): if dry_run: print('would delete tag {0}'.format(tag)) return False tag_digest = self.get_tag_digest(image_name, tag) if tag_digest in tag_digests_to_ignore: print("Digest {0} for tag {1} is referenced by another tag or has already been deleted and will be ignored".format( tag_digest, tag)) return True if tag_digest is None: return False delete_result = self.send("/v2/{0}/manifests/{1}".format( image_name, tag_digest), method="DELETE") if delete_result is None: print("failed, error: {0}".format(self.last_error)) print(get_error_explanation("delete_tag", self.last_error)) return False tag_digests_to_ignore.append(tag_digest) print("done") return True def list_tag_layers(self, image_name, tag): layers_result = self.send("/v2/{0}/manifests/{1}".format( image_name, tag)) if layers_result is None: print("error {0}".format(self.last_error)) return [] json_result = json.loads(layers_result.text) if json_result['schemaVersion'] == 1: layers = json_result['fsLayers'] else: layers = json_result['layers'] return layers def get_tag_config(self, image_name, tag): config_result = self.send( "/v2/{0}/manifests/{1}".format(image_name, tag)) if config_result is None: print(" tag digest not found: {0}".format(self.last_error)) return [] json_result = json.loads(config_result.text) if json_result['schemaVersion'] == 1: print("Docker schemaVersion 1 isn't supported for deleting by age now") sys.exit(1) else: tag_config = json_result['config'] return tag_config def get_image_age(self, image_name, image_config): container_header = {"Accept": "{0}".format( image_config['mediaType'])} if 'bearer' in self.auth_schemes: container_header['Authorization'] = self.HEADERS['Authorization'] (response, self.HEADERS['Authorization']) = self.http.bearer_request("GET", "{0}{1}".format(self.hostname, "/v2/{0}/blobs/{1}".format( image_name, image_config['digest'])), auth=(('', '') if self.username in ["", None] else (self.username, self.password)), headers=container_header, verify=not self.no_validate_ssl) else: response = self.http.request("GET", "{0}{1}".format(self.hostname, "/v2/{0}/blobs/{1}".format( image_name, image_config['digest'])), headers=container_header, auth=(None if self.username == "" else (self.username, self.password)), verify=not self.no_validate_ssl) if str(response.status_code)[0] == '2': self.last_error = None image_age = json.loads(response.text) return image_age['created'] else: print(" blob not found: {0}".format(self.last_error)) self.last_error = response.status_code return [] def parse_args(args=None): parser = argparse.ArgumentParser( description="List or delete images from Docker registry", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=(""" IMPORTANT: after removing the tags, run the garbage collector on your registry host: docker-compose -f [path_to_your_docker_compose_file] run \\ registry bin/registry garbage-collect \\ /etc/docker/registry/config.yml or if you are not using docker-compose: docker run registry:2 bin/registry garbage-collect \\ /etc/docker/registry/config.yml for more detail on garbage collection read here: https://docs.docker.com/registry/garbage-collection/ """)) parser.add_argument( '-l', '--login', help="Login and password for access to docker registry", required=False, metavar="USER:PASSWORD") parser.add_argument( '-w', '--read-password', help="Read password from stdin (and prompt if stdin is a TTY); " + "the final line-ending character(s) will be removed; " + "the :PASSWORD portion of the -l option is not required and " + "will be ignored", action='store_const', default=False, const=True) parser.add_argument( '-r', '--host', help="Hostname for registry server, e.g. https://example.com:5000", required=True, metavar="URL") parser.add_argument( '-d', '--delete', help=('If specified, delete all but last {0} tags ' 'of all images').format(CONST_KEEP_LAST_VERSIONS), action='store_const', default=False, const=True) parser.add_argument( '-n', '--num', help=('Set the number of tags to keep' '({0} if not set)').format(CONST_KEEP_LAST_VERSIONS), default=CONST_KEEP_LAST_VERSIONS, nargs='?', metavar='N') parser.add_argument( '--debug', help=('Turn debug output'), action='store_const', default=False, const=True) parser.add_argument( '--dry-run', help=('If used in combination with --delete,' 'then images will not be deleted'), action='store_const', default=False, const=True) parser.add_argument( '-i', '--image', help='Specify images and tags to list/delete', nargs='+', metavar="IMAGE:[TAG]") parser.add_argument( '--images-like', nargs='+', help="List of images (regexp check) that will be handled", required=False, default=[]) parser.add_argument( '--keep-tags', nargs='+', help="List of tags that will be omitted from deletion if used in combination with --delete or --delete-all", required=False, default=[]) parser.add_argument( '--tags-like', nargs='+', help="List of tags (regexp check) that will be handled", required=False, default=[]) parser.add_argument( '--keep-tags-like', nargs='+', help="List of tags (regexp check) that will be omitted from deletion if used in combination with --delete or --delete-all", required=False, default=[]) parser.add_argument( '--no-validate-ssl', help="Disable ssl validation", action='store_const', default=False, const=True) parser.add_argument( '--delete-all', help="Will delete all tags. Be careful with this!", const=True, default=False, action="store_const") parser.add_argument( '--layers', help=('Show layers digests for all images and all tags'), action='store_const', default=False, const=True) parser.add_argument( '--delete-by-hours', help=('Will delete all tags that are older than specified hours. Be careful!'), default=False, nargs='?', metavar='Hours') parser.add_argument( '--keep-by-hours', help=('Will keep all tags that are newer than specified hours.'), default=False, nargs='?', metavar='Hours') parser.add_argument( '--digest-method', help=('Use HEAD for standard docker registry or GET for NEXUS'), default='HEAD', metavar="HEAD|GET" ) parser.add_argument( '--auth-method', help=('Use POST or GET to get JWT tokens'), default='POST', metavar="POST|GET" ) parser.add_argument( '--order-by-date', help=('Orders images by date instead of by tag name.' 'Useful if your tag names are not in a fixed order.'), action='store_true' ) parser.add_argument( '--plain', help=('Turn plain output, one image:tag per line.' 'Useful if your want to send the results to another command.'), action='store_true', default=False ) return parser.parse_args(args) def delete_tags( registry, image_name, dry_run, tags_to_delete, tags_to_keep): keep_tag_digests = [] if tags_to_keep: print("Getting digests for tags to keep:") for tag in tags_to_keep: print("Getting digest for tag {0}".format(tag)) digest = registry.get_tag_digest(image_name, tag) if digest is None: print("Tag {0} does not exist for image {1}. Ignore here.".format( tag, image_name)) continue print("Keep digest {0} for tag {1}".format(digest, tag)) keep_tag_digests.append(digest) def delete(tag): print(" deleting tag {0}".format(tag)) registry.delete_tag(image_name, tag, dry_run, keep_tag_digests) p = ThreadPool(4) tasks = [] for tag in tags_to_delete: if tag in tags_to_keep: continue tasks.append(p.apply_async(delete, args=(tag,))) for task in tasks: task.get() p.close() p.join() # deleting layers is disabled because # it also deletes shared layers ## # for layer in registry.list_tag_layers(image_name, tag): # layer_digest = layer['digest'] # registry.delete_tag_layer(image_name, layer_digest, dry_run) def get_tags_like(args_tags_like, tags_list, plain): result = set() for tag_like in args_tags_like: if not plain: print("tag like: {0}".format(tag_like)) for tag in tags_list: if re.search(tag_like, tag): if not plain: print("Adding {0} to tags list".format(tag)) result.add(tag) return result def get_tags(all_tags_list, image_name, tags_like, plain): # check if there are args for special tags result = set() if tags_like: result = get_tags_like(tags_like, all_tags_list, plain) else: result.update(all_tags_list) # get tags from image name if any if ":" in image_name: (image_name, tag_name) = image_name.split(":") result = set([tag_name]) return result def delete_tags_by_age(registry, image_name, dry_run, hours, tags_to_keep): image_tags = registry.list_tags(image_name) tags_to_delete = [] print('---------------------------------') for tag in image_tags: image_config = registry.get_tag_config(image_name, tag) if image_config == []: print("tag not found") continue image_age = registry.get_image_age(image_name, image_config) if image_age == []: print("timestamp not found") continue if parse(image_age).astimezone(tzutc()) < dt.now(tzutc()) - timedelta(hours=int(hours)): print("will be deleted tag: {0} timestamp: {1}".format( tag, image_age)) tags_to_delete.append(tag) print('------------deleting-------------') delete_tags(registry, image_name, dry_run, tags_to_delete, tags_to_keep) def get_newer_tags(registry, image_name, hours, tags_list): def newer(tag): image_config = registry.get_tag_config(image_name, tag) if image_config == []: print("tag not found") return None image_age = registry.get_image_age(image_name, image_config) if image_age == []: print("timestamp not found") return None if parse(image_age).astimezone(tzutc()) >= dt.now(tzutc()) - timedelta(hours=int(hours)): print("Keeping tag: {0} timestamp: {1}".format( tag, image_age)) return tag else: print("Will delete tag: {0} timestamp: {1}".format( tag, image_age)) return None print('---------------------------------') p = ThreadPool(4) result = list(x for x in p.map(newer, tags_list) if x) p.close() p.join() return result def get_datetime_tags(registry, image_name, tags_list, plain): def newer(tag): image_config = registry.get_tag_config(image_name, tag) if image_config == []: print("tag not found") return None image_age = registry.get_image_age(image_name, image_config) if image_age == []: print("timestamp not found") return None return { "tag": tag, "datetime": parse(image_age).astimezone(tzutc()) } if not plain: print('---------------------------------') p = ThreadPool(4) result = list(x for x in p.map(newer, tags_list) if x) p.close() p.join() return result def keep_images_like(image_list, regexp_list): if image_list is None or regexp_list is None: return [] result = [] regexp_list = list(map(re.compile, regexp_list)) for image in image_list: for regexp in regexp_list: if re.search(regexp, image): result.append(image) break return result def get_ordered_tags(registry, image_name, tags_list, order_by_date=False): if order_by_date: tags_date = get_datetime_tags(registry, image_name, tags_list) sorted_tags_by_date = sorted( tags_date, key=lambda x: x["datetime"] ) return [x["tag"] for x in sorted_tags_by_date] return sorted(tags_list, key=natural_keys) def main_loop(args): global DEBUG DEBUG = True if args.debug else False keep_last_versions = int(args.num) if args.no_validate_ssl: requests.packages.urllib3.disable_warnings(InsecureRequestWarning) if args.read_password: if args.login is None: print("Please provide -l when using -w") sys.exit(1) if ':' in args.login: (username, password) = args.login.split(':', 1) else: username = args.login if sys.stdin.isatty(): # likely interactive usage password = getpass() else: # allow password to be piped or redirected in password = sys.stdin.read() if len(password) == 0: print("Password was not provided") sys.exit(1) if password[-(len(os.linesep)):] == os.linesep: password = password[0:-(len(os.linesep))] args.login = username + ':' + password registry = Registry.create(args.host, args.login, args.no_validate_ssl, args.digest_method) registry.auth_schemes = get_auth_schemes(registry,'/v2/_catalog') if args.delete: print("Will delete all but {0} last tags".format(keep_last_versions)) if args.image is not None: image_list = args.image else: image_list = registry.list_images() if args.images_like: image_list = keep_images_like(image_list, args.images_like) # loop through registry's images # or through the ones given in command line for image_name in image_list: if not args.plain: print("---------------------------------") print("Image: {0}".format(image_name)) all_tags_list = registry.list_tags(image_name) if not all_tags_list: print(" no tags!") continue if args.order_by_date: tags_list = get_ordered_tags(registry, image_name, all_tags_list, args.order_by_date) else: tags_list = get_tags(all_tags_list, image_name, args.tags_like, args.plain) # print(tags and optionally layers for tag in tags_list: if not args.plain: print(" tag: {0}".format(tag)) else: print("{0}:{1}".format(image_name, tag)) if args.layers: for layer in registry.list_tag_layers(image_name, tag): if 'size' in layer: print(" layer: {0}, size: {1}".format( layer['digest'], layer['size'])) else: print(" layer: {0}".format( layer['blobSum'])) # add tags to "tags_to_keep" list if we have regexp "tags_to_keep" # entries, a number of hours for "keep_by_hours" or if the user # explicitly specified tags to always keep. keep_tags = [] keep_tags.extend(args.keep_tags) if args.keep_tags_like: keep_tags.extend(get_tags_like(args.keep_tags_like, tags_list)) if args.keep_by_hours: keep_tags.extend(get_newer_tags(registry, image_name, args.keep_by_hours, tags_list)) keep_tags = list(set(keep_tags)) # Eliminate duplicates # delete tags if told so if args.delete or args.delete_all: if args.delete_all: tags_list_to_delete = list(tags_list) else: ordered_tags_list = get_ordered_tags(registry, image_name, tags_list, args.order_by_date) tags_list_to_delete = ordered_tags_list[:-keep_last_versions] # A manifest might be shared between different tags. Explicitly add those # tags that we want to preserve to the keep_tags list, to prevent # any manifest they are using from being deleted. tags_list_to_keep = [ tag for tag in tags_list if tag not in tags_list_to_delete] keep_tags.extend(tags_list_to_keep) keep_tags.sort() # Make order deterministic for testing delete_tags( registry, image_name, args.dry_run, tags_list_to_delete, keep_tags) # delete tags by age in hours if args.delete_by_hours: delete_tags_by_age(registry, image_name, args.dry_run, args.delete_by_hours, keep_tags) if __name__ == "__main__": args = parse_args() try: main_loop(args) except KeyboardInterrupt: print("Ctrl-C pressed, quitting") sys.exit(1)