Skip to content

Commit 0d0520b

Browse files
committed
added priotization intercepter
1 parent 07276c9 commit 0d0520b

File tree

3 files changed

+255
-1
lines changed

3 files changed

+255
-1
lines changed

index.js

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,8 @@ module.exports.interceptors = {
5252
dns: require('./lib/interceptor/dns'),
5353
cache: require('./lib/interceptor/cache'),
5454
decompress: require('./lib/interceptor/decompress'),
55-
deduplicate: require('./lib/interceptor/deduplicate')
55+
deduplicate: require('./lib/interceptor/deduplicate'),
56+
priority: require('./lib/interceptor/priority')
5657
}
5758

5859
module.exports.cacheStores = {

lib/interceptor/priority.js

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
'use strict'
2+
3+
const DecoratorHandler = require('../handler/decorator-handler')
4+
5+
class PriorityQueue {
6+
#queue = []
7+
#concurrency
8+
#running = 0
9+
10+
constructor (concurrency = 1) {
11+
this.#concurrency = concurrency
12+
}
13+
14+
acquire (callback, priority = 0) {
15+
this.#queue.push({ callback, priority })
16+
this.#queue.sort((a, b) => b.priority - a.priority)
17+
this.#dispatch()
18+
}
19+
20+
release () {
21+
this.#running--
22+
this.#dispatch()
23+
}
24+
25+
#dispatch () {
26+
while (this.#running < this.#concurrency && this.#queue.length > 0) {
27+
const entry = this.#queue.shift()
28+
this.#running++
29+
entry.callback()
30+
}
31+
}
32+
}
33+
34+
class PriorityHandler extends DecoratorHandler {
35+
#priorityQueue
36+
37+
constructor (handler, priorityQueue) {
38+
super(handler)
39+
this.#priorityQueue = priorityQueue
40+
}
41+
42+
onResponseEnd (controller, trailers) {
43+
this.#release()
44+
return super.onResponseEnd(controller, trailers)
45+
}
46+
47+
onResponseError (controller, err) {
48+
this.#release()
49+
return super.onResponseError(controller, err)
50+
}
51+
52+
#release () {
53+
if (this.#priorityQueue) {
54+
const priorityQueue = this.#priorityQueue
55+
this.#priorityQueue = null
56+
priorityQueue.release()
57+
}
58+
}
59+
}
60+
61+
function createPriorityInterceptor ({ concurrency } = { concurrency: 1 }) {
62+
return (dispatch) => {
63+
const queues = new Map()
64+
65+
return function priorityInterceptor (opts, handler) {
66+
if (opts.priority == null || !opts.origin) {
67+
return dispatch(opts, handler)
68+
}
69+
70+
let queue = queues.get(opts.origin)
71+
if (!queue) {
72+
queue = new PriorityQueue(concurrency)
73+
queues.set(opts.origin, queue)
74+
}
75+
76+
queue.acquire(() => {
77+
const priorityHandler = new PriorityHandler(handler, queue)
78+
try {
79+
dispatch(opts, priorityHandler)
80+
} catch (err) {
81+
priorityHandler.onResponseError(null, err)
82+
}
83+
}, opts.priority)
84+
}
85+
}
86+
}
87+
88+
module.exports = createPriorityInterceptor

test/interceptors/priority.js

Lines changed: 165 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,165 @@
1+
'use strict'
2+
3+
const { createServer } = require('node:http')
4+
const { describe, test, after } = require('node:test')
5+
const { once } = require('node:events')
6+
const { strictEqual, deepStrictEqual } = require('node:assert')
7+
const { setTimeout: sleep } = require('node:timers/promises')
8+
const { Client, interceptors } = require('../../index')
9+
10+
describe('Priority Interceptor', () => {
11+
test('dispatches requests without priority normally', async () => {
12+
const server = createServer({ joinDuplicateHeaders: true }, (req, res) => {
13+
res.end('ok')
14+
}).listen(0)
15+
16+
await once(server, 'listening')
17+
18+
const client = new Client(`http://localhost:${server.address().port}`)
19+
.compose(interceptors.priority())
20+
21+
after(async () => {
22+
await client.close()
23+
server.close()
24+
await once(server, 'close')
25+
})
26+
27+
const res = await client.request({
28+
origin: `http://localhost:${server.address().port}`,
29+
method: 'GET',
30+
path: '/'
31+
})
32+
33+
const body = await res.body.text()
34+
strictEqual(res.statusCode, 200)
35+
strictEqual(body, 'ok')
36+
})
37+
38+
test('dispatches requests with priority', async () => {
39+
const server = createServer({ joinDuplicateHeaders: true }, (req, res) => {
40+
res.end('ok')
41+
}).listen(0)
42+
43+
await once(server, 'listening')
44+
45+
const client = new Client(`http://localhost:${server.address().port}`)
46+
.compose(interceptors.priority())
47+
48+
after(async () => {
49+
await client.close()
50+
server.close()
51+
await once(server, 'close')
52+
})
53+
54+
const res = await client.request({
55+
origin: `http://localhost:${server.address().port}`,
56+
method: 'GET',
57+
path: '/',
58+
priority: 1
59+
})
60+
61+
const body = await res.body.text()
62+
strictEqual(res.statusCode, 200)
63+
strictEqual(body, 'ok')
64+
})
65+
66+
test('higher priority requests are dispatched first', async () => {
67+
const order = []
68+
const server = createServer({ joinDuplicateHeaders: true }, async (req, res) => {
69+
await sleep(50)
70+
order.push(req.url)
71+
res.end(req.url)
72+
}).listen(0)
73+
74+
await once(server, 'listening')
75+
76+
const client = new Client(`http://localhost:${server.address().port}`)
77+
.compose(interceptors.priority({ concurrency: 1 }))
78+
79+
after(async () => {
80+
await client.close()
81+
server.close()
82+
await once(server, 'close')
83+
})
84+
85+
const origin = `http://localhost:${server.address().port}`
86+
87+
// Send requests with different priorities
88+
// With concurrency 1, the first request dispatches immediately.
89+
// The remaining requests queue by priority (higher = first).
90+
const results = await Promise.all([
91+
client.request({ origin, method: 'GET', path: '/first', priority: 1 }),
92+
client.request({ origin, method: 'GET', path: '/high', priority: 10 }),
93+
client.request({ origin, method: 'GET', path: '/low', priority: 0 }),
94+
client.request({ origin, method: 'GET', path: '/medium', priority: 5 })
95+
])
96+
97+
// Read all bodies to ensure completion
98+
for (const res of results) {
99+
await res.body.text()
100+
}
101+
102+
// The first request dispatched immediately, then high, medium, low
103+
deepStrictEqual(order, ['/first', '/high', '/medium', '/low'])
104+
})
105+
106+
test('requests without priority bypass the queue', async () => {
107+
const server = createServer({ joinDuplicateHeaders: true }, (req, res) => {
108+
res.end('ok')
109+
}).listen(0)
110+
111+
await once(server, 'listening')
112+
113+
const client = new Client(`http://localhost:${server.address().port}`)
114+
.compose(interceptors.priority())
115+
116+
after(async () => {
117+
await client.close()
118+
server.close()
119+
await once(server, 'close')
120+
})
121+
122+
const origin = `http://localhost:${server.address().port}`
123+
124+
// Request without priority should go through directly
125+
const res = await client.request({
126+
origin,
127+
method: 'GET',
128+
path: '/'
129+
})
130+
131+
const body = await res.body.text()
132+
strictEqual(res.statusCode, 200)
133+
strictEqual(body, 'ok')
134+
})
135+
136+
test('handles request errors gracefully', async () => {
137+
const server = createServer({ joinDuplicateHeaders: true }, (req, res) => {
138+
res.destroy()
139+
}).listen(0)
140+
141+
await once(server, 'listening')
142+
143+
const client = new Client(`http://localhost:${server.address().port}`)
144+
.compose(interceptors.priority())
145+
146+
after(async () => {
147+
await client.close()
148+
server.close()
149+
await once(server, 'close')
150+
})
151+
152+
const origin = `http://localhost:${server.address().port}`
153+
154+
await client.request({
155+
origin,
156+
method: 'GET',
157+
path: '/',
158+
priority: 1
159+
}).then(() => {
160+
throw new Error('should have thrown')
161+
}).catch((err) => {
162+
strictEqual(err.code, 'UND_ERR_SOCKET')
163+
})
164+
})
165+
})

0 commit comments

Comments
 (0)