# real_world/fits_ml.py

import hyperopt.hp as hp
import pandas as pd
from sklearn.ensemble import GradientBoostingRegressor, RandomForestRegressor
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
from sklearn.svm import SVR
from astropy.io import fits

from astrodata.data import BaseLoader, DataPipeline, AbstractProcessor, RawData
from astrodata.preml import PremlPipeline, TrainTestSplitter
from astrodata.ml.metrics import SklearnMetric
from astrodata.ml.model_selection import HyperOptSelector
from astrodata.ml.models import SklearnModel
from astrodata.tracking.MLFlowTracker import SklearnMLflowTracker
from testdata import download_fits

# Seed shared by the randomised steps below so the example is reproducible.
SEED = 42
# Configuration file handed to both the data pipeline and the preml pipeline.
config_path = "./config.yaml"
# Location of the example FITS file, as returned by the test-data helper.
file_path = download_fits()


# First, let's define a custom loader for FITS files
# Astrodata provides loaders for common formats like CSV and Parquet,
# but FITS files require a custom implementation.
class FitsLoader(BaseLoader):
    """Loader that turns the table stored in HDU 1 of a FITS file into RawData."""

    def load(self, path: str):
        """Read ``path`` and wrap its table in a ``RawData`` object.

        Args:
            path: Location of the FITS file to open.

        Returns:
            A ``RawData`` whose ``data`` is a pandas DataFrame with one column
            per field of the HDU at index 1, ``source`` set to ``path`` and
            ``format`` set to ``"fits"``.
        """
        # Build the DataFrame while the file is still open: the records are
        # converted to plain Python rows before the handle is released.
        with fits.open(path) as hdu_list:
            table = hdu_list[1].data
            rows = table.tolist()
            frame = pd.DataFrame(rows, columns=table.names)
        return RawData(source=path, format="fits", data=frame)


# Then, we define a custom processor to handle FITS data
# This processor will filter out invalid data and prepare the dataset for modeling.
class FitsProcessor(AbstractProcessor):
    """Processor that drops rows with negative values and subsamples the rest."""

    def process(self, raw):
        """Filter and subsample ``raw.data`` in place, then return ``raw``.

        Args:
            raw: Object whose ``data`` attribute is a pandas DataFrame.

        Returns:
            The same ``raw`` object, with ``data`` replaced by a random 20%
            sample (seeded with ``SEED``) of the rows that passed the filter,
            re-indexed from zero.
        """
        n_before = len(raw.data)

        # Identifier, sky-position and error columns are exempt from the
        # sign check; every other column is checked.
        skipped = ("specObjID", "objid", "ra", "dec", "targetObjID", "zErr")
        checked = [name for name in raw.data.columns if name not in skipped]

        # Keep only the rows in which every checked column is >= 0.
        non_negative = (raw.data[checked] >= 0).all(axis=1)
        raw.data = raw.data[non_negative]

        n_after = len(raw.data)
        print("from ", n_before, "intial objects we have now", n_after)
        print("object discarded:", n_before - n_after)

        # Work on a reproducible 20% subsample of the surviving rows.
        subsample = raw.data.sample(frac=0.2, random_state=SEED)
        raw.data = subsample.reset_index(drop=True)
        return raw


# Instantiate the custom FITS loader and processor defined in this file.
loader = FitsLoader()
processor = FitsProcessor()

# Assemble the data pipeline from the loader, the processor chain and the
# shared configuration file.
data_pipeline = DataPipeline(
    loader=loader,
    processors=[processor],
    config_path=config_path,
)

# Load and process the FITS file; the result is handed to the preprocessing
# stage below. dump_output=False is passed so the run is not asked to dump
# its output.
data = data_pipeline.run(file_path, dump_output=False)

# Splitter that separates the "z" target column and divides the rows into
# train and test partitions.
# NOTE(review): train_size=0.2 looks like it trains on 20% of the rows and
# tests on the remaining 80% — confirm this is intended and not test_size.
tts = TrainTestSplitter(targets=["z"], train_size=0.2, random_state=SEED)

# The preml pipeline here consists of the splitter alone.
preml_pipeline = PremlPipeline(processors=[tts], config_path=config_path)

# Apply the split to the processed data and unpack it into the usual
# supervised-learning quadruple.
preml_data = preml_pipeline.run(data, dump_output=False)
X_train, X_test, y_train, y_test = preml_data.dump_supervised_ML_format()

print(f"Training set: {X_train.shape[0]} samples")
print(f"Test set: {X_test.shape[0]} samples")


# MLflow tracker; the run and the experiment share the same name.
tracker = SklearnMLflowTracker(
    experiment_name="catania_cavuoti_hyperopt",
    run_name="catania_cavuoti_hyperopt",
    extra_tags=None,
)

# Evaluation metrics: the two error metrics are flagged as lower-is-better,
# R^2 as higher-is-better.
mse = SklearnMetric(mean_squared_error, greater_is_better=False)
mae = SklearnMetric(mean_absolute_error, greater_is_better=False)
r2score = SklearnMetric(r2_score, greater_is_better=True)
metrics = [mse, r2score, mae]

# Candidate regressors, each wrapped in a SklearnModel.
rfr, gbr, svr = (
    SklearnModel(model_class=estimator)
    for estimator in (RandomForestRegressor, GradientBoostingRegressor, SVR)
)
models = [rfr, gbr, svr]

# The search space only chooses which of the candidate models to use.
param_space = {"model": hp.choice("model", models)}

# Hyperopt-driven model selection, scored by R^2 on a held-out split
# (use_cv=False) for at most 10 evaluations.
# - random_state reuses the module-level SEED instead of repeating the
#   literal, so a single constant controls reproducibility.
# - metrics receives the list defined earlier in this file, which was
#   previously built but never used.
#   NOTE(review): assumes HyperOptSelector evaluates every entry of `metrics`
#   in addition to the scorer — confirm against the astrodata documentation.
hos = HyperOptSelector(
    param_space=param_space,
    scorer=r2score,
    use_cv=False,
    random_state=SEED,
    max_evals=10,
    metrics=metrics,
    tracker=tracker,
)

# NOTE(review): the test split doubles as the validation set here, so the
# reported scores are not an independent estimate of generalisation.
hos.fit(X=X_train, y=y_train, X_val=X_test, y_val=y_test)

print("Best parameters found: ", hos.get_best_params())
print("Best metrics: ", hos.get_best_metrics())


# Register the best tracked model, ranked by R^2 on the "val" split, under
# the "Production" stage.
tracker.register_best_model(
    metric=r2score,
    split_name="val",
    stage="Production",
)