diff --git a/controller/two_lane_queue_test.go b/controller/two_lane_queue_test.go
index 187cd2fb11..46a18ce39e 100644
--- a/controller/two_lane_queue_test.go
+++ b/controller/two_lane_queue_test.go
@@ -17,6 +17,7 @@ limitations under the License.
 package controller
 
 import (
+	"context"
 	"strconv"
 	"testing"
 	"time"
@@ -73,7 +74,7 @@ func TestRateLimit(t *testing.T) {
 	}
 
 	// Verify the items were properly added for consumption.
-	if wait.PollImmediate(10*time.Millisecond, 250*time.Millisecond, func() (bool, error) {
+	if wait.PollUntilContextTimeout(context.Background(), 10*time.Millisecond, 250*time.Millisecond, true, func(ctx context.Context) (bool, error) {
 		return q.Len() == 2, nil
 	}) != nil {
 		t.Error("Queue length was never 2")
@@ -89,7 +90,7 @@ func TestSlowQueue(t *testing.T) {
 	q := newTwoLaneWorkQueue("live-in-the-fast-lane", workqueue.DefaultControllerRateLimiter())
 	q.SlowLane().Add("1")
 	// Queue has async moving parts so if we check at the wrong moment, this might still be 0.
-	if wait.PollImmediate(10*time.Millisecond, 250*time.Millisecond, func() (bool, error) {
+	if wait.PollUntilContextTimeout(context.Background(), 10*time.Millisecond, 250*time.Millisecond, true, func(ctx context.Context) (bool, error) {
 		return q.Len() == 1, nil
 	}) != nil {
 		t.Error("Queue length was never 1")
diff --git a/injection/sharedmain/main.go b/injection/sharedmain/main.go
index 25562e965e..84bb27fbb3 100644
--- a/injection/sharedmain/main.go
+++ b/injection/sharedmain/main.go
@@ -74,7 +74,7 @@ func GetLoggingConfig(ctx context.Context) (*logging.Config, error) {
 	// These timeout and retry interval are set by heuristics.
 	// e.g. istio sidecar needs a few seconds to configure the pod network.
 	var lastErr error
-	if err := wait.PollImmediate(1*time.Second, 5*time.Second, func() (bool, error) {
+	if err := wait.PollUntilContextTimeout(ctx, 1*time.Second, 5*time.Second, true, func(ctx context.Context) (bool, error) {
 		loggingConfigMap, lastErr = kubeclient.Get(ctx).CoreV1().ConfigMaps(system.Namespace()).Get(ctx, logging.ConfigMapName(), metav1.GetOptions{})
 		return lastErr == nil || apierrors.IsNotFound(lastErr), nil
 	}); err != nil {
diff --git a/metrics/e2e_test.go b/metrics/e2e_test.go
index 67ea352c3c..b776ca6227 100644
--- a/metrics/e2e_test.go
+++ b/metrics/e2e_test.go
@@ -153,7 +153,7 @@ func TestMetricsExport(t *testing.T) {
 			return err
 		}
 		// Wait for the webserver to actually start serving metrics
-		return wait.PollImmediate(10*time.Millisecond, 10*time.Second, func() (bool, error) {
+		return wait.PollUntilContextTimeout(context.Background(), 10*time.Millisecond, 10*time.Second, true, func(ctx context.Context) (bool, error) {
 			resp, err := http.Get(fmt.Sprintf("http://localhost:%d/metrics", prometheusPort))
 			return err == nil && resp.StatusCode == http.StatusOK, nil
 		})
diff --git a/metrics/resource_view_test.go b/metrics/resource_view_test.go
index f804866576..f64c96a7a4 100644
--- a/metrics/resource_view_test.go
+++ b/metrics/resource_view_test.go
@@ -17,6 +17,7 @@ limitations under the License.
 package metrics
 
 import (
+	"context"
 	"fmt"
 	"testing"
 	"time"
@@ -173,7 +174,7 @@ func TestAllMetersExpiration(t *testing.T) {
 
 	// Expire the second entry
 	fakeClock.Step(9 * time.Minute) // t+12m
-	_ = wait.PollImmediate(100*time.Millisecond, 5*time.Second, func() (bool, error) {
+	_ = wait.PollUntilContextTimeout(context.Background(), 100*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (bool, error) {
 		// Non-expiring defaultMeter should be available along with the non-expired entry
 		allMeters.lock.Lock()
 		defer allMeters.lock.Unlock()
@@ -244,7 +245,7 @@ func TestIfAllMeterResourcesAreRemoved(t *testing.T) {
 	// Expire all meters and views
 	// We need to unlock before we move the clock ahead in time
 	fakeClock.Step(12 * time.Minute) // t+12m
-	_ = wait.PollImmediate(100*time.Millisecond, 5*time.Second, func() (bool, error) {
+	_ = wait.PollUntilContextTimeout(context.Background(), 100*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (bool, error) {
 		// Non-expiring defaultMeter should be available
 		allMeters.lock.Lock()
 		defer allMeters.lock.Unlock()
diff --git a/reconciler/testing/context.go b/reconciler/testing/context.go
index a1ea75903c..58b3b8d29a 100644
--- a/reconciler/testing/context.go
+++ b/reconciler/testing/context.go
@@ -120,7 +120,7 @@ func RunAndSyncInformers(ctx context.Context, informers ...controller.Informer)
 		return wf, err
 	}
 
-	err = wait.PollImmediate(time.Microsecond, wait.ForeverTestTimeout, func() (bool, error) {
+	err = wait.PollUntilContextTimeout(ctx, time.Microsecond, wait.ForeverTestTimeout, true, func(ctx context.Context) (bool, error) {
 		if watchesPending.Load() == 0 {
 			return true, nil
 		}
diff --git a/test/cleanup_test.go b/test/cleanup_test.go
index e90ac5a340..145e56bccb 100644
--- a/test/cleanup_test.go
+++ b/test/cleanup_test.go
@@ -18,6 +18,7 @@ package test
 
 import (
 	"bytes"
+	"context"
 	"errors"
 	"fmt"
 	"os"
@@ -66,7 +67,7 @@ func TestCleanupOnInterrupt(t *testing.T) {
 
 	// poll until the ready file is gone - indicating the subtest has been set up
 	// with the cleanup functions
-	err = wait.PollImmediate(100*time.Millisecond, 2*time.Second, func() (bool, error) {
+	err = wait.PollUntilContextTimeout(context.Background(), 100*time.Millisecond, 2*time.Second, true, func(ctx context.Context) (bool, error) {
 		_, err := os.Stat(readyFile.Name())
 		if os.IsNotExist(err) {
 			return true, nil
diff --git a/test/ha/ha.go b/test/ha/ha.go
index 648cfd51c9..eb991fcf89 100644
--- a/test/ha/ha.go
+++ b/test/ha/ha.go
@@ -78,7 +78,7 @@ func WaitForNewLeaders(ctx context.Context, t *testing.T, client kubernetes.Inte
 	defer span.End()
 	var leaders sets.Set[string]
-	err := wait.PollImmediate(time.Second, time.Minute, func() (bool, error) {
+	err := wait.PollUntilContextTimeout(ctx, time.Second, time.Minute, true, func(ctx context.Context) (bool, error) {
 		currLeaders, err := GetLeaders(ctx, t, client, deploymentName, namespace)
 		if err != nil {
 			return false, err
 		}
@@ -105,7 +105,7 @@ func WaitForNewLeader(ctx context.Context, client kubernetes.Interface, lease, n
 	span := logging.GetEmitableSpan(ctx, "WaitForNewLeader/"+lease)
 	defer span.End()
 	var leader string
-	err := wait.PollImmediate(time.Second, time.Minute, func() (bool, error) {
+	err := wait.PollUntilContextTimeout(ctx, time.Second, time.Minute, true, func(ctx context.Context) (bool, error) {
 		lease, err := client.CoordinationV1().Leases(namespace).Get(ctx, lease, metav1.GetOptions{})
 		if err != nil {
 			return false, fmt.Errorf("error getting lease %s: %w", lease, err)
 		}
diff --git a/test/kube_checks.go b/test/kube_checks.go
index 3e295a55f3..3b2b7101a0 100644
--- a/test/kube_checks.go
+++ b/test/kube_checks.go
@@ -53,7 +53,7 @@ func WaitForDeploymentState(ctx context.Context, client kubernetes.Interface, na
 	span := logging.GetEmitableSpan(ctx, fmt.Sprintf("WaitForDeploymentState/%s/%s", name, desc))
 	defer span.End()
 	var lastState *appsv1.Deployment
-	waitErr := wait.PollImmediate(interval, timeout, func() (bool, error) {
+	waitErr := wait.PollUntilContextTimeout(ctx, interval, timeout, true, func(ctx context.Context) (bool, error) {
 		var err error
 		lastState, err = d.Get(ctx, name, metav1.GetOptions{})
 		if err != nil {
@@ -78,7 +78,7 @@ func WaitForPodListState(ctx context.Context, client kubernetes.Interface, inSta
 	defer span.End()
 
 	var lastState *corev1.PodList
-	waitErr := wait.PollImmediate(interval, podTimeout, func() (bool, error) {
+	waitErr := wait.PollUntilContextTimeout(ctx, interval, podTimeout, true, func(ctx context.Context) (bool, error) {
 		var err error
 		lastState, err = p.List(ctx, metav1.ListOptions{})
 		if err != nil {
@@ -103,7 +103,7 @@ func WaitForPodState(ctx context.Context, client kubernetes.Interface, inState f
 	defer span.End()
 
 	var lastState *corev1.Pod
-	waitErr := wait.PollImmediate(interval, podTimeout, func() (bool, error) {
+	waitErr := wait.PollUntilContextTimeout(ctx, interval, podTimeout, true, func(ctx context.Context) (bool, error) {
 		var err error
 		lastState, err = p.Get(ctx, name, metav1.GetOptions{})
 		if err != nil {
@@ -139,7 +139,7 @@ func WaitForServiceEndpoints(ctx context.Context, client kubernetes.Interface, s
 	defer span.End()
 
 	var endpoints *corev1.Endpoints
-	waitErr := wait.PollImmediate(interval, podTimeout, func() (bool, error) {
+	waitErr := wait.PollUntilContextTimeout(ctx, interval, podTimeout, true, func(ctx context.Context) (bool, error) {
 		var err error
 		endpoints, err = endpointsService.Get(ctx, svcName, metav1.GetOptions{})
 		if apierrs.IsNotFound(err) {
@@ -186,7 +186,7 @@ func GetEndpointAddresses(ctx context.Context, client kubernetes.Interface, svcN
 // WaitForChangedEndpoints waits until the endpoints for the given service differ from origEndpoints.
 func WaitForChangedEndpoints(ctx context.Context, client kubernetes.Interface, svcName, svcNamespace string, origEndpoints []string) error {
 	var newEndpoints []string
-	waitErr := wait.PollImmediate(1*time.Second, 2*time.Minute, func() (bool, error) {
+	waitErr := wait.PollUntilContextTimeout(ctx, 1*time.Second, 2*time.Minute, true, func(ctx context.Context) (bool, error) {
 		var err error
 		newEndpoints, err = GetEndpointAddresses(ctx, client, svcName, svcNamespace)
 		return !cmp.Equal(origEndpoints, newEndpoints), err
@@ -213,7 +213,7 @@ func DeploymentScaledToZeroFunc() func(d *appsv1.Deployment) (bool, error) {
 // If the content is not present within timeout it returns error.
 func WaitForLogContent(ctx context.Context, client kubernetes.Interface, podName, containerName, namespace, content string) error {
 	var logs []byte
-	waitErr := wait.PollImmediate(interval, logTimeout, func() (bool, error) {
+	waitErr := wait.PollUntilContextTimeout(ctx, interval, logTimeout, true, func(ctx context.Context) (bool, error) {
 		var err error
 		logs, err = PodLogs(ctx, client, podName, containerName, namespace)
 		if err != nil {
@@ -236,7 +236,7 @@ func WaitForAllPodsRunning(ctx context.Context, client kubernetes.Interface, nam
 func WaitForPodRunning(ctx context.Context, client kubernetes.Interface, name string, namespace string) error {
 	var p *corev1.Pod
 	pods := client.CoreV1().Pods(namespace)
-	waitErr := wait.PollImmediate(interval, podTimeout, func() (bool, error) {
+	waitErr := wait.PollUntilContextTimeout(ctx, interval, podTimeout, true, func(ctx context.Context) (bool, error) {
 		var err error
 		p, err = pods.Get(ctx, name, metav1.GetOptions{})
 		if err != nil {
diff --git a/test/logstream/v2/stream_test.go b/test/logstream/v2/stream_test.go
index f87777a40a..2bc05f6b3c 100644
--- a/test/logstream/v2/stream_test.go
+++ b/test/logstream/v2/stream_test.go
@@ -304,7 +304,7 @@ func TestNamespaceStream(t *testing.T) {
 
 	// We can't assume that the cancel signal doesn't race the pod creation signal, so
 	// we retry a few times to give some leeway.
-	if err := wait.PollImmediate(10*time.Millisecond, time.Second, func() (bool, error) {
+	if err := wait.PollUntilContextTimeout(ctx, 10*time.Millisecond, time.Second, true, func(ctx context.Context) (bool, error) {
 		if _, err := podClient.Create(context.Background(), knativePod, metav1.CreateOptions{}); err != nil {
 			return false, err
 		}
diff --git a/test/spoof/spoof.go b/test/spoof/spoof.go
index 319e2ead86..716e5d83db 100644
--- a/test/spoof/spoof.go
+++ b/test/spoof/spoof.go
@@ -167,7 +167,7 @@ func (sc *SpoofingClient) Poll(req *http.Request, inState ResponseChecker, check
 	}
 
 	var resp *Response
-	err := wait.PollImmediate(sc.RequestInterval, sc.RequestTimeout, func() (bool, error) {
+	err := wait.PollUntilContextTimeout(context.Background(), sc.RequestInterval, sc.RequestTimeout, true, func(ctx context.Context) (bool, error) {
 		// Starting span to capture zipkin trace.
 		traceContext, span := trace.StartSpan(req.Context(), "SpoofingClient-Trace")
 		defer span.End()
diff --git a/webhook/certificates/certificates_test.go b/webhook/certificates/certificates_test.go
index a6369081a2..1ed889d269 100644
--- a/webhook/certificates/certificates_test.go
+++ b/webhook/certificates/certificates_test.go
@@ -243,7 +243,7 @@ func TestNew(t *testing.T) {
 	}
 
 	// Queue has async moving parts so if we check at the wrong moment, this might still be 0.
-	if wait.PollImmediate(10*time.Millisecond, 250*time.Millisecond, func() (bool, error) {
+	if wait.PollUntilContextTimeout(ctx, 10*time.Millisecond, 250*time.Millisecond, true, func(ctx context.Context) (bool, error) {
 		return c.WorkQueue().Len() == 1, nil
 	}) != nil {
 		t.Error("Queue length was never 1")
diff --git a/webhook/configmaps/table_test.go b/webhook/configmaps/table_test.go
index ad7992e3c9..1b0c51c0fe 100644
--- a/webhook/configmaps/table_test.go
+++ b/webhook/configmaps/table_test.go
@@ -353,7 +353,7 @@ func TestNew(t *testing.T) {
 	}
 
 	// Queue has async moving parts so if we check at the wrong moment, this might still be 0.
-	if wait.PollImmediate(10*time.Millisecond, 250*time.Millisecond, func() (bool, error) {
+	if wait.PollUntilContextTimeout(ctx, 10*time.Millisecond, 250*time.Millisecond, true, func(ctx context.Context) (bool, error) {
 		return c.WorkQueue().Len() == 1, nil
 	}) != nil {
 		t.Error("Queue length was never 1")
diff --git a/webhook/helper_test.go b/webhook/helper_test.go
index 34916df27d..4a34626307 100644
--- a/webhook/helper_test.go
+++ b/webhook/helper_test.go
@@ -131,7 +131,7 @@ func waitForNonTLSServerAvailable(t *testing.T, serverURL string, timeout time.D
 	t.Helper()
 
 	var interval = 100 * time.Millisecond
-	conditionFunc := func() (done bool, err error) {
+	conditionFunc := func(ctx context.Context) (done bool, err error) {
 		var conn net.Conn
 		conn, _ = net.DialTimeout("tcp", serverURL, timeout)
 		if conn != nil {
@@ -141,7 +141,7 @@ func waitForNonTLSServerAvailable(t *testing.T, serverURL string, timeout time.D
 		return false, nil
 	}
 
-	return wait.PollImmediate(interval, timeout, conditionFunc)
+	return wait.PollUntilContextTimeout(context.Background(), interval, timeout, true, conditionFunc)
 }
 
 func waitForServerAvailable(t *testing.T, serverURL string, timeout time.Duration) error {
@@ -158,7 +158,7 @@ func waitForServerAvailable(t *testing.T, serverURL string, timeout time.Duratio
 		}
 	)
 
-	conditionFunc := func() (done bool, err error) {
+	conditionFunc := func(ctx context.Context) (done bool, err error) {
 		conn, _ := tls.DialWithDialer(dialer, "tcp", serverURL, tlsConf)
 		if conn != nil {
 			conn.Close()
@@ -167,7 +167,7 @@ func waitForServerAvailable(t *testing.T, serverURL string, timeout time.Duratio
 		return false, nil
 	}
 
-	return wait.PollImmediate(interval, timeout, conditionFunc)
+	return wait.PollUntilContextTimeout(context.Background(), interval, timeout, true, conditionFunc)
 }
 
 func createNamespace(t *testing.T, kubeClient kubernetes.Interface, name string) error {
diff --git a/webhook/psbinding/table_test.go b/webhook/psbinding/table_test.go
index 1a178efdaf..c2fab50912 100644
--- a/webhook/psbinding/table_test.go
+++ b/webhook/psbinding/table_test.go
@@ -1067,7 +1067,7 @@ func TestNew(t *testing.T) {
 	}
 
 	// Queue has async moving parts so if we check at the wrong moment, this might still be 0.
-	if wait.PollImmediate(10*time.Millisecond, 250*time.Millisecond, func() (bool, error) {
+	if wait.PollUntilContextTimeout(ctx, 10*time.Millisecond, 250*time.Millisecond, true, func(ctx context.Context) (bool, error) {
 		return c.WorkQueue().Len() == 1, nil
 	}) != nil {
 		t.Error("Queue length was never 1")
diff --git a/webhook/resourcesemantics/defaulting/table_test.go b/webhook/resourcesemantics/defaulting/table_test.go
index fec0cf0851..2f946073da 100644
--- a/webhook/resourcesemantics/defaulting/table_test.go
+++ b/webhook/resourcesemantics/defaulting/table_test.go
@@ -487,7 +487,7 @@ func TestNew(t *testing.T) {
 	}
 
 	// Queue has async moving parts so if we check at the wrong moment, this might still be 0.
-	if wait.PollImmediate(10*time.Millisecond, 250*time.Millisecond, func() (bool, error) {
+	if wait.PollUntilContextTimeout(ctx, 10*time.Millisecond, 250*time.Millisecond, true, func(ctx context.Context) (bool, error) {
 		return c.WorkQueue().Len() == 1, nil
 	}) != nil {
 		t.Error("Queue length was never 1")
diff --git a/webhook/resourcesemantics/validation/reconcile_config_test.go b/webhook/resourcesemantics/validation/reconcile_config_test.go
index e027a15160..5663225889 100644
--- a/webhook/resourcesemantics/validation/reconcile_config_test.go
+++ b/webhook/resourcesemantics/validation/reconcile_config_test.go
@@ -601,7 +601,7 @@ func TestNew(t *testing.T) {
 	}
 
 	// Queue has async moving parts so if we check at the wrong moment, this might still be 0.
-	if wait.PollImmediate(10*time.Millisecond, 250*time.Millisecond, func() (bool, error) {
+	if wait.PollUntilContextTimeout(ctx, 10*time.Millisecond, 250*time.Millisecond, true, func(ctx context.Context) (bool, error) {
 		return c.WorkQueue().Len() == 1, nil
 	}) != nil {
 		t.Error("Queue length was never 1")
diff --git a/webhook/resourcesemantics/validation/validation_admit_test.go b/webhook/resourcesemantics/validation/validation_admit_test.go
index 1af76ae1f1..784c15e918 100644
--- a/webhook/resourcesemantics/validation/validation_admit_test.go
+++ b/webhook/resourcesemantics/validation/validation_admit_test.go
@@ -715,7 +715,7 @@ func newTestResourceAdmissionController(t *testing.T) webhook.AdmissionControlle
 	}
 
 	// Queue has async moving parts so if we check at the wrong moment, this might still be 0.
-	if wait.PollImmediate(10*time.Millisecond, 250*time.Millisecond, func() (bool, error) {
+	if wait.PollUntilContextTimeout(ctx, 10*time.Millisecond, 250*time.Millisecond, true, func(ctx context.Context) (bool, error) {
 		return c.WorkQueue().Len() == 1, nil
 	}) != nil {
 		t.Error("Queue length was never 1")
diff --git a/websocket/connection_test.go b/websocket/connection_test.go
index d8712c31f4..f57cb6c7d4 100644
--- a/websocket/connection_test.go
+++ b/websocket/connection_test.go
@@ -17,6 +17,7 @@ limitations under the License.
 package websocket
 
 import (
+	"context"
 	"errors"
 	"io"
 	"net/http"
@@ -361,7 +362,7 @@ func TestDurableConnectionWhenConnectionBreaksDown(t *testing.T) {
 	defer conn.Shutdown()
 
 	for i := 0; i < 10; i++ {
-		err := wait.PollImmediate(50*time.Millisecond, 5*time.Second, func() (bool, error) {
+		err := wait.PollUntilContextTimeout(context.Background(), 50*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (bool, error) {
 			if err := conn.Send(testPayload); err != nil {
 				return false, nil
 			}
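
Note on the pattern applied in every hunk above: `wait.PollImmediate` is deprecated in `k8s.io/apimachinery/pkg/util/wait` in favor of the context-aware `wait.PollUntilContextTimeout(ctx, interval, timeout, immediate, condition)`, where the condition becomes a `wait.ConditionWithContextFunc`. Call sites that already have a `ctx` pass it through; the rest use `context.Background()`, and the `true` argument preserves PollImmediate's check-before-first-sleep behavior. A minimal standalone sketch of the old and new call shapes follows; it is not part of the diff, and the `ready` channel is just a made-up stand-in for whatever condition each call site polls:

```go
package main

import (
	"context"
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

func main() {
	// Hypothetical condition: something becomes ready shortly after polling starts.
	ready := make(chan struct{})
	go func() {
		time.Sleep(100 * time.Millisecond)
		close(ready)
	}()

	// Old (deprecated) shape:
	//   err := wait.PollImmediate(10*time.Millisecond, time.Second, func() (bool, error) { ... })
	//
	// New shape: the caller supplies a context, `true` checks the condition once
	// before the first interval elapses, and the condition receives a context so
	// polling can stop early when that context is cancelled or times out.
	err := wait.PollUntilContextTimeout(context.Background(), 10*time.Millisecond, time.Second, true,
		func(ctx context.Context) (bool, error) {
			select {
			case <-ready:
				return true, nil // condition met, stop polling
			default:
				return false, nil // not ready yet, keep polling
			}
		})
	fmt.Println("poll finished with err:", err)
}
```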