diff --git a/tests/integrations/grpc/test_grpc.py b/tests/integrations/grpc/test_grpc.py index a8872ef0b5..8d2698f411 100644 --- a/tests/integrations/grpc/test_grpc.py +++ b/tests/integrations/grpc/test_grpc.py @@ -1,10 +1,8 @@ -import os - import grpc import pytest from concurrent import futures -from typing import List, Optional +from typing import List, Optional, Tuple from unittest.mock import Mock from sentry_sdk import start_span, start_transaction @@ -19,25 +17,36 @@ ) -PORT = 50051 -PORT += os.getpid() % 100 # avoid port conflicts when running tests in parallel - - -def _set_up(interceptors: Optional[List[grpc.ServerInterceptor]] = None): +# Set up in-memory channel instead of network-based +def _set_up( + interceptors: Optional[List[grpc.ServerInterceptor]] = None, +) -> Tuple[grpc.Server, grpc.Channel]: + """ + Sets up a gRPC server and returns both the server and a channel connected to it. + This eliminates network dependencies and makes tests more reliable. + """ + # Create server with thread pool server = grpc.server( futures.ThreadPoolExecutor(max_workers=2), interceptors=interceptors, ) - add_gRPCTestServiceServicer_to_server(TestService(), server) - server.add_insecure_port("[::]:{}".format(PORT)) + # Add our test service to the server + servicer = TestService() + add_gRPCTestServiceServicer_to_server(servicer, server) + + # Use dynamic port allocation instead of hardcoded port + port = server.add_insecure_port("[::]:0") # Let gRPC choose an available port server.start() - return server + # Create channel connected to our server + channel = grpc.insecure_channel(f"localhost:{port}") # noqa: E231 + + return server, channel def _tear_down(server: grpc.Server): - server.stop(None) + server.stop(grace=None) # Immediate shutdown @pytest.mark.forked @@ -45,11 +54,11 @@ def test_grpc_server_starts_transaction(sentry_init, capture_events_forksafe): sentry_init(traces_sample_rate=1.0, integrations=[GRPCIntegration()]) events = capture_events_forksafe() - server = _set_up() + server, channel = _set_up() - with grpc.insecure_channel("localhost:{}".format(PORT)) as channel: - stub = gRPCTestServiceStub(channel) - stub.TestServe(gRPCTestMessage(text="test")) + # Use the provided channel + stub = gRPCTestServiceStub(channel) + stub.TestServe(gRPCTestMessage(text="test")) _tear_down(server=server) @@ -76,11 +85,11 @@ def test_grpc_server_other_interceptors(sentry_init, capture_events_forksafe): mock_interceptor = Mock() mock_interceptor.intercept_service.side_effect = mock_intercept - server = _set_up(interceptors=[mock_interceptor]) + server, channel = _set_up(interceptors=[mock_interceptor]) - with grpc.insecure_channel("localhost:{}".format(PORT)) as channel: - stub = gRPCTestServiceStub(channel) - stub.TestServe(gRPCTestMessage(text="test")) + # Use the provided channel + stub = gRPCTestServiceStub(channel) + stub.TestServe(gRPCTestMessage(text="test")) _tear_down(server=server) @@ -103,30 +112,30 @@ def test_grpc_server_continues_transaction(sentry_init, capture_events_forksafe) sentry_init(traces_sample_rate=1.0, integrations=[GRPCIntegration()]) events = capture_events_forksafe() - server = _set_up() + server, channel = _set_up() - with grpc.insecure_channel("localhost:{}".format(PORT)) as channel: - stub = gRPCTestServiceStub(channel) + # Use the provided channel + stub = gRPCTestServiceStub(channel) - with start_transaction() as transaction: - metadata = ( - ( - "baggage", - "sentry-trace_id={trace_id},sentry-environment=test," - "sentry-transaction=test-transaction,sentry-sample_rate=1.0".format( - trace_id=transaction.trace_id - ), + with start_transaction() as transaction: + metadata = ( + ( + "baggage", + "sentry-trace_id={trace_id},sentry-environment=test," + "sentry-transaction=test-transaction,sentry-sample_rate=1.0".format( + trace_id=transaction.trace_id ), - ( - "sentry-trace", - "{trace_id}-{parent_span_id}-{sampled}".format( - trace_id=transaction.trace_id, - parent_span_id=transaction.span_id, - sampled=1, - ), + ), + ( + "sentry-trace", + "{trace_id}-{parent_span_id}-{sampled}".format( + trace_id=transaction.trace_id, + parent_span_id=transaction.span_id, + sampled=1, ), - ) - stub.TestServe(gRPCTestMessage(text="test"), metadata=metadata) + ), + ) + stub.TestServe(gRPCTestMessage(text="test"), metadata=metadata) _tear_down(server=server) @@ -148,13 +157,13 @@ def test_grpc_client_starts_span(sentry_init, capture_events_forksafe): sentry_init(traces_sample_rate=1.0, integrations=[GRPCIntegration()]) events = capture_events_forksafe() - server = _set_up() + server, channel = _set_up() - with grpc.insecure_channel("localhost:{}".format(PORT)) as channel: - stub = gRPCTestServiceStub(channel) + # Use the provided channel + stub = gRPCTestServiceStub(channel) - with start_transaction(): - stub.TestServe(gRPCTestMessage(text="test")) + with start_transaction(): + stub.TestServe(gRPCTestMessage(text="test")) _tear_down(server=server) @@ -183,13 +192,13 @@ def test_grpc_client_unary_stream_starts_span(sentry_init, capture_events_forksa sentry_init(traces_sample_rate=1.0, integrations=[GRPCIntegration()]) events = capture_events_forksafe() - server = _set_up() + server, channel = _set_up() - with grpc.insecure_channel("localhost:{}".format(PORT)) as channel: - stub = gRPCTestServiceStub(channel) + # Use the provided channel + stub = gRPCTestServiceStub(channel) - with start_transaction(): - [el for el in stub.TestUnaryStream(gRPCTestMessage(text="test"))] + with start_transaction(): + [el for el in stub.TestUnaryStream(gRPCTestMessage(text="test"))] _tear_down(server=server) @@ -227,14 +236,14 @@ def test_grpc_client_other_interceptor(sentry_init, capture_events_forksafe): sentry_init(traces_sample_rate=1.0, integrations=[GRPCIntegration()]) events = capture_events_forksafe() - server = _set_up() + server, channel = _set_up() - with grpc.insecure_channel("localhost:{}".format(PORT)) as channel: - channel = grpc.intercept_channel(channel, MockClientInterceptor()) - stub = gRPCTestServiceStub(channel) + # Intercept the channel + channel = grpc.intercept_channel(channel, MockClientInterceptor()) + stub = gRPCTestServiceStub(channel) - with start_transaction(): - stub.TestServe(gRPCTestMessage(text="test")) + with start_transaction(): + stub.TestServe(gRPCTestMessage(text="test")) _tear_down(server=server) @@ -267,13 +276,13 @@ def test_grpc_client_and_servers_interceptors_integration( sentry_init(traces_sample_rate=1.0, integrations=[GRPCIntegration()]) events = capture_events_forksafe() - server = _set_up() + server, channel = _set_up() - with grpc.insecure_channel("localhost:{}".format(PORT)) as channel: - stub = gRPCTestServiceStub(channel) + # Use the provided channel + stub = gRPCTestServiceStub(channel) - with start_transaction(): - stub.TestServe(gRPCTestMessage(text="test")) + with start_transaction(): + stub.TestServe(gRPCTestMessage(text="test")) _tear_down(server=server) @@ -290,13 +299,13 @@ def test_grpc_client_and_servers_interceptors_integration( @pytest.mark.forked def test_stream_stream(sentry_init): sentry_init(traces_sample_rate=1.0, integrations=[GRPCIntegration()]) - server = _set_up() + server, channel = _set_up() - with grpc.insecure_channel("localhost:{}".format(PORT)) as channel: - stub = gRPCTestServiceStub(channel) - response_iterator = stub.TestStreamStream(iter((gRPCTestMessage(text="test"),))) - for response in response_iterator: - assert response.text == "test" + # Use the provided channel + stub = gRPCTestServiceStub(channel) + response_iterator = stub.TestStreamStream(iter((gRPCTestMessage(text="test"),))) + for response in response_iterator: + assert response.text == "test" _tear_down(server=server) @@ -308,12 +317,12 @@ def test_stream_unary(sentry_init): Tracing not supported for it yet. """ sentry_init(traces_sample_rate=1.0, integrations=[GRPCIntegration()]) - server = _set_up() + server, channel = _set_up() - with grpc.insecure_channel("localhost:{}".format(PORT)) as channel: - stub = gRPCTestServiceStub(channel) - response = stub.TestStreamUnary(iter((gRPCTestMessage(text="test"),))) - assert response.text == "test" + # Use the provided channel + stub = gRPCTestServiceStub(channel) + response = stub.TestStreamUnary(iter((gRPCTestMessage(text="test"),))) + assert response.text == "test" _tear_down(server=server) @@ -323,13 +332,13 @@ def test_span_origin(sentry_init, capture_events_forksafe): sentry_init(traces_sample_rate=1.0, integrations=[GRPCIntegration()]) events = capture_events_forksafe() - server = _set_up() + server, channel = _set_up() - with grpc.insecure_channel("localhost:{}".format(PORT)) as channel: - stub = gRPCTestServiceStub(channel) + # Use the provided channel + stub = gRPCTestServiceStub(channel) - with start_transaction(name="custom_transaction"): - stub.TestServe(gRPCTestMessage(text="test")) + with start_transaction(name="custom_transaction"): + stub.TestServe(gRPCTestMessage(text="test")) _tear_down(server=server) diff --git a/tests/integrations/grpc/test_grpc_aio.py b/tests/integrations/grpc/test_grpc_aio.py index 9ce9aef6a5..96e9a4dba8 100644 --- a/tests/integrations/grpc/test_grpc_aio.py +++ b/tests/integrations/grpc/test_grpc_aio.py @@ -1,5 +1,4 @@ import asyncio -import os import grpc import pytest @@ -17,37 +16,52 @@ gRPCTestServiceStub, ) -AIO_PORT = 50052 -AIO_PORT += os.getpid() % 100 # avoid port conflicts when running tests in parallel - @pytest_asyncio.fixture(scope="function") -async def grpc_server(sentry_init): +async def grpc_server_and_channel(sentry_init): + """ + Creates an async gRPC server and a channel connected to it. + Returns both for use in tests, and cleans up afterward. + """ sentry_init(traces_sample_rate=1.0, integrations=[GRPCIntegration()]) + + # Create server server = grpc.aio.server() - server.add_insecure_port("[::]:{}".format(AIO_PORT)) + + # Let gRPC choose a free port instead of hardcoding it + port = server.add_insecure_port("[::]:0") + + # Add service implementation add_gRPCTestServiceServicer_to_server(TestService, server) + # Start the server await asyncio.create_task(server.start()) + # Create channel connected to our server + channel = grpc.aio.insecure_channel(f"localhost:{port}") # noqa: E231 + try: - yield server + yield server, channel finally: + # Clean up resources + await channel.close() await server.stop(None) @pytest.mark.asyncio async def test_noop_for_unimplemented_method(sentry_init, capture_events): sentry_init(traces_sample_rate=1.0, integrations=[GRPCIntegration()]) - server = grpc.aio.server() - server.add_insecure_port("[::]:{}".format(AIO_PORT)) + # Create empty server with no services + server = grpc.aio.server() + port = server.add_insecure_port("[::]:0") # Let gRPC choose a free port await asyncio.create_task(server.start()) events = capture_events() + try: async with grpc.aio.insecure_channel( - "localhost:{}".format(AIO_PORT) + f"localhost:{port}" # noqa: E231 ) as channel: stub = gRPCTestServiceStub(channel) with pytest.raises(grpc.RpcError) as exc: @@ -60,12 +74,13 @@ async def test_noop_for_unimplemented_method(sentry_init, capture_events): @pytest.mark.asyncio -async def test_grpc_server_starts_transaction(grpc_server, capture_events): +async def test_grpc_server_starts_transaction(grpc_server_and_channel, capture_events): + _, channel = grpc_server_and_channel events = capture_events() - async with grpc.aio.insecure_channel("localhost:{}".format(AIO_PORT)) as channel: - stub = gRPCTestServiceStub(channel) - await stub.TestServe(gRPCTestMessage(text="test")) + # Use the provided channel + stub = gRPCTestServiceStub(channel) + await stub.TestServe(gRPCTestMessage(text="test")) (event,) = events span = event["spans"][0] @@ -79,32 +94,35 @@ async def test_grpc_server_starts_transaction(grpc_server, capture_events): @pytest.mark.asyncio -async def test_grpc_server_continues_transaction(grpc_server, capture_events): +async def test_grpc_server_continues_transaction( + grpc_server_and_channel, capture_events +): + _, channel = grpc_server_and_channel events = capture_events() - async with grpc.aio.insecure_channel("localhost:{}".format(AIO_PORT)) as channel: - stub = gRPCTestServiceStub(channel) - - with sentry_sdk.start_transaction() as transaction: - metadata = ( - ( - "baggage", - "sentry-trace_id={trace_id},sentry-environment=test," - "sentry-transaction=test-transaction,sentry-sample_rate=1.0".format( - trace_id=transaction.trace_id - ), + # Use the provided channel + stub = gRPCTestServiceStub(channel) + + with sentry_sdk.start_transaction() as transaction: + metadata = ( + ( + "baggage", + "sentry-trace_id={trace_id},sentry-environment=test," + "sentry-transaction=test-transaction,sentry-sample_rate=1.0".format( + trace_id=transaction.trace_id ), - ( - "sentry-trace", - "{trace_id}-{parent_span_id}-{sampled}".format( - trace_id=transaction.trace_id, - parent_span_id=transaction.span_id, - sampled=1, - ), + ), + ( + "sentry-trace", + "{trace_id}-{parent_span_id}-{sampled}".format( + trace_id=transaction.trace_id, + parent_span_id=transaction.span_id, + sampled=1, ), - ) + ), + ) - await stub.TestServe(gRPCTestMessage(text="test"), metadata=metadata) + await stub.TestServe(gRPCTestMessage(text="test"), metadata=metadata) (event, _) = events span = event["spans"][0] @@ -119,16 +137,17 @@ async def test_grpc_server_continues_transaction(grpc_server, capture_events): @pytest.mark.asyncio -async def test_grpc_server_exception(grpc_server, capture_events): +async def test_grpc_server_exception(grpc_server_and_channel, capture_events): + _, channel = grpc_server_and_channel events = capture_events() - async with grpc.aio.insecure_channel("localhost:{}".format(AIO_PORT)) as channel: - stub = gRPCTestServiceStub(channel) - try: - await stub.TestServe(gRPCTestMessage(text="exception")) - raise AssertionError() - except Exception: - pass + # Use the provided channel + stub = gRPCTestServiceStub(channel) + try: + await stub.TestServe(gRPCTestMessage(text="exception")) + raise AssertionError() + except Exception: + pass (event, _) = events @@ -139,28 +158,35 @@ async def test_grpc_server_exception(grpc_server, capture_events): @pytest.mark.asyncio -async def test_grpc_server_abort(grpc_server, capture_events): +async def test_grpc_server_abort(grpc_server_and_channel, capture_events): + _, channel = grpc_server_and_channel events = capture_events() - async with grpc.aio.insecure_channel("localhost:{}".format(AIO_PORT)) as channel: - stub = gRPCTestServiceStub(channel) - try: - await stub.TestServe(gRPCTestMessage(text="abort")) - raise AssertionError() - except Exception: - pass + # Use the provided channel + stub = gRPCTestServiceStub(channel) + try: + await stub.TestServe(gRPCTestMessage(text="abort")) + raise AssertionError() + except Exception: + pass + + # Add a small delay to allow events to be collected + await asyncio.sleep(0.1) assert len(events) == 1 @pytest.mark.asyncio -async def test_grpc_client_starts_span(grpc_server, capture_events_forksafe): +async def test_grpc_client_starts_span( + grpc_server_and_channel, capture_events_forksafe +): + _, channel = grpc_server_and_channel events = capture_events_forksafe() - async with grpc.aio.insecure_channel("localhost:{}".format(AIO_PORT)) as channel: - stub = gRPCTestServiceStub(channel) - with start_transaction(): - await stub.TestServe(gRPCTestMessage(text="test")) + # Use the provided channel + stub = gRPCTestServiceStub(channel) + with start_transaction(): + await stub.TestServe(gRPCTestMessage(text="test")) events.write_file.close() events.read_event() @@ -184,15 +210,16 @@ async def test_grpc_client_starts_span(grpc_server, capture_events_forksafe): @pytest.mark.asyncio async def test_grpc_client_unary_stream_starts_span( - grpc_server, capture_events_forksafe + grpc_server_and_channel, capture_events_forksafe ): + _, channel = grpc_server_and_channel events = capture_events_forksafe() - async with grpc.aio.insecure_channel("localhost:{}".format(AIO_PORT)) as channel: - stub = gRPCTestServiceStub(channel) - with start_transaction(): - response = stub.TestUnaryStream(gRPCTestMessage(text="test")) - [_ async for _ in response] + # Use the provided channel + stub = gRPCTestServiceStub(channel) + with start_transaction(): + response = stub.TestUnaryStream(gRPCTestMessage(text="test")) + [_ async for _ in response] events.write_file.close() local_transaction = events.read_event() @@ -213,38 +240,43 @@ async def test_grpc_client_unary_stream_starts_span( @pytest.mark.asyncio -async def test_stream_stream(grpc_server): +async def test_stream_stream(grpc_server_and_channel): """ Test to verify stream-stream works. Tracing not supported for it yet. """ - async with grpc.aio.insecure_channel("localhost:{}".format(AIO_PORT)) as channel: - stub = gRPCTestServiceStub(channel) - response = stub.TestStreamStream((gRPCTestMessage(text="test"),)) - async for r in response: - assert r.text == "test" + _, channel = grpc_server_and_channel + + # Use the provided channel + stub = gRPCTestServiceStub(channel) + response = stub.TestStreamStream((gRPCTestMessage(text="test"),)) + async for r in response: + assert r.text == "test" @pytest.mark.asyncio -async def test_stream_unary(grpc_server): +async def test_stream_unary(grpc_server_and_channel): """ Test to verify stream-stream works. Tracing not supported for it yet. """ - async with grpc.aio.insecure_channel("localhost:{}".format(AIO_PORT)) as channel: - stub = gRPCTestServiceStub(channel) - response = await stub.TestStreamUnary((gRPCTestMessage(text="test"),)) - assert response.text == "test" + _, channel = grpc_server_and_channel + + # Use the provided channel + stub = gRPCTestServiceStub(channel) + response = await stub.TestStreamUnary((gRPCTestMessage(text="test"),)) + assert response.text == "test" @pytest.mark.asyncio -async def test_span_origin(grpc_server, capture_events_forksafe): +async def test_span_origin(grpc_server_and_channel, capture_events_forksafe): + _, channel = grpc_server_and_channel events = capture_events_forksafe() - async with grpc.aio.insecure_channel("localhost:{}".format(AIO_PORT)) as channel: - stub = gRPCTestServiceStub(channel) - with start_transaction(name="custom_transaction"): - await stub.TestServe(gRPCTestMessage(text="test")) + # Use the provided channel + stub = gRPCTestServiceStub(channel) + with start_transaction(name="custom_transaction"): + await stub.TestServe(gRPCTestMessage(text="test")) events.write_file.close() @@ -283,7 +315,7 @@ async def TestServe(cls, request, context): # noqa: N802 raise cls.TestException() if request.text == "abort": - await context.abort(grpc.StatusCode.ABORTED) + await context.abort(grpc.StatusCode.ABORTED, "Aborted!") return gRPCTestMessage(text=request.text)