pull down to refresh

One suggestion that might work for you:

Open Aqua wallet, receive via Liquid and tap the "direct peg in" option. It will serve you up a regular bitcoin address and as soon as the on chain funds confirm it will convert to liquid in your wallet. If you're not a fan of the federated model of liquid, simply send out of Aqua to any lightning wallet you please, and it will create a boltz swap to lightning automatically.

great suggestion

FYI @nichro

reply

Python standard librariesPython standard libraries

import requests
import os
import time
from datetime import datetime
import logging

Third-party librariesThird-party libraries

import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_squared_error
import schedule

Config loggerConfig logger

logging.basicConfig(filename='regression_model.log', level=logging.INFO)

Program constantsProgram constants

MEMPOOL_URL = "https://blockstream.info/api/mempool/recommended-fees"
MEMPOOL_STATS_URL = "https://blockstream.info/api/mempool"
DATA_FILE = "mempool_data.csv"

List to store data in memoryList to store data in memory

data = []

Helper function to handle request errorsHelper function to handle request errors

def handle_request_error(e, message):
logging.error(message, exc_info=True)
print(message)
return None

def initialize_csv():
"""
This function checks if the CSV data file exists.
If it does not, it creates it and writes the headers.
"""
if not os.path.exists(DATA_FILE):

    headers = ['timestamp', 'fastest_fee', 'half_hour_fee', 'unconfirmed_transactions', 'vsize']
    df = pd.DataFrame(columns=headers)
    df.to_csv(DATA_FILE, index=False)

def fetch_fees():
"""
This function requests the current mempool fees from Blockstream.
If the request fails, it logs the error and returns None.
"""
try:
response = requests.get(MEMPOOL_URL)
response.raise_for_status()
return response.json()

except requests.exceptions.RequestException as e:
    return handle_request_error(e, 'Error fetching recommended fees')

def fetch_mempool_stats():
"""
This function requests the current mempool stats from Blockstream.
If the request fails, it logs the error and returns None.
"""
try:
response = requests.get(MEMPOOL_STATS_URL)
response.raise_for_status()
return response.json()

except requests.exceptions.RequestException as e:
    return handle_request_error(e, 'Error fetching mempool stats')

def collect_data():
"""
This function collects data from the Blockstream API and appends it to the data list.
It then writes the data to the CSV file.
"""
fees = fetch_fees()
mempool_stats = fetch_mempool_stats()

if fees is None or mempool_stats is None: return

timestamp = datetime.now()
data.append({
    'timestamp': timestamp,
    'fastest_fee': fees['fastestFee'],
    'half_hour_fee': fees['halfHourFee'],
    'unconfirmed_transactions': mempool_stats['count'],
    'vsize': mempool_stats['vsize']
})
df = pd.DataFrame(data)
df.to_csv(DATA_FILE, mode='a', header=False, index=False)
print(f"Data collected and saved at {timestamp}")

def generate_training_data():
"""
This function reads the CSV file, checks if there is enough data for training,
and then separates the features from the target.
"""
df = pd.read_csv(DATA_FILE)

if len(df) < 2:
    print("Not enough data collected for training")
    return None, None

X = df[['unconfirmed_transactions', 'vsize']]
y = df['fastest_fee']

return X, y

def train_model():
"""
This function trains a Linear Regression model on the data collected.
It splits the data into a training and test set, then fits the model, and finally calculates the MSE.
"""
X, y = generate_training_data()

if X is None or y is None: return None

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
model = LinearRegression()
model.fit(X_train, y_train)
y_pred = model.predict(X_test)
mse = mean_squared_error(y_test, y_pred)
print(f"Model trained. MSE: {mse:.2f}")
return model

def make_prediction(model):
"""
This function uses the trained model to make a prediction on the current mempool stats.
"""
fees, mempool_stats = fetch_fees(), fetch_mempool_stats()

if model is None or fees is None or mempool_stats is None: 
    print("Error: Missing data or model")
    return

X_new = [[mempool_stats['count'], mempool_stats['vsize']]]
predicted_fee = model.predict(X_new)[0]
print(f"Predicted Fastest Fee (sat/vB): {predicted_fee:.2f}")
print(f"Current Fastest Fee (sat/vB): {fees['fastestFee']}")

def schedule_tasks():
"""
This function schedules tasks to collect data and train the model.
Data is collected every half hour and the model is trained once a day.
"""
schedule.every(30).minutes.do(collect_data)
schedule.every().day.at('00:00').do(lambda: make_prediction(train_model()))

def start_bot():
"""
This function initializes the bot.
"""
try:
initialize_csv()
schedule_tasks()

    while True:
        schedule.run_pending()
        time.sleep(1)

except KeyboardInterrupt:       
    
    print("Program interrupted")
     
finally:
    if data:
        df = pd.DataFrame(data)
        df.to_csv(DATA_FILE, mode='a', header=False, index=False)
    logging.shutdown()

if name == "main":
start_bot()