Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

How to use the partitioning strategy with the http client? #953

Open
gabrielgiussi opened this issue Jul 11, 2023 · 2 comments
Open

How to use the partitioning strategy with the http client? #953

gabrielgiussi opened this issue Jul 11, 2023 · 2 comments

Comments

@gabrielgiussi
Copy link

Is your feature request related to a problem? Please describe.
I want to perform load balancing based on the request to g et affinity with servers that have a hot cache for the data each request will access.
Currently this can't be done in the Load Balancer layer because the pick method doesn't have access to the request.

Describe the solution you'd like
I think having a partitioning aware client could be the solution, but I wonder why the service ConsistentHashPartitioningService is private, which makes it impossible to reuse with an Http Client.
Even if that class wasn't private, I'm not sure how the wiring should be done, it seems I need to use the Stack API.

Describe alternatives you've considered
Using a ServiceFactory to instantiate a service per partition (since it performs LB only once), but this makes using the client much harder since now I have to maintaining this mapping of key to service.
Besides that, I don't have a way to achieve the affinity, aka telling that service instance which is the subset of nodes that should use to load balance.

@xin301x
Copy link

xin301x commented Jul 11, 2023 via email

@gabrielgiussi
Copy link
Author

I was able to use the ConsistentHashPartitioningService with the Http client and it is working as expected, the HashRingNodeManager returns one Service from the ring and this only sees one Host, so whatever load balancer strategy was configured it will always route to that node.

However, I want to also add a new behavior which is fallback to the configured load balancing strategy when the header that includes the partition key is not present.
The ConsistentHashPartitioningService doesn't allow such fallback, since we can only return a failed feature when the partition key is not present, overriding noPartitionInformationHandler.
The approach I've implemented is to return a Service wrapper that checks for the presence of the header, in which case it forwards to the partitioning service, otherwise it materializes a ServiceFactory from the stack and forwards to it.
This last part is the one I don't understand if it is semantically correct and/or safe in terms of resource utilization, but it achieves the intended behavior.

Here is the module

import com.twitter.finagle
import com.twitter.finagle.Stack.{Module0, Params}
import com.twitter.finagle.http.{Request, Response}
import com.twitter.finagle.loadbalancer.LoadBalancerFactory
import com.twitter.finagle.partitioning.{ConsistentHashPartitioningService, PartitioningService}
import com.twitter.finagle.partitioning.param.{KeyHasher, NumReps}
import com.twitter.util.Future
import sun.nio.cs.UTF_8

object AffinityHttpClientModule {

  val role = Stack.Role("HttpPartitioning")

  val module = new Stack.Module[ServiceFactory[Request,Response]] {

    val parameters = Seq(
      implicitly[Stack.Param[LoadBalancerFactory.Dest]],
      implicitly[Stack.Param[finagle.param.Stats]]
    )

    def newConsistentHashPartitioningService(underlying: Stack[ServiceFactory[Request, Response]],
                                             params: Params
                                            ): ConsistentHashPartitioningService[Request, Response, String] = {
      val KeyHasher(hasher) = params[KeyHasher]
      val NumReps(numReps) = params[NumReps]

      new ConsistentHashPartitioningService[Request,Response,String](underlying,params,hasher,numReps) {

        override protected def getKeyBytes(key: String): Array[Byte] = key.getBytes(UTF_8.INSTANCE)

        override protected def getPartitionKeys(request: Request): Iterable[String] = {
          request.headerMap.get("request-key").toList
        }

        override protected def createPartitionRequestForKeys(original: Request, keys: Seq[String]): Request = {
          // TODO We are not creating multiple requests from one so...
          original
        }

        override protected def mergeResponses(originalReq: Request, results: PartitioningService.PartitionedResults[Request, Response]): Response = {
          // TODO I'm not doing N requests so this should be enough
          results.successes.toList match {
            case (_,res)::_ => res
            case _ => throw results.failures.toList.head._2
          }
        }

        override protected def noPartitionInformationHandler(req: Request): Future[Nothing] = {
          Future.exception(new Exception())
        }
      }
    }

    final override def make(
                             params: Params,
                             next: Stack[ServiceFactory[Request, Response]]
                           ): Stack[ServiceFactory[Request, Response]] = {
      val partitioningService: Service[Request, Response] = newConsistentHashPartitioningService(next, params)

      // TODO validate if it is ok to materialize the stack here
      val serviceFactory = next.make(params)

      Stack.leaf(role, ServiceFactory.const(new Service[Request,Response]{
        override def apply(request: Request): Future[Response] = {
          partitioningService(request)
          // if the request key is set choose the host based on the consistent hash of the key
          if (request.headerMap.contains("request-key"))
            partitioningService(request)
          else {
            // otherwise, load balance using the configured load balancer
            serviceFactory.apply().flatMap(_.apply(request))
          }

        }
      }))
    }

    override def role: Stack.Role = AffinityHttpClientModule.role

    override def description: String = ""
  }

}

And then I do a insertAfter in the http client stack

  def client: Http.Client = Client().withStack(_.insertAfter(
    BindingFactory.role,
    AffinityHttpClientModule.module
  ))

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Development

No branches or pull requests

2 participants