Skip to content

Commit

Permalink
[release-1.21] Fix eds cluster loadassignment with invalid endpoint (#…
Browse files Browse the repository at this point in the history
…50720)

* move port name and labels check outside of filterIstioEndpoint (#49629)

* move port name and labels check outside of filterIstioEndpoint

* fix slices filter

* fix ut

* Fix eds cluster loadassignment with invalid endpoint (#50690)

* fix eds cluster has invalid endpoints

* Add release note

* Fix specical case ep address empty

* add uds addr check

* use IsValidIP

---------

Co-authored-by: Zhonghu Xu <xuzhonghu@huawei.com>
  • Loading branch information
zirain and hzxuzhonghu committed Apr 29, 2024
1 parent 7e1e4ae commit a276ce8
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 15 deletions.
42 changes: 27 additions & 15 deletions pilot/pkg/xds/endpoints/endpoint_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"istio.io/istio/pkg/slices"
"istio.io/istio/pkg/spiffe"
"istio.io/istio/pkg/util/hash"
netutil "istio.io/istio/pkg/util/net"
)

var (
Expand Down Expand Up @@ -327,7 +328,30 @@ func (b *EndpointBuilder) FromServiceEndpoints() []*endpoint.LocalityLbEndpoints
// BuildClusterLoadAssignment converts the shards for this EndpointBuilder's Service
// into a ClusterLoadAssignment. Used for EDS.
func (b *EndpointBuilder) BuildClusterLoadAssignment(endpointIndex *model.EndpointIndex) *endpoint.ClusterLoadAssignment {
svcPort := b.servicePort(b.port)
if svcPort == nil {
return buildEmptyClusterLoadAssignment(b.clusterName)
}
svcEps := b.snapshotShards(endpointIndex)
svcEps = slices.FilterInPlace(svcEps, func(ep *model.IstioEndpoint) bool {
// filter out endpoints that don't match the service port
if svcPort.Name != ep.ServicePortName {
return false
}
// filter out endpoint that has invalid ip address, mostly domain name. Because this is generated from ServiceEntry.
// There are other two cases that should not be filtered out:
// 1. ep.Address can be empty since https://github.com/istio/istio/pull/45150, in this case we will replace it with gateway ip.
// 2. ep.Address can be uds when EndpointPort = 0
if ep.Address != "" && ep.EndpointPort != 0 && !netutil.IsValidIPAddress(ep.Address) {
return false
}
// filter out endpoints that don't match the subset
if !b.subsetLabels.SubsetOf(ep.Labels) {
return false
}
return true
})

localityLbEndpoints := b.generate(svcEps, false)
if len(localityLbEndpoints) == 0 {
return buildEmptyClusterLoadAssignment(b.clusterName)
Expand Down Expand Up @@ -362,13 +386,9 @@ func (b *EndpointBuilder) generate(eps []*model.IstioEndpoint, allowPrecomputed
if !b.ServiceFound() {
return nil
}
svcPort := b.servicePort(b.port)
if svcPort == nil {
return nil
}

eps = slices.Filter(eps, func(ep *model.IstioEndpoint) bool {
return b.filterIstioEndpoint(ep, svcPort)
return b.filterIstioEndpoint(ep)
})

localityEpMap := make(map[string]*LocalityEndpoints)
Expand Down Expand Up @@ -461,7 +481,7 @@ func addUint32(left, right uint32) (uint32, bool) {
return left + right, false
}

func (b *EndpointBuilder) filterIstioEndpoint(ep *model.IstioEndpoint, svcPort *model.Port) bool {
func (b *EndpointBuilder) filterIstioEndpoint(ep *model.IstioEndpoint) bool {
// for ServiceInternalTrafficPolicy
if b.service.Attributes.NodeLocal && ep.NodeName != b.proxy.GetNodeName() {
return false
Expand All @@ -481,13 +501,6 @@ func (b *EndpointBuilder) filterIstioEndpoint(ep *model.IstioEndpoint, svcPort *
if !ep.IsDiscoverableFromProxy(b.proxy) {
return false
}
if svcPort.Name != ep.ServicePortName {
return false
}
// Port labels
if !b.subsetLabels.SubsetOf(ep.Labels) {
return false
}
// If we don't know the address we must eventually use a gateway address
if ep.Address == "" && (!b.gateways().IsMultiNetworkEnabled() || b.proxy.InNetwork(ep.Network)) {
return false
Expand Down Expand Up @@ -518,9 +531,9 @@ func (b *EndpointBuilder) snapshotShards(endpointIndex *model.EndpointIndex) []*
// Determine whether or not the target service is considered local to the cluster
// and should, therefore, not be accessed from outside the cluster.
isClusterLocal := b.clusterLocal

var eps []*model.IstioEndpoint
shards.RLock()
defer shards.RUnlock()
// Extract shard keys so we can iterate in order. This ensures a stable EDS output.
keys := shards.Keys()
// The shards are updated independently, now need to filter and merge for this cluster
Expand All @@ -534,7 +547,6 @@ func (b *EndpointBuilder) snapshotShards(endpointIndex *model.EndpointIndex) []*
}
eps = append(eps, shards.Shards[shardKey]...)
}
shards.RUnlock()
return eps
}

Expand Down
1 change: 1 addition & 0 deletions pilot/pkg/xds/endpoints/ep_filters_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -834,6 +834,7 @@ func testShards() *model.EndpointIndex {
// network1 has one endpoint in each cluster
{Cluster: "cluster1a"}: {
{Network: "network1", Address: "10.0.0.1"},
{Network: "network1", Address: "foo.bar"}, // endpoint generated from ServiceEntry
},
{Cluster: "cluster1b"}: {
{Network: "network1", Address: "10.0.0.2"},
Expand Down
8 changes: 8 additions & 0 deletions releasenotes/notes/50688.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
apiVersion: release-notes/v2
kind: bug-fix
area: traffic-management
issue:
- 50688
releaseNotes:
- |
**Fixed** build EDS typed cluster endpoints with domain address.

0 comments on commit a276ce8

Please sign in to comment.