Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def get_engine(alias='default'):
    """Return the engine cached under ``alias``, creating it on first use.

    On a cache miss the engine is built with ``create_engine`` from the
    connection string for ``alias`` and a ``DjangoPool`` bound to the same
    alias, then stored in ``Cache.engines`` so later calls reuse it.

    :param alias: key used for the cache and passed to the helper lookups.
    :return: the entry stored in ``Cache.engines`` for ``alias``.
    """
    # Fast path: an engine was already built for this alias.
    if alias in Cache.engines:
        return Cache.engines[alias]

    # NOTE(review): an earlier comment here said autocommit=True must be used
    # because SQLAlchemy is not aware of Django transactions, but no
    # autocommit option is passed below -- confirm whether that still applies.
    backend = get_engine_string(alias)

    # Only the 'sqlite3' backend gets the extra native_datetime flag.
    options = {'native_datetime': True} if backend == 'sqlite3' else {}

    django_pool = DjangoPool(alias=alias, creator=None)
    url = get_connection_string(alias)
    Cache.engines[alias] = create_engine(url, pool=django_pool, **options)
    return Cache.engines[alias]
def recreate(self):
    """Log the event and return a new ``DjangoPool`` built from this one.

    The new pool receives this pool's ``alias`` and ``_creator`` as
    positional arguments, and its ``_recycle``, ``echo``,
    ``_orig_logging_name`` and ``_use_threadlocal`` values as keyword
    arguments.

    :return: a newly constructed ``DjangoPool``.
    """
    self.logger.info("Pool recreating")

    # Attributes are read in the same order as they are handed over:
    # positional arguments first, then the keyword settings.
    positional = (self.alias, self._creator)
    settings = {
        'recycle': self._recycle,
        'echo': self.echo,
        'logging_name': self._orig_logging_name,
        'use_threadlocal': self._use_threadlocal,
    }
    return DjangoPool(*positional, **settings)
def __init__(self, alias, *args, **kwargs):
    """Initialise the parent class, then store ``alias`` on the instance.

    :param alias: value saved as ``self.alias``.
    :param args: positional arguments forwarded unchanged to the parent
        initialiser.
    :param kwargs: keyword arguments forwarded unchanged to the parent
        initialiser.
    """
    # The parent initialiser runs first; ``alias`` is attached afterwards.
    parent = super(DjangoPool, self)
    parent.__init__(*args, **kwargs)
    self.alias = alias