// Package cache provides a caching layer for Kubernetes cluster resources with support for
// hierarchical parent-child relationships, including cross-namespace relationships between
// cluster-scoped parents and namespaced children.
//
// The cache maintains:
//   - A complete index of all monitored resources in the cluster
//   - Hierarchical relationships between resources via owner references
//   - Cross-namespace relationships from cluster-scoped resources to namespaced children
//   - Efficient traversal of resource hierarchies for dependency analysis
//
// Key features:
//   - Watches cluster resources and maintains an in-memory cache synchronized with the cluster state
//   - Supports both same-namespace parent-child relationships and cross-namespace relationships
//   - Uses pre-computed indexes for efficient hierarchy traversal without full cluster scans
//   - Provides configurable namespaces and resource filtering
//   - Handles dynamic resource discovery including CRDs
//
// Cross-namespace hierarchy traversal:
// The cache supports discovering namespaced resources that are owned by cluster-scoped resources.
// This is essential for tracking resources like namespaced Deployments owned by cluster-scoped
// custom resources.
//
// The parentUIDToChildren index enables efficient O(1) cross-namespace traversal by mapping
// any resource's UID to its direct children, eliminating the need for O(n) graph building.
package cache

import (
	"context"
	"fmt"
	"runtime/debug"
	"strings"
	"sync"
	"time"

	"github.com/go-logr/logr"
	"golang.org/x/sync/semaphore"
	authorizationv1 "k8s.io/api/authorization/v1"
	apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/apimachinery/pkg/util/managedfields"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/apimachinery/pkg/watch"
	"k8s.io/client-go/dynamic"
	"k8s.io/client-go/kubernetes"
	authType1 "k8s.io/client-go/kubernetes/typed/authorization/v1"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/pager"
	watchutil "k8s.io/client-go/tools/watch"
	"k8s.io/client-go/util/retry"
	"k8s.io/klog/v2/textlogger"
	"k8s.io/kubectl/pkg/util/openapi"

	"github.com/argoproj/argo-cd/gitops-engine/pkg/utils/kube"
	"github.com/argoproj/argo-cd/gitops-engine/pkg/utils/tracing"
)

const (
	watchResourcesRetryTimeout = 1 * time.Second
	ClusterRetryTimeout        = 10 * time.Second

	// default duration before we invalidate entire cluster cache. Can be set to 0 to never invalidate cache
	defaultClusterResyncTimeout = 24 * time.Hour

	// default duration before restarting individual resource watch
	defaultWatchResyncTimeout = 10 * time.Minute

	// Same page size as in k8s.io/client-go/tools/pager/pager.go
	defaultListPageSize = 500
	// Prefetch only a single page
	defaultListPageBufferSize = 1
	// listSemaphore is used to limit the number of concurrent memory consuming operations on the
	// k8s list queries results.
	// Limit is required to avoid memory spikes during cache initialization.
	// The default limit of 50 is chosen based on experiments.
	defaultListSemaphoreWeight = 50
	// defaultEventProcessingInterval is the default interval for processing events
	defaultEventProcessingInterval = 100 * time.Millisecond
)

const (
	// RespectRbacDisabled default value for respectRbac
	RespectRbacDisabled = iota
	// RespectRbacNormal checks only api response for forbidden/unauthorized errors
	RespectRbacNormal
	// RespectRbacStrict checks both api response for forbidden/unauthorized errors and SelfSubjectAccessReview
	RespectRbacStrict
)

// callState tracks whether action() has been called on a resource during hierarchy iteration.
type callState int

const (
	notCalled  callState = iota // action() has not been called yet
	inProgress                  // action() is currently being processed (in call stack)
	completed                   // action() has been called and processing is complete
)

type apiMeta struct {
	namespaced bool
	// watchCancel stops the watch of all resources for this API. This gets called when the cache is invalidated or when
	// the watched API ceases to exist (e.g. a CRD gets deleted).
	watchCancel context.CancelFunc
}

type eventMeta struct {
	event watch.EventType
	un    *unstructured.Unstructured
}

// ClusterInfo holds cluster cache stats
type ClusterInfo struct {
	// Server holds cluster API server URL
	Server string
	// K8SVersion holds Kubernetes version
	K8SVersion string
	// ResourcesCount holds number of observed Kubernetes resources
	ResourcesCount int
	// APIsCount holds number of observed Kubernetes API count
	APIsCount int
	// LastCacheSyncTime holds time of most recent cache synchronization
	LastCacheSyncTime *time.Time
	// SyncError holds most recent cache synchronization error
	SyncError error
	// APIResources holds list of API resources supported by the cluster
	APIResources []kube.APIResourceInfo
}

// OnEventHandler is a function that handles Kubernetes event
type OnEventHandler func(event watch.EventType, un *unstructured.Unstructured)

// OnProcessEventsHandler handles process events event
type OnProcessEventsHandler func(duration time.Duration, processedEventsNumber int)

// OnPopulateResourceInfoHandler returns additional resource metadata that should be stored in cache
type OnPopulateResourceInfoHandler func(un *unstructured.Unstructured, isRoot bool) (info any, cacheManifest bool)

// OnResourceUpdatedHandler handlers resource update event
type (
	OnResourceUpdatedHandler func(newRes *Resource, oldRes *Resource, namespaceResources map[kube.ResourceKey]*Resource)
	Unsubscribe              func()
)

type ClusterCache interface {
	// EnsureSynced checks cache state and synchronizes it if necessary
	EnsureSynced() error
	// GetServerVersion returns observed cluster version
	GetServerVersion() string
	// GetAPIResources returns information about observed API resources
	GetAPIResources() []kube.APIResourceInfo
	// GetOpenAPISchema returns open API schema of supported API resources
	GetOpenAPISchema() openapi.Resources
	// GetGVKParser returns a parser able to build a TypedValue used in
	// structured merge diffs.
	GetGVKParser() *managedfields.GvkParser
	// Invalidate cache and executes callback that optionally might update cache settings
	Invalidate(opts ...UpdateSettingsFunc)
	// FindResources returns resources that matches given list of predicates from specified namespace or everywhere if specified namespace is empty
	FindResources(namespace string, predicates ...func(r *Resource) bool) map[kube.ResourceKey]*Resource
	// IterateHierarchyV2 iterates resource tree starting from the specified top level resources and executes callback for each resource in the tree.
	// The action callback returns true if iteration should continue and false otherwise.
	IterateHierarchyV2(keys []kube.ResourceKey, action func(resource *Resource, namespaceResources map[kube.ResourceKey]*Resource) bool)
	// IsNamespaced answers if specified group/kind is a namespaced resource API or not
	IsNamespaced(gk schema.GroupKind) (bool, error)
	// GetManagedLiveObjs helps finding matching live K8S resources for a given resources list.
	// The function returns all resources from cache for those `isManaged` function returns true and resources
	// specified in targetObjs list.
	GetManagedLiveObjs(targetObjs []*unstructured.Unstructured, isManaged func(r *Resource) bool) (map[kube.ResourceKey]*unstructured.Unstructured, error)
	// GetClusterInfo returns cluster cache statistics
	GetClusterInfo() ClusterInfo
	// OnResourceUpdated register event handler that is executed every time when resource get's updated in the cache
	OnResourceUpdated(handler OnResourceUpdatedHandler) Unsubscribe
	// OnEvent register event handler that is executed every time when new K8S event received
	OnEvent(handler OnEventHandler) Unsubscribe
	// OnProcessEventsHandler register event handler that is executed every time when events were processed
	OnProcessEventsHandler(handler OnProcessEventsHandler) Unsubscribe
}

type WeightedSemaphore interface {
	Acquire(ctx context.Context, n int64) error
	TryAcquire(n int64) bool
	Release(n int64)
}

type ListRetryFunc func(err error) bool

// NewClusterCache creates new instance of cluster cache
func NewClusterCache(config *rest.Config, opts ...UpdateSettingsFunc) *clusterCache {
	log := textlogger.NewLogger(textlogger.NewConfig())
	cache := &clusterCache{
		settings:           Settings{ResourceHealthOverride: &noopSettings{}, ResourcesFilter: &noopSettings{}},
		apisMeta:           make(map[schema.GroupKind]*apiMeta),
		eventMetaCh:        nil,
		listPageSize:       defaultListPageSize,
		listPageBufferSize: defaultListPageBufferSize,
		listSemaphore:      semaphore.NewWeighted(defaultListSemaphoreWeight),
		resources:          make(map[kube.ResourceKey]*Resource),
		nsIndex:            make(map[string]map[kube.ResourceKey]*Resource),
		config:             config,
		kubectl: &kube.KubectlCmd{
			Log:    log,
			Tracer: tracing.NopTracer{},
		},
		syncStatus: clusterCacheSync{
			resyncTimeout: defaultClusterResyncTimeout,
			syncTime:      nil,
		},
		watchResyncTimeout:      defaultWatchResyncTimeout,
		clusterSyncRetryTimeout: ClusterRetryTimeout,
		eventProcessingInterval: defaultEventProcessingInterval,
		resourceUpdatedHandlers: map[uint64]OnResourceUpdatedHandler{},
		eventHandlers:           map[uint64]OnEventHandler{},
		processEventsHandlers:   map[uint64]OnProcessEventsHandler{},
		log:                     log,
		listRetryLimit:          1,
		listRetryUseBackoff:     false,
		listRetryFunc:           ListRetryFuncNever,
		parentUIDToChildren:     make(map[types.UID]map[kube.ResourceKey]struct{}),
	}
	for i := range opts {
		opts[i](cache)
	}
	return cache
}

type clusterCache struct {
	syncStatus clusterCacheSync

	apisMeta              map[schema.GroupKind]*apiMeta
	batchEventsProcessing bool
	eventMetaCh           chan eventMeta
	serverVersion         string
	apiResources          []kube.APIResourceInfo
	// namespacedResources is a simple map which indicates a groupKind is namespaced
	namespacedResources map[schema.GroupKind]bool

	// maximum time we allow watches to run before relisting the group/kind and restarting the watch
	watchResyncTimeout time.Duration
	// sync retry timeout for cluster when sync error happens
	clusterSyncRetryTimeout time.Duration
	// ticker interval for events processing
	eventProcessingInterval time.Duration

	// size of a page for list operations pager.
	listPageSize int64
	// number of pages to prefetch for list pager.
	listPageBufferSize int32
	listSemaphore      WeightedSemaphore

	// retry options for list operations
	listRetryLimit      int32
	listRetryUseBackoff bool
	listRetryFunc       ListRetryFunc

	// lock is a rw lock which protects the fields of clusterInfo
	lock      sync.RWMutex
	resources map[kube.ResourceKey]*Resource
	nsIndex   map[string]map[kube.ResourceKey]*Resource

	kubectl          kube.Kubectl
	log              logr.Logger
	config           *rest.Config
	namespaces       []string
	clusterResources bool
	settings         Settings

	handlersLock                sync.Mutex
	handlerKey                  uint64
	populateResourceInfoHandler OnPopulateResourceInfoHandler
	resourceUpdatedHandlers     map[uint64]OnResourceUpdatedHandler
	eventHandlers               map[uint64]OnEventHandler
	processEventsHandlers       map[uint64]OnProcessEventsHandler
	openAPISchema               openapi.Resources
	gvkParser                   *managedfields.GvkParser

	respectRBAC int

	// Parent-to-children index for O(1) child lookup during hierarchy traversal
	// Maps any resource's UID to a set of its direct children's ResourceKeys
	// Using a set eliminates O(k) duplicate checking on insertions
	// Used for cross-namespace hierarchy traversal; namespaced traversal still builds a graph
	parentUIDToChildren map[types.UID]map[kube.ResourceKey]struct{}
}

type clusterCacheSync struct {
	// When using this struct:
	// 1) 'lock' mutex should be acquired when reading/writing from fields of this struct.
	// 2) The parent 'clusterCache.lock' does NOT need to be owned to r/w from fields of this struct (if it is owned, that is fine, but see below)
	// 3) To prevent deadlocks, do not acquire parent 'clusterCache.lock' after acquiring this lock; if you need both locks, always acquire the parent lock first
	lock          sync.Mutex
	syncTime      *time.Time
	syncError     error
	resyncTimeout time.Duration
}

// ListRetryFuncNever never retries on errors
func ListRetryFuncNever(_ error) bool {
	return false
}

// ListRetryFuncAlways always retries on errors
func ListRetryFuncAlways(_ error) bool {
	return true
}

// OnResourceUpdated register event handler that is executed every time when resource get's updated in the cache
func (c *clusterCache) OnResourceUpdated(handler OnResourceUpdatedHandler) Unsubscribe {
	c.handlersLock.Lock()
	defer c.handlersLock.Unlock()
	key := c.handlerKey
	c.handlerKey++
	c.resourceUpdatedHandlers[key] = handler
	return func() {
		c.handlersLock.Lock()
		defer c.handlersLock.Unlock()
		delete(c.resourceUpdatedHandlers, key)
	}
}

func (c *clusterCache) getResourceUpdatedHandlers() []OnResourceUpdatedHandler {
	c.handlersLock.Lock()
	defer c.handlersLock.Unlock()
	var handlers []OnResourceUpdatedHandler
	for _, h := range c.resourceUpdatedHandlers {
		handlers = append(handlers, h)
	}
	return handlers
}

// OnEvent register event handler that is executed every time when new K8S event received
func (c *clusterCache) OnEvent(handler OnEventHandler) Unsubscribe {
	c.handlersLock.Lock()
	defer c.handlersLock.Unlock()
	key := c.handlerKey
	c.handlerKey++
	c.eventHandlers[key] = handler
	return func() {
		c.handlersLock.Lock()
		defer c.handlersLock.Unlock()
		delete(c.eventHandlers, key)
	}
}

func (c *clusterCache) getEventHandlers() []OnEventHandler {
	c.handlersLock.Lock()
	defer c.handlersLock.Unlock()
	handlers := make([]OnEventHandler, 0, len(c.eventHandlers))
	for _, h := range c.eventHandlers {
		handlers = append(handlers, h)
	}
	return handlers
}

// OnProcessEventsHandler register event handler that is executed every time when events were processed
func (c *clusterCache) OnProcessEventsHandler(handler OnProcessEventsHandler) Unsubscribe {
	c.handlersLock.Lock()
	defer c.handlersLock.Unlock()
	key := c.handlerKey
	c.handlerKey++
	c.processEventsHandlers[key] = handler
	return func() {
		c.handlersLock.Lock()
		defer c.handlersLock.Unlock()
		delete(c.processEventsHandlers, key)
	}
}

func (c *clusterCache) getProcessEventsHandlers() []OnProcessEventsHandler {
	c.handlersLock.Lock()
	defer c.handlersLock.Unlock()
	handlers := make([]OnProcessEventsHandler, 0, len(c.processEventsHandlers))
	for _, h := range c.processEventsHandlers {
		handlers = append(handlers, h)
	}
	return handlers
}

// GetServerVersion returns observed cluster version
func (c *clusterCache) GetServerVersion() string {
	return c.serverVersion
}

// GetAPIResources returns information about observed API resources
// This method is called frequently during reconciliation to pass API resource info to `helm template`
// NOTE: we do not provide any consistency guarantees about the returned list. The list might be
// updated in place (anytime new CRDs are introduced or removed). If necessary, a separate method
// would need to be introduced to return a copy of the list so it can be iterated consistently.
func (c *clusterCache) GetAPIResources() []kube.APIResourceInfo {
	return c.apiResources
}

// GetOpenAPISchema returns open API schema of supported API resources
func (c *clusterCache) GetOpenAPISchema() openapi.Resources {
	return c.openAPISchema
}

// GetGVKParser returns a parser able to build a TypedValue used in
// structured merge diffs.
func (c *clusterCache) GetGVKParser() *managedfields.GvkParser {
	return c.gvkParser
}

func (c *clusterCache) appendAPIResource(info kube.APIResourceInfo) {
	exists := false
	for i := range c.apiResources {
		if c.apiResources[i].GroupKind == info.GroupKind && c.apiResources[i].GroupVersionResource.Version == info.GroupVersionResource.Version {
			exists = true
			break
		}
	}
	if !exists {
		c.apiResources = append(c.apiResources, info)
	}
}

func (c *clusterCache) deleteAPIResource(info kube.APIResourceInfo) {
	for i := range c.apiResources {
		if c.apiResources[i].GroupKind == info.GroupKind && c.apiResources[i].GroupVersionResource.Version == info.GroupVersionResource.Version {
			c.apiResources[i] = c.apiResources[len(c.apiResources)-1]
			c.apiResources = c.apiResources[:len(c.apiResources)-1]
			break
		}
	}
}

func (c *clusterCache) replaceResourceCache(gk schema.GroupKind, resources []*Resource, ns string) {
	objByKey := make(map[kube.ResourceKey]*Resource)
	for i := range resources {
		objByKey[resources[i].ResourceKey()] = resources[i]
	}

	// update existing nodes
	for i := range resources {
		res := resources[i]
		oldRes := c.resources[res.ResourceKey()]
		if oldRes == nil || oldRes.ResourceVersion != res.ResourceVersion {
			c.onNodeUpdated(oldRes, res)
		}
	}

	for key := range c.resources {
		if key.Kind != gk.Kind || key.Group != gk.Group || ns != "" && key.Namespace != ns {
			continue
		}

		if _, ok := objByKey[key]; !ok {
			c.onNodeRemoved(key)
		}
	}
}

func (c *clusterCache) newResource(un *unstructured.Unstructured) *Resource {
	ownerRefs, isInferredParentOf := c.resolveResourceReferences(un)

	cacheManifest := false
	var info any
	if c.populateResourceInfoHandler != nil {
		info, cacheManifest = c.populateResourceInfoHandler(un, len(ownerRefs) == 0)
	}
	var creationTimestamp *metav1.Time
	ct := un.GetCreationTimestamp()
	if !ct.IsZero() {
		creationTimestamp = &ct
	}
	resource := &Resource{
		ResourceVersion:    un.GetResourceVersion(),
		Ref:                kube.GetObjectRef(un),
		OwnerRefs:          ownerRefs,
		Info:               info,
		CreationTimestamp:  creationTimestamp,
		isInferredParentOf: isInferredParentOf,
	}
	if cacheManifest {
		resource.Resource = un
	}

	return resource
}

func (c *clusterCache) setNode(n *Resource) {
	key := n.ResourceKey()

	// Keep track of existing resource for index updates
	existing := c.resources[key]

	c.resources[key] = n
	ns, ok := c.nsIndex[key.Namespace]
	if !ok {
		ns = make(map[kube.ResourceKey]*Resource)
		c.nsIndex[key.Namespace] = ns
	}
	ns[key] = n

	// Update parent-to-children index for all resources with owner refs
	// This is always done, regardless of sync state, as it's cheap to maintain
	c.updateParentUIDToChildren(key, existing, n)

	// update inferred parent references
	if n.isInferredParentOf != nil || mightHaveInferredOwner(n) {
		for k, v := range ns {
			// update child resource owner references
			if n.isInferredParentOf != nil && mightHaveInferredOwner(v) {
				shouldBeParent := n.isInferredParentOf(k)
				v.setOwnerRef(n.toOwnerRef(), shouldBeParent)
				// Update index inline for inferred ref changes.
				// Note: The removal case (shouldBeParent=false) is currently unreachable for
				// StatefulSet→PVC relationships because Kubernetes makes volumeClaimTemplates
				// immutable. We include it for defensive correctness and future-proofing.
				if n.Ref.UID != "" {
					if shouldBeParent {
						c.addToParentUIDToChildren(n.Ref.UID, k)
					} else {
						c.removeFromParentUIDToChildren(n.Ref.UID, k)
					}
				}
			}
			if mightHaveInferredOwner(n) && v.isInferredParentOf != nil {
				childKey := n.ResourceKey()
				shouldBeParent := v.isInferredParentOf(childKey)
				n.setOwnerRef(v.toOwnerRef(), shouldBeParent)
				// Update index inline for inferred ref changes.
				// Note: The removal case (shouldBeParent=false) is currently unreachable for
				// StatefulSet→PVC relationships because Kubernetes makes volumeClaimTemplates
				// immutable. We include it for defensive correctness and future-proofing.
				if v.Ref.UID != "" {
					if shouldBeParent {
						c.addToParentUIDToChildren(v.Ref.UID, childKey)
					} else {
						c.removeFromParentUIDToChildren(v.Ref.UID, childKey)
					}
				}
			}
		}
	}
}

// addToParentUIDToChildren adds a child to the parent-to-children index
func (c *clusterCache) addToParentUIDToChildren(parentUID types.UID, childKey kube.ResourceKey) {
	// Get or create the set for this parent
	childrenSet := c.parentUIDToChildren[parentUID]
	if childrenSet == nil {
		childrenSet = make(map[kube.ResourceKey]struct{})
		c.parentUIDToChildren[parentUID] = childrenSet
	}
	// Add child to set (O(1) operation, automatically handles duplicates)
	childrenSet[childKey] = struct{}{}
}

// removeFromParentUIDToChildren removes a child from the parent-to-children index
func (c *clusterCache) removeFromParentUIDToChildren(parentUID types.UID, childKey kube.ResourceKey) {
	childrenSet := c.parentUIDToChildren[parentUID]
	if childrenSet == nil {
		return
	}

	// Remove child from set (O(1) operation)
	delete(childrenSet, childKey)

	// Clean up empty sets to avoid memory leaks
	if len(childrenSet) == 0 {
		delete(c.parentUIDToChildren, parentUID)
	}
}

// updateParentUIDToChildren updates the parent-to-children index when a resource's owner refs change
func (c *clusterCache) updateParentUIDToChildren(childKey kube.ResourceKey, oldResource *Resource, newResource *Resource) {
	// Build sets of old and new parent UIDs
	oldParents := make(map[types.UID]struct{})
	if oldResource != nil {
		for _, ref := range oldResource.OwnerRefs {
			if ref.UID != "" {
				oldParents[ref.UID] = struct{}{}
			}
		}
	}

	newParents := make(map[types.UID]struct{})
	for _, ref := range newResource.OwnerRefs {
		if ref.UID != "" {
			newParents[ref.UID] = struct{}{}
		}
	}

	// Remove from parents that are no longer in owner refs
	for oldUID := range oldParents {
		if _, exists := newParents[oldUID]; !exists {
			c.removeFromParentUIDToChildren(oldUID, childKey)
		}
	}

	// Add to parents that are new in owner refs
	for newUID := range newParents {
		if _, exists := oldParents[newUID]; !exists {
			c.addToParentUIDToChildren(newUID, childKey)
		}
	}
}

// Invalidate cache and executes callback that optionally might update cache settings
func (c *clusterCache) Invalidate(opts ...UpdateSettingsFunc) {
	c.lock.Lock()
	defer c.lock.Unlock()

	c.syncStatus.lock.Lock()
	c.syncStatus.syncTime = nil
	c.syncStatus.lock.Unlock()

	for i := range c.apisMeta {
		c.apisMeta[i].watchCancel()
	}
	for i := range opts {
		opts[i](c)
	}

	if c.batchEventsProcessing {
		c.invalidateEventMeta()
	}
	c.apisMeta = nil
	c.namespacedResources = nil
	c.log.Info("Invalidated cluster")
}

// clusterCacheSync's lock should be held before calling this method
func (syncStatus *clusterCacheSync) synced(clusterRetryTimeout time.Duration) bool {
	syncTime := syncStatus.syncTime

	if syncTime == nil {
		return false
	}
	if syncStatus.syncError != nil {
		return time.Now().Before(syncTime.Add(clusterRetryTimeout))
	}
	if syncStatus.resyncTimeout == 0 {
		// cluster resync timeout has been disabled
		return true
	}
	return time.Now().Before(syncTime.Add(syncStatus.resyncTimeout))
}

func (c *clusterCache) stopWatching(gk schema.GroupKind, ns string) {
	c.lock.Lock()
	defer c.lock.Unlock()
	if info, ok := c.apisMeta[gk]; ok {
		info.watchCancel()
		delete(c.apisMeta, gk)
		c.replaceResourceCache(gk, nil, ns)
		c.log.Info(fmt.Sprintf("Stop watching: %s not found", gk))
	}
}

// startMissingWatches lists supported cluster resources and starts watching for changes unless watch is already running
func (c *clusterCache) startMissingWatches() error {
	apis, err := c.kubectl.GetAPIResources(c.config, true, c.settings.ResourcesFilter)
	if err != nil {
		return fmt.Errorf("failed to get APIResources: %w", err)
	}
	client, err := c.kubectl.NewDynamicClient(c.config)
	if err != nil {
		return fmt.Errorf("failed to create client: %w", err)
	}
	clientset, err := kubernetes.NewForConfig(c.config)
	if err != nil {
		return fmt.Errorf("failed to create clientset: %w", err)
	}
	namespacedResources := make(map[schema.GroupKind]bool)
	for i := range apis {
		api := apis[i]
		namespacedResources[api.GroupKind] = api.Meta.Namespaced
		if _, ok := c.apisMeta[api.GroupKind]; !ok {
			ctx, cancel := context.WithCancel(context.Background())
			c.apisMeta[api.GroupKind] = &apiMeta{namespaced: api.Meta.Namespaced, watchCancel: cancel}

			err := c.processApi(client, api, func(resClient dynamic.ResourceInterface, ns string) error {
				resourceVersion, err := c.loadInitialState(ctx, api, resClient, ns, false) // don't lock here, we are already in a lock before startMissingWatches is called inside watchEvents
				if err != nil && c.isRestrictedResource(err) {
					keep := false
					if c.respectRBAC == RespectRbacStrict {
						k, permErr := c.checkPermission(ctx, clientset.AuthorizationV1().SelfSubjectAccessReviews(), api)
						if permErr != nil {
							return fmt.Errorf("failed to check permissions for resource %s: %w, original error=%v", api.GroupKind.String(), permErr, err.Error())
						}
						keep = k
					}
					// if we are not allowed to list the resource, remove it from the watch list
					if !keep {
						delete(c.apisMeta, api.GroupKind)
						delete(namespacedResources, api.GroupKind)
						return nil
					}
				}
				go c.watchEvents(ctx, api, resClient, ns, resourceVersion)
				return nil
			})
			if err != nil {
				return err
			}
		}
	}
	c.namespacedResources = namespacedResources
	return nil
}

func runSynced(lock sync.Locker, action func() error) error {
	lock.Lock()
	defer lock.Unlock()
	return action()
}

// listResources creates list pager and enforces number of concurrent list requests
// The callback should not wait on any locks that may be held by other callers.
func (c *clusterCache) listResources(ctx context.Context, resClient dynamic.ResourceInterface, callback func(*pager.ListPager) error) (string, error) {
	if err := c.listSemaphore.Acquire(ctx, 1); err != nil {
		return "", fmt.Errorf("failed to acquire list semaphore: %w", err)
	}
	defer c.listSemaphore.Release(1)

	var retryCount int64
	resourceVersion := ""
	listPager := pager.New(func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) {
		var res *unstructured.UnstructuredList
		var listRetry wait.Backoff

		if c.listRetryUseBackoff {
			listRetry = retry.DefaultBackoff
		} else {
			listRetry = retry.DefaultRetry
		}

		listRetry.Steps = int(c.listRetryLimit)
		err := retry.OnError(listRetry, c.listRetryFunc, func() error {
			var ierr error
			res, ierr = resClient.List(ctx, opts)
			if ierr != nil {
				// Log out a retry
				if c.listRetryLimit > 1 && c.listRetryFunc(ierr) {
					retryCount++
					c.log.Info(fmt.Sprintf("Error while listing resources: %v (try %d/%d)", ierr, retryCount, c.listRetryLimit))
				}
				// Ensure res is never nil even when there's an error
				if res == nil {
					res = &unstructured.UnstructuredList{}
				}
				//nolint:wrapcheck // wrap outside the retry
				return ierr
			}
			resourceVersion = res.GetResourceVersion()
			return nil
		})
		if err != nil {
			return res, fmt.Errorf("failed to list resources: %w", err)
		}
		return res, nil
	})
	listPager.PageBufferSize = c.listPageBufferSize
	listPager.PageSize = c.listPageSize

	return resourceVersion, callback(listPager)
}

// loadInitialState loads the state of all the resources retrieved by the given resource client.
func (c *clusterCache) loadInitialState(ctx context.Context, api kube.APIResourceInfo, resClient dynamic.ResourceInterface, ns string, lock bool) (string, error) {
	var items []*Resource
	resourceVersion, err := c.listResources(ctx, resClient, func(listPager *pager.ListPager) error {
		return listPager.EachListItem(ctx, metav1.ListOptions{}, func(obj runtime.Object) error {
			if un, ok := obj.(*unstructured.Unstructured); !ok {
				return fmt.Errorf("object %s/%s has an unexpected type", un.GroupVersionKind().String(), un.GetName())
			} else {
				items = append(items, c.newResource(un))
			}
			return nil
		})
	})
	if err != nil {
		return "", fmt.Errorf("failed to load initial state of resource %s: %w", api.GroupKind.String(), err)
	}

	if lock {
		return resourceVersion, runSynced(&c.lock, func() error {
			c.replaceResourceCache(api.GroupKind, items, ns)
			return nil
		})
	}
	c.replaceResourceCache(api.GroupKind, items, ns)
	return resourceVersion, nil
}

func (c *clusterCache) watchEvents(ctx context.Context, api kube.APIResourceInfo, resClient dynamic.ResourceInterface, ns string, resourceVersion string) {
	kube.RetryUntilSucceed(ctx, watchResourcesRetryTimeout, fmt.Sprintf("watch %s on %s", api.GroupKind, c.config.Host), c.log, func() (err error) {
		defer func() {
			if r := recover(); r != nil {
				err = fmt.Errorf("recovered from panic: %+v\n%s", r, debug.Stack())
			}
		}()

		// load API initial state if no resource version provided
		if resourceVersion == "" {
			resourceVersion, err = c.loadInitialState(ctx, api, resClient, ns, true)
			if err != nil {
				return err
			}
		}

		w, err := watchutil.NewRetryWatcherWithContext(ctx, resourceVersion, &cache.ListWatch{
			WatchFuncWithContext: func(ctx context.Context, options metav1.ListOptions) (watch.Interface, error) {
				res, err := resClient.Watch(ctx, options)
				if apierrors.IsNotFound(err) {
					c.stopWatching(api.GroupKind, ns)
				}
				//nolint:wrapcheck // wrap outside the retry
				return res, err
			},
		})
		if err != nil {
			return fmt.Errorf("failed to create resource watcher: %w", err)
		}

		defer func() {
			w.Stop()
			resourceVersion = ""
		}()

		var watchResyncTimeoutCh <-chan time.Time
		if c.watchResyncTimeout > 0 {
			shouldResync := time.NewTimer(c.watchResyncTimeout)
			defer shouldResync.Stop()
			watchResyncTimeoutCh = shouldResync.C
		}

		for {
			select {
			// stop watching when parent context got cancelled
			case <-ctx.Done():
				return nil

			// re-synchronize API state and restart watch periodically
			case <-watchResyncTimeoutCh:
				return fmt.Errorf("resyncing %s on %s due to timeout", api.GroupKind, c.config.Host)

			// re-synchronize API state and restart watch if retry watcher failed to continue watching using provided resource version
			case <-w.Done():
				return fmt.Errorf("watch %s on %s has closed", api.GroupKind, c.config.Host)

			case event, ok := <-w.ResultChan():
				if !ok {
					return fmt.Errorf("watch %s on %s has closed", api.GroupKind, c.config.Host)
				}

				obj, ok := event.Object.(*unstructured.Unstructured)
				if !ok {
					return fmt.Errorf("failed to convert to *unstructured.Unstructured: %v", event.Object)
				}

				c.recordEvent(event.Type, obj)
				if kube.IsCRD(obj) {
					var resources []kube.APIResourceInfo
					crd := apiextensionsv1.CustomResourceDefinition{}
					err := runtime.DefaultUnstructuredConverter.FromUnstructured(obj.Object, &crd)
					if err != nil {
						c.log.Error(err, "Failed to extract CRD resources")
					}
					for _, v := range crd.Spec.Versions {
						resources = append(resources, kube.APIResourceInfo{
							GroupKind: schema.GroupKind{
								Group: crd.Spec.Group, Kind: crd.Spec.Names.Kind,
							},
							GroupVersionResource: schema.GroupVersionResource{
								Group: crd.Spec.Group, Version: v.Name, Resource: crd.Spec.Names.Plural,
							},
							Meta: metav1.APIResource{
								Group:        crd.Spec.Group,
								SingularName: crd.Spec.Names.Singular,
								Namespaced:   crd.Spec.Scope == apiextensionsv1.NamespaceScoped,
								Name:         crd.Spec.Names.Plural,
								Kind:         crd.Spec.Names.Singular,
								Version:      v.Name,
								ShortNames:   crd.Spec.Names.ShortNames,
							},
						})
					}

					if event.Type == watch.Deleted {
						for i := range resources {
							c.deleteAPIResource(resources[i])
						}
					} else {
						c.log.Info("Updating Kubernetes APIs, watches, and Open API schemas due to CRD event", "eventType", event.Type, "groupKind", crd.GroupVersionKind().GroupKind().String())
						// add new CRD's groupkind to c.apigroups
						if event.Type == watch.Added {
							for i := range resources {
								c.appendAPIResource(resources[i])
							}
						}
						err = runSynced(&c.lock, func() error {
							return c.startMissingWatches()
						})
						if err != nil {
							c.log.Error(err, "Failed to start missing watch")
						}
					}
					err = runSynced(&c.lock, func() error {
						openAPISchema, gvkParser, err := c.kubectl.LoadOpenAPISchema(c.config)
						if err != nil {
							return fmt.Errorf("failed to load open api schema while handling CRD change: %w", err)
						}
						if gvkParser != nil {
							c.gvkParser = gvkParser
						}
						c.openAPISchema = openAPISchema
						return nil
					})
					if err != nil {
						c.log.Error(err, "Failed to reload open api schema")
					}
				}
			}
		}
	})
}

// processApi processes all the resources for a given API. First we construct an API client for the given API. Then we
// call the callback. If we're managing the whole cluster, we call the callback with the client and an empty namespace.
// If we're managing specific namespaces, we call the callback for each namespace.
func (c *clusterCache) processApi(client dynamic.Interface, api kube.APIResourceInfo, callback func(resClient dynamic.ResourceInterface, ns string) error) error {
	resClient := client.Resource(api.GroupVersionResource)
	switch {
	// if manage whole cluster or resource is cluster level and cluster resources enabled
	case len(c.namespaces) == 0 || (!api.Meta.Namespaced && c.clusterResources):
		return callback(resClient, "")
	// if manage some namespaces and resource is namespaced
	case len(c.namespaces) != 0 && api.Meta.Namespaced:
		for _, ns := range c.namespaces {
			err := callback(resClient.Namespace(ns), ns)
			if err != nil {
				return err
			}
		}
	}

	return nil
}

// isRestrictedResource checks if the kube api call is unauthorized or forbidden
func (c *clusterCache) isRestrictedResource(err error) bool {
	return c.respectRBAC != RespectRbacDisabled && (apierrors.IsForbidden(err) || apierrors.IsUnauthorized(err))
}

// checkPermission runs a self subject access review to check if the controller has permissions to list the resource
func (c *clusterCache) checkPermission(ctx context.Context, reviewInterface authType1.SelfSubjectAccessReviewInterface, api kube.APIResourceInfo) (keep bool, err error) {
	sar := &authorizationv1.SelfSubjectAccessReview{
		Spec: authorizationv1.SelfSubjectAccessReviewSpec{
			ResourceAttributes: &authorizationv1.ResourceAttributes{
				Namespace: "*",
				Verb:      "list", // uses list verb to check for permissions
				Resource:  api.GroupVersionResource.Resource,
			},
		},
	}

	switch {
	// if manage whole cluster or resource is cluster level and cluster resources enabled
	case len(c.namespaces) == 0 || (!api.Meta.Namespaced && c.clusterResources):
		resp, err := reviewInterface.Create(ctx, sar, metav1.CreateOptions{})
		if err != nil {
			return false, fmt.Errorf("failed to create self subject access review: %w", err)
		}
		if resp != nil && resp.Status.Allowed {
			return true, nil
		}
		// unsupported, remove from watch list
		return false, nil
	// if manage some namespaces and resource is namespaced
	case len(c.namespaces) != 0 && api.Meta.Namespaced:
		for _, ns := range c.namespaces {
			sar.Spec.ResourceAttributes.Namespace = ns
			resp, err := reviewInterface.Create(ctx, sar, metav1.CreateOptions{})
			if err != nil {
				return false, fmt.Errorf("failed to create self subject access review: %w", err)
			}
			if resp != nil && resp.Status.Allowed {
				return true, nil
			}
			// unsupported, remove from watch list
			//nolint:staticcheck //FIXME
			return false, nil
		}
	}
	// checkPermission follows the same logic of determining namespace/cluster resource as the processApi function
	// so if neither of the cases match it means the controller will not watch for it so it is safe to return true.
	return true, nil
}

// sync retrieves the current state of the cluster and stores relevant information in the clusterCache fields.
//
// First we get some metadata from the cluster, like the server version, OpenAPI document, and the list of all API
// resources.
//
// Then we get a list of the preferred versions of all API resources which are to be monitored (it's possible to exclude
// resources from monitoring). We loop through those APIs asynchronously and for each API we list all resources. We also
// kick off a goroutine to watch the resources for that API and update the cache constantly.
//
// When this function exits, the cluster cache is up to date, and the appropriate resources are being watched for
// changes.
func (c *clusterCache) sync() error {
	c.log.Info("Start syncing cluster")

	for i := range c.apisMeta {
		c.apisMeta[i].watchCancel()
	}

	if c.batchEventsProcessing {
		c.invalidateEventMeta()
		c.eventMetaCh = make(chan eventMeta)
	}

	c.apisMeta = make(map[schema.GroupKind]*apiMeta)
	c.resources = make(map[kube.ResourceKey]*Resource)
	c.namespacedResources = make(map[schema.GroupKind]bool)
	c.parentUIDToChildren = make(map[types.UID]map[kube.ResourceKey]struct{})
	config := c.config
	version, err := c.kubectl.GetServerVersion(config)
	if err != nil {
		return fmt.Errorf("failed to get server version: %w", err)
	}
	c.serverVersion = version
	apiResources, err := c.kubectl.GetAPIResources(config, false, NewNoopSettings())
	if err != nil {
		return fmt.Errorf("failed to get api resources: %w", err)
	}
	c.apiResources = apiResources

	openAPISchema, gvkParser, err := c.kubectl.LoadOpenAPISchema(config)
	if err != nil {
		return fmt.Errorf("failed to load open api schema while syncing cluster cache: %w", err)
	}

	if gvkParser != nil {
		c.gvkParser = gvkParser
	}

	c.openAPISchema = openAPISchema

	apis, err := c.kubectl.GetAPIResources(c.config, true, c.settings.ResourcesFilter)
	if err != nil {
		return fmt.Errorf("failed to get api resources: %w", err)
	}
	client, err := c.kubectl.NewDynamicClient(c.config)
	if err != nil {
		return fmt.Errorf("failed to create client: %w", err)
	}
	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		return fmt.Errorf("failed to create clientset: %w", err)
	}

	if c.batchEventsProcessing {
		go c.processEvents()
	}

	// Each API is processed in parallel, so we need to take out a lock when we update clusterCache fields.
	lock := sync.Mutex{}
	err = kube.RunAllAsync(len(apis), func(i int) error {
		api := apis[i]

		lock.Lock()
		ctx, cancel := context.WithCancel(context.Background())
		info := &apiMeta{namespaced: api.Meta.Namespaced, watchCancel: cancel}
		c.apisMeta[api.GroupKind] = info
		c.namespacedResources[api.GroupKind] = api.Meta.Namespaced
		lock.Unlock()

		return c.processApi(client, api, func(resClient dynamic.ResourceInterface, ns string) error {
			resourceVersion, err := c.listResources(ctx, resClient, func(listPager *pager.ListPager) error {
				return listPager.EachListItem(context.Background(), metav1.ListOptions{}, func(obj runtime.Object) error {
					if un, ok := obj.(*unstructured.Unstructured); !ok {
						return fmt.Errorf("object %s/%s has an unexpected type", un.GroupVersionKind().String(), un.GetName())
					} else {
						newRes := c.newResource(un)
						lock.Lock()
						c.setNode(newRes)
						lock.Unlock()
					}
					return nil
				})
			})
			if err != nil {
				if c.isRestrictedResource(err) {
					keep := false
					if c.respectRBAC == RespectRbacStrict {
						k, permErr := c.checkPermission(ctx, clientset.AuthorizationV1().SelfSubjectAccessReviews(), api)
						if permErr != nil {
							return fmt.Errorf("failed to check permissions for resource %s: %w, original error=%v", api.GroupKind.String(), permErr, err.Error())
						}
						keep = k
					}
					// if we are not allowed to list the resource, remove it from the watch list
					if !keep {
						lock.Lock()
						delete(c.apisMeta, api.GroupKind)
						delete(c.namespacedResources, api.GroupKind)
						lock.Unlock()
						return nil
					}
				}
				return fmt.Errorf("failed to load initial state of resource %s: %w", api.GroupKind.String(), err)
			}

			go c.watchEvents(ctx, api, resClient, ns, resourceVersion)

			return nil
		})
	})
	if err != nil {
		c.log.Error(err, "Failed to sync cluster")
		return fmt.Errorf("failed to sync cluster %s: %w", c.config.Host, err)
	}

	c.log.Info("Cluster successfully synced")
	return nil
}

// invalidateEventMeta closes the eventMeta channel if it is open
func (c *clusterCache) invalidateEventMeta() {
	if c.eventMetaCh != nil {
		close(c.eventMetaCh)
		c.eventMetaCh = nil
	}
}

// EnsureSynced checks cache state and synchronizes it if necessary
func (c *clusterCache) EnsureSynced() error {
	syncStatus := &c.syncStatus

	// first check if cluster is synced *without acquiring the full clusterCache lock*
	syncStatus.lock.Lock()
	if syncStatus.synced(c.clusterSyncRetryTimeout) {
		syncError := syncStatus.syncError
		syncStatus.lock.Unlock()
		return syncError
	}
	syncStatus.lock.Unlock() // release the lock, so that we can acquire the parent lock (see struct comment re: lock acquisition ordering)

	c.lock.Lock()
	defer c.lock.Unlock()
	syncStatus.lock.Lock()
	defer syncStatus.lock.Unlock()

	// before doing any work, check once again now that we have the lock, to see if it got
	// synced between the first check and now
	if syncStatus.synced(c.clusterSyncRetryTimeout) {
		return syncStatus.syncError
	}
	err := c.sync()
	syncTime := time.Now()
	syncStatus.syncTime = &syncTime
	syncStatus.syncError = err
	return syncStatus.syncError
}

func (c *clusterCache) FindResources(namespace string, predicates ...func(r *Resource) bool) map[kube.ResourceKey]*Resource {
	c.lock.RLock()
	defer c.lock.RUnlock()
	result := map[kube.ResourceKey]*Resource{}
	resources := map[kube.ResourceKey]*Resource{}
	if namespace != "" {
		if ns, ok := c.nsIndex[namespace]; ok {
			resources = ns
		}
	} else {
		resources = c.resources
	}

	for k := range resources {
		r := resources[k]
		matches := true
		for i := range predicates {
			if !predicates[i](r) {
				matches = false
				break
			}
		}

		if matches {
			result[k] = r
		}
	}
	return result
}

// IterateHierarchyV2 iterates through the hierarchy of resources starting from the given keys.
// It efficiently traverses parent-child relationships, including cross-namespace relationships
// between cluster-scoped parents and namespaced children, using pre-computed indexes.
func (c *clusterCache) IterateHierarchyV2(keys []kube.ResourceKey, action func(resource *Resource, namespaceResources map[kube.ResourceKey]*Resource) bool) {
	c.lock.RLock()
	defer c.lock.RUnlock()

	// Track whether action() has been called on each resource (notCalled/inProgress/completed).
	// This is shared across processNamespaceHierarchy and processCrossNamespaceChildren.
	// Note: This is distinct from 'crossNSTraversed' in processCrossNamespaceChildren, which tracks
	// whether we've traversed a cluster-scoped key's cross-namespace children.
	actionCallState := make(map[kube.ResourceKey]callState)

	// Group keys by namespace for efficient processing
	keysPerNamespace := make(map[string][]kube.ResourceKey)
	for _, key := range keys {
		_, ok := c.resources[key]
		if !ok {
			continue
		}
		keysPerNamespace[key.Namespace] = append(keysPerNamespace[key.Namespace], key)
	}

	// Process namespaced resources with standard hierarchy
	for namespace, namespaceKeys := range keysPerNamespace {
		nsNodes := c.nsIndex[namespace]
		graph := buildGraph(nsNodes)
		c.processNamespaceHierarchy(namespaceKeys, nsNodes, graph, actionCallState, action)
	}

	// Process pre-computed cross-namespace children
	if clusterKeys, ok := keysPerNamespace[""]; ok {
		// Track which cluster-scoped keys have had their cross-namespace children traversed.
		// This is distinct from 'actionCallState' - a resource may have had action() called
		// (i.e., its actionCallState is in the completed state) but not yet had its cross-namespace
		// children traversed. This prevents infinite recursion when resources have circular
		// ownerReferences.
		crossNSTraversed := make(map[kube.ResourceKey]bool)
		c.processCrossNamespaceChildren(clusterKeys, actionCallState, crossNSTraversed, action)
	}
}

// processCrossNamespaceChildren processes namespaced children of cluster-scoped resources
// This enables traversing from cluster-scoped parents to their namespaced children across namespace boundaries.
// It also handles multi-level hierarchies where cluster-scoped resources own other cluster-scoped resources
// that in turn own namespaced resources (e.g., Provider -> ProviderRevision -> Deployment in Crossplane).
// The crossNSTraversed map tracks which keys have already been processed to prevent infinite recursion
// from circular ownerReferences (e.g., a resource that owns itself).
func (c *clusterCache) processCrossNamespaceChildren(
	clusterScopedKeys []kube.ResourceKey,
	actionCallState map[kube.ResourceKey]callState,
	crossNSTraversed map[kube.ResourceKey]bool,
	action func(resource *Resource, namespaceResources map[kube.ResourceKey]*Resource) bool,
) {
	for _, clusterKey := range clusterScopedKeys {
		// Skip if already processed (cycle detection)
		if crossNSTraversed[clusterKey] {
			continue
		}
		crossNSTraversed[clusterKey] = true

		// Get cluster-scoped resource to access its UID
		clusterResource := c.resources[clusterKey]
		if clusterResource == nil {
			continue
		}

		// Use parent-to-children index for O(1) lookup of direct children
		childrenSet := c.parentUIDToChildren[clusterResource.Ref.UID]
		for childKey := range childrenSet {
			child := c.resources[childKey]
			if child == nil {
				continue
			}

			alreadyProcessed := actionCallState[childKey] != notCalled

			// If child is cluster-scoped and action() was already called by processNamespaceHierarchy,
			// we still need to recursively check for its cross-namespace children.
			// This handles multi-level hierarchies like: ClusterScoped -> ClusterScoped -> Namespaced
			// (e.g., Crossplane's Provider -> ProviderRevision -> Deployment)
			if alreadyProcessed {
				if childKey.Namespace == "" {
					// Recursively process cross-namespace children of this cluster-scoped child
					// The crossNSTraversed map prevents infinite recursion on circular ownerReferences
					c.processCrossNamespaceChildren([]kube.ResourceKey{childKey}, actionCallState, crossNSTraversed, action)
				}
				continue
			}

			// Get namespace nodes for this child
			nsNodes := c.nsIndex[childKey.Namespace]
			if nsNodes == nil {
				continue
			}

			// Process this child
			if action(child, nsNodes) {
				actionCallState[childKey] = inProgress
				// Recursively process descendants using index-based traversal
				c.iterateChildrenUsingIndex(child, nsNodes, actionCallState, action)

				// If this child is also cluster-scoped, recursively process its cross-namespace children
				if childKey.Namespace == "" {
					c.processCrossNamespaceChildren([]kube.ResourceKey{childKey}, actionCallState, crossNSTraversed, action)
				}

				actionCallState[childKey] = completed
			}
		}
	}
}

// iterateChildrenUsingIndex recursively processes a resource's children using the parentUIDToChildren index
// This replaces graph-based traversal with O(1) index lookups
func (c *clusterCache) iterateChildrenUsingIndex(
	parent *Resource,
	nsNodes map[kube.ResourceKey]*Resource,
	actionCallState map[kube.ResourceKey]callState,
	action func(resource *Resource, namespaceResources map[kube.ResourceKey]*Resource) bool,
) {
	// Look up direct children of this parent using the index
	childrenSet := c.parentUIDToChildren[parent.Ref.UID]
	for childKey := range childrenSet {
		if actionCallState[childKey] != notCalled {
			continue // action() already called or in progress
		}

		child := c.resources[childKey]
		if child == nil {
			continue
		}

		// Only process children in the same namespace (for within-namespace traversal)
		// Cross-namespace children are handled by the outer loop in processCrossNamespaceChildren
		if child.Ref.Namespace != parent.Ref.Namespace {
			continue
		}

		if action(child, nsNodes) {
			actionCallState[childKey] = inProgress
			// Recursively process this child's descendants
			c.iterateChildrenUsingIndex(child, nsNodes, actionCallState, action)
			actionCallState[childKey] = completed
		}
	}
}

// processNamespaceHierarchy processes hierarchy for keys within a single namespace
func (c *clusterCache) processNamespaceHierarchy(
	namespaceKeys []kube.ResourceKey,
	nsNodes map[kube.ResourceKey]*Resource,
	graph map[kube.ResourceKey]map[types.UID]*Resource,
	actionCallState map[kube.ResourceKey]callState,
	action func(resource *Resource, namespaceResources map[kube.ResourceKey]*Resource) bool,
) {
	for _, key := range namespaceKeys {
		res := c.resources[key]
		if actionCallState[key] == completed || !action(res, nsNodes) {
			continue
		}
		actionCallState[key] = inProgress
		if _, ok := graph[key]; ok {
			for _, child := range graph[key] {
				if actionCallState[child.ResourceKey()] == notCalled && action(child, nsNodes) {
					child.iterateChildrenV2(graph, nsNodes, actionCallState, func(err error, child *Resource, namespaceResources map[kube.ResourceKey]*Resource) bool {
						if err != nil {
							c.log.V(2).Info(err.Error())
							return false
						}
						return action(child, namespaceResources)
					})
				}
			}
		}
		actionCallState[key] = completed
	}
}

func buildGraph(nsNodes map[kube.ResourceKey]*Resource) map[kube.ResourceKey]map[types.UID]*Resource {
	// Prepare to construct a graph
	nodesByUID := make(map[types.UID][]*Resource, len(nsNodes))
	for _, node := range nsNodes {
		nodesByUID[node.Ref.UID] = append(nodesByUID[node.Ref.UID], node)
	}

	// In graph, the key is the parent and the value is a list of children.
	graph := make(map[kube.ResourceKey]map[types.UID]*Resource)

	// Loop through all nodes, calling each one "childNode," because we're only bothering with it if it has a parent.
	for _, childNode := range nsNodes {
		for i, ownerRef := range childNode.OwnerRefs {
			// First, backfill UID of inferred owner child references.
			if ownerRef.UID == "" {
				group, err := schema.ParseGroupVersion(ownerRef.APIVersion)
				if err != nil {
					// APIVersion is invalid, so we couldn't find the parent.
					continue
				}
				graphKeyNode, ok := nsNodes[kube.ResourceKey{Group: group.Group, Kind: ownerRef.Kind, Namespace: childNode.Ref.Namespace, Name: ownerRef.Name}]
				if !ok {
					// No resource found with the given graph key, so move on.
					continue
				}
				ownerRef.UID = graphKeyNode.Ref.UID
				childNode.OwnerRefs[i] = ownerRef
			}

			// Now that we have the UID of the parent, update the graph.
			uidNodes, ok := nodesByUID[ownerRef.UID]
			if ok {
				for _, uidNode := range uidNodes {
					// Cache ResourceKey() to avoid repeated expensive calls
					uidNodeKey := uidNode.ResourceKey()
					// Update the graph for this owner to include the child.
					if _, ok := graph[uidNodeKey]; !ok {
						graph[uidNodeKey] = make(map[types.UID]*Resource)
					}
					r, ok := graph[uidNodeKey][childNode.Ref.UID]
					if !ok {
						graph[uidNodeKey][childNode.Ref.UID] = childNode
					} else if r != nil {
						// The object might have multiple children with the same UID (e.g. replicaset from apps and extensions group).
						// It is ok to pick any object, but we need to make sure we pick the same child after every refresh.
						key1 := r.ResourceKey()
						key2 := childNode.ResourceKey()
						if strings.Compare(key1.String(), key2.String()) > 0 {
							graph[uidNodeKey][childNode.Ref.UID] = childNode
						}
					}
				}
			}
		}
	}
	return graph
}

// IsNamespaced answers if specified group/kind is a namespaced resource API or not
func (c *clusterCache) IsNamespaced(gk schema.GroupKind) (bool, error) {
	if isNamespaced, ok := c.namespacedResources[gk]; ok {
		return isNamespaced, nil
	}
	return false, apierrors.NewNotFound(schema.GroupResource{Group: gk.Group}, "")
}

func (c *clusterCache) managesNamespace(namespace string) bool {
	for _, ns := range c.namespaces {
		if ns == namespace {
			return true
		}
	}
	return false
}

// GetManagedLiveObjs helps finding matching live K8S resources for a given resources list.
// The function returns all resources from cache for those `isManaged` function returns true and resources
// specified in targetObjs list.
func (c *clusterCache) GetManagedLiveObjs(targetObjs []*unstructured.Unstructured, isManaged func(r *Resource) bool) (map[kube.ResourceKey]*unstructured.Unstructured, error) {
	c.lock.RLock()
	defer c.lock.RUnlock()

	for _, o := range targetObjs {
		if len(c.namespaces) > 0 {
			if o.GetNamespace() == "" && !c.clusterResources {
				return nil, fmt.Errorf("cluster level %s %q can not be managed when in namespaced mode", o.GetKind(), o.GetName())
			} else if o.GetNamespace() != "" && !c.managesNamespace(o.GetNamespace()) {
				return nil, fmt.Errorf("namespace %q for %s %q is not managed", o.GetNamespace(), o.GetKind(), o.GetName())
			}
		}
	}

	managedObjs := make(map[kube.ResourceKey]*unstructured.Unstructured)
	// iterate all objects in live state cache to find ones associated with app
	for key, o := range c.resources {
		if isManaged(o) && o.Resource != nil && len(o.OwnerRefs) == 0 {
			managedObjs[key] = o.Resource
		}
	}
	// but are simply missing our label
	lock := &sync.Mutex{}
	err := kube.RunAllAsync(len(targetObjs), func(i int) error {
		targetObj := targetObjs[i]
		key := kube.GetResourceKey(targetObj)
		lock.Lock()
		managedObj := managedObjs[key]
		lock.Unlock()

		if managedObj == nil {
			if existingObj, exists := c.resources[key]; exists {
				if existingObj.Resource != nil {
					managedObj = existingObj.Resource
				} else {
					var err error
					managedObj, err = c.kubectl.GetResource(context.TODO(), c.config, targetObj.GroupVersionKind(), existingObj.Ref.Name, existingObj.Ref.Namespace)
					if err != nil {
						if apierrors.IsNotFound(err) {
							return nil
						}
						return fmt.Errorf("unexpected error getting managed object: %w", err)
					}
				}
			} else if _, watched := c.apisMeta[key.GroupKind()]; !watched {
				var err error
				managedObj, err = c.kubectl.GetResource(context.TODO(), c.config, targetObj.GroupVersionKind(), targetObj.GetName(), targetObj.GetNamespace())
				if err != nil {
					if apierrors.IsNotFound(err) {
						return nil
					}
					return fmt.Errorf("unexpected error getting managed object: %w", err)
				}
			}
		}

		if managedObj != nil {
			converted, err := c.kubectl.ConvertToVersion(managedObj, targetObj.GroupVersionKind().Group, targetObj.GroupVersionKind().Version)
			if err != nil {
				// fallback to loading resource from kubernetes if conversion fails
				c.log.V(1).Info(fmt.Sprintf("Failed to convert resource: %v", err))
				managedObj, err = c.kubectl.GetResource(context.TODO(), c.config, targetObj.GroupVersionKind(), managedObj.GetName(), managedObj.GetNamespace())
				if err != nil {
					if apierrors.IsNotFound(err) {
						return nil
					}
					return fmt.Errorf("unexpected error getting managed object: %w", err)
				}
			} else {
				managedObj = converted
			}
			lock.Lock()
			managedObjs[key] = managedObj
			lock.Unlock()
		}
		return nil
	})
	if err != nil {
		return nil, fmt.Errorf("failed to get managed objects: %w", err)
	}

	return managedObjs, nil
}

func (c *clusterCache) recordEvent(event watch.EventType, un *unstructured.Unstructured) {
	for _, h := range c.getEventHandlers() {
		h(event, un)
	}
	key := kube.GetResourceKey(un)
	if event == watch.Modified && skipAppRequeuing(key) {
		return
	}

	if c.batchEventsProcessing {
		c.eventMetaCh <- eventMeta{event, un}
	} else {
		c.lock.Lock()
		defer c.lock.Unlock()
		c.processEvent(key, eventMeta{event, un})
	}
}

func (c *clusterCache) processEvents() {
	log := c.log.WithValues("functionName", "processItems")
	log.V(1).Info("Start processing events")

	c.lock.Lock()
	ch := c.eventMetaCh
	c.lock.Unlock()

	eventMetas := make([]eventMeta, 0)
	ticker := time.NewTicker(c.eventProcessingInterval)
	defer ticker.Stop()

	for {
		select {
		case evMeta, ok := <-ch:
			if !ok {
				log.V(2).Info("Event processing channel closed, finish processing")
				return
			}
			eventMetas = append(eventMetas, evMeta)
		case <-ticker.C:
			if len(eventMetas) > 0 {
				c.processEventsBatch(eventMetas)
				eventMetas = eventMetas[:0]
			}
		}
	}
}

func (c *clusterCache) processEventsBatch(eventMetas []eventMeta) {
	log := c.log.WithValues("functionName", "processEventsBatch")
	start := time.Now()
	c.lock.Lock()
	log.V(1).Info("Lock acquired (ms)", "duration", time.Since(start).Milliseconds())
	defer func() {
		c.lock.Unlock()
		duration := time.Since(start)
		// Update the metric with the duration of the events processing
		for _, handler := range c.getProcessEventsHandlers() {
			handler(duration, len(eventMetas))
		}
	}()

	for _, evMeta := range eventMetas {
		key := kube.GetResourceKey(evMeta.un)
		c.processEvent(key, evMeta)
	}

	log.V(1).Info("Processed events (ms)", "count", len(eventMetas), "duration", time.Since(start).Milliseconds())
}

func (c *clusterCache) processEvent(key kube.ResourceKey, evMeta eventMeta) {
	existingNode, exists := c.resources[key]
	if evMeta.event == watch.Deleted {
		if exists {
			c.onNodeRemoved(key)
		}
	} else {
		c.onNodeUpdated(existingNode, c.newResource(evMeta.un))
	}
}

func (c *clusterCache) onNodeUpdated(oldRes *Resource, newRes *Resource) {
	c.setNode(newRes)
	for _, h := range c.getResourceUpdatedHandlers() {
		h(newRes, oldRes, c.nsIndex[newRes.Ref.Namespace])
	}
}

func (c *clusterCache) onNodeRemoved(key kube.ResourceKey) {
	existing, ok := c.resources[key]
	if ok {
		delete(c.resources, key)
		ns, ok := c.nsIndex[key.Namespace]
		if ok {
			delete(ns, key)
			if len(ns) == 0 {
				delete(c.nsIndex, key.Namespace)
			}
			// remove ownership references from children with inferred references
			if existing.isInferredParentOf != nil {
				for k, v := range ns {
					if mightHaveInferredOwner(v) && existing.isInferredParentOf(k) {
						v.setOwnerRef(existing.toOwnerRef(), false)
						// Update index inline when removing inferred ref
						if existing.Ref.UID != "" {
							c.removeFromParentUIDToChildren(existing.Ref.UID, k)
						}
					}
				}
			}
		}

		// Clean up parent-to-children index
		for _, ownerRef := range existing.OwnerRefs {
			if ownerRef.UID != "" {
				c.removeFromParentUIDToChildren(ownerRef.UID, key)
			}
		}

		for _, h := range c.getResourceUpdatedHandlers() {
			h(nil, existing, ns)
		}
	}
}

var ignoredRefreshResources = map[string]bool{
	"/" + kube.EndpointsKind: true,
}

// GetClusterInfo returns cluster cache statistics
func (c *clusterCache) GetClusterInfo() ClusterInfo {
	c.lock.RLock()
	defer c.lock.RUnlock()
	c.syncStatus.lock.Lock()
	defer c.syncStatus.lock.Unlock()

	return ClusterInfo{
		APIsCount:         len(c.apisMeta),
		K8SVersion:        c.serverVersion,
		ResourcesCount:    len(c.resources),
		Server:            c.config.Host,
		LastCacheSyncTime: c.syncStatus.syncTime,
		SyncError:         c.syncStatus.syncError,
		APIResources:      c.apiResources,
	}
}

// skipAppRequeuing checks if the object is an API type which we want to skip requeuing against.
// We ignore API types which have a high churn rate, and/or whose updates are irrelevant to the app
func skipAppRequeuing(key kube.ResourceKey) bool {
	return ignoredRefreshResources[key.Group+"/"+key.Kind]
}
