Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[release-1.21] Fix eds cluster loadassignment with invalid endpoint #50720

Merged
merged 2 commits into from
Apr 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
42 changes: 27 additions & 15 deletions pilot/pkg/xds/endpoints/endpoint_builder.go
Expand Up @@ -42,6 +42,7 @@ import (
"istio.io/istio/pkg/slices"
"istio.io/istio/pkg/spiffe"
"istio.io/istio/pkg/util/hash"
netutil "istio.io/istio/pkg/util/net"
)

var (
Expand Down Expand Up @@ -327,7 +328,30 @@ func (b *EndpointBuilder) FromServiceEndpoints() []*endpoint.LocalityLbEndpoints
// BuildClusterLoadAssignment converts the shards for this EndpointBuilder's Service
// into a ClusterLoadAssignment. Used for EDS.
func (b *EndpointBuilder) BuildClusterLoadAssignment(endpointIndex *model.EndpointIndex) *endpoint.ClusterLoadAssignment {
svcPort := b.servicePort(b.port)
if svcPort == nil {
return buildEmptyClusterLoadAssignment(b.clusterName)
}
svcEps := b.snapshotShards(endpointIndex)
svcEps = slices.FilterInPlace(svcEps, func(ep *model.IstioEndpoint) bool {
// filter out endpoints that don't match the service port
if svcPort.Name != ep.ServicePortName {
return false
}
// filter out endpoint that has invalid ip address, mostly domain name. Because this is generated from ServiceEntry.
// There are other two cases that should not be filtered out:
// 1. ep.Address can be empty since https://github.com/istio/istio/pull/45150, in this case we will replace it with gateway ip.
// 2. ep.Address can be uds when EndpointPort = 0
if ep.Address != "" && ep.EndpointPort != 0 && !netutil.IsValidIPAddress(ep.Address) {
return false
}
// filter out endpoints that don't match the subset
if !b.subsetLabels.SubsetOf(ep.Labels) {
return false
}
return true
})

localityLbEndpoints := b.generate(svcEps, false)
if len(localityLbEndpoints) == 0 {
return buildEmptyClusterLoadAssignment(b.clusterName)
Expand Down Expand Up @@ -362,13 +386,9 @@ func (b *EndpointBuilder) generate(eps []*model.IstioEndpoint, allowPrecomputed
if !b.ServiceFound() {
return nil
}
svcPort := b.servicePort(b.port)
if svcPort == nil {
return nil
}

eps = slices.Filter(eps, func(ep *model.IstioEndpoint) bool {
return b.filterIstioEndpoint(ep, svcPort)
return b.filterIstioEndpoint(ep)
})

localityEpMap := make(map[string]*LocalityEndpoints)
Expand Down Expand Up @@ -461,7 +481,7 @@ func addUint32(left, right uint32) (uint32, bool) {
return left + right, false
}

func (b *EndpointBuilder) filterIstioEndpoint(ep *model.IstioEndpoint, svcPort *model.Port) bool {
func (b *EndpointBuilder) filterIstioEndpoint(ep *model.IstioEndpoint) bool {
// for ServiceInternalTrafficPolicy
if b.service.Attributes.NodeLocal && ep.NodeName != b.proxy.GetNodeName() {
return false
Expand All @@ -481,13 +501,6 @@ func (b *EndpointBuilder) filterIstioEndpoint(ep *model.IstioEndpoint, svcPort *
if !ep.IsDiscoverableFromProxy(b.proxy) {
return false
}
if svcPort.Name != ep.ServicePortName {
return false
}
// Port labels
if !b.subsetLabels.SubsetOf(ep.Labels) {
return false
}
// If we don't know the address we must eventually use a gateway address
if ep.Address == "" && (!b.gateways().IsMultiNetworkEnabled() || b.proxy.InNetwork(ep.Network)) {
return false
Expand Down Expand Up @@ -518,9 +531,9 @@ func (b *EndpointBuilder) snapshotShards(endpointIndex *model.EndpointIndex) []*
// Determine whether or not the target service is considered local to the cluster
// and should, therefore, not be accessed from outside the cluster.
isClusterLocal := b.clusterLocal

var eps []*model.IstioEndpoint
shards.RLock()
defer shards.RUnlock()
// Extract shard keys so we can iterate in order. This ensures a stable EDS output.
keys := shards.Keys()
// The shards are updated independently, now need to filter and merge for this cluster
Expand All @@ -534,7 +547,6 @@ func (b *EndpointBuilder) snapshotShards(endpointIndex *model.EndpointIndex) []*
}
eps = append(eps, shards.Shards[shardKey]...)
}
shards.RUnlock()
return eps
}

Expand Down
1 change: 1 addition & 0 deletions pilot/pkg/xds/endpoints/ep_filters_test.go
Expand Up @@ -834,6 +834,7 @@ func testShards() *model.EndpointIndex {
// network1 has one endpoint in each cluster
{Cluster: "cluster1a"}: {
{Network: "network1", Address: "10.0.0.1"},
{Network: "network1", Address: "foo.bar"}, // endpoint generated from ServiceEntry
},
{Cluster: "cluster1b"}: {
{Network: "network1", Address: "10.0.0.2"},
Expand Down
8 changes: 8 additions & 0 deletions releasenotes/notes/50688.yaml
@@ -0,0 +1,8 @@
apiVersion: release-notes/v2
kind: bug-fix
area: traffic-management
issue:
- 50688
releaseNotes:
- |
**Fixed** build EDS typed cluster endpoints with domain address.