-
Notifications
You must be signed in to change notification settings - Fork 281
fix(resource_manager): reinitialize consumer threads after os.fork() #1658
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
5909633
f3b0b53
2ea0708
fd16c0b
eba27cc
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -17,6 +17,7 @@ | |
| import atexit | ||
| import os | ||
| import threading | ||
| import weakref | ||
| from queue import Full, Queue | ||
| from typing import Any, Callable, Dict, List, Optional, cast | ||
|
|
||
|
|
@@ -170,6 +171,7 @@ def _initialize_instance( | |
| self.base_url = base_url | ||
| self.mask = mask | ||
| self.environment = environment | ||
| self._shutdown = False | ||
|
|
||
| # Store additional client settings for get_client() to use | ||
| self.timeout = timeout | ||
|
|
@@ -243,7 +245,9 @@ def _initialize_instance( | |
| x_langfuse_public_key=self.public_key, | ||
| timeout=timeout, | ||
| ) | ||
| score_ingestion_client = LangfuseClient( | ||
|
|
||
| # Store as instance variable so _at_fork_reinit can reuse without recreation | ||
| self._score_ingestion_client = LangfuseClient( | ||
| public_key=self.public_key, | ||
| secret_key=secret_key, | ||
| base_url=base_url, | ||
|
|
@@ -257,6 +261,52 @@ def _initialize_instance( | |
| LANGFUSE_MEDIA_UPLOAD_ENABLED, "True" | ||
| ).lower() not in ("false", "0") | ||
|
|
||
| self._media_upload_thread_count = media_upload_thread_count or max( | ||
| int(os.getenv(LANGFUSE_MEDIA_UPLOAD_THREAD_COUNT, 1)), 1 | ||
| ) | ||
|
|
||
| self._init_consumer_threads() | ||
|
|
||
| # Prompt cache | ||
| self.prompt_cache = PromptCache() | ||
|
|
||
| # Register shutdown handler | ||
| atexit.register(self.shutdown) | ||
|
|
||
| # Register fork handler to reinitialize consumer threads in child process. | ||
| # When using Gunicorn with --preload, os.fork() copies memory but not threads | ||
| # (POSIX.1: https://pubs.opengroup.org/onlinepubs/9699919799/functions/fork.html). | ||
| # Without this, media upload and score ingestion threads are lost after fork, | ||
| # causing silent data loss. | ||
| # | ||
| # Note: LangfuseSpanProcessor (BatchSpanProcessor) already handles fork-safety | ||
| # for span export via its own os.register_at_fork. This handler covers the | ||
| # remaining background threads managed by LangfuseResourceManager. | ||
| # | ||
| # weakref.WeakMethod prevents os.register_at_fork from holding a permanent strong | ||
| # reference to this instance, which would block garbage collection. | ||
| # See: https://github.com/open-telemetry/opentelemetry-python/blob/main/opentelemetry-sdk/src/opentelemetry/sdk/_shared_internal/__init__.py | ||
| if hasattr(os, "register_at_fork"): | ||
| weak_reinit = weakref.WeakMethod(self._at_fork_reinit) | ||
| os.register_at_fork( | ||
| # Walrus operator resolves the weak reference once and stores it in | ||
| # a temporary variable before calling it. This avoids a TOCTOU window | ||
| # where GC could collect the referent between checking for None and | ||
| # invoking the method. | ||
| after_in_child=lambda: (m := weak_reinit()) and m() | ||
| ) | ||
|
|
||
| langfuse_logger.info( | ||
| f"Startup: Langfuse tracer successfully initialized | " | ||
| f"public_key={self.public_key} | " | ||
| f"base_url={base_url} | " | ||
| f"environment={environment or 'default'} | " | ||
| f"sample_rate={sample_rate if sample_rate is not None else 1.0} | " | ||
| f"media_threads={self._media_upload_thread_count}" | ||
| ) | ||
|
|
||
| def _init_consumer_threads(self) -> None: | ||
| """Initialize media upload and score ingestion consumer threads.""" | ||
| self._media_upload_queue: Queue[Any] = Queue(100_000) | ||
| self._media_manager = MediaManager( | ||
| api_client=self.api, | ||
|
|
@@ -266,48 +316,107 @@ def _initialize_instance( | |
| ) | ||
| self._media_upload_consumers = [] | ||
|
|
||
| media_upload_thread_count = media_upload_thread_count or max( | ||
| int(os.getenv(LANGFUSE_MEDIA_UPLOAD_THREAD_COUNT, 1)), 1 | ||
| ) | ||
|
|
||
| if self._media_upload_enabled: | ||
| for i in range(media_upload_thread_count): | ||
| for i in range(self._media_upload_thread_count): | ||
| media_upload_consumer = MediaUploadConsumer( | ||
| identifier=i, | ||
| media_manager=self._media_manager, | ||
| ) | ||
| media_upload_consumer.start() | ||
| self._media_upload_consumers.append(media_upload_consumer) | ||
|
|
||
| # Prompt cache | ||
| self.prompt_cache = PromptCache() | ||
|
|
||
| # Score ingestion | ||
| self._score_ingestion_queue: Queue[Any] = Queue(100_000) | ||
| self._ingestion_consumers = [] | ||
|
|
||
| ingestion_consumer = ScoreIngestionConsumer( | ||
| ingestion_queue=self._score_ingestion_queue, | ||
| identifier=0, | ||
| client=score_ingestion_client, | ||
| flush_at=flush_at, | ||
| flush_interval=flush_interval, | ||
| client=self._score_ingestion_client, | ||
| flush_at=self.flush_at, | ||
| flush_interval=self.flush_interval, | ||
| max_retries=3, | ||
| public_key=self.public_key, | ||
| ) | ||
| ingestion_consumer.start() | ||
| self._ingestion_consumers.append(ingestion_consumer) | ||
|
|
||
| # Register shutdown handler | ||
| atexit.register(self.shutdown) | ||
| def _at_fork_reinit(self) -> None: | ||
| """Reinitialize consumer threads after fork in child process. | ||
|
|
||
| langfuse_logger.info( | ||
| f"Startup: Langfuse tracer successfully initialized | " | ||
| f"public_key={self.public_key} | " | ||
| f"base_url={base_url} | " | ||
| f"environment={environment or 'default'} | " | ||
| f"sample_rate={sample_rate if sample_rate is not None else 1.0} | " | ||
| f"media_threads={media_upload_thread_count or 1}" | ||
| Called automatically via os.register_at_fork() after fork(). | ||
| Necessary for Gunicorn --preload deployments where os.fork() is used: | ||
| threads are not copied to child processes (POSIX standard), so without | ||
| reinitialization, the child process has no consumer threads and all | ||
| media upload and score ingestion events are silently lost. | ||
|
|
||
| Note: LangfuseSpanProcessor (BatchSpanProcessor) handles span export | ||
| fork-safety separately via its own os.register_at_fork handler. | ||
|
|
||
| Skipped if shutdown() was already called on this instance, to avoid | ||
| restarting threads on an intentionally torn-down manager. | ||
| """ | ||
| if self._shutdown: | ||
| return | ||
|
|
||
| # The class-level lock may have been held by a thread in the parent at fork time. | ||
| # That thread does not exist in the child, so the lock can never be released and | ||
| # any attempt to acquire it would deadlock. Replace it with a fresh lock first. | ||
| LangfuseResourceManager._lock = threading.RLock() | ||
|
|
||
| langfuse_logger.debug( | ||
| f"[PID {os.getpid()}] Fork detected: reinitializing Langfuse consumer threads." | ||
| ) | ||
|
|
||
| # Queues are intentionally recreated after fork. Items enqueued before fork | ||
| # belong to the preloaded parent process and must not be processed by every | ||
| # worker — otherwise uploads/scores would be duplicated across workers. | ||
| # | ||
| # HTTP clients must also be recreated. httpx.Client is thread-safe but not | ||
| # process-safe: fork() duplicates the parent's connection pool (TCP socket file | ||
| # descriptors) into the child. Both processes then share the same underlying | ||
| # sockets, which causes data corruption and SSL/TLS state mismatch under | ||
| # concurrent use. Fresh clients start with an empty pool owned solely by this | ||
| # child process. | ||
| try: | ||
| client_headers = self.additional_headers if self.additional_headers else {} | ||
| self.httpx_client = httpx.Client( | ||
| timeout=self.timeout, headers=client_headers | ||
| ) | ||
| self.api = LangfuseAPI( | ||
| base_url=self.base_url, | ||
| username=self.public_key, | ||
| password=self.secret_key, | ||
| x_langfuse_sdk_name="python", | ||
| x_langfuse_sdk_version=langfuse_version, | ||
| x_langfuse_public_key=self.public_key, | ||
| httpx_client=self.httpx_client, | ||
| timeout=self.timeout, | ||
| ) | ||
| self._score_ingestion_client = LangfuseClient( | ||
| public_key=self.public_key, | ||
| secret_key=self.secret_key, | ||
| base_url=self.base_url, | ||
| version=langfuse_version, | ||
| timeout=self.timeout or 20, | ||
| session=self.httpx_client, | ||
| ) | ||
| except Exception as e: | ||
| langfuse_logger.error( | ||
| f"[PID {os.getpid()}] Failed to recreate HTTP clients after fork: {e}. " | ||
| f"Network requests may fail in this worker." | ||
| ) | ||
|
|
||
| try: | ||
| self._init_consumer_threads() | ||
| except Exception as e: | ||
| langfuse_logger.error( | ||
| f"[PID {os.getpid()}] Failed to reinitialize consumer threads after fork: {e}. " | ||
| f"Media upload and score ingestion will be unavailable in this worker." | ||
| ) | ||
|
|
||
| langfuse_logger.debug( | ||
| f"[PID {os.getpid()}] Langfuse consumer threads reinitialized after fork" | ||
| ) | ||
|
Comment on lines
+344
to
420
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Prompt To Fix With AIThis is a comment left during a code review.
Path: langfuse/_client/resource_manager.py
Line: 344-378
Comment:
**Unhandled exception in `after_in_child` callback crashes Gunicorn worker**
`_at_fork_reinit` calls `_init_consumer_threads`, which calls `Thread.start()`. If the OS refuses to create a thread — e.g., due to resource exhaustion (`OSError: can't start new thread`) — the exception propagates through the `after_in_child` callback chain and surfaces as an exception from `os.fork()` in the child process. Gunicorn would see the worker fail immediately at startup rather than the telemetry path degrading gracefully. Wrapping `_init_consumer_threads()` in a `try/except Exception` and logging the error would allow the child to continue (without consumer threads) instead of crashing.
How can I resolve this? If you propose a fix, please make it concise. |
||
|
|
||
| @classmethod | ||
|
|
@@ -449,6 +558,8 @@ def flush(self) -> None: | |
| langfuse_logger.debug("Successfully flushed media upload queue") | ||
|
|
||
| def shutdown(self) -> None: | ||
| self._shutdown = True | ||
|
|
||
| # Unregister the atexit handler first | ||
| atexit.unregister(self.shutdown) | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we also need re-init
_lock = threading.RLock()because it might be locked when the forking is happening and then it can never be released in the forked child as there is no owning thread in that child.