Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
)
print(
"velocity perturbations, perpendicular: %g,%g,%g"
% (vp_perp[0], vp_perp[1], vp_perp[2])
)
R_thr = metadata["threshold"]
R_min = metadata["zerovalue"]
num_ensemble_workers = n_ens_members if num_workers > n_ens_members else num_workers
if measure_time:
starttime_init = time.time()
# get methods
extrapolator_method = extrapolation.get_method(extrap_method)
x_values, y_values = np.meshgrid(np.arange(R.shape[2]), np.arange(R.shape[1]))
xy_coords = np.stack([x_values, y_values])
decomp_method = cascade.get_method(decomp_method)
filter_method = cascade.get_method(bandpass_filter_method)
if noise_method is not None:
init_noise, generate_noise = noise.get_method(noise_method)
# advect the previous precipitation fields to the same position with the
# most recent one (i.e. transform them into the Lagrangian coordinates)
R = R[-(ar_order + 1) :, :, :].copy()
extrap_kwargs = extrap_kwargs.copy()
extrap_kwargs["xy_coords"] = xy_coords
res = []
pysteps.extrapolation.interface
"""
_check_inputs(precip, velocity)
if extrap_kwargs is None:
extrap_kwargs = dict()
print("Computing extrapolation nowcast from a "
f"{precip.shape[0]:d}x{precip.shape[1]:d} input grid... ", end="")
if measure_time:
start_time = time.time()
extrapolation_method = extrapolation.get_method(extrap_method)
precip_forecast = extrapolation_method(precip, velocity, num_timesteps,
**extrap_kwargs)
if measure_time:
computation_time = time.time() - start_time
print(f"{computation_time:.2f} seconds.")
if measure_time:
return precip_forecast, computation_time
else:
return precip_forecast
if measure_time:
starttime_init = time.time()
fft = utils.get_method(fft_method, shape=R.shape[1:],
n_threads=num_workers)
M, N = R.shape[1:]
# initialize the band-pass filter
filter_method = cascade.get_method(bandpass_filter_method)
filter = filter_method((M, N), n_cascade_levels, **filter_kwargs)
decomp_method = cascade.get_method(decomp_method)
extrapolator_method = extrapolation.get_method(extrap_method)
R = R[-(ar_order + 1):, :, :].copy()
R_min = R.min()
if conditional:
MASK_thr = np.logical_and.reduce([R[i, :, :] >= R_thr for i in range(R.shape[0])])
else:
MASK_thr = None
# initialize the extrapolator
x_values, y_values = np.meshgrid(np.arange(R.shape[2]),
np.arange(R.shape[1]))
xy_coords = np.stack([x_values, y_values])
extrap_kwargs = extrap_kwargs.copy()
## set NaN equal to zero
R_[~np.isfinite(R_)] = 0.0
## transform to dBR
R_, _ = stp.utils.dB_transform(R_)
# Compute motion field
oflow_method = stp.motion.get_method("lucaskanade")
UV = oflow_method(R_)
# Perform the advection of the radar field
n_lead_times = 12
adv_method = stp.extrapolation.get_method("semilagrangian")
R_fct = adv_method(R_[-1,:,:], UV, n_lead_times, verbose=True)
## transform forecast values back to mm/h
R_fct, _ = stp.utils.dB_transform(R_fct, inverse=True)
# Plot the nowcast...
stp.plt.animate(R, R_fct=R_fct, UV=UV, nloops=5)
else num_workers
if measure_time:
starttime_init = time.time()
fft = utils.get_method(fft_method, shape=R.shape[1:], n_threads=num_workers)
M, N = R.shape[1:]
# initialize the band-pass filter
filter_method = cascade.get_method(bandpass_filter_method)
filter = filter_method((M, N), n_cascade_levels, **filter_kwargs)
decomp_method = cascade.get_method(decomp_method)
extrapolator_method = extrapolation.get_method(extrap_method)
x_values, y_values = np.meshgrid(np.arange(R.shape[2]),
np.arange(R.shape[1]))
xy_coords = np.stack([x_values, y_values])
R = R[-(ar_order + 1):, :, :].copy()
if conditional:
MASK_thr = np.logical_and.reduce([R[i, :, :] >= R_thr for i in range(R.shape[0])])
else:
MASK_thr = None
# advect the previous precipitation fields to the same position with the
# most recent one (i.e. transform them into the Lagrangian coordinates)
extrap_kwargs = extrap_kwargs.copy()
converter = stp.utils.get_method(unit)
R, metadata = converter(R, metadata)
## transform the data
transformer = stp.utils.get_method(transformation)
R, metadata = transformer(R, metadata)
## set NaN equal to zero
R[~np.isfinite(R)] = metadata["zerovalue"]
# Compute motion field
oflow_method = stp.motion.get_method(oflow_method)
UV = oflow_method(R)
# Perform the advection of the radar field
adv_method = stp.extrapolation.get_method(adv_method)
R_fct = adv_method(R[-1,:,:], UV, n_lead_times, verbose=True)
print("The forecast array has size [nleadtimes,nrows,ncols] =", R_fct.shape)
## if necessary, transform back all data
R_fct, _ = transformer(R_fct, metadata, inverse=True)
R, metadata = transformer(R, metadata, inverse=True)
## convert all data to mm/h
converter = stp.utils.get_method("mm/h")
R_fct, _ = converter(R_fct, metadata)
R, metadata = converter(R, metadata)
## plot the nowcast...
R[Rmask] = np.nan # reapply radar mask
stp.plt.animate(R, nloops=2, timestamps=metadata["timestamps"],
R_fct=R_fct, timestep_min=ds.timestep,