# Licensed under the Apache License: http://www.apache.org/licenses/LICENSE-2.0
# For details: https://github.com/nedbat/coveragepy/blob/master/NOTICE.txt

"""Tests for coverage.data"""

from __future__ import annotations

import glob
import os
import os.path
import re
import sqlite3
import threading

from typing import (
    Any, Callable, Collection, Dict, Iterable, Mapping, Set, TypeVar, Union
)
from unittest import mock

import pytest

from coverage.data import CoverageData, combine_parallel_data
from coverage.data import add_data_to_hash, line_counts
from coverage.debug import DebugControlString
from coverage.exceptions import DataError, NoDataError
from coverage.files import PathAliases, canonical_filename
from coverage.types import FilePathClasses, FilePathType, TArc, TLineNo

from tests.coveragetest import CoverageTest
from tests.helpers import assert_count_equal


# Line data for two files, in the form passed to CoverageData.add_lines().
LINES_1 = {
    'a.py': {1, 2},
    'b.py': {3},
}
# Expected results once LINES_1 has been added: the number of lines per file,
# the measured file names, and the line numbers recorded for each file.
SUMMARY_1 = {'a.py': 2, 'b.py': 1}
MEASURED_FILES_1 = ['a.py', 'b.py']
A_PY_LINES_1 = [1, 2]
B_PY_LINES_1 = [3]

# A second batch of line data.  It overlaps LINES_1 on a.py (line 1) and adds
# a new file, c.py.
LINES_2 = {
    'a.py': {1, 5},
    'c.py': {17},
}
# Expected results once both LINES_1 and LINES_2 have been added: a.py has the
# union {1, 2, 5}, so three lines.
SUMMARY_1_2 = {'a.py': 3, 'b.py': 1, 'c.py': 1}
MEASURED_FILES_1_2 = ['a.py', 'b.py', 'c.py']

# Arc data for two files, in the form passed to CoverageData.add_arcs().
# Each arc is a (from_line, to_line) pair; -1 appears as the first element of
# the first arc and the last element of the final arc in each file.
ARCS_3 = {
    'x.py': {(-1, 1), (1, 2), (2, 3), (3, -1)},
    'y.py': {(-1, 17), (17, 23), (23, -1)},
}
# Expected results once ARCS_3 has been added: the arcs per file, the number
# of distinct lines per file, the measured file names, and the line numbers
# per file.
X_PY_ARCS_3 = [(-1, 1), (1, 2), (2, 3), (3, -1)]
Y_PY_ARCS_3 = [(-1, 17), (17, 23), (23, -1)]
SUMMARY_3 = {'x.py': 3, 'y.py': 2}
MEASURED_FILES_3 = ['x.py', 'y.py']
X_PY_LINES_3 = [1, 2, 3]
Y_PY_LINES_3 = [17, 23]

# A second batch of arc data.  It overlaps ARCS_3 on x.py and adds a new file,
# z.py.
ARCS_4 = {
    'x.py': {(-1, 2), (2, 5), (5, -1)},
    'z.py': {(-1, 1000), (1000, -1)},
}
# Expected results once both ARCS_3 and ARCS_4 have been added: x.py has lines
# {1, 2, 3, 5}, so four lines.
SUMMARY_3_4 = {'x.py': 4, 'y.py': 2, 'z.py': 1}
MEASURED_FILES_3_4 = ['x.py', 'y.py', 'z.py']


def DebugCoverageData(*args: Any, **kwargs: Any) -> CoverageData:
    """Factory for CoverageData instances with debugging turned on.

    Using this in tests exercises the debugging lines in sqldata.py.  Nothing
    is asserted about the debug output itself; the point is to know those
    lines run without error, and to keep them from showing up as distracting
    missing lines in our coverage reports.

    All arguments are passed through to CoverageData.  The caller must not
    supply `debug`: this factory provides it.
    """
    assert "debug" not in kwargs
    # There's no reason kwargs should imply sqldata debugging.  Keying off
    # them is just a way to get a mix of debug options across the tests.
    extra = ["sqldata"] if kwargs else []
    control = DebugControlString(options=["dataio", "dataop", "sql", *extra])
    return CoverageData(*args, debug=control, **kwargs)   # type: ignore[misc]


# Type of a callable that builds a CoverageData.  Used to annotate tests that
# are parametrized over both CoverageData and the DebugCoverageData factory.
TCoverageData = Callable[..., CoverageData]

def assert_line_counts(
    covdata: CoverageData,
    counts: Mapping[str, int],
    fullpath: bool = False,
) -> None:
    """Assert that `line_counts` reports exactly `counts` for `covdata`.

    `fullpath` is passed straight through to `line_counts`.
    """
    actual = line_counts(covdata, fullpath)
    assert actual == counts

def assert_measured_files(covdata: CoverageData, measured: Iterable[str]) -> None:
    """Assert that the files measured in `covdata` are `measured`, in any order."""
    actual_files = covdata.measured_files()
    assert_count_equal(actual_files, measured)

def assert_lines1_data(covdata: CoverageData) -> None:
    """Check that `covdata` has the data from LINES_1.

    Verifies the per-file line counts, the measured files, the line numbers
    recorded for every file in LINES_1, and that no arc data is present.
    """
    assert_line_counts(covdata, SUMMARY_1)
    assert_measured_files(covdata, MEASURED_FILES_1)
    assert_count_equal(covdata.lines("a.py"), A_PY_LINES_1)
    # Check b.py's line numbers too, not only its count, the same way
    # assert_arcs3_data checks every file in ARCS_3.
    assert_count_equal(covdata.lines("b.py"), B_PY_LINES_1)
    assert not covdata.has_arcs()

def assert_arcs3_data(covdata: CoverageData) -> None:
    """Check that `covdata` has the data from ARCS_3.

    Verifies the per-file line counts, the measured files, the lines and arcs
    recorded for each file, and that the data is arc data.
    """
    assert_line_counts(covdata, SUMMARY_3)
    assert_measured_files(covdata, MEASURED_FILES_3)
    expected_per_file = {
        "x.py": (X_PY_LINES_3, X_PY_ARCS_3),
        "y.py": (Y_PY_LINES_3, Y_PY_ARCS_3),
    }
    for filename, (lines, arcs) in expected_per_file.items():
        assert_count_equal(covdata.lines(filename), lines)
        assert_count_equal(covdata.arcs(filename), arcs)
    assert covdata.has_arcs()


# One item of per-file measurement data: either a line number or an arc.
TData = TypeVar("TData", bound=Union[TLineNo, TArc])

def dicts_from_sets(file_data: Dict[str, Set[TData]]) -> Dict[str, Dict[TData, None]]:
    """Turn each file's set of items into a dict keyed by those items.

    Before 6.0, file data was a dict with None as the values.  In 6.0, file
    data is a set.  SqlData all along only cared that it was an iterable.
    This helper produces the old dict format so tests can check it still works.
    """
    converted = {}
    for filename, items in file_data.items():
        converted[filename] = {item: None for item in items}
    return converted


class CoverageDataTest(CoverageTest):
    """Test cases for CoverageData."""

    def test_empty_data_is_false(self) -> None:
        """A new CoverageData is falsy, and no .coverage file exists."""
        data = DebugCoverageData()
        assert not data
        self.assert_doesnt_exist(".coverage")

    def test_empty_data_is_false_when_read(self) -> None:
        """Reading with no data file leaves the data falsy and makes no file."""
        data = DebugCoverageData()
        data.read()
        assert not data
        self.assert_doesnt_exist(".coverage")

    def test_line_data_is_true(self) -> None:
        """A CoverageData holding line data is truthy."""
        data = DebugCoverageData()
        data.add_lines(LINES_1)
        assert data

    def test_arc_data_is_true(self) -> None:
        """A CoverageData holding arc data is truthy."""
        data = DebugCoverageData()
        data.add_arcs(ARCS_3)
        assert data

    def test_empty_line_data_is_false(self) -> None:
        """Adding an empty mapping of lines leaves the data falsy."""
        data = DebugCoverageData()
        data.add_lines({})
        assert not data

    def test_empty_arc_data_is_false(self) -> None:
        """Adding an empty mapping of arcs leaves the data falsy."""
        data = DebugCoverageData()
        data.add_arcs({})
        assert not data

    @pytest.mark.parametrize("lines", [LINES_1, dicts_from_sets(LINES_1)])
    def test_adding_lines(self, lines: Mapping[str, Collection[TLineNo]]) -> None:
        """Lines can be added from either the set or the old dict format."""
        data = DebugCoverageData()
        data.add_lines(lines)
        assert_lines1_data(data)

    @pytest.mark.parametrize("arcs", [ARCS_3, dicts_from_sets(ARCS_3)])
    def test_adding_arcs(self, arcs: Mapping[str, Collection[TArc]]) -> None:
        """Arcs can be added from either the set or the old dict format."""
        data = DebugCoverageData()
        data.add_arcs(arcs)
        assert_arcs3_data(data)

    def test_ok_to_add_lines_twice(self) -> None:
        """Two add_lines calls accumulate into the combined line data."""
        data = DebugCoverageData()
        for batch in (LINES_1, LINES_2):
            data.add_lines(batch)
        assert_line_counts(data, SUMMARY_1_2)
        assert_measured_files(data, MEASURED_FILES_1_2)

    def test_ok_to_add_arcs_twice(self) -> None:
        """Two add_arcs calls accumulate into the combined arc data."""
        data = DebugCoverageData()
        for batch in (ARCS_3, ARCS_4):
            data.add_arcs(batch)
        assert_line_counts(data, SUMMARY_3_4)
        assert_measured_files(data, MEASURED_FILES_3_4)

    def test_ok_to_add_empty_arcs(self) -> None:
        """Adding empty arc sets for known files doesn't change the data."""
        data = DebugCoverageData()
        data.add_arcs(ARCS_3)
        data.add_arcs(ARCS_4)
        no_arcs = dict.fromkeys(ARCS_3, set())
        data.add_arcs(no_arcs)
        assert_line_counts(data, SUMMARY_3_4)
        assert_measured_files(data, MEASURED_FILES_3_4)

    @pytest.mark.parametrize("klass", [CoverageData, DebugCoverageData])
    def test_cant_add_arcs_with_lines(self, klass: TCoverageData) -> None:
        """Adding arcs to data that already has lines raises DataError."""
        data = klass()
        data.add_lines(LINES_1)
        with pytest.raises(
            DataError,
            match="Can't add branch measurements to existing line data",
        ):
            data.add_arcs(ARCS_3)

    @pytest.mark.parametrize("klass", [CoverageData, DebugCoverageData])
    def test_cant_add_lines_with_arcs(self, klass: TCoverageData) -> None:
        """Adding lines to data that already has arcs raises DataError."""
        data = klass()
        data.add_arcs(ARCS_3)
        with pytest.raises(
            DataError,
            match="Can't add line measurements to existing branch data",
        ):
            data.add_lines(LINES_1)

    def test_touch_file_with_lines(self) -> None:
        """Touching a file in line data adds it to the measured files."""
        data = DebugCoverageData()
        data.add_lines(LINES_1)
        data.touch_file('zzz.py')
        expected_files = MEASURED_FILES_1 + ['zzz.py']
        assert_measured_files(data, expected_files)

    def test_touch_file_with_arcs(self) -> None:
        """Touching a file in arc data adds it to the measured files."""
        data = DebugCoverageData()
        data.add_arcs(ARCS_3)
        data.touch_file('zzz.py')
        expected_files = MEASURED_FILES_3 + ['zzz.py']
        assert_measured_files(data, expected_files)

    def test_set_query_contexts(self) -> None:
        """Query context patterns select which recorded lines are reported."""
        data = DebugCoverageData()
        data.set_context('test_a')
        data.add_lines(LINES_1)
        # A pattern matching the recorded context reports its lines; a pattern
        # matching nothing reports none.
        cases = [
            (['te.*a'], [1, 2]),
            (['other'], []),
        ]
        for patterns, expected_lines in cases:
            data.set_query_contexts(patterns)
            assert data.lines('a.py') == expected_lines

    def test_no_lines_vs_unmeasured_file(self) -> None:
        """A touched file has an empty line list; an unknown file has None."""
        data = DebugCoverageData()
        data.add_lines(LINES_1)
        data.touch_file('zzz.py')
        touched_lines = data.lines('zzz.py')
        unknown_lines = data.lines('no_such_file.py')
        assert touched_lines == []
        assert unknown_lines is None

    def test_lines_with_contexts(self) -> None:
        """lines() honors query contexts once they have been set."""
        data = DebugCoverageData()
        data.set_context('test_a')
        data.add_lines(LINES_1)
        # With no query contexts set, everything is reported.
        assert data.lines('a.py') == [1, 2]
        for patterns, expected_lines in [(['test'], [1, 2]), (['other'], [])]:
            data.set_query_contexts(patterns)
            assert data.lines('a.py') == expected_lines

    def test_contexts_by_lineno_with_lines(self) -> None:
        """contexts_by_lineno maps each recorded line to its contexts."""
        data = DebugCoverageData()
        data.set_context('test_a')
        data.add_lines(LINES_1)
        by_lineno = data.contexts_by_lineno('a.py')
        assert by_lineno == {1: ['test_a'], 2: ['test_a']}

    @pytest.mark.parametrize("lines", [LINES_1, dicts_from_sets(LINES_1)])
    def test_no_duplicate_lines(self, lines: Mapping[str, Collection[TLineNo]]) -> None:
        """Lines recorded under two contexts are reported only once."""
        data = DebugCoverageData()
        for context in ("context1", "context2"):
            data.set_context(context)
            data.add_lines(lines)
        assert data.lines('a.py') == A_PY_LINES_1

    @pytest.mark.parametrize("arcs", [ARCS_3, dicts_from_sets(ARCS_3)])
    def test_no_duplicate_arcs(self, arcs: Mapping[str, Collection[TArc]]) -> None:
        """Arcs recorded under two contexts are reported only once."""
        data = DebugCoverageData()
        for context in ("context1", "context2"):
            data.set_context(context)
            data.add_arcs(arcs)
        assert data.arcs('x.py') == X_PY_ARCS_3

    def test_no_arcs_vs_unmeasured_file(self) -> None:
        """A touched file has empty lines and arcs; an unknown file has None."""
        data = DebugCoverageData()
        data.add_arcs(ARCS_3)
        data.touch_file('zzz.py')
        # Both accessors must distinguish "measured but empty" from "unknown".
        for query in (data.lines, data.arcs):
            assert query('zzz.py') == []
            assert query('no_such_file.py') is None

    def test_arcs_with_contexts(self) -> None:
        """arcs() honors query contexts once they have been set."""
        all_arcs = [(-1, 1), (1, 2), (2, 3), (3, -1)]
        data = DebugCoverageData()
        data.set_context('test_x')
        data.add_arcs(ARCS_3)
        # With no query contexts set, everything is reported.
        assert data.arcs('x.py') == all_arcs
        data.set_query_contexts(['test_.$'])
        assert data.arcs('x.py') == all_arcs
        data.set_query_contexts(['other'])
        assert data.arcs('x.py') == []

    def test_contexts_by_lineno_with_arcs(self) -> None:
        """With arc data, contexts_by_lineno maps each line to its contexts."""
        data = DebugCoverageData()
        data.set_context('test_x')
        data.add_arcs(ARCS_3)
        by_lineno = data.contexts_by_lineno('x.py')
        assert by_lineno == {1: ['test_x'], 2: ['test_x'], 3: ['test_x']}

    def test_contexts_by_lineno_with_unknown_file(self) -> None:
        """contexts_by_lineno returns an empty dict for an unmeasured file."""
        data = DebugCoverageData()
        data.set_context('test_x')
        data.add_arcs(ARCS_3)
        by_lineno = data.contexts_by_lineno('xyz.py')
        assert by_lineno == {}

    def test_context_by_lineno_with_query_contexts_with_lines(self) -> None:
        """With line data, contexts_by_lineno is limited to the query context."""
        data = DebugCoverageData()
        for context, lines in [("test_1", LINES_1), ("test_2", LINES_2)]:
            data.set_context(context)
            data.add_lines(lines)
        data.set_query_context("test_1")
        expected = dict.fromkeys([1,2], ["test_1"])
        assert data.contexts_by_lineno("a.py") == expected

    def test_context_by_lineno_with_query_contexts_with_arcs(self) -> None:
        """With arc data, contexts_by_lineno is limited to the query context."""
        data = DebugCoverageData()
        for context, arcs in [("test_1", ARCS_3), ("test_2", ARCS_4)]:
            data.set_context(context)
            data.add_arcs(arcs)
        data.set_query_context("test_1")
        expected = dict.fromkeys([1,2,3], ["test_1"])
        assert data.contexts_by_lineno("x.py") == expected

    def test_file_tracer_name(self) -> None:
        """file_tracer reports the plugin name, "" for none, None if unknown."""
        data = DebugCoverageData()
        measured = {
            "p1.foo": [1, 2, 3],
            "p2.html": [10, 11, 12],
            "main.py": [20],
        }
        data.add_lines(measured)
        tracers = {"p1.foo": "p1.plugin", "p2.html": "p2.plugin"}
        data.add_file_tracers(tracers)
        for filename, plugin in tracers.items():
            assert data.file_tracer(filename) == plugin
        # Measured without a tracer is "", never measured at all is None.
        assert data.file_tracer("main.py") == ""
        assert data.file_tracer("p3.not_here") is None

    def test_ok_to_repeat_file_tracer(self) -> None:
        """Registering the same tracer for a file a second time is accepted."""
        data = DebugCoverageData()
        data.add_lines({
            "p1.foo": [1, 2, 3],
            "p2.html": [10, 11, 12],
        })
        first_registration = {"p1.foo": "p1.plugin", "p2.html": "p2.plugin"}
        repeat_registration = {"p1.foo": "p1.plugin"}
        data.add_file_tracers(first_registration)
        data.add_file_tracers(repeat_registration)
        assert data.file_tracer("p1.foo") == "p1.plugin"

    def test_ok_to_set_empty_file_tracer(self) -> None:
        """An empty string is accepted as a file tracer name."""
        data = DebugCoverageData()
        data.add_lines({
            "p1.foo": [1, 2, 3],
            "p2.html": [10, 11, 12],
            "main.py": [20],
        })
        tracers = {"p1.foo": "p1.plugin", "main.py": ""}
        data.add_file_tracers(tracers)
        for filename, plugin in tracers.items():
            assert data.file_tracer(filename) == plugin

    def test_cant_change_file_tracer_name(self) -> None:
        """Giving a file a different tracer than it already has raises DataError."""
        covdata = DebugCoverageData()
        covdata.add_lines({"p1.foo": [1, 2, 3]})
        covdata.add_file_tracers({"p1.foo": "p1.plugin"})

        msg = "Conflicting file tracer name for 'p1.foo': 'p1.plugin' vs 'p1.plugin.foo'"
        # `match` is a regular expression.  Escape the message so that its
        # dots have to match literal dots rather than any character.
        with pytest.raises(DataError, match=re.escape(msg)):
            covdata.add_file_tracers({"p1.foo": "p1.plugin.foo"})

    def test_update_lines(self) -> None:
        """update() merges line data from other CoverageData objects."""
        first = DebugCoverageData(suffix='1')
        first.add_lines(LINES_1)

        second = DebugCoverageData(suffix='2')
        second.add_lines(LINES_2)

        merged = DebugCoverageData(suffix='3')
        for source in (first, second):
            merged.update(source)

        assert_line_counts(merged, SUMMARY_1_2)
        assert_measured_files(merged, MEASURED_FILES_1_2)

    def test_update_arcs(self) -> None:
        """update() merges arc data from other CoverageData objects."""
        first = DebugCoverageData(suffix='1')
        first.add_arcs(ARCS_3)

        second = DebugCoverageData(suffix='2')
        second.add_arcs(ARCS_4)

        merged = DebugCoverageData(suffix='3')
        for source in (first, second):
            merged.update(source)

        assert_line_counts(merged, SUMMARY_3_4)
        assert_measured_files(merged, MEASURED_FILES_3_4)

    def test_update_cant_mix_lines_and_arcs(self) -> None:
        """update() refuses to combine line data with arc data, either way."""
        line_data = DebugCoverageData(suffix='1')
        line_data.add_lines(LINES_1)

        arc_data = DebugCoverageData(suffix='2')
        arc_data.add_arcs(ARCS_3)

        arcs_into_lines = "Can't combine arc data with line data"
        with pytest.raises(DataError, match=arcs_into_lines):
            line_data.update(arc_data)

        lines_into_arcs = "Can't combine line data with arc data"
        with pytest.raises(DataError, match=lines_into_arcs):
            arc_data.update(line_data)

    def test_update_file_tracers(self) -> None:
        """update() carries file tracer names over into the merged data."""
        first = DebugCoverageData(suffix='1')
        first.add_lines({
            "p1.html": [1, 2, 3, 4],
            "p2.html": [5, 6, 7],
            "main.py": [10, 11, 12],
        })
        first.add_file_tracers({
            "p1.html": "html.plugin",
            "p2.html": "html.plugin2",
        })

        second = DebugCoverageData(suffix='2')
        second.add_lines({
            "p1.html": [3, 4, 5, 6],
            "p2.html": [7, 8, 9],
            "p3.foo": [1000, 1001],
            "main.py": [10, 11, 12],
        })
        second.add_file_tracers({
            "p1.html": "html.plugin",
            "p2.html": "html.plugin2",
            "p3.foo": "foo_plugin",
        })

        merged = DebugCoverageData(suffix='3')
        merged.update(first)
        merged.update(second)

        # main.py was measured in both sources without a tracer.
        expected_tracers = {
            "p1.html": "html.plugin",
            "p2.html": "html.plugin2",
            "p3.foo": "foo_plugin",
            "main.py": "",
        }
        for filename, plugin in expected_tracers.items():
            assert merged.file_tracer(filename) == plugin

    def test_update_conflicting_file_tracers(self) -> None:
        """update() raises DataError when the two sides name different tracers.

        The error message names the receiving object's tracer first, so both
        directions are checked.
        """
        covdata1 = DebugCoverageData(suffix='1')
        covdata1.add_lines({"p1.html": [1, 2, 3]})
        covdata1.add_file_tracers({"p1.html": "html.plugin"})

        covdata2 = DebugCoverageData(suffix='2')
        covdata2.add_lines({"p1.html": [1, 2, 3]})
        covdata2.add_file_tracers({"p1.html": "html.other_plugin"})

        # `match` is a regular expression.  Escape the messages so that their
        # dots have to match literal dots rather than any character.
        msg = "Conflicting file tracer name for 'p1.html': 'html.plugin' vs 'html.other_plugin'"
        with pytest.raises(DataError, match=re.escape(msg)):
            covdata1.update(covdata2)

        msg = "Conflicting file tracer name for 'p1.html': 'html.other_plugin' vs 'html.plugin'"
        with pytest.raises(DataError, match=re.escape(msg)):
            covdata2.update(covdata1)

    def test_update_file_tracer_vs_no_file_tracer(self) -> None:
        """update() raises DataError when only one side has a file tracer.

        The error message names the receiving object's tracer first, so both
        directions are checked.
        """
        covdata1 = DebugCoverageData(suffix="1")
        covdata1.add_lines({"p1.html": [1, 2, 3]})
        covdata1.add_file_tracers({"p1.html": "html.plugin"})

        covdata2 = DebugCoverageData(suffix="2")
        covdata2.add_lines({"p1.html": [1, 2, 3]})

        # `match` is a regular expression.  Escape the messages so that their
        # dots have to match literal dots rather than any character.
        msg = "Conflicting file tracer name for 'p1.html': 'html.plugin' vs ''"
        with pytest.raises(DataError, match=re.escape(msg)):
            covdata1.update(covdata2)

        msg = "Conflicting file tracer name for 'p1.html': '' vs 'html.plugin'"
        with pytest.raises(DataError, match=re.escape(msg)):
            covdata2.update(covdata1)

    def test_update_lines_empty(self) -> None:
        """Updating line data from an empty CoverageData changes nothing."""
        populated = DebugCoverageData(suffix='1')
        populated.add_lines(LINES_1)

        empty = DebugCoverageData(suffix='2')
        populated.update(empty)
        assert_line_counts(populated, SUMMARY_1)

    def test_update_arcs_empty(self) -> None:
        """Updating arc data from an empty CoverageData changes nothing."""
        populated = DebugCoverageData(suffix='1')
        populated.add_arcs(ARCS_3)

        empty = DebugCoverageData(suffix='2')
        populated.update(empty)
        assert_line_counts(populated, SUMMARY_3)

    def test_asking_isnt_measuring(self) -> None:
        """Asking about an unmeasured file shouldn't make it seem measured."""
        data = DebugCoverageData()
        assert_measured_files(data, [])
        queried = data.arcs("missing.py")
        assert queried is None
        assert_measured_files(data, [])

    def test_add_to_hash_with_lines(self) -> None:
        """add_data_to_hash feeds a file's lines, then its tracer name."""
        data = DebugCoverageData()
        data.add_lines(LINES_1)
        fake_hasher = mock.Mock()
        add_data_to_hash(data, "a.py", fake_hasher)
        expected_calls = [
            mock.call.update([1, 2]),   # lines
            mock.call.update(""),       # file_tracer name
        ]
        assert fake_hasher.method_calls == expected_calls

    def test_add_to_hash_with_arcs(self) -> None:
        """add_data_to_hash feeds a file's arcs, then its tracer name."""
        data = DebugCoverageData()
        data.add_arcs(ARCS_3)
        data.add_file_tracers({"y.py": "hologram_plugin"})
        fake_hasher = mock.Mock()
        add_data_to_hash(data, "y.py", fake_hasher)
        expected_calls = [
            mock.call.update([(-1, 17), (17, 23), (23, -1)]),   # arcs
            mock.call.update("hologram_plugin"),                # file_tracer name
        ]
        assert fake_hasher.method_calls == expected_calls

    def test_add_to_lines_hash_with_missing_file(self) -> None:
        """With line data, hashing an unmeasured file feeds [] and None."""
        # https://github.com/nedbat/coveragepy/issues/403
        data = DebugCoverageData()
        data.add_lines(LINES_1)
        fake_hasher = mock.Mock()
        add_data_to_hash(data, "missing.py", fake_hasher)
        expected_calls = [
            mock.call.update([]),
            mock.call.update(None),
        ]
        assert fake_hasher.method_calls == expected_calls

    def test_add_to_arcs_hash_with_missing_file(self) -> None:
        """With arc data, hashing an unmeasured file feeds [] and None."""
        # https://github.com/nedbat/coveragepy/issues/403
        data = DebugCoverageData()
        data.add_arcs(ARCS_3)
        data.add_file_tracers({"y.py": "hologram_plugin"})
        fake_hasher = mock.Mock()
        add_data_to_hash(data, "missing.py", fake_hasher)
        expected_calls = [
            mock.call.update([]),
            mock.call.update(None),
        ]
        assert fake_hasher.method_calls == expected_calls

    def test_empty_lines_are_still_lines(self) -> None:
        """After add_lines({}), the data is line data, so files can be touched."""
        data = DebugCoverageData()
        data.add_lines({})
        data.touch_file("abc.py")
        assert not data.has_arcs()

    def test_empty_arcs_are_still_arcs(self) -> None:
        """After add_arcs({}), the data is arc data, so files can be touched."""
        data = DebugCoverageData()
        data.add_arcs({})
        data.touch_file("abc.py")
        assert data.has_arcs()

    def test_cant_touch_in_empty_data(self) -> None:
        """touch_file raises DataError before any lines or arcs are added."""
        data = DebugCoverageData()
        with pytest.raises(DataError, match="Can't touch files in an empty CoverageData"):
            data.touch_file("abc.py")

    def test_read_and_write_are_opposites(self) -> None:
        """Data written by one object is read back intact by another."""
        writer = DebugCoverageData()
        writer.add_arcs(ARCS_3)
        writer.write()

        reader = DebugCoverageData()
        reader.read()
        assert_arcs3_data(reader)

    def test_thread_stress(self) -> None:
        """Ten threads adding the same lines leave correct data and no errors."""
        data = DebugCoverageData()
        exceptions = []

        def thread_main() -> None:
            """Add LINES_1, collecting any exception rather than raising it."""
            try:
                data.add_lines(LINES_1)
            except Exception as exc:         # pragma: only failure
                exceptions.append(exc)

        workers = [threading.Thread(target=thread_main) for _ in range(10)]
        # Start every thread before joining any, so they all run together.
        for worker in workers:
            worker.start()
        for worker in workers:
            worker.join()

        assert_lines1_data(data)
        assert not exceptions

    def test_purge_files_lines(self) -> None:
        """Purging keeps files measured but drops their lines to zero."""
        data = DebugCoverageData()
        data.add_lines(LINES_1)
        data.add_lines(LINES_2)
        assert_line_counts(data, SUMMARY_1_2)

        data.purge_files(["a.py", "b.py"])
        assert_line_counts(data, {"a.py": 0, "b.py": 0, "c.py": 1})

        all_purged = {"a.py": 0, "b.py": 0, "c.py": 0}
        data.purge_files(["c.py"])
        assert_line_counts(data, all_purged)

        # It's OK to "purge" a file that wasn't measured.
        data.purge_files(["xyz.py"])
        assert_line_counts(data, all_purged)

    def test_purge_files_arcs(self) -> None:
        """Purging arc data keeps files measured but drops their counts to zero."""
        data = CoverageData()
        for batch in (ARCS_3, ARCS_4):
            data.add_arcs(batch)
        assert_line_counts(data, SUMMARY_3_4)

        data.purge_files(["x.py", "y.py"])
        assert_line_counts(data, {"x.py": 0, "y.py": 0, "z.py": 1})

        data.purge_files(["z.py"])
        assert_line_counts(data, {"x.py": 0, "y.py": 0, "z.py": 0})

    def test_cant_purge_in_empty_data(self) -> None:
        """purge_files raises DataError before any lines or arcs are added."""
        data = DebugCoverageData()
        with pytest.raises(DataError, match="Can't purge files in an empty CoverageData"):
            data.purge_files(["abc.py"])


class CoverageDataInTempDirTest(CoverageTest):
    """Tests of CoverageData that need a temporary directory to make files."""

    @pytest.mark.parametrize("file_class", FilePathClasses)
    def test_read_write_lines(self, file_class: FilePathType) -> None:
        """Line data round-trips through a file named by any path class."""
        self.assert_doesnt_exist("lines.dat")
        writer = DebugCoverageData(file_class("lines.dat"))
        writer.add_lines(LINES_1)
        writer.write()
        self.assert_exists("lines.dat")

        reader = DebugCoverageData("lines.dat")
        reader.read()
        assert_lines1_data(reader)

    def test_read_write_arcs(self) -> None:
        """Arc data round-trips through an explicitly named data file."""
        writer = DebugCoverageData("arcs.dat")
        writer.add_arcs(ARCS_3)
        writer.write()

        reader = DebugCoverageData("arcs.dat")
        reader.read()
        assert_arcs3_data(reader)

    def test_read_errors(self) -> None:
        """Reading a file that isn't a data file raises DataError."""
        self.make_file("xyzzy.dat", "xyzzy")
        # Construct outside the `raises` block so that only read() can
        # satisfy it, and so `covdata` is certainly bound for the check below.
        covdata = DebugCoverageData("xyzzy.dat")
        with pytest.raises(DataError, match=r"Couldn't .* '.*[/\\]xyzzy.dat': \S+"):
            covdata.read()
        assert not covdata

    def test_hard_read_error(self) -> None:
        """Reading a data file whose permissions were all removed raises DataError."""
        self.make_file("noperms.dat", "go away")
        os.chmod("noperms.dat", 0)
        # Construct outside the `raises` block so that only read() can
        # satisfy it.
        covdata = DebugCoverageData("noperms.dat")
        with pytest.raises(DataError, match=r"Couldn't .* '.*[/\\]noperms.dat': \S+"):
            covdata.read()

    @pytest.mark.parametrize("klass", [CoverageData, DebugCoverageData])
    def test_error_when_closing(self, klass: TCoverageData) -> None:
        """An exception raised while closing the database becomes a DataError."""
        expected = r"Couldn't .* '.*[/\\]flaked.dat': \S+"
        with pytest.raises(DataError, match=expected):
            data = klass("flaked.dat")
            data.add_lines(LINES_1)
            # I don't know how to make a real error, so let's fake one.
            all_dbs = list(data._dbs.values())
            sqldb = all_dbs[0]
            sqldb.close = lambda: 1/0       # type: ignore[assignment]
            data.add_lines(LINES_1)

    def test_wrong_schema_version(self) -> None:
        """A data file with an unexpected schema version can't be read."""
        # Using a sqlite3 connection as a context manager only commits the
        # transaction; it doesn't close the connection.  Close it explicitly
        # so no handle on the file is left open.
        con = sqlite3.connect("wrong_schema.db")
        try:
            with con:
                con.execute("create table coverage_schema (version integer)")
                con.execute("insert into coverage_schema (version) values (99)")
        finally:
            con.close()
        msg = r"Couldn't .* '.*[/\\]wrong_schema.db': wrong schema: 99 instead of \d+"
        # Construct outside the `raises` block so that only read() can
        # satisfy it.
        covdata = DebugCoverageData("wrong_schema.db")
        with pytest.raises(DataError, match=msg):
            covdata.read()
        assert not covdata

    def test_wrong_schema_schema(self) -> None:
        """A coverage_schema table lacking the version column can't be read."""
        # Using a sqlite3 connection as a context manager only commits the
        # transaction; it doesn't close the connection.  Close it explicitly
        # so no handle on the file is left open.
        con = sqlite3.connect("wrong_schema_schema.db")
        try:
            with con:
                con.execute("create table coverage_schema (xyzzy integer)")
                con.execute("insert into coverage_schema (xyzzy) values (99)")
        finally:
            con.close()
        msg = r"Data file .* doesn't seem to be a coverage data file: .* no such column"
        # Construct outside the `raises` block so that only read() can
        # satisfy it.
        covdata = DebugCoverageData("wrong_schema_schema.db")
        with pytest.raises(DataError, match=msg):
            covdata.read()
        assert not covdata


class CoverageDataFilesTest(CoverageTest):
    """Tests of CoverageData file handling."""

    def test_reading_missing(self) -> None:
        """Reading when no data file exists gives empty line counts."""
        self.assert_doesnt_exist(".coverage")
        data = DebugCoverageData()
        data.read()
        assert_line_counts(data, {})

    def test_writing_and_reading(self) -> None:
        """Line data written to the default file can be read by a new object."""
        writer = DebugCoverageData()
        writer.add_lines(LINES_1)
        writer.write()

        reader = DebugCoverageData()
        reader.read()
        assert_line_counts(reader, SUMMARY_1)

    def test_debug_output_with_debug_option(self) -> None:
        """The dataio debug option reports reading and writing of data files."""
        control = DebugControlString(options=["dataio"])
        writer = CoverageData(debug=control)
        writer.add_lines(LINES_1)
        writer.write()

        reader = CoverageData(debug=control)
        reader.read()
        assert_line_counts(reader, SUMMARY_1)

        # The anchors and the lack of re.MULTILINE mean these four lines must
        # be the entire debug output.
        expected_output = (
            r"^Erasing data file '.*\.coverage'\n" +
            r"Opening data file '.*\.coverage'\n" +
            r"Initing data file '.*\.coverage'\n" +
            r"Opening data file '.*\.coverage'\n$"
        )
        assert re.search(expected_output, control.get_output())

    def test_debug_output_without_debug_option(self) -> None:
        """A debug object without the dataio option produces no debug output."""
        control = DebugControlString(options=[])
        writer = CoverageData(debug=control)
        writer.add_lines(LINES_1)
        writer.write()

        reader = CoverageData(debug=control)
        reader.read()
        assert_line_counts(reader, SUMMARY_1)

        output = control.get_output()
        assert output == ""

    def test_explicit_suffix(self) -> None:
        """An explicit suffix is appended to the data file name."""
        suffixed_name = ".coverage.SUFFIX"
        self.assert_doesnt_exist(suffixed_name)
        data = DebugCoverageData(suffix='SUFFIX')
        data.add_lines(LINES_1)
        data.write()
        self.assert_exists(suffixed_name)
        self.assert_doesnt_exist(".coverage")

    def test_true_suffix(self) -> None:
        """suffix=True gives each data object its own file, named with the pid."""
        self.assert_file_count(".coverage.*", 0)

        # suffix=True will make a randomly named data file.
        first = DebugCoverageData(suffix=True)
        first.add_lines(LINES_1)
        first.write()
        self.assert_doesnt_exist(".coverage")
        files_after_first = glob.glob(".coverage.*")
        assert len(files_after_first) == 1

        # Another suffix=True will choose a different name.
        second = DebugCoverageData(suffix=True)
        second.add_lines(LINES_1)
        second.write()
        self.assert_doesnt_exist(".coverage")
        files_after_second = glob.glob(".coverage.*")
        assert len(files_after_second) == 2

        # In addition to being different, the suffixes have the pid in them.
        pid_text = str(os.getpid())
        assert all(pid_text in fn for fn in files_after_second)

    def test_combining(self) -> None:
        """Combining merges the suffixed data files and removes them."""
        parallel_glob = ".coverage.*"
        self.assert_file_count(parallel_glob, 0)

        first = DebugCoverageData(suffix='1')
        first.add_lines(LINES_1)
        first.write()
        self.assert_exists(".coverage.1")
        self.assert_file_count(parallel_glob, 1)

        second = DebugCoverageData(suffix='2')
        second.add_lines(LINES_2)
        second.write()
        self.assert_exists(".coverage.2")
        self.assert_file_count(parallel_glob, 2)

        combined = DebugCoverageData()
        combine_parallel_data(combined)
        assert_line_counts(combined, SUMMARY_1_2)
        assert_measured_files(combined, MEASURED_FILES_1_2)
        self.assert_file_count(parallel_glob, 0)

    def test_erasing(self) -> None:
        """erase() empties both the object and what a new object reads."""
        original = DebugCoverageData()
        original.add_lines(LINES_1)
        original.write()

        original.erase()
        assert_line_counts(original, {})

        fresh = DebugCoverageData()
        fresh.read()
        assert_line_counts(fresh, {})

    def test_erasing_parallel(self) -> None:
        """erase(parallel=True) removes the suffixed files for this basename only."""
        for filename in ("datafile.1", "datafile.2", ".coverage"):
            self.make_file(filename)
        data = DebugCoverageData("datafile")
        data.erase(parallel=True)
        self.assert_file_count("datafile.*", 0)
        # A data file with a different basename is left alone.
        self.assert_exists(".coverage")

    def test_combining_with_aliases(self) -> None:
        """Combining with path aliases remaps file names onto local paths."""
        posix_data = DebugCoverageData(suffix='1')
        posix_data.add_lines({
            '/home/ned/proj/src/a.py': {1, 2},
            '/home/ned/proj/src/sub/b.py': {3},
            '/home/ned/proj/src/template.html': {10},
        })
        posix_data.add_file_tracers({
            '/home/ned/proj/src/template.html': 'html.plugin',
        })
        posix_data.write()

        windows_data = DebugCoverageData(suffix='2')
        windows_data.add_lines({
            r'c:\ned\test\a.py': {4, 5},
            r'c:\ned\test\sub\b.py': {3, 6},
        })
        windows_data.write()

        self.assert_file_count(".coverage.*", 2)

        for filename in ("a.py", "sub/b.py", "template.html"):
            self.make_file(filename, "")
        combined = DebugCoverageData()
        aliases = PathAliases()
        aliases.add("/home/ned/proj/src/", "./")
        aliases.add(r"c:\ned\test", "./")
        combine_parallel_data(combined, aliases=aliases)
        self.assert_file_count(".coverage.*", 0)
        self.assert_exists(".coverage")

        apy = canonical_filename('./a.py')
        sub_bpy = canonical_filename('./sub/b.py')
        template_html = canonical_filename('./template.html')

        expected_counts = {apy: 4, sub_bpy: 2, template_html: 1}
        assert_line_counts(combined, expected_counts, fullpath=True)
        assert_measured_files(combined, [apy, sub_bpy, template_html])
        assert combined.file_tracer(template_html) == 'html.plugin'

    def test_combining_from_different_directories(self) -> None:
        """Combining from directories uses only the data files inside them."""
        os.makedirs('cov1')
        first = DebugCoverageData('cov1/.coverage.1')
        first.add_lines(LINES_1)
        first.write()

        os.makedirs('cov2')
        second = DebugCoverageData('cov2/.coverage.2')
        second.add_lines(LINES_2)
        second.write()

        # This data won't be included.
        ignored = DebugCoverageData('.coverage.xxx')
        ignored.add_arcs(ARCS_3)
        ignored.write()

        combined = DebugCoverageData()
        combine_parallel_data(combined, data_paths=['cov1', 'cov2'])

        assert_line_counts(combined, SUMMARY_1_2)
        assert_measured_files(combined, MEASURED_FILES_1_2)
        # The combined files are gone; the one outside the paths remains.
        for consumed in ("cov1/.coverage.1", "cov2/.coverage.2"):
            self.assert_doesnt_exist(consumed)
        self.assert_exists(".coverage.xxx")

    def test_combining_from_files(self) -> None:
        """Combine from a mix of a directory and an explicitly named data file.

        `data_paths` names the directory 'cov1' and the single file
        'cov2/.coverage.2'.  Other parallel files, including one sitting next
        to the named file in 'cov2', must be neither combined nor deleted.
        """
        os.makedirs('cov1')
        first = DebugCoverageData('cov1/.coverage.1')
        first.add_lines(LINES_1)
        first.write()

        os.makedirs('cov2')
        second = DebugCoverageData('cov2/.coverage.2')
        second.add_lines(LINES_2)
        second.write()

        # Neither of these two files is named in data_paths, so neither
        # should be included in the combination.
        ignored_here = DebugCoverageData('.coverage.xxx')
        ignored_here.add_arcs(ARCS_3)
        ignored_here.write()

        ignored_in_cov2 = DebugCoverageData('cov2/.coverage.xxx')
        ignored_in_cov2.add_arcs(ARCS_3)
        ignored_in_cov2.write()

        combined = DebugCoverageData()
        sources = ['cov1', 'cov2/.coverage.2']
        combine_parallel_data(combined, data_paths=sources)

        assert_line_counts(combined, SUMMARY_1_2)
        assert_measured_files(combined, MEASURED_FILES_1_2)

        # Combined inputs are removed; the ignored files are still present.
        for consumed in ("cov1/.coverage.1", "cov2/.coverage.2"):
            self.assert_doesnt_exist(consumed)
        for untouched in (".coverage.xxx", "cov2/.coverage.xxx"):
            self.assert_exists(untouched)

    def test_combining_from_nonexistent_directories(self) -> None:
        """Combining from a path that doesn't exist raises NoDataError."""
        target = DebugCoverageData()
        expected_error = "Couldn't combine from non-existent path 'xyzzy'"
        missing_paths = ['xyzzy']
        with pytest.raises(NoDataError, match=expected_error):
            combine_parallel_data(target, data_paths=missing_paths)

    def test_interleaved_erasing_bug716(self) -> None:
        """Regression test for #716: erasing interleaved with another writer.

        pytest-cov could produce this sequence of calls on two data objects
        that share the same data file.  The test passes if no exception is
        raised.
        """
        eraser = DebugCoverageData()
        writer = DebugCoverageData()
        # Step 1: this used to create the .coverage database file...
        writer.set_context("")
        # Step 2: ...then this would erase it all...
        eraser.erase()
        # Step 3: ...and then this would try to use tables that no longer
        # exist, failing with "no such table: meta".
        writer.add_lines(LINES_1)

    @pytest.mark.parametrize(
        "dpart, fpart",
        [
            ("", "[b-a]"),
            ("[3-1]", ""),
            ("[3-1]", "[b-a]"),
        ],
    )
    def test_combining_with_crazy_filename(self, dpart: str, fpart: str) -> None:
        """Combine data files whose path contains square-bracket text.

        `dpart` is appended to the directory name and `fpart` to the data file
        base name.  Combining must still find both suffixed files, merge them,
        and remove them.
        """
        dirname = f"py{dpart}"
        basename = f"{dirname}/.coverage{fpart}"
        os.makedirs(dirname)

        # Write the two parallel data files, suffix "1" first and then "2".
        for suffix, lines in (("1", LINES_1), ("2", LINES_2)):
            parallel = CoverageData(basename=basename, suffix=suffix)
            parallel.add_lines(lines)
            parallel.write()

        combined = CoverageData(basename=basename)
        combine_parallel_data(combined)
        assert_line_counts(combined, SUMMARY_1_2)
        assert_measured_files(combined, MEASURED_FILES_1_2)

        # Escape the base name so the brackets are matched literally when
        # checking that no suffixed files are left behind.
        leftover_pattern = glob.escape(basename) + ".*"
        self.assert_file_count(leftover_pattern, 0)

    def test_meta_data(self) -> None:
        """Check which keys are written to the `meta` table of a data file.

        The metadata written to the data file shouldn't interfere with
        hashing to remove duplicates, except for debug=process, which
        writes debugging info as metadata.
        """
        def meta_keys(filename: str) -> list[str]:
            """Return the sorted keys stored in the `meta` table of `filename`."""
            # Using a sqlite3 connection as a context manager only commits or
            # rolls back the transaction; it never closes the connection.
            # Close it explicitly so the file handle isn't leaked.
            con = sqlite3.connect(filename)
            try:
                return sorted(k for (k,) in con.execute("select key from meta"))
            finally:
                con.close()

        # Without debugging, only the essential keys are written.
        debug = DebugControlString(options=[])
        covdata1 = CoverageData(basename="meta.1", debug=debug)
        covdata1.add_lines(LINES_1)
        covdata1.write()
        assert meta_keys("meta.1") == ["has_arcs", "version"]

        # With debug=process, extra process information is recorded too.
        debug = DebugControlString(options=["process"])
        covdata2 = CoverageData(basename="meta.2", debug=debug)
        covdata2.add_lines(LINES_1)
        covdata2.write()
        assert meta_keys("meta.2") == ["has_arcs", "sys_argv", "version", "when"]


class DumpsLoadsTest(CoverageTest):
    """Tests of serializing CoverageData with dumps() and loads()."""

    run_in_temp_dir = False

    @pytest.mark.parametrize("klass", [CoverageData, DebugCoverageData])
    def test_serialization(self, klass: TCoverageData) -> None:
        """Data dumped from one in-memory object can be loaded into another."""
        original = klass(no_disk=True)
        for lines in (LINES_1, LINES_2):
            original.add_lines(lines)
        serial = original.dumps()

        restored = klass(no_disk=True)
        restored.loads(serial)
        assert_line_counts(restored, SUMMARY_1_2)
        assert_measured_files(restored, MEASURED_FILES_1_2)

    def test_misfed_serialization(self) -> None:
        """Loading bytes that aren't a serialization raises DataError.

        The expected message shows the repr of the first 40 bytes of the bad
        data, and its total length.
        """
        covdata = CoverageData(no_disk=True)
        bad_data = b'Hello, world!\x07 ' + b'z' * 100
        # The repr is escaped because the message is matched as a regex.
        shown_head = re.escape(repr(bad_data[:40]))
        total_bytes = len(bad_data)
        template = r"Unrecognized serialization: {} \(head of {} bytes\)"
        msg = template.format(shown_head, total_bytes)
        with pytest.raises(DataError, match=msg):
            covdata.loads(bad_data)


class NoDiskTest(CoverageTest):
    """Tests of CoverageData objects that are kept only in memory."""

    run_in_temp_dir = False

    def test_updating(self) -> None:
        """One in-memory data object can be updated from another.

        See https://github.com/nedbat/coveragepy/issues/1323
        """
        expected_files = {'foo.py'}

        source = CoverageData(no_disk=True)
        source.add_lines({'foo.py': [10, 20, 30]})
        assert source.measured_files() == expected_files

        destination = CoverageData(no_disk=True)
        destination.update(source)
        assert destination.measured_files() == expected_files