diff --git a/src/DurableTask.Core/Entities/EventFormat/RequestMessage.cs b/src/DurableTask.Core/Entities/EventFormat/RequestMessage.cs
index ce355ab12..d7186b5cc 100644
--- a/src/DurableTask.Core/Entities/EventFormat/RequestMessage.cs
+++ b/src/DurableTask.Core/Entities/EventFormat/RequestMessage.cs
@@ -122,6 +122,11 @@ internal class RequestMessage : EntityMessage
///
public string? ClientSpanId { get; set; }
+ /// <summary>
+ /// The number of times this request has been dispatched.
+ /// </summary>
+ public int DispatchCount { get; set; }
+
///
public override string GetShortDescription()
{
diff --git a/src/DurableTask.Core/Entities/OrchestrationEntityContext.cs b/src/DurableTask.Core/Entities/OrchestrationEntityContext.cs
index 323cba441..fb0c7ee17 100644
--- a/src/DurableTask.Core/Entities/OrchestrationEntityContext.cs
+++ b/src/DurableTask.Core/Entities/OrchestrationEntityContext.cs
@@ -341,6 +341,16 @@ public void CompleteAcquire(OperationResult result, Guid criticalSectionId)
this.lockAcquisitionPending = false;
}
+ /// <summary>
+ /// Called when the entity lock acquisition fails. Clears the critical-section locks and ID and marks the acquisition as no longer pending.
+ /// </summary>
+ public void AbandonAcquire()
+ {
+ this.criticalSectionLocks = null;
+ this.criticalSectionId = null;
+ this.lockAcquisitionPending = false;
+ }
+
internal void AdjustOutgoingMessage(string instanceId, RequestMessage requestMessage, DateTime? cappedTime, out string eventName)
{
if (cappedTime.HasValue)
diff --git a/src/DurableTask.Core/History/ExecutionRewoundEvent.cs b/src/DurableTask.Core/History/ExecutionRewoundEvent.cs
index c838e9eb2..4a9338156 100644
--- a/src/DurableTask.Core/History/ExecutionRewoundEvent.cs
+++ b/src/DurableTask.Core/History/ExecutionRewoundEvent.cs
@@ -39,6 +39,12 @@ public ExecutionRewoundEvent(int eventId, string? reason)
this.Reason = reason;
}
+ // Private parameterless ctor for JSON deserialization (required by some storage providers and out-of-proc executors); passes -1 to the base ctor (presumably the event ID - confirm against the base class).
+ ExecutionRewoundEvent()
+ : base(-1)
+ {
+ }
+
///
/// Gets the event type
///
diff --git a/src/DurableTask.Core/History/HistoryEvent.cs b/src/DurableTask.Core/History/HistoryEvent.cs
index 8e6a8f316..c99510b2c 100644
--- a/src/DurableTask.Core/History/HistoryEvent.cs
+++ b/src/DurableTask.Core/History/HistoryEvent.cs
@@ -89,6 +89,12 @@ protected HistoryEvent(int eventId)
[DataMember]
public virtual EventType EventType { get; private set; }
+ /// <summary>
+ /// Gets or sets the number of times this event has been dispatched. Because EmitDefaultValue is false, a value of 0 is omitted by data-contract serialization.
+ /// </summary>
+ [DataMember(EmitDefaultValue = false)]
+ public int DispatchCount { get; set; }
+
///
/// Implementation for .
///
diff --git a/src/DurableTask.Core/IPoisonMessageHandler.cs b/src/DurableTask.Core/IPoisonMessageHandler.cs
new file mode 100644
index 000000000..ea39ffc5f
--- /dev/null
+++ b/src/DurableTask.Core/IPoisonMessageHandler.cs
@@ -0,0 +1,70 @@
+// ----------------------------------------------------------------------------------
+// Copyright Microsoft Corporation
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+// http://www.apache.org/licenses/LICENSE-2.0
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// ----------------------------------------------------------------------------------
+#nullable enable
+namespace DurableTask.Core
+{
+    using System.Threading.Tasks;
+    using DurableTask.Core.History;
+
+    /// <summary>
+    /// Provides extensibility points for detecting and handling "poison" messages and invalid work items
+    /// in the task dispatchers.
+    /// </summary>
+    public interface IPoisonMessageHandler
+    {
+        /// <summary>
+        /// The maximum dispatch count after which a message should be considered "poisoned" if it is dispatched again.
+        /// </summary>
+        public int MaxDispatchCount { get; }
+
+        /// <summary>
+        /// Invoked to handle a poison message in the case that a message cannot necessarily
+        /// be "failed" by the dispatchers, so the <see cref="IPoisonMessageHandler"/> implementation must
+        /// decide what to do.
+        /// </summary>
+        /// <remarks>
+        /// If this method returns false, the dispatcher should fall back to the default behavior
+        /// followed when poison message handling is not enabled.
+        /// </remarks>
+        /// <param name="orchestrationInstance">The orchestration instance the event was sent to, or null
+        /// if this information is not available.</param>
+        /// <param name="historyEvent">The "poisoned" history event.</param>
+        /// <param name="reason">The reason the event is "poisoned".</param>
+        /// <returns>True if the poison message was successfully handled, otherwise false.</returns>
+        public Task<bool> HandlePoisonMessageAsync(OrchestrationInstance? orchestrationInstance, HistoryEvent historyEvent, string reason);
+
+        /// <summary>
+        /// Invoked to handle an orchestration work item that is invalid and cannot be processed at all.
+        /// </summary>
+        /// <remarks>
+        /// If this method returns false, the dispatcher should fall back to the default behavior
+        /// followed in the case of an invalid work item.
+        /// </remarks>
+        /// <param name="workItem">The work item that could not be processed.</param>
+        /// <param name="reason">Why the work item is invalid.</param>
+        /// <returns>True if the invalid work item was successfully handled, otherwise false.</returns>
+        public Task<bool> HandleInvalidWorkItemAsync(TaskOrchestrationWorkItem workItem, string reason);
+
+        /// <summary>
+        /// Invoked to handle an activity work item that is invalid and cannot be processed at all.
+        /// </summary>
+        /// <remarks>
+        /// If this method returns false, the dispatcher should fall back to the default behavior
+        /// followed in the case of an invalid work item.
+        /// </remarks>
+        /// <param name="workItem">The work item that could not be processed.</param>
+        /// <param name="reason">Why the work item is invalid.</param>
+        /// <returns>True if the invalid work item was successfully handled, otherwise false.</returns>
+        public Task<bool> HandleInvalidWorkItemAsync(TaskActivityWorkItem workItem, string reason);
+    }
+}
diff --git a/src/DurableTask.Core/Logging/EventIds.cs b/src/DurableTask.Core/Logging/EventIds.cs
index 049af43f3..f8cb6d769 100644
--- a/src/DurableTask.Core/Logging/EventIds.cs
+++ b/src/DurableTask.Core/Logging/EventIds.cs
@@ -72,5 +72,6 @@ static class EventIds
public const int OrchestrationDebugTrace = 73;
public const int OrchestrationCompletedWithWarning = 74;
+ public const int PoisonMessageDetected = 75;
}
}
diff --git a/src/DurableTask.Core/Logging/LogEvents.cs b/src/DurableTask.Core/Logging/LogEvents.cs
index e5fe2ee33..36f6c8d10 100644
--- a/src/DurableTask.Core/Logging/LogEvents.cs
+++ b/src/DurableTask.Core/Logging/LogEvents.cs
@@ -1976,5 +1976,59 @@ void IEventSourceEvent.WriteEventSource() =>
Utils.PackageVersion);
}
+ /// <summary>
+ /// Log event representing the detection of a "poison" message. Logged at Warning level; a null orchestration instance yields empty instance and execution IDs.
+ /// </summary>
+ internal class PoisonMessageDetected : StructuredLogEvent, IEventSourceEvent
+ {
+ public PoisonMessageDetected(OrchestrationInstance orchestrationInstance, string eventType, int taskEventId, int dispatchCount, string details)
+ {
+ this.InstanceId = orchestrationInstance?.InstanceId ?? string.Empty;
+ this.ExecutionId = orchestrationInstance?.ExecutionId ?? string.Empty;
+ this.EventType = eventType;
+ this.TaskEventId = taskEventId;
+ this.Details = details;
+ this.DispatchCount = dispatchCount;
+ }
+
+ [StructuredLogField]
+ public string InstanceId { get; }
+
+ [StructuredLogField]
+ public string ExecutionId { get; }
+
+ [StructuredLogField]
+ public string EventType { get; }
+
+ [StructuredLogField]
+ public int TaskEventId { get; }
+
+ [StructuredLogField]
+ public int DispatchCount { get; }
+
+ [StructuredLogField]
+ public string Details { get; }
+
+ public override EventId EventId => new EventId(
+ EventIds.PoisonMessageDetected,
+ nameof(EventIds.PoisonMessageDetected));
+
+ public override LogLevel Level => LogLevel.Warning;
+
+ protected override string CreateLogMessage() =>
+ $"{this.InstanceId}: Poison message detected for {GetEventDescription(this.EventType, this.TaskEventId)} with dispatch count {this.DispatchCount}: {this.Details}";
+
+ void IEventSourceEvent.WriteEventSource() =>
+ StructuredEventSource.Log.PoisonMessageDetected(
+ this.InstanceId,
+ this.ExecutionId,
+ this.EventType,
+ this.TaskEventId,
+ this.DispatchCount,
+ this.Details,
+ Utils.AppName,
+ Utils.PackageVersion);
+ }
+
}
}
diff --git a/src/DurableTask.Core/Logging/LogHelper.cs b/src/DurableTask.Core/Logging/LogHelper.cs
index 6efbe0cfe..37216df13 100644
--- a/src/DurableTask.Core/Logging/LogHelper.cs
+++ b/src/DurableTask.Core/Logging/LogHelper.cs
@@ -17,6 +17,8 @@ namespace DurableTask.Core.Logging
using System.Collections.Generic;
using System.Text;
using DurableTask.Core.Command;
+ using DurableTask.Core.Common;
+ using DurableTask.Core.Entities.EventFormat;
using DurableTask.Core.Entities.OperationFormat;
using DurableTask.Core.History;
using Microsoft.Extensions.Logging;
@@ -760,6 +762,64 @@ internal void RenewActivityMessageFailed(TaskActivityWorkItem workItem, Exceptio
this.WriteStructuredLog(new LogEvents.RenewActivityMessageFailed(workItem, exception), exception);
}
}
+
+ /// <summary>
+ /// Logs that a "poison message" has been detected and is being dropped. Does nothing when structured logging is disabled.
+ /// </summary>
+ /// <param name="orchestrationInstance">The orchestration instance this event was sent to, or null if it is not available.</param>
+ /// <param name="historyEvent">The "poisoned" event.</param>
+ /// <param name="details">Extra details related to the processing of this poison message.</param>
+ internal void PoisonMessageDetected(OrchestrationInstance? orchestrationInstance, HistoryEvent historyEvent, string details)
+ {
+ if (this.IsStructuredLoggingEnabled)
+ {
+ this.WriteStructuredLog(new LogEvents.PoisonMessageDetected(
+ orchestrationInstance,
+ historyEvent.EventType.ToString(),
+ Utils.GetTaskEventId(historyEvent),
+ historyEvent.DispatchCount,
+ details));
+ }
+ }
+
+ /// <summary>
+ /// Logs that a "poison" entity request message has been detected and is being dropped. The event type is logged as "LockRequest" or "OperationRequest" with a task event ID of -1.
+ /// </summary>
+ /// <param name="orchestrationInstance">The orchestration instance this event was sent to.</param>
+ /// <param name="requestMessage">The "poisoned" request message.</param>
+ /// <param name="details">Extra details related to the processing of this poison message.</param>
+ internal void PoisonMessageDetected(OrchestrationInstance orchestrationInstance, RequestMessage requestMessage, string details)
+ {
+ if (this.IsStructuredLoggingEnabled)
+ {
+ this.WriteStructuredLog(new LogEvents.PoisonMessageDetected(
+ orchestrationInstance,
+ requestMessage.IsLockRequest ? "LockRequest" : "OperationRequest",
+ taskEventId: -1,
+ requestMessage.DispatchCount,
+ details));
+ }
+ }
+
+ /// <summary>
+ /// Logs that a "poison" entity lock release message has been detected and is being dropped. The event type is logged as "LockRelease" with a task event ID of -1.
+ /// </summary>
+ /// <param name="orchestrationInstance">The orchestration instance this event was sent to.</param>
+ /// <param name="releaseMessage">The "poisoned" release message. Not read by this method.</param>
+ /// <param name="dispatchCount">The dispatch count of the release message.</param>
+ /// <param name="details">Extra details related to the processing of this poison message.</param>
+ internal void PoisonMessageDetected(OrchestrationInstance orchestrationInstance, ReleaseMessage releaseMessage, int dispatchCount, string details)
+ {
+ if (this.IsStructuredLoggingEnabled)
+ {
+ this.WriteStructuredLog(new LogEvents.PoisonMessageDetected(
+ orchestrationInstance,
+ "LockRelease",
+ taskEventId: -1,
+ dispatchCount,
+ details));
+ }
+ }
#endregion
internal void OrchestrationDebugTrace(string instanceId, string executionId, string details)
diff --git a/src/DurableTask.Core/Logging/StructuredEventSource.cs b/src/DurableTask.Core/Logging/StructuredEventSource.cs
index 129bac158..fb09dc905 100644
--- a/src/DurableTask.Core/Logging/StructuredEventSource.cs
+++ b/src/DurableTask.Core/Logging/StructuredEventSource.cs
@@ -656,6 +656,33 @@ internal void DiscardingMessage(
}
}
+ [Event(EventIds.PoisonMessageDetected, Level = EventLevel.Warning, Version = 1)]
+ internal void PoisonMessageDetected(
+ string InstanceId,
+ string ExecutionId,
+ string EventType,
+ int TaskEventId,
+ int DispatchCount,
+ string Details,
+ string AppName,
+ string ExtensionVersion)
+ {
+ if (this.IsEnabled(EventLevel.Warning)) // skip the write when Warning-level events are not enabled
+ {
+ // TODO: Use WriteEventCore for better performance
+ this.WriteEvent(
+ EventIds.PoisonMessageDetected,
+ InstanceId ?? string.Empty, // a null instance ID is written as an empty string
+ ExecutionId ?? string.Empty, // a null execution ID is written as an empty string
+ EventType,
+ TaskEventId,
+ DispatchCount,
+ Details,
+ AppName,
+ ExtensionVersion);
+ }
+ }
+
[Event(EventIds.EntityBatchExecuting, Level = EventLevel.Informational, Version = 1)]
internal void EntityBatchExecuting(
string InstanceId,
diff --git a/src/DurableTask.Core/TaskActivityDispatcher.cs b/src/DurableTask.Core/TaskActivityDispatcher.cs
index 8f4c24dca..59a7e87ba 100644
--- a/src/DurableTask.Core/TaskActivityDispatcher.cs
+++ b/src/DurableTask.Core/TaskActivityDispatcher.cs
@@ -37,6 +37,7 @@ public sealed class TaskActivityDispatcher
readonly LogHelper logHelper;
readonly ErrorPropagationMode errorPropagationMode;
readonly IExceptionPropertiesProvider? exceptionPropertiesProvider;
+ readonly IPoisonMessageHandler? poisonMessageHandler;
///
/// Initializes a new instance of the class with an exception properties provider.
@@ -61,6 +62,7 @@ internal TaskActivityDispatcher(
this.logHelper = logHelper;
this.errorPropagationMode = errorPropagationMode;
this.exceptionPropertiesProvider = exceptionPropertiesProvider;
+ this.poisonMessageHandler = orchestrationService as IPoisonMessageHandler;
this.dispatcher = new WorkItemDispatcher(
"TaskActivityDispatcher",
@@ -117,9 +119,16 @@ async Task OnProcessWorkItemAsync(TaskActivityWorkItem workItem)
{
if (orchestrationInstance == null || string.IsNullOrWhiteSpace(orchestrationInstance.InstanceId))
{
- this.logHelper.TaskActivityDispatcherError(
- workItem,
- $"The activity worker received a message that does not have any OrchestrationInstance information.");
+ string message = "The activity worker received a message that does not have any OrchestrationInstance information.";
+ if (this.poisonMessageHandler != null)
+ {
+ this.logHelper.PoisonMessageDetected(orchestrationInstance, taskMessage.Event, message);
+ if (await this.poisonMessageHandler.HandleInvalidWorkItemAsync(workItem, message))
+ {
+ return;
+ }
+ }
+ this.logHelper.TaskActivityDispatcherError(workItem, message);
throw TraceHelper.TraceException(
TraceEventType.Error,
"TaskActivityDispatcher-MissingOrchestrationInstance",
@@ -128,14 +137,22 @@ async Task OnProcessWorkItemAsync(TaskActivityWorkItem workItem)
if (taskMessage.Event.EventType != EventType.TaskScheduled)
{
- this.logHelper.TaskActivityDispatcherError(
- workItem,
- $"The activity worker received an event of type '{taskMessage.Event.EventType}' but only '{EventType.TaskScheduled}' is supported.");
+ string message = $"The activity worker received an event of type '{taskMessage.Event.EventType}' but only " +
+ $"'{EventType.TaskScheduled}' is supported.";
+ if (this.poisonMessageHandler != null)
+ {
+ this.logHelper.PoisonMessageDetected(orchestrationInstance, taskMessage.Event, message);
+ if (await this.poisonMessageHandler.HandleInvalidWorkItemAsync(workItem, message))
+ {
+ return;
+ }
+ }
+ this.logHelper.TaskActivityDispatcherError(workItem, message);
throw TraceHelper.TraceException(
TraceEventType.Critical,
"TaskActivityDispatcher-UnsupportedEventType",
new NotSupportedException("Activity worker does not support event of type: " +
- taskMessage.Event.EventType));
+ taskMessage.Event.EventType));
}
scheduledEvent = (TaskScheduledEvent)taskMessage.Event;
@@ -143,136 +160,172 @@ async Task OnProcessWorkItemAsync(TaskActivityWorkItem workItem)
// Distributed tracing: start a new trace activity derived from the orchestration's trace context.
Activity? traceActivity = TraceHelper.StartTraceActivityForTaskExecution(scheduledEvent, orchestrationInstance);
+ string? poisonMessageReason = null;
if (scheduledEvent.Name == null)
{
- string message = $"The activity worker received a {nameof(EventType.TaskScheduled)} event that does not specify an activity name.";
- this.logHelper.TaskActivityDispatcherError(workItem, message);
- throw TraceHelper.TraceException(
- TraceEventType.Error,
- "TaskActivityDispatcher-MissingActivityName",
- new InvalidOperationException(message));
+ poisonMessageReason = $"The activity worker received a {nameof(EventType.TaskScheduled)} event that does not specify an activity name.";
+ if (this.poisonMessageHandler == null)
+ {
+ this.logHelper.TaskActivityDispatcherError(workItem, poisonMessageReason);
+ throw TraceHelper.TraceException(
+ TraceEventType.Error,
+ "TaskActivityDispatcher-MissingActivityName",
+ new InvalidOperationException(poisonMessageReason));
+ }
}
- this.logHelper.TaskActivityStarting(orchestrationInstance, scheduledEvent);
- TaskActivity? taskActivity = this.objectManager.GetObject(scheduledEvent.Name, scheduledEvent.Version);
-
- if (workItem.LockedUntilUtc < DateTime.MaxValue)
+ if (poisonMessageReason == null && scheduledEvent.DispatchCount > this.poisonMessageHandler?.MaxDispatchCount)
{
- // start a task to run RenewUntil
- renewTask = Task.Factory.StartNew(
- () => this.RenewUntil(workItem, renewCancellationTokenSource.Token),
- renewCancellationTokenSource.Token);
+ poisonMessageReason = $"Activity has received an event with dispatch count {taskMessage.Event.DispatchCount} which exceeds the maximum dispatch " +
+ $"count of {this.poisonMessageHandler.MaxDispatchCount}. The task will be failed.";
}
- var dispatchContext = new DispatchMiddlewareContext();
- dispatchContext.SetProperty(taskMessage.OrchestrationInstance);
- dispatchContext.SetProperty(taskActivity);
- dispatchContext.SetProperty(scheduledEvent);
-
- // In transitionary phase (activity queued from old code, accessed in new code) context can be null.
- if (taskMessage.OrchestrationExecutionContext != null)
+ HistoryEvent? eventToRespond = null;
+ if (poisonMessageReason != null)
{
- dispatchContext.SetProperty(taskMessage.OrchestrationExecutionContext);
+ this.logHelper.PoisonMessageDetected(
+ orchestrationInstance,
+ taskMessage.Event,
+ poisonMessageReason);
+
+ eventToRespond = new TaskFailedEvent(
+ -1,
+ scheduledEvent.EventId,
+ reason: null,
+ details: null,
+ new
+ (
+ "PoisonMessage",
+ poisonMessageReason!,
+ stackTrace: null,
+ innerFailure: null,
+ isNonRetriable: true)
+ );
+ traceActivity?.SetStatus(ActivityStatusCode.Error, poisonMessageReason);
}
-
- // correlation
- CorrelationTraceClient.Propagate(() =>
+ else
{
- workItem.TraceContextBase?.SetActivityToCurrent();
- diagnosticActivity = workItem.TraceContextBase?.CurrentActivity;
- });
+ this.logHelper.TaskActivityStarting(orchestrationInstance, scheduledEvent);
+ TaskActivity? taskActivity = this.objectManager.GetObject(scheduledEvent.Name!, scheduledEvent.Version);
- ActivityExecutionResult? result;
- try
- {
- await this.dispatchPipeline.RunAsync(dispatchContext, async _ =>
+ if (workItem.LockedUntilUtc < DateTime.MaxValue)
{
- if (taskActivity == null)
- {
- // This likely indicates a deployment error of some kind. Because these unhandled exceptions are
- // automatically retried, resolving this may require redeploying the app code so that the activity exists again.
- // CONSIDER: Should this be changed into a permanent error that fails the orchestration? Perhaps
- // the app owner doesn't care to preserve existing instances when doing code deployments?
- throw new TypeMissingException($"TaskActivity {scheduledEvent.Name} version {scheduledEvent.Version} was not found");
- }
-
- var context = new TaskContext(
- taskMessage.OrchestrationInstance,
- scheduledEvent.Name,
- scheduledEvent.Version,
- scheduledEvent.EventId);
- context.ErrorPropagationMode = this.errorPropagationMode;
- context.ExceptionPropertiesProvider = this.exceptionPropertiesProvider;
+ // start a task to run RenewUntil
+ renewTask = Task.Factory.StartNew(
+ () => this.RenewUntil(workItem, renewCancellationTokenSource.Token),
+ renewCancellationTokenSource.Token);
+ }
- HistoryEvent? responseEvent;
+ var dispatchContext = new DispatchMiddlewareContext();
+ dispatchContext.SetProperty(taskMessage.OrchestrationInstance);
+ dispatchContext.SetProperty(taskActivity);
+ dispatchContext.SetProperty(scheduledEvent);
- try
- {
- string? output = await taskActivity.RunAsync(context, scheduledEvent.Input);
- responseEvent = new TaskCompletedEvent(-1, scheduledEvent.EventId, output);
- }
- catch (Exception e) when (e is not TaskFailureException && !Utils.IsFatal(e) && !Utils.IsExecutionAborting(e))
- {
- // These are unexpected exceptions that occur in the task activity abstraction. Normal exceptions from
- // activities are expected to be translated into TaskFailureException and handled outside the middleware
- // context (see further below).
- TraceHelper.TraceExceptionInstance(TraceEventType.Error, "TaskActivityDispatcher-ProcessException", taskMessage.OrchestrationInstance, e);
- string? details = this.IncludeDetails
- ? $"Unhandled exception while executing task: {e}"
- : null;
- responseEvent = new TaskFailedEvent(-1, scheduledEvent.EventId, e.Message, details, new FailureDetails(e));
-
- traceActivity?.SetStatus(ActivityStatusCode.Error, e.Message);
-
- this.logHelper.TaskActivityFailure(orchestrationInstance, scheduledEvent.Name, (TaskFailedEvent)responseEvent, e);
- }
+ // In transitionary phase (activity queued from old code, accessed in new code) context can be null.
+ if (taskMessage.OrchestrationExecutionContext != null)
+ {
+ dispatchContext.SetProperty(taskMessage.OrchestrationExecutionContext);
+ }
- var result = new ActivityExecutionResult { ResponseEvent = responseEvent };
- dispatchContext.SetProperty(result);
+ // correlation
+ CorrelationTraceClient.Propagate(() =>
+ {
+ workItem.TraceContextBase?.SetActivityToCurrent();
+ diagnosticActivity = workItem.TraceContextBase?.CurrentActivity;
});
- result = dispatchContext.GetProperty();
- }
- catch (TaskFailureException e)
- {
- // These are normal task activity failures. They can come from Activity implementations or from middleware.
- TraceHelper.TraceExceptionInstance(TraceEventType.Error, "TaskActivityDispatcher-ProcessTaskFailure", taskMessage.OrchestrationInstance, e);
- string? details = this.IncludeDetails ? e.Details : null;
- var failureEvent = new TaskFailedEvent(-1, scheduledEvent.EventId, e.Message, details, e.FailureDetails);
+ ActivityExecutionResult? result;
+ try
+ {
+ await this.dispatchPipeline.RunAsync(dispatchContext, async _ =>
+ {
+ if (taskActivity == null)
+ {
+ // This likely indicates a deployment error of some kind. Because these unhandled exceptions are
+ // automatically retried, resolving this may require redeploying the app code so that the activity exists again.
+ // CONSIDER: Should this be changed into a permanent error that fails the orchestration? Perhaps
+ // the app owner doesn't care to preserve existing instances when doing code deployments?
+ throw new TypeMissingException($"TaskActivity {scheduledEvent.Name} version {scheduledEvent.Version} was not found");
+ }
+
+ var context = new TaskContext(
+ taskMessage.OrchestrationInstance,
+ scheduledEvent.Name!,
+ scheduledEvent.Version,
+ scheduledEvent.EventId);
+ context.ErrorPropagationMode = this.errorPropagationMode;
+ context.ExceptionPropertiesProvider = this.exceptionPropertiesProvider;
+
+ HistoryEvent? responseEvent;
+
+ try
+ {
+ string? output = await taskActivity.RunAsync(context, scheduledEvent.Input);
+ responseEvent = new TaskCompletedEvent(-1, scheduledEvent.EventId, output);
+ }
+ catch (Exception e) when (e is not TaskFailureException && !Utils.IsFatal(e) && !Utils.IsExecutionAborting(e))
+ {
+ // These are unexpected exceptions that occur in the task activity abstraction. Normal exceptions from
+ // activities are expected to be translated into TaskFailureException and handled outside the middleware
+ // context (see further below).
+ TraceHelper.TraceExceptionInstance(TraceEventType.Error, "TaskActivityDispatcher-ProcessException", taskMessage.OrchestrationInstance, e);
+ string? details = this.IncludeDetails
+ ? $"Unhandled exception while executing task: {e}"
+ : null;
+ responseEvent = new TaskFailedEvent(-1, scheduledEvent.EventId, e.Message, details, new FailureDetails(e));
+
+ traceActivity?.SetStatus(ActivityStatusCode.Error, e.Message);
+
+ this.logHelper.TaskActivityFailure(orchestrationInstance, scheduledEvent.Name!, (TaskFailedEvent)responseEvent, e);
+ }
+
+ var result = new ActivityExecutionResult { ResponseEvent = responseEvent };
+ dispatchContext.SetProperty(result);
+ });
+
+ result = dispatchContext.GetProperty();
+ }
+ catch (TaskFailureException e)
+ {
+ // These are normal task activity failures. They can come from Activity implementations or from middleware.
+ TraceHelper.TraceExceptionInstance(TraceEventType.Error, "TaskActivityDispatcher-ProcessTaskFailure", taskMessage.OrchestrationInstance, e);
+ string? details = this.IncludeDetails ? e.Details : null;
+ var failureEvent = new TaskFailedEvent(-1, scheduledEvent.EventId, e.Message, details, e.FailureDetails);
- traceActivity?.SetStatus(ActivityStatusCode.Error, e.Message);
+ traceActivity?.SetStatus(ActivityStatusCode.Error, e.Message);
- this.logHelper.TaskActivityFailure(orchestrationInstance, scheduledEvent.Name, failureEvent, e);
- CorrelationTraceClient.Propagate(() => CorrelationTraceClient.TrackException(e));
- result = new ActivityExecutionResult { ResponseEvent = failureEvent };
- }
- catch (Exception middlewareException) when (!Utils.IsFatal(middlewareException))
- {
- traceActivity?.SetStatus(ActivityStatusCode.Error, middlewareException.Message);
+ this.logHelper.TaskActivityFailure(orchestrationInstance, scheduledEvent.Name!, failureEvent, e);
+ CorrelationTraceClient.Propagate(() => CorrelationTraceClient.TrackException(e));
+ result = new ActivityExecutionResult { ResponseEvent = failureEvent };
+ }
+ catch (Exception middlewareException) when (!Utils.IsFatal(middlewareException))
+ {
+ traceActivity?.SetStatus(ActivityStatusCode.Error, middlewareException.Message);
- // These are considered retriable
- this.logHelper.TaskActivityDispatcherError(workItem, $"Unhandled exception in activity middleware pipeline: {middlewareException}");
- throw;
- }
+ // These are considered retriable
+ this.logHelper.TaskActivityDispatcherError(workItem, $"Unhandled exception in activity middleware pipeline: {middlewareException}");
+ throw;
+ }
- HistoryEvent? eventToRespond = result?.ResponseEvent;
+ eventToRespond = result?.ResponseEvent;
- if (eventToRespond is TaskCompletedEvent completedEvent)
- {
- this.logHelper.TaskActivityCompleted(orchestrationInstance, scheduledEvent.Name, completedEvent);
- }
- else if (eventToRespond is null)
- {
- // Default response if middleware prevents a response from being generated
- eventToRespond = new TaskCompletedEvent(-1, scheduledEvent.EventId, null);
- }
+ if (eventToRespond is TaskCompletedEvent completedEvent)
+ {
+ this.logHelper.TaskActivityCompleted(orchestrationInstance, scheduledEvent.Name!, completedEvent);
+ }
+ else if (eventToRespond is null)
+ {
+ // Default response if middleware prevents a response from being generated
+ eventToRespond = new TaskCompletedEvent(-1, scheduledEvent.EventId, null);
+ }
- if (traceActivity != null && eventToRespond is TaskCompletedEvent)
- {
- // Ensure successful executions don't preserve a prior error status from custom instrumentation.
- traceActivity.SetStatus(ActivityStatusCode.OK, "Completed");
+ if (traceActivity != null && eventToRespond is TaskCompletedEvent)
+ {
+ // Ensure successful executions don't preserve a prior error status from custom instrumentation.
+ traceActivity.SetStatus(ActivityStatusCode.OK, "Completed");
+ }
}
-
+
var responseTaskMessage = new TaskMessage
{
Event = eventToRespond,
diff --git a/src/DurableTask.Core/TaskEntityDispatcher.cs b/src/DurableTask.Core/TaskEntityDispatcher.cs
index a3d163e28..736c5013c 100644
--- a/src/DurableTask.Core/TaskEntityDispatcher.cs
+++ b/src/DurableTask.Core/TaskEntityDispatcher.cs
@@ -1,4 +1,4 @@
-// ----------------------------------------------------------------------------------
+// ----------------------------------------------------------------------------------
// Copyright Microsoft Corporation
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@@ -25,6 +25,7 @@ namespace DurableTask.Core
using System;
using System.Collections.Generic;
using System.Diagnostics;
+ using System.Linq;
using System.Threading;
using System.Threading.Tasks;
@@ -43,6 +44,7 @@ public class TaskEntityDispatcher
readonly ErrorPropagationMode errorPropagationMode;
readonly TaskOrchestrationDispatcher.NonBlockingCountdownLock concurrentSessionLock;
readonly IExceptionPropertiesProvider exceptionPropertiesProvider;
+ readonly IPoisonMessageHandler poisonMessageHandler;
///
/// Initializes a new instance of the class with an exception properties provider.
@@ -69,7 +71,8 @@ internal TaskEntityDispatcher(
this.exceptionPropertiesProvider = exceptionPropertiesProvider;
this.entityOrchestrationService = (orchestrationService as IEntityOrchestrationService)!;
this.entityBackendProperties = entityOrchestrationService.EntityBackendProperties;
-
+ this.poisonMessageHandler = orchestrationService as IPoisonMessageHandler;
+
this.dispatcher = new WorkItemDispatcher(
"TaskEntityDispatcher",
item => item == null ? string.Empty : item.InstanceId,
@@ -155,8 +158,8 @@ async Task OnProcessWorkItemSessionAsync(TaskOrchestrationWorkItem workItem)
// If we failed to acquire it, we will end the extended session after this execution.
schedulerState = await this.OnProcessWorkItemAsync(workItem, schedulerState);
- // The entity has been deleted, so we end the extended session.
- if (this.EntityIsDeleted(schedulerState))
+ // The work item could not be processed or the entity has been deleted, so we end the extended session.
+ if (schedulerState == null || this.EntityIsDeleted(schedulerState))
{
break;
}
@@ -251,24 +254,44 @@ private async Task OnProcessWorkItemAsync(TaskOrchestrationWorkI
try
{
+ bool firstExecutionIfExtendedSession = schedulerState == null;
+
+ Work workToDoNow = null;
+ bool reconciled = TaskOrchestrationDispatcher.ReconcileMessagesWithState(
+ workItem,
+ nameof(TaskEntityDispatcher),
+ this.errorPropagationMode,
+ this.logHelper,
+ this.poisonMessageHandler != null,
+ out string errorMessage);
+
+ bool workDetermined = false;
+ if (reconciled)
+ {
+ // we start with processing all the requests and figuring out which ones to execute now
+ // results can depend on whether the entity is locked, what the maximum batch size is,
+ // and whether the messages arrived out of order
+ DetermineWorkResult determineWorkResult = await this.DetermineWorkAsync(workItem, schedulerState);
+ schedulerState = determineWorkResult.SchedulerState;
+ workToDoNow = determineWorkResult.Batch;
+ errorMessage = determineWorkResult.ErrorMessage;
+ workDetermined = determineWorkResult.Success;
+ }
+
// Assumes that: if the batch contains a new "ExecutionStarted" event, it is the first message in the batch.
- if (!TaskOrchestrationDispatcher.ReconcileMessagesWithState(workItem, nameof(TaskEntityDispatcher), this.errorPropagationMode, this.logHelper))
+ if (!reconciled || !workDetermined)
{
// TODO : mark an orchestration as faulted if there is data corruption
- this.logHelper.DroppingOrchestrationWorkItem(workItem, "Received work-item for an invalid orchestration");
+ this.logHelper.DroppingOrchestrationWorkItem(workItem, errorMessage);
+ if (this.poisonMessageHandler != null
+ && await this.poisonMessageHandler.HandleInvalidWorkItemAsync(workItem, errorMessage))
+ {
+ // Signal the extended session to end if one is running
+ return null;
+ }
}
else
{
- bool firstExecutionIfExtendedSession = schedulerState == null;
-
- // we start with processing all the requests and figuring out which ones to execute now
- // results can depend on whether the entity is locked, what the maximum batch size is,
- // and whether the messages arrived out of order
-
- this.DetermineWork(workItem.OrchestrationRuntimeState,
- ref schedulerState,
- out Work workToDoNow);
-
if (workToDoNow.OperationCount > 0)
{
// execute the user-defined operations on this entity, via the middleware
@@ -437,14 +460,18 @@ await this.orchestrationService.CompleteTaskOrchestrationWorkItemAsync(
void ProcessLockRequest(WorkItemEffects effects, SchedulerState schedulerState, RequestMessage request)
{
- this.logHelper.EntityLockAcquired(effects.InstanceId, request);
+ bool isPoisonMessage = request.DispatchCount > this.poisonMessageHandler?.MaxDispatchCount;
+ if (!isPoisonMessage)
+ {
+ this.logHelper.EntityLockAcquired(effects.InstanceId, request);
- // mark the entity state as locked
- schedulerState.LockedBy = request.ParentInstanceId;
+ // mark the entity state as locked
+ schedulerState.LockedBy = request.ParentInstanceId;
- request.Position++;
+ request.Position++;
+ }
- if (request.Position < request.LockSet.Length)
+ if (request.Position < request.LockSet.Length && !isPoisonMessage)
{
// send lock request to next entity in the lock set
var target = new OrchestrationInstance() { InstanceId = request.LockSet[request.Position].ToString() };
@@ -454,7 +481,22 @@ void ProcessLockRequest(WorkItemEffects effects, SchedulerState schedulerState,
{
// send lock acquisition completed response back to originating orchestration instance
var target = new OrchestrationInstance() { InstanceId = request.ParentInstanceId, ExecutionId = request.ParentExecutionId };
- this.SendLockResponseMessage(effects, target, request.Id);
+
+ // In the case of a poison message, it will be the locking instance's responsibility to unlock any other entities for whom the
+ // lock request may have succeeded.
+ this.SendLockResponseMessage(
+ effects,
+ target,
+ request.Id,
+ isPoisonMessage ?
+ new FailureDetails(
+ "PoisonMessage",
+ $"Entity lock request has dispatch count {request.DispatchCount} " +
+ $"which exceeds the maximum dispatch count of {this.poisonMessageHandler.MaxDispatchCount}.",
+ stackTrace: null,
+ innerFailure: null,
+ isNonRetriable: true)
+ : null);
}
}
@@ -475,12 +517,29 @@ string SerializeSchedulerStateForNextExecution(SchedulerState schedulerState)
#region Preprocess to determine work
- void DetermineWork(OrchestrationRuntimeState runtimeState, ref SchedulerState schedulerState, out Work batch)
+ readonly struct DetermineWorkResult
{
+ public DetermineWorkResult(bool success, SchedulerState schedulerState, Work batch, string errorMessage)
+ {
+ this.Success = success;
+ this.SchedulerState = schedulerState;
+ this.Batch = batch;
+ this.ErrorMessage = errorMessage;
+ }
+
+ public bool Success { get; }
+ public SchedulerState SchedulerState { get; }
+ public Work Batch { get; }
+ public string ErrorMessage { get; }
+ }
+
+ async Task DetermineWorkAsync(TaskOrchestrationWorkItem workItem, SchedulerState schedulerState)
+ {
+ OrchestrationRuntimeState runtimeState = workItem.OrchestrationRuntimeState;
string instanceId = runtimeState.OrchestrationInstance.InstanceId;
bool deserializeState = schedulerState == null;
schedulerState ??= new();
- batch = new Work();
+ Work batch = new Work();
Queue lockHolderMessages = null;
@@ -501,6 +560,15 @@ void DetermineWork(OrchestrationRuntimeState runtimeState, ref SchedulerState sc
}
catch (Exception exception)
{
+ if (this.poisonMessageHandler != null)
+ {
+ string errorMessage = $"Failed to deserialize the entity scheduler state from the {EventType.ExecutionStarted} input.";
+ this.logHelper.PoisonMessageDetected(
+ runtimeState.OrchestrationInstance,
+ e,
+ $"Dropping entity work item: {errorMessage}");
+ return new DetermineWorkResult(success: false, schedulerState, batch, errorMessage);
+ }
throw new EntitySchedulerException("Failed to deserialize entity scheduler state - may be corrupted or wrong version.", exception);
}
}
@@ -508,6 +576,7 @@ void DetermineWork(OrchestrationRuntimeState runtimeState, ref SchedulerState sc
case EventType.EventRaised:
EventRaisedEvent eventRaisedEvent = (EventRaisedEvent)e;
+ bool isPoisonMessage = eventRaisedEvent.DispatchCount > this.poisonMessageHandler?.MaxDispatchCount;
if (EntityMessageEventNames.IsRequestMessage(eventRaisedEvent.Name))
{
@@ -520,9 +589,25 @@ void DetermineWork(OrchestrationRuntimeState runtimeState, ref SchedulerState sc
}
catch (Exception exception)
{
- throw new EntitySchedulerException("Failed to deserialize incoming request message - may be corrupted or wrong version.", exception);
+ string failureReason = $"Failed to deserialize incoming entity request message - may be corrupted or wrong version: {exception.Message}";
+ if (this.poisonMessageHandler != null)
+ {
+ this.logHelper.PoisonMessageDetected(
+ runtimeState.OrchestrationInstance,
+ eventRaisedEvent,
+ failureReason);
+ if (await this.poisonMessageHandler.HandlePoisonMessageAsync(
+ workItem.OrchestrationRuntimeState.OrchestrationInstance,
+ eventRaisedEvent,
+ failureReason))
+ {
+ break;
+ }
+ }
+ throw new EntitySchedulerException(failureReason, exception);
}
+ requestMessage.DispatchCount = eventRaisedEvent.DispatchCount;
IEnumerable deliverNow;
if (requestMessage.ScheduledTime.HasValue)
@@ -585,7 +670,36 @@ void DetermineWork(OrchestrationRuntimeState runtimeState, ref SchedulerState sc
}
catch (Exception exception)
{
- throw new EntitySchedulerException("Failed to deserialize lock release message - may be corrupted or wrong version.", exception);
+ string failureReason = $"Failed to deserialize entity lock release message - may be corrupted or wrong version: {exception.Message}";
+ if (this.poisonMessageHandler != null)
+ {
+ this.logHelper.PoisonMessageDetected(
+ runtimeState.OrchestrationInstance,
+ eventRaisedEvent,
+ failureReason);
+ if (await this.poisonMessageHandler.HandlePoisonMessageAsync(
+ workItem.OrchestrationRuntimeState.OrchestrationInstance,
+ eventRaisedEvent,
+ failureReason))
+ {
+ break;
+ }
+ }
+ throw new EntitySchedulerException(failureReason, exception);
+ }
+
+ if (isPoisonMessage)
+ {
+ string failureReason = $"Entity lock release message from parent instance '{message.ParentInstanceId}' has dispatch count " +
+ $"{eventRaisedEvent.DispatchCount} which exceeds the maximum allowed dispatch count of {this.poisonMessageHandler.MaxDispatchCount}.";
+ this.logHelper.PoisonMessageDetected(runtimeState.OrchestrationInstance, message, eventRaisedEvent.DispatchCount, failureReason);
+ if (await this.poisonMessageHandler.HandlePoisonMessageAsync(
+ workItem.OrchestrationRuntimeState.OrchestrationInstance,
+ eventRaisedEvent,
+ failureReason))
+ {
+ break;
+ }
}
if (schedulerState.LockedBy == message.ParentInstanceId)
@@ -596,6 +710,20 @@ void DetermineWork(OrchestrationRuntimeState runtimeState, ref SchedulerState sc
}
else
{
+ if (isPoisonMessage)
+ {
+ string failureReason = $"Entity self-continue message has dispatch count {eventRaisedEvent.DispatchCount} which exceeds the maximum allowed " +
+ $"dispatch count of {this.poisonMessageHandler.MaxDispatchCount}.";
+ this.logHelper.PoisonMessageDetected(runtimeState.OrchestrationInstance, eventRaisedEvent, failureReason);
+ if (await this.poisonMessageHandler.HandlePoisonMessageAsync(
+ workItem.OrchestrationRuntimeState.OrchestrationInstance,
+ eventRaisedEvent,
+ failureReason))
+ {
+ break;
+ }
+ }
+
// this is a continue message.
// Resumes processing of previously queued operations, if any.
schedulerState.Suspended = false;
@@ -627,6 +755,15 @@ void DetermineWork(OrchestrationRuntimeState runtimeState, ref SchedulerState sc
}
var request = schedulerState.Dequeue();
+ var poisonMessageHandler = this.poisonMessageHandler;
+ if (poisonMessageHandler != null && request.DispatchCount > poisonMessageHandler.MaxDispatchCount)
+ {
+ this.logHelper.PoisonMessageDetected(
+ runtimeState.OrchestrationInstance,
+ request,
+ $"Entity request has dispatch count {request.DispatchCount} which exceeds the maximum dispatch count " +
+ $"of {poisonMessageHandler.MaxDispatchCount} and will be failed.");
+ }
if (request.IsLockRequest)
{
@@ -639,6 +776,8 @@ void DetermineWork(OrchestrationRuntimeState runtimeState, ref SchedulerState sc
}
}
}
+
+ return new DetermineWorkResult(success: true, schedulerState, batch, errorMessage: null);
}
bool EntityIsDeleted(SchedulerState schedulerState)
@@ -721,7 +860,7 @@ public void ToBeContinued(SchedulerState schedulerState)
parentTraceContext);
}
- // We still want to add the trace activity to the list even if it was not successfully created and is null. This is because otherwise we have no easy way of mapping OperationResults to Activities otherwise if the lists
+ // We still want to add the trace activity to the list even if it was not successfully created and is null. This is because otherwise we have no easy way of mapping OperationResults to Activities if the lists
// do not have the same length in TraceHelper.EndActivitiesForProcessingEntityInvocation. We will simply skip ending the Activity if it is null in this method
traceActivities.Add(traceActivity);
@@ -846,12 +985,13 @@ void SendLockRequestMessage(WorkItemEffects effects, SchedulerState schedulerSta
this.ProcessSendEventMessage(effects, target, EntityMessageEventNames.RequestMessageEventName, message);
}
- void SendLockResponseMessage(WorkItemEffects effects, OrchestrationInstance target, Guid requestId)
+ void SendLockResponseMessage(WorkItemEffects effects, OrchestrationInstance target, Guid requestId, FailureDetails failureDetails)
{
var message = new ResponseMessage()
{
// content is ignored by receiver but helps with tracing
- Result = ResponseMessage.LockAcquisitionCompletion,
+ Result = ResponseMessage.LockAcquisitionCompletion,
+ FailureDetails = failureDetails,
};
this.ProcessSendEventMessage(effects, target, EntityMessageEventNames.ResponseMessageEventName(requestId), message);
}
@@ -955,13 +1095,30 @@ internal void ProcessSendStartMessage(WorkItemEffects effects, OrchestrationRunt
async Task ExecuteViaMiddlewareAsync(Work workToDoNow, OrchestrationInstance instance, string serializedEntityState, bool isExtendedSession, bool includeEntityState)
{
+ var startTime = DateTime.UtcNow;
var (operations, traceActivities) = workToDoNow.GetOperationRequestsAndTraceActivities(instance.InstanceId);
+
+ bool poisonMessagesExist = workToDoNow.Operations.Any(op => op.DispatchCount > this.poisonMessageHandler?.MaxDispatchCount);
+ var operationsToSend = operations;
+
+ if (poisonMessagesExist)
+ {
+ operationsToSend = new List();
+ for (int i = 0; i < operations.Count; i++)
+ {
+ if (workToDoNow.Operations[i].DispatchCount <= this.poisonMessageHandler.MaxDispatchCount)
+ {
+ operationsToSend.Add(operations[i]);
+ }
+ }
+ }
+
// the request object that will be passed to the worker
var request = new EntityBatchRequest()
{
InstanceId = instance.InstanceId,
EntityState = serializedEntityState,
- Operations = operations,
+ Operations = operationsToSend,
};
this.logHelper.EntityBatchExecuting(request);
@@ -999,11 +1156,50 @@ await this.dispatchPipeline.RunAsync(dispatchContext, async _ =>
}
var result = await taskEntity.ExecuteOperationBatchAsync(request);
-
+
dispatchContext.SetProperty(result);
});
var result = dispatchContext.GetProperty();
+
+ if (poisonMessagesExist)
+ {
+     // Poison operations were withheld from the middleware, so its results are merged back here with a
+     // synthesized failure for each poison operation, keeping result i aligned with operation i.
+     var resultAfterPoisonMessageHandling = new List<OperationResult>(result.Results.Count);
+     int middlewareResultIndex = 0;
+     for (int i = 0; i < operations.Count; i++)
+     {
+         if (workToDoNow.Operations[i].DispatchCount > this.poisonMessageHandler.MaxDispatchCount)
+         {
+             // Fail the poison operation even if no middleware results remain (e.g. an all-poison batch).
+             resultAfterPoisonMessageHandling.Add(new()
+             {
+                 FailureDetails = new
+                 (
+                     "PoisonMessage",
+                     $"Entity operation request has dispatch count {workToDoNow.Operations[i].DispatchCount} " +
+                     $"which exceeds the maximum dispatch count of {this.poisonMessageHandler.MaxDispatchCount}.",
+                     stackTrace: null,
+                     innerFailure: null,
+                     isNonRetriable: true
+                 ),
+                 StartTimeUtc = startTime,
+                 EndTimeUtc = DateTime.UtcNow
+             });
+         }
+         else if (middlewareResultIndex < result.Results.Count)
+         {
+             resultAfterPoisonMessageHandling.Add(result.Results[middlewareResultIndex++]);
+         }
+         else
+         {
+             break; // no middleware result for this regular operation: it and all later ones get no result
+         }
+     }
+     result.Results = resultAfterPoisonMessageHandling;
+ }
+
TraceHelper.EndActivitiesForProcessingEntityInvocation(traceActivities, result.Results, result.FailureDetails);
this.logHelper.EntityBatchExecuted(request, result);
diff --git a/src/DurableTask.Core/TaskOrchestrationDispatcher.cs b/src/DurableTask.Core/TaskOrchestrationDispatcher.cs
index 751a64b78..8199fb859 100644
--- a/src/DurableTask.Core/TaskOrchestrationDispatcher.cs
+++ b/src/DurableTask.Core/TaskOrchestrationDispatcher.cs
@@ -50,6 +50,7 @@ public class TaskOrchestrationDispatcher
readonly TaskOrchestrationEntityParameters? entityParameters;
readonly VersioningSettings? versioningSettings;
readonly IExceptionPropertiesProvider? exceptionPropertiesProvider;
+ readonly IPoisonMessageHandler? poisonMessageHandler;
///
/// Initializes a new instance of the class with an exception properties provider.
@@ -80,6 +81,7 @@ internal TaskOrchestrationDispatcher(
this.entityParameters = TaskOrchestrationEntityParameters.FromEntityBackendProperties(this.entityBackendProperties);
this.versioningSettings = versioningSettings;
this.exceptionPropertiesProvider = exceptionPropertiesProvider;
+ this.poisonMessageHandler = orchestrationService as IPoisonMessageHandler;
this.dispatcher = new WorkItemDispatcher(
"TaskOrchestrationDispatcher",
@@ -374,10 +376,16 @@ protected async Task OnProcessWorkItemAsync(TaskOrchestrationWorkItem work
try
{
// Assumes that: if the batch contains a new "ExecutionStarted" event, it is the first message in the batch.
- if (!ReconcileMessagesWithState(workItem, nameof(TaskOrchestrationDispatcher), this.errorPropagationMode, logHelper))
+ if (!ReconcileMessagesWithState(
+ workItem,
+ nameof(TaskOrchestrationDispatcher),
+ this.errorPropagationMode,
+ logHelper,
+ this.poisonMessageHandler != null,
+ out string? errorMessage))
{
// TODO : mark an orchestration as faulted if there is data corruption
- this.logHelper.DroppingOrchestrationWorkItem(workItem, "Received work-item for an invalid orchestration");
+ this.logHelper.DroppingOrchestrationWorkItem(workItem, errorMessage!);
TraceHelper.TraceSession(
TraceEventType.Error,
"TaskOrchestrationDispatcher-DeletedOrchestration",
@@ -385,6 +393,11 @@ protected async Task OnProcessWorkItemAsync(TaskOrchestrationWorkItem work
"Received work-item for an invalid orchestration");
isCompleted = true;
traceActivity?.Dispose();
+ if (this.poisonMessageHandler != null &&
+ await this.poisonMessageHandler.HandleInvalidWorkItemAsync(workItem, errorMessage!))
+ {
+ return isCompleted;
+ }
}
else
{
@@ -433,6 +446,34 @@ protected async Task OnProcessWorkItemAsync(TaskOrchestrationWorkItem work
}
}
+ // Materialize once: the deferred query would otherwise be re-evaluated by every Count()/foreach/Select that uses it.
+ var poisonEvents = runtimeState.NewEvents.Where(evt => evt.DispatchCount > this.poisonMessageHandler?.MaxDispatchCount).ToList();
+ if (poisonEvents.Count > 0)
+ {
+     foreach (var poisonEvent in poisonEvents)
+     {
+         this.logHelper.PoisonMessageDetected(
+             runtimeState.OrchestrationInstance!,
+             poisonEvent,
+             $"Orchestration has received an event with dispatch count {poisonEvent.DispatchCount} which exceeds the maximum dispatch " +
+             $"count of {this.poisonMessageHandler!.MaxDispatchCount} and will be failed.");
+     }
+     // Replace the decisions with a single non-retriable completion action that fails the orchestration.
+     var failureAction = new OrchestrationCompleteOrchestratorAction
+     {
+         Id = runtimeState.PastEvents.Count,
+         FailureDetails = new FailureDetails(
+             "PoisonMessages",
+             $"Orchestration has received messages of type {string.Join(",", poisonEvents.Select(e => e.EventType))} " +
+             $"with dispatch counts {string.Join(",", poisonEvents.Select(e => e.DispatchCount))} which exceed the " +
+             $"maximum dispatch count of {this.poisonMessageHandler!.MaxDispatchCount}.",
+             stackTrace: null, innerFailure: null,
+             isNonRetriable: true),
+         OrchestrationStatus = OrchestrationStatus.Failed,
+     };
+     decisions = new List<OrchestratorAction> { failureAction };
+ }
+
this.logHelper.OrchestrationExecuting(runtimeState.OrchestrationInstance!, runtimeState.Name);
TraceHelper.TraceInstance(
TraceEventType.Verbose,
@@ -441,7 +482,7 @@ protected async Task OnProcessWorkItemAsync(TaskOrchestrationWorkItem work
"Executing user orchestration: {0}",
JsonDataConverter.Default.Serialize(runtimeState.GetOrchestrationRuntimeStateDump(), true));
- if (!versioningFailed)
+ if (!versioningFailed && poisonEvents.Count() == 0)
{
// In this case we skip the orchestration's execution since all tasks have been completed and it is in a terminal state.
// Instead we "rewind" its execution by removing all failed tasks (see ProcessRewindOrchestrationDecision).
@@ -873,14 +914,31 @@ await this.dispatchPipeline.RunAsync(dispatchContext, _ =>
/// The name of the dispatcher, used for tracing.
/// The error propagation mode.
/// The log helper.
+ /// Indicates whether poison message handling is enabled.
+ /// If it is, the method will not throw any exceptions under the expectation that the poison message handler will handle the
+ /// invalid work item.
+ /// In the case that the work item should be dropped (this method returns false), provides the reason why.
/// True if workItem should be processed further. False otherwise.
- internal static bool ReconcileMessagesWithState(TaskOrchestrationWorkItem workItem, string dispatcher, ErrorPropagationMode errorPropagationMode, LogHelper logHelper)
+ internal static bool ReconcileMessagesWithState(
+ TaskOrchestrationWorkItem workItem,
+ string dispatcher,
+ ErrorPropagationMode errorPropagationMode,
+ LogHelper logHelper,
+ bool isPoisonMessageHandlingEnabled,
+ out string? errorMessage)
{
foreach (TaskMessage message in workItem.NewMessages)
{
OrchestrationInstance orchestrationInstance = message.OrchestrationInstance;
if (string.IsNullOrWhiteSpace(orchestrationInstance?.InstanceId))
{
+ if (isPoisonMessageHandlingEnabled)
+ {
+ errorMessage = $"Work item includes a message with no orchestration instance ID with event type {message.Event.EventType}";
+ logHelper.PoisonMessageDetected(workItem.OrchestrationRuntimeState.OrchestrationInstance, message.Event, errorMessage);
+ return false;
+ }
+
throw TraceHelper.TraceException(
TraceEventType.Error,
$"{dispatcher}-OrchestrationInstanceMissing",
@@ -890,6 +948,11 @@ internal static bool ReconcileMessagesWithState(TaskOrchestrationWorkItem workIt
if (!workItem.OrchestrationRuntimeState.IsValid)
{
// we get here if the orchestration history is somehow corrupted (partially deleted, etc.)
+ string corruptionType = workItem.OrchestrationRuntimeState.Events.Count == 1 ?
+ $"its history contains exactly one event which is neither an {EventType.ExecutionStarted} or " +
+ $"{EventType.OrchestratorStarted} but rather has type {workItem.OrchestrationRuntimeState.Events[0].EventType}" :
+ $"its history contains multiple events but no {EventType.ExecutionStarted} event";
+ errorMessage = $"Orchestration runtime state is invalid: {corruptionType}";
return false;
}
@@ -898,6 +961,7 @@ internal static bool ReconcileMessagesWithState(TaskOrchestrationWorkItem workIt
// we get here because of:
// i) responses for scheduled tasks after the orchestrations have been completed
// ii) responses for explicitly deleted orchestrations
+ errorMessage = $"Orchestration contains no {EventType.ExecutionStarted} event in its history and did not receive one as part of its new messages.";
return false;
}
@@ -905,15 +969,9 @@ internal static bool ReconcileMessagesWithState(TaskOrchestrationWorkItem workIt
&& workItem.OrchestrationRuntimeState.OrchestrationStatus != OrchestrationStatus.Running
&& workItem.NewMessages.Count > 1)
{
- foreach (TaskMessage droppedMessage in workItem.NewMessages)
- {
- if (droppedMessage.Event.EventType != EventType.ExecutionRewound)
- {
- logHelper.DroppingOrchestrationMessage(workItem, droppedMessage, "Multiple messages sent to an instance " +
- "that is attempting to rewind from a terminal state. The only message that can be sent in " +
- "this case is the rewind request.");
- }
- }
+ errorMessage = "Multiple messages sent to an instance that is attempting to rewind from a terminal state. " +
+ "The only message that can be sent in this case is the rewind request.";
+ logHelper.PoisonMessageDetected(orchestrationInstance, message.Event, errorMessage);
return false;
}
@@ -1005,6 +1063,7 @@ internal static bool ReconcileMessagesWithState(TaskOrchestrationWorkItem workIt
}
}
+ errorMessage = null;
return true;
}
diff --git a/src/DurableTask.Core/Tracing/TraceHelper.cs b/src/DurableTask.Core/Tracing/TraceHelper.cs
index 532e67a56..a912887fe 100644
--- a/src/DurableTask.Core/Tracing/TraceHelper.cs
+++ b/src/DurableTask.Core/Tracing/TraceHelper.cs
@@ -626,51 +626,51 @@ internal static void EmitTraceActivityForTimer(
/// the entity did not return results for all of the requests
internal static void EndActivitiesForProcessingEntityInvocation(List traceActivities, List results, FailureDetails? batchFailureDetails)
{
- if (results.Count == traceActivities.Count)
+ foreach (var (activity, result) in traceActivities.Zip(results, (activity, result) => (activity, result)))
{
- foreach (var (activity, result) in traceActivities.Zip(results, (activity, result) => (activity, result)))
+ if (activity != null)
{
- if (activity != null)
+ if (result.ErrorMessage != null || result.FailureDetails != null)
{
- if (result.ErrorMessage != null || result.FailureDetails != null)
- {
- string errorDetails = result.ErrorMessage ?? result.FailureDetails!.ErrorMessage;
- activity.SetTag(Schema.Task.ErrorMessage, errorDetails);
- activity.SetStatus(ActivityStatusCode.Error, errorDetails);
- }
- else
- {
- activity.SetStatus(ActivityStatusCode.OK, "Completed");
- }
- if (result.StartTimeUtc is DateTime startTime)
- {
- activity.SetStartTime(startTime);
- }
- if (result.EndTimeUtc is DateTime endTime)
- {
- activity.SetEndTime(endTime);
- }
- activity.Dispose();
+ string errorDetails = result.ErrorMessage ?? result.FailureDetails!.ErrorMessage;
+ activity.SetTag(Schema.Task.ErrorMessage, errorDetails);
+ activity.SetStatus(ActivityStatusCode.Error, errorDetails);
}
+ else
+ {
+ activity.SetStatus(ActivityStatusCode.OK, "Completed");
+ }
+ if (result.StartTimeUtc is DateTime startTime)
+ {
+ activity.SetStartTime(startTime);
+ }
+ if (result.EndTimeUtc is DateTime endTime)
+ {
+ activity.SetEndTime(endTime);
+ }
+ activity.Dispose();
}
}
- // This can happen if some of the operations failed and have no corresponding OperationResult
- // There is no way to map the successful operation results to the corresponding operation requests or trace activities, so we will just "fail" the trace activities in this case and dispose them
- else
+
+ // This can happen if not all of the operations in the batch were executed, in which case we populate the remaining
+ // activities with the failure details if they are available.
+ // If not, this work will be deferred and tried again, so we do not want to publish the activity.
+ for (int i = results.Count; i < traceActivities.Count; i++)
{
- string errorMessage = "Unable to generate a trace activity for the entity invocation even though it may have succeeded.";
- if (batchFailureDetails is FailureDetails failureDetails)
- {
- errorMessage += $" If it failed, it may be due to {failureDetails.ErrorMessage}";
- }
- foreach (var activity in traceActivities)
+ var activity = traceActivities[i];
+ if (activity != null)
{
- if (activity != null)
+ if (batchFailureDetails != null)
+ {
+ activity.SetTag(Schema.Task.ErrorMessage, batchFailureDetails.ErrorMessage);
+ activity.SetStatus(ActivityStatusCode.Error, batchFailureDetails.ErrorMessage);
+ }
+ else
{
- activity.SetTag(Schema.Task.ErrorMessage, errorMessage);
- activity.SetStatus(ActivityStatusCode.Error, errorMessage);
- activity.Dispose();
+ // This will only work if the listener honors the Recorded flag, so this is a best effort attempt
+ activity.ActivityTraceFlags &= ~ActivityTraceFlags.Recorded;
}
+ activity.Dispose();
}
}
}