Skip to content

Commit cdb3e14

Browse files
committed
improvement(mcp): swap PG advisory lock for Redis mutex on OAuth refresh
withMcpOauthRefreshLock now uses `coalesceLocally` + Redis acquireLock/ releaseLock with polling — the same primitives backing regular OAuth refresh (`app/api/auth/oauth/utils.ts`). No more pinning a Postgres connection for the duration of the SDK's OAuth HTTP refresh. - In-process dedup: shared promise via `coalesceLocally`. - Cross-process: Redis SET NX EX mutex; followers poll until the leader releases (30s max wait, 100ms poll), then acquire and run fn(). - Each MCP caller still constructs its own client (semantics preserved). - Falls open when Redis is unavailable — same behavior as the regular OAuth refresh code path.
1 parent d01506b commit cdb3e14

1 file changed

Lines changed: 54 additions & 31 deletions

File tree

apps/sim/lib/mcp/oauth/storage.ts

Lines changed: 54 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,11 @@ import { db } from '@sim/db'
77
import { mcpServerOauth } from '@sim/db/schema'
88
import { createLogger } from '@sim/logger'
99
import { toError } from '@sim/utils/errors'
10-
import { generateId } from '@sim/utils/id'
11-
import { and, eq, gt, sql } from 'drizzle-orm'
10+
import { sleep } from '@sim/utils/helpers'
11+
import { generateId, generateShortId } from '@sim/utils/id'
12+
import { and, eq, gt } from 'drizzle-orm'
13+
import { coalesceLocally } from '@/lib/concurrency/singleflight'
14+
import { acquireLock, releaseLock } from '@/lib/core/config/redis'
1215
import { decryptSecret, encryptSecret } from '@/lib/core/security/encryption'
1316

1417
const logger = createLogger('McpOauthStorage')
@@ -232,38 +235,58 @@ export async function clearState(rowId: string): Promise<void> {
232235
* refreshes against the same row would race and one would receive
233236
* `invalid_grant`, wiping credentials.
234237
*
235-
* Two-tier locking:
236-
* 1) In-process Promise chain — cheap, avoids DB roundtrips when the same
237-
* Node process holds concurrent callers.
238-
* 2) Postgres advisory transaction lock — blocks across processes; released
239-
* automatically when the transaction ends.
238+
* Two-tier coordination, matching the regular OAuth refresh pattern
239+
* (`app/api/auth/oauth/utils.ts`):
240+
* 1) `coalesceLocally` — in-process dedup; concurrent same-process callers
241+
* share a single inflight promise.
242+
* 2) Redis distributed lock (`acquireLock` / `releaseLock`) — cross-process
243+
* mutex. Followers poll until the leader releases, then acquire and run
244+
* their own `fn()` (each MCP caller needs its own client connection).
240245
*
241-
* Tradeoff: the connection is held for the duration of `fn()`, which includes
242-
* the SDK's OAuth HTTP refresh. Session-level locks (`pg_advisory_lock`) would
243-
* release the connection earlier, but they don't survive PgBouncer transaction
244-
* pooling — they're scoped to the underlying physical connection, which can be
245-
* swapped between statements. `pg_advisory_xact_lock` is the correct choice
246-
* here. If pool pressure becomes a real concern at scale, swap this for a
247-
* Redis-based distributed lock (Redlock) that doesn't pin a DB connection.
246+
* Falls open if Redis is unavailable — `acquireLock` no-ops, all callers run
247+
* `fn()` uncoordinated. The in-process layer still serializes within a
248+
* process; cross-process races become possible but rare in practice.
248249
*/
249-
const refreshLocks = new Map<string, Promise<unknown>>()
250+
const REFRESH_LOCK_TTL_SEC = 30
251+
const REFRESH_POLL_INTERVAL_MS = 100
252+
const REFRESH_MAX_WAIT_MS = 30_000
250253

251254
export async function withMcpOauthRefreshLock<T>(rowId: string, fn: () => Promise<T>): Promise<T> {
252-
const prev = refreshLocks.get(rowId) ?? Promise.resolve()
253-
const next = prev
254-
.catch(() => undefined)
255-
.then(() =>
256-
db.transaction(async (tx) => {
257-
await tx.execute(
258-
sql`SELECT pg_advisory_xact_lock(hashtextextended(${`mcp_oauth_refresh:${rowId}`}, 0))`
259-
)
255+
const lockKey = `mcp:oauth:refresh:${rowId}`
256+
return coalesceLocally(lockKey, async () => {
257+
const ownerToken = generateShortId()
258+
const deadline = Date.now() + REFRESH_MAX_WAIT_MS
259+
260+
while (true) {
261+
let acquired = false
262+
try {
263+
acquired = await acquireLock(lockKey, ownerToken, REFRESH_LOCK_TTL_SEC)
264+
} catch (error) {
265+
logger.warn('Redis unavailable, running OAuth flow uncoordinated', {
266+
rowId,
267+
error: toError(error).message,
268+
})
260269
return fn()
261-
})
262-
)
263-
refreshLocks.set(rowId, next)
264-
const cleanup = () => {
265-
if (refreshLocks.get(rowId) === next) refreshLocks.delete(rowId)
266-
}
267-
next.then(cleanup, cleanup)
268-
return next
270+
}
271+
272+
if (acquired) {
273+
try {
274+
return await fn()
275+
} finally {
276+
await releaseLock(lockKey, ownerToken).catch((error) => {
277+
logger.warn('Refresh lock release failed (will expire via TTL)', {
278+
rowId,
279+
error: toError(error).message,
280+
})
281+
})
282+
}
283+
}
284+
285+
if (Date.now() >= deadline) {
286+
logger.warn('Refresh lock wait timed out, running uncoordinated', { rowId })
287+
return fn()
288+
}
289+
await sleep(REFRESH_POLL_INTERVAL_MS)
290+
}
291+
})
269292
}

0 commit comments

Comments
 (0)