Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
import datetime as dt
import numba as nb
import numpy as np
from randomgen import Xoroshiro128
# Instantiate a Xoroshiro128 bit generator with its default arguments.
x = Xoroshiro128()
# `next_uint32` entry of the generator's ctypes interface; bounded_uint
# invokes it as f(state).
f = x.ctypes.next_uint32
# `state` entry of the same ctypes interface.
# NOTE(review): presumably the value callers pass as `state` to
# bounded_uint -- confirm against the call site.
s = x.ctypes.state
@nb.jit(nopython=True)
def bounded_uint(lb, ub, state):
    """Draw an integer from the inclusive range [lb, ub].

    Uses masked rejection sampling: every value obtained from the
    module-level ``f`` is ANDed with an all-ones mask covering ``ub - lb``
    and redrawn until it does not exceed ``ub - lb``.

    Parameters
    ----------
    lb
        Lower bound (inclusive).
    ub
        Upper bound (inclusive). Not validated; the masking logic assumes
        ``ub >= lb``.
    state
        Passed unchanged to ``f`` on every draw.

    Returns
    -------
    ``lb`` plus the accepted draw.
    """
    span = ub - lb

    # Smear the highest set bit of the span into every lower bit position,
    # giving the smallest (2**k - 1) mask that is >= span (up to 32 bits).
    mask = span
    mask |= mask >> 1
    mask |= mask >> 2
    mask |= mask >> 4
    mask |= mask >> 8
    mask |= mask >> 16

    # Reject masked draws that land above the span, so that every accepted
    # value comes from the same number of raw draws (no modulo bias).
    candidate = f(state) & mask
    while candidate > span:
        candidate = f(state) & mask

    # The original computed the draw but never returned it.
    return lb + candidate
import datetime as dt
import numba as nb
import numpy as np
from randomgen import Xoroshiro128
# Instantiate a Xoroshiro128 bit generator with its default arguments.
x = Xoroshiro128()
# `next_uint32` entry of the generator's ctypes interface; bounded_uint
# invokes it as f(state).
f = x.ctypes.next_uint32
# `state` entry of the same ctypes interface.
# NOTE(review): presumably the value callers pass as `state` to
# bounded_uint -- confirm against the call site.
s = x.ctypes.state
@nb.jit(nopython=True)
def bounded_uint(lb, ub, state):
    """Draw an integer from the inclusive range [lb, ub].

    Uses masked rejection sampling: every value obtained from the
    module-level ``f`` is ANDed with an all-ones mask covering ``ub - lb``
    and redrawn until it does not exceed ``ub - lb``.

    Parameters
    ----------
    lb
        Lower bound (inclusive).
    ub
        Upper bound (inclusive). Not validated; the masking logic assumes
        ``ub >= lb``.
    state
        Passed unchanged to ``f`` on every draw.

    Returns
    -------
    ``lb`` plus the accepted draw.
    """
    span = ub - lb

    # Smear the highest set bit of the span into every lower bit position,
    # giving the smallest (2**k - 1) mask that is >= span (up to 32 bits).
    mask = span
    mask |= mask >> 1
    mask |= mask >> 2
    mask |= mask >> 4
    mask |= mask >> 8
    mask |= mask >> 16

    # Reject masked draws that land above the span, so that every accepted
    # value comes from the same number of raw draws (no modulo bias).
    candidate = f(state) & mask
    while candidate > span:
        candidate = f(state) & mask

    # The original computed the draw but never returned it.
    return lb + candidate
from randomgen import Xoroshiro128
ffi = FFI()

# Probe the working directory for the compiled distributions library,
# trying the .dll file name first and the .so file name second.
for _lib_path in ("./distributions.dll", "./libdistributions.so"):
    if os.path.exists(_lib_path):
        lib = ffi.dlopen(_lib_path)
        break
else:
    # Neither candidate exists, so nothing can be loaded.
    raise RuntimeError("Required DLL/so file was not found.")

# Declare the C signature of the single function used from the library.
ffi.cdef(
"""
double random_gauss_zig(void *bitgen_state);
"""
)

# Bit generator whose cffi interface supplies the pointer handed to C.
x = Xoroshiro128()
xffi = x.cffi
bit_generator = xffi.bit_generator

# Module-level alias for the library function, called by `normals`.
random_gauss_zig = lib.random_gauss_zig
def normals(n, bit_generator):
    """Fill a new length-``n`` array with values from ``random_gauss_zig``.

    Parameters
    ----------
    n
        Number of values to generate.
    bit_generator
        Passed unchanged to ``random_gauss_zig`` for every value.

    Returns
    -------
    The array allocated with ``np.empty(n)``, with each element set to one
    ``random_gauss_zig`` result.
    """
    samples = np.empty(n)
    # Explicit scalar loop: each element needs its own call into the
    # library function.
    for position in range(n):
        samples[position] = random_gauss_zig(bit_generator)
    return samples
# Numba-compiled (nopython mode) variant of `normals`; the plain-Python
# function remains available under its original name.
normalsj = nb.jit(normals, nopython=True)
# Numba requires a memory address for void *
"SFC64",
"Xorshift1024",
"ThreeFry",
]
if prng not in allowed_prngs:
raise ValueError(f"prng={prng} is not in {allowed_prngs}")
arr = np.zeros((size, size))
for i in range(size):
if prng == "python":
random.seed(i)
elif prng == "numpy":
np.random.seed(i)
elif prng == "java":
rnd = javarandom.Random(i)
elif prng == "Xoroshiro128":
rnd = RandomGenerator(Xoroshiro128())
elif prng == "Xorshift1024":
rnd = RandomGenerator(Xorshift1024())
elif prng == "ThreeFry":
rnd = RandomGenerator(ThreeFry())
elif prng == "MT19937":
rnd = Generator(MT19937())
elif prng == "Philox":
rnd = Generator(Philox())
elif prng == "SFC64":
rnd = Generator(SFC64())
for j in range(size):
if prng == "python":
random_number = random.random()
elif prng == "numpy":
random_number = np.random.random()
def sample_std_normal(thread_id, shape, dtype):
    """Generate standard-normal arrays in a loop and enqueue each one.

    The loop keeps running while the executing thread's ``do_run``
    attribute is truthy; a thread that has no such attribute is treated as
    ``do_run=True``. Each generated array is put on the module-level
    ``rnd_normal_queue``.

    Args:
        thread_id: Added to the base seed 20, so distinct ``thread_id``
            values give distinct seeds.
        shape: Forwarded as ``size`` to ``standard_normal``.
        dtype: Forwarded as ``dtype`` to ``standard_normal``.
    """
    # Create a thread-specific RNG.
    rng = randomgen.RandomGenerator(
        randomgen.Xoroshiro128(seed=20 + thread_id)
    )
    # current_thread() replaces the camelCase alias currentThread(), which
    # emits a DeprecationWarning since Python 3.10.
    worker = threading.current_thread()
    while getattr(worker, "do_run", True):
        rnd_normal = rng.standard_normal(size=shape, dtype=dtype)
        rnd_normal_queue.put(rnd_normal)
def sample_std_normal(thread_id, shape, dtype):
    """Generate standard-normal arrays in a loop and enqueue each one.

    The loop keeps running while the executing thread's ``do_run``
    attribute is truthy; a thread that has no such attribute is treated as
    ``do_run=True``. Each generated array is put on the module-level
    ``rnd_normal_queue``.

    Args:
        thread_id: Added to the base seed 20, so distinct ``thread_id``
            values give distinct seeds.
        shape: Forwarded as ``size`` to ``standard_normal``.
        dtype: Forwarded as ``dtype`` to ``standard_normal``.
    """
    # Create a thread-specific RNG.
    rng = randomgen.RandomGenerator(
        randomgen.Xoroshiro128(seed=20 + thread_id)
    )
    # current_thread() replaces the camelCase alias currentThread(), which
    # emits a DeprecationWarning since Python 3.10.
    worker = threading.current_thread()
    while getattr(worker, "do_run", True):
        rnd_normal = rng.standard_normal(size=shape, dtype=dtype)
        rnd_normal_queue.put(rnd_normal)