diff --git a/src/main/java/com/dashjoin/jsonata/Functions.java b/src/main/java/com/dashjoin/jsonata/Functions.java index c564805..6441d80 100644 --- a/src/main/java/com/dashjoin/jsonata/Functions.java +++ b/src/main/java/com/dashjoin/jsonata/Functions.java @@ -1561,9 +1561,10 @@ public static List hofFuncArgs(Object func, Object arg1, Object arg2, Object arg */ public static Object funcApply(Object func, List funcArgs) throws Throwable { Object res; - if (isLambda(func)) - res = Jsonata.current.get().apply(func, funcArgs, null, Jsonata.current.get().environment); - else + if (isLambda(func)) { + Jsonata.EvalContext ctx = Jsonata.evalContext.get(); + res = ctx.instance.apply(func, funcArgs, null, ctx.environment); + } else res = ((JFunction)func).call(null, funcArgs); return res; } @@ -2375,7 +2376,8 @@ public static Object functionEval(String expr, Object focus) { if(expr == null) { return null; } - Object input = Jsonata.current.get().input; // = this.input; + Jsonata.EvalContext ctx = Jsonata.evalContext.get(); + Object input = ctx.input; if(focus != null) { input = focus; // if the input is a JSON array, then wrap it in a singleton sequence so it gets treated as a single input @@ -2386,7 +2388,7 @@ public static Object functionEval(String expr, Object focus) { } Jsonata ast; - Jsonata.Frame env = Jsonata.current.get().environment; + Jsonata.Frame env = ctx.environment; try { ast = jsonata(expr); } catch(Throwable err) { @@ -2397,7 +2399,7 @@ public static Object functionEval(String expr, Object focus) { } Object result = null; try { - result = Jsonata.current.get().evaluate(ast.ast, input, env); + result = ctx.instance.evaluate(ast.ast, input, env); } catch(Throwable err) { // error evaluating the expression passed to $eval //populateMessage(err); @@ -2412,7 +2414,7 @@ public static Object functionEval(String expr, Object focus) { // return datetime.fromMillis(timestamp.getTime(), picture, timezone); // }, "")); public static String now(String picture, String timezone) { - long t = Jsonata.current.get().timestamp; + long t = Jsonata.evalContext.get().timestamp; return dateTimeFromMillis(t, picture, timezone); } @@ -2420,7 +2422,7 @@ public static String now(String picture, String timezone) { // return timestamp.getTime(); // }, "<:n>")); public static long millis() { - long t = Jsonata.current.get().timestamp; + long t = Jsonata.evalContext.get().timestamp; return t; } } diff --git a/src/main/java/com/dashjoin/jsonata/Jsonata.java b/src/main/java/com/dashjoin/jsonata/Jsonata.java index d60bf19..dd4a7f4 100644 --- a/src/main/java/com/dashjoin/jsonata/Jsonata.java +++ b/src/main/java/com/dashjoin/jsonata/Jsonata.java @@ -127,115 +127,122 @@ public void setEvaluateExitCallback(ExitCallback cb) { * @returns {*} Evaluated input data */ Object evaluate(Symbol expr, Object input, Frame environment) { - // Thread safety: - // Make sure each evaluate is executed on an instance per thread - return getPerThreadInstance()._evaluate(expr, input, environment); + return _evaluate(expr, input, environment); } Object _evaluate(Symbol expr, Object input, Frame environment) { Object result = null; - // Store the current input + environment - // This is required by Functions.functionEval for current $eval() input context - this.input = input; - this.environment = environment; + // Save and restore the evaluation context so that nested + // evaluations (e.g. $eval()) see the correct context. + // All mutable per-evaluation state lives on EvalContext, not on this instance. + EvalContext ctx = evalContext.get(); + Object savedInput = ctx.input; + Frame savedEnvironment = ctx.environment; + ctx.input = input; + ctx.environment = environment; - if (parser.dbg) System.out.println("eval expr="+expr+" type="+expr.type);//+" input="+input); + try { + if (parser.dbg) System.out.println("eval expr="+expr+" type="+expr.type);//+" input="+input); - var entryCallback = environment.lookup("__evaluate_entry"); - if(entryCallback!=null) { - ((EntryCallback)entryCallback).callback(expr, input, environment); - } - - if (expr.type!=null) - switch (expr.type) { - case "path": - result = /* await */ evaluatePath(expr, input, environment); - break; - case "binary": - result = /* await */ evaluateBinary(expr, input, environment); - break; - case "unary": - result = /* await */ evaluateUnary(expr, input, environment); - break; - case "name": - result = evaluateName(expr, input, environment); - if (parser.dbg) System.out.println("evalName "+result); - break; - case "string": - case "number": - case "value": - result = evaluateLiteral(expr); //, input, environment); - break; - case "wildcard": - result = evaluateWildcard(expr, input); //, environment); - break; - case "descendant": - result = evaluateDescendants(expr, input); //, environment); - break; - case "parent": - result = environment.lookup(expr.slot.label); - break; - case "condition": - result = /* await */ evaluateCondition(expr, input, environment); - break; - case "block": - result = /* await */ evaluateBlock(expr, input, environment); - break; - case "bind": - result = /* await */ evaluateBindExpression(expr, input, environment); - break; - case "regex": - result = evaluateRegex(expr); //, input, environment); - break; - case "function": - result = /* await */ evaluateFunction(expr, input, environment, Utils.NONE); - break; - case "variable": - result = evaluateVariable(expr, input, environment); - break; - case "lambda": - result = evaluateLambda(expr, input, environment); - break; - case "partial": - result = /* await */ evaluatePartialApplication(expr, input, environment); - break; - case "apply": - result = /* await */ evaluateApplyExpression(expr, input, environment); - break; - case "transform": - result = evaluateTransformExpression(expr, input, environment); - break; - } - - if (expr.predicate!=null) - for(var ii = 0; ii < expr.predicate.size(); ii++) { - result = /* await */ evaluateFilter(expr.predicate.get(ii).expr, result, environment); + var entryCallback = environment.lookup("__evaluate_entry"); + if(entryCallback!=null) { + ((EntryCallback)entryCallback).callback(expr, input, environment); } - - if (!expr.type.equals("path") && expr.group!=null) { - result = /* await */ evaluateGroupExpression(expr.group, result, environment); - } - - var exitCallback = environment.lookup("__evaluate_exit"); - if(exitCallback!=null) { - ((ExitCallback)exitCallback).callback(expr, input, environment, result); - } - - // mangle result (list of 1 element -> 1 element, empty list -> null) - if(result!=null && Utils.isSequence(result) && !((JList)result).tupleStream) { - JList _result = (JList)result; - if(expr.keepArray) { - _result.keepSingleton = true; + + if (expr.type!=null) + switch (expr.type) { + case "path": + result = /* await */ evaluatePath(expr, input, environment); + break; + case "binary": + result = /* await */ evaluateBinary(expr, input, environment); + break; + case "unary": + result = /* await */ evaluateUnary(expr, input, environment); + break; + case "name": + result = evaluateName(expr, input, environment); + if (parser.dbg) System.out.println("evalName "+result); + break; + case "string": + case "number": + case "value": + result = evaluateLiteral(expr); //, input, environment); + break; + case "wildcard": + result = evaluateWildcard(expr, input); //, environment); + break; + case "descendant": + result = evaluateDescendants(expr, input); //, environment); + break; + case "parent": + result = environment.lookup(expr.slot.label); + break; + case "condition": + result = /* await */ evaluateCondition(expr, input, environment); + break; + case "block": + result = /* await */ evaluateBlock(expr, input, environment); + break; + case "bind": + result = /* await */ evaluateBindExpression(expr, input, environment); + break; + case "regex": + result = evaluateRegex(expr); //, input, environment); + break; + case "function": + result = /* await */ evaluateFunction(expr, input, environment, Utils.NONE); + break; + case "variable": + result = evaluateVariable(expr, input, environment); + break; + case "lambda": + result = evaluateLambda(expr, input, environment); + break; + case "partial": + result = /* await */ evaluatePartialApplication(expr, input, environment); + break; + case "apply": + result = /* await */ evaluateApplyExpression(expr, input, environment); + break; + case "transform": + result = evaluateTransformExpression(expr, input, environment); + break; } - if(_result.isEmpty()) { - result = null; - } else if(_result.size() == 1) { - result = _result.keepSingleton ? _result : _result.get(0); + + if (expr.predicate!=null) + for(var ii = 0; ii < expr.predicate.size(); ii++) { + result = /* await */ evaluateFilter(expr.predicate.get(ii).expr, result, environment); + } + + if (!expr.type.equals("path") && expr.group!=null) { + result = /* await */ evaluateGroupExpression(expr.group, result, environment); } - } - return result; + var exitCallback = environment.lookup("__evaluate_exit"); + if(exitCallback!=null) { + ((ExitCallback)exitCallback).callback(expr, input, environment, result); + } + + // mangle result (list of 1 element -> 1 element, empty list -> null) + if(result!=null && Utils.isSequence(result) && !((JList)result).tupleStream) { + JList _result = (JList)result; + if(expr.keepArray) { + _result.keepSingleton = true; + } + if(_result.isEmpty()) { + result = null; + } else if(_result.size() == 1) { + result = _result.keepSingleton ? _result : _result.get(0); + } + } + + return result; + } finally { + ctx.input = savedInput; + ctx.environment = savedEnvironment; + } } /** @@ -1549,29 +1556,34 @@ boolean isFunctionLike(Object o) { return Utils.isFunction(o) || Functions.isLambda(o) || (o instanceof Pattern); } - final static ThreadLocal current = new ThreadLocal<>(); - /** - * Returns a per thread instance of this parsed expression. - * - * @return + * Mutable evaluation context, stored on a static ThreadLocal. + * Holds all per-evaluation state that was previously stored as mutable fields + * on the Jsonata instance. This allows the Jsonata instance itself to remain + * immutable after construction, making it freely shareable across threads + * with no cloning needed. */ - Jsonata getPerThreadInstance() { - Jsonata threadInst = current.get(); - // Fast path - if (threadInst!=null) - return threadInst; - - synchronized(this) { - threadInst = current.get(); - if (threadInst==null) { - threadInst = new Jsonata(this); - current.set(threadInst); - } - return threadInst; + static final class EvalContext { + Object input; + Frame environment; + long timestamp; + final Jsonata instance; + + EvalContext(Jsonata instance) { + this.instance = instance; } } + /** + * Thread-local evaluation context. Set at the entry point of evaluate() + * and restored after evaluation completes. Read by Functions.java for + * $eval(), $now(), $millis(), and funcApply(). + * + * This is the ONLY ThreadLocal needed. No per-instance ThreadLocals, + * no cloning, no mutable state on the Jsonata instance during evaluation. + */ + static final ThreadLocal evalContext = new ThreadLocal<>(); + /** * Evaluate Object against input data * @param {Object} expr - JSONata expression @@ -1582,8 +1594,6 @@ Jsonata getPerThreadInstance() { /* async */ Object evaluateFunction(Symbol expr, Object input, Frame environment, Object applytoContext) { Object result = null; - // this.current is set by getPerThreadInstance() at this point - // create the procedure // can"t assume that expr.procedure is a lambda type directly // could be an expression that evaluates to a Object (e.g. variable reference, parens expr etc. @@ -2474,8 +2484,6 @@ Exception populateMessage(Exception err) { List errors; Frame environment; Symbol ast; - long timestamp; - Object input; static { staticFrame = new Frame(null); @@ -2508,8 +2516,6 @@ public static Jsonata jsonata(String expression) { } environment = createFrame(staticFrame); - timestamp = System.currentTimeMillis(); // will be overridden on each call to evalute() - // Note: now and millis are implemented in Functions // environment.bind("now", defineFunction(function(picture, timezone) { // return datetime.fromMillis(timestamp.getTime(), picture, timezone); @@ -2525,21 +2531,8 @@ public static Jsonata jsonata(String expression) { // jsonata.RegexEngine = RegExp; // } - // Set instance for this thread - current.set(this); } - /** - * Creates a clone of the given Jsonata instance. - * Package-private copy constructor used to create per thread instances. - * - * @param other - */ - Jsonata(Jsonata other) { - this.ast = other.ast; - this.environment = other.environment; - this.timestamp = other.timestamp; - } /** * Flag: validate input objects to comply with JSON types @@ -2581,13 +2574,10 @@ public Object evaluate(Object input, Frame bindings) { // FIXME:, callback) { } else { exec_env = environment; } - // put the input document into the environment as the root object exec_env.bind("$", input); - // capture the timestamp and put it in the execution environment - // the $now() and $millis() functions will return this value - whenever it is called - timestamp = System.currentTimeMillis(); - //exec_env.timestamp = timestamp; + // Timestamp for $now() and $millis() — captured once per evaluation + long ts = System.currentTimeMillis(); // if the input is a JSON array, then wrap it in a singleton sequence so it gets treated as a single input if((input instanceof List) && !Utils.isSequence(input)) { @@ -2598,18 +2588,24 @@ public Object evaluate(Object input, Frame bindings) { // FIXME:, callback) { if (validateInput) Functions.validateInput(input); + EvalContext prev = evalContext.get(); + EvalContext ctx = new EvalContext(this); + ctx.input = input; + ctx.environment = exec_env; + ctx.timestamp = ts; + evalContext.set(ctx); + Object it; try { it = /* await */ evaluate(ast, input, exec_env); - // if (typeof callback === "function") { - // callback(null, it); - // } it = Utils.convertNulls(it); return it; } catch (Exception err) { // insert error message into structure populateMessage(err); // possible side-effects on `err` throw err; + } finally { + evalContext.set(prev); } } diff --git a/src/test/java/com/dashjoin/jsonata/ThreadSafetyTest.java b/src/test/java/com/dashjoin/jsonata/ThreadSafetyTest.java new file mode 100644 index 0000000..bec51de --- /dev/null +++ b/src/test/java/com/dashjoin/jsonata/ThreadSafetyTest.java @@ -0,0 +1,296 @@ +package com.dashjoin.jsonata; + +import static com.dashjoin.jsonata.Jsonata.jsonata; +import static org.junit.jupiter.api.Assertions.*; + +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.jupiter.api.Test; + +/** + * Thread safety and instance isolation tests for the Jsonata ThreadLocal fix. + * Covers GitHub issues #90 and #93, plus high-throughput concurrent scenarios. + */ +public class ThreadSafetyTest { + + @Test + public void testTwoInstancesSameThread_bindingsDontLeak() { + Jsonata exprA = jsonata("$test"); + Jsonata.Frame env = exprA.createFrame(); + env.bind("test", "value_from_A"); + + // Constructing exprB must NOT corrupt exprA's evaluation context + Jsonata exprB = jsonata("$test"); + + Object resultA = exprA.evaluate("", env); + Object resultB = exprB.evaluate(""); + + assertEquals("value_from_A", resultA, "exprA should see its own binding"); + assertNull(resultB, "exprB should NOT see exprA's binding"); + } + + @Test + public void testTwoInstancesSameThread_differentExpressions() { + Jsonata exprA = jsonata("a"); + Jsonata exprB = jsonata("b"); + + assertEquals(1, exprA.evaluate(Map.of("a", 1, "b", 99))); + assertEquals(2, exprB.evaluate(Map.of("a", 99, "b", 2))); + // Re-evaluate exprA to confirm it still works correctly + assertEquals(3, exprA.evaluate(Map.of("a", 3, "b", 99))); + } + + @Test + public void testManyInstancesSameThread_interleaved() { + Jsonata add = jsonata("a + b"); + Jsonata mul = jsonata("a * b"); + Jsonata evalExpr = jsonata("$eval('a')"); + + for (int i = 1; i <= 500; i++) { + assertEquals(i + 1, add.evaluate(Map.of("a", i, "b", 1)), + "add failed at iteration " + i); + assertEquals(i * 2, mul.evaluate(Map.of("a", i, "b", 2)), + "mul failed at iteration " + i); + assertEquals(i, evalExpr.evaluate(Map.of("a", i)), + "eval failed at iteration " + i); + } + } + + @Test + public void testEvalDeepContext() { + // This is the exact reproduction from issue #90 + Jsonata expr = jsonata("$eval($.funcs.func)"); + Object input = Map.of( + "funcs", Map.of("func", "$.a + $.b"), + "a", 3, + "b", 4 + ); + assertEquals(7, expr.evaluate(input)); + } + + @Test + public void testEvalWithSimplePath() { + Jsonata expr = jsonata("$eval('a')"); + assertEquals(42, expr.evaluate(Map.of("a", 42))); + } + + @Test + public void testEvalWithNestedPath() { + Jsonata expr = jsonata("$eval('a.b.c')"); + assertEquals(99, expr.evaluate( + Map.of("a", Map.of("b", Map.of("c", 99))))); + } + + @Test + public void testNestedEval() { + Jsonata expr = jsonata("$eval(\"$eval('a')\")"); + assertEquals(7, expr.evaluate(Map.of("a", 7))); + } + + @Test + public void testEvalWithinPathStepUsesCurrentItemContext() { + Jsonata expr = jsonata("items.$eval('a')"); + Object input = Map.of( + "items", List.of( + Map.of("a", 1), + Map.of("a", 2) + ) + ); + assertEquals(List.of(1, 2), expr.evaluate(input)); + } + + @Test + public void testEvalWithinFilterUsesCurrentItemContext() { + Jsonata expr = jsonata("items[$eval('a') = 2].a"); + Object input = Map.of( + "items", List.of( + Map.of("a", 1), + Map.of("a", 2) + ) + ); + assertEquals(2, expr.evaluate(input)); + } + + @Test + public void testCachedInstanceConcurrentThreads() throws Exception { + int threads = 10; + int itersPerThread = 1000; + Jsonata expr = jsonata("a + b"); + + CountDownLatch startGate = new CountDownLatch(1); + ExecutorService pool = Executors.newFixedThreadPool(threads); + AtomicInteger errorCount = new AtomicInteger(0); + List> futures = new ArrayList<>(); + + for (int t = 0; t < threads; t++) { + final int threadId = t; + futures.add(pool.submit(() -> { + try { + startGate.await(); + } catch (InterruptedException e) { return; } + + for (int i = 0; i < itersPerThread; i++) { + Object result = expr.evaluate(Map.of("a", threadId, "b", 1)); + if (!Integer.valueOf(threadId + 1).equals(result)) { + errorCount.incrementAndGet(); + } + } + })); + } + + startGate.countDown(); // release all threads simultaneously + for (Future f : futures) f.get(30, TimeUnit.SECONDS); + pool.shutdown(); + + assertEquals(0, errorCount.get(), + "Concurrent evaluation of cached instance produced wrong results"); + } + + @Test + public void testHighThroughputWithEval() throws Exception { + int threads = 16; + int itersPerThread = 2000; + Jsonata expr = jsonata("$eval('a') + b"); + + CountDownLatch startGate = new CountDownLatch(1); + ExecutorService pool = Executors.newFixedThreadPool(threads); + AtomicInteger errorCount = new AtomicInteger(0); + List sampleErrors = Collections.synchronizedList(new ArrayList<>()); + List> futures = new ArrayList<>(); + + for (int t = 0; t < threads; t++) { + final int threadId = t; + futures.add(pool.submit(() -> { + try { + startGate.await(); + } catch (InterruptedException e) { return; } + + int expected = threadId * 10 + threadId; // threadId * 11 + for (int i = 0; i < itersPerThread; i++) { + try { + Object result = expr.evaluate(Map.of("a", threadId * 10, "b", threadId)); + if (!Integer.valueOf(expected).equals(result)) { + errorCount.incrementAndGet(); + if (sampleErrors.size() < 5) { + sampleErrors.add("thread-" + threadId + " iter-" + i + + ": expected " + expected + " got " + result); + } + } + } catch (Exception e) { + errorCount.incrementAndGet(); + if (sampleErrors.size() < 5) { + sampleErrors.add("thread-" + threadId + " iter-" + i + + ": " + e.getClass().getSimpleName() + ": " + e.getMessage()); + } + } + } + })); + } + + startGate.countDown(); + for (Future f : futures) f.get(60, TimeUnit.SECONDS); + pool.shutdown(); + + assertEquals(0, errorCount.get(), + "High-throughput eval errors: " + sampleErrors); + } + + @Test + public void testCustomFunctionMultiThread() throws Exception { + int threads = 10; + int itersPerThread = 500; + Jsonata expr = jsonata("$double(a)"); + expr.registerFunction("double", (Integer x) -> x * 2); + + CountDownLatch startGate = new CountDownLatch(1); + ExecutorService pool = Executors.newFixedThreadPool(threads); + AtomicInteger errorCount = new AtomicInteger(0); + List> futures = new ArrayList<>(); + + for (int t = 0; t < threads; t++) { + final int threadId = t + 1; // 1-based to avoid $double(0) + futures.add(pool.submit(() -> { + try { + startGate.await(); + } catch (InterruptedException e) { return; } + + int expected = threadId * 2; + for (int i = 0; i < itersPerThread; i++) { + Object result = expr.evaluate(Map.of("a", threadId)); + if (!Integer.valueOf(expected).equals(result)) { + errorCount.incrementAndGet(); + } + } + })); + } + + startGate.countDown(); + for (Future f : futures) f.get(30, TimeUnit.SECONDS); + pool.shutdown(); + + assertEquals(0, errorCount.get(), + "Custom function multi-thread evaluation produced wrong results"); + } + + @Test + public void testCachedInstanceWithBindingsMultiThread() throws Exception { + int threads = 8; + int itersPerThread = 1000; + Jsonata expr = jsonata("$myVar + a"); + + CountDownLatch startGate = new CountDownLatch(1); + ExecutorService pool = Executors.newFixedThreadPool(threads); + AtomicInteger errorCount = new AtomicInteger(0); + List> futures = new ArrayList<>(); + + for (int t = 0; t < threads; t++) { + final int threadId = t; + Jsonata.Frame frame = expr.createFrame(); + frame.bind("myVar", threadId * 100); + + futures.add(pool.submit(() -> { + try { + startGate.await(); + } catch (InterruptedException e) { return; } + + int expected = threadId * 100 + threadId; + for (int i = 0; i < itersPerThread; i++) { + Object result = expr.evaluate(Map.of("a", threadId), frame); + if (!Integer.valueOf(expected).equals(result)) { + errorCount.incrementAndGet(); + } + } + })); + } + + startGate.countDown(); + for (Future f : futures) f.get(30, TimeUnit.SECONDS); + pool.shutdown(); + + assertEquals(0, errorCount.get(), + "Cached instance with per-thread bindings produced wrong results"); + } + + @Test + public void testNowWithCachedInstance() throws Exception { + Jsonata expr = jsonata("$now()"); + Object r1 = expr.evaluate(null); + Thread.sleep(1100); // $now() has second-level precision + Object r2 = expr.evaluate(null); + assertNotNull(r1); + assertNotNull(r2); + assertNotEquals(r1, r2, "$now() should return different values on different calls"); + } + + @Test + public void testMillisWithCachedInstance() throws Exception { + Jsonata expr = jsonata("$millis()"); + long r1 = ((Number) expr.evaluate(null)).longValue(); + Thread.sleep(10); + long r2 = ((Number) expr.evaluate(null)).longValue(); + assertTrue(r2 > r1, "$millis() should advance across separate evaluations"); + } + +}