Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
import logging
from alephclient.services.entityextract_pb2_grpc import EntityExtractStub
from alephclient.services.entityextract_pb2 import ExtractedEntity
from aleph import settings
from aleph.services import ServiceClientMixin
from aleph.analyze.analyzer import EntityAnalyzer, TextIterator
from aleph.model import DocumentTag, DocumentTagCollector
log = logging.getLogger(__name__)
TYPE = ExtractedEntity.Type.Value
class EntityExtractor(EntityAnalyzer, TextIterator, ServiceClientMixin):
SERVICE = settings.ENTITIES_SERVICE
ORIGIN = 'ner'
TYPES = {
ExtractedEntity.PERSON: DocumentTag.TYPE_PERSON,
ExtractedEntity.ORGANIZATION: DocumentTag.TYPE_ORGANIZATION,
ExtractedEntity.COMPANY: DocumentTag.TYPE_ORGANIZATION,
}
def __init__(self):
self.active = self.has_channel()
def extract(self, collector, document):
DocumentTagCollector(document, 'polyglot').save()
import logging
from alephclient.services.entityextract_pb2_grpc import EntityExtractStub
from alephclient.services.entityextract_pb2 import ExtractedEntity
from aleph import settings
from aleph.services import ServiceClientMixin
from aleph.analyze.analyzer import EntityAnalyzer, TextIterator
from aleph.model import DocumentTag, DocumentTagCollector
log = logging.getLogger(__name__)
TYPE = ExtractedEntity.Type.Value
class EntityExtractor(EntityAnalyzer, TextIterator, ServiceClientMixin):
SERVICE = settings.ENTITIES_SERVICE
ORIGIN = 'ner'
TYPES = {
ExtractedEntity.PERSON: DocumentTag.TYPE_PERSON,
ExtractedEntity.ORGANIZATION: DocumentTag.TYPE_ORGANIZATION,
ExtractedEntity.COMPANY: DocumentTag.TYPE_ORGANIZATION,
}
def __init__(self):
self.active = self.has_channel()
def extract(self, collector, document):
DocumentTagCollector(document, 'polyglot').save()