timesead.data.preprocessing.exathlon
====================================

.. py:module:: timesead.data.preprocessing.exathlon


Attributes
----------

.. autoapisummary::

   timesead.data.preprocessing.exathlon.APP_IDS
   timesead.data.preprocessing.exathlon.TRACE_TYPES
   timesead.data.preprocessing.exathlon.ANOMALY_TYPES


Functions
---------

.. autoapisummary::

   timesead.data.preprocessing.exathlon.extract_binary_ranges_ids
   timesead.data.preprocessing.exathlon.load_trace
   timesead.data.preprocessing.exathlon.load_labels
   timesead.data.preprocessing.exathlon.load_raw_data
   timesead.data.preprocessing.exathlon.add_anomaly_column
   timesead.data.preprocessing.exathlon.get_handled_nans
   timesead.data.preprocessing.exathlon.get_handled_executor_features
   timesead.data.preprocessing.exathlon.get_handled_os_features
   timesead.data.preprocessing.exathlon.handle_missing_values
   timesead.data.preprocessing.exathlon.add_executors_avg
   timesead.data.preprocessing.exathlon.add_nodes_avg
   timesead.data.preprocessing.exathlon.add_differencing
   timesead.data.preprocessing.exathlon.get_resampled
   timesead.data.preprocessing.exathlon.preprocess_exathlon_data


Module Contents
---------------

.. py:data:: APP_IDS

.. py:data:: TRACE_TYPES
   :value: ('undisturbed', 'bursty_input', 'bursty_input_crash', 'stalled_input', 'cpu_contention',...

.. py:data:: ANOMALY_TYPES
   :value: ('bursty_input', 'bursty_input_crash', 'stalled_input', 'cpu_contention', 'driver_failure',...

.. py:function:: extract_binary_ranges_ids(y)

   Returns the start and (excluded) end indices of all contiguous ranges of 1s in the binary array `y`.

   :param y: 1d-array of binary elements.
   :type y: ndarray

   :returns: array of `start, end` indices: `[[start_1, end_1], [start_2, end_2], ...]`.
   :rtype: ndarray

.. py:function:: load_trace(trace_path)

   Loads a Spark trace as a pd.DataFrame from its full input path.

   :param trace_path: full path of the trace to load (with file extension).
   :type trace_path: str

   :returns: the trace indexed by time, with columns processed to be consistent between traces.
   :rtype: pd.DataFrame

.. py:function:: load_labels(data_path: str) -> pandas.DataFrame

.. py:function:: load_raw_data(data_path: str, app_id: int = 0, trace_types: List[str] = TRACE_TYPES)

   Spark-specific raw data loading.

   - The loaded periods as the application(s) traces.
   - The labels as the ground-truth table gathering events information for disturbed traces.
   - The periods information as lists initialized in the form `[file_name, trace_type]`.

.. py:function:: add_anomaly_column(period_dfs, labels, periods_info, ignored_anomalies: str = 'none')

   Spark-specific `Anomaly` column extension.

   Note: `periods_info` is assumed to be of the form `[file_name, trace_type]`.

   `Anomaly` will be set to 0 if the record is outside any anomaly range; otherwise it will be
   set to another value depending on the range type (as defined by utils.spark.ANOMALY_TYPES).

   => The label for a given range type corresponds to its index in the ANOMALY_TYPES list plus one.

.. py:function:: get_handled_nans(period_dfs)

   Spark-specific handling of NaN values.

   The only NaN values recorded were for inactive executors, which we found to be equivalent to a value of -1.

.. py:function:: get_handled_executor_features(period_dfs)

   Returns the period DataFrames with "handled" executor features.

   By looking at the features, we saw that some executor spots sometimes went to -1,
   presumably because we did not receive data from them within a given delay, without
   meaning the executors were no longer active.

   The goal of this method is to detect such scenarios, in which case all -1 features are
   replaced with their last valid occurrence.

   Note: it was checked that if a given feature for an executor spot is -1, then all
   features from that spot are also -1, except for 1 or 2 records of some traces, which
   we argue is negligible.

   => To handle these very few cases, we would explicitly set all features to -1.
   :param period_dfs: the list of input period DataFrames. Assumed without NaNs.
   :type period_dfs: list

   :returns: the new period DataFrames, with handled executor features.
   :rtype: list

.. py:function:: get_handled_os_features(period_dfs)

   Returns the period DataFrames with "handled" OS features.

   Similarly to some executor features, some OS features might happen to be -1 for no
   other reason than their real value not being sent fast enough by the monitoring software.

   This holds for all encountered -1 values that do not span an entire trace. In such
   scenarios, we replace -1 features with their last valid value (or their next one if
   their last is not available).

   :param period_dfs: the list of input period DataFrames. Assumed without NaNs.
   :type period_dfs: list

   :returns: the new period DataFrames, with handled OS features.
   :rtype: list

.. py:function:: handle_missing_values(period_dfs)

   Returns the period DataFrames with "handled" OS features.

   Similarly to some executor features, some OS features might happen to be -1 for no
   other reason than their real value not being sent fast enough by the monitoring software.

   This holds for all encountered -1 values that do not span an entire trace. In such
   scenarios, we replace -1 features with their last valid value (or their next one if
   their last is not available).

   :param period_dfs: the list of input period DataFrames. Assumed without NaNs.
   :type period_dfs: list

   :returns: the new period DataFrames, with handled OS features.
   :rtype: list

.. py:function:: add_executors_avg(period_df, original_treatment)

   Adds executor features averaged across active executors, either keeping or dropping the original ones.

   An executor is defined as "inactive" for a given feature if the value of the feature
   for this executor is -1.

   Note: it was checked that if a given feature for an executor spot is -1, then all
   features from that spot are also -1, except for 1 or 2 records of some traces, which
   we argue is negligible.
   => To handle these few cases, we should have set all the features to -1
   (see `data.spark_manager.SparkManager.get_handled_executor_features`).

.. py:function:: add_nodes_avg(period_df, original_treatment)

   Adds node features averaged across nodes, either keeping or dropping the original ones.

.. py:function:: add_differencing(period_df, diff_factor_str, original_treatment)

   Adds feature differences, either keeping or dropping the original ones.

   :param period_df: input period DataFrame.
   :type period_df: pd.DataFrame
   :param diff_factor_str: differencing factor as a string integer.
   :type diff_factor_str: str
   :param original_treatment: either `keep` or `drop`, specifying what to do with original features.
   :type original_treatment: str

   :returns: the input DataFrame with differenced features, with or without the original ones.
   :rtype: pd.DataFrame

.. py:function:: get_resampled(period_dfs, sampling_period, agg='mean', anomaly_col=False, pre_sampling_period=None)

   Returns the period DataFrames resampled to `sampling_period` using the provided aggregation function.

   If `anomaly_col` is False, period indices will also be reset to start from a round date,
   in order to keep them aligned with any subsequently resampled labels.

   :param period_dfs: the period DataFrames to resample.
   :type period_dfs: list
   :param sampling_period: the new sampling period, as a valid argument to `pd.Timedelta`.
   :type sampling_period: str
   :param agg: the aggregation function defining the resampling (e.g. `mean`, `median` or `max`).
   :type agg: str
   :param anomaly_col: whether the provided DataFrames have an `Anomaly` column, to resample differently.
   :type anomaly_col: bool
   :param pre_sampling_period: original sampling period of the DataFrames.
                               If None and `anomaly_col` is False, the original sampling period will be inferred
                               from the first two records of the first DataFrame.
   :type pre_sampling_period: str|None

   :returns: the same periods with resampled records.
   :rtype: list

.. py:function:: preprocess_exathlon_data(raw_data_dir: str, out_data_dir: str, app_ids: List[int] = set(APP_IDS) - {7, 8}, subsample: str = None)

   Preprocesses the Exathlon dataset for experiments.

   :param raw_data_dir: Path to the raw data folder of the dataset.
   :param out_data_dir: Folder in which to write the processed data.
   :param app_ids: IDs of the applications whose data should be processed.
   :param subsample: The new sampling period for DataFrames. Should be a valid argument to :class:`pandas.Timedelta`.
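
The behaviour documented for ``extract_binary_ranges_ids`` (start and excluded end indices of each run of 1s) and for the -1 handling in ``get_handled_os_features`` (replace -1 with the last valid value, or the next one if no earlier value exists) can be illustrated with a minimal sketch. The helper names ``binary_ranges`` and ``fill_minus_one`` are hypothetical and are not part of this module; the code below is one possible way to obtain the documented results with NumPy and pandas, not the module's actual implementation.

```python
import numpy as np
import pandas as pd


def binary_ranges(y):
    """Hypothetical helper: [[start, end], ...] for each run of 1s, end excluded."""
    y = np.asarray(y, dtype=int)
    # Pad with zeros so runs touching either edge still produce a transition.
    padded = np.concatenate(([0], y, [0]))
    # +1 marks a 0 -> 1 transition (run start), -1 marks a 1 -> 0 transition (run end).
    changes = np.diff(padded)
    starts = np.flatnonzero(changes == 1)
    ends = np.flatnonzero(changes == -1)
    return np.column_stack((starts, ends))


def fill_minus_one(series):
    """Hypothetical helper: replace -1 with the last valid value, else the next one."""
    # Assumption: a series that is -1 everywhere is left untouched,
    # mirroring the "do not span an entire trace" condition in the docs.
    if (series == -1).all():
        return series
    # Mask -1 as missing, then forward-fill and back-fill the gaps.
    return series.where(series != -1).ffill().bfill()


print(binary_ranges([0, 1, 1, 0, 1]).tolist())  # [[1, 3], [4, 5]]
print(fill_minus_one(pd.Series([-1, 2, -1, -1, 5])).tolist())
```

Padding the array before differencing avoids special-casing runs that start at index 0 or end at the last element. Note that masking -1 values converts an integer series to floating point.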