Merge pull request #39 from Qlean/master

Add deleting images by age in hours
This commit is contained in:
andrey-pohilko
2018-02-06 01:56:05 +06:00
committed by GitHub
3 changed files with 444 additions and 148 deletions

View File

@@ -121,9 +121,13 @@ Delete all tags for particular image (e.g. delete all ubuntu tags):
Delete all tags for all images (do you really want to do it?): Delete all tags for all images (do you really want to do it?):
``` ```
registry.py -l user:pass -r https://example.com:5000 --delete-all registry.py -l user:pass -r https://example.com:5000 --delete-all --dry-run
``` ```
Delete all tags by age in hours for the particular image (e.g. older than 24 hours, with --keep-tags and --keep-tags-like options, --dry-run for safe).
```
registry.py -r https://example.com:5000 -i api-docs-origin/master --dry-run --delete-by-hours 24 --keep-tags c59c02c25f023263fd4b5d43fc1ff653f08b3d4x --keep-tags-like late
```
## Disable ssl verification ## Disable ssl verification
If you are using docker registry with a self signed ssl certificate, you can disable ssl verification: If you are using docker registry with a self signed ssl certificate, you can disable ssl verification:

View File

@@ -6,39 +6,44 @@ from requests.packages.urllib3.exceptions import InsecureRequestWarning
import json import json
import re import re
import argparse import argparse
from datetime import timedelta, datetime as dt
## this is a registry manipulator, can do following: # this is a registry manipulator, can do following:
## - list all images (including layers) # - list all images (including layers)
## - delete images # - delete images
## - all except last N images # - all except last N images
## - all images and/or tags # - all images and/or tags
## #
## run # run
## registry.py -h # registry.py -h
## to get more help # to get more help
## #
## important: after removing the tags, run the garbage collector # important: after removing the tags, run the garbage collector
## on your registry host: # on your registry host:
## docker-compose -f [path_to_your_docker_compose_file] run \ # docker-compose -f [path_to_your_docker_compose_file] run \
## registry bin/registry garbage-collect \ # registry bin/registry garbage-collect \
## /etc/docker/registry/config.yml # /etc/docker/registry/config.yml
## #
## or if you are not using docker-compose: # or if you are not using docker-compose:
## docker run registry:2 bin/registry garbage-collect \ # docker run registry:2 bin/registry garbage-collect \
## /etc/docker/registry/config.yml # /etc/docker/registry/config.yml
## #
## for more detail on garbage collection read here: # for more detail on garbage collection read here:
## https://docs.docker.com/registry/garbage-collection/ # https://docs.docker.com/registry/garbage-collection/
# number of image versions to keep # number of image versions to keep
CONST_KEEP_LAST_VERSIONS = 10 CONST_KEEP_LAST_VERSIONS = 10
# this class is created for testing # this class is created for testing
class Requests: class Requests:
def request(self, method, url, **kwargs): def request(self, method, url, **kwargs):
return requests.request(method, url, **kwargs) return requests.request(method, url, **kwargs)
def natural_keys(text): def natural_keys(text):
''' '''
alist.sort(key=natural_keys) sorts in human order alist.sort(key=natural_keys) sorts in human order
@@ -49,7 +54,7 @@ def natural_keys(text):
def __atoi(text): def __atoi(text):
return int(text) if text.isdigit() else text return int(text) if text.isdigit() else text
return [ __atoi(c) for c in re.split('(\d+)', text) ] return [__atoi(c) for c in re.split('(\d+)', text)]
# class to manipulate registry # class to manipulate registry
@@ -82,7 +87,6 @@ class Registry:
return (None, None) return (None, None)
@staticmethod @staticmethod
def create(host, login, no_validate_ssl): def create(host, login, no_validate_ssl):
r = Registry() r = Registry()
@@ -97,15 +101,14 @@ class Registry:
r.http = Requests() r.http = Requests()
return r return r
def send(self, path, method="GET"): def send(self, path, method="GET"):
# try: # try:
result = self.http.request( result = self.http.request(
method, "{0}{1}".format(self.hostname, path), method, "{0}{1}".format(self.hostname, path),
headers = self.HEADERS, headers=self.HEADERS,
auth=(None if self.username == "" auth=(None if self.username == ""
else (self.username, self.password)), else (self.username, self.password)),
verify = not self.no_validate_ssl) verify=not self.no_validate_ssl)
# except Exception as error: # except Exception as error:
# print("cannot connect to {0}\nerror {1}".format( # print("cannot connect to {0}\nerror {1}".format(
@@ -116,11 +119,11 @@ class Registry:
self.last_error = None self.last_error = None
return result return result
self.last_error=result.status_code self.last_error = result.status_code
return None return None
def list_images(self): def list_images(self):
result = self.send('/v2/_catalog') result = self.send('/v2/_catalog?n=10000')
if result == None: if result == None:
return [] return []
@@ -169,7 +172,8 @@ class Registry:
tag_digest = self.get_tag_digest(image_name, tag) tag_digest = self.get_tag_digest(image_name, tag)
if tag_digest in tag_digests_to_ignore: if tag_digest in tag_digests_to_ignore:
print("Digest {0} for tag {1} is referenced by another tag or has already been deleted and will be ignored".format(tag_digest, tag)) print("Digest {0} for tag {1} is referenced by another tag or has already been deleted and will be ignored".format(
tag_digest, tag))
return True return True
if tag_digest == None: if tag_digest == None:
@@ -205,7 +209,6 @@ class Registry:
# print("done") # print("done")
# return True # return True
def list_tag_layers(self, image_name, tag): def list_tag_layers(self, image_name, tag):
layers_result = self.send("/v2/{0}/manifests/{1}".format( layers_result = self.send("/v2/{0}/manifests/{1}".format(
image_name, tag)) image_name, tag))
@@ -222,7 +225,45 @@ class Registry:
return layers return layers
def parse_args(args = None): def get_tag_config(self, image_name, tag):
config_result = self.send(
"/v2/{0}/manifests/{1}".format(image_name, tag))
if config_result == None:
print(" tag digest not found: {0}".format(self.last_error))
return []
json_result = json.loads(config_result.text)
if json_result['schemaVersion'] == 1:
print("Docker schemaVersion 1 isn't supported for deleting by age now")
exit(1)
else:
tag_config = json_result['config']
return tag_config
def get_image_age(self, image_name, image_config):
container_header = {"Accept": "{0}".format(
image_config['mediaType'])}
response = self.http.request("GET", "{0}{1}".format(self.hostname, "/v2/{0}/blobs/{1}".format(
image_name, image_config['digest'])),
headers=container_header,
auth=(None if self.username == ""
else (self.username, self.password)),
verify=not self.no_validate_ssl)
if str(response.status_code)[0] == '2':
self.last_error = None
image_age = json.loads(response.text)
return image_age['created']
else:
print(" blob not found: {0}".format(self.last_error))
self.last_error = response.status_code
return []
def parse_args(args=None):
parser = argparse.ArgumentParser( parser = argparse.ArgumentParser(
description="List or delete images from Docker registry", description="List or delete images from Docker registry",
formatter_class=argparse.RawDescriptionHelpFormatter, formatter_class=argparse.RawDescriptionHelpFormatter,
@@ -243,19 +284,19 @@ for more detail on garbage collection read here:
https://docs.docker.com/registry/garbage-collection/ https://docs.docker.com/registry/garbage-collection/
""")) """))
parser.add_argument( parser.add_argument(
'-l','--login', '-l', '--login',
help="Login and password to access to docker registry", help="Login and password to access to docker registry",
required=False, required=False,
metavar="USER:PASSWORD") metavar="USER:PASSWORD")
parser.add_argument( parser.add_argument(
'-r','--host', '-r', '--host',
help="Hostname for registry server, e.g. https://example.com:5000", help="Hostname for registry server, e.g. https://example.com:5000",
required=True, required=True,
metavar="URL") metavar="URL")
parser.add_argument( parser.add_argument(
'-d','--delete', '-d', '--delete',
help=('If specified, delete all but last {0} tags ' help=('If specified, delete all but last {0} tags '
'of all images').format(CONST_KEEP_LAST_VERSIONS), 'of all images').format(CONST_KEEP_LAST_VERSIONS),
action='store_const', action='store_const',
@@ -263,7 +304,7 @@ for more detail on garbage collection read here:
const=True) const=True)
parser.add_argument( parser.add_argument(
'-n','--num', '-n', '--num',
help=('Set the number of tags to keep' help=('Set the number of tags to keep'
'({0} if not set)').format(CONST_KEEP_LAST_VERSIONS), '({0} if not set)').format(CONST_KEEP_LAST_VERSIONS),
default=CONST_KEEP_LAST_VERSIONS, default=CONST_KEEP_LAST_VERSIONS,
@@ -279,7 +320,7 @@ for more detail on garbage collection read here:
const=True) const=True)
parser.add_argument( parser.add_argument(
'-i','--image', '-i', '--image',
help='Specify images and tags to list/delete', help='Specify images and tags to list/delete',
nargs='+', nargs='+',
metavar="IMAGE:[TAG]") metavar="IMAGE:[TAG]")
@@ -326,6 +367,12 @@ for more detail on garbage collection read here:
default=False, default=False,
const=True) const=True)
parser.add_argument(
'--delete-by-hours',
help=('Will delete all tags that older than specified hours. Be careful!'),
default=False,
nargs='?',
metavar='Hours')
return parser.parse_args(args) return parser.parse_args(args)
@@ -342,7 +389,8 @@ def delete_tags(
print("Getting digest for tag {0}".format(tag)) print("Getting digest for tag {0}".format(tag))
digest = registry.get_tag_digest(image_name, tag) digest = registry.get_tag_digest(image_name, tag)
if digest is None: if digest is None:
print("Tag {0} does not exist for image {1}. Ignore here.".format(tag, image_name)) print("Tag {0} does not exist for image {1}. Ignore here.".format(
tag, image_name))
continue continue
print("Keep digest {0} for tag {1}".format(digest, tag)) print("Keep digest {0} for tag {1}".format(digest, tag))
@@ -355,15 +403,16 @@ def delete_tags(
print(" deleting tag {0}".format(tag)) print(" deleting tag {0}".format(tag))
## deleting layers is disabled because # deleting layers is disabled because
## it also deletes shared layers # it also deletes shared layers
## ##
## for layer in registry.list_tag_layers(image_name, tag): # for layer in registry.list_tag_layers(image_name, tag):
## layer_digest = layer['digest'] # layer_digest = layer['digest']
## registry.delete_tag_layer(image_name, layer_digest, dry_run) # registry.delete_tag_layer(image_name, layer_digest, dry_run)
registry.delete_tag(image_name, tag, dry_run, keep_tag_digests) registry.delete_tag(image_name, tag, dry_run, keep_tag_digests)
def get_tags_like(args_tags_like, tags_list): def get_tags_like(args_tags_like, tags_list):
result = set() result = set()
for tag_like in args_tags_like: for tag_like in args_tags_like:
@@ -374,6 +423,7 @@ def get_tags_like(args_tags_like, tags_list):
result.add(tag) result.add(tag)
return result return result
def get_tags(all_tags_list, image_name, tags_like): def get_tags(all_tags_list, image_name, tags_like):
# check if there are args for special tags # check if there are args for special tags
result = set() result = set()
@@ -389,6 +439,33 @@ def get_tags(all_tags_list, image_name, tags_like):
return result return result
def delete_tags_by_age(registry, image_name, dry_run, hours, tags_to_keep):
image_tags = registry.list_tags(image_name)
tags_to_delete = []
print('---------------------------------')
for tag in image_tags:
image_config = registry.get_tag_config(image_name, tag)
if image_config == []:
print("tag not found")
continue
image_age = registry.get_image_age(image_name, image_config)
if image_age == []:
print("timestamp not found")
continue
if dt.strptime(image_age[:-4], "%Y-%m-%dT%H:%M:%S.%f") < dt.now() - timedelta(hours=int(hours)):
print("will be deleted tag: {0} timestamp: {1}".format(
tag, image_age))
tags_to_delete.append(tag)
print('------------deleting-------------')
delete_tags(registry, image_name, dry_run, tags_to_delete, tags_to_keep)
def main_loop(args): def main_loop(args):
keep_last_versions = int(args.num) keep_last_versions = int(args.num)
@@ -431,29 +508,37 @@ def main_loop(args):
print(" layer: {0}".format( print(" layer: {0}".format(
layer['blobSum'])) layer['blobSum']))
# add tags to "tags_to_keep" list, if we have regexp "tags_to_keep" entries: # add tags to "tags_to_keep" list, if we have regexp "tags_to_keep"
keep_tags=[] # entries:
keep_tags = []
if args.keep_tags_like: if args.keep_tags_like:
keep_tags.extend(get_tags_like(args.keep_tags_like, tags_list)) keep_tags.extend(get_tags_like(args.keep_tags_like, tags_list))
# delete tags if told so # delete tags if told so
if args.delete or args.delete_all: if args.delete or args.delete_all:
if args.delete_all: if args.delete_all:
tags_list_to_delete = list(tags_list) tags_list_to_delete = list(tags_list)
else: else:
tags_list_to_delete = sorted(tags_list, key=natural_keys)[:-keep_last_versions] tags_list_to_delete = sorted(tags_list, key=natural_keys)[
:-keep_last_versions]
# A manifest might be shared between different tags. Explicitly add those # A manifest might be shared between different tags. Explicitly add those
# tags that we want to preserve to the keep_tags list, to prevent # tags that we want to preserve to the keep_tags list, to prevent
# any manifest they are using from being deleted. # any manifest they are using from being deleted.
tags_list_to_keep = [tag for tag in tags_list if tag not in tags_list_to_delete] tags_list_to_keep = [
tag for tag in tags_list if tag not in tags_list_to_delete]
keep_tags.extend(tags_list_to_keep) keep_tags.extend(tags_list_to_keep)
delete_tags( delete_tags(
registry, image_name, args.dry_run, registry, image_name, args.dry_run,
tags_list_to_delete, keep_tags) tags_list_to_delete, keep_tags)
# delete tags by age in hours
if args.delete_by_hours:
keep_tags.extend(args.keep_tags)
delete_tags_by_age(registry, image_name, args.dry_run,
args.delete_by_hours, keep_tags)
if __name__ == "__main__": if __name__ == "__main__":
args = parse_args() args = parse_args()
try: try:
@@ -461,4 +546,3 @@ if __name__ == "__main__":
except KeyboardInterrupt: except KeyboardInterrupt:
print("Ctrl-C pressed, quitting") print("Ctrl-C pressed, quitting")
exit(1) exit(1)

310
test.py
View File

@@ -1,27 +1,29 @@
import unittest import unittest
from registry import Registry, Requests, get_tags, parse_args, delete_tags from registry import Registry, Requests, get_tags, parse_args, delete_tags, delete_tags_by_age
from mock import MagicMock from mock import MagicMock, patch
import requests import requests
class ReturnValue: class ReturnValue:
def __init__(self, status_code = 200, text = ""):
def __init__(self, status_code=200, text=""):
self.status_code = status_code self.status_code = status_code
self.text = text self.text = text
class MockRequests: class MockRequests:
def __init__(self, return_value = ReturnValue()): def __init__(self, return_value=ReturnValue()):
self.return_value = return_value self.return_value = return_value
self.request = MagicMock(return_value = self.return_value) self.request = MagicMock(return_value=self.return_value)
def reset_return_value(self, status_code = 200, text = ""): def reset_return_value(self, status_code=200, text=""):
self.return_value.status_code = status_code self.return_value.status_code = status_code
self.return_value.text = text self.return_value.text = text
class TestRequestsClass(unittest.TestCase): class TestRequestsClass(unittest.TestCase):
def test_requests_created(self): def test_requests_created(self):
# simply create requests class and make sure it raises an exception # simply create requests class and make sure it raises an exception
# from requests module # from requests module
@@ -33,6 +35,7 @@ class TestRequestsClass(unittest.TestCase):
class TestCreateMethod(unittest.TestCase): class TestCreateMethod(unittest.TestCase):
def test_create_nologin(self): def test_create_nologin(self):
r = Registry.create("testhost", None, False) r = Registry.create("testhost", None, False)
self.assertTrue(isinstance(r.http, Requests)) self.assertTrue(isinstance(r.http, Requests))
@@ -57,6 +60,7 @@ class TestCreateMethod(unittest.TestCase):
with self.assertRaises(SystemExit): with self.assertRaises(SystemExit):
Registry.create("testhost4", "invalid_login", False) Registry.create("testhost4", "invalid_login", False)
class TestParseLogin(unittest.TestCase): class TestParseLogin(unittest.TestCase):
def setUp(self): def setUp(self):
@@ -79,7 +83,8 @@ class TestParseLogin(unittest.TestCase):
(username, password) = self.registry.parse_login("username/password") (username, password) = self.registry.parse_login("username/password")
self.assertEqual(username, None) self.assertEqual(username, None)
self.assertEqual(password, None) self.assertEqual(password, None)
self.assertEqual(self.registry.last_error, "Please provide -l in the form USER:PASSWORD") self.assertEqual(self.registry.last_error,
"Please provide -l in the form USER:PASSWORD")
def test_login_args_singlequoted(self): def test_login_args_singlequoted(self):
(username, password) = self.registry.parse_login("'username':password") (username, password) = self.registry.parse_login("'username':password")
@@ -99,10 +104,12 @@ class TestParseLogin(unittest.TestCase):
then the result will be invalid in this case then the result will be invalid in this case
and no error will be printed and no error will be printed
""" """
(username, password) = self.registry.parse_login("'user:name':'pass:word'") (username, password) = self.registry.parse_login(
"'user:name':'pass:word'")
self.assertEqual(username, 'user') self.assertEqual(username, 'user')
self.assertEqual(password, "name':'pass:word") self.assertEqual(password, "name':'pass:word")
class TestRegistrySend(unittest.TestCase): class TestRegistrySend(unittest.TestCase):
def setUp(self): def setUp(self):
@@ -118,9 +125,9 @@ class TestRegistrySend(unittest.TestCase):
self.assertEqual(self.registry.last_error, None) self.assertEqual(self.registry.last_error, None)
self.registry.http.request.assert_called_with('GET', self.registry.http.request.assert_called_with('GET',
'http://testdomain.com/test_string', 'http://testdomain.com/test_string',
auth = (None, None), auth=(None, None),
headers = self.registry.HEADERS, headers=self.registry.HEADERS,
verify = True) verify=True)
def test_invalid_status_code(self): def test_invalid_status_code(self):
self.registry.http.reset_return_value(400) self.registry.http.reset_return_value(400)
@@ -128,7 +135,6 @@ class TestRegistrySend(unittest.TestCase):
self.assertEqual(response, None) self.assertEqual(response, None)
self.assertEqual(self.registry.last_error, 400) self.assertEqual(self.registry.last_error, 400)
def test_login_pass(self): def test_login_pass(self):
self.registry.username = "test_login" self.registry.username = "test_login"
self.registry.password = "test_password" self.registry.password = "test_password"
@@ -138,9 +144,10 @@ class TestRegistrySend(unittest.TestCase):
self.assertEqual(self.registry.last_error, None) self.assertEqual(self.registry.last_error, None)
self.registry.http.request.assert_called_with('GET', self.registry.http.request.assert_called_with('GET',
'http://testdomain.com/v2/catalog', 'http://testdomain.com/v2/catalog',
auth = ("test_login", "test_password"), auth=("test_login",
headers = self.registry.HEADERS, "test_password"),
verify = True) headers=self.registry.HEADERS,
verify=True)
class TestListImages(unittest.TestCase): class TestListImages(unittest.TestCase):
@@ -151,8 +158,8 @@ class TestListImages(unittest.TestCase):
self.registry.hostname = "http://testdomain.com" self.registry.hostname = "http://testdomain.com"
def test_list_images_ok(self): def test_list_images_ok(self):
self.registry.http.reset_return_value(status_code = 200, self.registry.http.reset_return_value(status_code=200,
text = '{"repositories":["image1","image2"]}') text='{"repositories":["image1","image2"]}')
response = self.registry.list_images() response = self.registry.list_images()
self.assertEqual(response, ["image1", "image2"]) self.assertEqual(response, ["image1", "image2"])
self.assertEqual(self.registry.last_error, None) self.assertEqual(self.registry.last_error, None)
@@ -165,6 +172,7 @@ class TestListImages(unittest.TestCase):
class TestListTags(unittest.TestCase): class TestListTags(unittest.TestCase):
def setUp(self): def setUp(self):
self.registry = Registry() self.registry = Registry()
self.registry.http = MockRequests() self.registry.http = MockRequests()
@@ -172,16 +180,16 @@ class TestListTags(unittest.TestCase):
self.registry.http.reset_return_value(200) self.registry.http.reset_return_value(200)
def test_list_one_tag_ok(self): def test_list_one_tag_ok(self):
self.registry.http.reset_return_value(status_code = 200, self.registry.http.reset_return_value(status_code=200,
text = u'{"name":"image1","tags":["0.1.306"]}') text=u'{"name":"image1","tags":["0.1.306"]}')
response = self.registry.list_tags('image1') response = self.registry.list_tags('image1')
self.assertEqual(response, ["0.1.306"]) self.assertEqual(response, ["0.1.306"])
self.assertEqual(self.registry.last_error, None) self.assertEqual(self.registry.last_error, None)
def test_list_tags_invalid_http_response(self): def test_list_tags_invalid_http_response(self):
self.registry.http.reset_return_value(status_code = 400, self.registry.http.reset_return_value(status_code=400,
text = "") text="")
response = self.registry.list_tags('image1') response = self.registry.list_tags('image1')
self.assertEqual(response, []) self.assertEqual(response, [])
@@ -193,7 +201,8 @@ class TestListTags(unittest.TestCase):
response = self.registry.list_tags('image1') response = self.registry.list_tags('image1')
self.assertEqual(response, []) self.assertEqual(response, [])
self.assertEqual(self.registry.last_error, "list_tags: invalid json response") self.assertEqual(self.registry.last_error,
"list_tags: invalid json response")
def test_list_one_tag_sorted(self): def test_list_one_tag_sorted(self):
self.registry.http.reset_return_value(status_code=200, self.registry.http.reset_return_value(status_code=200,
@@ -204,21 +213,24 @@ class TestListTags(unittest.TestCase):
self.assertEqual(self.registry.last_error, None) self.assertEqual(self.registry.last_error, None)
def test_list_tags_like_various(self): def test_list_tags_like_various(self):
tags_list = set(['FINAL_0.1', 'SNAPSHOT_0.1', "0.1.SNAP", "1.0.0_FINAL"]) tags_list = set(['FINAL_0.1', 'SNAPSHOT_0.1',
self.assertEqual(get_tags(tags_list, "", set(["FINAL"])), set(["FINAL_0.1", "1.0.0_FINAL"])) "0.1.SNAP", "1.0.0_FINAL"])
self.assertEqual(get_tags(tags_list, "", set(["SNAPSHOT"])), set(['SNAPSHOT_0.1'])) self.assertEqual(get_tags(tags_list, "", set(
["FINAL"])), set(["FINAL_0.1", "1.0.0_FINAL"]))
self.assertEqual(get_tags(tags_list, "", set(
["SNAPSHOT"])), set(['SNAPSHOT_0.1']))
self.assertEqual(get_tags(tags_list, "", set()), self.assertEqual(get_tags(tags_list, "", set()),
set(['FINAL_0.1', 'SNAPSHOT_0.1', "0.1.SNAP", "1.0.0_FINAL"])) set(['FINAL_0.1', 'SNAPSHOT_0.1', "0.1.SNAP", "1.0.0_FINAL"]))
self.assertEqual(get_tags(tags_list, "", set(["ABSENT"])), set()) self.assertEqual(get_tags(tags_list, "", set(["ABSENT"])), set())
self.assertEqual(get_tags(tags_list, "IMAGE:TAG00", ""), set(["TAG00"])) self.assertEqual(
self.assertEqual(get_tags(tags_list, "IMAGE:TAG00", set(["WILL_NOT_BE_CONSIDERED"])), set(["TAG00"])) get_tags(tags_list, "IMAGE:TAG00", ""), set(["TAG00"]))
self.assertEqual(get_tags(tags_list, "IMAGE:TAG00", set(
["WILL_NOT_BE_CONSIDERED"])), set(["TAG00"]))
class TestListDigest(unittest.TestCase): class TestListDigest(unittest.TestCase):
def setUp(self): def setUp(self):
self.registry = Registry() self.registry = Registry()
self.registry.http = MockRequests() self.registry.http = MockRequests()
@@ -226,8 +238,8 @@ class TestListDigest(unittest.TestCase):
self.registry.http.reset_return_value(200) self.registry.http.reset_return_value(200)
def test_get_digest_ok(self): def test_get_digest_ok(self):
self.registry.http.reset_return_value(status_code = 200, self.registry.http.reset_return_value(status_code=200,
text = ('{' text=('{'
'"schemaVersion": 2,\n ' '"schemaVersion": 2,\n '
'"mediaType": "application/vnd.docker.distribution.manifest.v2+json"' '"mediaType": "application/vnd.docker.distribution.manifest.v2+json"'
'"digest": "sha256:357ea8c3d80bc25792e010facfc98aee5972ebc47e290eb0d5aea3671a901cab"' '"digest": "sha256:357ea8c3d80bc25792e010facfc98aee5972ebc47e290eb0d5aea3671a901cab"'
@@ -243,12 +255,13 @@ class TestListDigest(unittest.TestCase):
self.registry.http.request.assert_called_with( self.registry.http.request.assert_called_with(
"HEAD", "HEAD",
"http://testdomain.com/v2/image1/manifests/0.1.300", "http://testdomain.com/v2/image1/manifests/0.1.300",
auth = (None, None), auth=(None, None),
headers = self.registry.HEADERS, headers=self.registry.HEADERS,
verify = True verify=True
) )
self.assertEqual(response, 'sha256:85295b0e7456a8fbbc886722b483f87f2bff553fa0beeaf37f5d807aff7c1e52') self.assertEqual(
response, 'sha256:85295b0e7456a8fbbc886722b483f87f2bff553fa0beeaf37f5d807aff7c1e52')
self.assertEqual(self.registry.last_error, None) self.assertEqual(self.registry.last_error, None)
def test_invalid_status_code(self): def test_invalid_status_code(self):
@@ -263,7 +276,139 @@ class TestListDigest(unittest.TestCase):
self.registry.get_tag_digest('image1', '0.1.300') self.registry.get_tag_digest('image1', '0.1.300')
class TestTagConfig(unittest.TestCase):
def setUp(self):
self.registry = Registry()
self.registry.http = MockRequests()
self.registry.hostname = "http://testdomain.com"
self.registry.http.reset_return_value(200)
def test_get_tag_config_ok(self):
self.registry.http.reset_return_value(
200,
'''
{
"schemaVersion": 2,
"mediaType": "application/vnd.docker.distribution.manifest.v2+json",
"config": {
"mediaType": "application/vnd.docker.container.image.v1+json",
"size": 12953,
"digest": "sha256:8d71dfbf239c0015ad66993d55d3954cee2d52d86f829fdff9ccfb9f23b75aa8"
},
"layers": [
{
"mediaType": "application/vnd.docker.image.rootfs.diff.tar.gzip",
"size": 51480140,
"digest": "sha256:c6b13209f43b945816b7658a567720983ac5037e3805a779d5772c61599b4f73"
}
]
}
'''
)
response = self.registry.get_tag_config('image1', '0.1.300')
self.registry.http.request.assert_called_with(
"GET",
"http://testdomain.com/v2/image1/manifests/0.1.300",
auth=(None, None),
headers=self.registry.HEADERS,
verify=True)
self.assertEqual(
response, {'mediaType': 'application/vnd.docker.container.image.v1+json', 'size': 12953, 'digest': 'sha256:8d71dfbf239c0015ad66993d55d3954cee2d52d86f829fdff9ccfb9f23b75aa8'})
self.assertEqual(self.registry.last_error, None)
def test_tag_config_scheme_v1(self):
with self.assertRaises(SystemExit):
self.registry.http.reset_return_value(
200,
'''
{
"schemaVersion": 1
}
'''
)
response = self.registry.get_tag_config('image1', '0.1.300')
def test_invalid_status_code(self):
self.registry.http.reset_return_value(400, "whatever")
response = self.registry.get_tag_config('image1', '0.1.300')
self.registry.http.request.assert_called_with(
"GET",
"http://testdomain.com/v2/image1/manifests/0.1.300",
auth=(None, None),
headers=self.registry.HEADERS,
verify=True
)
self.assertEqual(response, [])
self.assertEqual(self.registry.last_error, 400)
class TestImageAge(unittest.TestCase):
def setUp(self):
self.registry = Registry()
self.registry.http = MockRequests()
self.registry.hostname = "http://testdomain.com"
self.registry.http.reset_return_value(200)
def test_image_age_ok(self):
self.registry.http.reset_return_value(
200,
'''
{
"architecture": "amd64",
"author": "Test",
"config": {},
"container": "c467822c5981cd446068eebafd81cb5cde60d4341a945f3fbf67e456dde5af51",
"container_config": {},
"created": "2017-12-27T12:47:33.511765448Z",
"docker_version": "1.11.2",
"history": []
}
'''
)
json_payload = {'mediaType': 'application/vnd.docker.container.image.v1+json', 'size': 12953,
'digest': 'sha256:8d71dfbf239c0015ad66993d55d3954cee2d52d86f829fdff9ccfb9f23b75aa8'}
header = {"Accept": "{0}".format(json_payload['mediaType'])}
response = self.registry.get_image_age('image1', json_payload)
self.registry.http.request.assert_called_with(
"GET",
"http://testdomain.com/v2/image1/blobs/sha256:8d71dfbf239c0015ad66993d55d3954cee2d52d86f829fdff9ccfb9f23b75aa8",
auth=(None, None),
headers=header,
verify=True)
self.assertEqual(
response, "2017-12-27T12:47:33.511765448Z")
self.assertEqual(self.registry.last_error, None)
def test_image_age_nok(self):
self.registry.http.reset_return_value(400, "err")
json_payload = {'mediaType': 'application/vnd.docker.container.image.v1+json', 'size': 12953,
'digest': 'sha256:8d71dfbf239c0015ad66993d55d3954cee2d52d86f829fdff9ccfb9f23b75aa8'}
header = {"Accept": "{0}".format(json_payload['mediaType'])}
response = self.registry.get_image_age('image1', json_payload)
self.registry.http.request.assert_called_with(
"GET",
"http://testdomain.com/v2/image1/blobs/sha256:8d71dfbf239c0015ad66993d55d3954cee2d52d86f829fdff9ccfb9f23b75aa8",
auth=(None, None),
headers=header,
verify=True)
self.assertEqual(response, [])
self.assertEqual(self.registry.last_error, 400)
class TestListLayers(unittest.TestCase): class TestListLayers(unittest.TestCase):
def setUp(self): def setUp(self):
self.registry = Registry() self.registry = Registry()
self.registry.http = MockRequests() self.registry.http = MockRequests()
@@ -286,15 +431,14 @@ class TestListLayers(unittest.TestCase):
self.registry.http.request.assert_called_with( self.registry.http.request.assert_called_with(
"GET", "GET",
"http://testdomain.com/v2/image1/manifests/0.1.300", "http://testdomain.com/v2/image1/manifests/0.1.300",
auth = (None, None), auth=(None, None),
headers = self.registry.HEADERS, headers=self.registry.HEADERS,
verify = True verify=True
) )
self.assertEqual(response, "layers_list") self.assertEqual(response, "layers_list")
self.assertEqual(self.registry.last_error, None) self.assertEqual(self.registry.last_error, None)
def test_list_layers_schema_version_1_ok(self): def test_list_layers_schema_version_1_ok(self):
self.registry.http.reset_return_value( self.registry.http.reset_return_value(
200, 200,
@@ -311,9 +455,9 @@ class TestListLayers(unittest.TestCase):
self.registry.http.request.assert_called_with( self.registry.http.request.assert_called_with(
"GET", "GET",
"http://testdomain.com/v2/image1/manifests/0.1.300", "http://testdomain.com/v2/image1/manifests/0.1.300",
auth = (None, None), auth=(None, None),
headers = self.registry.HEADERS, headers=self.registry.HEADERS,
verify = True verify=True
) )
self.assertEqual(response, "layers_list") self.assertEqual(response, "layers_list")
@@ -335,7 +479,9 @@ class TestListLayers(unittest.TestCase):
self.assertEqual(response, []) self.assertEqual(response, [])
self.assertEqual(self.registry.last_error, 400) self.assertEqual(self.registry.last_error, 400)
class TestDeletion(unittest.TestCase): class TestDeletion(unittest.TestCase):
def setUp(self): def setUp(self):
self.registry = Registry() self.registry = Registry()
self.registry.http = MockRequests() self.registry.http = MockRequests()
@@ -347,14 +493,14 @@ class TestDeletion(unittest.TestCase):
'X-Content-Type-Options': 'nosniff' 'X-Content-Type-Options': 'nosniff'
} }
def test_delete_tag_dry_run(self): def test_delete_tag_dry_run(self):
response = self.registry.delete_tag("image1", 'test_tag', True, []) response = self.registry.delete_tag("image1", 'test_tag', True, [])
self.assertFalse(response) self.assertFalse(response)
def test_delete_tag_ok(self): def test_delete_tag_ok(self):
keep_tag_digests = ['DIGEST1', 'DIGEST2'] keep_tag_digests = ['DIGEST1', 'DIGEST2']
response = self.registry.delete_tag('image1', 'test_tag', False, keep_tag_digests) response = self.registry.delete_tag(
'image1', 'test_tag', False, keep_tag_digests)
self.assertEqual(response, True) self.assertEqual(response, True)
self.assertEqual(self.registry.http.request.call_count, 2) self.assertEqual(self.registry.http.request.call_count, 2)
self.registry.http.request.assert_called_with( self.registry.http.request.assert_called_with(
@@ -367,7 +513,8 @@ class TestDeletion(unittest.TestCase):
self.assertTrue("MOCK_DIGEST_HEADER" in keep_tag_digests) self.assertTrue("MOCK_DIGEST_HEADER" in keep_tag_digests)
def test_delete_tag_ignored(self): def test_delete_tag_ignored(self):
response = self.registry.delete_tag('image1', 'test_tag', False, ['MOCK_DIGEST_HEADER']) response = self.registry.delete_tag(
'image1', 'test_tag', False, ['MOCK_DIGEST_HEADER'])
self.assertEqual(response, True) self.assertEqual(response, True)
self.assertEqual(self.registry.http.request.call_count, 1) self.assertEqual(self.registry.http.request.call_count, 1)
self.registry.http.request.assert_called_with( self.registry.http.request.assert_called_with(
@@ -384,7 +531,9 @@ class TestDeletion(unittest.TestCase):
self.assertFalse(response) self.assertFalse(response)
self.assertEqual(self.registry.last_error, 400) self.assertEqual(self.registry.last_error, 400)
class TestDeleteTagsFunction(unittest.TestCase): class TestDeleteTagsFunction(unittest.TestCase):
def setUp(self): def setUp(self):
self.registry = Registry() self.registry = Registry()
self.delete_mock = MagicMock() self.delete_mock = MagicMock()
@@ -408,10 +557,11 @@ class TestDeleteTagsFunction(unittest.TestCase):
) )
def test_delete_tags_keep(self): def test_delete_tags_keep(self):
digest_mock = MagicMock(return_value = "DIGEST_MOCK") digest_mock = MagicMock(return_value="DIGEST_MOCK")
self.registry.get_tag_digest = digest_mock self.registry.get_tag_digest = digest_mock
delete_tags(self.registry, "imagename", False, ["tag1", "tag2"], ["tag2"]) delete_tags(self.registry, "imagename",
False, ["tag1", "tag2"], ["tag2"])
digest_mock.assert_called_with("imagename", "tag2") digest_mock.assert_called_with("imagename", "tag2")
@@ -423,9 +573,10 @@ class TestDeleteTagsFunction(unittest.TestCase):
) )
def test_delete_tags_digest_none(self): def test_delete_tags_digest_none(self):
digest_mock = MagicMock(return_value = None) digest_mock = MagicMock(return_value=None)
self.registry.get_tag_digest = digest_mock self.registry.get_tag_digest = digest_mock
delete_tags(self.registry, "imagename", False, ["tag1", "tag2"], ["tag2"]) delete_tags(self.registry, "imagename",
False, ["tag1", "tag2"], ["tag2"])
digest_mock.assert_called_with("imagename", "tag2") digest_mock.assert_called_with("imagename", "tag2")
@@ -437,7 +588,62 @@ class TestDeleteTagsFunction(unittest.TestCase):
) )
class TestDeleteTagsByAge(unittest.TestCase):
def setUp(self):
self.registry = Registry()
self.registry.http = MockRequests()
self.get_tag_config_mock = MagicMock(return_value={'mediaType': 'application/vnd.docker.container.image.v1+json', 'size': 12953,
'digest': 'sha256:8d71dfbf239c0015ad66993d55d3954cee2d52d86f829fdff9ccfb9f23b75aa8'})
self.registry.get_tag_config = self.get_tag_config_mock
self.get_image_age_mock = MagicMock(
return_value="2017-12-27T12:47:33.511765448Z")
self.registry.get_image_age = self.get_image_age_mock
self.list_tags_mock = MagicMock(return_value=["image"])
self.registry.list_tags = self.list_tags_mock
self.get_tag_digest_mock = MagicMock()
self.registry.get_tag_digest = self.get_tag_digest_mock
self.registry.http = MockRequests()
self.registry.hostname = "http://testdomain.com"
self.registry.http.reset_return_value(200, "MOCK_DIGEST")
@patch('registry.delete_tags')
def test_delete_tags_by_age_no_keep(self, delete_tags_patched):
delete_tags_by_age(self.registry, "imagename", False, 24, [])
self.list_tags_mock.assert_called_with(
"imagename"
)
self.list_tags_mock.assert_called_with("imagename")
self.get_tag_config_mock.assert_called_with("imagename", "image")
delete_tags_patched.assert_called_with(
self.registry, "imagename", False, ["image"], [])
@patch('registry.delete_tags')
def test_delete_tags_by_age_keep_tags(self, delete_tags_patched):
delete_tags_by_age(self.registry, "imagename", False, 24, ["latest"])
self.list_tags_mock.assert_called_with(
"imagename"
)
self.list_tags_mock.assert_called_with("imagename")
self.get_tag_config_mock.assert_called_with("imagename", "image")
delete_tags_patched.assert_called_with(
self.registry, "imagename", False, ["image"], ["latest"])
@patch('registry.delete_tags')
def test_delete_tags_by_age_dry_run(self, delete_tags_patched):
delete_tags_by_age(self.registry, "imagename", True, 24, ["latest"])
self.list_tags_mock.assert_called_with(
"imagename"
)
self.list_tags_mock.assert_called_with("imagename")
self.get_tag_config_mock.assert_called_with("imagename", "image")
delete_tags_patched.assert_called_with(
self.registry, "imagename", True, ["image"], ["latest"])
class TestArgParser(unittest.TestCase): class TestArgParser(unittest.TestCase):
def test_no_args(self): def test_no_args(self):
with self.assertRaises(SystemExit): with self.assertRaises(SystemExit):
parse_args("") parse_args("")
@@ -453,7 +659,8 @@ class TestArgParser(unittest.TestCase):
"--tags-like", "tags_like_text", "--tags-like", "tags_like_text",
"--no-validate-ssl", "--no-validate-ssl",
"--delete-all", "--delete-all",
"--layers"] "--layers",
"--delete-by-hours", "24"]
args = parse_args(args_list) args = parse_args(args_list)
self.assertTrue(args.delete) self.assertTrue(args.delete)
self.assertTrue(args.layers) self.assertTrue(args.layers)
@@ -466,6 +673,7 @@ class TestArgParser(unittest.TestCase):
self.assertEqual(args.tags_like, ["tags_like_text"]) self.assertEqual(args.tags_like, ["tags_like_text"])
self.assertEqual(args.host, "hostname") self.assertEqual(args.host, "hostname")
self.assertEqual(args.keep_tags, ["keep1", "keep2"]) self.assertEqual(args.keep_tags, ["keep1", "keep2"])
self.assertEqual(args.delete_by_hours, "24")
if __name__ == '__main__': if __name__ == '__main__':