
Prepare for supporting multiple --pyenv options #313

Merged: 8 commits, May 8, 2023
10 changes: 7 additions & 3 deletions fawltydeps/main.py
@@ -22,7 +22,7 @@
from fawltydeps import extract_declared_dependencies, extract_imports
from fawltydeps.check import calculate_undeclared, calculate_unused
from fawltydeps.cli_parser import build_parser
from fawltydeps.packages import Package, resolve_dependencies
from fawltydeps.packages import BasePackageResolver, Package, resolve_dependencies
from fawltydeps.settings import Action, OutputFormat, Settings, print_toml_config
from fawltydeps.traverse_project import find_sources
from fawltydeps.types import (
@@ -163,8 +163,12 @@ def print_json(self, out: TextIO) -> None:
# However, not all elements that we store in a set are automatically
# orderable (e.g. PathOrSpecial don't know how to order SpecialPath vs
# Path), so order by string representation instead:
set_sort = partial(sorted, key=str)
encoder = partial(custom_pydantic_encoder, {frozenset: set_sort, set: set_sort})
custom_type_encoders = {
frozenset: partial(sorted, key=str),
set: partial(sorted, key=str),
type(BasePackageResolver): lambda klass: klass.__name__,
}
encoder = partial(custom_pydantic_encoder, custom_type_encoders)
json_dict = {
"settings": self.settings,
# Using properties with an underscore does not trigger computations.
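Two details worth noting in the encoder above: `type(BasePackageResolver)` evaluates to the metaclass (`abc.ABCMeta`), so any resolver class encountered while encoding is serialized as its class name; and the `key=str` sort is what allows heterogeneous sets to be ordered at all. A minimal sketch of the latter (the mixed types here stand in for `Path` vs. `SpecialPath`):

```python
from functools import partial
from pathlib import Path

mixed = {Path("pyproject.toml"), "<stdin>"}
# sorted(mixed) raises TypeError: '<' not supported between instances
# of 'str' and 'PosixPath' -- so compare string representations instead:
set_sort = partial(sorted, key=str)
print(set_sort(mixed))  # e.g. ['<stdin>', PosixPath('pyproject.toml')]
```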
186 changes: 93 additions & 93 deletions fawltydeps/packages.py
@@ -7,11 +7,9 @@
import venv
from abc import ABC, abstractmethod
from contextlib import contextmanager
from dataclasses import dataclass, field
from enum import Enum
from itertools import chain
from dataclasses import dataclass, replace
from pathlib import Path
from typing import Dict, Iterable, Iterator, List, Optional, Set
from typing import Dict, Iterable, Iterator, List, Optional, Set, Tuple, Type, Union

# importlib_metadata is gradually graduating into the importlib.metadata stdlib
# module, however we rely on internal functions and recent (and upcoming)
@@ -30,26 +28,19 @@
UnparseablePathException,
UnresolvedDependenciesError,
)
from fawltydeps.utils import calculated_once, hide_dataclass_fields
from fawltydeps.utils import calculated_once

if sys.version_info >= (3, 11):
import tomllib # pylint: disable=no-member
else:
import tomli as tomllib

PackageDebugInfo = Union[None, str, Dict[str, Set[str]]]

logger = logging.getLogger(__name__)


class DependenciesMapping(str, Enum):
"""Types of dependency and imports mapping"""

IDENTITY = "identity"
LOCAL_ENV = "local_env"
USER_DEFINED = "user_defined"


@dataclass
@dataclass(frozen=True)
class Package:
"""Encapsulate an installable Python package.

@@ -59,17 +50,9 @@ class Package:
"""

package_name: str
mappings: Dict[DependenciesMapping, Set[str]] = field(default_factory=dict)
import_names: Set[str] = field(default_factory=set)

def __post_init__(self) -> None:
# The .import_names member is entirely redundant, as it can always be
# calculated from a union of self.mappings.values(). However, it is
# still used often enough (.is_used() is called once per declared
# dependency) that it makes sense to pre-calculate it, and rather hide
# the redundancy from our JSON output
self.import_names = {name for names in self.mappings.values() for name in names}
hide_dataclass_fields(self, "import_names")
import_names: Set[str]
resolved_with: Type["BasePackageResolver"]
debug_info: PackageDebugInfo = None

@staticmethod
def normalize_name(package_name: str) -> str:
@@ -83,17 +66,6 @@ def normalize_name(package_name: str) -> str:
"""
return package_name.lower().replace("-", "_")

def add_import_names(
self, *import_names: str, mapping: DependenciesMapping
) -> None:
"""Add import names provided by this package.

Import names must be associated with a DependenciesMapping enum value,
as keeping track of this is extremely helpful when debugging.
"""
self.mappings.setdefault(mapping, set()).update(import_names)
self.import_names.update(import_names)

def is_used(self, imported_names: Iterable[str]) -> bool:
"""Return True iff this package is among the given import names."""
return bool(self.import_names.intersection(imported_names))
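For illustration, here is how a `Package` might look once constructed by the new `LocalPackageResolver` (defined below); the concrete values are hypothetical:

```python
pkg = Package(
    package_name="PyYAML",
    import_names={"yaml", "_yaml"},
    resolved_with=LocalPackageResolver,
    debug_info={"/some/venv/site-packages": {"yaml", "_yaml"}},
)
assert pkg.is_used(["yaml", "sys"])  # provides one of the imported names
assert not pkg.is_used(["numpy"])    # unrelated import
```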
@@ -116,6 +88,40 @@ def lookup_packages(self, package_names: Set[str]) -> Dict[str, Package]:
raise NotImplementedError


def accumulate_mappings(
resolved_with: Type[BasePackageResolver],
custom_mappings: Iterable[Tuple[CustomMapping, str]],
) -> Dict[str, Package]:
"""Merge CustomMappings (w/associated descriptions) into a dict of Packages.

Each resulting Package collects the import names contributed for one
(normalized) package name across all given mappings; its debug_info dict
keys each contribution by its associated description. The keys in the
returned dict are likewise normalized package names.
"""
result: Dict[str, Package] = {}
for custom_mapping, debug_key in custom_mappings:
for name, imports in custom_mapping.items():
normalized_name = Package.normalize_name(name)
if normalized_name not in result: # create new Package instance
result[normalized_name] = Package(
package_name=normalized_name,
import_names=set(imports),
resolved_with=resolved_with,
debug_info={debug_key: set(imports)},
)
else: # replace existing Package instance with "augmented" version
prev = result[normalized_name]
debug_info = prev.debug_info
assert isinstance(debug_info, dict)
debug_info.setdefault(debug_key, set()).update(imports)
result[normalized_name] = replace(
prev,
import_names=set.union(prev.import_names, imports),
debug_info=debug_info,
)
return result
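
A worked example of the merge semantics (the mappings and description strings are hypothetical):

```python
merged = accumulate_mappings(
    UserDefinedMapping,
    [
        ({"PyYAML": ["yaml"]}, "from settings"),
        ({"pyyaml": ["yaml", "_yaml"]}, "mapping.toml"),
    ],
)
pkg = merged["pyyaml"]  # "PyYAML" and "pyyaml" normalize to the same key
assert pkg.import_names == {"yaml", "_yaml"}
assert pkg.debug_info == {
    "from settings": {"yaml"},
    "mapping.toml": {"yaml", "_yaml"},
}
```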


class UserDefinedMapping(BasePackageResolver):
"""Use user-defined mapping loaded from a toml file"""

@@ -134,22 +140,6 @@ def __init__(
# We enumerate packages declared in the mapping _once_ and cache the result here:
self._packages: Optional[Dict[str, Package]] = None

@staticmethod
def accumulate_mappings(custom_mappings: Iterable[CustomMapping]) -> CustomMapping:
"""Merge mapping dictionaries and normalise key (package) names."""
result: CustomMapping = {}
for name, imports in chain.from_iterable(cm.items() for cm in custom_mappings):
normalised_name = Package.normalize_name(name)
if normalised_name in result:
logger.info(
"Mapping for %s already found. Import names "
"from the second mapping are appended to ones "
"found in the first mapping.",
normalised_name,
)
result.setdefault(normalised_name, []).extend(imports)
return result

@property
@calculated_once
def packages(self) -> Dict[str, Package]:
@@ -165,26 +155,18 @@ def packages(self) -> Dict[str, Package]:
the remainder of this object's life in _packages.
"""

def _custom_mappings() -> Iterator[CustomMapping]:
def _custom_mappings() -> Iterator[Tuple[CustomMapping, str]]:
if self.custom_mapping is not None:
logger.debug("Applying user-defined mapping from settings.")
yield self.custom_mapping
yield self.custom_mapping, "from settings"

if self.mapping_paths is not None:
for path in self.mapping_paths:
logger.debug(f"Loading user-defined mapping from {path}")
with open(path, "rb") as mapping_file:
yield tomllib.load(mapping_file)
yield tomllib.load(mapping_file), str(path)

custom_mapping = self.accumulate_mappings(_custom_mappings())

return {
name: Package(
name,
{DependenciesMapping.USER_DEFINED: set(imports)},
)
for name, imports in custom_mapping.items()
}
return accumulate_mappings(self.__class__, _custom_mappings())
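
For reference, a user-defined mapping file is a flat TOML table from package name to the list of import names it provides. A sketch of parsing such content with the same `tomllib` used above (the mapping itself is hypothetical):

```python
import tomllib  # Python >= 3.11; falls back to "tomli" on older versions

# Hypothetical mapping-file content: package name -> provided import names.
mapping_toml = """
PyYAML = ["yaml", "_yaml"]
python-dateutil = ["dateutil"]
"""
custom_mapping = tomllib.loads(mapping_toml)
assert custom_mapping["python-dateutil"] == ["dateutil"]
```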

def lookup_packages(self, package_names: Set[str]) -> Dict[str, Package]:
"""Convert package names to locally available Package objects."""
@@ -248,38 +230,51 @@ def determine_package_dir(cls, path: Path) -> Optional[Path]:
return None if path.parent == path else cls.determine_package_dir(path.parent)

@property
@calculated_once
def packages(self) -> Dict[str, Package]:
"""Return mapping of package names to Package objects.

This enumerates the available packages in the given Python environment
(or the current Python environment) _once_, and caches the result for
the remainder of this object's life.
"""
if self._packages is None: # need to build cache
if self.pyenv_path is None:
paths = sys.path # use current Python environment
else:
paths = [str(self.pyenv_path)]

self._packages = {}
# We're reaching into the internals of importlib_metadata here,
# which Mypy is not overly fond of. Roughly what we're doing here
# is calling packages_distributions(), but on a possibly different
# environment than the current one (i.e. sys.path).
# Note that packages_distributions() is not able to return packages
# that map to zero import names.
context = DistributionFinder.Context(path=paths) # type: ignore
for dist in MetadataPathFinder().find_distributions(context): # type: ignore
parent_dir = dist.locate_file("")
logger.debug(f"Found {dist.name} {dist.version} under {parent_dir}")
imports = set(
_top_level_declared(dist) # type: ignore
or _top_level_inferred(dist) # type: ignore
)
package = Package(dist.name, {DependenciesMapping.LOCAL_ENV: imports})
self._packages[Package.normalize_name(dist.name)] = package
if self.pyenv_path is None:
paths = sys.path # use current Python environment
else:
paths = [str(self.pyenv_path)]

ret = {}
# We're reaching into the internals of importlib_metadata here,
# which Mypy is not overly fond of. Roughly what we're doing here
# is calling packages_distributions(), but on a possibly different
# environment than the current one (i.e. sys.path).
# Note that packages_distributions() is not able to return packages
# that map to zero import names.
context = DistributionFinder.Context(path=paths) # type: ignore
for dist in MetadataPathFinder().find_distributions(context): # type: ignore
normalized_name = Package.normalize_name(dist.name)
parent_dir = dist.locate_file("")
if normalized_name in ret:
# We already found another instance of this package earlier in
# the given paths. Assume that the earlier package is what
# Python's import machinery will choose, and that this later
# package is skipped.
logger.debug(f"Skip {dist.name} {dist.version} under {parent_dir}")
continue

logger.debug(f"Found {dist.name} {dist.version} under {parent_dir}")
imports = set(
_top_level_declared(dist) # type: ignore
or _top_level_inferred(dist) # type: ignore
)
ret[normalized_name] = Package(
package_name=dist.name,
import_names=imports,
resolved_with=self.__class__,
debug_info={str(parent_dir): imports},
)

return self._packages
return ret
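
The importlib_metadata machinery used above can also be exercised on its own; a minimal sketch (the path is hypothetical, and the `type: ignore` dance from the real code is omitted):

```python
from importlib_metadata import DistributionFinder, MetadataPathFinder

# Enumerate distributions under an explicit list of paths, analogous to
# what LocalPackageResolver.packages does for self.pyenv_path:
context = DistributionFinder.Context(
    path=["/some/venv/lib/python3.11/site-packages"]
)
for dist in MetadataPathFinder().find_distributions(context):
    print(dist.name, dist.version, dist.locate_file(""))
```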

def lookup_packages(self, package_names: Set[str]) -> Dict[str, Package]:
"""Convert package names to locally available Package objects.
@@ -342,8 +337,15 @@ def lookup_packages(self, package_names: Set[str]) -> Dict[str, Package]:
"""
logger.info("Installing dependencies into a new temporary Python environment.")
with self.temp_installed_requirements(sorted(package_names)) as venv_dir:
local_resolver = LocalPackageResolver(venv_dir)
return local_resolver.lookup_packages(package_names)
resolver = LocalPackageResolver(venv_dir)
return {
name: replace(
package,
resolved_with=self.__class__,
debug_info="Provided by temporary `pip install`",
)
for name, package in resolver.lookup_packages(package_names).items()
}
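
Since `Package` is now frozen, resolvers re-tag results with `dataclasses.replace()` instead of mutating them in place; a minimal illustration of that pattern on an unrelated frozen dataclass:

```python
from dataclasses import dataclass, replace

@dataclass(frozen=True)
class Point:
    x: int
    y: int

p = Point(1, 2)
q = replace(p, y=3)  # build a modified copy; p itself stays immutable
assert (p, q) == (Point(1, 2), Point(1, 3))
```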


class IdentityMapping(BasePackageResolver):
@@ -356,14 +358,12 @@ class IdentityMapping(BasePackageResolver):
@staticmethod
def lookup_package(package_name: str) -> Package:
"""Convert a package name into a Package with the same import name."""
ret = Package(package_name)
import_name = Package.normalize_name(package_name)
ret.add_import_names(import_name, mapping=DependenciesMapping.IDENTITY)
logger.info(
f"{package_name!r} was not resolved. "
f"Assuming it can be imported as {import_name!r}."
)
return ret
return Package(package_name, {import_name}, IdentityMapping)

def lookup_packages(self, package_names: Set[str]) -> Dict[str, Package]:
"""Convert package names into Package objects w/the same import name."""
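A quick example of the new return value (the package name is hypothetical):

```python
pkg = IdentityMapping.lookup_package("typing-extensions")
assert pkg.package_name == "typing-extensions"
assert pkg.import_names == {"typing_extensions"}  # normalized identity mapping
assert pkg.resolved_with is IdentityMapping
```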
1 change: 1 addition & 0 deletions noxfile.py
@@ -94,6 +94,7 @@ def lint(session):
"too-many-arguments",
"too-many-instance-attributes",
"too-many-lines",
"use-implicit-booleaness-not-comparison",
]
session.run(
"pylint",
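For context, `use-implicit-booleaness-not-comparison` (pylint C1803) flags explicit comparisons against empty literals; presumably it is disabled here so that code like the following hypothetical snippet can keep its explicit comparison:

```python
def summarize(results: dict) -> str:
    # pylint C1803 would suggest `if not results:` here; the explicit
    # comparison states "exactly an empty dict" rather than "any falsy value".
    if results == {}:
        return "nothing to report"
    return f"{len(results)} finding(s)"
```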
40 changes: 37 additions & 3 deletions tests/conftest.py
@@ -4,7 +4,7 @@
from pathlib import Path
from tempfile import mkdtemp
from textwrap import dedent
from typing import Dict, Iterable, Set, Union
from typing import Callable, Dict, Iterable, Set, Tuple, Union

import pytest

@@ -26,7 +26,7 @@ def _inner(file_contents: Dict[str, str]) -> Path:

@pytest.fixture
def fake_venv(tmp_path):
def create_one_fake_venv(fake_packages: Dict[str, Set[str]]) -> Path:
def create_one_fake_venv(fake_packages: Dict[str, Set[str]]) -> Tuple[Path, Path]:
venv_dir = Path(mkdtemp(prefix="fake_venv.", dir=tmp_path))
venv.create(venv_dir, with_pip=False)

@@ -47,11 +47,45 @@ def create_one_fake_venv(fake_packages: Dict[str, Set[str]]) -> Path:
for name in import_names:
(site_dir / f"{name}.py").touch()

return venv_dir
return venv_dir, site_dir

return create_one_fake_venv
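
A hypothetical test exercising the updated fixture (note the new two-tuple return value):

```python
def test_fake_venv_returns_venv_and_site_dirs(fake_venv):
    venv_dir, site_dir = fake_venv({"pyyaml": {"yaml", "_yaml"}})
    assert venv_dir in site_dir.parents      # site dir lives inside the venv
    assert (site_dir / "yaml.py").is_file()  # one stub module per import name
```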


@pytest.fixture
def isolate_default_resolver(
fake_venv: Callable[[Dict[str, Set[str]]], Tuple[Path, Path]], monkeypatch
):
"""Put a fake_venv at the start of sys.path to yield predictable Packages.

Call the returned function to place a fake venv with the specified package
mappings at the start of sys.path.

Rationale:
When testing resolve_dependencies() or anything that depends on
LocalPackageResolver() with default/empty pyenv, it is important to realize
that local packages will be resolved via sys.path. This is hard to fully
isolate/mock in tests, but we can do the following to approximate isolation:
- Use fake_venv() and pytest.monkeypatch.syspath_prepend(path) to make sure
packages that we expect to find in the default environment are always
found in this fake venv. This is achieved by using this fixture.
- Populate this fake_venv with the packages that we expect to find in the default
environment. These will then be resolved through the fake_venv to yield
predictable import names and mapping descriptions.
- Tests must make sure packages that they expect NOT to find in the default
environment are chosen/spelled in ways to ensure they are indeed never
found elsewhere in sys.path, as we are not able to isolate the resolver
from sys.path.
"""

def inner(fake_packages: Dict[str, Set[str]]) -> Path:
_venv_dir, package_dir = fake_venv(fake_packages)
monkeypatch.syspath_prepend(package_dir)
return package_dir

return inner
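
A hypothetical test showing the intended usage (assuming `LocalPackageResolver()` with no arguments resolves via the current `sys.path`, and that `lookup_packages()` keys its result by the normalized package name):

```python
def test_default_env_resolves_fake_package(isolate_default_resolver):
    isolate_default_resolver({"fake_package": {"fake_package"}})
    resolved = LocalPackageResolver().lookup_packages({"fake_package"})
    assert resolved["fake_package"].import_names == {"fake_package"}
```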


@pytest.fixture
def project_with_requirements(write_tmp_files):
return write_tmp_files(