Skip to content

Commit

Permalink
WIP: feature: support standalone seamless ha
Browse files Browse the repository at this point in the history
Signed-off-by: bodong.ybd <bodong.ybd@alibaba-inc.com>
  • Loading branch information
yangbodong22011 committed May 8, 2024
1 parent 12974bc commit b773f91
Show file tree
Hide file tree
Showing 4 changed files with 262 additions and 0 deletions.
4 changes: 4 additions & 0 deletions src/main/java/io/jackey/Connection.java
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,10 @@ final HostAndPort getHostAndPort() {
return ((DefaultJedisSocketFactory) socketFactory).getHostAndPort();
}

public final ConnectionPool getMemberOf() {
return memberOf;
}

public int getSoTimeout() {
return soTimeout;
}
Expand Down
9 changes: 9 additions & 0 deletions src/main/java/io/jackey/Protocol.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ public final class Protocol {
private static final String NOAUTH_PREFIX = "NOAUTH";
private static final String WRONGPASS_PREFIX = "WRONGPASS";
private static final String NOPERM_PREFIX = "NOPERM";
private static final String REDIRECT_PREFIX = "REDIRECT ";

private Protocol() {
throw new InstantiationError("Must not instantiate this class");
Expand Down Expand Up @@ -100,6 +101,9 @@ private static void processError(final RedisInputStream is) {
// throw new JedisAskDataException(message, new HostAndPort(askInfo[1],
// Integer.parseInt(askInfo[2])), Integer.parseInt(askInfo[0]));
throw new JedisAskDataException(message, HostAndPort.from(askInfo[1]), Integer.parseInt(askInfo[0]));
} else if (message.startsWith(REDIRECT_PREFIX)) {
String host = parseTargetHost(message);
throw new JedisRedirectionException(message, HostAndPort.from(host), -1); // slot -1 means standalone
} else if (message.startsWith(CLUSTERDOWN_PREFIX)) {
throw new JedisClusterException(message);
} else if (message.startsWith(BUSY_PREFIX)) {
Expand Down Expand Up @@ -140,6 +144,11 @@ private static String[] parseTargetHostAndSlot(String clusterRedirectResponse) {
return response;
}

private static String parseTargetHost(String clusterRedirectResponse) {
String[] messageInfo = clusterRedirectResponse.split(" ");
return messageInfo[1];
}

private static Object process(final RedisInputStream is) {
final byte b = is.readByte();
//System.out.println((char) b);
Expand Down
153 changes: 153 additions & 0 deletions src/main/java/io/jackey/executors/RedirectCommandExecutor.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
package io.jackey.executors;

import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

import io.jackey.CommandObject;
import io.jackey.Connection;
import io.jackey.annots.VisibleForTesting;
import io.jackey.exceptions.JedisClusterOperationException;
import io.jackey.exceptions.JedisConnectionException;
import io.jackey.exceptions.JedisException;
import io.jackey.exceptions.JedisRedirectionException;
import io.jackey.providers.RedirectConnectionProvider;
import io.jackey.util.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RedirectCommandExecutor implements CommandExecutor {

private final Logger log = LoggerFactory.getLogger(getClass());

public final RedirectConnectionProvider provider;
protected final int maxAttempts;
protected final Duration maxTotalRetriesDuration;

public RedirectCommandExecutor(RedirectConnectionProvider provider, int maxAttempts,
Duration maxTotalRetriesDuration) {
this.provider = provider;
this.maxAttempts = maxAttempts;
this.maxTotalRetriesDuration = maxTotalRetriesDuration;
}

@Override
public void close() {
this.provider.close();
}

@Override
public final <T> T executeCommand(CommandObject<T> commandObject) {
Instant deadline = Instant.now().plus(maxTotalRetriesDuration);

int consecutiveConnectionFailures = 0;
Exception lastException = null;
for (int attemptsLeft = this.maxAttempts; attemptsLeft > 0; attemptsLeft--) {
Connection connection = null;
try {
connection = provider.getConnection();
return execute(connection, commandObject);

} catch (JedisConnectionException jce) {
lastException = jce;
++consecutiveConnectionFailures;
log.debug("Failed connecting to Redis: {}", connection, jce);
// "- 1" because we just did one, but the attemptsLeft counter hasn't been decremented yet
boolean reset = handleConnectionProblem(attemptsLeft - 1, consecutiveConnectionFailures, deadline);
if (reset) {
consecutiveConnectionFailures = 0;
}
} catch (JedisRedirectionException jre) {
// avoid updating lastException if it is a connection exception
if (lastException == null || lastException instanceof JedisRedirectionException) {
lastException = jre;
}
log.debug("Redirected by server to {}", jre.getTargetNode());
consecutiveConnectionFailures = 0;
provider.renewPool(connection, jre.getTargetNode());
} finally {
IOUtils.closeQuietly(connection);
}
if (Instant.now().isAfter(deadline)) {
throw new JedisClusterOperationException("Cluster retry deadline exceeded.");
}
}

JedisException maxRedirectException = new JedisException("No more redirect attempts left.");
maxRedirectException.addSuppressed(lastException);
throw maxRedirectException;
}

/**
* WARNING: This method is accessible for the purpose of testing.
* This should not be used or overriden.
*/
@VisibleForTesting
protected <T> T execute(Connection connection, CommandObject<T> commandObject) {
return connection.executeCommand(commandObject);
}

/**
* Related values should be reset if <code>TRUE</code> is returned.
*
* @param attemptsLeft
* @param consecutiveConnectionFailures
* @param doneDeadline
* @return true - if some actions are taken
* <br /> false - if no actions are taken
*/
private boolean handleConnectionProblem(int attemptsLeft, int consecutiveConnectionFailures, Instant doneDeadline) {
if (this.maxAttempts < 3) {
// Since we only renew the slots cache after two consecutive connection
// failures (see consecutiveConnectionFailures above), we need to special
// case the situation where we max out after two or fewer attempts.
// Otherwise, on two or fewer max attempts, the slots cache would never be
// renewed.
if (attemptsLeft == 0) {
provider.renewPool();
return true;
}
return false;
}

if (consecutiveConnectionFailures < 2) {
return false;
}

sleep(getBackoffSleepMillis(attemptsLeft, doneDeadline));
//We need this because if node is not reachable anymore - we need to finally initiate slots
//renewing, or we can stuck with cluster state without one node in opposite case.
//TODO make tracking of successful/unsuccessful operations for node - do renewing only
//if there were no successful responses from this node last few seconds
provider.renewPool();
return true;
}

private static long getBackoffSleepMillis(int attemptsLeft, Instant deadline) {
if (attemptsLeft <= 0) {
return 0;
}

long millisLeft = Duration.between(Instant.now(), deadline).toMillis();
if (millisLeft < 0) {
throw new JedisClusterOperationException("Cluster retry deadline exceeded.");
}

long maxBackOff = millisLeft / (attemptsLeft * attemptsLeft);
return ThreadLocalRandom.current().nextLong(maxBackOff + 1);
}

/**
* WARNING: This method is accessible for the purpose of testing.
* This should not be used or overriden.
*/
@VisibleForTesting
protected void sleep(long sleepMillis) {
try {
TimeUnit.MILLISECONDS.sleep(sleepMillis);
} catch (InterruptedException e) {
throw new JedisClusterOperationException(e);
}
}
}
96 changes: 96 additions & 0 deletions src/main/java/io/jackey/providers/RedirectConnectionProvider.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package io.jackey.providers;

import java.util.Collections;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import io.jackey.CommandArguments;
import io.jackey.Connection;
import io.jackey.ConnectionPool;
import io.jackey.DefaultJedisClientConfig;
import io.jackey.HostAndPort;
import io.jackey.JedisClientConfig;
import io.jackey.util.Pool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;

public class RedirectConnectionProvider implements ConnectionProvider {
private Pool<Connection> pool;
private HostAndPort hostAndPort;
private JedisClientConfig clientConfig;
private GenericObjectPoolConfig<Connection> poolConfig;
private final Lock lock = new ReentrantLock();

public RedirectConnectionProvider(HostAndPort hostAndPort) {
this(hostAndPort, DefaultJedisClientConfig.builder().build());
}

public RedirectConnectionProvider(HostAndPort hostAndPort, JedisClientConfig clientConfig) {
this(hostAndPort, clientConfig, new GenericObjectPoolConfig<>());
}

public RedirectConnectionProvider(HostAndPort hostAndPort, JedisClientConfig clientConfig,
GenericObjectPoolConfig<Connection> poolConfig) {
this.hostAndPort = hostAndPort;
this.clientConfig = clientConfig;
this.poolConfig = poolConfig;
this.pool = new ConnectionPool(hostAndPort, clientConfig, poolConfig);
}

public void renewPool() {
lock.lock();
try {
this.pool = new ConnectionPool(hostAndPort, clientConfig, poolConfig);
} finally {
lock.unlock();
}
}

public void renewPool(Connection connection, HostAndPort hostAndPort) {
lock.lock();
try {
if (connection.getMemberOf().isClosed()) {
return;
}
this.hostAndPort = hostAndPort;
this.pool = new ConnectionPool(hostAndPort, clientConfig, poolConfig);
} finally {
lock.unlock();
}
}

@Override
public void close() {
lock.lock();
try {
pool.destroy();
} finally {
lock.unlock();
}
}

@Override
public Connection getConnection(CommandArguments args) {
return getConnection();
}

@Override
public Connection getConnection() {
lock.lock();
try {
return pool.getResource();
} finally {
lock.unlock();
}
}

@Override
public Map<?, Pool<Connection>> getConnectionMap() {
lock.lock();
try {
return Collections.singletonMap(hostAndPort, pool);
} finally {
lock.unlock();
}
}
}

0 comments on commit b773f91

Please sign in to comment.