more tests
This commit is contained in:
68
registry.py
68
registry.py
@@ -146,12 +146,12 @@ class Registry:
|
||||
|
||||
return tags_list
|
||||
|
||||
def list_tags_like(self, tag_like, args_tags_like):
|
||||
for tag_like in args_tags_like:
|
||||
print "tag like: {0}".format(tag_like)
|
||||
for tag in all_tags_list:
|
||||
if re.search(tag_like, tag):
|
||||
print "Adding {0} to tags list".format(tag)
|
||||
# def list_tags_like(self, tag_like, args_tags_like):
|
||||
# for tag_like in args_tags_like:
|
||||
# print("tag like: {0}".format(tag_like))
|
||||
# for tag in all_tags_list:
|
||||
# if re.search(tag_like, tag):
|
||||
# print("Adding {0} to tags list".format(tag))
|
||||
|
||||
def get_tag_digest(self, image_name, tag):
|
||||
image_headers = self.send("/v2/{0}/manifests/{1}".format(
|
||||
@@ -167,13 +167,13 @@ class Registry:
|
||||
|
||||
def delete_tag(self, image_name, tag, dry_run, tag_digests_to_ignore):
|
||||
if dry_run:
|
||||
print 'would delete tag {0}'.format(tag)
|
||||
print('would delete tag {0}'.format(tag))
|
||||
return False
|
||||
|
||||
tag_digest = self.get_tag_digest(image_name, tag)
|
||||
|
||||
if tag_digest in tag_digests_to_ignore:
|
||||
print "Digest {0} for tag {1} is referenced by another tag or has already been deleted and will be ignored".format(tag_digest, tag)
|
||||
print("Digest {0} for tag {1} is referenced by another tag or has already been deleted and will be ignored".format(tag_digest, tag))
|
||||
return True
|
||||
|
||||
if tag_digest == None:
|
||||
@@ -183,30 +183,30 @@ class Registry:
|
||||
image_name, tag_digest), method="DELETE")
|
||||
|
||||
if delete_result == None:
|
||||
print "failed, error: {0}".format(self.last_error)
|
||||
print("failed, error: {0}".format(self.last_error))
|
||||
return False
|
||||
|
||||
tag_digests_to_ignore.append(tag_digest)
|
||||
|
||||
print "done"
|
||||
print("done")
|
||||
return True
|
||||
|
||||
# this function is not used and thus not tested
|
||||
def delete_tag_layer(self, image_name, layer_digest, dry_run):
|
||||
if dry_run:
|
||||
print 'would delete layer {0}'.format(layer_digest)
|
||||
print('would delete layer {0}'.format(layer_digest))
|
||||
return False
|
||||
|
||||
print 'deleting layer {0}'.format(layer_digest),
|
||||
print('deleting layer {0}'.format(layer_digest),)
|
||||
|
||||
delete_result = self.send('/v2/{0}/blobs/{1}'.format(
|
||||
image_name, layer_digest), method='DELETE')
|
||||
|
||||
if delete_result == None:
|
||||
print "failed, error: {0}".format(self.last_error)
|
||||
print("failed, error: {0}".format(self.last_error))
|
||||
return False
|
||||
|
||||
print "done"
|
||||
print("done")
|
||||
return True
|
||||
|
||||
|
||||
@@ -215,7 +215,7 @@ class Registry:
|
||||
image_name, tag))
|
||||
|
||||
if layers_result == None:
|
||||
print "error {0}".format(self.last_error)
|
||||
print("error {0}".format(self.last_error))
|
||||
return []
|
||||
|
||||
json_result = json.loads(layers_result.text)
|
||||
@@ -340,16 +340,16 @@ def delete_tags(
|
||||
keep_tag_digests = []
|
||||
|
||||
if tags_to_keep:
|
||||
print "Getting digests for tags to keep:"
|
||||
print("Getting digests for tags to keep:")
|
||||
for tag in tags_to_keep:
|
||||
|
||||
print "Getting digest for tag {0}".format(tag)
|
||||
print("Getting digest for tag {0}".format(tag))
|
||||
digest = registry.get_tag_digest(image_name, tag)
|
||||
if digest is None:
|
||||
print "Tag {0} does not exist for image {1}. Ignore here.".format(tag, image_name)
|
||||
print("Tag {0} does not exist for image {1}. Ignore here.".format(tag, image_name))
|
||||
continue
|
||||
|
||||
print "Keep digest {0} for tag {1}".format(digest, tag)
|
||||
print("Keep digest {0} for tag {1}".format(digest, tag))
|
||||
|
||||
keep_tag_digests.append(digest)
|
||||
|
||||
@@ -357,7 +357,7 @@ def delete_tags(
|
||||
if tag in tags_to_keep:
|
||||
continue
|
||||
|
||||
print " deleting tag {0}".format(tag)
|
||||
print(" deleting tag {0}".format(tag))
|
||||
|
||||
## deleting layers is disabled because
|
||||
## it also deletes shared layers
|
||||
@@ -371,10 +371,10 @@ def delete_tags(
|
||||
def get_tags_like(args_tags_like, tags_list):
|
||||
result = set()
|
||||
for tag_like in args_tags_like:
|
||||
print "tag like: {0}".format(tag_like)
|
||||
print("tag like: {0}".format(tag_like))
|
||||
for tag in tags_list:
|
||||
if re.search(tag_like, tag):
|
||||
print "Adding {0} to tags list".format(tag)
|
||||
print("Adding {0} to tags list".format(tag))
|
||||
result.add(tag)
|
||||
return result
|
||||
|
||||
@@ -389,7 +389,7 @@ def get_tags(all_tags_list, image_name, tags_like):
|
||||
# get tags from image name if any
|
||||
if ":" in image_name:
|
||||
(image_name, tag_name) = image_name.split(":")
|
||||
result = set(tag_name)
|
||||
result = set([tag_name])
|
||||
|
||||
return result
|
||||
|
||||
@@ -402,7 +402,7 @@ def main_loop(args):
|
||||
|
||||
registry = Registry.create(args.host, args.login, args.no_validate_ssl)
|
||||
if args.delete:
|
||||
print "Will delete all but {0} last tags".format(keep_last_versions)
|
||||
print("Will delete all but {0} last tags".format(keep_last_versions))
|
||||
|
||||
if args.image != None:
|
||||
image_list = args.image
|
||||
@@ -412,28 +412,28 @@ def main_loop(args):
|
||||
# loop through registry's images
|
||||
# or through the ones given in command line
|
||||
for image_name in image_list:
|
||||
print "---------------------------------"
|
||||
print "Image: {0}".format(image_name)
|
||||
print("---------------------------------")
|
||||
print("Image: {0}".format(image_name))
|
||||
|
||||
all_tags_list = registry.list_tags(image_name)
|
||||
|
||||
if not all_tags_list:
|
||||
print " no tags!"
|
||||
print(" no tags!")
|
||||
continue
|
||||
|
||||
tags_list = get_tags(all_tags_list, image_name, args.tags_like)
|
||||
|
||||
# print tags and optionally layers
|
||||
# print(tags and optionally layers
|
||||
for tag in tags_list:
|
||||
print " tag: {0}".format(tag)
|
||||
print(" tag: {0}".format(tag))
|
||||
if args.layers:
|
||||
for layer in registry.list_tag_layers(image_name, tag):
|
||||
if layer.has_key('size'):
|
||||
print " layer: {0}, size: {1}".format(
|
||||
layer['digest'], layer['size'])
|
||||
print(" layer: {0}, size: {1}".format(
|
||||
layer['digest'], layer['size']))
|
||||
else:
|
||||
print " layer: {0}".format(
|
||||
layer['blobSum'])
|
||||
print(" layer: {0}".format(
|
||||
layer['blobSum']))
|
||||
|
||||
# add tags to "tags_to_keep" list, if we have regexp "tags_to_keep" entries:
|
||||
if args.keep_tags_like:
|
||||
@@ -456,6 +456,6 @@ if __name__ == "__main__":
|
||||
try:
|
||||
main_loop(args)
|
||||
except KeyboardInterrupt:
|
||||
print "Ctrl-C pressed, quitting"
|
||||
print("Ctrl-C pressed, quitting")
|
||||
exit(1)
|
||||
|
||||
|
||||
13
test.py
13
test.py
@@ -21,6 +21,15 @@ class MockRequests:
|
||||
self.return_value.text = text
|
||||
|
||||
|
||||
class TestRequestsClass(unittest.TestCase):
|
||||
def test_requests_created(self):
|
||||
# simply create requests class and make sure it raises an exception
|
||||
# from requests module
|
||||
# this test will fail if port 45272 is open on local machine
|
||||
# is so, either change port below or check what is this service you are
|
||||
# running on port 45272
|
||||
with self.assertRaises(requests.exceptions.ConnectionError):
|
||||
Requests().request("GET", "http://localhost:45272")
|
||||
|
||||
|
||||
class TestCreateMethod(unittest.TestCase):
|
||||
@@ -197,6 +206,10 @@ class TestListTags(unittest.TestCase):
|
||||
set(['FINAL_0.1', 'SNAPSHOT_0.1', "0.1.SNAP", "1.0.0_FINAL"]))
|
||||
self.assertEqual(get_tags(tags_list, "", set(["ABSENT"])), set())
|
||||
|
||||
self.assertEqual(get_tags(tags_list, "IMAGE:TAG00", ""), set(["TAG00"]))
|
||||
self.assertEqual(get_tags(tags_list, "IMAGE:TAG00", set(["WILL_NOT_BE_CONSIDERED"])), set(["TAG00"]))
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user