Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xds: refactor server code in preparation for a move #50515

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
76 changes: 37 additions & 39 deletions pilot/pkg/model/context.go
Expand Up @@ -37,6 +37,7 @@ import (
"istio.io/istio/pilot/pkg/serviceregistry/util/label"
"istio.io/istio/pilot/pkg/trustbundle"
networkutil "istio.io/istio/pilot/pkg/util/network"
v3 "istio.io/istio/pilot/pkg/xds/v3"
"istio.io/istio/pkg/cluster"
"istio.io/istio/pkg/config/constants"
"istio.io/istio/pkg/config/host"
Expand All @@ -51,6 +52,7 @@ import (
netutil "istio.io/istio/pkg/util/net"
"istio.io/istio/pkg/util/protomarshal"
"istio.io/istio/pkg/util/sets"
"istio.io/istio/pkg/xds"
)

type (
Expand Down Expand Up @@ -271,11 +273,7 @@ func AnyToUnnamedResources(r []*anypb.Any) Resources {
}

func ResourcesToAny(r Resources) []*anypb.Any {
a := make([]*anypb.Any, 0, len(r))
for _, rr := range r {
a = append(a, rr.Resource)
}
return a
return xds.ResourcesToAny(r)
}

// XdsUpdates include information about the subset of updated resources.
Expand Down Expand Up @@ -410,40 +408,7 @@ type Proxy struct {
LastPushTime time.Time
}

// WatchedResource tracks an active DiscoveryRequest subscription.
type WatchedResource struct {
// TypeUrl is copied from the DiscoveryRequest.TypeUrl that initiated watching this resource.
// nolint
TypeUrl string

// ResourceNames tracks the list of resources that are actively watched.
// For LDS and CDS, all resources of the TypeUrl type are watched if it is empty.
// For endpoints the resource names will have list of clusters and for clusters it is empty.
// For Delta Xds, all resources of the TypeUrl that a client has subscribed to.
ResourceNames []string

// Wildcard indicates the subscription is a wildcard subscription. This only applies to types that
// allow both wildcard and non-wildcard subscriptions.
Wildcard bool

// NonceSent is the nonce sent in the last sent response. If it is equal with NonceAcked, the
// last message has been processed. If empty: we never sent a message of this type.
NonceSent string

// NonceAcked is the last acked message.
NonceAcked string

// AlwaysRespond, if true, will ensure that even when a request would otherwise be treated as an
// ACK, it will be responded to. This typically happens when a proxy reconnects to another instance of
// Istiod. In that case, Envoy expects us to respond to EDS/RDS/SDS requests to finish warming of
// clusters/listeners.
// Typically, this should be set to 'false' after response; keeping it true would likely result in an endless loop.
AlwaysRespond bool

// LastResources tracks the contents of the last push.
// This field is extremely expensive to maintain and is typically disabled
LastResources Resources
}
type WatchedResource = xds.WatchedResource

var istioVersionRegexp = regexp.MustCompile(`^([1-9]+)\.([0-9]+)(\.([0-9]+))?`)

Expand Down Expand Up @@ -962,6 +927,39 @@ func (node *Proxy) GetWatchedResource(typeURL string) *WatchedResource {
return node.WatchedResources[typeURL]
}

func (node *Proxy) NewWatchedResource(typeURL string, names []string) {
node.Lock()
defer node.Unlock()

node.WatchedResources[typeURL] = &WatchedResource{TypeUrl: typeURL, ResourceNames: names}
// For all EDS requests that we have already responded with in the same stream let us
// force the response. It is important to respond to those requests for Envoy to finish
// warming of those resources(Clusters).
// This can happen with the following sequence
// 1. Envoy disconnects and reconnects to Istiod.
// 2. Envoy sends EDS request and we respond with it.
// 3. Envoy sends CDS request and we respond with clusters.
// 4. Envoy detects a change in cluster state and tries to warm those clusters and send EDS request for them.
// 5. We should respond to the EDS request with Endpoints to let Envoy finish cluster warming.
keithmattix marked this conversation as resolved.
Show resolved Hide resolved
// Refer to https://github.com/envoyproxy/envoy/issues/13009 for more details.
for _, dependent := range WarmingDependencies(typeURL) {
if dwr, exists := node.WatchedResources[dependent]; exists {
dwr.AlwaysRespond = true
}
}
}

// WarmingDependencies returns the dependent typeURLs that need to be responded with
// for warming of this typeURL.
func WarmingDependencies(typeURL string) []string {
switch typeURL {
case v3.ClusterType:
return []string{v3.EndpointType}
default:
return nil
}
}

func (node *Proxy) AddOrUpdateWatchedResource(r *WatchedResource) {
if r == nil {
return
Expand Down
13 changes: 2 additions & 11 deletions pilot/pkg/model/push_context.go
Expand Up @@ -49,6 +49,7 @@ import (
"istio.io/istio/pkg/spiffe"
"istio.io/istio/pkg/util/sets"
"istio.io/istio/pkg/workloadapi"
"istio.io/istio/pkg/xds"
)

// Metrics is an interface for capturing metrics on a per-node basis.
Expand Down Expand Up @@ -380,17 +381,7 @@ type PushRequest struct {
Delta ResourceDelta
}

// ResourceDelta records the difference in requested resources by an XDS client
type ResourceDelta struct {
// Subscribed indicates the client requested these additional resources
Subscribed sets.String
// Unsubscribed indicates the client no longer requires these resources
Unsubscribed sets.String
}

func (rd ResourceDelta) IsEmpty() bool {
return len(rd.Subscribed) == 0 && len(rd.Unsubscribed) == 0
}
type ResourceDelta = xds.ResourceDelta

type ReasonStats map[TriggerReason]int

Expand Down