refactor to fix original tests

This commit is contained in:
Дмитрий Есарев
2018-02-26 18:41:07 +05:00
parent 92a2925fa3
commit 7de41084ca

View File

@@ -39,14 +39,70 @@ from datetime import timedelta, datetime as dt
# number of image versions to keep # number of image versions to keep
CONST_KEEP_LAST_VERSIONS = 10 CONST_KEEP_LAST_VERSIONS = 10
# print debug messages
DEBUG = False
# this class is created for testing # this class is created for testing
class Requests: class Requests:
def request(self, method, url, **kwargs): def request(self, method, url, **kwargs):
return requests.request(method, url, **kwargs) return requests.request(method, url, **kwargs)
def bearer_request(self, method, url, auth, **kwargs):
global DEBUG
if DEBUG: print("[debug][funcname]: bearer_request()")
if DEBUG:
print('[debug][registry][request]: {0} {1}'.format(method, url))
if 'Authorization' in kwargs['headers']:
print('[debug][registry][request]: Authorization header:')
token_parsed = kwargs['headers']['Authorization'].split('.')
pprint.pprint(ast.literal_eval(decode_base64(token_parsed[0])))
pprint.pprint(ast.literal_eval(decode_base64(token_parsed[1])))
res = requests.request(method, url, **kwargs)
if str(res.status_code)[0] == '2':
if DEBUG: print("[debug][registry] accepted")
return (res, kwargs['headers']['Authorization'])
if res.status_code == 401:
if DEBUG: print("[debug][registry] Access denied. Refreshing token...")
oauth = www_authenticate.parse(res.headers['Www-Authenticate'])
if DEBUG:
print('[debug][auth][answer] Auth header:')
pprint.pprint(oauth['bearer'])
# print('[info] retreiving bearer token for {0}'.format(oauth['bearer']['scope']))
request_url = '{0}?service={1}&scope={2}'.format(oauth['bearer']['realm'],
oauth['bearer']['service'],
oauth['bearer']['scope'])
if DEBUG:
print('[debug][auth][request] Refreshing auth token: POST {0}'.format(request_url))
try_oauth = requests.post(request_url, auth=auth, **kwargs)
try:
token = ast.literal_eval(try_oauth._content)['token']
except SyntaxError:
print('\n\n[ERROR] couldnt accure token: {0}'.format(try_oauth._content))
sys.exit(1)
if DEBUG:
print('[debug][auth] token issued: ')
token_parsed=token.split('.')
pprint.pprint(ast.literal_eval(decode_base64(token_parsed[0])))
pprint.pprint(ast.literal_eval(decode_base64(token_parsed[1])))
kwargs['headers']['Authorization'] = 'Bearer {0}'.format(token)
else:
return (res, kwargs['headers']['Authorization'])
res = requests.request(method, url, **kwargs)
return (res, kwargs['headers']['Authorization'])
def natural_keys(text): def natural_keys(text):
""" """
@@ -76,26 +132,26 @@ def decode_base64(data):
return base64.decodestring(data) return base64.decodestring(data)
def get_auth_schemes(r,path, debug): def get_auth_schemes(r,path):
""" Returns list of auth schemes(lowcased) if www-authenticate: header exists """ Returns list of auth schemes(lowcased) if www-authenticate: header exists
returns None if no header found returns None if no header found
- www-authenticate: basic - www-authenticate: basic
- www-authenticate: bearer - www-authenticate: bearer
""" """
if debug: print("[debug][funcname]: get_auth_schemes()") if DEBUG: print("[debug][funcname]: get_auth_schemes()")
try_oauth = requests.head('{0}{1}'.format(r.hostname,path)) try_oauth = requests.head('{0}{1}'.format(r.hostname,path))
if 'Www-Authenticate' in try_oauth.headers: if 'Www-Authenticate' in try_oauth.headers:
oauth = www_authenticate.parse(try_oauth.headers['Www-Authenticate']) oauth = www_authenticate.parse(try_oauth.headers['Www-Authenticate'])
if debug: if DEBUG:
print('[debug][docker] Auth schemes found:{0}'.format([m for m in oauth])) print('[debug][docker] Auth schemes found:{0}'.format([m for m in oauth]))
return [m.lower() for m in oauth] return [m.lower() for m in oauth]
else: else:
if debug: if DEBUG:
print('[debug][docker] No Auth schemes found') print('[debug][docker] No Auth schemes found')
return None return []
# class to manipulate registry # class to manipulate registry
class Registry: class Registry:
@@ -107,12 +163,11 @@ class Registry:
def __init__(self): def __init__(self):
self.username = None self.username = None
self.password = None self.password = None
self.auth_schemes = None self.auth_schemes = []
self.hostname = None self.hostname = None
self.no_validate_ssl = False self.no_validate_ssl = False
self.http = None self.http = None
self.last_error = None self.last_error = None
self.debug = False
def parse_login(self, login): def parse_login(self, login):
if login is not None: if login is not None:
@@ -131,7 +186,7 @@ class Registry:
@staticmethod @staticmethod
def create(host, login, no_validate_ssl, debug): def create(host, login, no_validate_ssl):
r = Registry() r = Registry()
(r.username, r.password) = r.parse_login(login) (r.username, r.password) = r.parse_login(login)
@@ -139,77 +194,18 @@ class Registry:
print(r.last_error) print(r.last_error)
exit(1) exit(1)
r.debug = debug
r.hostname = host r.hostname = host
r.no_validate_ssl = no_validate_ssl r.no_validate_ssl = no_validate_ssl
r.auth_schemes = get_auth_schemes(r,'/v2/_catalog',debug)
r.http = Requests() r.http = Requests()
return r return r
def bearer_request(self, method, url, **kwargs):
if self.debug: print("[debug][funcname]: bearer_request()")
if self.username is None:
oauth_creds = ('', '')
else:
oauth_creds = (self.username, self.password)
if self.debug:
print('[debug][registry][request]: {0} {1}'.format(method, url))
if 'Authorization' in kwargs['headers']:
print('[debug][registry][request]: Authorization header:')
token_parsed = kwargs['headers']['Authorization'].split('.')
pprint.pprint(ast.literal_eval(decode_base64(token_parsed[0])))
pprint.pprint(ast.literal_eval(decode_base64(token_parsed[1])))
res = requests.request(method, url, **kwargs)
if str(res.status_code)[0] == '2':
if self.debug: print("[debug][registry] accepted")
return res
if res.status_code == 401:
if self.debug: print("[debug][registry] Access denied. Refreshing token...")
oauth = www_authenticate.parse(res.headers['Www-Authenticate'])
if self.debug:
print('[debug][auth][answer] Auth header:')
pprint.pprint(oauth['bearer'])
# print('[info] retreiving bearer token for {0}'.format(oauth['bearer']['scope']))
request_url = '{0}?service={1}&scope={2}'.format(oauth['bearer']['realm'],
oauth['bearer']['service'],
oauth['bearer']['scope'])
if self.debug:
print('[debug][auth][request] Refreshing auth token: POST {0}'.format(request_url))
try_oauth = requests.post(request_url, auth=oauth_creds)
try:
token = ast.literal_eval(try_oauth._content)['token']
except SyntaxError:
print('\n\n[ERROR] couldnt accure token: {0}'.format(try_oauth._content))
sys.exit(1)
if self.debug:
print('[debug][auth] token issued: ')
token_parsed=token.split('.')
pprint.pprint(ast.literal_eval(decode_base64(token_parsed[0])))
pprint.pprint(ast.literal_eval(decode_base64(token_parsed[1])))
kwargs['headers']['Authorization'] = self.HEADERS['Authorization'] = 'Bearer {0}'.format(token)
else:
return res
return requests.request(method, url, **kwargs)
def send(self, path, method="GET"): def send(self, path, method="GET"):
# token-based auth detected in create method
if 'bearer' in self.auth_schemes: if 'bearer' in self.auth_schemes:
result = self.bearer_request( (result, self.HEADERS['Authorization']) = self.http.bearer_request(
method, "{0}{1}".format(self.hostname, path), method, "{0}{1}".format(self.hostname, path),
auth=(('', '') if self.username in ["", None]
else (self.username, self.password)),
headers=self.HEADERS, headers=self.HEADERS,
verify=not self.no_validate_ssl) verify=not self.no_validate_ssl)
else: else:
@@ -341,8 +337,10 @@ class Registry:
if 'bearer' in self.auth_schemes: if 'bearer' in self.auth_schemes:
container_header['Authorization'] = self.HEADERS['Authorization'] container_header['Authorization'] = self.HEADERS['Authorization']
response = self.bearer_request("GET", "{0}{1}".format(self.hostname, "/v2/{0}/blobs/{1}".format( (response, self.HEADERS['Authorization']) = self.http.bearer_request("GET", "{0}{1}".format(self.hostname, "/v2/{0}/blobs/{1}".format(
image_name, image_config['digest'])), image_name, image_config['digest'])),
auth=(('', '') if self.username in ["", None]
else (self.username, self.password)),
headers=container_header, headers=container_header,
verify=not self.no_validate_ssl) verify=not self.no_validate_ssl)
else: else:
@@ -574,13 +572,19 @@ def delete_tags_by_age(registry, image_name, dry_run, hours, tags_to_keep):
def main_loop(args): def main_loop(args):
global DEBUG
DEBUG = True if args.debug else False
keep_last_versions = int(args.num) keep_last_versions = int(args.num)
if args.no_validate_ssl: if args.no_validate_ssl:
requests.packages.urllib3.disable_warnings(InsecureRequestWarning) requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
registry = Registry.create(args.host, args.login, args.no_validate_ssl, args.debug) registry = Registry.create(args.host, args.login, args.no_validate_ssl)
registry.auth_schemes = get_auth_schemes(registry,'/v2/_catalog')
if args.delete: if args.delete:
print("Will delete all but {0} last tags".format(keep_last_versions)) print("Will delete all but {0} last tags".format(keep_last_versions))