Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use SOCK_NONBLOCK to reduce system calls for outgoing connections #293

Merged
merged 4 commits into from Apr 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
88 changes: 49 additions & 39 deletions src/anet.c
Expand Up @@ -381,19 +381,51 @@ static int anetSetReuseAddr(char *err, int fd) {
return ANET_OK;
}

static int anetCreateSocket(char *err, int domain) {
/* In general, SOCK_CLOEXEC won't have noticeable effect
* except for cases which really need this flag.
* Otherwise, it is just a flag that is nice to have.
* Its absence shouldn't affect a common socket's functionality.
*/
#define ANET_SOCKET_CLOEXEC 1
#define ANET_SOCKET_NONBLOCK 2
#define ANET_SOCKET_REUSEADDR 4
static int anetCreateSocket(char *err, int domain, int type, int protocol, int flags) {
int s;
if ((s = socket(domain, SOCK_STREAM, 0)) == -1) {

#ifdef SOCK_CLOEXEC
if (flags & ANET_SOCKET_CLOEXEC) {
type |= SOCK_CLOEXEC;
flags &= ~ANET_SOCKET_CLOEXEC;
}
#endif

#ifdef SOCK_NONBLOCK
if (flags & ANET_SOCKET_NONBLOCK) {
type |= SOCK_NONBLOCK;
flags &= ~ANET_SOCKET_NONBLOCK;
}
#endif

if ((s = socket(domain, type, protocol)) == -1) {
anetSetError(err, "creating socket: %s", strerror(errno));
return ANET_ERR;
}

/* Make sure connection-intensive things like the benchmark tool
* will be able to close/open sockets a zillion of times */
if (anetSetReuseAddr(err,s) == ANET_ERR) {
if (flags & ANET_SOCKET_CLOEXEC && anetCloexec(s) == ANET_ERR) {
panjf2000 marked this conversation as resolved.
Show resolved Hide resolved
close(s);
return ANET_ERR;
}

if (flags & ANET_SOCKET_NONBLOCK && anetNonBlock(err, s) == ANET_ERR) {
close(s);
return ANET_ERR;
}

if (flags & ANET_SOCKET_REUSEADDR && anetSetReuseAddr(err,s) == ANET_ERR) {
close(s);
return ANET_ERR;
}

return s;
}

Expand All @@ -419,12 +451,15 @@ static int anetTcpGenericConnect(char *err, const char *addr, int port,
for (p = servinfo; p != NULL; p = p->ai_next) {
/* Try to create the socket and to connect it.
* If we fail in the socket() call, or on connect(), we retry with
* the next entry in servinfo. */
if ((s = socket(p->ai_family,p->ai_socktype,p->ai_protocol)) == -1)
* the next entry in servinfo.
*
* Make sure connection-intensive things like the benchmark tool
* will be able to close/open sockets a zillion of times.
*/
int sockflags = ANET_SOCKET_CLOEXEC | ANET_SOCKET_REUSEADDR;
if (flags & ANET_CONNECT_NONBLOCK) sockflags |= ANET_SOCKET_NONBLOCK;
if ((s = anetCreateSocket(err,p->ai_family,p->ai_socktype,p->ai_protocol,sockflags)) == ANET_ERR)
continue;
if (anetSetReuseAddr(err,s) == ANET_ERR) goto error;
if (flags & ANET_CONNECT_NONBLOCK && anetNonBlock(err,s) != ANET_OK)
goto error;
if (source_addr) {
int bound = 0;
/* Using getaddrinfo saves us from self-determining IPv4 vs IPv6 */
Expand Down Expand Up @@ -492,34 +527,6 @@ int anetTcpNonBlockBestEffortBindConnect(char *err, const char *addr, int port,
ANET_CONNECT_NONBLOCK|ANET_CONNECT_BE_BINDING);
}

int anetUnixGenericConnect(char *err, const char *path, int flags)
{
int s;
panjf2000 marked this conversation as resolved.
Show resolved Hide resolved
struct sockaddr_un sa;

if ((s = anetCreateSocket(err,AF_LOCAL)) == ANET_ERR)
return ANET_ERR;

sa.sun_family = AF_LOCAL;
valkey_strlcpy(sa.sun_path,path,sizeof(sa.sun_path));
if (flags & ANET_CONNECT_NONBLOCK) {
if (anetNonBlock(err,s) != ANET_OK) {
close(s);
return ANET_ERR;
}
}
if (connect(s,(struct sockaddr*)&sa,sizeof(sa)) == -1) {
if (errno == EINPROGRESS &&
flags & ANET_CONNECT_NONBLOCK)
return s;

anetSetError(err, "connect: %s", strerror(errno));
close(s);
return ANET_ERR;
}
return s;
}

static int anetListen(char *err, int s, struct sockaddr *sa, socklen_t len, int backlog, mode_t perm) {
if (bind(s,sa,len) == -1) {
anetSetError(err, "bind: %s", strerror(errno));
Expand Down Expand Up @@ -608,7 +615,10 @@ int anetUnixServer(char *err, char *path, mode_t perm, int backlog)
anetSetError(err,"unix socket path too long (%zu), must be under %zu", strlen(path), sizeof(sa.sun_path));
return ANET_ERR;
}
if ((s = anetCreateSocket(err,AF_LOCAL)) == ANET_ERR)

int type = SOCK_STREAM;
int flags = ANET_SOCKET_CLOEXEC | ANET_SOCKET_NONBLOCK | ANET_SOCKET_REUSEADDR;
if ((s = anetCreateSocket(err,AF_LOCAL,type,0,flags)) == ANET_ERR)
return ANET_ERR;

memset(&sa,0,sizeof(sa));
Expand Down
2 changes: 0 additions & 2 deletions src/unix.c
Expand Up @@ -66,8 +66,6 @@ static int connUnixListen(connListener *listener) {
serverLog(LL_WARNING, "Failed opening Unix socket: %s", server.neterr);
exit(1);
}
anetNonBlock(NULL, fd);
anetCloexec(fd);
listener->fd[listener->count++] = fd;
}

Expand Down