Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions langfuse/_client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,13 @@ class Langfuse:
host (Optional[str]): Deprecated. Use base_url instead. The Langfuse API host URL. Defaults to "https://cloud.langfuse.com".
timeout (Optional[int]): Timeout in seconds for API requests. Defaults to 5 seconds.
httpx_client (Optional[httpx.Client]): Custom httpx client for making non-tracing HTTP requests. If not provided, a default client will be created.
**Fork limitation**: ``httpx.Client`` is thread-safe but not process-safe. When using
``fork()``-based servers (e.g. Gunicorn with ``--preload``), the SDK recreates the HTTP
client in child processes after fork to avoid sharing file descriptors (TCP sockets) across
processes. A custom ``httpx_client`` will therefore be replaced by a new default client in
child processes — any custom transport, SSL, or proxy settings will not carry over.
If you need those settings in forked workers, configure them via environment variables or
apply them in an ``after_in_child`` fork handler instead.
debug (bool): Enable debug logging. Defaults to False. Can also be set via LANGFUSE_DEBUG environment variable.
tracing_enabled (Optional[bool]): Enable or disable tracing. Defaults to True. Can also be set via LANGFUSE_TRACING_ENABLED environment variable.
flush_at (Optional[int]): Number of spans to batch before sending to the API. Defaults to 512. Can also be set via LANGFUSE_FLUSH_AT environment variable.
Expand Down
153 changes: 132 additions & 21 deletions langfuse/_client/resource_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import atexit
import os
import threading
import weakref
from queue import Full, Queue
from typing import Any, Callable, Dict, List, Optional, cast

Expand Down Expand Up @@ -170,6 +171,7 @@ def _initialize_instance(
self.base_url = base_url
self.mask = mask
self.environment = environment
self._shutdown = False

# Store additional client settings for get_client() to use
self.timeout = timeout
Expand Down Expand Up @@ -243,7 +245,9 @@ def _initialize_instance(
x_langfuse_public_key=self.public_key,
timeout=timeout,
)
score_ingestion_client = LangfuseClient(

# Store as instance variable so _at_fork_reinit can reuse without recreation
self._score_ingestion_client = LangfuseClient(
public_key=self.public_key,
secret_key=secret_key,
base_url=base_url,
Expand All @@ -257,6 +261,52 @@ def _initialize_instance(
LANGFUSE_MEDIA_UPLOAD_ENABLED, "True"
).lower() not in ("false", "0")

self._media_upload_thread_count = media_upload_thread_count or max(
int(os.getenv(LANGFUSE_MEDIA_UPLOAD_THREAD_COUNT, 1)), 1
)

self._init_consumer_threads()

# Prompt cache
self.prompt_cache = PromptCache()

# Register shutdown handler
atexit.register(self.shutdown)

# Register fork handler to reinitialize consumer threads in child process.
# When using Gunicorn with --preload, os.fork() copies memory but not threads
# (POSIX.1: https://pubs.opengroup.org/onlinepubs/9699919799/functions/fork.html).
# Without this, media upload and score ingestion threads are lost after fork,
# causing silent data loss.
#
# Note: LangfuseSpanProcessor (BatchSpanProcessor) already handles fork-safety
# for span export via its own os.register_at_fork. This handler covers the
# remaining background threads managed by LangfuseResourceManager.
#
# weakref.WeakMethod prevents os.register_at_fork from holding a permanent strong
# reference to this instance, which would block garbage collection.
# See: https://github.com/open-telemetry/opentelemetry-python/blob/main/opentelemetry-sdk/src/opentelemetry/sdk/_shared_internal/__init__.py
if hasattr(os, "register_at_fork"):
weak_reinit = weakref.WeakMethod(self._at_fork_reinit)
os.register_at_fork(
# Walrus operator resolves the weak reference once and stores it in
# a temporary variable before calling it. This avoids a TOCTOU window
# where GC could collect the referent between checking for None and
# invoking the method.
after_in_child=lambda: (m := weak_reinit()) and m()
)

langfuse_logger.info(
f"Startup: Langfuse tracer successfully initialized | "
f"public_key={self.public_key} | "
f"base_url={base_url} | "
f"environment={environment or 'default'} | "
f"sample_rate={sample_rate if sample_rate is not None else 1.0} | "
f"media_threads={self._media_upload_thread_count}"
)

def _init_consumer_threads(self) -> None:
"""Initialize media upload and score ingestion consumer threads."""
self._media_upload_queue: Queue[Any] = Queue(100_000)
self._media_manager = MediaManager(
api_client=self.api,
Expand All @@ -266,48 +316,107 @@ def _initialize_instance(
)
self._media_upload_consumers = []

media_upload_thread_count = media_upload_thread_count or max(
int(os.getenv(LANGFUSE_MEDIA_UPLOAD_THREAD_COUNT, 1)), 1
)

if self._media_upload_enabled:
for i in range(media_upload_thread_count):
for i in range(self._media_upload_thread_count):
media_upload_consumer = MediaUploadConsumer(
identifier=i,
media_manager=self._media_manager,
)
media_upload_consumer.start()
self._media_upload_consumers.append(media_upload_consumer)

# Prompt cache
self.prompt_cache = PromptCache()

# Score ingestion
self._score_ingestion_queue: Queue[Any] = Queue(100_000)
self._ingestion_consumers = []

ingestion_consumer = ScoreIngestionConsumer(
ingestion_queue=self._score_ingestion_queue,
identifier=0,
client=score_ingestion_client,
flush_at=flush_at,
flush_interval=flush_interval,
client=self._score_ingestion_client,
flush_at=self.flush_at,
flush_interval=self.flush_interval,
max_retries=3,
public_key=self.public_key,
)
ingestion_consumer.start()
self._ingestion_consumers.append(ingestion_consumer)

# Register shutdown handler
atexit.register(self.shutdown)
def _at_fork_reinit(self) -> None:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we also need re-init _lock = threading.RLock() because it might be locked when the forking is happening and then it can never be released in the forked child as there is no owning thread in that child.

"""Reinitialize consumer threads after fork in child process.

langfuse_logger.info(
f"Startup: Langfuse tracer successfully initialized | "
f"public_key={self.public_key} | "
f"base_url={base_url} | "
f"environment={environment or 'default'} | "
f"sample_rate={sample_rate if sample_rate is not None else 1.0} | "
f"media_threads={media_upload_thread_count or 1}"
Called automatically via os.register_at_fork() after fork().
Necessary for Gunicorn --preload deployments where os.fork() is used:
threads are not copied to child processes (POSIX standard), so without
reinitialization, the child process has no consumer threads and all
media upload and score ingestion events are silently lost.

Note: LangfuseSpanProcessor (BatchSpanProcessor) handles span export
fork-safety separately via its own os.register_at_fork handler.

Skipped if shutdown() was already called on this instance, to avoid
restarting threads on an intentionally torn-down manager.
"""
if self._shutdown:
return

# The class-level lock may have been held by a thread in the parent at fork time.
# That thread does not exist in the child, so the lock can never be released and
# any attempt to acquire it would deadlock. Replace it with a fresh lock first.
LangfuseResourceManager._lock = threading.RLock()

langfuse_logger.debug(
f"[PID {os.getpid()}] Fork detected: reinitializing Langfuse consumer threads."
)

# Queues are intentionally recreated after fork. Items enqueued before fork
# belong to the preloaded parent process and must not be processed by every
# worker — otherwise uploads/scores would be duplicated across workers.
#
# HTTP clients must also be recreated. httpx.Client is thread-safe but not
# process-safe: fork() duplicates the parent's connection pool (TCP socket file
# descriptors) into the child. Both processes then share the same underlying
# sockets, which causes data corruption and SSL/TLS state mismatch under
# concurrent use. Fresh clients start with an empty pool owned solely by this
# child process.
try:
client_headers = self.additional_headers if self.additional_headers else {}
self.httpx_client = httpx.Client(
timeout=self.timeout, headers=client_headers
)
self.api = LangfuseAPI(
base_url=self.base_url,
username=self.public_key,
password=self.secret_key,
x_langfuse_sdk_name="python",
x_langfuse_sdk_version=langfuse_version,
x_langfuse_public_key=self.public_key,
httpx_client=self.httpx_client,
timeout=self.timeout,
)
self._score_ingestion_client = LangfuseClient(
public_key=self.public_key,
secret_key=self.secret_key,
base_url=self.base_url,
version=langfuse_version,
timeout=self.timeout or 20,
session=self.httpx_client,
)
except Exception as e:
langfuse_logger.error(
f"[PID {os.getpid()}] Failed to recreate HTTP clients after fork: {e}. "
f"Network requests may fail in this worker."
)

try:
self._init_consumer_threads()
except Exception as e:
langfuse_logger.error(
f"[PID {os.getpid()}] Failed to reinitialize consumer threads after fork: {e}. "
f"Media upload and score ingestion will be unavailable in this worker."
)

langfuse_logger.debug(
f"[PID {os.getpid()}] Langfuse consumer threads reinitialized after fork"
)
Comment on lines +344 to 420
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Unhandled exception in after_in_child callback crashes Gunicorn worker

_at_fork_reinit calls _init_consumer_threads, which calls Thread.start(). If the OS refuses to create a thread — e.g., due to resource exhaustion (OSError: can't start new thread) — the exception propagates through the after_in_child callback chain and surfaces as an exception from os.fork() in the child process. Gunicorn would see the worker fail immediately at startup rather than the telemetry path degrading gracefully. Wrapping _init_consumer_threads() in a try/except Exception and logging the error would allow the child to continue (without consumer threads) instead of crashing.

Prompt To Fix With AI
This is a comment left during a code review.
Path: langfuse/_client/resource_manager.py
Line: 344-378

Comment:
**Unhandled exception in `after_in_child` callback crashes Gunicorn worker**

`_at_fork_reinit` calls `_init_consumer_threads`, which calls `Thread.start()`. If the OS refuses to create a thread — e.g., due to resource exhaustion (`OSError: can't start new thread`) — the exception propagates through the `after_in_child` callback chain and surfaces as an exception from `os.fork()` in the child process. Gunicorn would see the worker fail immediately at startup rather than the telemetry path degrading gracefully. Wrapping `_init_consumer_threads()` in a `try/except Exception` and logging the error would allow the child to continue (without consumer threads) instead of crashing.

How can I resolve this? If you propose a fix, please make it concise.


@classmethod
Expand Down Expand Up @@ -449,6 +558,8 @@ def flush(self) -> None:
langfuse_logger.debug("Successfully flushed media upload queue")

def shutdown(self) -> None:
self._shutdown = True

# Unregister the atexit handler first
atexit.unregister(self.shutdown)

Expand Down
Loading