# Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# NOTE(review): orphaned fragment — appears to be the tail of a test function
# (it duplicates the body of test_split_corpus below); its `def` line and the
# earlier directory setup are not in view.
dev_dir = pjoin(split_dir, "dev")
setup_test_corpus(orig_dir, trn_dir, dev_dir, n_exemplars)
orig_corpus = corpus({'location': orig_dir})
# Carve a "dev" split of at least 19 words out of the original corpus; the
# leftover exemplars are labelled "train". Seeded so the split is repeatable.
split_corpus(
orig_dir,
split_dir=split_dir,
split_name="dev",
split_words=19,
min_split_segs=1,
leftover_data_split_name="train",
rand_seed=1337,
)
# Make sure we didn't destroy input data
final_corpus = corpus({'location': orig_dir})
assert orig_corpus.validate() == 1
assert final_corpus.validate() == 1
orig_hashes = [_.hash() for _ in orig_corpus.exemplars]
final_hashes = [_.hash() for _ in final_corpus.exemplars]
# Every original exemplar must still be present after splitting.
assert all(h in final_hashes for h in orig_hashes)
# Make sure correct number of words present in data split
dev_corpus = corpus({'location': dev_dir})
# 20, not 19: the splitter presumably overshoots split_words by whole
# exemplars rather than cutting one mid-way — TODO confirm against splitter.
assert sum(e.count_words() for e in dev_corpus.exemplars) == 20
assert dev_corpus.validate()
def test_split_corpus():
""" Test corpus splitter """
# Build a small throwaway corpus of n_exemplars under tests/split-corpus.
n_exemplars = 10
corpus_dir = "tests/split-corpus"
orig_dir = pjoin(corpus_dir, "orig")
split_dir = pjoin(corpus_dir, "splits")
trn_dir = pjoin(split_dir, "train")
dev_dir = pjoin(split_dir, "dev")
setup_test_corpus(orig_dir, trn_dir, dev_dir, n_exemplars)
orig_corpus = corpus({'location': orig_dir})
# Carve a "dev" split of at least 19 words; leftover data becomes "train".
# Seeded so the split is repeatable across runs.
split_corpus(
orig_dir,
split_dir=split_dir,
split_name="dev",
split_words=19,
min_split_segs=1,
leftover_data_split_name="train",
rand_seed=1337,
)
# Make sure we didn't destroy input data
final_corpus = corpus({'location': orig_dir})
assert orig_corpus.validate() == 1
assert final_corpus.validate() == 1
orig_hashes = [_.hash() for _ in orig_corpus.exemplars]
final_hashes = [_.hash() for _ in final_corpus.exemplars]
# NOTE(review): the function appears truncated here — the hash-containment
# and dev-split word-count assertions (see the duplicate fragment above)
# are not in view.
# NOTE(review): orphaned fragment — the tail of a corpus-conversion method.
# Its `def` line and the bindings for executor / target / sample_rate /
# nested are not in view; executor is presumably a concurrent.futures pool.
# process audio files concurrently for speed
futures = [
executor.submit(
partial(
_.prepare_for_training,
target=target,
sample_rate=sample_rate,
nested=nested,
)) for _ in self.exemplars
]
# trigger conversion and gather results
new_exemplars = [future.result() for future in tqdm(futures)]
# Build the converted corpus, dropping exemplars whose preparation
# returned None (i.e. failed or was skipped).
new_corpus = corpus({
"location":
target,
"exemplars": [eg for eg in new_exemplars if eg is not None],
})
new_corpus.validate()
return new_corpus.log()
def __sub__(self, other):
    """ Allow subtraction of corpora via - operator

    Returns a new corpus (with no location of its own) containing the
    exemplars of this corpus that do not appear in ``other``.
    """
    # Docstring fix: the original said "addition" for the `-` operator.
    # Membership test relies on exemplar equality; order of self.exemplars
    # is preserved.
    return corpus({
        "location": None,
        "exemplars":
        [_ for _ in self.exemplars if _ not in other.exemplars],
    })
def __getitem__(self, given):
    """ Allow slicing of corpora via [] """
    # A slice already yields a list; a bare index yields one exemplar,
    # which we wrap so the new corpus always receives a list.
    if isinstance(given, slice):
        selected = self.exemplars[given]
    else:
        selected = [self.exemplars[given]]
    return corpus({"location": self.location, "exemplars": selected})
def __add__(self, other):
    """ Allow addition of corpora via + operator """
    # Concatenate exemplar lists; the combined corpus has no location.
    combined = self.exemplars + other.exemplars
    return corpus({"location": None, "exemplars": combined})
def get_corpus(loc):
    """ returns corpus for input location """
    # Thin convenience wrapper around the corpus constructor.
    config = {"location": loc}
    return corpus(config)
Returns the new splits as separate corpora
"""
# NOTE(review): fragment — the enclosing method's `def` line and the start of
# its docstring are not in view; split_words and min_segments are presumably
# parameters of that method.
valid_exemplars, total_words = self.count_exemplar_words()
# Raise an error if the inputs are invalid, to avoid an infinite loop below.
if split_words < 0 or split_words > total_words:
raise ValueError("cannot split corpus with {} words into split with {} words".format(total_words, split_words))
exemplars_in_split = []
word_counter, seg_counter = 0, 0
# Randomly move exemplars into the split until both the word target and the
# minimum segment count are exceeded.
# NOTE(review): if min_segments cannot be satisfied before valid_exemplars
# empties, random.randrange(0) raises ValueError — confirm callers guard this.
while word_counter <= split_words or seg_counter <= min_segments:
exemplars_in_split += [valid_exemplars.pop(random.randrange(len(valid_exemplars)))]
word_counter += exemplars_in_split[-1].n_words
seg_counter += len(exemplars_in_split[-1].transcript_file.segments)
new_corpus = corpus({
"location": self.location,
"exemplars": exemplars_in_split,
})
# Remaining data = original corpus minus the new split; keep the original
# location so the remainder still points at the source directory.
remaining_corpus = self - new_corpus
remaining_corpus.location = self.location
return remaining_corpus, new_corpus