import logging
import time
from datetime import datetime, timezone, timedelta
from decimal import Decimal
from typing import Dict, List, Optional

import requests
from django.db import transaction
from django.utils import timezone as django_timezone

from monitor.models import BitcoinPrice

logger = logging.getLogger(__name__)


class HistoricalDataFetcher:
    """Fetches historical Bitcoin price data from the CoinGecko API.

    Also provides helpers to persist the data (``BitcoinPrice`` model),
    generate synthetic data for development, and analyze data quality.
    """

    def __init__(self):
        self.base_url = "https://api.coingecko.com/api/v3"
        # A shared Session reuses the TCP connection across requests.
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'BitcoinMonitor/1.0',
            'Accept': 'application/json',
        })

    def fetch_historical_data(self, days: int = 365) -> List[Dict]:
        """
        Fetch historical Bitcoin data for specified number of days.

        Args:
            days: Number of days of historical data to fetch

        Returns:
            List of price data dictionaries with keys ``timestamp``
            (aware UTC datetime), ``price_usd``, ``volume`` and
            ``market_cap`` (the latter two may be None). Returns an
            empty list on any request/parsing error.
        """
        try:
            logger.info(f"Fetching {days} days of historical Bitcoin data...")

            url = f"{self.base_url}/coins/bitcoin/market_chart"
            params = {
                'vs_currency': 'usd',
                'days': days,
                'interval': 'daily',
            }

            response = self.session.get(url, params=params, timeout=30)
            response.raise_for_status()

            data = response.json()
            historical_data = []

            # CoinGecko returns [timestamp_ms, value] pairs; prices,
            # total_volumes and market_caps are parallel arrays.
            for price_point in data.get('prices', []):
                timestamp = datetime.fromtimestamp(price_point[0] / 1000, timezone.utc)
                price = price_point[1]
                historical_data.append({
                    'timestamp': timestamp,
                    'price_usd': price,
                    'volume': None,
                    'market_cap': None,
                })

            # Merge volume data by positional index (arrays are parallel).
            volumes = data.get('total_volumes', [])
            for i, (timestamp_ms, volume) in enumerate(volumes):
                if i < len(historical_data):
                    historical_data[i]['volume'] = volume

            # Merge market cap data by positional index.
            market_caps = data.get('market_caps', [])
            for i, (timestamp_ms, market_cap) in enumerate(market_caps):
                if i < len(historical_data):
                    historical_data[i]['market_cap'] = market_cap

            logger.info(f"Fetched {len(historical_data)} historical price points")
            return historical_data

        except requests.exceptions.RequestException as e:
            logger.error(f"Request error fetching historical data: {e}")
            return []
        except Exception as e:
            logger.error(f"Error fetching historical data: {e}")
            return []

    def fetch_historical_data_range(self, start_date: datetime, end_date: datetime) -> List[Dict]:
        """
        Fetch historical data for a specific date range.

        Note: CoinGecko API doesn't support arbitrary date ranges directly,
        so we fetch maximum days and filter.

        NOTE(review): fetched timestamps are timezone-aware (UTC); passing
        naive start_date/end_date will raise TypeError on comparison —
        callers should pass aware datetimes. TODO confirm with callers.
        """
        # Calculate days between dates
        days_difference = (end_date - start_date).days

        # Fetch more data than needed to ensure we have the range
        all_data = self.fetch_historical_data(days=days_difference + 100)

        # Filter to date range
        filtered_data = [
            point for point in all_data
            if start_date <= point['timestamp'] <= end_date
        ]

        return filtered_data

    def save_historical_data(self, historical_data: List[Dict], clear_existing: bool = False) -> Dict:
        """
        Save historical data to database.

        Args:
            historical_data: List of price data dictionaries
            clear_existing: Whether to clear existing data before saving

        Returns:
            Dictionary with statistics about the operation
            (keys: saved, skipped, errors, and total when data was given).
        """
        if not historical_data:
            logger.warning("No historical data to save")
            return {'saved': 0, 'skipped': 0, 'errors': 0}

        try:
            with transaction.atomic():
                if clear_existing:
                    deleted_count, _ = BitcoinPrice.objects.all().delete()
                    logger.info(f"Cleared {deleted_count} existing price records")

                # Prefetch existing timestamps in ONE query instead of one
                # .exists() query per data point (fixes N+1 query pattern).
                existing_timestamps = set(
                    BitcoinPrice.objects.filter(
                        timestamp__in=[d['timestamp'] for d in historical_data]
                    ).values_list('timestamp', flat=True)
                )

                saved_count = 0
                skipped_count = 0
                error_count = 0

                for data_point in historical_data:
                    try:
                        # Skip if a price already exists for this timestamp
                        if data_point['timestamp'] in existing_timestamps:
                            skipped_count += 1
                            continue

                        # Create BitcoinPrice object; Decimal(str(...)) avoids
                        # binary-float artifacts in the stored decimal value.
                        BitcoinPrice.objects.create(
                            timestamp=data_point['timestamp'],
                            price_usd=Decimal(str(data_point['price_usd'])),
                            volume=Decimal(str(data_point['volume'])) if data_point.get('volume') else None,
                            market_cap=Decimal(str(data_point['market_cap'])) if data_point.get('market_cap') else None,
                        )
                        # Track it so a duplicate timestamp later in this
                        # batch is skipped (matches the old per-row check).
                        existing_timestamps.add(data_point['timestamp'])
                        saved_count += 1

                        # Log progress every 50 records
                        if saved_count % 50 == 0:
                            logger.info(f"Saved {saved_count} historical records...")

                    except Exception as e:
                        error_count += 1
                        logger.error(f"Error saving data point {data_point.get('timestamp')}: {e}")

                logger.info(f"Historical data saved: {saved_count} new, {skipped_count} existing, {error_count} errors")

                return {
                    'saved': saved_count,
                    'skipped': skipped_count,
                    'errors': error_count,
                    'total': len(historical_data),
                }

        except Exception as e:
            logger.error(f"Transaction error saving historical data: {e}")
            return {'saved': 0, 'skipped': 0, 'errors': len(historical_data), 'total': len(historical_data)}

    def generate_test_data(self, days: int = 30, base_price: float = 45000) -> List[Dict]:
        """
        Generate synthetic test data for development.

        Args:
            days: Number of days of test data
            base_price: Base price for the data

        Returns:
            List of synthetic hourly price data, in chronological order.
        """
        import random

        logger.info(f"Generating {days} days of synthetic test data...")

        test_data = []
        now = django_timezone.now()

        for i in range(days * 24):  # Generate hourly data
            timestamp = now - timedelta(hours=i)

            # Create realistic price fluctuations (±5%)
            variation = random.uniform(0.95, 1.05)
            price = base_price * variation

            # Generate volume and market cap with some randomness
            volume = random.uniform(20000000000, 40000000000)
            market_cap = random.uniform(800000000000, 900000000000)

            test_data.append({
                'timestamp': timestamp,
                'price_usd': round(price, 2),
                'volume': round(volume, 2),
                'market_cap': round(market_cap, 2),
            })

        # Reverse to have chronological order
        test_data.reverse()

        logger.info(f"Generated {len(test_data)} synthetic data points")
        return test_data

    def analyze_historical_data_quality(self, historical_data: List[Dict]) -> Dict:
        """
        Analyze the quality of historical data.

        Args:
            historical_data: List of price data dictionaries

        Returns:
            Dictionary with quality metrics (point counts, date range,
            price stats, gap/missing-value counts, and suggestions);
            or ``{'error': ...}`` when given no data.
        """
        if not historical_data:
            return {'error': 'No data to analyze'}

        # Sort by timestamp
        sorted_data = sorted(historical_data, key=lambda x: x['timestamp'])
        timestamps = [d['timestamp'] for d in sorted_data]
        prices = [d['price_usd'] for d in sorted_data]

        # Calculate metrics
        min_price = min(prices)
        max_price = max(prices)
        avg_price = sum(prices) / len(prices)

        # Check for gaps in timestamps
        time_gaps = []
        for i in range(1, len(timestamps)):
            gap = (timestamps[i] - timestamps[i-1]).total_seconds() / 3600  # hours
            if gap > 24:  # More than 1 day gap
                time_gaps.append({
                    'from': timestamps[i-1],
                    'to': timestamps[i],
                    'gap_hours': gap,
                })

        # Check for missing values
        missing_prices = sum(1 for d in sorted_data if d['price_usd'] is None)
        missing_volumes = sum(1 for d in sorted_data if d.get('volume') is None)
        missing_market_caps = sum(1 for d in sorted_data if d.get('market_cap') is None)

        return {
            'total_points': len(historical_data),
            'date_range': {
                'start': timestamps[0],
                'end': timestamps[-1],
                'days': (timestamps[-1] - timestamps[0]).days,
            },
            'price_stats': {
                'min': min_price,
                'max': max_price,
                'average': avg_price,
                # Guard against min_price == 0 (would raise ZeroDivisionError);
                # None signals "range percent undefined" in that case.
                'range_percent': ((max_price - min_price) / min_price * 100) if min_price else None,
            },
            'data_quality': {
                'missing_prices': missing_prices,
                'missing_volumes': missing_volumes,
                'missing_market_caps': missing_market_caps,
                'time_gaps': len(time_gaps),
                'time_gaps_details': time_gaps[:5],  # First 5 gaps
            },
            'suggestions': self._generate_data_quality_suggestions({
                'missing_prices': missing_prices,
                'time_gaps': len(time_gaps),
                'total_points': len(historical_data),
            })
        }

    def _generate_data_quality_suggestions(self, metrics: Dict) -> List[str]:
        """Generate human-readable suggestions from data quality metrics."""
        suggestions = []

        if metrics['missing_prices'] > 0:
            suggestions.append(f"Found {metrics['missing_prices']} missing prices. Consider filling gaps.")

        if metrics['time_gaps'] > 0:
            suggestions.append(f"Found {metrics['time_gaps']} time gaps. Data may not be continuous.")

        if metrics['total_points'] < 30:
            suggestions.append("Less than 30 data points. Consider fetching more data.")

        if not suggestions:
            suggestions.append("Data quality looks good!")

        return suggestions