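"""
Celery tasks for the Bitcoin monitor.

Defines tasks that fetch the current Bitcoin price, run the hourly market
analysis, clean up old rows, and check overall system health.
"""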
from celery import shared_task
|
|
from celery.utils.log import get_task_logger
|
|
import logging
|
|
from datetime import timedelta
|
|
from django.utils import timezone
|
|
from django.db import transaction
|
|
|
|
from monitor.models import BitcoinPrice, MarketAnalysis, SystemStatus
|
|
from monitor.services.analyzer import MarketAnalyzer
|
|
from monitor.services.data_fetcher import CoinGeckoFetcher
|
|
|
|
logger = get_task_logger(__name__)
|
|
|
|
|
|
@shared_task(bind=True, max_retries=3)
def fetch_bitcoin_price_task(self):
    """
    Fetch the current Bitcoin price and save it to the database.

    On success a ``BitcoinPrice`` row and a healthy ``SystemStatus`` row are
    written in a single transaction. ``run_hourly_analysis_task`` is then
    queued when no hourly ``MarketAnalysis`` exists yet or the newest one is
    more than 55 minutes old.

    Intended to run every 5 minutes (the schedule itself is configured
    outside this function).

    Returns:
        dict: ``{'success': True, 'price': <float>, 'timestamp': <ISO str>}``.

    Raises:
        celery.exceptions.Retry: on any failure while retries remain; the
            task is re-queued with a 60 second countdown (``max_retries=3``).
            Once retries are exhausted the original exception propagates.
    """
    logger.info("Starting Bitcoin price fetch task...")

    try:
        fetcher = CoinGeckoFetcher()
        price_data = fetcher.fetch_current_price()

        if not price_data:
            logger.error("Failed to fetch price data")
            raise RuntimeError("No price data received")

        # Sample the clock once so both status timestamps are identical.
        now = timezone.now()

        # Price row and status row are committed (or rolled back) together.
        with transaction.atomic():
            bitcoin_price = BitcoinPrice.objects.create(
                timestamp=price_data['timestamp'],
                price_usd=price_data['price_usd'],
                volume=price_data.get('volume'),
                market_cap=price_data.get('market_cap'),
            )
            SystemStatus.objects.create(
                current_price=bitcoin_price.price_usd,
                last_hourly_update=now,
                last_successful_fetch=now,
                is_stale=False,
                is_healthy=True,
            )

        logger.info(f"Successfully fetched and saved Bitcoin price: ${price_data['price_usd']}")

        # Trigger an analysis when none exists yet or the newest hourly one is
        # older than 55 minutes. (Only this time-based rule is implemented.)
        last_analysis = (
            MarketAnalysis.objects
            .filter(period='hourly')
            .order_by('-timestamp')
            .first()
        )
        if not last_analysis:
            should_analyze = True
        else:
            time_since_analysis = timezone.now() - last_analysis.timestamp
            should_analyze = time_since_analysis.total_seconds() > 3300  # 55 minutes

        if should_analyze:
            # Run the analysis in its own task so this one stays short.
            run_hourly_analysis_task.delay()

        return {
            'success': True,
            'price': float(price_data['price_usd']),
            'timestamp': price_data['timestamp'].isoformat(),
        }

    except Exception as e:
        # logger.exception keeps the traceback alongside the message.
        logger.exception(f"Error in fetch_bitcoin_price_task: {e}")

        # Recording the failure is best-effort: if the database itself is the
        # problem, this write must not mask the original error or prevent the
        # retry below.
        try:
            SystemStatus.objects.create(
                last_error=str(e),
                is_stale=True,
                is_healthy=False,
            )
        except Exception:
            logger.exception("Could not record failure in SystemStatus")

        # Task.retry() always raises (Retry, or the original exception once
        # retries are exhausted); `raise` makes that control flow explicit.
        raise self.retry(exc=e, countdown=60)
|
|
|
|
|
|
@shared_task
def run_hourly_analysis_task():
    """
    Run the hourly market analysis and summarise the outcome.

    Builds a ``MarketAnalyzer`` with a 15.0 percent threshold and asks it for
    an ``'hourly'`` analysis. Intended to run every hour (the schedule is
    configured outside this function).

    Returns:
        dict: on success ``{'success': True, 'analysis_id', 'status',
        'price', 'is_event'}``; otherwise ``{'success': False, 'error': str}``
        when the analyzer returns nothing or any exception is raised.
        Exceptions are logged and reported in the dict, never propagated.
    """
    logger.info("Starting hourly analysis task...")

    try:
        result = MarketAnalyzer(threshold_percent=15.0).analyze_market('hourly')

        # Guard clause: the analyzer produced nothing to report.
        if not result:
            logger.warning("Hourly analysis returned no results")
            return {'success': False, 'error': 'No analysis results'}

        logger.info(f"Hourly analysis completed: {result.status} at ${result.current_price}")

        # Events are logged at warning level.
        if result.is_event:
            logger.warning(
                f"Market event detected: {result.event_type} "
                f"at ${result.current_price}"
            )

        summary = {'success': True}
        summary['analysis_id'] = result.id
        summary['status'] = result.status
        summary['price'] = float(result.current_price)
        summary['is_event'] = result.is_event
        return summary

    except Exception as exc:
        logger.error(f"Error in run_hourly_analysis_task: {exc}")
        return {'success': False, 'error': str(exc)}
|
|
|
|
|
|
def _delete_all_but_newest(model, keep, batch_size=500):
    """Delete every row of ``model`` except the ``keep`` newest by ``timestamp``; return the number of rows deleted."""
    # Django refuses delete() on a sliced queryset, so resolve the surplus
    # primary keys first and delete by pk instead.
    surplus_pks = list(
        model.objects.order_by('-timestamp').values_list('pk', flat=True)[keep:]
    )

    deleted_total = 0
    # Batch the IN (...) lists to stay below backend parameter limits.
    for start in range(0, len(surplus_pks), batch_size):
        batch = surplus_pks[start:start + batch_size]
        deleted, _ = model.objects.filter(pk__in=batch).delete()
        deleted_total += deleted
    return deleted_total


@shared_task
def cleanup_old_data_task():
    """
    Clean up old data to keep database size manageable.

    Deletes ``BitcoinPrice`` rows older than 30 days, then trims
    ``SystemStatus`` to its 1000 newest rows and ``MarketAnalysis`` to its
    365 newest rows (newest by ``timestamp``). Intended to run once a day
    (the schedule is configured outside this function).

    Returns:
        dict: on success ``{'success': True, 'deleted_prices': int,
        'deleted_statuses': int, 'deleted_analyses': int}``; on any exception
        ``{'success': False, 'error': str}``. Exceptions are logged and
        reported in the dict, never propagated.
    """
    logger.info("Starting data cleanup task...")

    try:
        # Keep only the last 30 days of price data for performance.
        cutoff_date = timezone.now() - timedelta(days=30)
        deleted_count, _ = BitcoinPrice.objects.filter(
            timestamp__lt=cutoff_date
        ).delete()

        # Keep only the last 1000 system status entries.
        deleted_status_count = _delete_all_but_newest(SystemStatus, 1000)

        # Keep only the last 365 analyses.
        deleted_analyses_count = _delete_all_but_newest(MarketAnalysis, 365)

        logger.info(f"Cleanup completed. Deleted {deleted_count} old price records.")
        logger.info(
            "Cleanup also removed %d status entries and %d analyses.",
            deleted_status_count,
            deleted_analyses_count,
        )
        return {
            'success': True,
            'deleted_prices': deleted_count,
            'deleted_statuses': deleted_status_count,
            'deleted_analyses': deleted_analyses_count,
        }

    except Exception as e:
        logger.error(f"Error in cleanup_old_data_task: {e}")
        return {'success': False, 'error': str(e)}
|
|
|
|
|
|
@shared_task
def check_system_health_task():
    """
    Check how fresh the stored data is and record the result.

    Looks up the newest ``BitcoinPrice`` and ``MarketAnalysis`` rows. The
    system is reported unhealthy when either is missing, when the price is
    more than 1 hour old, or when the analysis is more than 2 hours old.
    The outcome is logged and written as a new ``SystemStatus`` row.
    Intended to run every 10 minutes (the schedule is configured outside
    this function).

    Returns:
        dict: ``{'healthy': bool, 'issues': list[str], 'last_price_time',
        'last_analysis_time'}`` where the times are ISO strings or ``None``;
        on any exception ``{'healthy': False, 'error': str}``.
    """
    logger.info("Checking system health...")

    try:
        latest_price = BitcoinPrice.objects.order_by('-timestamp').first()
        latest_analysis = MarketAnalysis.objects.order_by('-timestamp').first()

        # Every problem found is collected here; health is derived from it.
        issues = []

        if latest_price:
            price_age = (timezone.now() - latest_price.timestamp).total_seconds()
            if price_age > 3600:  # More than 1 hour old
                issues.append(f"Price data is {price_age/60:.0f} minutes old")
        else:
            issues.append("No price data available")

        if latest_analysis:
            analysis_age = (timezone.now() - latest_analysis.timestamp).total_seconds()
            if analysis_age > 7200:  # More than 2 hours old
                issues.append(f"Analysis data is {analysis_age/3600:.1f} hours old")
        else:
            issues.append("No analysis data available")

        # Healthy exactly when no issue was recorded above.
        is_healthy = not issues
        issue_summary = ', '.join(issues)

        if is_healthy:
            logger.info("System is healthy")
        else:
            logger.warning(f"System has issues: {issue_summary}")

        # Persist the outcome; last_error stays None when nothing is wrong.
        SystemStatus.objects.create(
            is_healthy=is_healthy,
            last_error=issue_summary if issues else None,
        )

        last_price_time = latest_price.timestamp.isoformat() if latest_price else None
        last_analysis_time = (
            latest_analysis.timestamp.isoformat() if latest_analysis else None
        )
        return {
            'healthy': is_healthy,
            'issues': issues,
            'last_price_time': last_price_time,
            'last_analysis_time': last_analysis_time,
        }

    except Exception as exc:
        logger.error(f"Error in check_system_health_task: {exc}")
        return {'healthy': False, 'error': str(exc)}
|