add bearer auth scheme support

This commit is contained in:
Дмитрий Есарев
2018-02-21 20:38:20 +05:00
parent 975edfaefa
commit 9e0bdf3698
2 changed files with 156 additions and 48 deletions

View File

@@ -1,11 +1,15 @@
#!/usr/bin/env python
import requests
from requests.auth import HTTPBasicAuth
import ast
from requests.packages.urllib3.exceptions import InsecureRequestWarning
import json
import pprint
import base64
import re
import sys
import argparse
import www_authenticate
from datetime import timedelta, datetime as dt
# this is a registry manipulator, can do following:
@@ -45,11 +49,11 @@ class Requests:
def natural_keys(text):
'''
"""
alist.sort(key=natural_keys) sorts in human order
http://nedbatchelder.com/blog/200712/human_sorting.html
(See Toothy's implementation in the comments)
'''
"""
def __atoi(text):
return int(text) if text.isdigit() else text
@@ -57,6 +61,42 @@ def natural_keys(text):
return [__atoi(c) for c in re.split('(\d+)', text)]
def decode_base64(data):
"""Decode base64, padding being optional.
:param data: Base64 data as an ASCII byte string
:returns: The decoded byte string.
"""
data = data.replace('Bearer ','')
# print('[debug] base64 string to decode:\n{0}'.format(data))
missing_padding = len(data) % 4
if missing_padding != 0:
data += b'='* (4 - missing_padding)
return base64.decodestring(data)
def get_auth_schemes(r,path, debug):
""" Returns list of auth schemes(lowcased) if www-authenticate: header exists
returns None if no header found
- www-authenticate: basic
- www-authenticate: bearer
"""
if debug: print("[debug][funcname]: get_auth_schemes()")
try_oauth = requests.head('{0}{1}'.format(r.hostname,path))
if 'Www-Authenticate' in try_oauth.headers:
oauth = www_authenticate.parse(try_oauth.headers['Www-Authenticate'])
if debug:
print('[debug][docker] Auth schemes found:{0}'.format([m for m in oauth]))
return [m.lower() for m in oauth]
else:
if debug:
print('[debug][docker] No Auth schemes found')
return None
# class to manipulate registry
class Registry:
@@ -67,15 +107,17 @@ class Registry:
def __init__(self):
self.username = None
self.password = None
self.auth_schemes = None
self.hostname = None
self.no_validate_ssl = False
self.http = None
self.last_error = None
self.debug = False
def parse_login(self, login):
if login != None:
if login is not None:
if not ':' in login:
if ':' not in login:
self.last_error = "Please provide -l in the form USER:PASSWORD"
return (None, None)
@@ -87,28 +129,96 @@ class Registry:
return (None, None)
@staticmethod
def create(host, login, no_validate_ssl):
def create(host, login, no_validate_ssl, debug):
r = Registry()
(r.username, r.password) = r.parse_login(login)
if r.last_error != None:
if r.last_error is not None:
print(r.last_error)
exit(1)
r.debug = debug
r.hostname = host
r.no_validate_ssl = no_validate_ssl
r.no_validate_ssl = no_validate_ssl
r.auth_schemes = get_auth_schemes(r,'/v2/_catalog',debug)
r.http = Requests()
return r
def bearer_request(self, method, url, **kwargs):
if self.debug: print("[debug][funcname]: bearer_request()")
if self.username is None:
oauth_creds = ('', '')
else:
oauth_creds = (self.username, self.password)
if self.debug:
print('[debug][registry][request]: {0} {1}'.format(method, url))
if 'Authorization' in kwargs['headers']:
print('[debug][registry][request]: Authorization header:')
token_parsed = kwargs['headers']['Authorization'].split('.')
pprint.pprint(ast.literal_eval(decode_base64(token_parsed[0])))
pprint.pprint(ast.literal_eval(decode_base64(token_parsed[1])))
res = requests.request(method, url, **kwargs)
if str(res.status_code)[0] == '2':
if self.debug: print("[debug][registry] accepted")
return res
if res.status_code == 401:
if self.debug: print("[debug][registry] Access denied. Refreshing token...")
oauth = www_authenticate.parse(res.headers['Www-Authenticate'])
if self.debug:
print('[debug][auth][answer] Auth header:')
pprint.pprint(oauth['bearer'])
# print('[info] retreiving bearer token for {0}'.format(oauth['bearer']['scope']))
request_url = '{0}?service={1}&scope={2}'.format(oauth['bearer']['realm'],
oauth['bearer']['service'],
oauth['bearer']['scope'])
if self.debug:
print('[debug][auth][request] Refreshing auth token: POST {0}'.format(request_url))
try_oauth = requests.post(request_url, auth=oauth_creds)
try:
token = ast.literal_eval(try_oauth._content)['token']
except SyntaxError:
print('\n\n[ERROR] couldnt accure token: {0}'.format(try_oauth._content))
sys.exit(1)
if self.debug:
print('[debug][auth] token issued: ')
token_parsed=token.split('.')
pprint.pprint(ast.literal_eval(decode_base64(token_parsed[0])))
pprint.pprint(ast.literal_eval(decode_base64(token_parsed[1])))
kwargs['headers']['Authorization'] = self.HEADERS['Authorization'] = 'Bearer {0}'.format(token)
else:
return res
return requests.request(method, url, **kwargs)
def send(self, path, method="GET"):
# try:
result = self.http.request(
method, "{0}{1}".format(self.hostname, path),
headers=self.HEADERS,
auth=(None if self.username == ""
else (self.username, self.password)),
verify=not self.no_validate_ssl)
# token-based auth detected in create method
if 'bearer' in self.auth_schemes:
result = self.bearer_request(
method, "{0}{1}".format(self.hostname, path),
headers=self.HEADERS,
verify=not self.no_validate_ssl)
else:
result = self.http.request(
method, "{0}{1}".format(self.hostname, path),
headers=self.HEADERS,
auth=(None if self.username == ""
else (self.username, self.password)),
verify=not self.no_validate_ssl)
# except Exception as error:
# print("cannot connect to {0}\nerror {1}".format(
@@ -124,14 +234,14 @@ class Registry:
def list_images(self):
result = self.send('/v2/_catalog?n=10000')
if result == None:
if result is None:
return []
return json.loads(result.text)['repositories']
def list_tags(self, image_name):
result = self.send("/v2/{0}/tags/list".format(image_name))
if result == None:
if result is None:
return []
try:
@@ -140,7 +250,7 @@ class Registry:
self.last_error = "list_tags: invalid json response"
return []
if tags_list != None:
if tags_list is not None:
tags_list.sort(key=natural_keys)
return tags_list
@@ -156,7 +266,7 @@ class Registry:
image_headers = self.send("/v2/{0}/manifests/{1}".format(
image_name, tag), method="HEAD")
if image_headers == None:
if image_headers is None:
print(" tag digest not found: {0}".format(self.last_error))
return None
@@ -176,13 +286,13 @@ class Registry:
tag_digest, tag))
return True
if tag_digest == None:
if tag_digest is None:
return False
delete_result = self.send("/v2/{0}/manifests/{1}".format(
image_name, tag_digest), method="DELETE")
if delete_result == None:
if delete_result is None:
print("failed, error: {0}".format(self.last_error))
return False
@@ -191,29 +301,12 @@ class Registry:
print("done")
return True
# this function is not used and thus not tested
# def delete_tag_layer(self, image_name, layer_digest, dry_run):
# if dry_run:
# print('would delete layer {0}'.format(layer_digest))
# return False
#
# print('deleting layer {0}'.format(layer_digest),)
#
# delete_result = self.send('/v2/{0}/blobs/{1}'.format(
# image_name, layer_digest), method='DELETE')
#
# if delete_result == None:
# print("failed, error: {0}".format(self.last_error))
# return False
#
# print("done")
# return True
def list_tag_layers(self, image_name, tag):
layers_result = self.send("/v2/{0}/manifests/{1}".format(
image_name, tag))
if layers_result == None:
if layers_result is None:
print("error {0}".format(self.last_error))
return []
@@ -229,7 +322,7 @@ class Registry:
config_result = self.send(
"/v2/{0}/manifests/{1}".format(image_name, tag))
if config_result == None:
if config_result is None:
print(" tag digest not found: {0}".format(self.last_error))
return []
@@ -246,12 +339,19 @@ class Registry:
container_header = {"Accept": "{0}".format(
image_config['mediaType'])}
response = self.http.request("GET", "{0}{1}".format(self.hostname, "/v2/{0}/blobs/{1}".format(
image_name, image_config['digest'])),
headers=container_header,
auth=(None if self.username == ""
else (self.username, self.password)),
verify=not self.no_validate_ssl)
if 'bearer' in self.auth_schemes:
container_header['Authorization'] = self.HEADERS['Authorization']
response = self.bearer_request("GET", "{0}{1}".format(self.hostname, "/v2/{0}/blobs/{1}".format(
image_name, image_config['digest'])),
headers=container_header,
verify=not self.no_validate_ssl)
else:
response = self.http.request("GET", "{0}{1}".format(self.hostname, "/v2/{0}/blobs/{1}".format(
image_name, image_config['digest'])),
headers=container_header,
auth=(None if self.username == ""
else (self.username, self.password)),
verify=not self.no_validate_ssl)
if str(response.status_code)[0] == '2':
self.last_error = None
@@ -311,6 +411,13 @@ for more detail on garbage collection read here:
nargs='?',
metavar='N')
parser.add_argument(
'--debug',
help=('Turn debug output'),
action='store_const',
default=False,
const=True)
parser.add_argument(
'--dry-run',
help=('If used in combination with --delete,'
@@ -473,11 +580,11 @@ def main_loop(args):
if args.no_validate_ssl:
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
registry = Registry.create(args.host, args.login, args.no_validate_ssl)
registry = Registry.create(args.host, args.login, args.no_validate_ssl, args.debug)
if args.delete:
print("Will delete all but {0} last tags".format(keep_last_versions))
if args.image != None:
if args.image is not None:
image_list = args.image
else:
image_list = registry.list_images()