Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_degradation_classical_decomposition(self):
    '''
    Test degradation with classical decomposition.

    For every frequency in ``self.list_CD_input_freq`` the first element
    returned by ``degradation_classical_decomposition`` must equal
    ``100 * self.rd`` to one decimal place.
    '''
    # Single pre-formatted strings keep the output identical whether
    # ``print`` is the Python 2 statement or the Python 3 function.
    func_name = sys._getframe().f_code.co_name
    print('\rRunning  %s' % (func_name,))

    # test classical decomposition degradation calc
    for input_freq in self.list_CD_input_freq:
        print('Frequency:  %s' % (input_freq,))
        rd_result = degradation_classical_decomposition(
            self.test_corr_energy[input_freq])
        estimated = rd_result[0]
        expected = 100 * self.rd

        # Report before asserting so the numbers are still visible when
        # the comparison fails.
        print('Actual:  %s' % (expected,))
        print('Estimated:  %s' % (estimated,))
        self.assertAlmostEqual(
            estimated, expected, places=1,
            msg='frequency %r: estimated %r, expected %r'
                % (input_freq, estimated, expected))
# TODO
def test_degradation_classical_decomposition(self):
    '''
    Test degradation with classical decomposition.

    The first element returned by ``degradation_classical_decomposition``
    for ``self.test_CD_YOY_corr_energy`` must equal ``100 * self.rd`` to
    one decimal place.
    '''
    # Single pre-formatted strings keep the output identical whether
    # ``print`` is the Python 2 statement or the Python 3 function.
    func_name = sys._getframe().f_code.co_name
    print('\rRunning  %s' % (func_name,))

    # test classical decomposition degradation calc
    print('Frequency:  %s' % (self.input_freq,))
    rd_result = degradation_classical_decomposition(
        self.test_CD_YOY_corr_energy)
    estimated = rd_result[0]
    expected = 100 * self.rd

    # Report before asserting so the numbers are still visible when the
    # comparison fails.
    print('Actual:  %s' % (expected,))
    print('Estimated:  %s' % (estimated,))
    self.assertAlmostEqual(
        estimated, expected, places=1,
        msg='estimated %r, expected %r' % (estimated, expected))
def test_degradation_with_ols(self):
    '''
    Test degradation with ols.

    The first element returned by ``degradation_ols`` for
    ``self.test_ols_corr_energy`` must equal ``100 * self.rd`` to one
    decimal place.
    '''
    # Single pre-formatted strings keep the output identical whether
    # ``print`` is the Python 2 statement or the Python 3 function.
    func_name = sys._getframe().f_code.co_name
    print('\rRunning  %s' % (func_name,))

    # test ols degradation calc
    print('Frequency:  %s' % (self.input_freq,))
    rd_result = degradation_ols(self.test_ols_corr_energy)
    estimated = rd_result[0]
    expected = 100 * self.rd

    # Report before asserting so the numbers are still visible when the
    # comparison fails.
    print('Actual:  %s' % (expected,))
    print('Estimated:  %s' % (estimated,))
    self.assertAlmostEqual(
        estimated, expected, places=1,
        msg='estimated %r, expected %r' % (estimated, expected))
def test_degradation_with_ols(self):
    '''
    Test degradation with ols.

    For every frequency in ``self.list_ols_input_freq`` the first element
    returned by ``degradation_ols`` must equal ``100 * self.rd`` to one
    decimal place.
    '''
    # Single pre-formatted strings keep the output identical whether
    # ``print`` is the Python 2 statement or the Python 3 function.
    func_name = sys._getframe().f_code.co_name
    print('\rRunning  %s' % (func_name,))

    # test ols degradation calc
    for input_freq in self.list_ols_input_freq:
        print('Frequency:  %s' % (input_freq,))
        rd_result = degradation_ols(self.test_corr_energy[input_freq])
        estimated = rd_result[0]
        expected = 100 * self.rd

        # Report before asserting so the numbers are still visible when
        # the comparison fails.
        print('Actual:  %s' % (expected,))
        print('Estimated:  %s' % (estimated,))
        self.assertAlmostEqual(
            estimated, expected, places=1,
            msg='frequency %r: estimated %r, expected %r'
                % (input_freq, estimated, expected))
def test_degradation_year_on_year(self):
    '''
    Test degradation with year on year approach.

    The first element returned by ``degradation_year_on_year`` for
    ``self.test_CD_YOY_corr_energy`` must equal ``100 * self.rd`` to one
    decimal place.
    '''
    # Single pre-formatted strings keep the output identical whether
    # ``print`` is the Python 2 statement or the Python 3 function.
    func_name = sys._getframe().f_code.co_name
    print('\rRunning  %s' % (func_name,))

    # test YOY degradation calc
    print('Frequency:  %s' % (self.input_freq,))
    rd_result = degradation_year_on_year(self.test_CD_YOY_corr_energy)
    estimated = rd_result[0]
    expected = 100 * self.rd

    # Report before asserting so the numbers are still visible when the
    # comparison fails.
    print('Actual:  %s' % (expected,))
    print('Estimated:  %s' % (estimated,))
    self.assertAlmostEqual(
        estimated, expected, places=1,
        msg='estimated %r, expected %r' % (estimated, expected))
def test_degradation_year_on_year(self):
    '''
    Test degradation with year on year approach.

    For every frequency in ``self.list_YOY_input_freq`` the first element
    returned by ``degradation_year_on_year`` must equal ``100 * self.rd``
    to one decimal place.
    '''
    # Single pre-formatted strings keep the output identical whether
    # ``print`` is the Python 2 statement or the Python 3 function.
    func_name = sys._getframe().f_code.co_name
    print('\rRunning  %s' % (func_name,))

    # test YOY degradation calc
    for input_freq in self.list_YOY_input_freq:
        print('Frequency:  %s' % (input_freq,))
        rd_result = degradation_year_on_year(
            self.test_corr_energy[input_freq])
        estimated = rd_result[0]
        expected = 100 * self.rd

        # Report before asserting so the numbers are still visible when
        # the comparison fails.
        print('Actual:  %s' % (expected,))
        print('Estimated:  %s' % (estimated,))
        self.assertAlmostEqual(
            estimated, expected, places=1,
            msg='frequency %r: estimated %r, expected %r'
                % (input_freq, estimated, expected))
def irradiance_rescale(clearsky_poa_unscaled, poa, rescale_poa, method):
    """
    Return ``clearsky_poa_unscaled``, optionally rescaled against ``poa``.

    When ``rescale_poa`` is falsy the unscaled input is returned unchanged.
    Otherwise the work is delegated to ``normalization.irradiance_rescale``
    with the given ``method``.
    """
    if not rescale_poa:
        return clearsky_poa_unscaled

    # Note the argument order: the normalization helper is called with
    # ``poa`` first and the unscaled clear-sky series second, the reverse
    # of this wrapper's own signature.
    return normalization.irradiance_rescale(
        poa, clearsky_poa_unscaled, method=method
    )
def power_to_energy(signal, max_timedelta):
    """
    Pass ``signal`` through ``normalization.energy_from_power``.

    ``max_timedelta`` is forwarded by keyword; the helper's result is
    returned as-is.
    """
    energy = normalization.energy_from_power(
        signal, max_timedelta=max_timedelta
    )
    return energy
# Optional interpolation step: only runs when the caller supplied an
# 'interpolation_frequency' entry in ``init``.
interpolation_frequency = init.get('interpolation_frequency')
if interpolation_frequency is not None:
    max_timedelta = init['max_timedelta']

    # 'pv_tilt' / 'pv_azimuth' are interpolated only when they were given
    # as pandas objects rather than anything else.
    orientation_keys = [
        name for name in ('pv_tilt', 'pv_azimuth')
        if isinstance(init[name], (pd.Series, pd.DataFrame))
    ]
    interpolation_keys = [
        'pv', 'poa', 'ghi', 'dni', 'dhi', 'ambient_temperature',
        'windspeed'
    ] + orientation_keys

    # Replace each entry that is present with its interpolated version;
    # keys absent from ``init`` are skipped.
    for key in interpolation_keys:
        if key in init:
            init[key] = normalization.interpolate(
                init[key], interpolation_frequency, max_timedelta
            )
common_graph = {
'pvlib_location': (
pvlib.location.Location, 'latitude', 'longitude'
),
'times': (
lambda series: series.index, 'pv'
),
'pv_energy': (
power_to_energy, 'pv', 'max_timedelta'
),
'solar_position': (
lambda loc, times: loc.get_solarposition(times),
def aggregate(normalized, insolation, overall_filter,
              aggregation_frequency):
    """
    Aggregate the filtered normalized and insolation inputs.

    Both ``normalized`` and ``insolation`` are indexed with
    ``overall_filter`` before any aggregation takes place.

    Returns a 2-tuple:
      * the result of ``aggregation.aggregation_insol`` on the filtered
        inputs at ``aggregation_frequency``;
      * the filtered insolation resampled to ``aggregation_frequency``
        and summed.
    """
    kept_normalized = normalized[overall_filter]
    kept_insolation = insolation[overall_filter]
    return (
        aggregation.aggregation_insol(
            kept_normalized, kept_insolation, aggregation_frequency
        ),
        kept_insolation.resample(aggregation_frequency).sum(),
    )