You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
271 lines
9.5 KiB
271 lines
9.5 KiB
import email.message
|
|
import email.parser
|
|
import logging
|
|
import os
|
|
import zipfile
|
|
from typing import Collection, Iterable, Iterator, List, Mapping, NamedTuple, Optional
|
|
|
|
from pip._vendor import pkg_resources
|
|
from pip._vendor.packaging.requirements import Requirement
|
|
from pip._vendor.packaging.utils import NormalizedName, canonicalize_name
|
|
from pip._vendor.packaging.version import parse as parse_version
|
|
|
|
from pip._internal.exceptions import InvalidWheel, NoneMetadataError, UnsupportedWheel
|
|
from pip._internal.utils.egg_link import egg_link_path_from_location
|
|
from pip._internal.utils.misc import display_path, normalize_path
|
|
from pip._internal.utils.wheel import parse_wheel, read_wheel_metadata_file
|
|
|
|
from .base import (
|
|
BaseDistribution,
|
|
BaseEntryPoint,
|
|
BaseEnvironment,
|
|
DistributionVersion,
|
|
InfoPath,
|
|
Wheel,
|
|
)
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class EntryPoint(NamedTuple):
|
|
name: str
|
|
value: str
|
|
group: str
|
|
|
|
|
|
class InMemoryMetadata:
|
|
"""IMetadataProvider that reads metadata files from a dictionary.
|
|
|
|
This also maps metadata decoding exceptions to our internal exception type.
|
|
"""
|
|
|
|
def __init__(self, metadata: Mapping[str, bytes], wheel_name: str) -> None:
|
|
self._metadata = metadata
|
|
self._wheel_name = wheel_name
|
|
|
|
def has_metadata(self, name: str) -> bool:
|
|
return name in self._metadata
|
|
|
|
def get_metadata(self, name: str) -> str:
|
|
try:
|
|
return self._metadata[name].decode()
|
|
except UnicodeDecodeError as e:
|
|
# Augment the default error with the origin of the file.
|
|
raise UnsupportedWheel(
|
|
f"Error decoding metadata for {self._wheel_name}: {e} in {name} file"
|
|
)
|
|
|
|
def get_metadata_lines(self, name: str) -> Iterable[str]:
|
|
return pkg_resources.yield_lines(self.get_metadata(name))
|
|
|
|
def metadata_isdir(self, name: str) -> bool:
|
|
return False
|
|
|
|
def metadata_listdir(self, name: str) -> List[str]:
|
|
return []
|
|
|
|
def run_script(self, script_name: str, namespace: str) -> None:
|
|
pass
|
|
|
|
|
|
class Distribution(BaseDistribution):
|
|
def __init__(self, dist: pkg_resources.Distribution) -> None:
|
|
self._dist = dist
|
|
|
|
@classmethod
|
|
def from_directory(cls, directory: str) -> BaseDistribution:
|
|
dist_dir = directory.rstrip(os.sep)
|
|
|
|
# Build a PathMetadata object, from path to metadata. :wink:
|
|
base_dir, dist_dir_name = os.path.split(dist_dir)
|
|
metadata = pkg_resources.PathMetadata(base_dir, dist_dir)
|
|
|
|
# Determine the correct Distribution object type.
|
|
if dist_dir.endswith(".egg-info"):
|
|
dist_cls = pkg_resources.Distribution
|
|
dist_name = os.path.splitext(dist_dir_name)[0]
|
|
else:
|
|
assert dist_dir.endswith(".dist-info")
|
|
dist_cls = pkg_resources.DistInfoDistribution
|
|
dist_name = os.path.splitext(dist_dir_name)[0].split("-")[0]
|
|
|
|
dist = dist_cls(base_dir, project_name=dist_name, metadata=metadata)
|
|
return cls(dist)
|
|
|
|
@classmethod
|
|
def from_metadata_file_contents(
|
|
cls,
|
|
metadata_contents: bytes,
|
|
filename: str,
|
|
project_name: str,
|
|
) -> BaseDistribution:
|
|
metadata_dict = {
|
|
"METADATA": metadata_contents,
|
|
}
|
|
dist = pkg_resources.DistInfoDistribution(
|
|
location=filename,
|
|
metadata=InMemoryMetadata(metadata_dict, filename),
|
|
project_name=project_name,
|
|
)
|
|
return cls(dist)
|
|
|
|
@classmethod
|
|
def from_wheel(cls, wheel: Wheel, name: str) -> BaseDistribution:
|
|
try:
|
|
with wheel.as_zipfile() as zf:
|
|
info_dir, _ = parse_wheel(zf, name)
|
|
metadata_dict = {
|
|
path.split("/", 1)[-1]: read_wheel_metadata_file(zf, path)
|
|
for path in zf.namelist()
|
|
if path.startswith(f"{info_dir}/")
|
|
}
|
|
except zipfile.BadZipFile as e:
|
|
raise InvalidWheel(wheel.location, name) from e
|
|
except UnsupportedWheel as e:
|
|
raise UnsupportedWheel(f"{name} has an invalid wheel, {e}")
|
|
dist = pkg_resources.DistInfoDistribution(
|
|
location=wheel.location,
|
|
metadata=InMemoryMetadata(metadata_dict, wheel.location),
|
|
project_name=name,
|
|
)
|
|
return cls(dist)
|
|
|
|
@property
|
|
def location(self) -> Optional[str]:
|
|
return self._dist.location
|
|
|
|
@property
|
|
def installed_location(self) -> Optional[str]:
|
|
egg_link = egg_link_path_from_location(self.raw_name)
|
|
if egg_link:
|
|
location = egg_link
|
|
elif self.location:
|
|
location = self.location
|
|
else:
|
|
return None
|
|
return normalize_path(location)
|
|
|
|
@property
|
|
def info_location(self) -> Optional[str]:
|
|
return self._dist.egg_info
|
|
|
|
@property
|
|
def installed_by_distutils(self) -> bool:
|
|
# A distutils-installed distribution is provided by FileMetadata. This
|
|
# provider has a "path" attribute not present anywhere else. Not the
|
|
# best introspection logic, but pip has been doing this for a long time.
|
|
try:
|
|
return bool(self._dist._provider.path)
|
|
except AttributeError:
|
|
return False
|
|
|
|
@property
|
|
def canonical_name(self) -> NormalizedName:
|
|
return canonicalize_name(self._dist.project_name)
|
|
|
|
@property
|
|
def version(self) -> DistributionVersion:
|
|
return parse_version(self._dist.version)
|
|
|
|
def is_file(self, path: InfoPath) -> bool:
|
|
return self._dist.has_metadata(str(path))
|
|
|
|
def iter_distutils_script_names(self) -> Iterator[str]:
|
|
yield from self._dist.metadata_listdir("scripts")
|
|
|
|
def read_text(self, path: InfoPath) -> str:
|
|
name = str(path)
|
|
if not self._dist.has_metadata(name):
|
|
raise FileNotFoundError(name)
|
|
content = self._dist.get_metadata(name)
|
|
if content is None:
|
|
raise NoneMetadataError(self, name)
|
|
return content
|
|
|
|
def iter_entry_points(self) -> Iterable[BaseEntryPoint]:
|
|
for group, entries in self._dist.get_entry_map().items():
|
|
for name, entry_point in entries.items():
|
|
name, _, value = str(entry_point).partition("=")
|
|
yield EntryPoint(name=name.strip(), value=value.strip(), group=group)
|
|
|
|
def _metadata_impl(self) -> email.message.Message:
|
|
"""
|
|
:raises NoneMetadataError: if the distribution reports `has_metadata()`
|
|
True but `get_metadata()` returns None.
|
|
"""
|
|
if isinstance(self._dist, pkg_resources.DistInfoDistribution):
|
|
metadata_name = "METADATA"
|
|
else:
|
|
metadata_name = "PKG-INFO"
|
|
try:
|
|
metadata = self.read_text(metadata_name)
|
|
except FileNotFoundError:
|
|
if self.location:
|
|
displaying_path = display_path(self.location)
|
|
else:
|
|
displaying_path = repr(self.location)
|
|
logger.warning("No metadata found in %s", displaying_path)
|
|
metadata = ""
|
|
feed_parser = email.parser.FeedParser()
|
|
feed_parser.feed(metadata)
|
|
return feed_parser.close()
|
|
|
|
def iter_dependencies(self, extras: Collection[str] = ()) -> Iterable[Requirement]:
|
|
if extras: # pkg_resources raises on invalid extras, so we sanitize.
|
|
extras = frozenset(extras).intersection(self._dist.extras)
|
|
return self._dist.requires(extras)
|
|
|
|
def iter_provided_extras(self) -> Iterable[str]:
|
|
return self._dist.extras
|
|
|
|
|
|
class Environment(BaseEnvironment):
|
|
def __init__(self, ws: pkg_resources.WorkingSet) -> None:
|
|
self._ws = ws
|
|
|
|
@classmethod
|
|
def default(cls) -> BaseEnvironment:
|
|
return cls(pkg_resources.working_set)
|
|
|
|
@classmethod
|
|
def from_paths(cls, paths: Optional[List[str]]) -> BaseEnvironment:
|
|
return cls(pkg_resources.WorkingSet(paths))
|
|
|
|
def _iter_distributions(self) -> Iterator[BaseDistribution]:
|
|
for dist in self._ws:
|
|
yield Distribution(dist)
|
|
|
|
def _search_distribution(self, name: str) -> Optional[BaseDistribution]:
|
|
"""Find a distribution matching the ``name`` in the environment.
|
|
|
|
This searches from *all* distributions available in the environment, to
|
|
match the behavior of ``pkg_resources.get_distribution()``.
|
|
"""
|
|
canonical_name = canonicalize_name(name)
|
|
for dist in self.iter_all_distributions():
|
|
if dist.canonical_name == canonical_name:
|
|
return dist
|
|
return None
|
|
|
|
def get_distribution(self, name: str) -> Optional[BaseDistribution]:
|
|
# Search the distribution by looking through the working set.
|
|
dist = self._search_distribution(name)
|
|
if dist:
|
|
return dist
|
|
|
|
# If distribution could not be found, call working_set.require to
|
|
# update the working set, and try to find the distribution again.
|
|
# This might happen for e.g. when you install a package twice, once
|
|
# using setup.py develop and again using setup.py install. Now when
|
|
# running pip uninstall twice, the package gets removed from the
|
|
# working set in the first uninstall, so we have to populate the
|
|
# working set again so that pip knows about it and the packages gets
|
|
# picked up and is successfully uninstalled the second time too.
|
|
try:
|
|
# We didn't pass in any version specifiers, so this can never
|
|
# raise pkg_resources.VersionConflict.
|
|
self._ws.require(name)
|
|
except pkg_resources.DistributionNotFound:
|
|
return None
|
|
return self._search_distribution(name)
|