diff --git a/src/google/adk/flows/llm_flows/base_llm_flow.py b/src/google/adk/flows/llm_flows/base_llm_flow.py index 51cdd021e0..62e41e1b69 100644 --- a/src/google/adk/flows/llm_flows/base_llm_flow.py +++ b/src/google/adk/flows/llm_flows/base_llm_flow.py @@ -48,6 +48,7 @@ from ...tools.base_toolset import BaseToolset from ...tools.tool_context import ToolContext from ...utils.context_utils import Aclosing +from ...utils import model_name_utils from .audio_cache_manager import AudioCacheManager from .functions import build_auth_request_event @@ -516,6 +517,20 @@ async def run_live( ) llm_request.live_connect_config.session_resumption.transparent = True + if ( + isinstance(llm, Gemini) + and llm._api_backend == GoogleLLMVariant.GEMINI_API + and model_name_utils.is_gemini_3_1_flash_live(llm_request.model) + and llm_request.contents + and not invocation_context.live_session_resumption_handle + ): + if llm_request.live_connect_config is None: + llm_request.live_connect_config = types.LiveConnectConfig() + if llm_request.live_connect_config.history_config is None: + llm_request.live_connect_config.history_config = types.HistoryConfig( + initial_history_in_client_content=True + ) + logger.info( 'Establishing live connection for agent: %s', invocation_context.agent.name, diff --git a/src/google/adk/flows/llm_flows/basic.py b/src/google/adk/flows/llm_flows/basic.py index 8e9bfa514c..aadfd39dec 100644 --- a/src/google/adk/flows/llm_flows/basic.py +++ b/src/google/adk/flows/llm_flows/basic.py @@ -25,6 +25,7 @@ from ...agents.invocation_context import InvocationContext from ...events.event import Event from ...models.llm_request import LlmRequest +from ...utils import model_name_utils from ...utils.output_schema_utils import can_use_output_schema_with_tools from ._base_llm_processor import BaseLlmRequestProcessor @@ -78,11 +79,13 @@ def _build_basic_request( llm_request.live_connect_config.realtime_input_config = ( invocation_context.run_config.realtime_input_config ) + active_model_name = getattr(getattr(agent, 'canonical_live_model', None), 'model', None) or llm_request.model + is_gemini_31 = model_name_utils.is_gemini_3_1_flash_live(active_model_name) llm_request.live_connect_config.enable_affective_dialog = ( - invocation_context.run_config.enable_affective_dialog + None if is_gemini_31 else invocation_context.run_config.enable_affective_dialog ) llm_request.live_connect_config.proactivity = ( - invocation_context.run_config.proactivity + None if is_gemini_31 else invocation_context.run_config.proactivity ) llm_request.live_connect_config.session_resumption = ( invocation_context.run_config.session_resumption diff --git a/src/google/adk/models/gemini_llm_connection.py b/src/google/adk/models/gemini_llm_connection.py index fb9a3a5163..f5d0400b5e 100644 --- a/src/google/adk/models/gemini_llm_connection.py +++ b/src/google/adk/models/gemini_llm_connection.py @@ -80,10 +80,24 @@ async def send_history(self, history: list[types.Content]): ] if contents: + is_gemini_31 = model_name_utils.is_gemini_3_1_flash_live( + self._model_version + ) + # Gemini Enterprise Agent Platform does not support history_config in the SDK. + # To initialize a live session with prior history without hitting a 1007 + # protocol error (invalid role mid-session), we consolidate previous multi-turn + # interactions into a unified contextual preamble on a single user role turn. + if is_gemini_31 and self._api_backend != GoogleLLMVariant.GEMINI_API: + collapsed_text = "Previous conversation history:\n" + for c in contents: + text_parts = "".join(p.text for p in c.parts if p.text) + collapsed_text += f'[{c.role}]: {text_parts}\n' + contents = [types.Content(role='user', parts=[types.Part.from_text(text=collapsed_text)])] + logger.debug('Sending history to live connection: %s', contents) await self._gemini_session.send_client_content( turns=contents, - turn_complete=contents[-1].role == 'user', + turn_complete=True if is_gemini_31 else (contents[-1].role == 'user'), ) else: logger.info('no content is sent') @@ -159,7 +173,12 @@ async def send_realtime(self, input: RealtimeInput): else: raise ValueError('Unsupported input type: %s' % type(input)) - def __build_full_text_response(self, text: str): + def __build_full_text_response( + self, + text: str, + is_thought: bool = False, + grounding_metadata: types.GroundingMetadata | None = None, + ): """Builds a full text response. The text should not be partial and the returned LlmResponse is not @@ -167,6 +186,8 @@ def __build_full_text_response(self, text: str): Args: text: The text to be included in the response. + is_thought: Whether the text is a thought. + grounding_metadata: The grounding metadata to include. Returns: An LlmResponse containing the full text. @@ -176,6 +197,8 @@ def __build_full_text_response(self, text: str): role='model', parts=[types.Part.from_text(text=text)], ), + grounding_metadata=grounding_metadata, + partial=False, live_session_id=self._gemini_session.session_id, ) @@ -188,6 +211,7 @@ async def receive(self) -> AsyncGenerator[LlmResponse, None]: text = '' tool_call_parts = [] + pending_grounding_metadata = None async with Aclosing(self._gemini_session.receive()) as agen: # TODO(b/440101573): Reuse StreamingResponseAggregator to accumulate # partial content and emit responses as needed. @@ -203,6 +227,10 @@ async def receive(self) -> AsyncGenerator[LlmResponse, None]: ) if message.server_content: content = message.server_content.model_turn + if message.server_content.grounding_metadata: + pending_grounding_metadata = ( + message.server_content.grounding_metadata + ) # Standalone grounding_metadata event (when content is empty) if ( @@ -215,6 +243,9 @@ async def receive(self) -> AsyncGenerator[LlmResponse, None]: interrupted=message.server_content.interrupted, model_version=self._model_version, live_session_id=live_session_id, + turn_complete_reason=getattr( + message.server_content, 'turn_complete_reason', None + ), ) if content and content.parts: @@ -223,6 +254,9 @@ async def receive(self) -> AsyncGenerator[LlmResponse, None]: interrupted=message.server_content.interrupted, model_version=self._model_version, live_session_id=live_session_id, + turn_complete_reason=getattr( + message.server_content, 'turn_complete_reason', None + ), ) # grounding_metadata is yielded again at turn_complete, # so avoid duplicating it here if turn_complete is true. @@ -230,12 +264,21 @@ async def receive(self) -> AsyncGenerator[LlmResponse, None]: llm_response.grounding_metadata = ( message.server_content.grounding_metadata ) - if content.parts[0].text: - text += content.parts[0].text - llm_response.partial = True + has_inline_data = any(p.inline_data for p in content.parts) + for part in content.parts: + if part.text: + current_is_thought = getattr(part, 'thought', False) + if text and current_is_thought != is_thought: + yield self.__build_full_text_response(text, is_thought) + text = '' + is_thought = False + + text += part.text + is_thought = current_is_thought + llm_response.partial = True # don't yield the merged text event when receiving audio data - elif text and not content.parts[0].inline_data: - yield self.__build_full_text_response(text) + if text and not any(p.text for p in content.parts) and not has_inline_data: + yield self.__build_full_text_response(text, is_thought) text = '' yield llm_response # Note: in some cases, tool_call may arrive before @@ -324,9 +367,14 @@ async def receive(self) -> AsyncGenerator[LlmResponse, None]: ) self._output_transcription_text = '' if message.server_content.turn_complete: + g_metadata_to_yield = pending_grounding_metadata if text: - yield self.__build_full_text_response(text) + yield self.__build_full_text_response( + text, is_thought, g_metadata_to_yield + ) text = '' + is_thought = False + g_metadata_to_yield = None if tool_call_parts: logger.debug('Returning aggregated tool_call_parts') yield LlmResponse( @@ -338,9 +386,13 @@ async def receive(self) -> AsyncGenerator[LlmResponse, None]: yield LlmResponse( turn_complete=True, interrupted=message.server_content.interrupted, - grounding_metadata=message.server_content.grounding_metadata, + grounding_metadata=message.server_content.grounding_metadata + or g_metadata_to_yield, model_version=self._model_version, live_session_id=live_session_id, + turn_complete_reason=getattr( + message.server_content, 'turn_complete_reason', None + ), ) break # in case of empty content or parts, we still surface it diff --git a/src/google/adk/models/llm_response.py b/src/google/adk/models/llm_response.py index c921f197c3..333034565f 100644 --- a/src/google/adk/models/llm_response.py +++ b/src/google/adk/models/llm_response.py @@ -81,6 +81,12 @@ class LlmResponse(BaseModel): Only used for streaming mode. """ + turn_complete_reason: Optional[types.TurnCompleteReason] = None + """The reason why the turn is complete. + + Only used for streaming mode. + """ + finish_reason: Optional[types.FinishReason] = None """The finish reason of the response.""" diff --git a/tests/unittests/flows/llm_flows/test_base_llm_flow.py b/tests/unittests/flows/llm_flows/test_base_llm_flow.py index e3c1530ca3..59a988c5d1 100644 --- a/tests/unittests/flows/llm_flows/test_base_llm_flow.py +++ b/tests/unittests/flows/llm_flows/test_base_llm_flow.py @@ -22,7 +22,7 @@ from google.adk.events.event import Event from google.adk.flows.llm_flows.base_llm_flow import _handle_after_model_callback from google.adk.flows.llm_flows.base_llm_flow import BaseLlmFlow -from google.adk.models.google_llm import Gemini +from google.adk.models.google_llm import Gemini, GoogleLLMVariant from google.adk.models.llm_request import LlmRequest from google.adk.models.llm_response import LlmResponse from google.adk.plugins.base_plugin import BasePlugin @@ -1129,3 +1129,217 @@ async def test_postprocess_async_yields_grounding_metadata_only(): assert len(events) == 1 assert events[0].grounding_metadata == grounding_metadata + + +@pytest.mark.asyncio +async def test_run_live_reconnect_does_not_set_transparent(): + """Test that run_live reconnect does not set transparent=True.""" + + real_model = Gemini() + mock_connection = mock.AsyncMock() + + async def mock_receive(): + yield LlmResponse( + live_session_resumption_update=types.LiveServerSessionResumptionUpdate( + new_handle='test_handle' + ) + ) + raise ConnectionClosed(None, None) + + mock_connection.receive = mock.Mock(side_effect=mock_receive) + + agent = Agent(name='test_agent', model=real_model) + invocation_context = await testing_utils.create_invocation_context( + agent=agent + ) + invocation_context.live_request_queue = LiveRequestQueue() + invocation_context.run_config = RunConfig() + + flow = BaseLlmFlowForTesting() + + with mock.patch.object(flow, '_send_to_model', new_callable=AsyncMock): + + async def mock_preprocess(ctx, req): + req.live_connect_config.session_resumption = ( + ctx.run_config.session_resumption + ) + yield Event(id=Event.new_id(), author='test') + + with mock.patch.object( + flow, '_preprocess_async', side_effect=mock_preprocess + ): + mock_connection_2 = mock.AsyncMock() + + class StopTestError(Exception): + pass + + async def mock_receive_2(): + yield LlmResponse( + content=types.Content(parts=[types.Part.from_text(text='hi')]) + ) + raise StopTestError('stop') + + mock_connection_2.receive = mock.Mock(side_effect=mock_receive_2) + + mock_aenter = mock.AsyncMock() + mock_aenter.side_effect = [mock_connection, mock_connection_2] + + with mock.patch( + 'google.adk.models.google_llm.Gemini.connect' + ) as mock_connect: + mock_connect.return_value.__aenter__ = mock_aenter + + try: + async for _ in flow.run_live(invocation_context): + pass + except StopTestError: + pass + + assert mock_connect.call_count == 2 + second_call_req = mock_connect.call_args_list[1][0][0] + session_resump = second_call_req.live_connect_config.session_resumption + assert session_resump.transparent is None + + +@pytest.mark.asyncio +async def test_run_live_reconnect_sets_transparent_for_vertex(): + """Test that run_live reconnect sets transparent=True for vertex backend.""" + + real_model = Gemini( + model='projects/test-project/locations/us-central1/publishers/google/models/gemini-2.0-flash-exp' + ) + mock_connection = mock.AsyncMock() + + async def mock_receive(): + yield LlmResponse( + live_session_resumption_update=types.LiveServerSessionResumptionUpdate( + new_handle='test_handle' + ) + ) + raise ConnectionClosed(None, None) + + mock_connection.receive = mock.Mock(side_effect=mock_receive) + + agent = Agent(name='test_agent', model=real_model) + invocation_context = await testing_utils.create_invocation_context( + agent=agent + ) + invocation_context.live_request_queue = LiveRequestQueue() + invocation_context.run_config = RunConfig() + + flow = BaseLlmFlowForTesting() + + with mock.patch.object(flow, '_send_to_model', new_callable=AsyncMock): + + async def mock_preprocess(ctx, req): + req.live_connect_config.session_resumption = ( + ctx.run_config.session_resumption + ) + yield Event(id=Event.new_id(), author='test') + + with mock.patch.object( + flow, '_preprocess_async', side_effect=mock_preprocess + ): + mock_connection_2 = mock.AsyncMock() + + class StopTestError(Exception): + pass + + async def mock_receive_2(): + yield LlmResponse( + content=types.Content(parts=[types.Part.from_text(text='hi')]) + ) + raise StopTestError('stop') + + mock_connection_2.receive = mock.Mock(side_effect=mock_receive_2) + + mock_aenter = mock.AsyncMock() + mock_aenter.side_effect = [mock_connection, mock_connection_2] + + with mock.patch( + 'google.adk.models.google_llm.Gemini.connect' + ) as mock_connect: + mock_connect.return_value.__aenter__ = mock_aenter + + try: + async for _ in flow.run_live(invocation_context): + pass + except StopTestError: + pass + + assert mock_connect.call_count == 2 + second_call_req = mock_connect.call_args_list[1][0][0] + session_resump = second_call_req.live_connect_config.session_resumption + assert session_resump.transparent + + +@pytest.mark.asyncio +@pytest.mark.parametrize( + "api_backend,should_have_history_config", + [ + (GoogleLLMVariant.GEMINI_API, True), + (GoogleLLMVariant.VERTEX_AI, False), + ], +) +async def test_run_live_history_config_gated_by_backend( + api_backend, should_have_history_config +): + """Test that run_live only sets history_config for Gemini API backend.""" + + real_model = Gemini(model='gemini-3.1-flash-live-preview') + mock_connection = mock.AsyncMock() + + class StopTestError(Exception): + pass + + async def mock_receive(): + yield LlmResponse( + content=types.Content(parts=[types.Part.from_text(text='hi')]) + ) + raise StopTestError('stop') + + mock_connection.receive = mock.Mock(side_effect=mock_receive) + + agent = Agent(name='test_agent', model=real_model) + invocation_context = await testing_utils.create_invocation_context( + agent=agent + ) + invocation_context.live_request_queue = LiveRequestQueue() + + flow = BaseLlmFlowForTesting() + + with mock.patch.object(flow, '_send_to_model', new_callable=AsyncMock): + async def mock_preprocess(ctx, req): + req.contents = [types.Content(parts=[types.Part.from_text(text='history')])] + yield Event(id=Event.new_id(), author='test') + + with mock.patch.object( + flow, '_preprocess_async', side_effect=mock_preprocess + ): + with mock.patch.object( + Gemini, '_api_backend', new_callable=mock.PropertyMock + ) as mock_backend: + mock_backend.return_value = api_backend + with mock.patch( + 'google.adk.models.google_llm.Gemini.connect' + ) as mock_connect: + mock_connect.return_value.__aenter__.return_value = mock_connection + + try: + async for _ in flow.run_live(invocation_context): + pass + except StopTestError: + pass + + assert mock_connect.call_count == 1 + called_req = mock_connect.call_args[0][0] + if should_have_history_config: + assert called_req.live_connect_config is not None + assert called_req.live_connect_config.history_config is not None + assert ( + called_req.live_connect_config.history_config.initial_history_in_client_content + is True + ) + else: + if called_req.live_connect_config: + assert called_req.live_connect_config.history_config is None diff --git a/tests/unittests/models/test_gemini_llm_connection.py b/tests/unittests/models/test_gemini_llm_connection.py index 58aace30ed..7cc7c22290 100644 --- a/tests/unittests/models/test_gemini_llm_connection.py +++ b/tests/unittests/models/test_gemini_llm_connection.py @@ -1262,3 +1262,292 @@ async def mock_receive_generator(): content_response = next((r for r in responses if r.content), None) assert content_response is not None assert content_response.content == mock_content + + +@pytest.mark.asyncio +async def test_receive_grounding_metadata_pending( + gemini_connection, mock_gemini_session +): + """Test that grounding metadata in partial chunks is pending and yielded on full text.""" + grounding_metadata = types.GroundingMetadata( + web_search_queries=['stock price of google'], + ) + + def make_msg(text=None, g_meta=None, tc=False): + msg = mock.Mock( + usage_metadata=None, + tool_call=None, + session_resumption_update=None, + go_away=None, + ) + msg.server_content = mock.Mock( + interrupted=False, + input_transcription=None, + output_transcription=None, + generation_complete=False, + turn_complete=tc, + grounding_metadata=g_meta, + model_turn=types.Content( + role='model', parts=[types.Part.from_text(text=text)] + ) + if text + else None, + ) + return msg + + msg1 = make_msg(text='hello', g_meta=grounding_metadata) + msg2 = make_msg(text=' world') + msg3 = make_msg(tc=True) + + async def gen(): + yield msg1 + yield msg2 + yield msg3 + + mock_gemini_session.receive = mock.Mock(return_value=gen()) + + responses = [resp async for resp in gemini_connection.receive()] + + # Expected responses: + # 1. Msg 1 partial (hello) with grounding_metadata + # 2. Msg 2 partial ( world) without grounding_metadata + # 3. Full text response (hello world) with PENDING grounding_metadata + # 4. Turn complete response without grounding_metadata (already cleared) + assert len(responses) == 4 + + assert responses[0].content.parts[0].text == 'hello' + assert responses[0].partial is True + assert responses[0].grounding_metadata == grounding_metadata + + assert responses[1].content.parts[0].text == ' world' + assert responses[1].partial is True + assert responses[1].grounding_metadata is None + + assert responses[2].content.parts[0].text == 'hello world' + assert responses[2].partial is False + assert responses[2].grounding_metadata == grounding_metadata + + assert responses[3].turn_complete is True + assert responses[3].grounding_metadata is None + + +@pytest.mark.asyncio +async def test_receive_populates_turn_complete_reason( + gemini_connection, mock_gemini_session +): + """Test that receive populates turn_complete_reason in LlmResponse.""" + mock_server_content = mock.create_autospec( + types.LiveServerContent, instance=True + ) + mock_server_content.model_turn = None + mock_server_content.grounding_metadata = None + mock_server_content.turn_complete = True + mock_server_content.interrupted = False + mock_server_content.input_transcription = None + mock_server_content.output_transcription = None + mock_server_content.generation_complete = False + mock_server_content.turn_complete_reason = ( + types.TurnCompleteReason.RESPONSE_REJECTED + ) + + mock_message = mock.create_autospec(types.LiveServerMessage, instance=True) + mock_message.usage_metadata = None + mock_message.server_content = mock_server_content + mock_message.tool_call = None + mock_message.session_resumption_update = None + mock_message.go_away = None + + async def mock_receive_generator(): + yield mock_message + + mock_gemini_session.receive = mock.Mock(return_value=mock_receive_generator()) + + responses = [resp async for resp in gemini_connection.receive()] + + assert len(responses) == 1 + assert responses[0].turn_complete is True + assert ( + responses[0].turn_complete_reason + == types.TurnCompleteReason.RESPONSE_REJECTED + ) + + +@pytest.mark.asyncio +async def test_receive_populates_turn_complete_reason_standalone_grounding( + gemini_connection, mock_gemini_session +): + """Test that receive populates turn_complete_reason in LlmResponse for standalone grounding metadata.""" + mock_server_content = mock.create_autospec( + types.LiveServerContent, instance=True + ) + mock_server_content.model_turn = None + mock_server_content.grounding_metadata = mock.create_autospec( + types.GroundingMetadata, instance=True + ) + mock_server_content.turn_complete = False + mock_server_content.interrupted = False + mock_server_content.input_transcription = None + mock_server_content.output_transcription = None + mock_server_content.generation_complete = False + mock_server_content.turn_complete_reason = ( + types.TurnCompleteReason.RESPONSE_REJECTED + ) + + mock_message = mock.create_autospec(types.LiveServerMessage, instance=True) + mock_message.usage_metadata = None + mock_message.server_content = mock_server_content + mock_message.tool_call = None + mock_message.session_resumption_update = None + mock_message.go_away = None + + async def mock_receive_generator(): + yield mock_message + + mock_gemini_session.receive = mock.Mock(return_value=mock_receive_generator()) + + responses = [resp async for resp in gemini_connection.receive()] + + assert len(responses) == 1 + assert responses[0].grounding_metadata is not None + assert responses[0].turn_complete is None + assert ( + responses[0].turn_complete_reason + == types.TurnCompleteReason.RESPONSE_REJECTED + ) + + +@pytest.mark.asyncio +async def test_receive_populates_turn_complete_reason_with_content( + gemini_connection, mock_gemini_session +): + """Test that receive populates turn_complete_reason in LlmResponse when model turn has content parts.""" + mock_content = types.Content( + role='model', + parts=[types.Part.from_text(text='hello')], + ) + mock_server_content = mock.create_autospec( + types.LiveServerContent, instance=True + ) + mock_server_content.model_turn = mock_content + mock_server_content.grounding_metadata = None + mock_server_content.turn_complete = False + mock_server_content.interrupted = False + mock_server_content.input_transcription = None + mock_server_content.output_transcription = None + mock_server_content.generation_complete = False + mock_server_content.turn_complete_reason = ( + types.TurnCompleteReason.RESPONSE_REJECTED + ) + + mock_message = mock.create_autospec(types.LiveServerMessage, instance=True) + mock_message.usage_metadata = None + mock_message.server_content = mock_server_content + mock_message.tool_call = None + mock_message.session_resumption_update = None + mock_message.go_away = None + + async def mock_receive_generator(): + yield mock_message + + mock_gemini_session.receive = mock.Mock(return_value=mock_receive_generator()) + + responses = [resp async for resp in gemini_connection.receive()] + + assert len(responses) == 1 + assert responses[0].content == mock_content + assert ( + responses[0].turn_complete_reason + == types.TurnCompleteReason.RESPONSE_REJECTED + ) + + +@pytest.mark.asyncio +async def test_receive_multiplexed_parts(gemini_connection, mock_gemini_session): + """Test receive with multiplexed inline data and text content.""" + mock_content = types.Content( + role='model', + parts=[ + types.Part( + inline_data=types.Blob(data=b'audio_data', mime_type='audio/pcm') + ), + types.Part.from_text(text='transcription text'), + ], + ) + mock_server_content = mock.Mock() + mock_server_content.model_turn = mock_content + mock_server_content.interrupted = False + mock_server_content.input_transcription = None + mock_server_content.output_transcription = None + mock_server_content.turn_complete = False + mock_server_content.grounding_metadata = None + + mock_message = mock.AsyncMock() + mock_message.usage_metadata = None + mock_message.server_content = mock_server_content + mock_message.tool_call = None + mock_message.session_resumption_update = None + mock_message.go_away = None + + async def mock_receive_generator(): + yield mock_message + + receive_mock = mock.Mock(return_value=mock_receive_generator()) + mock_gemini_session.receive = receive_mock + + responses = [resp async for resp in gemini_connection.receive()] + + assert responses + content_response = next((r for r in responses if r.content), None) + assert content_response is not None + assert content_response.content == mock_content + assert content_response.partial is True + + +@pytest.mark.asyncio +async def test_send_history_gemini_31_turn_complete(mock_gemini_session): + """Verify Gemini 3.1 Live history seeding explicitly appends turn_complete=True.""" + from google.adk.models.google_llm import GoogleLLMVariant + conn = GeminiLlmConnection( + mock_gemini_session, + api_backend=GoogleLLMVariant.GEMINI_API, + model_version='gemini-3.1-flash-live-preview', + ) + mock_gemini_session.send_client_content = mock.AsyncMock() + + mock_contents = [ + types.Content(role='user', parts=[types.Part.from_text(text='hi')]), + types.Content(role='model', parts=[types.Part.from_text(text='hello')]), + ] + await conn.send_history(mock_contents) + + mock_gemini_session.send_client_content.assert_called_once_with( + turns=mock_contents, + turn_complete=True, + ) + + +@pytest.mark.asyncio +async def test_send_history_collapse_vertex_ai(mock_gemini_session): + """Verify history prompt collapse when seeding Gemini 3.1 Live on Vertex AI backend.""" + from google.adk.models.google_llm import GoogleLLMVariant + conn = GeminiLlmConnection( + mock_gemini_session, + api_backend=GoogleLLMVariant.VERTEX_AI, + model_version='gemini-3.1-flash-live-preview', + ) + mock_gemini_session.send_client_content = mock.AsyncMock() + + mock_contents = [ + types.Content(role='user', parts=[types.Part.from_text(text='hi')]), + types.Content(role='model', parts=[types.Part.from_text(text='hello')]), + ] + await conn.send_history(mock_contents) + + assert mock_gemini_session.send_client_content.call_count == 1 + called_turns = mock_gemini_session.send_client_content.call_args.kwargs['turns'] + assert len(called_turns) == 1 + assert called_turns[0].role == 'user' + assert 'Previous conversation history:' in called_turns[0].parts[0].text + assert '[user]: hi' in called_turns[0].parts[0].text + assert '[model]: hello' in called_turns[0].parts[0].text + assert mock_gemini_session.send_client_content.call_args.kwargs['turn_complete'] is True