Security: add flags for TCP connection limits and timeouts#7518
Open
SungJin1212 wants to merge 4 commits into
Open
Security: add flags for TCP connection limits and timeouts#7518SungJin1212 wants to merge 4 commits into
SungJin1212 wants to merge 4 commits into
Conversation
c875ced to
6c101cc
Compare
Signed-off-by: SungJin1212 <tjdwls1201@gmail.com>
6c101cc to
ac76ac2
Compare
friedrichg
reviewed
May 14, 2026
Member
friedrichg
left a comment
There was a problem hiding this comment.
I think the fix is not working as expected though.
I think this triggers the behavior I am seeing
// TestTCPTransport_StreamHoldsSlotUntilClose asserts that
// -memberlist.max-concurrent-connections bounds the number of *live* inbound
// TCP connections: once a stream conn has been handed off to memberlist via
// StreamCh(), its slot stays held until the conn is actually closed.
//
// NOTE: this test is expected to FAIL against the current implementation,
// which releases the slot as soon as handleConnection returns (i.e. on
// handoff, not on close). It documents the intended semantics of the flag.
func TestTCPTransport_StreamHoldsSlotUntilClose(t *testing.T) {
logger := log.NewNopLogger()
const maxConns = 2
cfg := TCPTransportConfig{}
flagext.DefaultValues(&cfg)
cfg.BindAddrs = []string{"127.0.0.1"}
cfg.BindPort = 0
cfg.PacketReadTimeout = 5 * time.Second
cfg.MaxConcurrentConnections = maxConns
transport, err := NewTCPTransport(cfg, logger)
require.NoError(t, err)
defer transport.Shutdown() //nolint:errcheck
port := transport.GetAutoBindPort()
// Consumer goroutine: drains StreamCh and holds conns alive (never closes
// them) — simulating memberlist actively using streams.
var heldMu sync.Mutex
var held []net.Conn
done := make(chan struct{})
go func() {
for {
select {
case <-done:
return
case c := <-transport.StreamCh():
heldMu.Lock()
held = append(held, c)
heldMu.Unlock()
}
}
}()
defer func() {
close(done)
heldMu.Lock()
for _, c := range held {
c.Close() //nolint:errcheck
}
heldMu.Unlock()
}()
openStreamConn := func() net.Conn {
c, err := net.Dial("tcp", fmt.Sprintf("127.0.0.1:%d", port))
require.NoError(t, err)
_, err = c.Write([]byte{byte(stream)})
require.NoError(t, err)
return c
}
// Fill the semaphore with maxConns live stream handoffs.
clients := make([]net.Conn, 0, maxConns+1)
defer func() {
for _, c := range clients {
c.Close() //nolint:errcheck
}
}()
for i := 0; i < maxConns; i++ {
clients = append(clients, openStreamConn())
}
// Wait until memberlist side has observed all maxConns streams.
require.Eventually(t, func() bool {
return testutil.ToFloat64(transport.incomingStreams) == float64(maxConns)
}, 2*time.Second, 10*time.Millisecond)
// One extra stream conn. If the slot is correctly held for the conn's
// real lifetime, the transport must reject this one because all slots
// are still occupied by the held streams above.
clients = append(clients, openStreamConn())
require.Eventually(t, func() bool {
return testutil.ToFloat64(transport.rejectedConnections) >= 1
}, 2*time.Second, 10*time.Millisecond,
"expected extra stream conn to be rejected while %d prior streams are held open, "+
"but the transport released the slot on handoff — flag does not bound live connections",
maxConns)
}
| }() | ||
|
|
||
| // Give goroutines a moment to acquire the semaphore slots. | ||
| time.Sleep(100 * time.Millisecond) |
Member
There was a problem hiding this comment.
will this be flaky in slow workers?.
How about something like
Suggested change
| time.Sleep(100 * time.Millisecond) | |
| require.Eventually(t, func() bool { | |
| return testutil.ToFloat64(transport.receivedPackets) == float64(maxConns) | |
| }, 2*time.Second, 10*time.Millisecond, "server never accepted %d connections", maxConns) |
| if t.connSemaphore != nil { | ||
| if !t.connSemaphore.TryAcquire(1) { | ||
| t.rejectedConnections.Inc() | ||
| level.Warn(t.logger).Log("msg", "max concurrent connections reached, closing connection", "remote", conn.RemoteAddr()) |
Member
There was a problem hiding this comment.
we have the metric. Maybe make this a debug? I can see in the event of a DOS, this will spam the logs
Signed-off-by: SungJin1212 <tjdwls1201@gmail.com>
Member
Author
|
@friedrichg |
friedrichg
approved these changes
May 15, 2026
Member
friedrichg
left a comment
There was a problem hiding this comment.
Just one more nit. Pre-approved!
| // Reject oversized packets | ||
| if t.cfg.MaxPacketSize > 0 && int64(len(buf)) > t.cfg.MaxPacketSize { | ||
| t.receivedPacketsErrors.Inc() | ||
| level.Warn(t.logger).Log("msg", "packet too large, dropping", "size", len(buf), "max", t.cfg.MaxPacketSize, "remote", conn.RemoteAddr()) |
Signed-off-by: SungJin1212 <tjdwls1201@gmail.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
This PR adds TCP connection flags to address the security issues.
-memberlist.packet-read-timeout: Read deadline applied to every inbound packet connection. Connections that do not complete within this window are closed.-memberlist.max-packet-size: Maximum size of a single inbound gossip packet. Enforced viaio.LimitReaderbeforeio.ReadAll, preventing heap exhaustion from oversized payloads. Applies to packet-type messages only.-memberlist.max-concurrent-connections: Maximum number of concurrent inbound TCP connections. Connections exceeding this limit are rejected immediately.Which issue(s) this PR fixes:
Fixes #
Checklist
CHANGELOG.mdupdated - the order of entries should be[CHANGE],[FEATURE],[ENHANCEMENT],[BUGFIX]docs/configuration/v1-guarantees.mdupdated if this PR introduces experimental flags