File: test_cron.py

package info (click to toggle)
python-rq 2.6-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 2,580 kB
  • sloc: python: 13,878; makefile: 22; sh: 19
file content (813 lines) | stat: -rw-r--r-- 35,915 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
import os
import signal
import socket
import tempfile
import time
from datetime import datetime, timedelta, timezone
from multiprocessing import Process
from typing import cast
from unittest.mock import patch

from redis import Redis

from rq import Queue, utils
from rq.cron import CronJob, CronScheduler, _job_data_registry
from rq.cron_scheduler_registry import get_keys, get_registry_key
from rq.exceptions import SchedulerNotFound
from tests import RQTestCase
from tests.fixtures import div_by_zero, do_nothing, say_hello


class BreakLoop(Exception):
    """Sentinel exception that mocks raise to break out of the scheduler loop in tests."""


def run_scheduler(redis_connection_kwargs):
    """Process target: build a CronScheduler on a new Redis connection and start it.

    ``redis_connection_kwargs`` is unpacked into ``Redis(...)``.
    """
    connection = Redis(**redis_connection_kwargs)
    cron = CronScheduler(connection=connection)
    # A job firing every second keeps the scheduler busy
    cron.register(do_nothing, 'default', interval=1)
    cron.start()


class TestCronScheduler(RQTestCase):
    """Tests for the CronScheduler class"""

    def setUp(self):
        """Prepare per-test state: shared queue name and an empty global job registry"""
        super().setUp()
        # Start every test from an empty module-level registry
        _job_data_registry.clear()
        # Each test builds its own CronScheduler; only the queue name is shared
        self.queue_name = 'default'

    def test_scheduler_tracking_attributes(self):
        """Test that CronScheduler tracks hostname, pid, and config_file

        The config file is located relative to this test module instead of the
        current working directory, so the test does not depend on where the
        test runner was started.
        """
        cron = CronScheduler(connection=self.connection)

        # Test initial values
        self.assertEqual(cron.hostname, socket.gethostname())
        self.assertEqual(cron.pid, os.getpid())
        self.assertFalse(cron.config_file)

        # Test config_file is set when loading config
        config_file_path = os.path.join(os.path.dirname(__file__), 'cron_config.py')
        cron.load_config_from_file(config_file_path)
        self.assertEqual(cron.config_file, config_file_path)

    def test_register_job(self):
        """register() returns CronJob objects for cron-based and interval-based schedules"""
        scheduler = CronScheduler(connection=self.connection)
        daily_expr = '0 9 * * *'
        job_args = ('Periodic job',)

        # Cron-expression job: the expression is kept and no interval is set
        by_cron = scheduler.register(func=say_hello, queue_name=self.queue_name, cron=daily_expr)
        self.assertIsInstance(by_cron, CronJob)
        self.assertEqual(by_cron.func, say_hello)
        self.assertIsNone(by_cron.interval)
        self.assertEqual(by_cron.cron, daily_expr)

        # Interval job: interval and positional args are kept as given
        by_interval = scheduler.register(func=say_hello, queue_name=self.queue_name, args=job_args, interval=60)
        self.assertEqual(by_interval.interval, 60)
        self.assertEqual(by_interval.args, job_args)

        # Both jobs are reported by the scheduler instance
        all_jobs = scheduler.get_jobs()
        self.assertEqual(len(all_jobs), 2)
        for expected_job in (by_cron, by_interval):
            self.assertIn(expected_job, all_jobs)

    def test_enqueue_jobs(self):
        """enqueue_jobs() enqueues only the jobs that are due and updates their run times"""
        scheduler = CronScheduler(connection=self.connection)

        first_job = scheduler.register(func=say_hello, queue_name=self.queue_name, args=('Job 1',), interval=60)
        second_job = scheduler.register(func=do_nothing, queue_name=self.queue_name, interval=120)
        third_job = scheduler.register(func=say_hello, queue_name=self.queue_name, args=('Job 3',), interval=30)

        # First pass: no job has a latest_run_time yet, so all three are enqueued
        first_batch = scheduler.enqueue_jobs()
        queue = Queue(self.queue_name, connection=self.connection)
        self.assertEqual(len(first_batch), 3)
        self.assertEqual(len(queue.get_jobs()), 3)

        # Remember the second job's run time to prove the next pass leaves it alone
        self.assertIsNotNone(second_job.latest_run_time)
        second_job_first_run = second_job.latest_run_time

        queue.empty()

        # Force the schedule: jobs 1 and 3 are overdue, job 2 is still 30 seconds away
        reference = utils.now()
        second_job_due = reference + timedelta(seconds=30)
        first_job.next_run_time = reference - timedelta(seconds=5)
        second_job.next_run_time = second_job_due
        third_job.next_run_time = reference - timedelta(seconds=10)

        second_batch = scheduler.enqueue_jobs()

        # Only the overdue jobs are returned ...
        self.assertEqual(len(second_batch), 2)
        self.assertIn(first_job, second_batch)
        self.assertIn(third_job, second_batch)
        self.assertNotIn(second_job, second_batch)

        # ... and only they were put on the queue
        self.assertEqual(len(queue.get_jobs()), 2)

        # Jobs that ran have newer run times and a next run no earlier than the reference
        self.assertIsNotNone(first_job.latest_run_time)
        assert first_job.latest_run_time is not None and second_job_first_run is not None
        self.assertGreater(first_job.latest_run_time, second_job_first_run)
        self.assertIsNotNone(first_job.next_run_time)
        self.assertGreaterEqual(first_job.next_run_time, reference)

        self.assertIsNotNone(third_job.latest_run_time)
        self.assertGreater(third_job.latest_run_time, second_job_first_run)
        self.assertIsNotNone(third_job.next_run_time)
        self.assertGreaterEqual(third_job.next_run_time, reference)

        # The job that was not due keeps both of its timestamps
        self.assertEqual(second_job.latest_run_time, second_job_first_run)
        self.assertEqual(second_job.next_run_time, second_job_due)

    @patch('rq.cron.now')
    def test_calculate_sleep_interval(self, mock_now):
        """calculate_sleep_interval() with no jobs, unscheduled, near, far and overdue jobs"""
        scheduler = CronScheduler(connection=self.connection)
        frozen_now = datetime(2023, 10, 27, 12, 0, 0)
        mock_now.return_value = frozen_now

        def job_due_in(func, interval, seconds_until_due):
            # Record a last run chosen so the next run lands `seconds_until_due`
            # seconds after frozen_now (a negative value means overdue).
            job = CronJob(func=func, queue_name=self.queue_name, interval=interval)
            job.set_run_time(frozen_now - timedelta(seconds=interval - seconds_until_due))
            return job

        # Scenario: no jobs at all
        scheduler._cron_jobs = []
        self.assertEqual(scheduler.calculate_sleep_interval(), 60)

        # Scenario: jobs exist but none has a next_run_time
        unscheduled_a = CronJob(func=say_hello, queue_name=self.queue_name, interval=60)
        unscheduled_b = CronJob(func=do_nothing, queue_name=self.queue_name, interval=120)
        unscheduled_a.next_run_time = None
        unscheduled_b.next_run_time = None
        scheduler._cron_jobs = [unscheduled_a, unscheduled_b]
        self.assertEqual(scheduler.calculate_sleep_interval(), 60)

        # Scenario: nearest job is 35 seconds away
        soon = job_due_in(say_hello, 120, 35)
        later = job_due_in(do_nothing, 180, 70)
        scheduler._cron_jobs = [soon, later]
        sleep_for = scheduler.calculate_sleep_interval()
        self.assertEqual(soon.next_run_time, frozen_now + timedelta(seconds=35))
        self.assertEqual(later.next_run_time, frozen_now + timedelta(seconds=70))
        self.assertAlmostEqual(sleep_for, 35)

        # Scenario: nearest job is 90 seconds away, result is 60
        soon = job_due_in(say_hello, 120, 90)
        later = job_due_in(do_nothing, 180, 120)
        scheduler._cron_jobs = [soon, later]
        sleep_for = scheduler.calculate_sleep_interval()
        self.assertEqual(soon.next_run_time, frozen_now + timedelta(seconds=90))
        self.assertEqual(later.next_run_time, frozen_now + timedelta(seconds=120))
        self.assertEqual(sleep_for, 60)

        # Scenario: one job is already 10 seconds overdue, so no sleep at all
        overdue = job_due_in(say_hello, 60, -10)
        upcoming = job_due_in(do_nothing, 180, 20)
        scheduler._cron_jobs = [overdue, upcoming]
        sleep_for = scheduler.calculate_sleep_interval()
        self.assertEqual(overdue.next_run_time, frozen_now - timedelta(seconds=10))
        self.assertEqual(upcoming.next_run_time, frozen_now + timedelta(seconds=20))
        self.assertEqual(sleep_for, 0)

    def test_register_with_job_options(self):
        """timeout, result_ttl and meta passed to register() are stored in job_options"""
        scheduler = CronScheduler(connection=self.connection)
        options = {'timeout': 180, 'result_ttl': 600, 'meta': {'purpose': 'testing'}}

        job = scheduler.register(func=say_hello, queue_name=self.queue_name, interval=30, **options)

        # Each option must come back unchanged under the same key
        for option_name, expected_value in options.items():
            self.assertEqual(job.job_options[option_name], expected_value)

    def test_load_config_from_file_method(self):
        """load_config_from_file() registers the jobs from tests/cron_config.py on the instance"""
        scheduler = CronScheduler(connection=self.connection)
        scheduler.load_config_from_file('tests/cron_config.py')

        # Four jobs are expected from the config file
        loaded = scheduler.get_jobs()
        self.assertEqual(len(loaded), 4)

        # Spot-check the registered functions ...
        loaded_funcs = [job.func for job in loaded]
        for expected_func in (say_hello, div_by_zero, do_nothing):
            self.assertIn(expected_func, loaded_funcs)

        # ... and the intervals
        loaded_intervals = [job.interval for job in loaded]
        for expected_interval in (30, 180):
            self.assertIn(expected_interval, loaded_intervals)

        # The job configured with keyword arguments
        with_kwargs = next((job for job in loaded if job.kwargs.get('name') == 'RQ Cron'), None)
        self.assertIsNotNone(with_kwargs)
        self.assertEqual(with_kwargs.interval, 120)

        # The job configured with positional arguments
        with_args = next((job for job in loaded if job.args == (10,)), None)
        self.assertIsNotNone(with_args)
        self.assertEqual(with_args.func, div_by_zero)

        # Loading must leave the global registry empty
        self.assertEqual(len(_job_data_registry), 0)

    def test_cron_config_path_finding_method(self):
        """load_config_from_file() with an absolute path, a module path, and two failing inputs"""
        # Locate the config next to this test module so the absolute path is always valid
        config_file_abs = os.path.join(os.path.dirname(__file__), 'cron_config.py')
        self.assertTrue(os.path.exists(config_file_abs), f'Test config file not found at {config_file_abs}')

        # Case 1: absolute file path
        from_abs_path = CronScheduler(connection=self.connection)
        from_abs_path.load_config_from_file(config_file_abs)
        self.assertEqual(len(from_abs_path.get_jobs()), 4, 'Failed loading with absolute path')
        self.assertEqual(len(_job_data_registry), 0, 'Registry not cleared after absolute path load')

        # Case 2: dotted module path
        from_module = CronScheduler(connection=self.connection)
        from_module.load_config_from_file('tests.cron_config')
        self.assertEqual(len(from_module.get_jobs()), 4, 'Failed loading with module path')
        self.assertEqual(len(_job_data_registry), 0, 'Registry not cleared after module path load')

        # Case 3: a path that does not exist raises and registers nothing
        failing = CronScheduler(connection=self.connection)
        with self.assertRaises(Exception):
            failing.load_config_from_file('path/does/not/exist.py')
        self.assertEqual(len(failing.get_jobs()), 0)
        self.assertEqual(len(_job_data_registry), 0, 'Registry not cleared after non-existent path error')

        # Case 4: an existing file whose content is not valid Python (same scheduler as case 3)
        with tempfile.NamedTemporaryFile(suffix='.py', mode='w+', delete=False) as broken_config:
            broken_config.write('this is not valid python code :')
            broken_config_path = broken_config.name  # keep the path; the file is closed on exit
        try:
            with self.assertRaises(Exception):
                failing.load_config_from_file(broken_config_path)

            self.assertEqual(len(failing.get_jobs()), 0)
            self.assertEqual(len(_job_data_registry), 0, 'Registry not cleared after invalid content error')
        finally:
            # delete=False means the temp file has to be removed by hand
            os.remove(broken_config_path)

    @patch('rq.cron.time.sleep')
    @patch('rq.cron.CronScheduler.calculate_sleep_interval')
    @patch('rq.cron.CronScheduler.enqueue_jobs')
    def test_start_loop(self, mock_enqueue, mock_calculate_interval, mock_sleep):
        """
        Drive start() through three loop iterations with mocked collaborators.

        The mocked interval is 15.5 on the first pass (sleep expected), 0 on the
        second pass (no sleep expected), and raises BreakLoop on the third pass
        to end the loop.
        """
        scheduler = CronScheduler(connection=self.connection)

        # What enqueue_jobs returns is irrelevant here
        mock_enqueue.return_value = []
        mock_calculate_interval.side_effect = [15.5, 0, BreakLoop]

        # The third interval calculation raises, which must propagate out of start()
        self.assertRaises(BreakLoop, scheduler.start)

        self.assertEqual(mock_enqueue.call_count, 3)
        self.assertEqual(
            mock_calculate_interval.call_count, 3, 'calculate_sleep_interval should be called thrice (15.5, 0, raises)'
        )

        # Only the positive interval leads to a sleep
        self.assertEqual(mock_sleep.call_count, 1, 'time.sleep should be called only once')
        mock_sleep.assert_called_once_with(15.5)

    def test_register_job_with_cron_string(self):
        """A job registered with a cron expression keeps the expression and has no interval"""
        scheduler = CronScheduler(connection=self.connection)
        weekdays_9am = '0 9 * * 1-5'

        job = scheduler.register(func=say_hello, queue_name=self.queue_name, cron=weekdays_9am)

        self.assertEqual(job.func, say_hello)
        self.assertEqual(job.cron, weekdays_9am)
        self.assertIsNone(job.interval)

        # It is the only job known to the scheduler
        known_jobs = scheduler.get_jobs()
        self.assertEqual(len(known_jobs), 1)
        self.assertIn(job, known_jobs)

    def test_register_job_with_interval_and_cron_args(self):
        """register() raises ValueError when given both interval and cron, or neither"""
        invalid_schedules = (
            {'interval': 60, 'cron': '0 9 * * *'},  # both given
            {},  # neither given
        )
        for schedule_kwargs in invalid_schedules:
            cron = CronScheduler(connection=self.connection)

            with self.assertRaises(ValueError):
                cron.register(func=say_hello, queue_name=self.queue_name, **schedule_kwargs)

    def test_enqueue_jobs_with_cron_strings(self):
        """Cron-expression jobs are not enqueued by the first enqueue_jobs() call"""
        scheduler = CronScheduler(connection=self.connection)

        # '* * * * *' is every minute, '0 12 * * *' is daily at noon
        every_minute = scheduler.register(func=say_hello, queue_name=self.queue_name, cron='* * * * *')
        daily_noon = scheduler.register(func=do_nothing, queue_name=self.queue_name, cron='0 12 * * *')

        # Crontab behavior: nothing runs right after registration
        first_batch = scheduler.enqueue_jobs()
        queue = Queue(self.queue_name, connection=self.connection)
        self.assertEqual(len(first_batch), 0)
        self.assertEqual(len(queue.get_jobs()), 0)

        # Each job has a next run time but no recorded run
        for job in (every_minute, daily_noon):
            self.assertIsNone(job.latest_run_time)
            self.assertIsNotNone(job.next_run_time)

    @patch('rq.cron.now')
    def test_cron_jobs_run_when_scheduled_time_arrives(self, mock_now):
        """A cron job is enqueued once the mocked clock reaches its scheduled time"""
        scheduler = CronScheduler(connection=self.connection)

        minute_before = datetime(2023, 10, 27, 8, 59, 0)
        nine_am = datetime(2023, 10, 27, 9, 0, 0)
        nine_am_next_day = datetime(2023, 10, 28, 9, 0, 0)

        # Register a daily 9 AM job while the clock reads 8:59
        mock_now.return_value = minute_before
        job = scheduler.register(func=say_hello, queue_name=self.queue_name, cron='0 9 * * *')

        # One minute early: nothing is enqueued, but the next run is planned
        too_early = scheduler.enqueue_jobs()
        self.assertEqual(len(too_early), 0)
        self.assertIsNone(job.latest_run_time)
        self.assertIsNotNone(job.next_run_time)

        # Move the clock to exactly 9:00 and try again
        mock_now.return_value = nine_am
        on_time = scheduler.enqueue_jobs()
        queue = Queue(self.queue_name, connection=self.connection)

        self.assertEqual(len(on_time), 1)
        self.assertEqual(len(queue.get_jobs()), 1)
        self.assertIn(job, on_time)

        # The run is recorded at 9:00 and the next one is planned for the following day
        self.assertIsNotNone(job.latest_run_time)
        self.assertEqual(job.latest_run_time, nine_am)
        self.assertIsNotNone(job.next_run_time)
        self.assertEqual(job.next_run_time, nine_am_next_day)

        # The queued RQ job points at the registered function
        first_queued = queue.get_jobs()[0]
        self.assertEqual(first_queued.func_name, 'tests.fixtures.say_hello')

    @patch('rq.cron.now')
    def test_multiple_cron_jobs_selective_execution(self, mock_now):
        """With several cron jobs, each enqueue_jobs() call returns only the ones that are due"""
        scheduler = CronScheduler(connection=self.connection)

        # All jobs are registered while the clock reads 8:15
        mock_now.return_value = datetime(2023, 10, 27, 8, 15, 0)
        daily_9 = scheduler.register(
            func=say_hello,
            queue_name=self.queue_name,
            args=('9 AM job',),
            cron='0 9 * * *',
        )
        daily_10 = scheduler.register(func=do_nothing, queue_name=self.queue_name, cron='0 10 * * *')
        half_hourly = scheduler.register(func=say_hello, queue_name=self.queue_name, cron='*/30 * * * *')

        # 8:30 - only the half-hourly job matches
        mock_now.return_value = datetime(2023, 10, 27, 8, 30, 0)
        due_at_8_30 = scheduler.enqueue_jobs()
        self.assertEqual(due_at_8_30, [half_hourly])

        # 9:00 - the half-hourly job and the 9 AM job match, the 10 AM job does not
        mock_now.return_value = datetime(2023, 10, 27, 9, 0, 0)
        due_at_9 = scheduler.enqueue_jobs()
        self.assertEqual(len(due_at_9), 2)
        self.assertIn(half_hourly, due_at_9)
        self.assertIn(daily_9, due_at_9)
        self.assertNotIn(daily_10, due_at_9)

    @patch('rq.cron.now')
    def test_calculate_sleep_interval_with_cron_jobs(self, mock_now):
        """calculate_sleep_interval() follows the nearest cron-scheduled job"""
        scheduler = CronScheduler(connection=self.connection)

        # Clock at 8:58:00 when the jobs are created
        mock_now.return_value = datetime(2023, 10, 27, 8, 58, 0)
        every_minute = CronJob(func=say_hello, queue_name=self.queue_name, cron='* * * * *')  # next: 8:59
        at_9_05 = CronJob(func=do_nothing, queue_name=self.queue_name, cron='5 9 * * *')  # next: 9:05
        scheduler._cron_jobs = [every_minute, at_9_05]

        # The nearest job is one minute away
        self.assertEqual(scheduler.calculate_sleep_interval(), 60.0)

        # Thirty seconds later the nearest job is 30 seconds away
        mock_now.return_value = datetime(2023, 10, 27, 8, 58, 30)
        self.assertEqual(scheduler.calculate_sleep_interval(), 30.0)

    def test_mixed_interval_and_cron_jobs(self):
        """Interval-based and cron-based jobs can be registered on the same scheduler"""
        scheduler = CronScheduler(connection=self.connection)
        five_minute_expr = '*/5 * * * *'

        by_interval = scheduler.register(func=say_hello, queue_name=self.queue_name, interval=120)
        by_cron = scheduler.register(func=do_nothing, queue_name=self.queue_name, cron=five_minute_expr)

        # Both are known to the scheduler
        known_jobs = scheduler.get_jobs()
        self.assertEqual(len(known_jobs), 2)
        self.assertIn(by_interval, known_jobs)
        self.assertIn(by_cron, known_jobs)

        # Each job carries exactly one kind of schedule
        self.assertEqual(by_interval.interval, 120)
        self.assertIsNone(by_interval.cron)
        self.assertEqual(by_cron.cron, five_minute_expr)
        self.assertIsNone(by_cron.interval)

        # A fresh interval job is due right away; a fresh cron job waits for its schedule
        self.assertTrue(by_interval.should_run())
        self.assertFalse(by_cron.should_run())

    def test_cron_scheduler_to_dict(self):
        """to_dict() exposes hostname, pid (as a string), name, config_file and created_at"""
        scheduler = CronScheduler(connection=self.connection, name='test-scheduler')
        scheduler.config_file = 'test_config.py'

        serialized = scheduler.to_dict()

        expected_fields = {
            'hostname': scheduler.hostname,
            'pid': str(scheduler.pid),
            'name': 'test-scheduler',
            'config_file': 'test_config.py',
        }
        for field, expected_value in expected_fields.items():
            self.assertEqual(serialized[field], expected_value)
        # Only the presence of the creation timestamp is checked
        self.assertIn('created_at', serialized)

    def test_cron_scheduler_save_and_restore(self):
        """save() writes a Redis hash that restore() can load into another scheduler"""
        source = CronScheduler(connection=self.connection, name='test-scheduler')
        source.config_file = 'test_config.py'
        source.save()

        # The hash stored under the scheduler's key must not be empty
        stored = cast(dict, self.connection.hgetall(source.key))
        self.assertGreater(len(stored), 0)

        # Load the stored hash into a scheduler that starts with a different name
        target = CronScheduler(connection=self.connection, name='restored-scheduler')
        target.restore(stored)

        # Every persisted attribute must now match the source scheduler
        for attribute in ('hostname', 'pid', 'name', 'config_file', 'created_at'):
            self.assertEqual(getattr(target, attribute), getattr(source, attribute))

    def test_cron_scheduler_fetch_from_redis(self):
        """CronScheduler.fetch() loads a saved scheduler and raises SchedulerNotFound otherwise"""
        saved = CronScheduler(connection=self.connection, name='persistent-scheduler')
        saved.config_file = 'persistent_config.py'
        saved.save()

        # A saved scheduler can be fetched by name
        fetched = CronScheduler.fetch('persistent-scheduler', self.connection)
        self.assertEqual(fetched.name, saved.name)

        # An unknown name raises SchedulerNotFound
        self.assertRaises(SchedulerNotFound, CronScheduler.fetch, 'nonexistent-scheduler', self.connection)

    def test_cron_scheduler_default_name(self):
        """Without an explicit name the scheduler is called '<hostname>:<pid>:<6 hex chars>'"""
        cron = CronScheduler(connection=self.connection)

        expected_prefix = f'{cron.hostname}:{cron.pid}:'
        self.assertTrue(cron.name.startswith(expected_prefix))

        # Whatever follows the prefix must be six lowercase hex characters
        random_part = cron.name[len(expected_prefix) :]
        self.assertEqual(len(random_part), 6)
        self.assertTrue(set(random_part).issubset('0123456789abcdef'))

    def test_register_birth_and_death(self):
        """register_birth() lists the scheduler and saves its hash; register_death() unlists it"""
        scheduler = CronScheduler(connection=self.connection)

        scheduler.register_birth()

        # After birth the name is in the registry and the hash key exists
        self.assertIn(scheduler.name, get_keys(self.connection))
        self.assertTrue(self.connection.exists(scheduler.key))

        # The hash carries an expiry: positive and no more than 60 seconds
        remaining_ttl = self.connection.ttl(scheduler.key)
        self.assertGreater(remaining_ttl, 0)
        self.assertLessEqual(remaining_ttl, 60)

        scheduler.register_death()

        # After death the name is gone from the registry
        self.assertNotIn(scheduler.name, get_keys(self.connection))

    def test_fetch_after_register_birth(self):
        """A scheduler registered via register_birth() can be fetched back by name"""
        registered = CronScheduler(connection=self.connection)
        registered.register_birth()  # saves the data that fetch() reads

        fetched = CronScheduler.fetch(registered.name, self.connection)

        # Name and creation time must match the registered scheduler
        for attribute in ('name', 'created_at'):
            self.assertEqual(getattr(fetched, attribute), getattr(registered, attribute))

    def test_heartbeat(self):
        """heartbeat() raises a registered scheduler's registry score and skips unregistered ones"""
        scheduler = CronScheduler(connection=self.connection)

        # Start from an empty registry
        registry = get_registry_key()
        self.connection.delete(registry)

        # heartbeat only works on registered schedulers, so register first
        scheduler.register_birth()
        score_at_birth = self.connection.zscore(registry, scheduler.name)
        self.assertIsNotNone(score_at_birth)

        time.sleep(0.01)  # let the clock advance so the two scores can differ

        scheduler.heartbeat()
        score_after_beat = self.connection.zscore(registry, scheduler.name)
        self.assertIsNotNone(score_after_beat)
        self.assertGreater(cast(float, score_after_beat), cast(float, score_at_birth))
        scheduler.register_death()

        # A scheduler that never registered: heartbeat() must return without raising ...
        stranger_name = 'unregistered-scheduler'
        stranger = CronScheduler(connection=self.connection, name=stranger_name)
        stranger.heartbeat()

        # ... and must not have added that scheduler to the registry
        self.assertIsNone(self.connection.zscore(registry, stranger_name))

    @patch('rq.cron.CronScheduler.register_death')
    @patch('rq.cron.CronScheduler._install_signal_handlers')
    def test_start_always_calls_register_death_on_exception(self, mock_signal_handlers, mock_register_death):
        """start() must still call register_death when the main loop raises"""
        scheduler = CronScheduler(connection=self.connection, name='test-scheduler')

        # Replace enqueue_jobs with a mock that raises as soon as start() calls it
        failing_enqueue = patch.object(scheduler, 'enqueue_jobs', side_effect=Exception('Test exception'))
        with failing_enqueue:
            with self.assertRaises(Exception) as raised:
                scheduler.start()

            # The original exception propagates out of start() unchanged
            self.assertEqual(str(raised.exception), 'Test exception')

            # Signal handlers were installed once, and register_death still ran once
            mock_signal_handlers.assert_called_once()
            mock_register_death.assert_called_once()

    @patch('rq.cron.CronScheduler.register_death')
    def test_start_handles_keyboard_interrupt(self, mock_register_death):
        """Test that start() handles KeyboardInterrupt gracefully

        enqueue_jobs is patched to raise KeyboardInterrupt. start() is expected to
        return normally (no re-raise) and still call register_death exactly once.
        """
        cron = CronScheduler(connection=self.connection, name='test-scheduler')

        # Stub out _install_signal_handlers, as the sibling exception test does, so that
        # start() does not run the real method inside the test runner process.
        # NOTE(review): presumably the real method calls signal.signal(); nothing in
        # this test would restore the previous handlers afterwards - confirm.
        with patch.object(cron, '_install_signal_handlers') as mock_signal_handlers:
            # Mock enqueue_jobs to raise KeyboardInterrupt
            with patch.object(cron, 'enqueue_jobs', side_effect=KeyboardInterrupt('Ctrl+C')):
                # start() should handle KeyboardInterrupt without re-raising it
                cron.start()

            # Verify lifecycle methods were called
            mock_signal_handlers.assert_called_once()
            mock_register_death.assert_called_once()

    def test_sigint_handling(self):
        """Test that sending SIGINT to the process stops the scheduler

        Starts run_scheduler in a separate Process, checks that a scheduler whose name
        begins with ``hostname:pid:`` is registered, sends SIGINT to that process, and
        then checks that the process has exited and that no scheduler with that prefix
        is still registered.
        """
        conn_kwargs = self.connection.connection_pool.connection_kwargs
        scheduler_process = Process(target=run_scheduler, args=(conn_kwargs,))
        scheduler_process.start()
        try:
            assert scheduler_process.pid
            # Give the child a moment to start up before looking for its registration
            time.sleep(0.2)
            # Ensure scheduler is registered (name will have random suffix)
            scheduler_prefix = f'{socket.gethostname()}:{scheduler_process.pid}:'

            # Find scheduler with matching prefix (None when there is no match)
            matching_scheduler = next(
                (key for key in get_keys(self.connection) if key.startswith(scheduler_prefix)),
                None,
            )
            self.assertTrue(matching_scheduler)

            os.kill(scheduler_process.pid, signal.SIGINT)

            scheduler_process.join(timeout=2)
            self.assertFalse(scheduler_process.is_alive())

            # Verify scheduler is no longer registered
            keys = get_keys(self.connection)
            self.assertEqual([key for key in keys if key.startswith(scheduler_prefix)], [])
        finally:
            # If an assertion above failed (or the join timed out), the child may still
            # be running; stop it so it does not outlive this test.
            if scheduler_process.is_alive():
                scheduler_process.terminate()
                scheduler_process.join(timeout=2)

    def test_last_heartbeat_property(self):
        """last_heartbeat is set at registration, advances on heartbeat() and is None after death"""
        scheduler = CronScheduler(connection=self.connection)

        # Start from an empty registry
        self.connection.delete(get_registry_key())

        # Take a clock reading on each side of the registration
        lower_bound = datetime.now(timezone.utc)
        scheduler.register_birth()
        first_beat = scheduler.last_heartbeat

        assert first_beat

        upper_bound = datetime.now(timezone.utc)
        # The reported heartbeat must fall between the two readings, allowing one
        # second of slack on either side
        slack = timedelta(seconds=1)
        self.assertGreaterEqual(first_beat, lower_bound - slack)
        self.assertLessEqual(first_beat, upper_bound + slack)

        # A later heartbeat() must move the timestamp forward
        time.sleep(0.01)
        scheduler.heartbeat()

        second_beat = scheduler.last_heartbeat
        assert second_beat
        self.assertGreater(second_beat, first_beat)

        # Once death has been registered there is no heartbeat left to report
        scheduler.register_death()
        self.assertIsNone(scheduler.last_heartbeat)

    def test_all(self):
        """CronScheduler.all() lists every scheduler that has registered its birth"""
        # Start from an empty registry
        self.connection.delete(get_registry_key())

        # Three schedulers with generated default names (distinct thanks to the random suffix)
        schedulers = [CronScheduler(connection=self.connection) for _ in range(3)]
        for scheduler in schedulers:
            scheduler.register_birth()

        # Default call: exactly the three registered schedulers come back
        self.assertEqual(set(CronScheduler.all(self.connection)), set(schedulers))

        # With cleanup disabled the same number of schedulers is returned
        without_cleanup = CronScheduler.all(self.connection, cleanup=False)
        self.assertEqual(len(without_cleanup), 3)

        # Deregister everything again so the registry is left clean
        for scheduler in schedulers:
            scheduler.register_death()

    def test_equality(self):
        """CronScheduler equality and hashing are checked against the scheduler name"""
        first = CronScheduler(connection=self.connection, name='test-scheduler')
        same_name = CronScheduler(connection=self.connection, name='test-scheduler')

        # Same name: the instances compare equal and hash alike
        self.assertEqual(first, same_name)
        self.assertEqual(hash(first), hash(same_name))

        # Different name: the instances compare unequal and hash differently
        other_name = CronScheduler(connection=self.connection, name='different-scheduler')
        self.assertNotEqual(first, other_name)
        self.assertNotEqual(hash(first), hash(other_name))

        # A scheduler never equals an object that is not a scheduler
        for foreign in ('not-a-scheduler', 42):
            self.assertNotEqual(first, foreign)