# Listing metadata: Python source, 302 lines, 11 KiB.
"""Historical Bitcoin price retrieval, persistence and data-quality helpers."""

# Standard library
import logging
import time
from datetime import datetime, timezone, timedelta
from decimal import Decimal
from typing import List, Dict, Optional

# Third-party
import requests
from django.db import transaction
from django.utils import timezone as django_timezone

# Project
from monitor.models import BitcoinPrice

# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)


class HistoricalDataFetcher:
    """Fetches historical Bitcoin price data.

    Besides downloading daily prices from the CoinGecko ``market_chart``
    endpoint, the class can persist the points as ``BitcoinPrice`` rows,
    generate synthetic hourly data for development, and summarise the
    quality of a list of price points.

    Every price point handled here is a dict with the keys ``timestamp``,
    ``price_usd``, ``volume`` and ``market_cap``.
    """

    def __init__(self):
        """Create a reusable HTTP session aimed at the CoinGecko v3 API."""
        self.base_url = "https://api.coingecko.com/api/v3"
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'BitcoinMonitor/1.0',
            'Accept': 'application/json',
        })

    def fetch_historical_data(self, days: int = 365) -> List[Dict]:
        """
        Fetch historical Bitcoin data for specified number of days.

        Args:
            days: Number of days of historical data to fetch

        Returns:
            List of price data dictionaries (oldest first, as delivered by
            the API). An empty list is returned when the request or the
            parsing of its payload fails; the failure is logged, not raised.
        """
        logger.info(f"Fetching {days} days of historical Bitcoin data...")

        url = f"{self.base_url}/coins/bitcoin/market_chart"
        params = {
            'vs_currency': 'usd',
            'days': days,
            'interval': 'daily',
        }

        try:
            response = self.session.get(url, params=params, timeout=30)
            response.raise_for_status()
            historical_data = self._parse_market_chart(response.json())
        except requests.exceptions.RequestException as e:
            logger.error(f"Request error fetching historical data: {e}")
            return []
        except Exception as e:
            # Best-effort contract: callers get [] instead of an exception,
            # but keep the traceback in the log for diagnosis.
            logger.exception(f"Error fetching historical data: {e}")
            return []

        logger.info(f"Fetched {len(historical_data)} historical price points")
        return historical_data

    @staticmethod
    def _parse_market_chart(data: Dict) -> List[Dict]:
        """Turn a ``market_chart`` payload into a list of price-point dicts.

        ``prices`` entries are ``[timestamp_ms, price]`` pairs. The
        ``total_volumes`` and ``market_caps`` series are paired with the
        prices by position (not by timestamp); positions without a matching
        entry keep ``None``. Missing or ``null`` series are treated as empty.
        """
        points = [
            {
                'timestamp': datetime.fromtimestamp(entry[0] / 1000, timezone.utc),
                'price_usd': entry[1],
                'volume': None,
                'market_cap': None,
            }
            for entry in (data.get('prices') or [])
        ]

        for field, series_key in (('volume', 'total_volumes'),
                                  ('market_cap', 'market_caps')):
            # zip() stops at the shorter sequence, so surplus entries in
            # either list are ignored.
            for point, entry in zip(points, data.get(series_key) or []):
                point[field] = entry[1]

        return points

    @staticmethod
    def _as_utc(moment: datetime) -> datetime:
        """Return *moment* unchanged if timezone-aware, else tagged as UTC."""
        if moment.tzinfo is None or moment.tzinfo.utcoffset(moment) is None:
            return moment.replace(tzinfo=timezone.utc)
        return moment

    def fetch_historical_data_range(self, start_date: datetime, end_date: datetime) -> List[Dict]:
        """
        Fetch historical data for a specific date range.

        The data is obtained through ``fetch_historical_data`` (which only
        accepts a "last N days" window) and then filtered, so the window is
        sized to reach from now back to ``start_date``.

        Args:
            start_date: Inclusive lower bound. Naive values are interpreted
                as UTC because fetched timestamps are timezone-aware.
            end_date: Inclusive upper bound, same naive/aware handling.

        Returns:
            Price points whose timestamp lies within the range; empty when
            ``start_date`` is after ``end_date`` or the fetch fails.
        """
        start_date = self._as_utc(start_date)
        end_date = self._as_utc(end_date)
        if start_date > end_date:
            return []

        # The API window is anchored at "now", so it must span back to
        # start_date, not merely (end_date - start_date) days. The extra day
        # covers the partial day truncated by ``.days``.
        now = datetime.now(timezone.utc)
        days_back = max((now - start_date).days + 1, 1)

        all_data = self.fetch_historical_data(days=days_back)

        return [
            point for point in all_data
            if start_date <= point['timestamp'] <= end_date
        ]

    @staticmethod
    def _to_decimal(value) -> Optional[Decimal]:
        """Convert *value* to ``Decimal`` via ``str``; ``None`` stays ``None``.

        A numeric zero is a real value and is converted, not dropped.
        """
        if value is None:
            return None
        return Decimal(str(value))

    def _create_if_absent(self, data_point: Dict) -> bool:
        """Insert one price point unless a row with its timestamp exists.

        Returns:
            True when a row was created, False when it was skipped.
        """
        moment = data_point['timestamp']
        if BitcoinPrice.objects.filter(timestamp=moment).exists():
            return False

        BitcoinPrice.objects.create(
            timestamp=moment,
            price_usd=Decimal(str(data_point['price_usd'])),
            volume=self._to_decimal(data_point.get('volume')),
            market_cap=self._to_decimal(data_point.get('market_cap')),
        )
        return True

    def save_historical_data(self, historical_data: List[Dict], clear_existing: bool = False) -> Dict:
        """
        Save historical data to database.

        All work happens in one transaction. Each row is additionally
        wrapped in its own nested ``atomic`` block (a savepoint) so that a
        failing row is rolled back on its own and the remaining rows can
        still be written.

        Args:
            historical_data: List of price data dictionaries
            clear_existing: Whether to clear existing data before saving

        Returns:
            Dictionary with the counters ``saved``, ``skipped``, ``errors``
            and ``total``. If the surrounding transaction itself fails,
            every point is reported as an error.
        """
        if not historical_data:
            logger.warning("No historical data to save")
            return {'saved': 0, 'skipped': 0, 'errors': 0, 'total': 0}

        total = len(historical_data)
        saved_count = 0
        skipped_count = 0
        error_count = 0

        try:
            with transaction.atomic():
                if clear_existing:
                    deleted_count, _ = BitcoinPrice.objects.all().delete()
                    logger.info(f"Cleared {deleted_count} existing price records")

                for data_point in historical_data:
                    try:
                        # Savepoint: a database error here must not poison
                        # the outer transaction for the following rows.
                        with transaction.atomic():
                            created = self._create_if_absent(data_point)
                    except Exception as e:
                        error_count += 1
                        logger.error(f"Error saving data point {data_point.get('timestamp')}: {e}")
                        continue

                    if not created:
                        skipped_count += 1
                        continue

                    saved_count += 1

                    # Log progress every 50 records
                    if saved_count % 50 == 0:
                        logger.info(f"Saved {saved_count} historical records...")

            logger.info(f"Historical data saved: {saved_count} new, {skipped_count} existing, {error_count} errors")

            return {
                'saved': saved_count,
                'skipped': skipped_count,
                'errors': error_count,
                'total': total,
            }

        except Exception as e:
            logger.error(f"Transaction error saving historical data: {e}")
            return {'saved': 0, 'skipped': 0, 'errors': total, 'total': total}

    def generate_test_data(self, days: int = 30, base_price: float = 45000) -> List[Dict]:
        """
        Generate synthetic test data for development.

        One point per hour is produced, ending at the current time. Each
        price is ``base_price`` scaled by an independent uniform factor in
        [0.95, 1.05]; volume and market cap are independent uniform draws.

        Args:
            days: Number of days of test data
            base_price: Base price for the data

        Returns:
            List of synthetic price data in chronological order
        """
        # Imported locally: the module does not import ``random`` at the top.
        import random

        logger.info(f"Generating {days} days of synthetic test data...")

        test_data = []
        now = django_timezone.now()

        for hours_ago in range(days * 24):  # Generate hourly data
            price = base_price * random.uniform(0.95, 1.05)
            volume = random.uniform(20000000000, 40000000000)
            market_cap = random.uniform(800000000000, 900000000000)

            test_data.append({
                'timestamp': now - timedelta(hours=hours_ago),
                'price_usd': round(price, 2),
                'volume': round(volume, 2),
                'market_cap': round(market_cap, 2),
            })

        # Points were generated newest-first; flip to chronological order.
        test_data.reverse()

        logger.info(f"Generated {len(test_data)} synthetic data points")
        return test_data

    def analyze_historical_data_quality(self, historical_data: List[Dict]) -> Dict:
        """
        Analyze the quality of historical data.

        Args:
            historical_data: List of price data dictionaries

        Returns:
            ``{'error': ...}`` for empty input, otherwise a dictionary with
            ``total_points``, ``date_range``, ``price_stats``,
            ``data_quality`` and ``suggestions``. Price statistics are
            computed over the non-missing prices only; they are ``None``
            when no price is present, and ``range_percent`` is ``None``
            when the minimum price is zero.
        """
        if not historical_data:
            return {'error': 'No data to analyze'}

        sorted_data = sorted(historical_data, key=lambda x: x['timestamp'])
        timestamps = [d['timestamp'] for d in sorted_data]

        # Missing prices are counted below; exclude them here so that
        # min/max/sum do not fail on None.
        prices = [d['price_usd'] for d in sorted_data if d.get('price_usd') is not None]

        min_price = max_price = avg_price = range_percent = None
        if prices:
            min_price = min(prices)
            max_price = max(prices)
            avg_price = sum(prices) / len(prices)
            if min_price:
                range_percent = (max_price - min_price) / min_price * 100

        # Consecutive points more than 24 hours apart count as a gap.
        time_gaps = []
        for earlier, later in zip(timestamps, timestamps[1:]):
            gap = (later - earlier).total_seconds() / 3600  # hours
            if gap > 24:
                time_gaps.append({
                    'from': earlier,
                    'to': later,
                    'gap_hours': gap,
                })

        missing_prices = len(sorted_data) - len(prices)
        missing_volumes = sum(1 for d in sorted_data if d.get('volume') is None)
        missing_market_caps = sum(1 for d in sorted_data if d.get('market_cap') is None)

        return {
            'total_points': len(historical_data),
            'date_range': {
                'start': timestamps[0],
                'end': timestamps[-1],
                'days': (timestamps[-1] - timestamps[0]).days,
            },
            'price_stats': {
                'min': min_price,
                'max': max_price,
                'average': avg_price,
                'range_percent': range_percent,
            },
            'data_quality': {
                'missing_prices': missing_prices,
                'missing_volumes': missing_volumes,
                'missing_market_caps': missing_market_caps,
                'time_gaps': len(time_gaps),
                'time_gaps_details': time_gaps[:5],  # First 5 gaps
            },
            'suggestions': self._generate_data_quality_suggestions({
                'missing_prices': missing_prices,
                'time_gaps': len(time_gaps),
                'total_points': len(historical_data),
            }),
        }

    def _generate_data_quality_suggestions(self, metrics: Dict) -> List[str]:
        """Generate suggestions based on data quality metrics.

        Args:
            metrics: Dict with the integer entries ``missing_prices``,
                ``time_gaps`` and ``total_points``.

        Returns:
            One message per detected issue, or a single all-clear message.
        """
        suggestions = []

        if metrics['missing_prices'] > 0:
            suggestions.append(f"Found {metrics['missing_prices']} missing prices. Consider filling gaps.")

        if metrics['time_gaps'] > 0:
            suggestions.append(f"Found {metrics['time_gaps']} time gaps. Data may not be continuous.")

        if metrics['total_points'] < 30:
            suggestions.append("Less than 30 data points. Consider fetching more data.")

        if not suggestions:
            suggestions.append("Data quality looks good!")

        return suggestions