From 2c632c90862093de7f74f2bcae2e9d4c7c558ccb Mon Sep 17 00:00:00 2001 From: Andrey Pohilko Date: Fri, 10 Feb 2017 19:46:11 +0600 Subject: [PATCH] added tests, made few changes in registry.py --- .gitignore | 3 + registry.py | 91 ++++++++++-------- requirements.txt | 2 + test.py | 240 +++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 298 insertions(+), 38 deletions(-) create mode 100644 .gitignore create mode 100644 requirements.txt create mode 100644 test.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..5cc6066 --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +# Created by .ignore support plugin (hsz.mobi) +.gitignore +.idea/ diff --git a/registry.py b/registry.py index 927312c..8d11025 100755 --- a/registry.py +++ b/registry.py @@ -34,31 +34,41 @@ import argparse # number of image versions to keep CONST_KEEP_LAST_VERSIONS = 10 +# this class is created for testing +class Requests: + def request(self, method, url, **kwargs): + return requests.request(method, url, **kwargs) # class to manipulate registry class Registry: - username = "" - password = "" - hostname = "" - no_validate_ssl = False; # this is required for proper digest processing HEADERS = {"Accept": "application/vnd.docker.distribution.manifest.v2+json"} - # store last error if any - __error = None + def __init__(self): + self.username = None + self.password = None + self.hostname = None + self.no_validate_ssl = False + self.http = None + self.last_error = None - def __init__(self, host, login, no_validate_ssl): + @staticmethod + def create(host, login, no_validate_ssl): + r = Registry if login != None: if not ':' in login: - print "Please provide -l in the form USER:PASSWORD" + r.last_error = "Please provide -l in the form USER:PASSWORD" + print(r.last_error) exit(1) + (r.username, r.password) = login.split(':') - (self.username, self.password) = login.split(':') + r.hostname = host + r.no_validate_ssl = no_validate_ssl + r.http = Requests + return r - self.hostname = host - self.no_validate_ssl = no_validate_ssl @staticmethod def __atoi(text): @@ -73,25 +83,26 @@ class Registry: ''' return [ Registry.__atoi(c) for c in re.split('(\d+)', text) ] - def send(self, path, method="GET"): - try: - result = requests.request( - method, "{0}{1}".format(self.hostname, path), - headers = self.HEADERS, - auth=(None if self.username == "" - else (self.username, self.password)), - verify = not self.no_validate_ssl) - except Exception as error: - print "cannot connect to {0}\nerror {1}".format( - self.hostname, - error) - exit(1) + def send(self, path, method="GET"): + # try: + result = self.http.request( + method, "{0}{1}".format(self.hostname, path), + headers = self.HEADERS, + auth=(None if self.username == "" + else (self.username, self.password)), + verify = not self.no_validate_ssl) + + # except Exception as error: + # print("cannot connect to {0}\nerror {1}".format( + # self.hostname, + # error)) + # exit(1) if str(result.status_code)[0] == '2': - self.__error = None + self.last_error = None return result - self.__error=result.status_code + self.last_error=result.status_code return None def list_images(self): @@ -106,7 +117,11 @@ class Registry: if result == None: return [] - tags_list = json.loads(result.text)['tags'] + try: + tags_list = json.loads(result.text)['tags'] + except ValueError: + self.last_error = "list_tags: invalid json response" + return [] if tags_list != None: tags_list.sort(key=Registry.natural_keys) @@ -118,8 +133,7 @@ class Registry: image_name, tag), method="HEAD") if image_headers == None: - - print " tag digest not found: {0}".format(self.__error) + print(" tag digest not found: {0}".format(self.last_error)) return None tag_digest = image_headers.headers['Docker-Content-Digest'] @@ -144,7 +158,7 @@ class Registry: image_name, tag_digest), method="DELETE") if delete_result == None: - print "failed, error: {0}".format(self.__error) + print "failed, error: {0}".format(self.last_error) return False tag_digests_to_ignore.append(tag_digest) @@ -163,7 +177,7 @@ class Registry: image_name, layer_digest), method='DELETE') if delete_result == None: - print "failed, error: {0}".format(self.__error) + print "failed, error: {0}".format(self.last_error) return False print "done" @@ -175,17 +189,18 @@ class Registry: image_name, tag)) if layers_result == None: - print "error {0}".format(self.__error) + print "error {0}".format(self.last_error) return [] - if json.loads(layers_result.text)['schemaVersion'] == 1: - layers = json.loads(layers_result.text)['fsLayers'] + json_result = json.loads(layers_result.text) + if json_result['schemaVersion'] == 1: + layers = json_result['fsLayers'] else: - layers = json.loads(layers_result.text)['layers'] + layers = json_result['layers'] return layers -def parse_args(): +def parse_args(args = None): parser = argparse.ArgumentParser( description="List or delete images from Docker registry", formatter_class=argparse.RawDescriptionHelpFormatter, @@ -290,7 +305,7 @@ for more detail on garbage collection read here: const=True) - return parser.parse_args() + return parser.parse_args(args) def delete_tags( @@ -335,7 +350,7 @@ def main_loop(args): if args.no_validate_ssl: requests.packages.urllib3.disable_warnings(InsecureRequestWarning) - registry = Registry(args.host, args.login, args.no_validate_ssl) + registry = Registry.create(args.host, args.login, args.no_validate_ssl) if args.delete: print "Will delete all but {0} last tags".format(keep_last_versions) diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..d5f93b9 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,2 @@ +requests +mock \ No newline at end of file diff --git a/test.py b/test.py new file mode 100644 index 0000000..8bd56ee --- /dev/null +++ b/test.py @@ -0,0 +1,240 @@ +import unittest +from registry import Registry +from mock import MagicMock + + +class ReturnValue: + def __init__(self, status_code = 200, text = ""): + self.status_code = status_code + self.text = text + + +class TestRequests: + + def __init__(self, return_value = ReturnValue()): + self.return_value = return_value + self.request = MagicMock(return_value = self.return_value) + + def reset_return_value(self, status_code = 200, text = ""): + self.return_value.status_code = status_code + self.return_value.text = text + + +class TestRegistrySend(unittest.TestCase): + + def setUp(self): + self.registry = Registry() + self.registry.http = TestRequests() + self.registry.hostname = "http://testdomain.com" + + def test_get_ok(self): + self.registry.http.reset_return_value(200) + response = self.registry.send('/test_string') + + self.assertEqual(response.status_code, 200) + self.assertEqual(self.registry.last_error, None) + self.registry.http.request.assert_called_with('GET', + 'http://testdomain.com/test_string', + auth = (None, None), + headers = self.registry.HEADERS, + verify = True) + + def test_invalid_status_code(self): + self.registry.http.reset_return_value(400) + response = self.registry.send('/v2/catalog') + self.assertEqual(response, None) + self.assertEqual(self.registry.last_error, 400) + + + def test_login_pass(self): + self.registry.username = "test_login" + self.registry.password = "test_password" + self.registry.http.reset_return_value(200) + response = self.registry.send('/v2/catalog') + self.assertEqual(response.status_code, 200) + self.registry.http.request.assert_called_with('GET', + 'http://testdomain.com/v2/catalog', + auth = ("test_login", "test_password"), + headers = self.registry.HEADERS, + verify = True) + + +class TestListImages(unittest.TestCase): + + def setUp(self): + self.registry = Registry() + self.registry.http = TestRequests() + self.registry.hostname = "http://testdomain.com" + + def test_list_images_ok(self): + self.registry.http.reset_return_value(status_code = 200, + text = '{"repositories":["image1","image2"]}') + response = self.registry.list_images() + self.assertEqual(response, ["image1", "image2"]) + self.assertEqual(self.registry.last_error, None) + + def test_list_images_invalid_http_response(self): + self.registry.http.reset_return_value(404) + response = self.registry.list_images() + self.assertEqual(response, []) + self.assertEqual(self.registry.last_error, 404) + + +class TestListTags(unittest.TestCase): + def setUp(self): + self.registry = Registry() + self.registry.http = TestRequests() + self.registry.hostname = "http://testdomain.com" + self.registry.http.reset_return_value(200) + + def test_list_one_tag_ok(self): + self.registry.http.reset_return_value(status_code = 200, + text = u'{"name":"image1","tags":["0.1.306"]}') + + response = self.registry.list_tags('image1') + self.assertEqual(response, ["0.1.306"]) + self.assertEqual(self.registry.last_error, None) + + def test_list_tags_invalid_http_response(self): + self.registry.http.reset_return_value(status_code = 400, + text = "") + + response = self.registry.list_tags('image1') + self.assertEqual(response, []) + self.assertEqual(self.registry.last_error, 400) + + def test_list_tags_invalid_json(self): + self.registry.http.reset_return_value(status_code=200, + text="invalid_json") + + response = self.registry.list_tags('image1') + self.assertEqual(response, []) + self.assertEqual(self.registry.last_error, "list_tags: invalid json response") + + def test_list_tags_ok_sorted(self): + def test_list_one_tag_ok(self): + self.registry.http.reset_return_value(status_code=200, + text=u'{"name":"image1","tags":["0.1.306", "0.1.300", "0.1.290"]}') + + response = self.registry.list_tags('image1') + self.assertEqual(response, ["0.1.290", "0.1.300", "0.1.306"]) + self.assertEqual(self.registry.last_error, None) + + +class TestListDigest(unittest.TestCase): + def setUp(self): + self.registry = Registry() + self.registry.http = TestRequests() + self.registry.hostname = "http://testdomain.com" + self.registry.http.reset_return_value(200) + + def test_get_digest_ok(self): + self.registry.http.reset_return_value(status_code = 200, + text = ('{' + '"schemaVersion": 2,\n ' + '"mediaType": "application/vnd.docker.distribution.manifest.v2+json"' + '"digest": "sha256:357ea8c3d80bc25792e010facfc98aee5972ebc47e290eb0d5aea3671a901cab"' + )) + + self.registry.http.return_value.headers = { + 'Content-Length': '4935', + 'Docker-Content-Digest': 'sha256:85295b0e7456a8fbbc886722b483f87f2bff553fa0beeaf37f5d807aff7c1e52', + 'X-Content-Type-Options': 'nosniff' + } + + response = self.registry.get_tag_digest('image1', '0.1.300') + self.registry.http.request.assert_called_with( + "HEAD", + "http://testdomain.com/v2/image1/manifests/0.1.300", + auth = (None, None), + headers = self.registry.HEADERS, + verify = True + ) + + self.assertEqual(response, 'sha256:85295b0e7456a8fbbc886722b483f87f2bff553fa0beeaf37f5d807aff7c1e52') + + def test_invalid_status_code(self): + self.registry.http.reset_return_value(400) + response = self.registry.get_tag_digest('image1', '0.1.300') + self.assertEqual(response, None) + + def test_invalid_headers(self): + self.registry.http.reset_return_value(200, "invalid json") + self.registry.http.return_value.headers = "invalid headers" + with self.assertRaises(TypeError): + self.registry.get_tag_digest('image1', '0.1.300') + + +class TestListLayers(unittest.TestCase): + def setUp(self): + self.registry = Registry() + self.registry.http = TestRequests() + self.registry.hostname = "http://testdomain.com" + self.registry.http.reset_return_value(200) + + def test_list_layers_schema_version_2_ok(self): + self.registry.http.reset_return_value( + 200, + ''' + { + "schemaVersion": 2, + "layers": "layers_list" + } + ''' + ) + + response = self.registry.list_tag_layers('image1', '0.1.300') + + self.registry.http.request.assert_called_with( + "GET", + "http://testdomain.com/v2/image1/manifests/0.1.300", + auth = (None, None), + headers = self.registry.HEADERS, + verify = True + ) + + self.assertEqual(response, "layers_list") + + + def test_list_layers_schema_version_1_ok(self): + self.registry.http.reset_return_value( + 200, + ''' + { + "schemaVersion": 1, + "fsLayers": "layers_list" + } + ''' + ) + + response = self.registry.list_tag_layers('image1', '0.1.300') + + self.registry.http.request.assert_called_with( + "GET", + "http://testdomain.com/v2/image1/manifests/0.1.300", + auth = (None, None), + headers = self.registry.HEADERS, + verify = True + ) + + self.assertEqual(response, "layers_list") + + def test_list_layers_invalid_status_code(self): + self.registry.http.reset_return_value(400, "whatever") + + response = self.registry.list_tag_layers('image1', '0.1.300') + + self.registry.http.request.assert_called_with( + "GET", + "http://testdomain.com/v2/image1/manifests/0.1.300", + auth=(None, None), + headers=self.registry.HEADERS, + verify=True + ) + + self.assertEqual(response, []) + self.assertEqual(self.registry.last_error, 400) + + +if __name__ == '__main__': + unittest.main() \ No newline at end of file