Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Improve the tests for relations #12113

Merged
merged 9 commits into from
Mar 2, 2022
Prev Previous commit
Next Next commit
Factor out the redaction tests cases.
clokep committed Mar 1, 2022
commit 6e271771711b71ec6b95d86e7f5421b2d2536b9f
354 changes: 180 additions & 174 deletions tests/rest/client/test_relations.py
Original file line number Diff line number Diff line change
@@ -34,7 +34,7 @@
from tests.test_utils.event_injection import inject_event


class RelationsTestCase(unittest.HomeserverTestCase):
class BaseRelationsTestCase(unittest.HomeserverTestCase):
servlets = [
relations.register_servlets,
room.register_servlets,
@@ -66,6 +66,60 @@ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
res = self.helper.send(self.room, body="Hi!", tok=self.user_token)
self.parent_id = res["event_id"]

def _create_user(self, localpart: str) -> Tuple[str, str]:
user_id = self.register_user(localpart, "abc123")
access_token = self.login(localpart, "abc123")

return user_id, access_token

def _send_relation(
self,
relation_type: str,
event_type: str,
key: Optional[str] = None,
content: Optional[dict] = None,
access_token: Optional[str] = None,
parent_id: Optional[str] = None,
) -> FakeChannel:
"""Helper function to send a relation pointing at `self.parent_id`
Args:
relation_type: One of `RelationTypes`
event_type: The type of the event to create
key: The aggregation key used for m.annotation relation type.
content: The content of the created event. Will be modified to configure
the m.relates_to key based on the other provided parameters.
access_token: The access token used to send the relation, defaults
to `self.user_token`
parent_id: The event_id this relation relates to. If None, then self.parent_id
Returns:
FakeChannel
"""
if not access_token:
access_token = self.user_token

original_id = parent_id if parent_id else self.parent_id

if content is None:
content = {}
content["m.relates_to"] = {
"event_id": original_id,
"rel_type": relation_type,
}
if key is not None:
content["m.relates_to"]["key"] = key

channel = self.make_request(
"POST",
f"/_matrix/client/v3/rooms/{self.room}/send/{event_type}",
content,
access_token=access_token,
)
return channel


class RelationsTestCase(BaseRelationsTestCase):
def test_send_relation(self) -> None:
"""Tests that sending a relation works."""

@@ -553,39 +607,6 @@ def test_aggregation(self) -> None:
},
)

def test_aggregation_redactions(self) -> None:
"""Test that annotations get correctly aggregated after a redaction."""

channel = self._send_relation(RelationTypes.ANNOTATION, "m.reaction", "a")
self.assertEqual(200, channel.code, channel.json_body)
to_redact_event_id = channel.json_body["event_id"]

channel = self._send_relation(
RelationTypes.ANNOTATION, "m.reaction", "a", access_token=self.user2_token
)
self.assertEqual(200, channel.code, channel.json_body)

# Now lets redact one of the 'a' reactions
channel = self.make_request(
"POST",
f"/_matrix/client/r0/rooms/{self.room}/redact/{to_redact_event_id}",
access_token=self.user_token,
content={},
)
self.assertEqual(200, channel.code, channel.json_body)

channel = self.make_request(
"GET",
f"/_matrix/client/unstable/rooms/{self.room}/aggregations/{self.parent_id}",
access_token=self.user_token,
)
self.assertEqual(200, channel.code, channel.json_body)

self.assertEqual(
channel.json_body,
{"chunk": [{"type": "m.reaction", "key": "a", "count": 1}]},
)

def test_aggregation_must_be_annotation(self) -> None:
"""Test that aggregations must be annotations."""

@@ -1207,94 +1228,6 @@ def test_edit_edit(self) -> None:
{"event_id": edit_event_id, "sender": self.user_id}, m_replace_dict
)

def test_relations_redaction_redacts_edits(self) -> None:
"""Test that edits of an event are redacted when the original event
is redacted.
"""
# Send a new event
res = self.helper.send(self.room, body="Heyo!", tok=self.user_token)
original_event_id = res["event_id"]

# Add a relation
channel = self._send_relation(
RelationTypes.REPLACE,
"m.room.message",
parent_id=original_event_id,
content={
"msgtype": "m.text",
"body": "Wibble",
"m.new_content": {"msgtype": "m.text", "body": "First edit"},
},
)
self.assertEqual(200, channel.code, channel.json_body)

# Check the relation is returned
channel = self.make_request(
"GET",
f"/_matrix/client/unstable/rooms/{self.room}/relations"
f"/{original_event_id}/m.replace/m.room.message",
access_token=self.user_token,
)
self.assertEqual(200, channel.code, channel.json_body)

self.assertIn("chunk", channel.json_body)
self.assertEqual(len(channel.json_body["chunk"]), 1)

# Redact the original event
channel = self.make_request(
"PUT",
f"/rooms/{self.room}/redact/{original_event_id}/test_relations_redaction_redacts_edits",
access_token=self.user_token,
content="{}",
)
self.assertEqual(200, channel.code, channel.json_body)

# Try to check for remaining m.replace relations
channel = self.make_request(
"GET",
f"/_matrix/client/unstable/rooms/{self.room}/relations/{original_event_id}/m.replace/m.room.message",
access_token=self.user_token,
)
self.assertEqual(200, channel.code, channel.json_body)

# Check that no relations are returned
self.assertIn("chunk", channel.json_body)
self.assertEqual(channel.json_body["chunk"], [])

def test_aggregations_redaction_prevents_access_to_aggregations(self) -> None:
"""Test that annotations of an event are redacted when the original event
is redacted.
"""
# Send a new event
res = self.helper.send(self.room, body="Hello!", tok=self.user_token)
original_event_id = res["event_id"]

# Add a relation
channel = self._send_relation(
RelationTypes.ANNOTATION, "m.reaction", key="👍", parent_id=original_event_id
)
self.assertEqual(200, channel.code, channel.json_body)

# Redact the original
channel = self.make_request(
"PUT",
f"/rooms/{self.room}/redact/{original_event_id}/test_aggregations_redaction_prevents_access_to_aggregations",
access_token=self.user_token,
content="{}",
)
self.assertEqual(200, channel.code, channel.json_body)

# Check that aggregations returns zero
channel = self.make_request(
"GET",
f"/_matrix/client/unstable/rooms/{self.room}/aggregations/{original_event_id}/m.annotation/m.reaction",
access_token=self.user_token,
)
self.assertEqual(200, channel.code, channel.json_body)

self.assertIn("chunk", channel.json_body)
self.assertEqual(channel.json_body["chunk"], [])

def test_unknown_relations(self) -> None:
"""Unknown relations should be accepted."""
channel = self._send_relation("m.relation.test", "m.room.test")
@@ -1349,58 +1282,6 @@ def _find_event_in_chunk(self, events: List[JsonDict]) -> JsonDict:

raise AssertionError(f"Event {self.parent_id} not found in chunk")

def _send_relation(
self,
relation_type: str,
event_type: str,
key: Optional[str] = None,
content: Optional[dict] = None,
access_token: Optional[str] = None,
parent_id: Optional[str] = None,
) -> FakeChannel:
"""Helper function to send a relation pointing at `self.parent_id`
Args:
relation_type: One of `RelationTypes`
event_type: The type of the event to create
key: The aggregation key used for m.annotation relation type.
content: The content of the created event. Will be modified to configure
the m.relates_to key based on the other provided parameters.
access_token: The access token used to send the relation, defaults
to `self.user_token`
parent_id: The event_id this relation relates to. If None, then self.parent_id
Returns:
FakeChannel
"""
if not access_token:
access_token = self.user_token

original_id = parent_id if parent_id else self.parent_id

if content is None:
content = {}
content["m.relates_to"] = {
"event_id": original_id,
"rel_type": relation_type,
}
if key is not None:
content["m.relates_to"]["key"] = key

channel = self.make_request(
"POST",
f"/_matrix/client/v3/rooms/{self.room}/send/{event_type}",
content,
access_token=access_token,
)
return channel

def _create_user(self, localpart: str) -> Tuple[str, str]:
user_id = self.register_user(localpart, "abc123")
access_token = self.login(localpart, "abc123")

return user_id, access_token

def test_background_update(self) -> None:
"""Test the event_arbitrary_relations background update."""
channel = self._send_relation(RelationTypes.ANNOTATION, "m.reaction", key="👍")
@@ -1462,3 +1343,128 @@ def test_background_update(self) -> None:
[ev["event_id"] for ev in channel.json_body["chunk"]],
[annotation_event_id_good, thread_event_id],
)


class RelationRedactionTestCase(BaseRelationsTestCase):
"""Test the behaviour of relations when the parent or child event is redacted."""

def test_aggregation_redactions(self) -> None:
"""Test that annotations get correctly aggregated after a redaction."""

channel = self._send_relation(RelationTypes.ANNOTATION, "m.reaction", "a")
self.assertEqual(200, channel.code, channel.json_body)
to_redact_event_id = channel.json_body["event_id"]

channel = self._send_relation(
RelationTypes.ANNOTATION, "m.reaction", "a", access_token=self.user2_token
)
self.assertEqual(200, channel.code, channel.json_body)

# Now lets redact one of the 'a' reactions
channel = self.make_request(
"POST",
f"/_matrix/client/r0/rooms/{self.room}/redact/{to_redact_event_id}",
access_token=self.user_token,
content={},
)
self.assertEqual(200, channel.code, channel.json_body)

channel = self.make_request(
"GET",
f"/_matrix/client/unstable/rooms/{self.room}/aggregations/{self.parent_id}",
access_token=self.user_token,
)
self.assertEqual(200, channel.code, channel.json_body)

self.assertEqual(
channel.json_body,
{"chunk": [{"type": "m.reaction", "key": "a", "count": 1}]},
)

def test_relations_redaction_redacts_edits(self) -> None:
"""Test that edits of an event are redacted when the original event
is redacted.
"""
# Send a new event
res = self.helper.send(self.room, body="Heyo!", tok=self.user_token)
original_event_id = res["event_id"]

# Add a relation
channel = self._send_relation(
RelationTypes.REPLACE,
"m.room.message",
parent_id=original_event_id,
content={
"msgtype": "m.text",
"body": "Wibble",
"m.new_content": {"msgtype": "m.text", "body": "First edit"},
},
)
self.assertEqual(200, channel.code, channel.json_body)

# Check the relation is returned
channel = self.make_request(
"GET",
f"/_matrix/client/unstable/rooms/{self.room}/relations"
f"/{original_event_id}/m.replace/m.room.message",
access_token=self.user_token,
)
self.assertEqual(200, channel.code, channel.json_body)

self.assertIn("chunk", channel.json_body)
self.assertEqual(len(channel.json_body["chunk"]), 1)

# Redact the original event
channel = self.make_request(
"PUT",
f"/rooms/{self.room}/redact/{original_event_id}/test_relations_redaction_redacts_edits",
access_token=self.user_token,
content="{}",
)
self.assertEqual(200, channel.code, channel.json_body)

# Try to check for remaining m.replace relations
channel = self.make_request(
"GET",
f"/_matrix/client/unstable/rooms/{self.room}/relations/{original_event_id}/m.replace/m.room.message",
access_token=self.user_token,
)
self.assertEqual(200, channel.code, channel.json_body)

# Check that no relations are returned
self.assertIn("chunk", channel.json_body)
self.assertEqual(channel.json_body["chunk"], [])

def test_aggregations_redaction_prevents_access_to_aggregations(self) -> None:
"""Test that annotations of an event are redacted when the original event
is redacted.
"""
# Send a new event
res = self.helper.send(self.room, body="Hello!", tok=self.user_token)
original_event_id = res["event_id"]

# Add a relation
channel = self._send_relation(
RelationTypes.ANNOTATION, "m.reaction", key="👍", parent_id=original_event_id
)
self.assertEqual(200, channel.code, channel.json_body)

# Redact the original
channel = self.make_request(
"PUT",
f"/rooms/{self.room}/redact/{original_event_id}/test_aggregations_redaction_prevents_access_to_aggregations",
access_token=self.user_token,
content="{}",
)
self.assertEqual(200, channel.code, channel.json_body)

# Check that aggregations returns zero
channel = self.make_request(
"GET",
f"/_matrix/client/unstable/rooms/{self.room}/aggregations/{original_event_id}/m.annotation/m.reaction",
access_token=self.user_token,
)
self.assertEqual(200, channel.code, channel.json_body)

self.assertIn("chunk", channel.json_body)
self.assertEqual(channel.json_body["chunk"], [])