Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1159,7 +1159,7 @@ def _prepare_arrow_batch(rows: list[dict[str, Any]], arrow_schema: Any) -> Any:
if value is not None:
if isinstance(value, (dict, list)):
try:
value = json.dumps(value)
value = json.dumps(value, ensure_ascii=False)
except (TypeError, ValueError):
value = str(value)
elif isinstance(value, (str, bytes)):
Expand All @@ -1183,18 +1183,18 @@ def _prepare_arrow_batch(rows: list[dict[str, Any]], arrow_schema: Any) -> Any:

if not is_already_json:
try:
value = json.dumps(value)
value = json.dumps(value, ensure_ascii=False)
except (TypeError, ValueError):
value = str(value)
else:
try:
value = json.dumps(value)
value = json.dumps(value, ensure_ascii=False)
except (TypeError, ValueError):
value = str(value)
elif isinstance(value, (dict, list)) and not is_struct and not is_list:
if value is not None and not isinstance(value, (str, bytes)):
try:
value = json.dumps(value)
value = json.dumps(value, ensure_ascii=False)
except (TypeError, ValueError):
value = str(value)

Expand Down Expand Up @@ -1729,7 +1729,8 @@ def _prepare_content_part(
part_data["mime_type"] = "application/json"
part_data["text"] = f"Tool Call: {part.get('name')}"
part_data["part_attributes"] = json.dumps(
{"tool_id": part.get("id"), "name": part.get("name")}
{"tool_id": part.get("id"), "name": part.get("name")},
ensure_ascii=False,
)
summary_text = f"[TOOL: {part.get('name')}]"

Expand Down Expand Up @@ -2928,7 +2929,7 @@ async def on_chain_start(
await self._log(
event_type,
run_id,
content=json.dumps(inputs, default=str),
content=json.dumps(inputs, default=str, ensure_ascii=False),
parent_run_id=parent_run_id,
attributes=attributes,
metadata=metadata,
Expand Down Expand Up @@ -2976,7 +2977,7 @@ async def on_chain_end(
await self._log(
event_type,
run_id,
content=json.dumps(outputs, default=str),
content=json.dumps(outputs, default=str, ensure_ascii=False),
parent_run_id=parent_run_id,
attributes=attributes,
metadata=metadata,
Expand Down Expand Up @@ -3107,7 +3108,7 @@ async def on_retriever_end(
await self._log(
"RETRIEVER_END",
run_id,
content=json.dumps(docs, default=str),
content=json.dumps(docs, default=str, ensure_ascii=False),
parent_run_id=parent_run_id,
metadata=metadata,
latency_measurement=latency_measurement,
Expand Down Expand Up @@ -3165,7 +3166,9 @@ async def on_agent_action(
"AGENT_ACTION",
run_id,
content=json.dumps(
{"tool": action.tool, "input": str(action.tool_input)}, default=str
{"tool": action.tool, "input": str(action.tool_input)},
default=str,
ensure_ascii=False,
),
parent_run_id=parent_run_id,
metadata=kwargs.get("metadata"),
Expand All @@ -3182,7 +3185,9 @@ async def on_agent_finish(
await self._log(
"AGENT_FINISH",
run_id,
content=json.dumps({"output": finish.return_values}, default=str),
content=json.dumps(
{"output": finish.return_values}, default=str, ensure_ascii=False
),
parent_run_id=parent_run_id,
metadata=kwargs.get("metadata"),
)
Expand Down Expand Up @@ -4075,7 +4080,7 @@ def on_chain_start(
self._log(
event_type,
run_id,
content=json.dumps(inputs, default=str),
content=json.dumps(inputs, default=str, ensure_ascii=False),
parent_run_id=parent_run_id,
attributes=attributes,
metadata=metadata,
Expand Down Expand Up @@ -4117,7 +4122,7 @@ def on_chain_end(
self._log(
event_type,
run_id,
content=json.dumps(outputs, default=str),
content=json.dumps(outputs, default=str, ensure_ascii=False),
parent_run_id=parent_run_id,
attributes=attributes,
metadata=metadata,
Expand Down Expand Up @@ -4277,7 +4282,9 @@ def on_agent_action(
"AGENT_ACTION",
run_id,
content=json.dumps(
{"tool": action.tool, "input": str(action.tool_input)}, default=str
{"tool": action.tool, "input": str(action.tool_input)},
default=str,
ensure_ascii=False,
),
parent_run_id=parent_run_id,
metadata=kwargs.get("metadata"),
Expand All @@ -4294,7 +4301,9 @@ def on_agent_finish(
self._log(
"AGENT_FINISH",
run_id,
content=json.dumps({"output": finish.return_values}, default=str),
content=json.dumps(
{"output": finish.return_values}, default=str, ensure_ascii=False
),
parent_run_id=parent_run_id,
metadata=kwargs.get("metadata"),
)
Expand Down Expand Up @@ -4340,7 +4349,7 @@ def on_retriever_end(
self._log(
"RETRIEVER_END",
run_id,
content=json.dumps(docs, default=str),
content=json.dumps(docs, default=str, ensure_ascii=False),
parent_run_id=parent_run_id,
metadata=metadata,
latency_measurement=latency_measurement,
Expand Down Expand Up @@ -4464,7 +4473,9 @@ def __enter__(self) -> "GraphExecutionContext":
self.handler._log(
"INVOCATION_STARTING",
self._run_id,
content=json.dumps({"graph_name": self.graph_name}, default=str),
content=json.dumps(
{"graph_name": self.graph_name}, default=str, ensure_ascii=False
),
attributes=self.handler._build_langgraph_attributes(metadata=self.metadata),
metadata=self.metadata,
)
Expand Down Expand Up @@ -4499,7 +4510,9 @@ def __exit__(
self.handler._log(
"INVOCATION_COMPLETED",
self._run_id,
content=json.dumps({"graph_name": self.graph_name}, default=str),
content=json.dumps(
{"graph_name": self.graph_name}, default=str, ensure_ascii=False
),
attributes=self.handler._build_langgraph_attributes(
metadata=self.metadata
),
Expand Down Expand Up @@ -4558,7 +4571,9 @@ async def __aenter__(self) -> "AsyncGraphExecutionContext":
await self.handler._log(
"INVOCATION_STARTING",
self._run_id,
content=json.dumps({"graph_name": self.graph_name}, default=str),
content=json.dumps(
{"graph_name": self.graph_name}, default=str, ensure_ascii=False
),
attributes=self.handler._build_langgraph_attributes(metadata=self.metadata),
metadata=self.metadata,
)
Expand Down Expand Up @@ -4593,7 +4608,9 @@ async def __aexit__(
await self.handler._log(
"INVOCATION_COMPLETED",
self._run_id,
content=json.dumps({"graph_name": self.graph_name}, default=str),
content=json.dumps(
{"graph_name": self.graph_name}, default=str, ensure_ascii=False
),
attributes=self.handler._build_langgraph_attributes(
metadata=self.metadata
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2513,3 +2513,63 @@ async def slow_write(rows: Any) -> None:
f"{unfinished(bp._queue)}, expected 1 "
"(row 2 should still be unfinished)"
)


def test_bigquery_callback_every_json_dumps_uses_ensure_ascii_false() -> None:
"""Pin that every `json.dumps(` call site in `bigquery_callback.py`
passes `ensure_ascii=False`. Pre-fix, the BigQuery callback's `content`
column (chain inputs, outputs, retrieved docs, tool calls, agent
actions, langgraph attributes) and `_prepare_arrow_batch`'s JSON
columns used the default `ensure_ascii=True`, so CJK / emoji /
accented user input landed in BigQuery as `\\uXXXX` escape sequences
and was unreadable on inspection. A regex tokenize-style check
survives the autouse fixture that mocks pyarrow in CI (which makes
end-to-end tests of `_prepare_arrow_batch` brittle).
"""
import inspect
import re
import tokenize
from io import StringIO

from langchain_google_community.callbacks import bigquery_callback as bq

source = inspect.getsource(bq)

# Find every `json.dumps(` opening and pair it with its closing paren.
pattern = re.compile(r"json\.dumps\(")
offenders: list[str] = []

for match in pattern.finditer(source):
start = match.end()
# Walk paren-balanced, ignoring strings/comments via tokenize.
depth = 1
idx = start
while depth > 0 and idx < len(source):
ch = source[idx]
if ch == "(":
depth += 1
elif ch == ")":
depth -= 1
idx += 1
call_text = source[match.start() : idx]
if "ensure_ascii=False" not in call_text:
# Take just the head for the error report.
head = re.sub(r"\s+", " ", call_text)[:120]
offenders.append(head)

# Sanity: the file should actually have json.dumps calls (guards against
# a refactor that moves the helper elsewhere silently passing this test).
assert "json.dumps(" in source, (
"Expected `json.dumps(` calls in bigquery_callback.py; if you moved "
"them, update this regression test."
)
# Touch tokenize/StringIO to keep imports load-bearing during static
# analysis even when the regex never trips them.
_ = list(tokenize.generate_tokens(StringIO("").readline))

assert not offenders, (
f"{len(offenders)} `json.dumps` call(s) in bigquery_callback.py are "
"missing `ensure_ascii=False`. Without it, non-ASCII (CJK, emoji, "
"accented) values in BigQuery JSON columns get escaped to `\\uXXXX` "
"and become unreadable.\n\nOffenders:\n - " + "\n - ".join(offenders)
)
Loading