Files
bitcoin_monitor/config/tasks.py
2026-01-16 22:20:18 +03:00

230 lines
7.6 KiB
Python

import logging
from datetime import timedelta

from celery import shared_task
from celery.exceptions import MaxRetriesExceededError
from celery.utils.log import get_task_logger
from django.db import transaction
from django.utils import timezone

from monitor.models import BitcoinPrice, MarketAnalysis, SystemStatus
from monitor.services.analyzer import MarketAnalyzer
from monitor.services.data_fetcher import CoinGeckoFetcher
logger = get_task_logger(__name__)
@shared_task(bind=True, max_retries=3)
def fetch_bitcoin_price_task(self):
    """
    Fetch the current Bitcoin price and persist it to the database.

    Creates a ``BitcoinPrice`` row plus a fresh ``SystemStatus`` snapshot
    in one transaction, then queues an hourly analysis if the most recent
    hourly analysis is older than ~55 minutes. Intended to run every
    5 minutes. On failure the task retries up to ``max_retries`` times
    with a 60-second countdown.

    Returns:
        dict: ``{'success': True, 'price': float, 'timestamp': str}`` on
        success, or ``{'success': False, 'error': str}`` once all retries
        are exhausted.
    """
    logger.info("Starting Bitcoin price fetch task...")
    try:
        fetcher = CoinGeckoFetcher()
        price_data = fetcher.fetch_current_price()
        if not price_data:
            logger.error("Failed to fetch price data")
            # Raise a specific exception type rather than bare Exception.
            raise ValueError("No price data received")

        # Persist the price and the status snapshot atomically so the two
        # tables can never disagree about the latest fetch.
        with transaction.atomic():
            bitcoin_price = BitcoinPrice.objects.create(
                timestamp=price_data['timestamp'],
                price_usd=price_data['price_usd'],
                volume=price_data.get('volume'),
                market_cap=price_data.get('market_cap'),
            )
            SystemStatus.objects.create(
                current_price=bitcoin_price.price_usd,
                last_hourly_update=timezone.now(),
                last_successful_fetch=timezone.now(),
                is_stale=False,
                is_healthy=True,
            )
        # Lazy %-style args avoid formatting when the level is filtered out.
        logger.info(
            "Successfully fetched and saved Bitcoin price: $%s",
            price_data['price_usd'],
        )

        # Trigger an hourly analysis if none exists yet, or if the last one
        # is older than 55 minutes (3300 s) — slightly under an hour so a
        # fetch landing just before the hour mark still triggers it.
        last_analysis = MarketAnalysis.objects.filter(
            period='hourly'
        ).order_by('-timestamp').first()
        if last_analysis is None or (
            timezone.now() - last_analysis.timestamp
        ).total_seconds() > 3300:
            # Run analysis in a separate task so this one returns quickly.
            run_hourly_analysis_task.delay()

        return {
            'success': True,
            'price': float(price_data['price_usd']),
            'timestamp': price_data['timestamp'].isoformat(),
        }
    except Exception as e:
        logger.error("Error in fetch_bitcoin_price_task: %s", e)
        # Record the failure — guarded so a DB outage here cannot mask
        # the original error `e` we are about to retry on.
        try:
            SystemStatus.objects.create(
                last_error=str(e),
                is_stale=True,
                is_healthy=False,
            )
        except Exception:
            logger.exception("Could not record failure in SystemStatus")
        # BUG FIX: self.retry() raises (a Retry signal), so the original
        # `return {'success': False, ...}` placed after it was unreachable
        # dead code. Re-raise the retry explicitly and return the failure
        # payload only once retries are exhausted. (`exc` is deliberately
        # not passed to retry(): with exc set, Celery re-raises it on
        # exhaustion instead of MaxRetriesExceededError, which would again
        # make the return unreachable.)
        try:
            raise self.retry(countdown=60)
        except MaxRetriesExceededError:
            return {
                'success': False,
                'error': str(e),
            }
@shared_task
def run_hourly_analysis_task():
    """
    Run the hourly market analysis and report its outcome.

    Delegates to ``MarketAnalyzer`` with a 15% event threshold. Intended
    to run once per hour (also triggered opportunistically by the price
    fetch task).

    Returns:
        dict: ``{'success': True, 'analysis_id', 'status', 'price',
        'is_event'}`` when an analysis was produced, otherwise
        ``{'success': False, 'error': str}``.
    """
    logger.info("Starting hourly analysis task...")
    try:
        analysis = MarketAnalyzer(threshold_percent=15.0).analyze_market('hourly')

        # Guard clause: bail out early when the analyzer produced nothing.
        if not analysis:
            logger.warning("Hourly analysis returned no results")
            return {'success': False, 'error': 'No analysis results'}

        logger.info(f"Hourly analysis completed: {analysis.status} at ${analysis.current_price}")

        # Surface detected market events at warning level for visibility.
        if analysis.is_event:
            logger.warning(
                f"Market event detected: {analysis.event_type} "
                f"at ${analysis.current_price}"
            )

        return {
            'success': True,
            'analysis_id': analysis.id,
            'status': analysis.status,
            'price': float(analysis.current_price),
            'is_event': analysis.is_event,
        }
    except Exception as e:
        logger.error(f"Error in run_hourly_analysis_task: {e}")
        return {'success': False, 'error': str(e)}
@shared_task
def cleanup_old_data_task():
    """
    Trim old rows to keep the database size manageable.

    Retention policy (intended to run once a day):
      * ``BitcoinPrice``   — keep the last 30 days,
      * ``SystemStatus``   — keep the newest 1000 rows,
      * ``MarketAnalysis`` — keep the newest 365 rows.

    Returns:
        dict: ``{'success': True, 'deleted_prices': int}`` on success, or
        ``{'success': False, 'error': str}`` on failure.
    """
    logger.info("Starting data cleanup task...")
    try:
        # Keep only the last 30 days of price data for performance.
        cutoff_date = timezone.now() - timedelta(days=30)
        deleted_count, _ = BitcoinPrice.objects.filter(
            timestamp__lt=cutoff_date
        ).delete()

        # BUG FIX: Django forbids .delete() on a sliced queryset
        # ("Cannot update/delete a query once a slice has been taken"),
        # so the original `queryset[1000:].delete()` raised TypeError.
        # Instead, collect the primary keys past the retention window and
        # delete them with a fresh pk__in filter.

        # Keep only the newest 1000 system status entries.
        stale_status_pks = list(
            SystemStatus.objects.order_by('-timestamp')
            .values_list('pk', flat=True)[1000:]
        )
        if stale_status_pks:
            SystemStatus.objects.filter(pk__in=stale_status_pks).delete()

        # Keep only the newest 365 analyses.
        stale_analysis_pks = list(
            MarketAnalysis.objects.order_by('-timestamp')
            .values_list('pk', flat=True)[365:]
        )
        if stale_analysis_pks:
            MarketAnalysis.objects.filter(pk__in=stale_analysis_pks).delete()

        logger.info(f"Cleanup completed. Deleted {deleted_count} old price records.")
        return {
            'success': True,
            'deleted_prices': deleted_count,
        }
    except Exception as e:
        logger.error(f"Error in cleanup_old_data_task: {e}")
        return {'success': False, 'error': str(e)}
@shared_task
def check_system_health_task():
    """
    Check system health and record the result.

    Verifies that recent price data (< 1 h old) and recent analysis data
    (< 2 h old) exist, logs the outcome, and writes a ``SystemStatus``
    row. Intended to run every 10 minutes.

    Returns:
        dict: ``{'healthy': bool, 'issues': list[str], 'last_price_time',
        'last_analysis_time'}``, or ``{'healthy': False, 'error': str}``
        on unexpected failure.
    """
    logger.info("Checking system health...")
    try:
        recent_price = BitcoinPrice.objects.order_by('-timestamp').first()
        recent_analysis = MarketAnalysis.objects.order_by('-timestamp').first()

        now = timezone.now()
        issues = []

        # Price freshness: must exist and be under an hour old.
        if recent_price is None:
            issues.append("No price data available")
        else:
            price_age = (now - recent_price.timestamp).total_seconds()
            if price_age > 3600:  # More than 1 hour old
                issues.append(f"Price data is {price_age/60:.0f} minutes old")

        # Analysis freshness: must exist and be under two hours old.
        if recent_analysis is None:
            issues.append("No analysis data available")
        else:
            analysis_age = (now - recent_analysis.timestamp).total_seconds()
            if analysis_age > 7200:  # More than 2 hours old
                issues.append(f"Analysis data is {analysis_age/3600:.1f} hours old")

        # Healthy exactly when no issue was recorded.
        is_healthy = not issues
        if is_healthy:
            logger.info("System is healthy")
        else:
            logger.warning(f"System has issues: {', '.join(issues)}")

        SystemStatus.objects.create(
            is_healthy=is_healthy,
            last_error=', '.join(issues) if issues else None,
        )

        return {
            'healthy': is_healthy,
            'issues': issues,
            'last_price_time': recent_price.timestamp.isoformat() if recent_price else None,
            'last_analysis_time': recent_analysis.timestamp.isoformat() if recent_analysis else None,
        }
    except Exception as e:
        logger.error(f"Error in check_system_health_task: {e}")
        return {'healthy': False, 'error': str(e)}