diff --git a/conftest.py b/conftest.py index 9667879d..98f98d41 100644 --- a/conftest.py +++ b/conftest.py @@ -82,29 +82,6 @@ def temp_cwd(tmp_path): yield -@pytest.fixture -def pypirc(request, save_env, distutils_managed_tempdir): - from distutils.core import Distribution, PyPIRCCommand - - self = request.instance - self.tmp_dir = self.mkdtemp() - os.environ['HOME'] = self.tmp_dir - os.environ['USERPROFILE'] = self.tmp_dir - self.rc = os.path.join(self.tmp_dir, '.pypirc') - self.dist = Distribution() - - class command(PyPIRCCommand): - def __init__(self, dist): - super().__init__(dist) - - def initialize_options(self): - pass - - finalize_options = initialize_options - - self._cmd = command - - # from pytest-dev/pytest#363 @pytest.fixture(scope="session") def monkeysession(request): diff --git a/distutils/command/__init__.py b/distutils/command/__init__.py index 1e8fbe60..0f8a1692 100644 --- a/distutils/command/__init__.py +++ b/distutils/command/__init__.py @@ -16,10 +16,8 @@ 'install_scripts', 'install_data', 'sdist', - 'register', 'bdist', 'bdist_dumb', 'bdist_rpm', 'check', - 'upload', ] diff --git a/distutils/command/register.py b/distutils/command/register.py deleted file mode 100644 index b3373a3c..00000000 --- a/distutils/command/register.py +++ /dev/null @@ -1,311 +0,0 @@ -"""distutils.command.register - -Implements the Distutils 'register' command (register with the repository). -""" - -# created 2002/10/21, Richard Jones - -import getpass -import io -import logging -import urllib.parse -import urllib.request -import warnings -from distutils._log import log - -from more_itertools import always_iterable - -from ..core import PyPIRCCommand - - -class register(PyPIRCCommand): - description = "register the distribution with the Python package index" - user_options = PyPIRCCommand.user_options + [ - ('list-classifiers', None, 'list the valid Trove classifiers'), - ( - 'strict', - None, - 'Will stop the registering if the meta-data are not fully compliant', - ), - ] - boolean_options = PyPIRCCommand.boolean_options + [ - 'verify', - 'list-classifiers', - 'strict', - ] - - sub_commands = [('check', lambda self: True)] - - def initialize_options(self): - PyPIRCCommand.initialize_options(self) - self.list_classifiers = False - self.strict = False - - def finalize_options(self): - PyPIRCCommand.finalize_options(self) - # setting options for the `check` subcommand - check_options = { - 'strict': ('register', self.strict), - 'restructuredtext': ('register', 1), - } - self.distribution.command_options['check'] = check_options - - def run(self): - warnings.warn("register command is deprecated. Do not use.") - self.finalize_options() - self._set_config() - - # Run sub commands - for cmd_name in self.get_sub_commands(): - self.run_command(cmd_name) - - if self.dry_run: - self.verify_metadata() - elif self.list_classifiers: - self.classifiers() - else: - self.send_metadata() - - def _set_config(self): - """Reads the configuration file and set attributes.""" - config = self._read_pypirc() - if config != {}: - self.username = config['username'] - self.password = config['password'] - self.repository = config['repository'] - self.realm = config['realm'] - self.has_config = True - else: - if self.repository not in ('pypi', self.DEFAULT_REPOSITORY): - raise ValueError(f'{self.repository} not found in .pypirc') - if self.repository == 'pypi': - self.repository = self.DEFAULT_REPOSITORY - self.has_config = False - - def classifiers(self): - """Fetch the list of classifiers from the server.""" - url = self.repository + '?:action=list_classifiers' - response = urllib.request.urlopen(url) - log.info(self._read_pypi_response(response)) - - def verify_metadata(self): - """Send the metadata to the package index server to be checked.""" - # send the info to the server and report the result - (code, result) = self.post_to_server(self.build_post_data('verify')) - log.info('Server response (%s): %s', code, result) - - def send_metadata(self): # noqa: C901 - """Send the metadata to the package index server. - - Well, do the following: - 1. figure who the user is, and then - 2. send the data as a Basic auth'ed POST. - - First we try to read the username/password from $HOME/.pypirc, - which is a ConfigParser-formatted file with a section - [distutils] containing username and password entries (both - in clear text). Eg: - - [distutils] - index-servers = - pypi - - [pypi] - username: fred - password: sekrit - - Otherwise, to figure who the user is, we offer the user three - choices: - - 1. use existing login, - 2. register as a new user, or - 3. set the password to a random string and email the user. - - """ - # see if we can short-cut and get the username/password from the - # config - if self.has_config: - choice = '1' - username = self.username - password = self.password - else: - choice = 'x' - username = password = '' - - # get the user's login info - choices = '1 2 3 4'.split() - while choice not in choices: - self.announce( - """\ -We need to know who you are, so please choose either: - 1. use your existing login, - 2. register as a new user, - 3. have the server generate a new password for you (and email it to you), or - 4. quit -Your selection [default 1]: """, - logging.INFO, - ) - choice = input() - if not choice: - choice = '1' - elif choice not in choices: - print('Please choose one of the four options!') - - if choice == '1': - # get the username and password - while not username: - username = input('Username: ') - while not password: - password = getpass.getpass('Password: ') - - # set up the authentication - auth = urllib.request.HTTPPasswordMgr() - host = urllib.parse.urlparse(self.repository)[1] - auth.add_password(self.realm, host, username, password) - # send the info to the server and report the result - code, result = self.post_to_server(self.build_post_data('submit'), auth) - self.announce(f'Server response ({code}): {result}', logging.INFO) - - # possibly save the login - if code == 200: - if self.has_config: - # sharing the password in the distribution instance - # so the upload command can reuse it - self.distribution.password = password - else: - self.announce( - ( - 'I can store your PyPI login so future ' - 'submissions will be faster.' - ), - logging.INFO, - ) - self.announce( - f'(the login will be stored in {self._get_rc_file()})', - logging.INFO, - ) - choice = 'X' - while choice.lower() not in 'yn': - choice = input('Save your login (y/N)?') - if not choice: - choice = 'n' - if choice.lower() == 'y': - self._store_pypirc(username, password) - - elif choice == '2': - data = {':action': 'user'} - data['name'] = data['password'] = data['email'] = '' - data['confirm'] = None - while not data['name']: - data['name'] = input('Username: ') - while data['password'] != data['confirm']: - while not data['password']: - data['password'] = getpass.getpass('Password: ') - while not data['confirm']: - data['confirm'] = getpass.getpass(' Confirm: ') - if data['password'] != data['confirm']: - data['password'] = '' - data['confirm'] = None - print("Password and confirm don't match!") - while not data['email']: - data['email'] = input(' EMail: ') - code, result = self.post_to_server(data) - if code != 200: - log.info('Server response (%s): %s', code, result) - else: - log.info('You will receive an email shortly.') - log.info('Follow the instructions in it to complete registration.') - elif choice == '3': - data = {':action': 'password_reset'} - data['email'] = '' - while not data['email']: - data['email'] = input('Your email address: ') - code, result = self.post_to_server(data) - log.info('Server response (%s): %s', code, result) - - def build_post_data(self, action): - # figure the data to send - the metadata plus some additional - # information used by the package server - meta = self.distribution.metadata - data = { - ':action': action, - 'metadata_version': '1.0', - 'name': meta.get_name(), - 'version': meta.get_version(), - 'summary': meta.get_description(), - 'home_page': meta.get_url(), - 'author': meta.get_contact(), - 'author_email': meta.get_contact_email(), - 'license': meta.get_licence(), - 'description': meta.get_long_description(), - 'keywords': meta.get_keywords(), - 'platform': meta.get_platforms(), - 'classifiers': meta.get_classifiers(), - 'download_url': meta.get_download_url(), - # PEP 314 - 'provides': meta.get_provides(), - 'requires': meta.get_requires(), - 'obsoletes': meta.get_obsoletes(), - } - if data['provides'] or data['requires'] or data['obsoletes']: - data['metadata_version'] = '1.1' - return data - - def post_to_server(self, data, auth=None): # noqa: C901 - """Post a query to the server, and return a string response.""" - if 'name' in data: - self.announce( - 'Registering {} to {}'.format(data['name'], self.repository), - logging.INFO, - ) - # Build up the MIME payload for the urllib2 POST data - boundary = '--------------GHSKFJDLGDS7543FJKLFHRE75642756743254' - sep_boundary = '\n--' + boundary - end_boundary = sep_boundary + '--' - body = io.StringIO() - for key, values in data.items(): - for value in map(str, make_iterable(values)): - body.write(sep_boundary) - body.write(f'\nContent-Disposition: form-data; name="{key}"') - body.write("\n\n") - body.write(value) - if value and value[-1] == '\r': - body.write('\n') # write an extra newline (lurve Macs) - body.write(end_boundary) - body.write("\n") - body = body.getvalue().encode("utf-8") - - # build the Request - headers = { - 'Content-type': f'multipart/form-data; boundary={boundary}; charset=utf-8', - 'Content-length': str(len(body)), - } - req = urllib.request.Request(self.repository, body, headers) - - # handle HTTP and include the Basic Auth handler - opener = urllib.request.build_opener( - urllib.request.HTTPBasicAuthHandler(password_mgr=auth) - ) - data = '' - try: - result = opener.open(req) - except urllib.error.HTTPError as e: - if self.show_response: - data = e.fp.read() - result = e.code, e.msg - except urllib.error.URLError as e: - result = 500, str(e) - else: - if self.show_response: - data = self._read_pypi_response(result) - result = 200, 'OK' - if self.show_response: - msg = '\n'.join(('-' * 75, data, '-' * 75)) - self.announce(msg, logging.INFO) - return result - - -def make_iterable(values): - if values is None: - return [None] - return always_iterable(values) diff --git a/distutils/command/upload.py b/distutils/command/upload.py deleted file mode 100644 index 3428a6b6..00000000 --- a/distutils/command/upload.py +++ /dev/null @@ -1,211 +0,0 @@ -""" -distutils.command.upload - -Implements the Distutils 'upload' subcommand (upload package to a package -index). -""" - -import hashlib -import io -import logging -import os -import warnings -from base64 import standard_b64encode -from urllib.parse import urlparse -from urllib.request import HTTPError, Request, urlopen - -from more_itertools import always_iterable - -from ..core import PyPIRCCommand -from ..errors import DistutilsError, DistutilsOptionError -from ..spawn import spawn - -# PyPI Warehouse supports MD5, SHA256, and Blake2 (blake2-256) -# https://bugs.python.org/issue40698 -_FILE_CONTENT_DIGESTS = { - "md5_digest": getattr(hashlib, "md5", None), - "sha256_digest": getattr(hashlib, "sha256", None), - "blake2_256_digest": getattr(hashlib, "blake2b", None), -} - - -class upload(PyPIRCCommand): - description = "upload binary package to PyPI" - - user_options = PyPIRCCommand.user_options + [ - ('sign', 's', 'sign files to upload using gpg'), - ('identity=', 'i', 'GPG identity used to sign files'), - ] - - boolean_options = PyPIRCCommand.boolean_options + ['sign'] - - def initialize_options(self): - PyPIRCCommand.initialize_options(self) - self.username = '' - self.password = '' - self.show_response = False - self.sign = False - self.identity = None - - def finalize_options(self): - PyPIRCCommand.finalize_options(self) - if self.identity and not self.sign: - raise DistutilsOptionError("Must use --sign for --identity to have meaning") - config = self._read_pypirc() - if config != {}: - self.username = config['username'] - self.password = config['password'] - self.repository = config['repository'] - self.realm = config['realm'] - - # getting the password from the distribution - # if previously set by the register command - if not self.password and self.distribution.password: - self.password = self.distribution.password - - def run(self): - warnings.warn("upload command is deprecated. Do not use.") - if not self.distribution.dist_files: - msg = ( - "Must create and upload files in one command " - "(e.g. setup.py sdist upload)" - ) - raise DistutilsOptionError(msg) - for command, pyversion, filename in self.distribution.dist_files: - self.upload_file(command, pyversion, filename) - - def upload_file(self, command, pyversion, filename): # noqa: C901 - # Makes sure the repository URL is compliant - schema, netloc, url, params, query, fragments = urlparse(self.repository) - if params or query or fragments: - raise AssertionError(f"Incompatible url {self.repository}") - - if schema not in ('http', 'https'): - raise AssertionError("unsupported schema " + schema) - - # Sign if requested - if self.sign: - gpg_args = ["gpg", "--detach-sign", "-a", filename] - if self.identity: - gpg_args[2:2] = ["--local-user", self.identity] - spawn(gpg_args, dry_run=self.dry_run) - - # Fill in the data - send all the meta-data in case we need to - # register a new release - f = open(filename, 'rb') - try: - content = f.read() - finally: - f.close() - - meta = self.distribution.metadata - data = { - # action - ':action': 'file_upload', - 'protocol_version': '1', - # identify release - 'name': meta.get_name(), - 'version': meta.get_version(), - # file content - 'content': (os.path.basename(filename), content), - 'filetype': command, - 'pyversion': pyversion, - # additional meta-data - 'metadata_version': '1.0', - 'summary': meta.get_description(), - 'home_page': meta.get_url(), - 'author': meta.get_contact(), - 'author_email': meta.get_contact_email(), - 'license': meta.get_licence(), - 'description': meta.get_long_description(), - 'keywords': meta.get_keywords(), - 'platform': meta.get_platforms(), - 'classifiers': meta.get_classifiers(), - 'download_url': meta.get_download_url(), - # PEP 314 - 'provides': meta.get_provides(), - 'requires': meta.get_requires(), - 'obsoletes': meta.get_obsoletes(), - } - - data['comment'] = '' - - # file content digests - for digest_name, digest_cons in _FILE_CONTENT_DIGESTS.items(): - if digest_cons is None: - continue - try: - data[digest_name] = digest_cons(content).hexdigest() - except ValueError: - # hash digest not available or blocked by security policy - pass - - if self.sign: - with open(filename + ".asc", "rb") as f: - data['gpg_signature'] = (os.path.basename(filename) + ".asc", f.read()) - - # set up the authentication - user_pass = (self.username + ":" + self.password).encode('ascii') - # The exact encoding of the authentication string is debated. - # Anyway PyPI only accepts ascii for both username or password. - auth = "Basic " + standard_b64encode(user_pass).decode('ascii') - - # Build up the MIME payload for the POST data - boundary = '--------------GHSKFJDLGDS7543FJKLFHRE75642756743254' - sep_boundary = b'\r\n--' + boundary.encode('ascii') - end_boundary = sep_boundary + b'--\r\n' - body = io.BytesIO() - for key, values in data.items(): - title = f'\r\nContent-Disposition: form-data; name="{key}"' - for value in make_iterable(values): - if type(value) is tuple: - title += f'; filename="{value[0]}"' - value = value[1] - else: - value = str(value).encode('utf-8') - body.write(sep_boundary) - body.write(title.encode('utf-8')) - body.write(b"\r\n\r\n") - body.write(value) - body.write(end_boundary) - body = body.getvalue() - - msg = f"Submitting {filename} to {self.repository}" - self.announce(msg, logging.INFO) - - # build the Request - headers = { - 'Content-type': f'multipart/form-data; boundary={boundary}', - 'Content-length': str(len(body)), - 'Authorization': auth, - } - - request = Request(self.repository, data=body, headers=headers) - # send the data - try: - result = urlopen(request) - status = result.getcode() - reason = result.msg - except HTTPError as e: - status = e.code - reason = e.msg - except OSError as e: - self.announce(str(e), logging.ERROR) - raise - - if status == 200: - self.announce(f'Server response ({status}): {reason}', logging.INFO) - if self.show_response: - text = self._read_pypi_response(result) - msg = '\n'.join(('-' * 75, text, '-' * 75)) - self.announce(msg, logging.INFO) - else: - msg = f'Upload failed ({status}): {reason}' - self.announce(msg, logging.ERROR) - raise DistutilsError(msg) - - -def make_iterable(values): - if values is None: - return [None] - return always_iterable(values, base_type=(bytes, str, tuple)) diff --git a/distutils/config.py b/distutils/config.py deleted file mode 100644 index ebd2e11d..00000000 --- a/distutils/config.py +++ /dev/null @@ -1,151 +0,0 @@ -"""distutils.pypirc - -Provides the PyPIRCCommand class, the base class for the command classes -that uses .pypirc in the distutils.command package. -""" - -import email.message -import os -from configparser import RawConfigParser - -from .cmd import Command - -DEFAULT_PYPIRC = """\ -[distutils] -index-servers = - pypi - -[pypi] -username:%s -password:%s -""" - - -class PyPIRCCommand(Command): - """Base command that knows how to handle the .pypirc file""" - - DEFAULT_REPOSITORY = 'https://upload.pypi.org/legacy/' - DEFAULT_REALM = 'pypi' - repository = None - realm = None - - user_options = [ - ('repository=', 'r', f"url of repository [default: {DEFAULT_REPOSITORY}]"), - ('show-response', None, 'display full response text from server'), - ] - - boolean_options = ['show-response'] - - def _get_rc_file(self): - """Returns rc file path.""" - return os.path.join(os.path.expanduser('~'), '.pypirc') - - def _store_pypirc(self, username, password): - """Creates a default .pypirc file.""" - rc = self._get_rc_file() - raw = os.open(rc, os.O_CREAT | os.O_WRONLY, 0o600) - with os.fdopen(raw, 'w', encoding='utf-8') as f: - f.write(DEFAULT_PYPIRC % (username, password)) - - def _read_pypirc(self): # noqa: C901 - """Reads the .pypirc file.""" - rc = self._get_rc_file() - if os.path.exists(rc): - self.announce(f'Using PyPI login from {rc}') - repository = self.repository or self.DEFAULT_REPOSITORY - - config = RawConfigParser() - config.read(rc, encoding='utf-8') - sections = config.sections() - if 'distutils' in sections: - # let's get the list of servers - index_servers = config.get('distutils', 'index-servers') - _servers = [ - server.strip() - for server in index_servers.split('\n') - if server.strip() != '' - ] - if _servers == []: - # nothing set, let's try to get the default pypi - if 'pypi' in sections: - _servers = ['pypi'] - else: - # the file is not properly defined, returning - # an empty dict - return {} - for server in _servers: - current = {'server': server} - current['username'] = config.get(server, 'username') - - # optional params - for key, default in ( - ('repository', self.DEFAULT_REPOSITORY), - ('realm', self.DEFAULT_REALM), - ('password', None), - ): - if config.has_option(server, key): - current[key] = config.get(server, key) - else: - current[key] = default - - # work around people having "repository" for the "pypi" - # section of their config set to the HTTP (rather than - # HTTPS) URL - if server == 'pypi' and repository in ( - self.DEFAULT_REPOSITORY, - 'pypi', - ): - current['repository'] = self.DEFAULT_REPOSITORY - return current - - if ( - current['server'] == repository - or current['repository'] == repository - ): - return current - elif 'server-login' in sections: - # old format - server = 'server-login' - if config.has_option(server, 'repository'): - repository = config.get(server, 'repository') - else: - repository = self.DEFAULT_REPOSITORY - return { - 'username': config.get(server, 'username'), - 'password': config.get(server, 'password'), - 'repository': repository, - 'server': server, - 'realm': self.DEFAULT_REALM, - } - - return {} - - def _read_pypi_response(self, response): - """Read and decode a PyPI HTTP response.""" - content_type = response.getheader('content-type', 'text/plain') - return response.read().decode(_extract_encoding(content_type)) - - def initialize_options(self): - """Initialize options.""" - self.repository = None - self.realm = None - self.show_response = False - - def finalize_options(self): - """Finalizes options.""" - if self.repository is None: - self.repository = self.DEFAULT_REPOSITORY - if self.realm is None: - self.realm = self.DEFAULT_REALM - - -def _extract_encoding(content_type): - """ - >>> _extract_encoding('text/plain') - 'ascii' - >>> _extract_encoding('text/html; charset="utf8"') - 'utf8' - """ - msg = email.message.EmailMessage() - msg['content-type'] = content_type - return msg['content-type'].params.get('charset', 'ascii') diff --git a/distutils/core.py b/distutils/core.py index 82113c47..bc06091a 100644 --- a/distutils/core.py +++ b/distutils/core.py @@ -11,7 +11,6 @@ import tokenize from .cmd import Command -from .config import PyPIRCCommand from .debug import DEBUG # Mainly import these so setup scripts can "from distutils.core import" them. @@ -24,7 +23,7 @@ ) from .extension import Extension -__all__ = ['Distribution', 'Command', 'PyPIRCCommand', 'Extension', 'setup'] +__all__ = ['Distribution', 'Command', 'Extension', 'setup'] # This is a barebones help message generated displayed when the user # runs the setup script with no arguments at all. More useful help diff --git a/distutils/tests/test_config.py b/distutils/tests/test_config.py deleted file mode 100644 index 6e3f5f24..00000000 --- a/distutils/tests/test_config.py +++ /dev/null @@ -1,112 +0,0 @@ -"""Tests for distutils.pypirc.pypirc.""" - -import os -import pathlib -from distutils.tests import support - -import pytest - -PYPIRC = """\ -[distutils] - -index-servers = - server1 - server2 - server3 - -[server1] -username:me -password:secret - -[server2] -username:meagain -password: secret -realm:acme -repository:http://another.pypi/ - -[server3] -username:cbiggles -password:yh^%#rest-of-my-password -""" - -PYPIRC_OLD = """\ -[server-login] -username:tarek -password:secret -""" - -WANTED = """\ -[distutils] -index-servers = - pypi - -[pypi] -username:tarek -password:xxx -""" - - -@support.combine_markers -@pytest.mark.usefixtures('pypirc') -class BasePyPIRCCommandTestCase(support.TempdirManager): - pass - - -class TestPyPIRCCommand(BasePyPIRCCommandTestCase): - def test_server_registration(self): - # This test makes sure PyPIRCCommand knows how to: - # 1. handle several sections in .pypirc - # 2. handle the old format - - # new format - self.write_file(self.rc, PYPIRC) - cmd = self._cmd(self.dist) - config = cmd._read_pypirc() - - config = list(sorted(config.items())) - waited = [ - ('password', 'secret'), - ('realm', 'pypi'), - ('repository', 'https://upload.pypi.org/legacy/'), - ('server', 'server1'), - ('username', 'me'), - ] - assert config == waited - - # old format - self.write_file(self.rc, PYPIRC_OLD) - config = cmd._read_pypirc() - config = list(sorted(config.items())) - waited = [ - ('password', 'secret'), - ('realm', 'pypi'), - ('repository', 'https://upload.pypi.org/legacy/'), - ('server', 'server-login'), - ('username', 'tarek'), - ] - assert config == waited - - def test_server_empty_registration(self): - cmd = self._cmd(self.dist) - rc = cmd._get_rc_file() - assert not os.path.exists(rc) - cmd._store_pypirc('tarek', 'xxx') - assert os.path.exists(rc) - assert pathlib.Path(rc).read_text(encoding='utf-8') == WANTED - - def test_config_interpolation(self): - # using the % character in .pypirc should not raise an error (#20120) - self.write_file(self.rc, PYPIRC) - cmd = self._cmd(self.dist) - cmd.repository = 'server3' - config = cmd._read_pypirc() - - config = list(sorted(config.items())) - waited = [ - ('password', 'yh^%#rest-of-my-password'), - ('realm', 'pypi'), - ('repository', 'https://upload.pypi.org/legacy/'), - ('server', 'server3'), - ('username', 'cbiggles'), - ] - assert config == waited diff --git a/distutils/tests/test_register.py b/distutils/tests/test_register.py deleted file mode 100644 index 14dfb832..00000000 --- a/distutils/tests/test_register.py +++ /dev/null @@ -1,314 +0,0 @@ -"""Tests for distutils.command.register.""" - -import getpass -import os -import pathlib -import urllib -from distutils.command import register as register_module -from distutils.command.register import register -from distutils.errors import DistutilsSetupError -from distutils.tests.test_config import BasePyPIRCCommandTestCase - -import pytest - -try: - import docutils -except ImportError: - docutils = None - -PYPIRC_NOPASSWORD = """\ -[distutils] - -index-servers = - server1 - -[server1] -username:me -""" - -WANTED_PYPIRC = """\ -[distutils] -index-servers = - pypi - -[pypi] -username:tarek -password:password -""" - - -class Inputs: - """Fakes user inputs.""" - - def __init__(self, *answers): - self.answers = answers - self.index = 0 - - def __call__(self, prompt=''): - try: - return self.answers[self.index] - finally: - self.index += 1 - - -class FakeOpener: - """Fakes a PyPI server""" - - def __init__(self): - self.reqs = [] - - def __call__(self, *args): - return self - - def open(self, req, data=None, timeout=None): - self.reqs.append(req) - return self - - def read(self): - return b'xxx' - - def getheader(self, name, default=None): - return { - 'content-type': 'text/plain; charset=utf-8', - }.get(name.lower(), default) - - -@pytest.fixture(autouse=True) -def autopass(monkeypatch): - monkeypatch.setattr(getpass, 'getpass', lambda prompt: 'password') - - -@pytest.fixture(autouse=True) -def fake_opener(monkeypatch, request): - opener = FakeOpener() - monkeypatch.setattr(urllib.request, 'build_opener', opener) - monkeypatch.setattr(urllib.request, '_opener', None) - request.instance.conn = opener - - -class TestRegister(BasePyPIRCCommandTestCase): - def _get_cmd(self, metadata=None): - if metadata is None: - metadata = { - 'url': 'xxx', - 'author': 'xxx', - 'author_email': 'xxx', - 'name': 'xxx', - 'version': 'xxx', - 'long_description': 'xxx', - } - pkg_info, dist = self.create_dist(**metadata) - return register(dist) - - def test_create_pypirc(self): - # this test makes sure a .pypirc file - # is created when requested. - - # let's create a register instance - cmd = self._get_cmd() - - # we shouldn't have a .pypirc file yet - assert not os.path.exists(self.rc) - - # patching input and getpass.getpass - # so register gets happy - # - # Here's what we are faking : - # use your existing login (choice 1.) - # Username : 'tarek' - # Password : 'password' - # Save your login (y/N)? : 'y' - inputs = Inputs('1', 'tarek', 'y') - register_module.input = inputs.__call__ - # let's run the command - try: - cmd.run() - finally: - del register_module.input - - # A new .pypirc file should contain WANTED_PYPIRC - assert pathlib.Path(self.rc).read_text(encoding='utf-8') == WANTED_PYPIRC - - # now let's make sure the .pypirc file generated - # really works : we shouldn't be asked anything - # if we run the command again - def _no_way(prompt=''): - raise AssertionError(prompt) - - register_module.input = _no_way - - cmd.show_response = True - cmd.run() - - # let's see what the server received : we should - # have 2 similar requests - assert len(self.conn.reqs) == 2 - req1 = dict(self.conn.reqs[0].headers) - req2 = dict(self.conn.reqs[1].headers) - - assert req1['Content-length'] == '1358' - assert req2['Content-length'] == '1358' - assert b'xxx' in self.conn.reqs[1].data - - def test_password_not_in_file(self): - self.write_file(self.rc, PYPIRC_NOPASSWORD) - cmd = self._get_cmd() - cmd._set_config() - cmd.finalize_options() - cmd.send_metadata() - - # dist.password should be set - # therefore used afterwards by other commands - assert cmd.distribution.password == 'password' - - def test_registering(self): - # this test runs choice 2 - cmd = self._get_cmd() - inputs = Inputs('2', 'tarek', 'tarek@ziade.org') - register_module.input = inputs.__call__ - try: - # let's run the command - cmd.run() - finally: - del register_module.input - - # we should have send a request - assert len(self.conn.reqs) == 1 - req = self.conn.reqs[0] - headers = dict(req.headers) - assert headers['Content-length'] == '608' - assert b'tarek' in req.data - - def test_password_reset(self): - # this test runs choice 3 - cmd = self._get_cmd() - inputs = Inputs('3', 'tarek@ziade.org') - register_module.input = inputs.__call__ - try: - # let's run the command - cmd.run() - finally: - del register_module.input - - # we should have send a request - assert len(self.conn.reqs) == 1 - req = self.conn.reqs[0] - headers = dict(req.headers) - assert headers['Content-length'] == '290' - assert b'tarek' in req.data - - def test_strict(self): - # testing the strict option - # when on, the register command stops if - # the metadata is incomplete or if - # long_description is not reSt compliant - - pytest.importorskip('docutils') - - # empty metadata - cmd = self._get_cmd({}) - cmd.ensure_finalized() - cmd.strict = True - with pytest.raises(DistutilsSetupError): - cmd.run() - - # metadata are OK but long_description is broken - metadata = { - 'url': 'xxx', - 'author': 'xxx', - 'author_email': 'éxéxé', - 'name': 'xxx', - 'version': 'xxx', - 'long_description': 'title\n==\n\ntext', - } - - cmd = self._get_cmd(metadata) - cmd.ensure_finalized() - cmd.strict = True - with pytest.raises(DistutilsSetupError): - cmd.run() - - # now something that works - metadata['long_description'] = 'title\n=====\n\ntext' - cmd = self._get_cmd(metadata) - cmd.ensure_finalized() - cmd.strict = True - inputs = Inputs('1', 'tarek', 'y') - register_module.input = inputs.__call__ - # let's run the command - try: - cmd.run() - finally: - del register_module.input - - # strict is not by default - cmd = self._get_cmd() - cmd.ensure_finalized() - inputs = Inputs('1', 'tarek', 'y') - register_module.input = inputs.__call__ - # let's run the command - try: - cmd.run() - finally: - del register_module.input - - # and finally a Unicode test (bug #12114) - metadata = { - 'url': 'xxx', - 'author': '\u00c9ric', - 'author_email': 'xxx', - 'name': 'xxx', - 'version': 'xxx', - 'description': 'Something about esszet \u00df', - 'long_description': 'More things about esszet \u00df', - } - - cmd = self._get_cmd(metadata) - cmd.ensure_finalized() - cmd.strict = True - inputs = Inputs('1', 'tarek', 'y') - register_module.input = inputs.__call__ - # let's run the command - try: - cmd.run() - finally: - del register_module.input - - def test_register_invalid_long_description(self, monkeypatch): - pytest.importorskip('docutils') - description = ':funkie:`str`' # mimic Sphinx-specific markup - metadata = { - 'url': 'xxx', - 'author': 'xxx', - 'author_email': 'xxx', - 'name': 'xxx', - 'version': 'xxx', - 'long_description': description, - } - cmd = self._get_cmd(metadata) - cmd.ensure_finalized() - cmd.strict = True - inputs = Inputs('2', 'tarek', 'tarek@ziade.org') - monkeypatch.setattr(register_module, 'input', inputs, raising=False) - - with pytest.raises(DistutilsSetupError): - cmd.run() - - def test_list_classifiers(self, caplog): - cmd = self._get_cmd() - cmd.list_classifiers = True - cmd.run() - assert caplog.messages == ['running check', 'xxx'] - - def test_show_response(self, caplog): - # test that the --show-response option return a well formatted response - cmd = self._get_cmd() - inputs = Inputs('1', 'tarek', 'y') - register_module.input = inputs.__call__ - cmd.show_response = True - try: - cmd.run() - finally: - del register_module.input - - assert caplog.messages[3] == 75 * '-' + '\nxxx\n' + 75 * '-' diff --git a/distutils/tests/test_upload.py b/distutils/tests/test_upload.py deleted file mode 100644 index 56df209c..00000000 --- a/distutils/tests/test_upload.py +++ /dev/null @@ -1,215 +0,0 @@ -"""Tests for distutils.command.upload.""" - -import os -import unittest.mock as mock -from distutils.command import upload as upload_mod -from distutils.command.upload import upload -from distutils.core import Distribution -from distutils.errors import DistutilsError -from distutils.tests.test_config import PYPIRC, BasePyPIRCCommandTestCase -from urllib.request import HTTPError - -import pytest - -PYPIRC_LONG_PASSWORD = """\ -[distutils] - -index-servers = - server1 - server2 - -[server1] -username:me -password:aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa - -[server2] -username:meagain -password: secret -realm:acme -repository:http://another.pypi/ -""" - - -PYPIRC_NOPASSWORD = """\ -[distutils] - -index-servers = - server1 - -[server1] -username:me -""" - - -class FakeOpen: - def __init__(self, url, msg=None, code=None): - self.url = url - if not isinstance(url, str): - self.req = url - else: - self.req = None - self.msg = msg or 'OK' - self.code = code or 200 - - def getheader(self, name, default=None): - return { - 'content-type': 'text/plain; charset=utf-8', - }.get(name.lower(), default) - - def read(self): - return b'xyzzy' - - def getcode(self): - return self.code - - -@pytest.fixture(autouse=True) -def urlopen(request, monkeypatch): - self = request.instance - monkeypatch.setattr(upload_mod, 'urlopen', self._urlopen) - self.next_msg = self.next_code = None - - -class TestUpload(BasePyPIRCCommandTestCase): - def _urlopen(self, url): - self.last_open = FakeOpen(url, msg=self.next_msg, code=self.next_code) - return self.last_open - - def test_finalize_options(self): - # new format - self.write_file(self.rc, PYPIRC) - dist = Distribution() - cmd = upload(dist) - cmd.finalize_options() - for attr, waited in ( - ('username', 'me'), - ('password', 'secret'), - ('realm', 'pypi'), - ('repository', 'https://upload.pypi.org/legacy/'), - ): - assert getattr(cmd, attr) == waited - - def test_saved_password(self): - # file with no password - self.write_file(self.rc, PYPIRC_NOPASSWORD) - - # make sure it passes - dist = Distribution() - cmd = upload(dist) - cmd.finalize_options() - assert cmd.password is None - - # make sure we get it as well, if another command - # initialized it at the dist level - dist.password = 'xxx' - cmd = upload(dist) - cmd.finalize_options() - assert cmd.password == 'xxx' - - def test_upload(self, caplog): - tmp = self.mkdtemp() - path = os.path.join(tmp, 'xxx') - self.write_file(path) - command, pyversion, filename = 'xxx', '2.6', path - dist_files = [(command, pyversion, filename)] - self.write_file(self.rc, PYPIRC_LONG_PASSWORD) - - # lets run it - pkg_dir, dist = self.create_dist(dist_files=dist_files) - cmd = upload(dist) - cmd.show_response = True - cmd.ensure_finalized() - cmd.run() - - # what did we send ? - headers = dict(self.last_open.req.headers) - assert int(headers['Content-length']) >= 2162 - content_type = headers['Content-type'] - assert content_type.startswith('multipart/form-data') - assert self.last_open.req.get_method() == 'POST' - expected_url = 'https://upload.pypi.org/legacy/' - assert self.last_open.req.get_full_url() == expected_url - data = self.last_open.req.data - assert b'xxx' in data - assert b'protocol_version' in data - assert b'sha256_digest' in data - assert ( - b'cd2eb0837c9b4c962c22d2ff8b5441b7b45805887f051d39bf133b583baf' - b'6860' in data - ) - if b'md5_digest' in data: - assert b'f561aaf6ef0bf14d4208bb46a4ccb3ad' in data - if b'blake2_256_digest' in data: - assert ( - b'b6f289a27d4fe90da63c503bfe0a9b761a8f76bb86148565065f040be' - b'6d1c3044cf7ded78ef800509bccb4b648e507d88dc6383d67642aadcc' - b'ce443f1534330a' in data - ) - - # The PyPI response body was echoed - results = caplog.messages - assert results[-1] == 75 * '-' + '\nxyzzy\n' + 75 * '-' - - # bpo-32304: archives whose last byte was b'\r' were corrupted due to - # normalization intended for Mac OS 9. - def test_upload_correct_cr(self): - # content that ends with \r should not be modified. - tmp = self.mkdtemp() - path = os.path.join(tmp, 'xxx') - self.write_file(path, content='yy\r') - command, pyversion, filename = 'xxx', '2.6', path - dist_files = [(command, pyversion, filename)] - self.write_file(self.rc, PYPIRC_LONG_PASSWORD) - - # other fields that ended with \r used to be modified, now are - # preserved. - pkg_dir, dist = self.create_dist( - dist_files=dist_files, description='long description\r' - ) - cmd = upload(dist) - cmd.show_response = True - cmd.ensure_finalized() - cmd.run() - - headers = dict(self.last_open.req.headers) - assert int(headers['Content-length']) >= 2172 - assert b'long description\r' in self.last_open.req.data - - def test_upload_fails(self, caplog): - self.next_msg = "Not Found" - self.next_code = 404 - with pytest.raises(DistutilsError): - self.test_upload(caplog) - - @pytest.mark.parametrize( - 'exception,expected,raised_exception', - [ - (OSError('oserror'), 'oserror', OSError), - pytest.param( - HTTPError('url', 400, 'httperror', {}, None), - 'Upload failed (400): httperror', - DistutilsError, - id="HTTP 400", - ), - ], - ) - def test_wrong_exception_order(self, exception, expected, raised_exception, caplog): - tmp = self.mkdtemp() - path = os.path.join(tmp, 'xxx') - self.write_file(path) - dist_files = [('xxx', '2.6', path)] # command, pyversion, filename - self.write_file(self.rc, PYPIRC_LONG_PASSWORD) - - pkg_dir, dist = self.create_dist(dist_files=dist_files) - - with mock.patch( - 'distutils.command.upload.urlopen', - new=mock.Mock(side_effect=exception), - ): - with pytest.raises(raised_exception): - cmd = upload(dist) - cmd.ensure_finalized() - cmd.run() - results = caplog.messages - assert expected in results[-1] - caplog.clear()