"""Celery application and periodic (beat) schedule for ``bitcoin_monitor``."""

import os

from celery import Celery
from celery.schedules import crontab

# Point Django at its settings module unless the environment already did.
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "config.settings")

app = Celery("bitcoin_monitor")

# Passing the settings as a string means the worker doesn't have to
# serialize the configuration object to child processes.  Only settings
# carrying the ``CELERY_`` prefix are read, because of ``namespace``.
app.config_from_object("django.conf:settings", namespace="CELERY")

# Pick up task modules from every registered Django app.
app.autodiscover_tasks()

# Interval schedules, in seconds.
_EVERY_FIVE_MINUTES = 300.0
_EVERY_TEN_MINUTES = 600.0
_EVERY_HOUR = 3600.0

# Expiry windows, in seconds.
_EXPIRES_FIVE_MINUTES = 300
_EXPIRES_ONE_HOUR = 3600

# Retry policy attached to the price-fetch entry (all intervals in seconds).
# NOTE(review): in beat ``options`` these keys are forwarded to
# ``apply_async``, where they appear to govern retrying the *publish* of the
# message rather than re-running a failed task -- confirm this is the intent.
_PRICE_FETCH_RETRY_POLICY = {
    "max_retries": 3,
    "interval_start": 60,
    "interval_step": 60,
    "interval_max": 300,
}

# Periodic task table.  Note this assignment replaces any beat schedule that
# was loaded from the Django settings above.
app.conf.beat_schedule = {
    "fetch-bitcoin-price-every-5-minutes": {
        "task": "monitor.tasks.fetch_bitcoin_price_task",
        "schedule": _EVERY_FIVE_MINUTES,
        "options": {
            "expires": _EXPIRES_FIVE_MINUTES,
            "retry": True,
            "retry_policy": _PRICE_FETCH_RETRY_POLICY,
        },
    },
    "run-hourly-analysis-every-hour": {
        "task": "monitor.tasks.run_hourly_analysis_task",
        "schedule": _EVERY_HOUR,
        "options": {
            "expires": _EXPIRES_ONE_HOUR,
        },
    },
    "send-daily-digest-at-8am": {
        "task": "monitor.tasks.send_daily_digest_task",
        # Daily at 08:00 (presumably in the configured Celery timezone).
        "schedule": crontab(hour=8, minute=0),
        "options": {
            "expires": _EXPIRES_ONE_HOUR,
        },
    },
    "cleanup-old-data-daily": {
        "task": "monitor.tasks.cleanup_old_data_task",
        # Daily at midnight.
        "schedule": crontab(hour=0, minute=0),
    },
    "check-system-health-every-10-minutes": {
        "task": "monitor.tasks.check_system_health_task",
        "schedule": _EVERY_TEN_MINUTES,
    },
}

# The yearly-analysis schedule is disabled for now; re-enable by
# uncommenting the entry below.
# app.conf.beat_schedule.update({
#     'run-yearly-analysis-daily': {
#         'task': 'monitor.tasks.run_yearly_analysis_task',
#         'schedule': crontab(hour=0, minute=0),  # Midnight
#     },
# })


@app.task(bind=True)
def debug_task(self):
    """Print the repr of the bound task's request, for debugging."""
    request_info = f'Request: {self.request!r}'
    print(request_info)