# Python standard libraries
import os
import time
from datetime import datetime
import logging
# Third-party libraries
import requests
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_squared_error
import schedule
# Configure logger
logging.basicConfig(filename='regression_model.log', level=logging.INFO)
# Program constants
MEMPOOL_URL = "https://blockstream.info/api/mempool/recommended-fees"
MEMPOOL_STATS_URL = "https://blockstream.info/api/mempool"
DATA_FILE = "mempool_data.csv"
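# The code assumes the recommended-fees endpoint returns JSON containing at least
# 'fastestFee' and 'halfHourFee' (in sat/vB), and that the mempool endpoint returns
# at least 'count' (number of unconfirmed transactions) and 'vsize'; only these
# fields are read below.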
# List to store data in memory
data = []
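# 'data' is a small buffer of rows collected since the last successful CSV write.
# collect_data() clears it after each write so rows are never appended twice, and
# start_bot() flushes anything still pending when the program shuts down.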
# Helper function to handle request errors
def handle_request_error(e, message):
logging.error(message, exc_info=True)
print(message)
return None
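# The None return value propagates through fetch_fees() and fetch_mempool_stats(),
# so collect_data() and make_prediction() simply skip the current cycle when a
# request fails.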
def initialize_csv():
"""
This function checks if the CSV data file exists.
If it does not, it creates it and writes the headers.
"""
if not os.path.exists(DATA_FILE):
        headers = ['timestamp', 'fastest_fee', 'half_hour_fee', 'unconfirmed_transactions', 'vsize']
        df = pd.DataFrame(columns=headers)
        df.to_csv(DATA_FILE, index=False)


def fetch_fees():
"""
This function requests the current mempool fees from Blockstream.
If the request fails, it logs the error and returns None.
"""
try:
        response = requests.get(MEMPOOL_URL, timeout=10)  # timeout so a hung request cannot stall the scheduler
response.raise_for_status()
return response.json()
    except requests.exceptions.RequestException as e:
        return handle_request_error(e, 'Error fetching recommended fees')


def fetch_mempool_stats():
"""
This function requests the current mempool stats from Blockstream.
If the request fails, it logs the error and returns None.
"""
try:
        response = requests.get(MEMPOOL_STATS_URL, timeout=10)  # same timeout guard as fetch_fees
response.raise_for_status()
return response.json()
    except requests.exceptions.RequestException as e:
        return handle_request_error(e, 'Error fetching mempool stats')


def collect_data():
"""
This function collects data from the Blockstream API and appends it to the data list.
    It then appends the buffered rows to the CSV file and clears the buffer.
"""
fees = fetch_fees()
mempool_stats = fetch_mempool_stats()
    if fees is None or mempool_stats is None:
        return
    timestamp = datetime.now()
    data.append({
        'timestamp': timestamp,
        'fastest_fee': fees['fastestFee'],
        'half_hour_fee': fees['halfHourFee'],
        'unconfirmed_transactions': mempool_stats['count'],
        'vsize': mempool_stats['vsize']
    })
    df = pd.DataFrame(data)
    df.to_csv(DATA_FILE, mode='a', header=False, index=False)
    data.clear()  # rows are persisted; clearing prevents appending them again on the next run
    print(f"Data collected and saved at {timestamp}")


def generate_training_data():
"""
This function reads the CSV file, checks if there is enough data for training,
and then separates the features from the target.
"""
df = pd.read_csv(DATA_FILE)
    if len(df) < 2:
        print("Not enough data collected for training")
        return None, None
    X = df[['unconfirmed_transactions', 'vsize']]
    y = df['fastest_fee']
    return X, y


def train_model():
"""
This function trains a Linear Regression model on the data collected.
It splits the data into a training and test set, then fits the model, and finally calculates the MSE.
"""
X, y = generate_training_data()
    if X is None or y is None:
        return None
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
    model = LinearRegression()
    model.fit(X_train, y_train)
    y_pred = model.predict(X_test)
    mse = mean_squared_error(y_test, y_pred)
    print(f"Model trained. MSE: {mse:.2f}")
    return model


def make_prediction(model):
"""
This function uses the trained model to make a prediction on the current mempool stats.
"""
fees, mempool_stats = fetch_fees(), fetch_mempool_stats()
    if model is None or fees is None or mempool_stats is None:
        print("Error: Missing data or model")
        return
    # Build the features as a one-row DataFrame with the training column names so
    # scikit-learn does not warn about missing feature names.
    X_new = pd.DataFrame([[mempool_stats['count'], mempool_stats['vsize']]],
                         columns=['unconfirmed_transactions', 'vsize'])
    predicted_fee = model.predict(X_new)[0]
    print(f"Predicted Fastest Fee (sat/vB): {predicted_fee:.2f}")
    print(f"Current Fastest Fee (sat/vB): {fees['fastestFee']}")


def schedule_tasks():
"""
This function schedules tasks to collect data and train the model.
    Data is collected every half hour; once a day the model is retrained and a prediction is printed.
"""
schedule.every(30).minutes.do(collect_data)
schedule.every().day.at('00:00').do(lambda: make_prediction(train_model()))
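# Note: the schedule library only fires these jobs while schedule.run_pending() is
# called in a loop; start_bot() below does this once per second.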
def start_bot():
"""
    This function initializes the CSV file, schedules the tasks, and runs the scheduler loop until interrupted.
"""
try:
initialize_csv()
schedule_tasks()
        while True:
            schedule.run_pending()
            time.sleep(1)
    except KeyboardInterrupt:
        print("Program interrupted")
    finally:
        # Flush any rows that were collected but not yet written to the CSV.
        if data:
            df = pd.DataFrame(data)
            df.to_csv(DATA_FILE, mode='a', header=False, index=False)
        logging.shutdown()


if __name__ == "__main__":
    start_bot()
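# Example (not part of the scheduled flow): the pieces can also be run once by hand,
# e.g. from an interactive session. Model training needs at least two rows in the CSV.
#
#   initialize_csv()
#   collect_data()            # fetch one observation and append it to mempool_data.csv
#   model = train_model()     # fit the linear regression on everything collected so far
#   make_prediction(model)    # compare the predicted and current fastest fee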