Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
Expand Down Expand Up @@ -146,12 +147,12 @@ enum SettingsHandshake { READY, TRANSMITTED, ACKED }
private volatile boolean peerNoRfc7540Priorities;


private static final long STREAM_TIMEOUT_GRANULARITY_MILLIS = 1000;
private long lastStreamTimeoutCheckMillis;
private static final long STREAM_TIMEOUT_GRANULARITY_NANOS = TimeUnit.SECONDS.toNanos(1);
private long lastStreamTimeoutCheckNanos;

private static final long VALIDATE_AFTER_INACTIVITY_GRANULARITY_MILLIS = 1000;
private static final long VALIDATE_AFTER_INACTIVITY_GRANULARITY_NANOS = TimeUnit.SECONDS.toNanos(1);
private final Timeout validateAfterInactivity;
private volatile long lastActivityTime;
private volatile long lastActivityNanos;

AbstractH2StreamMultiplexer(
final ProtocolIOSession ioSession,
Expand Down Expand Up @@ -199,7 +200,7 @@ enum SettingsHandshake { READY, TRANSMITTED, ACKED }

this.lowMark = H2Config.INIT.getInitialWindowSize() / 2;
this.streamListener = streamListener;
this.lastActivityTime = System.currentTimeMillis();
this.lastActivityNanos = System.nanoTime();
this.validateAfterInactivity = validateAfterInactivity;
}

Expand Down Expand Up @@ -539,8 +540,8 @@ public final void onOutput() throws HttpException, IOException {

if (connState.compareTo(ConnectionHandshake.ACTIVE) <= 0 && remoteSettingState == SettingsHandshake.ACKED) {
final long t = TimeValue.isPositive(validateAfterInactivity) ?
Math.max(validateAfterInactivity.toMilliseconds(), VALIDATE_AFTER_INACTIVITY_GRANULARITY_MILLIS) : 0;
final boolean hasBeenIdleTooLong = t > 0 && System.currentTimeMillis() - lastActivityTime > t;
Math.max(validateAfterInactivity.toNanoseconds(), VALIDATE_AFTER_INACTIVITY_GRANULARITY_NANOS) : 0;
final boolean hasBeenIdleTooLong = t > 0 && System.nanoTime() - lastActivityNanos > t;
if (hasBeenIdleTooLong && ioSession.hasCommands() && pingHandlers.isEmpty()) {
final Timeout socketTimeout = ioSession.getSocketTimeout();
ioSession.setSocketTimeout(Timeout.ofSeconds(5));
Expand Down Expand Up @@ -1510,7 +1511,7 @@ class H2StreamChannelImpl implements H2StreamChannel {
private final AtomicInteger outputWindow;

private volatile boolean localClosed;
private volatile long localResetTime;
private volatile long localResetNanos = Long.MIN_VALUE;

H2StreamChannelImpl(final int id, final int initialInputWindowSize, final int initialOutputWindowSize) {
this.id = id;
Expand Down Expand Up @@ -1676,7 +1677,7 @@ public boolean localReset(final int code) throws IOException {
return false;
}
localClosed = true;
localResetTime = System.currentTimeMillis();
localResetNanos = System.nanoTime();

final RawFrame resetStream = frameFactory.createResetStream(id, code);
commitFrameInternal(resetStream);
Expand All @@ -1687,8 +1688,8 @@ public boolean localReset(final int code) throws IOException {
}

@Override
public long getLocalResetTime() {
return localResetTime;
public long getLocalResetNanos() {
return localResetNanos;
}

@Override
Expand Down Expand Up @@ -1743,14 +1744,14 @@ private void checkStreamTimeouts(final long nowNanos) throws IOException {
}

private void validateStreamTimeouts() throws IOException {
final long nowMillis = System.currentTimeMillis();
if ((nowMillis - lastStreamTimeoutCheckMillis) >= STREAM_TIMEOUT_GRANULARITY_MILLIS) {
lastStreamTimeoutCheckMillis = nowMillis;
checkStreamTimeouts(System.nanoTime());
final long nowNanos = System.nanoTime();
if ((nowNanos - lastStreamTimeoutCheckNanos) >= STREAM_TIMEOUT_GRANULARITY_NANOS) {
lastStreamTimeoutCheckNanos = nowNanos;
checkStreamTimeouts(nowNanos);
}
}

private void updateLastActivity() {
this.lastActivityTime = System.currentTimeMillis();
this.lastActivityNanos = System.nanoTime();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.nio.channels.CancelledKeyException;
import java.nio.charset.CharacterCodingException;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
Expand All @@ -50,7 +51,7 @@

class H2Stream implements StreamControl {

private static final long LINGER_TIME = 1000; // 1 second
private static final long LINGER_TIME_NANOS = TimeUnit.SECONDS.toNanos(1);

private final H2StreamChannel channel;
private final H2StreamHandler handler;
Expand Down Expand Up @@ -124,8 +125,8 @@ AtomicInteger getInputWindow() {
}

private boolean isPastLingerDeadline() {
final long localResetTime = channel.getLocalResetTime();
return localResetTime > 0 && localResetTime + LINGER_TIME < System.currentTimeMillis();
final long localResetNanos = channel.getLocalResetNanos();
return localResetNanos != Long.MIN_VALUE && System.nanoTime() - localResetNanos > LINGER_TIME_NANOS;
}

boolean isClosedPastLingerDeadline() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,10 @@ default void terminate() {
}
}

long getLocalResetTime();
long getLocalResetNanos();

default boolean isLocalReset() {
return getLocalResetTime() > 0;
return getLocalResetNanos() != Long.MIN_VALUE;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,8 @@ protected void validateSession(
final TimeValue timeValue = validateAfterInactivity;
if (TimeValue.isNonNegative(timeValue)) {
final long lastAccessTime = Math.min(ioSession.getLastReadTime(), ioSession.getLastWriteTime());
final long deadline = lastAccessTime + timeValue.toMilliseconds();
if (deadline <= System.currentTimeMillis()) {
final long deadline = lastAccessTime + timeValue.toNanoseconds();
if (deadline <= System.nanoTime()) {
ioSession.enqueue(new StaleCheckCommand(callback::execute), Command.Priority.NORMAL);
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1706,23 +1706,23 @@ void testKeepAliveDisabledNeverEmitsPing() throws Exception {
Assertions.assertTrue(frames.stream().noneMatch(FrameStub::isPing), "Disabled policy must never emit PING");
}

private static long getValidateAfterInactivityGranularityMillis() throws Exception {
final Field field = AbstractH2StreamMultiplexer.class.getDeclaredField("VALIDATE_AFTER_INACTIVITY_GRANULARITY_MILLIS");
private static long getValidateAfterInactivityGranularityNanos() throws Exception {
final Field field = AbstractH2StreamMultiplexer.class.getDeclaredField("VALIDATE_AFTER_INACTIVITY_GRANULARITY_NANOS");
field.setAccessible(true);
return field.getLong(null);
}

private static void setLastActivityTime(final AbstractH2StreamMultiplexer mux, final long millis) throws Exception {
final Field field = AbstractH2StreamMultiplexer.class.getDeclaredField("lastActivityTime");
private static void setLastActivityNanos(final AbstractH2StreamMultiplexer mux, final long nanos) throws Exception {
final Field field = AbstractH2StreamMultiplexer.class.getDeclaredField("lastActivityNanos");
field.setAccessible(true);
field.setLong(mux, millis);
field.setLong(mux, nanos);
}

private static void makeMuxIdle(final AbstractH2StreamMultiplexer mux, final Timeout validateAfterInactivity) throws Exception {
final long granularityMillis = getValidateAfterInactivityGranularityMillis();
final long configuredMillis = validateAfterInactivity != null ? validateAfterInactivity.toMilliseconds() : 0;
final long effectiveMillis = configuredMillis > 0 ? Math.max(configuredMillis, granularityMillis) : 0;
setLastActivityTime(mux, System.currentTimeMillis() - effectiveMillis - 10);
final long granularityNanos = getValidateAfterInactivityGranularityNanos();
final long configuredNanos = validateAfterInactivity != null ? validateAfterInactivity.toNanoseconds() : 0;
final long effectiveNanos = configuredNanos > 0 ? Math.max(configuredNanos, granularityNanos) : 0;
setLastActivityNanos(mux, System.nanoTime() - effectiveNanos - TimeUnit.MILLISECONDS.toNanos(10));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

import java.net.InetSocketAddress;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.hc.core5.concurrent.FutureCallback;
Expand Down Expand Up @@ -97,8 +98,9 @@ void testValidateSessionEnqueuesStaleCheck() {

final IOSession session = Mockito.mock(IOSession.class);
Mockito.when(session.isOpen()).thenReturn(true);
Mockito.when(session.getLastReadTime()).thenReturn(0L);
Mockito.when(session.getLastWriteTime()).thenReturn(0L);
final long farPastNanos = System.nanoTime() - TimeUnit.DAYS.toNanos(1);
Mockito.when(session.getLastReadTime()).thenReturn(farPastNanos);
Mockito.when(session.getLastWriteTime()).thenReturn(farPastNanos);

@SuppressWarnings("unchecked")
final Callback<Boolean> callback = (Callback<Boolean>) Mockito.mock(Callback.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -473,7 +473,7 @@ private Results doExecute(final HttpAsyncRequester requester, final Stats stats)

final long deadline = config.getTimeLimit() != null ? config.getTimeLimit().toMilliseconds() : Long.MAX_VALUE;

final long startTime = System.currentTimeMillis();
final long startNanos = System.nanoTime();

for (int i = 0; i < workers.length; i++) {
workers[i].execute();
Expand All @@ -485,7 +485,7 @@ private Results doExecute(final HttpAsyncRequester requester, final Stats stats)
System.out.println("...done");
}

final long endTime = System.currentTimeMillis();
final long endNanos = System.nanoTime();

for (int i = 0; i < workers.length; i++) {
workers[i].releaseResources();
Expand All @@ -499,7 +499,7 @@ private Results doExecute(final HttpAsyncRequester requester, final Stats stats)
requestUri.toASCIIString(),
stats.getContentLength(),
config.getConcurrencyLevel(),
endTime - startTime,
TimeUnit.NANOSECONDS.toMillis(endNanos - startNanos),
stats.getSuccessCount(),
stats.getFailureCount(),
stats.getKeepAliveCount(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.hc.core5.net.InetAddressUtils;
Expand Down Expand Up @@ -254,7 +255,7 @@ public void start() throws IOException {
}

public void shutdown(final TimeValue timeout) throws InterruptedException {
final long waitUntil = System.currentTimeMillis() + timeout.toMilliseconds();
final long deadlineNanos = System.nanoTime() + timeout.toNanoseconds();
Thread t = null;
lock.lock();
try {
Expand All @@ -272,18 +273,18 @@ public void shutdown(final TimeValue timeout) throws InterruptedException {
handler.shutdown();
}
while (!this.handlers.isEmpty()) {
final long waitTime = waitUntil - System.currentTimeMillis();
if (waitTime > 0) {
wait(waitTime);
final long remainingNanos = deadlineNanos - System.nanoTime();
if (remainingNanos > 0) {
wait(TimeUnit.NANOSECONDS.toMillis(remainingNanos));
}
}
} finally {
lock.unlock();
}
if (t != null) {
final long waitTime = waitUntil - System.currentTimeMillis();
if (waitTime > 0) {
t.join(waitTime);
final long remainingNanos = deadlineNanos - System.nanoTime();
if (remainingNanos > 0) {
t.join(TimeUnit.NANOSECONDS.toMillis(remainingNanos));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,23 +110,26 @@ public T get(final long timeout, final TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
Args.notNull(unit, "Time unit");
final long msecs = unit.toMillis(timeout);
final long startTime = (msecs <= 0) ? 0 : System.currentTimeMillis();
long waitTime = msecs;
final long waitNanos = unit.toNanos(timeout);
final long startNanos = (waitNanos <= 0) ? 0 : System.nanoTime();
long remainingNanos = waitNanos;
try {
lock.lock();
if (this.completed) {
return getResult();
} else if (waitTime <= 0) {
throw TimeoutValueException.fromMilliseconds(msecs, msecs + Math.abs(waitTime));
} else if (remainingNanos <= 0) {
throw TimeoutValueException.fromMilliseconds(msecs, msecs);
} else {
for (; ; ) {
condition.await(waitTime, TimeUnit.MILLISECONDS);
condition.await(remainingNanos, TimeUnit.NANOSECONDS);
if (this.completed) {
return getResult();
}
waitTime = msecs - (System.currentTimeMillis() - startTime);
if (waitTime <= 0) {
throw TimeoutValueException.fromMilliseconds(msecs, msecs + Math.abs(waitTime));
remainingNanos = waitNanos - (System.nanoTime() - startNanos);
if (remainingNanos <= 0) {
final long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(
System.nanoTime() - startNanos);
throw TimeoutValueException.fromMilliseconds(msecs, elapsedMillis);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ public final void enumAvailable(final Callback<IOSession> callback) {
}

public final void closeIdle(final TimeValue idleTime) {
final long deadline = System.currentTimeMillis() - (TimeValue.isPositive(idleTime) ? idleTime.toMilliseconds() : 0);
final long deadline = System.nanoTime() - (TimeValue.isPositive(idleTime) ? idleTime.toNanoseconds() : 0);
for (final PoolEntry poolEntry: sessionPool.values()) {
if (poolEntry.session != null) {
lock.lock();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,14 +109,14 @@ public void execute() {
@Override
public final void awaitShutdown(final TimeValue waitTime) throws InterruptedException {
Args.notNull(waitTime, "Wait time");
final long deadline = System.currentTimeMillis() + waitTime.toMilliseconds();
long remaining = waitTime.toMilliseconds();
final long deadlineNanos = System.nanoTime() + waitTime.toNanoseconds();
long remainingNanos = waitTime.toNanoseconds();
lock.lock();
try {
while (this.status.get().compareTo(IOReactorStatus.SHUT_DOWN) < 0) {
condition.await(remaining, TimeUnit.MILLISECONDS);
remaining = deadline - System.currentTimeMillis();
if (remaining <= 0) {
condition.await(remainingNanos, TimeUnit.NANOSECONDS);
remainingNanos = deadlineNanos - System.nanoTime();
if (remainingNanos <= 0) {
return;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,25 +206,26 @@ enum Status {
void setSocketTimeout(Timeout timeout);

/**
* Returns timestamp of the last read event.
* Returns monotonic nanosecond timestamp of the last read event.
*
* @return timestamp.
* @return nanosecond timestamp obtained from {@link System#nanoTime()}.
*/
long getLastReadTime();

/**
* Returns timestamp of the last write event.
* Returns monotonic nanosecond timestamp of the last write event.
*
* @return timestamp.
* @return nanosecond timestamp obtained from {@link System#nanoTime()}.
*/
long getLastWriteTime();

/**
* Returns timestamp of the last I/O event including socket timeout reset.
* Returns monotonic nanosecond timestamp of the last I/O event including
* socket timeout reset.
*
* @see #getSocketTimeout()
*
* @return timestamp.
* @return nanosecond timestamp obtained from {@link System#nanoTime()}.
*/
long getLastEventTime();

Expand Down
Loading
Loading