Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
from elasticsearch import Elasticsearch
from elasticsearch.client.utils import query_params
from elasticsearch.exceptions import NotFoundError
from elasticmock.behaviour.server_failure import server_failure
from elasticmock.utilities import get_random_id, get_random_scroll_id
from elasticmock.utilities.decorator import for_all_methods
from elasticmock.fake_indices import FakeIndicesClient
from elasticmock.fake_cluster import FakeClusterClient
PY3 = sys.version_info[0] == 3
if PY3:
unicode = str
@for_all_methods([server_failure])
class FakeElasticsearch(Elasticsearch):
__documents_dict = None
def __init__(self, hosts=None, transport_class=None, **kwargs):
self.__documents_dict = {}
self.__scrolls = {}
@property
def indices(self):
return FakeIndicesClient(self)
@property
def cluster(self):
return FakeClusterClient(self)
@query_params()