from monitor.services.email_service import EmailService


@shared_task
def send_event_notification_task(analysis_id):
    """Send an email notification for a market event.

    Args:
        analysis_id: Primary key of the ``MarketAnalysis`` row to report on.

    Returns:
        dict: ``{'success': True, 'analysis_id': ...}`` when the alert was
        sent; ``{'success': False, 'reason': ...}`` when the analysis is not
        an event; ``{'success': False, 'error': ...}`` when the analysis does
        not exist or sending failed.
    """
    # Kept as a function-local import, but placed *before* the ``try``: the
    # ``except MarketAnalysis.DoesNotExist`` clause below needs the name to be
    # bound, otherwise a failing import would surface as UnboundLocalError
    # instead of the real ImportError.
    from monitor.models import MarketAnalysis

    try:
        analysis = MarketAnalysis.objects.get(id=analysis_id)

        if not analysis.is_event:
            logger.info(f"Analysis {analysis_id} is not an event, skipping notification")
            return {'success': False, 'reason': 'Not an event'}

        email_service = EmailService()
        email_service.send_event_alert(analysis)
        logger.info(f"Event notification sent for analysis {analysis_id}")
        return {'success': True, 'analysis_id': analysis_id}
    except MarketAnalysis.DoesNotExist:
        logger.error(f"Analysis {analysis_id} not found")
        return {'success': False, 'error': 'Analysis not found'}
    except Exception as e:
        # Task boundary: record the traceback and report failure in the result.
        logger.exception(f"Error sending event notification: {e}")
        return {'success': False, 'error': str(e)}


@shared_task
def send_daily_digest_task():
    """Send the daily digest email.

    The original description says this runs at 8 AM; the schedule itself is
    not defined in this part of the file.

    Returns:
        dict: ``{'success': True}`` on success, otherwise
        ``{'success': False, 'error': ...}``. On failure a system alert task
        is queued as well.
    """
    try:
        email_service = EmailService()
        email_service.send_daily_digest()
        logger.info("Daily digest email sent")
        return {'success': True}
    except Exception as e:
        logger.exception(f"Error sending daily digest: {e}")

        # Try to send system alert about the failure
        send_system_notification_task.delay(
            alert_title="Daily Digest Failed",
            alert_message=f"Failed to send daily digest: {str(e)}",
            severity='error'
        )

        return {'success': False, 'error': str(e)}


@shared_task
def send_system_notification_task(alert_title, alert_message, severity='warning', affected_component='system'):
    """Send a system notification email.

    Args:
        alert_title: Title passed through to ``EmailService.send_system_alert``.
        alert_message: Message body passed through to the email service.
        severity: Severity label; defaults to ``'warning'``.
        affected_component: Component label; defaults to ``'system'``.

    Returns:
        dict: ``{'success': True}`` on success, otherwise
        ``{'success': False, 'error': ...}``.
    """
    try:
        email_service = EmailService()
        email_service.send_system_alert(
            alert_title=alert_title,
            alert_message=alert_message,
            severity=severity,
            affected_component=affected_component
        )
        logger.info(f"System notification sent: {alert_title}")
        return {'success': True}
    except Exception as e:
        logger.exception(f"Error sending system notification: {e}")
        return {'success': False, 'error': str(e)}


# Update the fetch_bitcoin_price_task to send system alerts on failure
@shared_task(bind=True, max_retries=3)
def fetch_bitcoin_price_task(self):
    """Fetch current Bitcoin price and save to database.

    On any failure a ``SystemStatus`` row describing the error is created, a
    system alert task is queued, and the task is retried after 60 seconds
    (``max_retries=3`` on the decorator).

    Raises:
        Exception: Whatever ``self.retry`` raises; this task never returns a
        failure value itself.
    """
    logger.info("Starting Bitcoin price fetch task...")

    try:
        fetcher = CoinGeckoFetcher()

        # Fetch current price
        price_data = fetcher.fetch_current_price()

        if not price_data:
            logger.error("Failed to fetch price data")

            # Send system alert
            # NOTE(review): the exception raised below is also handled by the
            # generic handler, which queues a second alert for the same
            # failure - confirm whether both alerts are wanted.
            send_system_notification_task.delay(
                alert_title="Bitcoin Price Fetch Failed",
                alert_message="Failed to fetch Bitcoin price from CoinGecko API. Check API status and internet connection.",
                severity='warning',
                affected_component='data_fetcher'
            )

            raise Exception("No price data received")

        # ... rest of existing code ...

    except Exception as e:
        logger.exception(f"Error in fetch_bitcoin_price_task: {e}")

        # Update system status with error
        SystemStatus.objects.create(
            last_error=str(e),
            is_stale=True,
            is_healthy=False,
        )

        # Send system alert
        send_system_notification_task.delay(
            alert_title="Bitcoin Price Task Error",
            alert_message=f"Error in price fetch task: {str(e)}",
            severity='error',
            affected_component='celery_task'
        )

        # Retry the task. ``retry`` always raises, so it is written with an
        # explicit ``raise`` to make the control flow visible; the ``return``
        # that used to follow this call could never run and was removed.
        raise self.retry(exc=e, countdown=60)


# Update run_hourly_analysis_task to send event notifications
@shared_task
def run_hourly_analysis_task():
    """Run hourly market analysis.

    Queues an event notification task when the analysis is flagged as an
    event, and a system alert task when the analysis raises.

    Returns:
        dict: On success, ``success``, ``analysis_id``, ``status``, ``price``
        and ``is_event``; otherwise ``{'success': False, 'error': ...}``.
    """
    logger.info("Starting hourly analysis task...")

    try:
        analyzer = MarketAnalyzer(threshold_percent=15.0)

        # Run hourly analysis
        analysis = analyzer.analyze_market('hourly')

        if not analysis:
            logger.warning("Hourly analysis returned no results")
            return {'success': False, 'error': 'No analysis results'}

        logger.info(f"Hourly analysis completed: {analysis.status} at ${analysis.current_price}")

        # Check if this is an event and send notification
        if analysis.is_event:
            logger.warning(
                f"Market event detected: {analysis.event_type} "
                f"at ${analysis.current_price}"
            )

            # Send email notification
            send_event_notification_task.delay(analysis.id)

        return {
            'success': True,
            'analysis_id': analysis.id,
            'status': analysis.status,
            'price': float(analysis.current_price),
            'is_event': analysis.is_event,
        }
    except Exception as e:
        logger.exception(f"Error in run_hourly_analysis_task: {e}")

        # Send system alert
        send_system_notification_task.delay(
            alert_title="Hourly Analysis Failed",
            alert_message=f"Failed to run hourly analysis: {str(e)}",
            severity='error',
            affected_component='market_analyzer'
        )

        return {'success': False, 'error': str(e)}