Files
bitcoin_monitor/monitor/services/historical_data.py
2026-01-16 22:20:18 +03:00

302 lines
11 KiB
Python

import logging
import time
from datetime import datetime, timezone, timedelta
from typing import List, Dict, Optional
import requests
from django.db import transaction
from django.utils import timezone as django_timezone
from decimal import Decimal
from monitor.models import BitcoinPrice
logger = logging.getLogger(__name__)
class HistoricalDataFetcher:
    """Fetches historical Bitcoin price data from the CoinGecko public API.

    Also persists fetched points via the ``BitcoinPrice`` Django model,
    generates synthetic data for development, and reports on data quality.
    """

    def __init__(self):
        # Base endpoint of CoinGecko's v3 REST API.
        self.base_url = "https://api.coingecko.com/api/v3"
        # One shared session so connection pooling and headers are reused.
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'BitcoinMonitor/1.0',
            'Accept': 'application/json',
        })

    def fetch_historical_data(self, days: int = 365) -> List[Dict]:
        """
        Fetch historical Bitcoin data for specified number of days.

        Args:
            days: Number of days of historical data to fetch

        Returns:
            List of dicts with keys ``timestamp`` (aware UTC datetime),
            ``price_usd``, ``volume`` and ``market_cap``. Returns an empty
            list on any request or parsing error (errors are logged).
        """
        try:
            logger.info(f"Fetching {days} days of historical Bitcoin data...")
            url = f"{self.base_url}/coins/bitcoin/market_chart"
            params = {
                'vs_currency': 'usd',
                'days': days,
                'interval': 'daily',
            }
            response = self.session.get(url, params=params, timeout=30)
            response.raise_for_status()
            data = response.json()

            # CoinGecko returns parallel arrays of [timestamp_ms, value]
            # under 'prices', 'total_volumes' and 'market_caps'.
            historical_data = []
            for price_point in data.get('prices', []):
                timestamp = datetime.fromtimestamp(price_point[0] / 1000, timezone.utc)
                historical_data.append({
                    'timestamp': timestamp,
                    'price_usd': price_point[1],
                    'volume': None,
                    'market_cap': None,
                })

            # Merge volume data positionally; the API keeps the arrays
            # aligned on the same daily timestamps.
            for i, (_timestamp_ms, volume) in enumerate(data.get('total_volumes', [])):
                if i < len(historical_data):
                    historical_data[i]['volume'] = volume

            # Merge market-cap data the same way.
            for i, (_timestamp_ms, market_cap) in enumerate(data.get('market_caps', [])):
                if i < len(historical_data):
                    historical_data[i]['market_cap'] = market_cap

            logger.info(f"Fetched {len(historical_data)} historical price points")
            return historical_data
        except requests.exceptions.RequestException as e:
            logger.error(f"Request error fetching historical data: {e}")
            return []
        except Exception as e:
            logger.error(f"Error fetching historical data: {e}")
            return []

    def fetch_historical_data_range(self, start_date: datetime, end_date: datetime) -> List[Dict]:
        """
        Fetch historical data for a specific date range.

        Note: CoinGecko API doesn't support arbitrary date ranges directly,
        so we fetch maximum days and filter.

        Args:
            start_date: Inclusive start of the range (should be tz-aware UTC
                to compare cleanly with fetched timestamps).
            end_date: Inclusive end of the range.

        Returns:
            Fetched points whose timestamp falls within [start_date, end_date].
        """
        days_difference = (end_date - start_date).days
        # Over-fetch by a margin so the requested window is fully covered.
        all_data = self.fetch_historical_data(days=days_difference + 100)
        return [
            point for point in all_data
            if start_date <= point['timestamp'] <= end_date
        ]

    def save_historical_data(self, historical_data: List[Dict], clear_existing: bool = False) -> Dict:
        """
        Save historical data to database.

        Args:
            historical_data: List of price data dictionaries
            clear_existing: Whether to clear existing data before saving

        Returns:
            Dictionary with statistics about the operation:
            ``saved``, ``skipped``, ``errors`` and (when data was given)
            ``total`` counts.
        """
        if not historical_data:
            logger.warning("No historical data to save")
            return {'saved': 0, 'skipped': 0, 'errors': 0}
        try:
            with transaction.atomic():
                if clear_existing:
                    deleted_count, _ = BitcoinPrice.objects.all().delete()
                    logger.info(f"Cleared {deleted_count} existing price records")

                saved_count = 0
                skipped_count = 0
                error_count = 0
                for data_point in historical_data:
                    try:
                        # Skip points already stored for this exact timestamp.
                        exists = BitcoinPrice.objects.filter(
                            timestamp=data_point['timestamp']
                        ).exists()
                        if exists:
                            skipped_count += 1
                            continue

                        # Use explicit `is not None` checks: a legitimate
                        # value of 0/0.0 is falsy and must not be dropped.
                        volume = data_point.get('volume')
                        market_cap = data_point.get('market_cap')
                        BitcoinPrice.objects.create(
                            timestamp=data_point['timestamp'],
                            price_usd=Decimal(str(data_point['price_usd'])),
                            volume=Decimal(str(volume)) if volume is not None else None,
                            market_cap=Decimal(str(market_cap)) if market_cap is not None else None,
                        )
                        saved_count += 1

                        # Log progress every 50 records
                        if saved_count % 50 == 0:
                            logger.info(f"Saved {saved_count} historical records...")
                    except Exception as e:
                        error_count += 1
                        logger.error(f"Error saving data point {data_point.get('timestamp')}: {e}")

                logger.info(f"Historical data saved: {saved_count} new, {skipped_count} existing, {error_count} errors")
                return {
                    'saved': saved_count,
                    'skipped': skipped_count,
                    'errors': error_count,
                    'total': len(historical_data),
                }
        except Exception as e:
            logger.error(f"Transaction error saving historical data: {e}")
            return {'saved': 0, 'skipped': 0, 'errors': len(historical_data), 'total': len(historical_data)}

    def generate_test_data(self, days: int = 30, base_price: float = 45000) -> List[Dict]:
        """
        Generate synthetic test data for development.

        Args:
            days: Number of days of test data
            base_price: Base price for the data

        Returns:
            List of synthetic hourly price data in chronological order.
        """
        import random

        logger.info(f"Generating {days} days of synthetic test data...")
        test_data = []
        now = django_timezone.now()
        for i in range(days * 24):  # Generate hourly data
            timestamp = now - timedelta(hours=i)
            # Create realistic price fluctuations (±5%)
            variation = random.uniform(0.95, 1.05)
            price = base_price * variation
            # Generate volume and market cap with some randomness
            volume = random.uniform(20000000000, 40000000000)
            market_cap = random.uniform(800000000000, 900000000000)
            test_data.append({
                'timestamp': timestamp,
                'price_usd': round(price, 2),
                'volume': round(volume, 2),
                'market_cap': round(market_cap, 2),
            })
        # Points were built newest-first; reverse to chronological order.
        test_data.reverse()
        logger.info(f"Generated {len(test_data)} synthetic data points")
        return test_data

    def analyze_historical_data_quality(self, historical_data: List[Dict]) -> Dict:
        """
        Analyze the quality of historical data.

        Args:
            historical_data: List of price data dictionaries

        Returns:
            Dictionary with quality metrics: point count, date range,
            price statistics, missing-value counts, timestamp gaps and
            human-readable suggestions. For empty input returns
            ``{'error': 'No data to analyze'}``.
        """
        if not historical_data:
            return {'error': 'No data to analyze'}

        # Work on a chronologically sorted copy.
        sorted_data = sorted(historical_data, key=lambda x: x['timestamp'])
        timestamps = [d['timestamp'] for d in sorted_data]
        prices = [d['price_usd'] for d in sorted_data]

        min_price = min(prices)
        max_price = max(prices)
        avg_price = sum(prices) / len(prices)

        # Flag any gap between consecutive points longer than one day.
        time_gaps = []
        for i in range(1, len(timestamps)):
            gap = (timestamps[i] - timestamps[i - 1]).total_seconds() / 3600  # hours
            if gap > 24:
                time_gaps.append({
                    'from': timestamps[i - 1],
                    'to': timestamps[i],
                    'gap_hours': gap,
                })

        missing_prices = sum(1 for d in sorted_data if d['price_usd'] is None)
        missing_volumes = sum(1 for d in sorted_data if d.get('volume') is None)
        missing_market_caps = sum(1 for d in sorted_data if d.get('market_cap') is None)

        # Guard against a zero minimum price (degenerate input) so the
        # relative range cannot raise ZeroDivisionError.
        range_percent = ((max_price - min_price) / min_price * 100) if min_price else 0.0

        return {
            'total_points': len(historical_data),
            'date_range': {
                'start': timestamps[0],
                'end': timestamps[-1],
                'days': (timestamps[-1] - timestamps[0]).days,
            },
            'price_stats': {
                'min': min_price,
                'max': max_price,
                'average': avg_price,
                'range_percent': range_percent,
            },
            'data_quality': {
                'missing_prices': missing_prices,
                'missing_volumes': missing_volumes,
                'missing_market_caps': missing_market_caps,
                'time_gaps': len(time_gaps),
                'time_gaps_details': time_gaps[:5],  # First 5 gaps
            },
            'suggestions': self._generate_data_quality_suggestions({
                'missing_prices': missing_prices,
                'time_gaps': len(time_gaps),
                'total_points': len(historical_data),
            })
        }

    def _generate_data_quality_suggestions(self, metrics: Dict) -> List[str]:
        """Generate suggestions based on data quality metrics."""
        suggestions = []
        if metrics['missing_prices'] > 0:
            suggestions.append(f"Found {metrics['missing_prices']} missing prices. Consider filling gaps.")
        if metrics['time_gaps'] > 0:
            suggestions.append(f"Found {metrics['time_gaps']} time gaps. Data may not be continuous.")
        if metrics['total_points'] < 30:
            suggestions.append("Less than 30 data points. Consider fetching more data.")
        if not suggestions:
            suggestions.append("Data quality looks good!")
        return suggestions