Example: Ray backend¶
This example shows how to use the ray backend to train models in parallel.
The data used is a synthetic dataset created using sklearn's make_classification function.
Load the data¶
In [1]:
# Import packages
import ray
import pandas as pd
from atom import ATOMClassifier
from sklearn.datasets import make_classification
In [2]:
# Use a small dataset for illustration purposes
X, y = make_classification(n_samples=10000, n_features=10, random_state=1)
Run the pipeline¶
In [3]:
# Note we already specify the number of cores for parallel execution here
atom = ATOMClassifier(X, y, n_jobs=2, backend="ray", verbose=2, random_state=1)
2023-03-04 17:56:42,979 INFO worker.py:1544 -- Started a local Ray instance. View the dashboard at 127.0.0.1:8265
<< ================== ATOM ================== >>
Algorithm task: binary classification.
Parallel processing with 2 cores.
Parallelization backend: ray

Dataset stats ==================== >>
Shape: (10000, 11)
Train set size: 8001
Test set size: 2000
-------------------------------------
Memory: 880.13 kB
Scaled: True
Outlier values: 212 (0.2%)
In [4]:
# The ray backend uses modin instead of pandas as data handler
type(atom.dataset)
Out[4]:
modin.pandas.dataframe.DataFrame
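Since modin mirrors the pandas API, the dataset can be inspected with the usual pandas methods. A minimal sketch; the to_pandas conversion helper is an assumption about modin's public utilities:

# modin mirrors the pandas API, so familiar calls work unchanged
atom.dataset.head()

# If a plain pandas object is needed (e.g., for a library that doesn't
# accept modin), this helper should convert it (assumed modin API)
from modin.utils import to_pandas
df = to_pandas(atom.dataset)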
In [5]:
# Use data cleaning as usual
atom.scale()
Fitting Scaler...
Scaling features...
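As a quick sanity check, the scaled features should now have roughly zero mean and unit variance; a minimal sketch using the pandas API on the modin DataFrame:

# Scaled features should show ~0 mean and ~1 standard deviation
atom.X.describe().loc[["mean", "std"]]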
In [6]:
# Using `parallel=True`, we train one model in each node
# Note that when training in parallel, the verbosity of the models is zero
atom.run(models=["PA", "SGD"], est_params={"max_iter": 150}, parallel=True)
Training ========================= >>
Models: PA, SGD
Metric: f1

Final results ==================== >>
Total time: 58.891s
-------------------------------------
PassiveAggressive         --> f1: 0.8165
StochasticGradientDescent --> f1: 0.8772 !
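Beyond the printed summary, the scores can also be inspected programmatically through the results attribute:

# Per-model overview of fit times and test scores
atom.results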
Analyze the results¶
In [7]:
# Notice how the summed time to train the models is less than the total time
atom.plot_results(metric="time_fit")
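The same comparison can be made numerically; assuming results exposes a time_fit column matching the metric plotted above:

# Fit time per model in seconds; the ray overhead (serialization,
# worker startup) explains the gap with the total training time
atom.results["time_fit"]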
In [8]:
# Create a REST API endpoint and do inference on the test set
atom.pa.serve(port=8002)
In [9]:
import requests
X_predict = atom.X_test.iloc[:10, :]
response = requests.get("http://127.0.0.1:8002/", json=X_predict.to_json())
In [10]:
response.json()[:10]
Out[10]:
[1, 1, 0, 0, 1, 1, 0, 1, 0, 0]
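To verify the endpoint, the same predictions can be obtained in-process, without the HTTP round trip:

# Direct inference with the fitted model; should match the REST response
atom.pa.predict(X_predict)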
In [11]:
# Don't forget to shut down the ray server
ray.shutdown()