diff --git a/cmd/localstack/awsutil.go b/cmd/localstack/awsutil.go index 31235e4..d7cdf5c 100644 --- a/cmd/localstack/awsutil.go +++ b/cmd/localstack/awsutil.go @@ -92,7 +92,7 @@ func getBootstrap(args []string) (interop.Bootstrap, string) { return NewSimpleBootstrap(bootstrapLookupCmd, currentWorkingDir), handler } -func PrintEndReports(invokeId string, initDuration string, memorySize string, invokeStart time.Time, timeoutDuration time.Duration, w io.Writer) { +func PrintEndReports(invokeId string, initDuration string, status string, memorySize string, invokeStart time.Time, timeoutDuration time.Duration, w io.Writer) { // Calculate invoke duration invokeDuration := math.Min(float64(time.Now().Sub(invokeStart).Nanoseconds()), float64(timeoutDuration.Nanoseconds())) / float64(time.Millisecond) @@ -102,11 +102,12 @@ func PrintEndReports(invokeId string, initDuration string, memorySize string, in // not a clean way to get this information from rapidcore _, _ = fmt.Fprintf(w, "REPORT RequestId: %s\t"+ - initDuration+ "Duration: %.2f ms\t"+ "Billed Duration: %.f ms\t"+ "Memory Size: %s MB\t"+ - "Max Memory Used: %s MB\t\n", + "Max Memory Used: %s MB\t"+ + initDuration+ + status+"\n", invokeId, invokeDuration, math.Ceil(invokeDuration), memorySize, memorySize) } diff --git a/cmd/localstack/custom_interop.go b/cmd/localstack/custom_interop.go index f547551..61cf9ae 100644 --- a/cmd/localstack/custom_interop.go +++ b/cmd/localstack/custom_interop.go @@ -12,14 +12,17 @@ import ( "net/http" "strconv" "strings" + "sync/atomic" "time" "github.com/aws/aws-lambda-runtime-interface-emulator/internal/lambda/core/statejson" + "github.com/aws/aws-lambda-runtime-interface-emulator/internal/lambda/fatalerror" "github.com/aws/aws-lambda-runtime-interface-emulator/internal/lambda/interop" "github.com/aws/aws-lambda-runtime-interface-emulator/internal/lambda/rapidcore" "github.com/aws/aws-lambda-runtime-interface-emulator/internal/lambda/rapidcore/standalone" "github.com/aws/aws-lambda-runtime-interface-emulator/internal/lsapi" "github.com/go-chi/chi/v5" + "github.com/google/uuid" log "github.com/sirupsen/logrus" ) @@ -28,6 +31,44 @@ type CustomInteropServer struct { localStackAdapter *LocalStackAdapter port string upstreamEndpoint string + // logCollector accumulates the runtime's stdout/stderr plus the synthetic START/REPORT/ + // INIT_REPORT lines that are flushed to LocalStack with each invocation's logs. + logCollector *LogCollector + // eventsAPI provides rapid's authoritative Init-phase duration (see events.go), used for + // the REPORT/INIT_REPORT log lines instead of wall-clock measurements at invoke arrival. + eventsAPI *LocalStackEventsAPI + // initStart is set once in Init() and warmStart is flipped on the first invoke. + // Both are accessed only from the single sequential init -> invoke flow (the RIE + // processes one invocation at a time), so they need no additional synchronization. + initStart time.Time + warmStart bool + // initTimedOut is set by ReportInitTimeout when the init phase exceeds its timeout. It is + // written from the init-await flow and read from the invoke flow, so it uses atomic access. + // When set, the first invocation's REPORT omits Init Duration (init was already reported as + // timed out and is re-run as a suppressed init during that invocation). + initTimedOut atomic.Bool + // initErrorForwarded is set once the runtime's own /init/error has been forwarded to + // LocalStack via SendInitErrorResponse, so the crash-path fallback (SendInitError) does + // not send a duplicate error status for the same failed initialization. + initErrorForwarded atomic.Bool + // initErrorType holds rapidcore's scrubbed fatal error type (e.g. Runtime.Unknown) when init + // failed, used to render the INIT_REPORT(phase=invoke) and REPORT Status/Error Type lines for + // the on-demand folded-into-invoke path. Stores a string; empty/unset means init did not fail. + // It persists while invocations keep failing (each one re-runs the init as a suppressed init + // and AWS re-emits the failure envelope), and is cleared by the invoke handler once an + // invocation succeeds so a recovered environment is not tainted by the original failure. + initErrorType atomic.Value + // onDemand is true for on-demand functions, where AWS folds a failed cold-start init into + // the first invocation (suppressed init). For these we do NOT report init failures via + // /status/error; instead we signal ready and let the first invoke surface the error with + // the full INIT_REPORT/START/END/REPORT envelope. Provisioned concurrency and Managed + // Instances keep the provisioning-time /status/error model. SnapStart environments are + // also classified on-demand here (LocalStack sets AWS_LAMBDA_INITIALIZATION_TYPE=on-demand + // for them and initializes them lazily at the first invoke, not at version publish), so the + // fold-into-invoke model applies to them too. + // TODO: set AWS_LAMBDA_INITIALIZATION_TYPE=snap-start on the LocalStack side for env-var + // parity with AWS once SnapStart environments get their own initialization type. + onDemand bool } type LocalStackAdapter struct { @@ -44,10 +85,11 @@ const ( func (l *LocalStackAdapter) SendStatus(status LocalStackStatus, payload []byte) error { statusUrl := fmt.Sprintf("%s/status/%s/%s", l.UpstreamEndpoint, l.RuntimeId, status) - _, err := http.Post(statusUrl, "application/json", bytes.NewReader(payload)) + resp, err := http.Post(statusUrl, "application/json", bytes.NewReader(payload)) if err != nil { return err } + defer resp.Body.Close() return nil } @@ -57,8 +99,12 @@ func (l *LocalStackAdapter) SendLogs(invokeId string, logs lsapi.LogResponse) er if err != nil { return err } - _, err = http.Post(l.UpstreamEndpoint+"/invocations/"+invokeId+"/logs", "application/json", bytes.NewReader(serialized)) - return err + resp, err := http.Post(l.UpstreamEndpoint+"/invocations/"+invokeId+"/logs", "application/json", bytes.NewReader(serialized)) + if err != nil { + return err + } + defer resp.Body.Close() + return nil } // SendResult posts the invocation result body to LocalStack. @@ -78,19 +124,23 @@ func (l *LocalStackAdapter) SendResult(invokeId string, body []byte, isError boo } else { log.Infoln("Sending to /response") } - _, err := http.Post(l.UpstreamEndpoint+endpoint, "application/json", bytes.NewReader(body)) - return err + resp, err := http.Post(l.UpstreamEndpoint+endpoint, "application/json", bytes.NewReader(body)) + if err != nil { + return err + } + defer resp.Body.Close() + return nil } -func NewCustomInteropServer(lsOpts *LsOpts, delegate interop.Server, logCollector *LogCollector) (server *CustomInteropServer) { +func NewCustomInteropServer(lsOpts *LsOpts, adapter *LocalStackAdapter, delegate interop.Server, logCollector *LogCollector, eventsAPI *LocalStackEventsAPI) (server *CustomInteropServer) { server = &CustomInteropServer{ - delegate: delegate.(*rapidcore.Server), - port: lsOpts.InteropPort, - upstreamEndpoint: lsOpts.RuntimeEndpoint, - localStackAdapter: &LocalStackAdapter{ - UpstreamEndpoint: lsOpts.RuntimeEndpoint, - RuntimeId: lsOpts.RuntimeId, - }, + delegate: delegate.(*rapidcore.Server), + port: lsOpts.InteropPort, + upstreamEndpoint: lsOpts.RuntimeEndpoint, + localStackAdapter: adapter, + logCollector: logCollector, + eventsAPI: eventsAPI, + onDemand: GetenvWithDefault("AWS_LAMBDA_INITIALIZATION_TYPE", "on-demand") == "on-demand", } // TODO: extract this @@ -110,8 +160,36 @@ func NewCustomInteropServer(lsOpts *LsOpts, delegate interop.Server, logCollecto } invokeResp := &standalone.ResponseWriterProxy{} - functionVersion := GetEnvOrDie("AWS_LAMBDA_FUNCTION_VERSION") // default $LATEST - _, _ = fmt.Fprintf(logCollector, "START RequestId: %s Version: %s\n", invokeR.InvokeId, functionVersion) + // The synthetic START line is emitted via LocalStackEventsAPI.SendInvokeStart so it + // lands after any inline (suppressed) init, matching AWS — see events.go. + + initErrType, _ := server.initErrorType.Load().(string) + + // First invocation into a successfully initialized on-demand environment: REPORT + // carries the Init phase duration as measured by rapid (init start -> init end). + // Provisioned concurrency / Managed Instances initialize at provisioning time and + // AWS omits Init Duration from their invokes' REPORT lines. + initDuration := "" + if server.onDemand && !server.warmStart && !server.initTimedOut.Load() && initErrType == "" { + if initTimeMS, ok := server.eventsAPI.InitDurationMS(); ok { + initDuration = fmt.Sprintf("Init Duration: %.2f ms\t", initTimeMS) + } + } + server.warmStart = true + + // On-demand init failure folded into this invocation (AWS suppressed init): emit + // the INIT_REPORT(phase=invoke) line before START (emitted during Invoke below), + // reporting the failed init's duration (rapid's measurement when available; the + // wall-clock fallback covers inits that died before emitting INIT_REPORT). + if initErrType != "" { + initTimeMS, ok := server.eventsAPI.InitDurationMS() + if !ok { + initTimeMS = float64(time.Since(server.initStart).Nanoseconds()) / float64(time.Millisecond) + } + _, _ = fmt.Fprintf(logCollector, + "INIT_REPORT Init Duration: %.2f ms\tPhase: invoke\tStatus: error\tError Type: %s\n", + initTimeMS, initErrType) + } invokeStart := time.Now() err = server.Invoke(invokeResp, &interop.Invoke{ @@ -134,15 +212,17 @@ func NewCustomInteropServer(lsOpts *LsOpts, delegate interop.Server, logCollecto }) timeout := int(server.delegate.GetInvokeTimeout().Seconds()) isErr := false + status := "" if err != nil { switch { case errors.Is(err, rapidcore.ErrInvokeTimeout): log.Debugf("Got invoke timeout") isErr = true + status = "Status: timeout" errorResponse := lsapi.ErrorResponse{ + ErrorType: "Sandbox.Timedout", ErrorMessage: fmt.Sprintf( - "%s %s Task timed out after %d.00 seconds", - time.Now().Format("2006-01-02T15:04:05Z"), + "RequestId: %s Error: Task timed out after %d.00 seconds", invokeR.InvokeId, timeout, ), @@ -161,6 +241,20 @@ func NewCustomInteropServer(lsOpts *LsOpts, delegate interop.Server, logCollecto log.Fatalln(err) } } + // On-demand init failure folded into this invocation: when the suppressed init + // re-run (and thus the invoke) failed again, the REPORT carries the failure status + // and rapidcore's scrubbed fatal error type (e.g. Runtime.Unknown). When the + // invocation succeeded (the suppressed re-init recovered from a transient init + // failure), the result stands on its own — AWS reports it as successful — and the + // cached init failure is cleared so later invocations are not tainted by it. + if initErrType != "" { + if err != nil { + isErr = true + status = "Status: error\tError Type: " + initErrType + } else { + server.initErrorType.Store("") + } + } // optional sleep. can be used for debugging purposes if lsOpts.PostInvokeWaitMS != "" { waitMS, err := strconv.Atoi(lsOpts.PostInvokeWaitMS) @@ -171,7 +265,7 @@ func NewCustomInteropServer(lsOpts *LsOpts, delegate interop.Server, logCollecto } timeoutDuration := time.Duration(timeout) * time.Second memorySize := GetEnvOrDie("AWS_LAMBDA_FUNCTION_MEMORY_SIZE") - PrintEndReports(invokeR.InvokeId, "", memorySize, invokeStart, timeoutDuration, logCollector) + PrintEndReports(invokeR.InvokeId, initDuration, status, memorySize, invokeStart, timeoutDuration, logCollector) if err2 := server.localStackAdapter.SendLogs(invokeR.InvokeId, logCollector.getLogs()); err2 != nil { log.Error("failed to send logs to LocalStack: ", err2) @@ -204,13 +298,116 @@ func (c *CustomInteropServer) SendErrorResponse(invokeID string, resp *interop.E return c.delegate.SendErrorResponse(invokeID, resp) } -// SendInitErrorResponse writes error response during init to a shared memory and sends GIRD FAULT. +// SendInitErrorResponse forwards the init error reported by the runtime (via /init/error) to +// LocalStack and then propagates it to the delegate. It marks initErrorForwarded so the +// crash-path fallback in main.go (SendInitError) does not send a duplicate error status for +// the same failed initialization. func (c *CustomInteropServer) SendInitErrorResponse(resp *interop.ErrorInvokeResponse) error { log.Traceln("SendInitErrorResponse called") - if err := c.localStackAdapter.SendStatus(Error, resp.Payload); err != nil { - log.Fatalln("Failed to send init error to LocalStack " + err.Error() + ". Exiting.") + // Mark synchronously, before sending: this runs in the init flow before + // AwaitInitializedWithDetails unblocks in main.go, so the fallback observes the flag. + c.initErrorForwarded.Store(true) + // Record rapidcore's scrubbed fatal error type so the folded-into-invoke path can render the + // INIT_REPORT(phase=invoke) and REPORT Status/Error Type lines (on-demand). + c.initErrorType.Store(string(resp.FunctionError.Type)) + + // Always cache the structured error in the delegate so the first invoke can surface it. + defer c.delegate.SendInitErrorResponse(resp) + + // On-demand folds the failed init into the first invocation, which carries the error and + // logs; reporting it here via /status/error too would race the invoke and fail the env + // startup before the invoke runs. PC/SnapStart/MI report at provisioning time below. + if c.onDemand { + return nil + } + + // Forward the runtime's structured payload as-is and only inject the requestId. Decoding + // into a map rather than a typed struct preserves fields exactly as the runtime emitted + // them — in particular an empty but present "stackTrace": [] (e.g. Runtime.HandlerNotFound), + // which a typed struct with omitempty would drop on re-marshal. + var payload map[string]any + if err := json.Unmarshal(resp.Payload, &payload); err != nil { + log.WithError(err).Warn("Failed to parse init error payload; forwarding raw payload") + if err := c.localStackAdapter.SendStatus(Error, resp.Payload); err != nil { + log.WithError(err).WithField("runtime-id", c.localStackAdapter.RuntimeId). + Error("Failed to send init error to LocalStack") + } + return nil + } + + // No invocation is active during the init phase, so this is typically blank; AWS still + // includes a (blank) requestId in the init error payload. + payload["requestId"] = c.delegate.GetCurrentInvokeID() + + body, err := json.Marshal(payload) + if err != nil { + log.WithError(err).Error("Failed to marshal adapted init error response") + body = resp.Payload } - return c.delegate.SendInitErrorResponse(resp) + + if err := c.localStackAdapter.SendStatus(Error, body); err != nil { + log.WithError(err).WithField("runtime-id", c.localStackAdapter.RuntimeId). + Error("Failed to send init error to LocalStack") + } + return nil +} + +// SendInitError reports a structured init failure to LocalStack when the runtime failed to +// initialize WITHOUT calling /init/error itself (e.g. it crashed, called sys.exit, or had an +// invalid entrypoint). The init failure is detected by the existing rapidcore machinery +// (watchEvents -> InitFailure -> AwaitInitializedWithDetails) and surfaced to main.go. +// It is a no-op if SendInitErrorResponse already forwarded the runtime's own structured error. +func (c *CustomInteropServer) SendInitError(errType fatalerror.ErrorType, errMsg error) { + if c.initErrorForwarded.Load() { + log.Debug("Init error already forwarded to LocalStack; skipping duplicate") + return + } + + if errType == "" { + errType = fatalerror.RuntimeExit + } + + message := "Runtime exited during initialization" + if errMsg != nil { + message = errMsg.Error() + } + + // Match AWS's fault message format "RequestId: Error: ". No invocation is active + // during the init phase (LocalStack only dispatches an invoke after the runtime reports + // ready), so synthesize a request ID, preferring the current invoke ID if one exists. + requestID := c.delegate.GetCurrentInvokeID() + if requestID == "" { + requestID = uuid.NewString() + } + + payload, err := json.Marshal(lsapi.ErrorResponse{ + ErrorType: string(errType), + ErrorMessage: fmt.Sprintf("RequestId: %s Error: %s", requestID, message), + }) + if err != nil { + log.WithError(err).Error("Failed to marshal init error response") + return + } + + if err := c.localStackAdapter.SendStatus(Error, payload); err != nil { + log.WithError(err).WithField("runtime-id", c.localStackAdapter.RuntimeId). + Error("Failed to send init error to LocalStack") + } +} + +// RecordInitError records the structured init failure detected by rapidcore for runtimes that +// failed WITHOUT calling /init/error (crash, sys.exit, invalid entrypoint), so the on-demand +// folded-into-invoke path renders the same INIT_REPORT(phase=invoke) and REPORT Status/Error +// Type lines as the /init/error-reported flavor. It must not overwrite a type already recorded +// by SendInitErrorResponse: the runtime-reported error is the authoritative one. +func (c *CustomInteropServer) RecordInitError(errType fatalerror.ErrorType) { + if recorded, _ := c.initErrorType.Load().(string); recorded != "" { + return + } + if errType == "" { + errType = fatalerror.RuntimeExit + } + c.initErrorType.Store(string(errType)) } func (c *CustomInteropServer) GetCurrentInvokeID() string { @@ -225,9 +422,20 @@ func (c *CustomInteropServer) SendRuntimeReady() error { func (c *CustomInteropServer) Init(i *interop.Init, invokeTimeoutMs int64) error { log.Traceln("Init called") + c.initStart = time.Now() return c.delegate.Init(i, invokeTimeoutMs) } +// ReportInitTimeout emits an AWS-style INIT_REPORT timeout line into the log collector and +// marks the init as timed out. The init is then re-run as a suppressed init during the first +// invocation (under the function timeout), and that invocation's REPORT omits Init Duration. +func (c *CustomInteropServer) ReportInitTimeout() { + c.initTimedOut.Store(true) + initTimeMS := float64(time.Since(c.initStart).Nanoseconds()) / float64(time.Millisecond) + _, _ = fmt.Fprintf(c.logCollector, + "INIT_REPORT Init Duration: %.2f ms\tPhase: init\tStatus: timeout\n", initTimeMS) +} + func (c *CustomInteropServer) Invoke(responseWriter http.ResponseWriter, invoke *interop.Invoke) error { log.Traceln("Invoke called") return c.delegate.Invoke(responseWriter, invoke) diff --git a/cmd/localstack/events.go b/cmd/localstack/events.go new file mode 100644 index 0000000..6a336ab --- /dev/null +++ b/cmd/localstack/events.go @@ -0,0 +1,52 @@ +package main + +import ( + "fmt" + "sync/atomic" + + "github.com/aws/aws-lambda-runtime-interface-emulator/internal/lambda/interop" + "github.com/aws/aws-lambda-runtime-interface-emulator/internal/lambda/rapidcore/standalone/telemetry" + lambdatelemetry "github.com/aws/aws-lambda-runtime-interface-emulator/internal/lambda/telemetry" +) + +// LocalStackEventsAPI rides rapidcore's invoke lifecycle events to emit the synthetic START +// log line at the AWS-faithful point. rapidcore calls SendInvokeStart after any inline +// (suppressed) init and before the runtime handles the invocation (see doInvoke in +// internal/lambda/rapid/handlers.go), so emitting START here — rather than eagerly when +// LocalStack dispatches /invoke — places it after a re-run init's logs, matching AWS. +type LocalStackEventsAPI struct { + *telemetry.StandaloneEventsAPI + logCollector *LogCollector + // initDurationMS holds rapid's authoritative measurement of the Init phase (init start -> + // init end, monotonic), captured from the INIT_REPORT(phase=init) lifecycle event emitted + // by doRuntimeDomainInit. The /invoke handler renders it as the first invocation's REPORT + // "Init Duration" instead of re-measuring at invoke arrival, which would wrongly include + // the idle gap between init completion and the first invoke dispatch. + initDurationMS atomic.Value // float64 +} + +func NewLocalStackEventsAPI(logCollector *LogCollector) *LocalStackEventsAPI { + return &LocalStackEventsAPI{ + StandaloneEventsAPI: new(telemetry.StandaloneEventsAPI), + logCollector: logCollector, + } +} + +func (e *LocalStackEventsAPI) SendInvokeStart(data interop.InvokeStartData) error { + _, _ = fmt.Fprintf(e.logCollector, "START RequestId: %s Version: %s\n", data.RequestID, data.Version) + return e.StandaloneEventsAPI.SendInvokeStart(data) +} + +func (e *LocalStackEventsAPI) SendInitReport(data interop.InitReportData) error { + if data.Phase == lambdatelemetry.InitInsideInitPhase { + e.initDurationMS.Store(data.Metrics.DurationMs) + } + return e.StandaloneEventsAPI.SendInitReport(data) +} + +// InitDurationMS returns rapid's measured duration of the startup Init phase and whether one +// was recorded (false e.g. when the init phase timed out and never completed). +func (e *LocalStackEventsAPI) InitDurationMS() (float64, bool) { + durationMS, ok := e.initDurationMS.Load().(float64) + return durationMS, ok +} diff --git a/cmd/localstack/logs.go b/cmd/localstack/logs.go index ac11b0a..385d348 100644 --- a/cmd/localstack/logs.go +++ b/cmd/localstack/logs.go @@ -38,6 +38,11 @@ func (lc *LogCollector) reset() { func (lc *LogCollector) getLogs() lsapi.LogResponse { lc.mutex.Lock() defer lc.mutex.Unlock() + // Emit the captured runtime output verbatim. Do NOT rewrite bare carriage returns to line + // feeds: AWS keeps a bare CR inside a single CloudWatch log event (it splits records on LF + // only), so a user `print("a\rb")` must stay the one event "a\rb". LocalStack's log ingestion + // likewise splits on "\n" (see services/lambda_/.../logs.py), so converting CR to LF here + // would wrongly split such records — see TestCloudwatchLogs::test_multi_line_prints. response := lsapi.LogResponse{ Logs: strings.Join(lc.RuntimeLogs, ""), } diff --git a/cmd/localstack/main.go b/cmd/localstack/main.go index b03d877..b310bb7 100644 --- a/cmd/localstack/main.go +++ b/cmd/localstack/main.go @@ -4,17 +4,30 @@ package main import ( "context" + "errors" + "fmt" "os" "runtime/debug" "strconv" "strings" "time" + "github.com/aws/aws-lambda-runtime-interface-emulator/internal/lambda/fatalerror" "github.com/aws/aws-lambda-runtime-interface-emulator/internal/lambda/interop" "github.com/aws/aws-lambda-runtime-interface-emulator/internal/lambda/rapidcore" log "github.com/sirupsen/logrus" ) +const ( + // defaultInitPhaseTimeoutSeconds matches AWS's 10s init-phase limit. When init exceeds + // this, the init is retried at the time of the first invocation under the function + // timeout ("suppressed init"). Override via LOCALSTACK_INIT_PHASE_TIMEOUT. + defaultInitPhaseTimeoutSeconds = 10 + // initResetTimeoutMs bounds the reset that aborts a timed-out init so rapidcore re-runs + // it on the first invocation. + initResetTimeoutMs = 2000 +) + type LsOpts struct { InteropPort string RuntimeEndpoint string @@ -32,6 +45,7 @@ type LsOpts struct { EnableXRayTelemetry string PostInvokeWaitMS string MaxPayloadSize string + InitPhaseTimeout string } func GetEnvOrDie(env string) string { @@ -49,12 +63,13 @@ func InitLsOpts() *LsOpts { RuntimeId: GetEnvOrDie("LOCALSTACK_RUNTIME_ID"), AccountId: GetenvWithDefault("LOCALSTACK_FUNCTION_ACCOUNT_ID", "000000000000"), // optional with default - InteropPort: GetenvWithDefault("LOCALSTACK_INTEROP_PORT", "9563"), - InitTracingPort: GetenvWithDefault("LOCALSTACK_RUNTIME_TRACING_PORT", "9564"), - User: GetenvWithDefault("LOCALSTACK_USER", "sbx_user1051"), - InitLogLevel: GetenvWithDefault("LOCALSTACK_INIT_LOG_LEVEL", "warn"), - EdgePort: GetenvWithDefault("EDGE_PORT", "4566"), - MaxPayloadSize: GetenvWithDefault("LOCALSTACK_MAX_PAYLOAD_SIZE", "6291556"), + InteropPort: GetenvWithDefault("LOCALSTACK_INTEROP_PORT", "9563"), + InitTracingPort: GetenvWithDefault("LOCALSTACK_RUNTIME_TRACING_PORT", "9564"), + User: GetenvWithDefault("LOCALSTACK_USER", "sbx_user1051"), + InitLogLevel: GetenvWithDefault("LOCALSTACK_INIT_LOG_LEVEL", "warn"), + EdgePort: GetenvWithDefault("EDGE_PORT", "4566"), + MaxPayloadSize: GetenvWithDefault("LOCALSTACK_MAX_PAYLOAD_SIZE", "6291556"), + InitPhaseTimeout: GetenvWithDefault("LOCALSTACK_INIT_PHASE_TIMEOUT", strconv.Itoa(defaultInitPhaseTimeoutSeconds)), // optional or empty CodeArchives: os.Getenv("LOCALSTACK_CODE_ARCHIVES"), HotReloadingPaths: strings.Split(GetenvWithDefault("LOCALSTACK_HOT_RELOADING_PATHS", ""), ","), @@ -82,6 +97,7 @@ func UnsetLsEnvs() { "LOCALSTACK_POST_INVOKE_WAIT_MS", "LOCALSTACK_FUNCTION_ACCOUNT_ID", "LOCALSTACK_MAX_PAYLOAD_SIZE", + "LOCALSTACK_INIT_PHASE_TIMEOUT", "LOCALSTACK_CHMOD_PATHS", // Docker container ID @@ -179,6 +195,16 @@ func main() { localStackLogsEgressApi := NewLocalStackLogsEgressAPI(logCollector) tracer := NewLocalStackTracer() + // Create LocalStack adapter upfront so it can be shared with the interop server + lsAdapter := &LocalStackAdapter{ + UpstreamEndpoint: lsOpts.RuntimeEndpoint, + RuntimeId: lsOpts.RuntimeId, + } + + // Events API rides rapidcore's invoke lifecycle to emit the synthetic START log line after + // any inline (suppressed) init, matching AWS's ordering. + lsEventsAPI := NewLocalStackEventsAPI(logCollector) + // build sandbox sandbox := rapidcore. NewSandboxBuilder(). @@ -190,7 +216,8 @@ func main() { SetExtensionsFlag(true). SetInitCachingFlag(true). SetLogsEgressAPI(localStackLogsEgressApi). - SetTracer(tracer) + SetTracer(tracer). + SetEventsAPI(lsEventsAPI) // Corresponds to the 'AWS_LAMBDA_RUNTIME_API' environment variable. // We need to ensure the runtime server is up before the INIT phase, @@ -211,7 +238,7 @@ func main() { runDaemon(d) // async defaultInterop := sandbox.DefaultInteropServer() - interopServer := NewCustomInteropServer(lsOpts, defaultInterop, logCollector) + interopServer := NewCustomInteropServer(lsOpts, lsAdapter, defaultInterop, logCollector, lsEventsAPI) sandbox.SetInteropServer(interopServer) if len(handler) > 0 { sandbox.SetHandler(handler) @@ -248,12 +275,73 @@ func main() { log.Debugln("Starting runtime init.") InitHandler(sandbox.LambdaInvokeAPI(), GetEnvOrDie("AWS_LAMBDA_FUNCTION_VERSION"), int64(invokeTimeoutSeconds), bootstrap, lsOpts.AccountId) // TODO: replace this with a custom init + initPhaseTimeoutSeconds := defaultInitPhaseTimeoutSeconds + if parsed, perr := strconv.Atoi(lsOpts.InitPhaseTimeout); perr == nil { + initPhaseTimeoutSeconds = parsed + } else { + log.Warnln("Invalid LOCALSTACK_INIT_PHASE_TIMEOUT, using default:", perr) + } + log.Debugln("Awaiting initialization of runtime init.") - if err := interopServer.delegate.AwaitInitialized(); err != nil { - // Error cases: ErrInitDoneFailed or ErrInitResetReceived + initResp, timedOut, err := interopServer.delegate.AwaitInitializedWithTimeout( + time.Duration(initPhaseTimeoutSeconds) * time.Second, + ) + switch { + case timedOut && !interopServer.onDemand: + // Provisioned concurrency / Managed Instances: AWS fails the provisioning operation + // when the extended init window is exceeded — there is no suppressed-init retry at + // invoke time. Report the failure and exit instead of signaling ready. + // TODO: validate the exact provisioning-failure errorType/message against AWS + // (e.g. the Managed Instances API model's FUNCTION_ERROR_INIT_TIMEOUT). + log.Errorf("Extended init phase timed out after %ds. Exiting.", initPhaseTimeoutSeconds) + interopServer.SendInitError( + fatalerror.SandboxTimeout, + fmt.Errorf("Init phase timed out after %d seconds", initPhaseTimeoutSeconds), + ) + return + case timedOut: + // On-demand: AWS limits the init phase to 10s. When exceeded, init is retried at the + // time of the first invocation under the function timeout ("suppressed init"). We + // report the init timeout, reset the in-progress init so rapidcore re-runs a fresh + // Init phase when the first invoke arrives, and only then signal ready. + // The reset must complete BEFORE signaling ready: its cleanup (Clear/Release in + // rapidcore.Server.Reset) releases the current reservation, so running it concurrently + // with the first invoke's Reserve() would cancel that invoke's reservation mid-flight. + // The reset cannot block on the unconsumed init failure: awaitInitCompletion acks rapid + // before the (still pending) initFailures channel send, which the invoke path's + // Reserve()/awaitInitialized() later consumes to trigger the suppressed init. + log.Debugln("Init phase timed out; deferring to suppressed init on first invocation.") + interopServer.ReportInitTimeout() + if _, resetErr := interopServer.delegate.Reset("initTimeout", initResetTimeoutMs); resetErr != nil { + // A non-nil error only carries the aborted init's fatal error type; the reset + // itself has completed and the suppressed-init retry stays valid. + log.Debugf("Reset after init timeout returned: %s", resetErr) + } + // Consume the reset-interrupted init's failure notification: if the first invoke's + // awaitInitialized() consumed it instead, rapidcore would cache a generic placeholder + // error (Sandbox.Failure with an empty payload) that masks the real error when the + // suppressed init re-run fails (e.g. a runtime crash without /init/error). + interopServer.delegate.DrainInitFailure() + case interopServer.onDemand && errors.Is(err, rapidcore.ErrInitDoneFailed): + // On-demand: AWS folds a failed cold-start init into the first invocation (suppressed + // init). Signal ready and keep the process alive so LocalStack dispatches the first + // invoke, which surfaces the cached init error (or a runtime-exit error) together with + // the full INIT_REPORT/START/END/REPORT log envelope. SendInitErrorResponse has already + // cached the structured error (without reporting via /status/error for on-demand). + // Record the failure type detected by rapidcore so runtimes that crashed WITHOUT + // calling /init/error still get the error envelope (no-op when /init/error already + // recorded the runtime-reported type). + log.Debugln("Init failed; deferring to first invocation (on-demand suppressed init).") + interopServer.RecordInitError(initResp.InitErrorType) + case err != nil: + // PC/SnapStart/MI, or an init-phase reset: report the failure now and exit. When the + // runtime reported its own error via /init/error, SendInitErrorResponse already + // forwarded it and SendInitError is a no-op. When the runtime crashed/exited without + // reporting, this is the only callback that notifies LocalStack. log.Errorln("Runtime init failed to initialize: " + err.Error() + ". Exiting.") - // NOTE: Sending the error status to LocalStack is handled beforehand in the custom_interop.go through the - // callback SendInitErrorResponse because it contains the correct error response payload. + if !errors.Is(err, rapidcore.ErrInitResetReceived) { + interopServer.SendInitError(initResp.InitErrorType, initResp.InitErrorMessage) + } return } diff --git a/internal/lambda/rapidcore/server_localstack.go b/internal/lambda/rapidcore/server_localstack.go new file mode 100644 index 0000000..a40cde9 --- /dev/null +++ b/internal/lambda/rapidcore/server_localstack.go @@ -0,0 +1,94 @@ +package rapidcore + +// This file contains LocalStack-specific additions to the rapidcore Server. It is kept +// separate from server.go (which is vendored upstream from +// aws-lambda-runtime-interface-emulator) so that upstream stays byte-identical and rebases +// never conflict. Because the logic needs the unexported init-failures channel and runtime +// state helpers, it must live in package rapidcore rather than in cmd/localstack. + +import ( + "time" + + "github.com/aws/aws-lambda-runtime-interface-emulator/internal/lambda/fatalerror" + "github.com/aws/aws-lambda-runtime-interface-emulator/internal/lambda/interop" + + log "github.com/sirupsen/logrus" +) + +// InitCompletionResponse carries the structured init failure cause (error type and message) +// extracted from the InitFailure. It lets standalone callers report the failure instead of +// only seeing the sentinel error (ErrInitDoneFailed / ErrInitResetReceived). +type InitCompletionResponse struct { + InitErrorType fatalerror.ErrorType + InitErrorMessage error +} + +// interpretInitFailure mirrors the upstream Server.awaitInitialized body, mapping an +// InitFailure to the sentinel error and structured cause. It is duplicated here (rather than +// refactored out of server.go) to keep the upstream file untouched. +func (s *Server) interpretInitFailure(initFailure interop.InitFailure, awaitingInitStatus bool) (InitCompletionResponse, error) { + resp := InitCompletionResponse{} + + if initFailure.ResetReceived { + // Resets during Init are only received in standalone + // during an invoke timeout + s.setRuntimeState(runtimeInitFailed) + resp.InitErrorType = initFailure.ErrorType + resp.InitErrorMessage = initFailure.ErrorMessage + return resp, ErrInitResetReceived + } + + if awaitingInitStatus { + // channel not closed, received init failure + // Sandbox can be reserved even if init failed (due to function errors) + s.setRuntimeState(runtimeInitFailed) + resp.InitErrorType = initFailure.ErrorType + resp.InitErrorMessage = initFailure.ErrorMessage + return resp, ErrInitDoneFailed + } + + // not awaiting init status (channel closed) + return resp, nil +} + +// DrainInitFailure consumes (without any side effects) the init-failure notification left +// behind after resetting a timed-out init. The reset aborts the in-progress init, leaving +// awaitInitCompletion parked on the unbuffered initFailures channel with a ResetReceived +// failure; if the first invoke's Reserve()/awaitInitialized() consumed that instead, it would +// cache a generic placeholder error (Sandbox.Failure with an empty payload, see the +// ErrInitResetReceived handling in Invoke) that later masks the real outcome of the suppressed +// init re-run. Draining it here lets awaitInitialized() observe the closed channel and treat +// the init outcome as pending, so the suppressed init's own result is authoritative. +// The receive cannot block indefinitely: once the reset has completed, awaitInitCompletion is +// committed to either sending the failure or closing the channel (init succeeded just before +// the reset took effect). +func (s *Server) DrainInitFailure() { + <-s.getInitFailuresChan() +} + +// AwaitInitializedWithTimeout behaves like the upstream AwaitInitialized but (1) returns the +// structured init error on failure and (2) returns early if init does not complete within the +// timeout. On timeout it returns timedOut=true WITHOUT consuming the init-failures channel and +// without any side effects, so a subsequent invoke's Reserve()/awaitInitialized() can still +// observe the init outcome and trigger the suppressed init. The caller is expected to reset the +// in-progress init so that outcome becomes available. +func (s *Server) AwaitInitializedWithTimeout(timeout time.Duration) (resp InitCompletionResponse, timedOut bool, err error) { + timer := time.NewTimer(timeout) + defer timer.Stop() + + select { + case <-timer.C: + return InitCompletionResponse{}, true, nil + case initFailure, awaitingInitStatus := <-s.getInitFailuresChan(): + resp, err = s.interpretInitFailure(initFailure, awaitingInitStatus) + if err != nil { + if releaseErr := s.Release(); releaseErr != nil { + log.Infof("Error releasing after init failure %s: %s", err, releaseErr) + } + s.setRuntimeState(runtimeInitFailed) + return resp, false, err + } + s.setRuntimeState(runtimeInitComplete) + return resp, false, nil + } +} diff --git a/internal/lsapi/types.go b/internal/lsapi/types.go index d38776d..9f4b134 100644 --- a/internal/lsapi/types.go +++ b/internal/lsapi/types.go @@ -15,8 +15,10 @@ type LogResponse struct { // ErrorResponse is sent to LocalStack when encountering an error. type ErrorResponse struct { - ErrorMessage string `json:"errorMessage"` - ErrorType string `json:"errorType,omitempty"` - RequestId string `json:"requestId,omitempty"` - StackTrace []string `json:"stackTrace,omitempty"` + ErrorMessage string `json:"errorMessage"` + ErrorType string `json:"errorType,omitempty"` + // RequestId uses *string so that an empty string "" is serialized (not omitted), + // while nil is omitted — init errors always set this field, fault events leave it nil. + RequestId *string `json:"requestId,omitempty"` + StackTrace []string `json:"stackTrace,omitempty"` }