timesead.data.preprocessing.exathlon

Attributes

APP_IDS

TRACE_TYPES

ANOMALY_TYPES

Functions

extract_binary_ranges_ids(y)

Returns the start and (excluded) end indices of all contiguous ranges of 1s in the binary array y.

load_trace(trace_path)

Loads a Spark trace as a pd.DataFrame from its full input path.

load_labels(data_path) → pandas.DataFrame

load_raw_data(data_path[, app_id, trace_types])

Spark-specific raw data loading.

add_anomaly_column(period_dfs, labels, periods_info[, ...])

Spark-specific Anomaly column extension.

get_handled_nans(period_dfs)

Spark-specific handling of NaN values.

get_handled_executor_features(period_dfs)

Returns the period DataFrames with "handled" executor features.

get_handled_os_features(period_dfs)

Returns the period DataFrames with "handled" OS features.

handle_missing_values(period_dfs)

Returns the period DataFrames with "handled" OS features.

add_executors_avg(period_df, original_treatment)

Adds executor features averaged across active executors, keeping or not the original ones.

add_nodes_avg(period_df, original_treatment)

Adds node features averaged across nodes, keeping or not the original ones.

add_differencing(period_df, diff_factor_str, ...)

Adds feature differences, either keeping or dropping the original features.

get_resampled(period_dfs, sampling_period[, agg, ...])

Returns the period DataFrames resampled to sampling_period using the provided aggregation function.

preprocess_exathlon_data(raw_data_dir, out_data_dir[, ...])

Preprocesses the Exathlon dataset for experiments.

Module Contents

timesead.data.preprocessing.exathlon.APP_IDS
timesead.data.preprocessing.exathlon.TRACE_TYPES = ('undisturbed', 'bursty_input', 'bursty_input_crash', 'stalled_input', 'cpu_contention',...
timesead.data.preprocessing.exathlon.ANOMALY_TYPES = ('bursty_input', 'bursty_input_crash', 'stalled_input', 'cpu_contention', 'driver_failure',...
timesead.data.preprocessing.exathlon.extract_binary_ranges_ids(y)

Returns the start and (excluded) end indices of all contiguous ranges of 1s in the binary array y.

Parameters:

y (ndarray) – 1d-array of binary elements.

Returns:

array of start, end indices: [[start_1, end_1], [start_2, end_2], …].

Return type:

ndarray
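
The range extraction described above can be illustrated with a minimal sketch. This is not the library's implementation; the helper name binary_ranges_sketch is hypothetical, and the approach (padding the array and looking at first differences) is just one way to obtain the documented output format.

```python
import numpy as np

def binary_ranges_sketch(y):
    """Return [[start, end], ...] for each contiguous run of 1s in y (end excluded)."""
    y = np.asarray(y, dtype=int)
    # Pad with zeros so that runs touching either edge still produce a transition.
    padded = np.concatenate(([0], y, [0]))
    # +1 marks a 0->1 step (run start), -1 marks a 1->0 step (excluded run end).
    steps = np.diff(padded)
    starts = np.flatnonzero(steps == 1)
    ends = np.flatnonzero(steps == -1)
    return np.stack([starts, ends], axis=1)

ranges = binary_ranges_sketch([0, 1, 1, 0, 0, 1, 1, 1])
print(ranges)
```

Each returned pair can be used directly as a slice, since the end index is excluded: y[start:end] contains only 1s.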

timesead.data.preprocessing.exathlon.load_trace(trace_path)

Loads a Spark trace as a pd.DataFrame from its full input path.

Parameters:

trace_path (str) – full path of the trace to load (with file extension).

Returns:

the trace indexed by time, with columns processed to be consistent between traces.

Return type:

pd.DataFrame

timesead.data.preprocessing.exathlon.load_labels(data_path: str) → pandas.DataFrame
Parameters:

data_path (str)

Return type:

pandas.DataFrame

timesead.data.preprocessing.exathlon.load_raw_data(data_path: str, app_id: int = 0, trace_types: List[str] = TRACE_TYPES)

Spark-specific raw data loading.

  • The loaded periods as the application(s) traces.

  • The labels as the ground-truth table gathering events information for disturbed traces.

  • The periods information as lists initialized in the form [file_name, trace_type].

Parameters:
  • data_path (str)

  • app_id (int)

  • trace_types (List[str])

timesead.data.preprocessing.exathlon.add_anomaly_column(period_dfs, labels, periods_info, ignored_anomalies: str = 'none')

Spark-specific Anomaly column extension.

Note: periods_info is assumed to be of the form [file_name, trace_type].

Anomaly is set to 0 if the record is outside any anomaly range. Otherwise it is set to a value that depends on the range type (as defined by utils.spark.ANOMALY_TYPES): the label for a given range type is its index in the ANOMALY_TYPES list, plus 1.

Parameters:

ignored_anomalies (str)
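
The labeling rule above (0 outside anomaly ranges, index in ANOMALY_TYPES plus 1 inside) can be sketched as follows. This is an illustration under assumptions, not the library's code: the helper label_period_sketch, the (start, end, type) tuple format, and the inclusive range bounds are hypothetical, and the tuple below holds only the anomaly types visible in the truncated listing on this page.

```python
import pandas as pd

# Only the entries visible in the truncated ANOMALY_TYPES listing; the real tuple is longer.
VISIBLE_ANOMALY_TYPES = (
    'bursty_input', 'bursty_input_crash', 'stalled_input',
    'cpu_contention', 'driver_failure',
)

def label_period_sketch(period_df, anomaly_ranges):
    """Add an Anomaly column: 0 outside every range, type index + 1 inside a range.

    anomaly_ranges is a hypothetical list of (start, end, anomaly_type) tuples,
    with both bounds treated as inclusive (an assumption of this sketch).
    """
    labeled = period_df.copy()
    labeled['Anomaly'] = 0
    for start, end, anomaly_type in anomaly_ranges:
        in_range = (labeled.index >= start) & (labeled.index <= end)
        labeled.loc[in_range, 'Anomaly'] = VISIBLE_ANOMALY_TYPES.index(anomaly_type) + 1
    return labeled

index = pd.date_range('2024-01-01', periods=6, freq=pd.Timedelta(seconds=1))
period = pd.DataFrame({'cpu': [1.0, 2.0, 3.0, 4.0, 5.0, 6.0]}, index=index)
labeled = label_period_sketch(period, [(index[2], index[3], 'stalled_input')])
print(labeled['Anomaly'].tolist())
```

Here 'stalled_input' is the third visible entry, so records inside its range receive the label 3.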

timesead.data.preprocessing.exathlon.get_handled_nans(period_dfs)

Spark-specific handling of NaN values. The only NaN values recorded were for inactive executors, which was found to be equivalent to those values being -1.
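
Given that equivalence, one plausible way to handle the NaNs is to replace them with -1. The sketch below assumes that this is the intended treatment; the helper fill_nans_sketch and the column name are hypothetical.

```python
import numpy as np
import pandas as pd

def fill_nans_sketch(period_dfs):
    """Return new period DataFrames in which every NaN is replaced by -1."""
    return [period_df.fillna(-1) for period_df in period_dfs]

# Hypothetical executor column with one missing record.
period = pd.DataFrame({'executor_cpu': [0.5, np.nan, 0.7]})
filled = fill_nans_sketch([period])
print(filled[0])
```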

timesead.data.preprocessing.exathlon.get_handled_executor_features(period_dfs)

Returns the period DataFrames with “handled” executor features.

By looking at the features, we saw that some executor spots sometimes went to -1, presumably because we did not receive data from them within a given delay, without this meaning that the executors were no longer active. The goal of this method is to detect such scenarios, in which case all -1 features are replaced with their last valid occurrence.

Note: it was checked that if a given feature for an executor spot is -1, then all features from that spot are also -1, except for 1 or 2 records of some traces, which we argue is negligible. To handle these very few cases, we would explicitly set all features to -1.

Parameters:

period_dfs (list) – the list of input period DataFrames. Assumed without NaNs.

Returns:

the new period DataFrames, with handled executor features.

Return type:

list
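
The replacement of transient -1 values with their last valid occurrence can be sketched for a single feature as below. How the library detects a transient gap is not described on this page, so the criterion used here is an assumption: a -1 counts as transient only if a valid value exists both before and after it, while leading and trailing -1 values are treated as a genuinely inactive executor and left unchanged. The helper name is hypothetical.

```python
import numpy as np
import pandas as pd

def fill_transient_gaps_sketch(series):
    """Forward-fill -1 values that are surrounded by valid values (assumed criterion)."""
    missing = (series == -1).to_numpy()
    # Candidate replacement: the last valid value seen before each record.
    filled = series.mask(series == -1).ffill()
    # True where at least one valid value occurs at or after the record.
    has_later_valid = np.maximum.accumulate((~missing)[::-1])[::-1]
    transient = missing & has_later_valid & filled.notna().to_numpy()
    # Keep original values everywhere except on transient gaps.
    return series.where(~transient, filled)

feature = pd.Series([5.0, -1.0, -1.0, 7.0, -1.0, -1.0])
handled = fill_transient_gaps_sketch(feature)
print(handled.tolist())
```

In this example the two -1 values between 5 and 7 are replaced by 5, while the trailing -1 values are kept because no later valid value exists.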

timesead.data.preprocessing.exathlon.get_handled_os_features(period_dfs)

Returns the period DataFrames with “handled” OS features. As with some executor features, some OS features can be -1 for no other reason than their real value not being sent fast enough by the monitoring software. Here, this holds for all encountered -1 values that do not span an entire trace. In such scenarios, we replace -1 features with their last valid value (or with their next valid value if no previous one is available).

Parameters:

period_dfs (list) – the list of input period DataFrames. Assumed without NaNs.

Returns:

the new period DataFrames, with handled OS features.

Return type:

list
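
The fill rule for OS features (last valid value, else next valid value, with features that are -1 for the whole trace left alone) can be sketched as follows. This is a simplified illustration rather than the library's code; the helper name and column names are hypothetical.

```python
import pandas as pd

def fill_os_gaps_sketch(period_df):
    """Replace -1 with the last valid value, or the next one if none precedes it."""
    out = period_df.copy()
    for col in out.columns:
        missing = out[col] == -1
        if missing.all():
            # The -1 values span the entire trace: leave the feature untouched.
            continue
        # Forward-fill first, then backward-fill any leading gap.
        out[col] = out[col].mask(missing).ffill().bfill()
    return out

period = pd.DataFrame({
    'os_cpu': [-1.0, 2.0, -1.0, 4.0],      # hypothetical OS feature with gaps
    'os_unused': [-1.0, -1.0, -1.0, -1.0],  # hypothetical feature that is never reported
})
handled = fill_os_gaps_sketch(period)
print(handled)
```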

timesead.data.preprocessing.exathlon.handle_missing_values(period_dfs)

Returns the period DataFrames with “handled” OS features. As with some executor features, some OS features can be -1 for no other reason than their real value not being sent fast enough by the monitoring software. Here, this holds for all encountered -1 values that do not span an entire trace. In such scenarios, we replace -1 features with their last valid value (or with their next valid value if no previous one is available).

Parameters:

period_dfs (list) – the list of input period DataFrames. Assumed without NaNs.

Returns:

the new period DataFrames, with handled OS features.

Return type:

list

timesead.data.preprocessing.exathlon.add_executors_avg(period_df, original_treatment)

Adds executor features averaged across active executors, keeping or not the original ones. An executor is defined as “inactive” for a given feature if the value of the feature for this executor is -1.

Note: it was checked that if a given feature for an executor spot is -1, then all features from that spot are also -1, except for 1 or 2 records of some traces, which we argue is negligible. To handle these few cases, we should have set all the features to -1 (see data.spark_manager.SparkManager.get_handled_executor_features).
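
Averaging a feature across active executors only can be sketched by masking the -1 values before taking the row-wise mean. The sketch below makes several assumptions that are not confirmed by this page: the helper name, the column names, the accepted original_treatment values ('keep' or 'drop'), and the fallback of -1 when no executor is active.

```python
import pandas as pd

def executors_avg_sketch(period_df, feature_cols, avg_name, original_treatment='keep'):
    """Add the mean of feature_cols over active executors (values other than -1)."""
    values = period_df[feature_cols]
    # Inactive executors (-1) become NaN and are skipped by the row-wise mean.
    active_mean = values.mask(values == -1).mean(axis=1)
    out = period_df.copy()
    # Assumption of this sketch: if no executor is active, the average is -1.
    out[avg_name] = active_mean.fillna(-1)
    if original_treatment == 'drop':
        out = out.drop(columns=feature_cols)
    return out

period = pd.DataFrame({
    'exec1_cpu': [2.0, -1.0, -1.0],  # hypothetical per-executor columns
    'exec2_cpu': [4.0, 6.0, -1.0],
})
averaged = executors_avg_sketch(period, ['exec1_cpu', 'exec2_cpu'], 'avg_cpu', 'drop')
print(averaged)
```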

timesead.data.preprocessing.exathlon.add_nodes_avg(period_df, original_treatment)

Adds node features averaged across nodes, keeping or not the original ones.

timesead.data.preprocessing.exathlon.add_differencing(period_df, diff_factor_str, original_treatment)

Adds feature differences, either keeping or dropping the original features.

Parameters:
  • period_df (pd.DataFrame) – input period DataFrame.

  • diff_factor_str (str) – differencing factor as a string integer.

  • original_treatment (str) – either keep or drop, specifying what to do with original features.

Returns:

the input DataFrame with differenced features, with or without the original ones.

Return type:

pd.DataFrame
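
Differencing with a string factor and a keep/drop switch can be sketched with pandas' DataFrame.diff. This is not the library's implementation: the helper name and the suffix used for the new columns are hypothetical, and the sketch leaves the leading NaN rows produced by differencing in place, which may differ from the actual behavior.

```python
import pandas as pd

def differencing_sketch(period_df, diff_factor_str, original_treatment):
    """Add differenced features, keeping or dropping the original ones."""
    diff_factor = int(diff_factor_str)
    # The column suffix is an illustrative naming choice.
    differenced = period_df.diff(diff_factor).add_suffix(f'_diff_{diff_factor_str}')
    if original_treatment == 'drop':
        return differenced
    return pd.concat([period_df, differenced], axis=1)

period = pd.DataFrame({'cpu': [1.0, 3.0, 6.0, 10.0]})
kept = differencing_sketch(period, '1', 'keep')
dropped = differencing_sketch(period, '1', 'drop')
print(kept)
```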

timesead.data.preprocessing.exathlon.get_resampled(period_dfs, sampling_period, agg='mean', anomaly_col=False, pre_sampling_period=None)

Returns the period DataFrames resampled to sampling_period using the provided aggregation function. If anomaly_col is False, periods indices will also be reset to start from a round date, in order to keep them aligned with any subsequently resampled labels.

Parameters:
  • period_dfs (list) – the period DataFrames to resample.

  • sampling_period (str) – the new sampling period, as a valid argument to pd.Timedelta.

  • agg (str) – the aggregation function defining the resampling (e.g. mean, median or max).

  • anomaly_col (bool) – whether the provided DataFrames have an Anomaly column, to resample differently.

  • pre_sampling_period (str|None) – original sampling period of the DataFrames. If None and anomaly_col is False, the original sampling period will be inferred from the first two records of the first DataFrame.

Returns:

the same periods with resampled records.

Return type:

list
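
Resampling a time-indexed period to a new sampling period with a named aggregation can be sketched with pandas' resample. The sketch covers only plain feature columns; the special handling of the Anomaly column and the index realignment described above are not reproduced, and the helper name and column name are hypothetical.

```python
import pandas as pd

def resample_sketch(period_dfs, sampling_period, agg='mean'):
    """Resample each time-indexed DataFrame to sampling_period using agg."""
    rule = pd.Timedelta(sampling_period)
    return [period_df.resample(rule).agg(agg) for period_df in period_dfs]

index = pd.date_range('2024-01-01', periods=6, freq=pd.Timedelta(seconds=1))
period = pd.DataFrame({'cpu': [1.0, 3.0, 5.0, 7.0, 9.0, 11.0]}, index=index)
resampled = resample_sketch([period], '2s', agg='mean')
print(resampled[0])
```

With a 1-second input and a 2-second target period, each output record is the mean of two consecutive input records.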

timesead.data.preprocessing.exathlon.preprocess_exathlon_data(raw_data_dir: str, out_data_dir: str, app_ids: List[int] = set(APP_IDS) - {7, 8}, subsample: str = None)

Preprocesses the Exathlon dataset for experiments.

Parameters:
  • raw_data_dir (str) – Path to raw data folder in the dataset

  • out_data_dir (str) – Folder to output the processed data

  • app_ids (List[int]) – IDs of the applications whose data should be processed

  • subsample (str) – The new sampling period for DataFrames. Should be a valid argument to pandas.Timedelta