Source code for eemeter.derivatives

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""

   Copyright 2014-2023 OpenEEmeter contributors

   Licensed under the Apache License, Version 2.0 (the "License");
   you may not use this file except in compliance with the License.
   You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   See the License for the specific language governing permissions and
   limitations under the License.

"""
from scipy.stats import t

from .caltrack.usage_per_day import CalTRACKUsagePerDayModelResults


__all__ = ("metered_savings", "modeled_savings")


def _compute_ols_error(
    t_stat,
    rmse_base_residuals,
    post_obs,
    base_obs,
    base_avg,
    post_avg,
    base_var,
    nprime,
):
    ols_model_agg_error = (
        (t_stat * rmse_base_residuals * post_obs)
        / (base_obs**0.5)
        * (1.0 + ((base_avg - post_avg) ** 2.0 / base_var)) ** 0.5
    )

    ols_noise_agg_error = (
        t_stat * rmse_base_residuals * (post_obs * base_obs / nprime) ** 0.5
    )

    ols_total_agg_error = (
        ols_model_agg_error**2.0 + ols_noise_agg_error**2.0
    ) ** 0.5

    return ols_total_agg_error, ols_model_agg_error, ols_noise_agg_error


def _compute_fsu_error(
    t_stat,
    interval,
    post_obs,
    total_base_energy,
    rmse_base_residuals,
    base_avg,
    base_obs,
    nprime,
):
    if interval.startswith("billing"):
        a_coeff = -0.00022
        b_coeff = 0.03306
        c_coeff = 0.94054
        months_reporting = float(post_obs)
    else:  # daily
        a_coeff = -0.00024
        b_coeff = 0.03535
        c_coeff = 1.00286
        months_reporting = float(post_obs) / 30.0

    fsu_error_band = total_base_energy * (
        t_stat
        * (a_coeff * months_reporting**2.0 + b_coeff * months_reporting + c_coeff)
        * (rmse_base_residuals / base_avg)
        * ((base_obs / nprime) * (1.0 + (2.0 / nprime)) * (1.0 / post_obs)) ** 0.5
    )

    return fsu_error_band


def _compute_error_bands_metered_savings(
    totals_metrics, results, interval, confidence_level
):
    num_parameters = float(totals_metrics.num_parameters)

    base_obs = float(totals_metrics.observed_length)
    if (interval.startswith("billing")) & (len(results.dropna().index) > 0):
        post_obs = float(round((results.index[-1] - results.index[0]).days / 30.0))
    else:
        post_obs = float(results["reporting_observed"].dropna().shape[0])

    degrees_of_freedom = float(base_obs - num_parameters)
    single_tailed_confidence_level = 1 - ((1 - confidence_level) / 2)
    t_stat = t.ppf(single_tailed_confidence_level, degrees_of_freedom)

    rmse_base_residuals = float(totals_metrics.rmse_adj)
    autocorr_resid = totals_metrics.autocorr_resid

    base_avg = float(totals_metrics.observed_mean)
    post_avg = float(results["reporting_observed"].mean())

    base_var = float(totals_metrics.observed_variance)

    # these result in division by zero error for fsu_error_band
    if (
        post_obs == 0
        or autocorr_resid is None
        or abs(autocorr_resid) == 1
        or base_obs == 0
        or base_avg == 0
        or base_var == 0
    ):
        return None

    autocorr_resid = float(autocorr_resid)

    nprime = float(base_obs * (1 - autocorr_resid) / (1 + autocorr_resid))

    total_base_energy = float(base_avg * base_obs)

    ols_total_agg_error, ols_model_agg_error, ols_noise_agg_error = _compute_ols_error(
        t_stat,
        rmse_base_residuals,
        post_obs,
        base_obs,
        base_avg,
        post_avg,
        base_var,
        nprime,
    )

    fsu_error_band = _compute_fsu_error(
        t_stat,
        interval,
        post_obs,
        total_base_energy,
        rmse_base_residuals,
        base_avg,
        base_obs,
        nprime,
    )

    return {
        "FSU Error Band": fsu_error_band,
        "OLS Error Band": ols_total_agg_error,
        "OLS Error Band: Model Error": ols_model_agg_error,
        "OLS Error Band: Noise": ols_noise_agg_error,
    }


[docs]def metered_savings( baseline_model, reporting_meter_data, temperature_data, with_disaggregated=False, confidence_level=0.90, predict_kwargs=None, ): """Compute metered savings, i.e., savings in which the baseline model is used to calculate the modeled usage in the reporting period. This modeled usage is then compared to the actual usage from the reporting period. Also compute two measures of the uncertainty of the aggregate savings estimate, a fractional savings uncertainty (FSU) error band and an OLS error band. (To convert the FSU error band into FSU, divide by total estimated savings.) Parameters ---------- baseline_model : :any:`eemeter.CalTRACKUsagePerDayModelResults` Object to use for predicting pre-intervention usage. reporting_meter_data : :any:`pandas.DataFrame` The observed reporting period data (totals). Savings will be computed for the periods supplied in the reporting period data. temperature_data : :any:`pandas.Series` Hourly-frequency timeseries of temperature data during the reporting period. with_disaggregated : :any:`bool`, optional If True, calculate baseline counterfactual disaggregated usage estimates. Savings cannot be disaggregated for metered savings. For that, use :any:`eemeter.modeled_savings`. confidence_level : :any:`float`, optional The two-tailed confidence level used to calculate the t-statistic used in calculation of the error bands. Ignored if not computing error bands. predict_kwargs : :any:`dict`, optional Extra kwargs to pass to the baseline_model.predict method. Returns ------- results : :any:`pandas.DataFrame` DataFrame with metered savings, indexed with ``reporting_meter_data.index``. Will include the following columns: - ``counterfactual_usage`` (baseline model projected into reporting period) - ``reporting_observed`` (given by reporting_meter_data) - ``metered_savings`` If `with_disaggregated` is set to True, the following columns will also be in the results DataFrame: - ``counterfactual_base_load`` - ``counterfactual_heating_load`` - ``counterfactual_cooling_load`` error_bands : :any:`dict`, optional If baseline_model is an instance of CalTRACKUsagePerDayModelResults, will also return a dictionary of FSU and OLS error bands for the aggregated energy savings over the post period. """ if predict_kwargs is None: predict_kwargs = {} model_type = None # generic if isinstance(baseline_model, CalTRACKUsagePerDayModelResults): model_type = "usage_per_day" if model_type == "usage_per_day" and with_disaggregated: predict_kwargs["with_disaggregated"] = True prediction_index = reporting_meter_data.index model_prediction = baseline_model.predict( prediction_index, temperature_data, **predict_kwargs ) predicted_baseline_usage = model_prediction.result # CalTrack 3.5.1 counterfactual_usage = predicted_baseline_usage["predicted_usage"].to_frame( "counterfactual_usage" ) reporting_observed = reporting_meter_data["value"].to_frame("reporting_observed") def metered_savings_func(row): return row.counterfactual_usage - row.reporting_observed results = reporting_observed.join(counterfactual_usage).assign( metered_savings=metered_savings_func ) if model_type == "usage_per_day" and with_disaggregated: counterfactual_usage_disaggregated = predicted_baseline_usage[ ["base_load", "heating_load", "cooling_load"] ].rename( columns={ "base_load": "counterfactual_base_load", "heating_load": "counterfactual_heating_load", "cooling_load": "counterfactual_cooling_load", } ) results = results.join(counterfactual_usage_disaggregated) results = results.dropna().reindex(results.index) # carry NaNs # compute t-statistic associated with n degrees of freedom # and a two-tailed confidence level. error_bands = None if model_type == "usage_per_day": # has totals_metrics error_bands = _compute_error_bands_metered_savings( baseline_model.totals_metrics, results, baseline_model.interval, confidence_level, ) return results, error_bands
def _compute_error_bands_modeled_savings( totals_metrics_baseline, totals_metrics_reporting, results, interval_baseline, interval_reporting, confidence_level, ): num_parameters_baseline = float(totals_metrics_baseline.num_parameters) num_parameters_reporting = float(totals_metrics_reporting.num_parameters) base_obs_baseline = float(totals_metrics_baseline.observed_length) base_obs_reporting = float(totals_metrics_reporting.observed_length) if (interval_baseline.startswith("billing")) & (len(results.dropna().index) > 0): post_obs_baseline = float( round((results.index[-1] - results.index[0]).days / 30.0) ) else: post_obs_baseline = float(results["modeled_baseline_usage"].dropna().shape[0]) if (interval_reporting.startswith("billing")) & (len(results.dropna().index) > 0): post_obs_reporting = float( round((results.index[-1] - results.index[0]).days / 30.0) ) else: post_obs_reporting = float(results["modeled_reporting_usage"].dropna().shape[0]) degrees_of_freedom_baseline = float(base_obs_baseline - num_parameters_baseline) degrees_of_freedom_reporting = float(base_obs_reporting - num_parameters_reporting) single_tailed_confidence_level = 1 - ((1 - confidence_level) / 2) t_stat_baseline = t.ppf(single_tailed_confidence_level, degrees_of_freedom_baseline) t_stat_reporting = t.ppf( single_tailed_confidence_level, degrees_of_freedom_reporting ) rmse_base_residuals_baseline = float(totals_metrics_baseline.rmse_adj) rmse_base_residuals_reporting = float(totals_metrics_reporting.rmse_adj) autocorr_resid_baseline = totals_metrics_baseline.autocorr_resid autocorr_resid_reporting = totals_metrics_reporting.autocorr_resid base_avg_baseline = float(totals_metrics_baseline.observed_mean) base_avg_reporting = float(totals_metrics_reporting.observed_mean) # these result in division by zero error for fsu_error_band if ( post_obs_baseline == 0 or autocorr_resid_baseline is None or abs(autocorr_resid_baseline) == 1 or base_obs_baseline == 0 or base_avg_baseline == 0 or post_obs_reporting == 0 or autocorr_resid_reporting is None or abs(autocorr_resid_reporting) == 1 or base_obs_reporting == 0 or base_avg_reporting == 0 ): return None autocorr_resid_baseline = float(autocorr_resid_baseline) autocorr_resid_reporting = float(autocorr_resid_reporting) nprime_baseline = float( base_obs_baseline * (1 - autocorr_resid_baseline) / (1 + autocorr_resid_baseline) ) nprime_reporting = float( base_obs_reporting * (1 - autocorr_resid_reporting) / (1 + autocorr_resid_reporting) ) total_base_energy_baseline = float(base_avg_baseline * base_obs_baseline) total_base_energy_reporting = float(base_avg_reporting * base_obs_reporting) fsu_error_band_baseline = _compute_fsu_error( t_stat_baseline, interval_baseline, post_obs_baseline, total_base_energy_baseline, rmse_base_residuals_baseline, base_avg_baseline, base_obs_baseline, nprime_baseline, ) fsu_error_band_reporting = _compute_fsu_error( t_stat_reporting, interval_reporting, post_obs_reporting, total_base_energy_reporting, rmse_base_residuals_reporting, base_avg_reporting, base_obs_reporting, nprime_reporting, ) return { "FSU Error Band: Baseline": fsu_error_band_baseline, "FSU Error Band: Reporting": fsu_error_band_reporting, "FSU Error Band": ( fsu_error_band_baseline**2.0 + fsu_error_band_reporting**2.0 ) ** 0.5, }
[docs]def modeled_savings( baseline_model, reporting_model, result_index, temperature_data, with_disaggregated=False, confidence_level=0.90, predict_kwargs=None, ): """Compute modeled savings, i.e., savings in which baseline and reporting usage values are based on models. This is appropriate for annualizing or weather normalizing models. Parameters ---------- baseline_model : :any:`eemeter.CalTRACKUsagePerDayCandidateModel` Model to use for predicting pre-intervention usage. reporting_model : :any:`eemeter.CalTRACKUsagePerDayCandidateModel` Model to use for predicting post-intervention usage. result_index : :any:`pandas.DatetimeIndex` The dates for which usage should be modeled. temperature_data : :any:`pandas.Series` Hourly-frequency timeseries of temperature data during the modeled period. with_disaggregated : :any:`bool`, optional If True, calculate modeled disaggregated usage estimates and savings. confidence_level : :any:`float`, optional The two-tailed confidence level used to calculate the t-statistic used in calculation of the error bands. Ignored if not computing error bands. predict_kwargs : :any:`dict`, optional Extra kwargs to pass to the baseline_model.predict and reporting_model.predict methods. Returns ------- results : :any:`pandas.DataFrame` DataFrame with modeled savings, indexed with the result_index. Will include the following columns: - ``modeled_baseline_usage`` - ``modeled_reporting_usage`` - ``modeled_savings`` If `with_disaggregated` is set to True, the following columns will also be in the results DataFrame: - ``modeled_baseline_base_load`` - ``modeled_baseline_cooling_load`` - ``modeled_baseline_heating_load`` - ``modeled_reporting_base_load`` - ``modeled_reporting_cooling_load`` - ``modeled_reporting_heating_load`` - ``modeled_base_load_savings`` - ``modeled_cooling_load_savings`` - ``modeled_heating_load_savings`` error_bands : :any:`dict`, optional If baseline_model and reporting_model are instances of CalTRACKUsagePerDayModelResults, will also return a dictionary of FSU and error bands for the aggregated energy savings over the normal year period. """ prediction_index = result_index if predict_kwargs is None: predict_kwargs = {} model_type = None # generic if isinstance(baseline_model, CalTRACKUsagePerDayModelResults): model_type = "usage_per_day" if model_type == "usage_per_day" and with_disaggregated: predict_kwargs["with_disaggregated"] = True def _predicted_usage(model): model_prediction = model.predict( prediction_index, temperature_data, **predict_kwargs ) predicted_usage = model_prediction.result return predicted_usage predicted_baseline_usage = _predicted_usage(baseline_model) predicted_reporting_usage = _predicted_usage(reporting_model) modeled_baseline_usage = predicted_baseline_usage["predicted_usage"].to_frame( "modeled_baseline_usage" ) modeled_reporting_usage = predicted_reporting_usage["predicted_usage"].to_frame( "modeled_reporting_usage" ) def modeled_savings_func(row): return row.modeled_baseline_usage - row.modeled_reporting_usage results = modeled_baseline_usage.join(modeled_reporting_usage).assign( modeled_savings=modeled_savings_func ) if model_type == "usage_per_day" and with_disaggregated: modeled_baseline_usage_disaggregated = predicted_baseline_usage[ ["base_load", "heating_load", "cooling_load"] ].rename( columns={ "base_load": "modeled_baseline_base_load", "heating_load": "modeled_baseline_heating_load", "cooling_load": "modeled_baseline_cooling_load", } ) modeled_reporting_usage_disaggregated = predicted_reporting_usage[ ["base_load", "heating_load", "cooling_load"] ].rename( columns={ "base_load": "modeled_reporting_base_load", "heating_load": "modeled_reporting_heating_load", "cooling_load": "modeled_reporting_cooling_load", } ) def modeled_base_load_savings_func(row): return row.modeled_baseline_base_load - row.modeled_reporting_base_load def modeled_heating_load_savings_func(row): return ( row.modeled_baseline_heating_load - row.modeled_reporting_heating_load ) def modeled_cooling_load_savings_func(row): return ( row.modeled_baseline_cooling_load - row.modeled_reporting_cooling_load ) results = ( results.join(modeled_baseline_usage_disaggregated) .join(modeled_reporting_usage_disaggregated) .assign( modeled_base_load_savings=modeled_base_load_savings_func, modeled_heating_load_savings=modeled_heating_load_savings_func, modeled_cooling_load_savings=modeled_cooling_load_savings_func, ) ) results = results.dropna().reindex(results.index) # carry NaNs error_bands = None if model_type == "usage_per_day": # has totals_metrics error_bands = _compute_error_bands_modeled_savings( baseline_model.totals_metrics, reporting_model.totals_metrics, results, baseline_model.interval, reporting_model.interval, confidence_level, ) return results, error_bands