# -*- coding:utf-8 -*-
import time
import collections
import numpy as np
import pandas as pd
import copy
import hashlib
import os
import shutil
import pickle
from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import LabelEncoder
from tensorflow.keras.utils import to_categorical
from deeptables.preprocessing.transformer import PassThroughEstimator
from .metainfo import CategoricalColumn, ContinuousColumn
from ..preprocessing import MultiLabelEncoder, MultiKBinsDiscretizer, DataFrameWrapper, LgbmLeavesEncoder, \
CategorizeEncoder
from ..utils import dt_logging, consts
from . import deeptable
logger = dt_logging.get_logger()
from .config import ModelConfig
class AbstractPreprocessor:
    """Base class for DeepTables data preprocessors.

    Holds the :class:`ModelConfig` plus the fitted label list / task type,
    and declares the interface concrete preprocessors must implement.
    """

    # NOTE: the annotation is a string so it is resolved lazily; the class
    # can be imported even where ModelConfig is not yet in scope.
    def __init__(self, config: 'ModelConfig'):
        self.config = config
        self.labels_ = None  # label values seen during fit (set by subclasses)
        self.task_ = None    # detected task type (set by subclasses)

    @property
    def pos_label(self):
        """Positive-class label for binary tasks (2 labels), else None."""
        if self.labels_ is not None and len(self.labels_) == 2:
            return self.labels_[1]
        else:
            return None

    @property
    def labels(self):
        return self.labels_

    @property
    def task(self):
        return self.task_

    @property
    def signature(self):
        """MD5 digest of the config fields that influence preprocessing.

        Used as a cache key: two configs producing the same digest are
        assumed to preprocess data identically.
        """
        # Renamed from `repr` to avoid shadowing the builtin.
        repr_ = f'''{self.config.auto_imputation}|
{self.config.auto_encode_label}|
{self.config.auto_discrete}|
{self.config.apply_gbm_features}|
{self.config.task}|
{self.config.cat_exponent}|
{self.config.exclude_columns}|
{self.config.categorical_columns}|
{self.config.auto_categorize}|
{self.config.cat_remain_numeric}|
{self.config.auto_discard_unique}|
{self.config.gbm_params}|
{self.config.gbm_feature_type}|
{self.config.fixed_embedding_dim}|
{self.config.embeddings_output_dim}'''
        sign = hashlib.md5(repr_.encode('utf-8')).hexdigest()
        return sign

    def get_X_y_signature(self, X, y):
        """Return an MD5 digest summarizing the size/shape/dtypes of X and y.

        Only cheap structural attributes (len, shape, dtypes) are hashed,
        not the data itself, so equal-shaped different data collide.
        """
        repr_ = ''
        if X is not None:
            if isinstance(X, list):
                repr_ += f'X len({len(X)})|'
            if hasattr(X, 'shape'):
                repr_ += f'X shape{X.shape}|'
            if hasattr(X, 'dtypes'):
                repr_ += f'x.dtypes({list(X.dtypes)})|'
        if y is not None:
            if isinstance(y, list):
                repr_ += f'y len({len(y)})|'
            if hasattr(y, 'shape'):
                repr_ += f'y shape{y.shape}|'
            if hasattr(y, 'dtype'):
                repr_ += f'y.dtype({y.dtype})|'
        sign = hashlib.md5(repr_.encode('utf-8')).hexdigest()
        return sign

    def get_categorical_columns(self):
        raise NotImplementedError

    def get_continuous_columns(self):
        raise NotImplementedError

    def save(self, filepath):
        raise NotImplementedError

    @staticmethod
    def load(filepath):
        raise NotImplementedError
[docs]class DefaultPreprocessor(AbstractPreprocessor):
def __init__(self, config: ModelConfig, cache_home=None, use_cache=False):
super().__init__(config)
self.reset()
self.X_types = None
self.y_type = None
self.cache_dir = self._prepare_cache_dir(cache_home)
self.use_cache = use_cache
[docs] def reset(self):
self.metainfo = None
self.categorical_columns = None
self.continuous_columns = None
self.y_lable_encoder = None
self.X_transformers = collections.OrderedDict()
[docs] def prepare_X(self, X):
if not isinstance(X, pd.DataFrame):
X = pd.DataFrame(X)
if len(set(X.columns)) != len(list(X.columns)):
cols = [item for item, count in collections.Counter(X.columns).items() if count > 1]
raise ValueError(f'Columns with duplicate names in X: {cols}')
if X.columns.dtype != 'object':
X.columns = ['x_' + str(c) for c in X.columns]
logger.warn(f"Column index of X has been converted: {X.columns}")
return X
def __prepare_features(self, X):
start = time.time()
logger.info(f'Preparing features...')
num_vars = []
convert2cat_vars = []
cat_vars = []
excluded_vars = []
if self.config.cat_exponent >= 1:
raise ValueError(f'"cat_expoent" must be less than 1, not {self.config.cat_exponent} .')
unique_upper_limit = round(X.shape[0] ** self.config.cat_exponent)
for c in X.columns:
nunique = X[c].nunique()
dtype = str(X[c].dtype)
if nunique <= 1 and self.config.auto_discard_unique:
continue
if c in self.config.exclude_columns:
excluded_vars.append((c, dtype, nunique))
continue
if self.config.categorical_columns is not None and isinstance(self.config.categorical_columns, list):
if c in self.config.categorical_columns:
cat_vars.append((c, dtype, nunique))
else:
if np.issubdtype(dtype, np.number):
num_vars.append((c, dtype, nunique))
else:
print(
f'Column [{c}] has been discarded. It is not numeric and not in [config.categorical_columns].')
else:
if dtype == 'object' or dtype == 'category' or dtype == 'bool':
cat_vars.append((c, dtype, nunique))
elif self.config.auto_categorize and nunique < unique_upper_limit:
convert2cat_vars.append((c, dtype, nunique))
else:
num_vars.append((c, dtype, nunique))
if len(convert2cat_vars) > 0:
ce = CategorizeEncoder([c for c, d, n in convert2cat_vars], self.config.cat_remain_numeric)
X = ce.fit_transform(X)
self.X_transformers['categorize'] = ce
if self.config.cat_remain_numeric:
cat_vars = cat_vars + ce.new_columns
num_vars = num_vars + convert2cat_vars
else:
cat_vars = cat_vars + convert2cat_vars
logger.debug(f'{len(cat_vars)} categorical variables and {len(num_vars)} continuous variables found. '
f'{len(convert2cat_vars)} of them are from continuous to categorical.')
self.__append_categorical_cols([(c[0], c[2] + 2) for c in cat_vars])
self.__append_continuous_cols([c[0] for c in num_vars], consts.INPUT_PREFIX_NUM + 'all')
print(f'Preparing features cost:{time.time() - start}')
return X
def _imputation(self, X):
start = time.time()
logger.info('Data imputation...')
continuous_vars = self.get_continuous_columns()
categorical_vars = self.get_categorical_columns()
ct = ColumnTransformer([
('categorical', SimpleImputer(missing_values=np.nan, strategy='constant'),
categorical_vars),
('continuous', SimpleImputer(missing_values=np.nan, strategy='mean'), continuous_vars),
])
dfwrapper = DataFrameWrapper(ct, categorical_vars + continuous_vars)
X = dfwrapper.fit_transform(X)
self.X_transformers['imputation'] = dfwrapper
print(f'Imputation cost:{time.time() - start}')
return X
def _categorical_encoding(self, X):
start = time.time()
logger.info('Categorical encoding...')
vars = self.get_categorical_columns()
mle = MultiLabelEncoder(vars)
X = mle.fit_transform(X)
self.X_transformers['label_encoder'] = mle
print(f'Categorical encoding cost:{time.time() - start}')
return X
def _discretization(self, X):
start = time.time()
logger.info('Data discretization...')
vars = self.get_continuous_columns()
mkbd = MultiKBinsDiscretizer(vars)
X = mkbd.fit_transform(X)
self.__append_categorical_cols([(new_name, bins + 1) for name, new_name, bins in mkbd.new_columns])
self.X_transformers['discreter'] = mkbd
print(f'Discretization cost:{time.time() - start}')
return X
def _apply_gbm_features(self, X, y):
start = time.time()
logger.info('Extracting GBM features...')
cont_vars = self.get_continuous_columns()
cat_vars = self.get_categorical_columns()
gbmencoder = LgbmLeavesEncoder(cat_vars, cont_vars, self.task_, **self.config.gbm_params)
X = gbmencoder.fit_transform(X, y)
self.X_transformers['gbm_features'] = gbmencoder
if self.config.gbm_feature_type == consts.GBM_FEATURE_TYPE_EMB:
self.__append_categorical_cols([(name, X[name].max() + 1) for name in gbmencoder.new_columns])
else:
self.__append_continuous_cols([name for name in gbmencoder.new_columns],
consts.INPUT_PREFIX_NUM + 'gbm_leaves')
print(f'Extracting gbm features cost:{time.time() - start}')
return X
def __append_categorical_cols(self, cols):
logger.debug(f'{len(cols)} categorical variables appended.')
if self.config.fixed_embedding_dim:
embedding_output_dim = self.config.embeddings_output_dim if self.config.embeddings_output_dim > 0 else consts.EMBEDDING_OUT_DIM_DEFAULT
else:
embedding_output_dim = 0
#
if self.categorical_columns is None:
self.categorical_columns = []
self.categorical_columns = self.categorical_columns + \
[CategoricalColumn(name,
voc_size,
embedding_output_dim
if embedding_output_dim > 0
else min(4 * int(pow(voc_size, 0.25)), 20))
for name, voc_size in cols]
def __append_continuous_cols(self, cols, input_name):
if self.continuous_columns is None:
self.continuous_columns = []
self.continuous_columns = self.continuous_columns + [ContinuousColumn(name=input_name,
column_names=[c for c in cols])]
[docs] def get_categorical_columns(self):
return [c.name for c in self.categorical_columns]
[docs] def get_continuous_columns(self):
cont_vars = []
for c in self.continuous_columns:
cont_vars = cont_vars + c.column_names
return cont_vars
def _prepare_cache_dir(self, cache_home, clear_cache=False):
if cache_home is None:
cache_home = 'cache'
if cache_home[-1] == '/':
cache_home = cache_home[:-1]
cache_home = os.path.expanduser(f'{cache_home}')
if not os.path.exists(cache_home):
os.makedirs(cache_home)
else:
if clear_cache:
shutil.rmtree(cache_home)
os.makedirs(cache_home)
cache_dir = f'{cache_home}/{self.signature}'
if not os.path.exists(cache_dir):
os.makedirs(cache_dir)
return cache_dir
[docs] def clear_cache(self):
shutil.rmtree(self.cache_dir)
os.makedirs(self.cache_dir)