Add poison message handling to the dispatchers #1366
…ase of poison message handling, except for entity unlock requests
Co-authored-by: Chris Gillum <cgillum@microsoft.com>
Co-authored-by: Chris Gillum <cgillum@gmail.com>
…ad for trace activities
…vent for json deserialization, etc.
| this.Reason = reason; | ||
| } | ||
|
|
||
| // Private ctor for JSON deserialization (required by some storage providers and out-of-proc executors) |
Unrelated to this PR, but this fixes a bug I found when testing: JSON was not able to deserialize this event because it lacked a 0-arg constructor and the other constructors all had multiple parameters.
Also unrelated to this PR, but I realized while working on it that this code I wrote a while back had some incorrect assumptions, so I took the opportunity to fix it.
Pull request overview
This PR adds an extensibility hook (IPoisonMessageHandler) and integrates poison/invalid message detection into the core dispatchers so that corrupted or “poisoned” inputs can be handled deterministically (e.g., fail orchestration/activity/entity work) instead of always throwing.
Changes:
- Introduces IPoisonMessageHandler and wires it into orchestration/activity/entity dispatchers for invalid work items and poison message detection.
- Adds structured logging support for poison-message detection (new event ID + event source + log event).
- Adds dispatch-count tracking on history events and propagates poison metadata through entity request processing.
Reviewed changes
Copilot reviewed 13 out of 13 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| src/DurableTask.Core/Tracing/TraceHelper.cs | Adjusts entity invocation activity ending to better handle partial result sets. |
| src/DurableTask.Core/TaskOrchestrationDispatcher.cs | Adds poison detection/handling and updates reconciliation to return a drop reason. |
| src/DurableTask.Core/TaskEntityDispatcher.cs | Adds poison detection/handling for entity messages, plus poison-aware batching/result shaping. |
| src/DurableTask.Core/TaskActivityDispatcher.cs | Adds poison/invalid handling for activity scheduling messages (including failing poisoned tasks). |
| src/DurableTask.Core/Logging/StructuredEventSource.cs | Adds a new structured event for poison message detection. |
| src/DurableTask.Core/Logging/LogHelper.cs | Adds PoisonMessageDetected helper overloads emitting structured logs. |
| src/DurableTask.Core/Logging/LogEvents.cs | Adds a new structured log event type for poison messages. |
| src/DurableTask.Core/Logging/EventIds.cs | Reserves a new event ID for poison message detection. |
| src/DurableTask.Core/IPoisonMessageHandler.cs | New interface defining poison detection and handling hooks. |
| src/DurableTask.Core/History/HistoryEvent.cs | Adds DispatchCount to history events for poisoning heuristics/telemetry. |
| src/DurableTask.Core/History/ExecutionRewoundEvent.cs | Adds a parameterless ctor for JSON deserialization compatibility. |
| src/DurableTask.Core/Entities/OrchestrationEntityContext.cs | Adds AbandonAcquire() to reset lock acquisition state on failure. |
| src/DurableTask.Core/Entities/EventFormat/RequestMessage.cs | Adds poison metadata fields used during entity request processing. |
… combined' Co-authored-by: Copilot Autofix powered by AI <223894421+github-code-quality[bot]@users.noreply.github.com>
… combined' Co-authored-by: Copilot Autofix powered by AI <223894421+github-code-quality[bot]@users.noreply.github.com>
cgillum left a comment
Some initial comments. I haven't gone through the dispatcher code yet (those are bigger diffs).
| /// If the request message is poisoned, the reason it is poisoned. | ||
| /// Otherwise, null. | ||
| /// </summary> | ||
| public string? PoisonReason { get; set; } |
What's the use case for PoisonReason in entity request messages?
It's used when generating the failure response for a poisoned entity operation, i.e. here.
Since processing the history event that corresponds to the entity request is decoupled from sending the response, we need to store the poison reason in the request message so we can use it later to populate the failure details.
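To illustrate the decoupling described in this reply, here is a minimal sketch, not the PR's actual code. SketchRequest, SketchFailure, and EntityPoisonFlow are hypothetical stand-ins; only the PoisonReason property, the "PoisonMessage" error type, and the non-retriable flag are taken from the diff. The first step tags the request while the history event is processed, and a later step reads the stored reason to build the failure details.

```csharp
#nullable enable

// Hypothetical stand-in for the entity request message; only PoisonReason
// and DispatchCount mirror names that appear in the diff.
public sealed class SketchRequest
{
    public int DispatchCount { get; set; }

    // If the request is poisoned, the reason it is poisoned. Otherwise, null.
    public string? PoisonReason { get; set; }
}

// Hypothetical stand-in for the failure details attached to the response.
public sealed record SketchFailure(string ErrorType, string ErrorMessage, bool IsNonRetriable);

public static class EntityPoisonFlow
{
    // Step 1: runs while the history event for the request is processed.
    // The reason is stored on the request because the response is built later.
    public static void TagIfPoisoned(SketchRequest request, int maxDispatchCount)
    {
        if (request.DispatchCount > maxDispatchCount)
        {
            request.PoisonReason =
                $"Entity request has dispatch count {request.DispatchCount} " +
                $"which exceeds the maximum dispatch count of {maxDispatchCount}.";
        }
    }

    // Step 2: runs later, when the response is generated.
    // Returns null for a healthy request, or failure details for a poisoned one.
    public static SketchFailure? BuildFailure(SketchRequest request) =>
        request.PoisonReason is null
            ? null
            : new SketchFailure("PoisonMessage", request.PoisonReason, IsNonRetriable: true);
}
```

Because the reason travels with the request, the response step does not need access to the history event or to the handler that made the decision.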
cgillum left a comment
Finished reviewing. Just a few more comments (and some responses).
| @@ -1,4 +1,4 @@ | |||
| // ---------------------------------------------------------------------------------- | |||
| // ---------------------------------------------------------------------------------- | |||
It might be good to have @sebastianburckhardt review the changes to this file as I'm less familiar with the details of entity dispatching.
| var poisonEvents = runtimeState.NewEvents.Where(evt => evt.DispatchCount > this.poisonMessageHandler?.MaxDispatchCount); | ||
| if (poisonEvents.Count() > 0) |
| bool isPoisonMessage = request.DispatchCount > this.poisonMessageHandler?.MaxDispatchCount; | ||
| if (!isPoisonMessage) | ||
| { |
| case EventType.EventRaised: | ||
| EventRaisedEvent eventRaisedEvent = (EventRaisedEvent)e; | ||
| bool isPoisonMessage = eventRaisedEvent.DispatchCount > this.poisonMessageHandler?.MaxDispatchCount; | ||
|
|
||
| if (EntityMessageEventNames.IsRequestMessage(eventRaisedEvent.Name)) |
|
|
||
| var request = schedulerState.Dequeue(); | ||
| if (request.DispatchCount > this.poisonMessageHandler?.MaxDispatchCount) | ||
| { | ||
| this.logHelper.PoisonMessageDetected( | ||
| runtimeState.OrchestrationInstance, | ||
| request, | ||
| $"Entity request has dispatch count {request.DispatchCount} which exceeds the maximum dispatch count " + | ||
| $"of {this.poisonMessageHandler?.MaxDispatchCount} and will be failed."); | ||
| } |
|
|
||
| bool poisonMessagesExist = workToDoNow.Operations.Any(op => op.DispatchCount > this.poisonMessageHandler?.MaxDispatchCount); | ||
| var operationsToSend = operations; | ||
|
|
||
| if (poisonMessagesExist) | ||
| { | ||
| operationsToSend = new List<OperationRequest>(); | ||
| for (int i = 0; i < operations.Count; i++) | ||
| { | ||
| if (workToDoNow.Operations[i].DispatchCount <= this.poisonMessageHandler.MaxDispatchCount) | ||
| { | ||
| operationsToSend.Add(operations[i]); | ||
| } | ||
| } |
| string? poisonMessageReason = null; | ||
| if (scheduledEvent.Name == null) | ||
| { | ||
| string message = $"The activity worker received a {nameof(EventType.TaskScheduled)} event that does not specify an activity name."; | ||
| this.logHelper.TaskActivityDispatcherError(workItem, message); | ||
| throw TraceHelper.TraceException( | ||
| TraceEventType.Error, | ||
| "TaskActivityDispatcher-MissingActivityName", | ||
| new InvalidOperationException(message)); | ||
| poisonMessageReason = $"The activity worker received a {nameof(EventType.TaskScheduled)} event that does not specify an activity name."; | ||
| if (this.poisonMessageHandler == null) | ||
| { | ||
| this.logHelper.TaskActivityDispatcherError(workItem, poisonMessageReason); | ||
| throw TraceHelper.TraceException( | ||
| TraceEventType.Error, | ||
| "TaskActivityDispatcher-MissingActivityName", | ||
| new InvalidOperationException(poisonMessageReason)); | ||
| } | ||
| } | ||
|
|
||
| this.logHelper.TaskActivityStarting(orchestrationInstance, scheduledEvent); | ||
| TaskActivity? taskActivity = this.objectManager.GetObject(scheduledEvent.Name, scheduledEvent.Version); | ||
|
|
||
| if (workItem.LockedUntilUtc < DateTime.MaxValue) | ||
| if (poisonMessageReason == null && scheduledEvent.DispatchCount > this.poisonMessageHandler?.MaxDispatchCount) | ||
| { | ||
| // start a task to run RenewUntil | ||
| renewTask = Task.Factory.StartNew( | ||
| () => this.RenewUntil(workItem, renewCancellationTokenSource.Token), | ||
| renewCancellationTokenSource.Token); | ||
| poisonMessageReason = $"Activity has received an event with dispatch count {taskMessage.Event.DispatchCount} which exceeds the maximum dispatch " + | ||
| $"count of {this.poisonMessageHandler?.MaxDispatchCount}. The task will be failed."; | ||
| } | ||
|
|
||
| var dispatchContext = new DispatchMiddlewareContext(); | ||
| dispatchContext.SetProperty(taskMessage.OrchestrationInstance); | ||
| dispatchContext.SetProperty(taskActivity); | ||
| dispatchContext.SetProperty(scheduledEvent); | ||
|
|
||
| // In transitionary phase (activity queued from old code, accessed in new code) context can be null. | ||
| if (taskMessage.OrchestrationExecutionContext != null) | ||
| HistoryEvent? eventToRespond = null; | ||
| if (poisonMessageReason != null) | ||
| { | ||
| dispatchContext.SetProperty(taskMessage.OrchestrationExecutionContext); | ||
| this.logHelper.PoisonMessageDetected( | ||
| orchestrationInstance, | ||
| taskMessage.Event, | ||
| poisonMessageReason); | ||
|
|
||
| eventToRespond = new TaskFailedEvent( | ||
| -1, | ||
| scheduledEvent.EventId, | ||
| reason: null, | ||
| details: null, | ||
| new | ||
| ( | ||
| "PoisonMessage", | ||
| poisonMessageReason!, | ||
| stackTrace: null, | ||
| innerFailure: null, | ||
| isNonRetriable: true) | ||
| ); | ||
| traceActivity?.SetStatus(ActivityStatusCode.Error, poisonMessageReason); | ||
| } |
| // (including potential poison messages) will be deferred | ||
| for (int i = 0; i < operations.Count && middlewareResultIndex < result.Results.Count; i++) | ||
| { | ||
| if (workToDoNow.Operations[i].DispatchCount <= this.poisonMessageHandler.MaxDispatchCount) |
| new FailureDetails( | ||
| "PoisonMessage", | ||
| $"Entity lock request has dispatch count {request.DispatchCount} " + | ||
| $"which exceeds the maximum dispatch count of {this.poisonMessageHandler?.MaxDispatchCount}.", |
| if (isPoisonMessage) | ||
| { | ||
| string failureReason = $"Entity lock release message from parent instance '{message.ParentInstanceId}' has dispatch count " + | ||
| $"{eventRaisedEvent.DispatchCount} which exceeds the maximum allowed dispatch count of {this.poisonMessageHandler?.MaxDispatchCount}."; |
| if (isPoisonMessage) | ||
| { | ||
| string failureReason = $"Entity self-continue message has dispatch count {eventRaisedEvent.DispatchCount} which exceeds the maximum allowed " + | ||
| $"dispatch count of {this.poisonMessageHandler?.MaxDispatchCount}."; |
This PR introduces poison message handling to the dispatchers. This is done through a new interface, IPoisonMessageHandler, which any orchestration service that has poison message handling is expected to implement. The orchestration service is otherwise responsible for determining what to do with the poison message(s) and how to store them.
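As a rough sketch of the dispatch-count check that recurs throughout the diff, the following uses hypothetical stand-in types (ISketchPoisonHandler, SketchHandler, SketchEvent, PoisonCheck); the real IPoisonMessageHandler likely has more members than the MaxDispatchCount property assumed here. It also shows why the comparison is safe when no handler is configured: comparing an int against a null int? evaluates to false in C#, so no event is flagged.

```csharp
#nullable enable
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for the handler; MaxDispatchCount is the only member
// assumed, based on how the dispatchers use it in the diff.
public interface ISketchPoisonHandler
{
    int MaxDispatchCount { get; }
}

public sealed record SketchHandler(int MaxDispatchCount) : ISketchPoisonHandler;

// Hypothetical stand-in for a history event that tracks its dispatch count.
public sealed record SketchEvent(string Name, int DispatchCount);

public static class PoisonCheck
{
    // Returns the events whose dispatch count exceeds the handler's limit.
    // With a null handler, handler?.MaxDispatchCount is a null int?, the
    // lifted '>' comparison is false, and the result is empty.
    public static List<SketchEvent> FindPoisoned(
        IEnumerable<SketchEvent> events, ISketchPoisonHandler? handler) =>
        events.Where(e => e.DispatchCount > handler?.MaxDispatchCount).ToList();
}
```

With a limit of 5, an event dispatched 6 times is flagged and one dispatched once is not; passing null for the handler flags nothing, which matches services that do not opt in to poison message handling.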