type=int, nargs='?', const=0,
help='GPU ID (negative value indicates CPU)')
args = parser.parse_args()
device = chainer.get_device(args.device)
xp = device.xp
device.use()
model, vocab = memnn.load_model(args.MODEL)
model.to_device(device)
network = model.predictor
max_memory = network.max_memory
id_to_vocab = {i: v for v, i in vocab.items()}
test_data = babi.read_data(vocab, args.DATA)
print('Test data: %s: %d' % (args.DATA, len(test_data)))
sentence_len = max(max(len(s.sentence) for s in story)
for story in test_data)
correct = total = 0
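# Evaluate each story: keep a rolling window of the most recent sentences in `mem`
# and answer every query against it, counting correct predictions.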
for story in test_data:
mem = xp.zeros((max_memory, sentence_len), dtype=numpy.int32)
i = 0
for sent in story:
if isinstance(sent, babi.Sentence):
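            # When the memory is full, shift everything up one slot so the
            # oldest sentence is dropped before storing the new one.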
if i == max_memory:
mem[0:i - 1, :] = mem[1:i, :]
i -= 1
mem[i, 0:len(sent.sentence)] = xp.asarray(sent.sentence)
i += 1
elif isinstance(sent, babi.Query):
query = xp.array(sent.sentence, dtype=numpy.int32)
            # the network expects mini-batched input, so add a batch axis of size one
score = network(mem[None], query[None])[0]
answer = int(xp.argmax(score.array))
if answer == sent.answer:
correct += 1
total += 1
print(id_to_vocab[answer], id_to_vocab[sent.answer])
accuracy = float(correct) / total
print('Accuracy: %.2f%%' % (accuracy * 100))
                    help='Maximum number of memory slots')
parser.add_argument('--sentence-repr',
choices=['bow', 'pe'], default='bow',
help='Sentence representation. '
'Select from BoW ("bow") or position encoding ("pe")')
args = parser.parse_args()
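# Build the vocabulary lazily: every unseen word gets the next free ID, with ID 0 reserved first.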
vocab = collections.defaultdict(lambda: len(vocab))
vocab[''] = 0
for data_id in six.moves.range(1, 21):
train_data = babi.read_data(
vocab,
glob.glob('%s/qa%d_*train.txt' % (args.data, data_id))[0])
test_data = babi.read_data(
vocab,
glob.glob('%s/qa%d_*test.txt' % (args.data, data_id))[0])
print('Training data: %d' % len(train_data))
train_data = convert_data(train_data, args.max_memory)
test_data = convert_data(test_data, args.max_memory)
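# Choose how each sentence is reduced to a single vector: bag-of-words or position encoding.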
if args.sentence_repr == 'bow':
encoder = BoWEncoder()
elif args.sentence_repr == 'pe':
encoder = PositionEncoder()
else:
    print('Unknown --sentence-repr option: "%s"' % args.sentence_repr)
sys.exit(1)
memnn = MemNN(args.unit, len(vocab), encoder, args.max_memory, args.hop)
def parse_line(vocab, line):
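    # A tab-separated line encodes a question (question text, answer, supporting
    # fact IDs); any other line is a plain story sentence.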
if '\t' in line:
# question line
question, answer, fact_id = line.split('\t')
aid = convert(vocab, [answer])[0]
words = split(question)
wid = convert(vocab, words)
ids = list(map(int, fact_id.split(' ')))
return Query(wid, aid, ids)
else:
# sentence line
words = split(line)
wid = convert(vocab, words)
return Sentence(wid)
class BABIDataset(Dataset):
    def __init__(self, X, q, y):
assert X.shape[0] == q.shape[0]
assert X.shape[0] == y.shape[0]
self.X = X
self.q = q
self.y = y
def __getitem__(self, idx):
return (self.X[idx], self.q[idx]), self.y[idx]
def __len__(self):
return self.X.shape[0]
train_data = BABIDataset(X=X_train, q=q_train, y=y_train)
test_data = BABIDataset(X=X_test, q=q_test, y=y_test)
train_indices, search_indices = train_test_split(range(len(X_train)), train_size=0.5)
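# Split the training set in half: one loader samples the "train" indices, a second
# samples the "search" indices, and ZipDataloader yields batches from both together.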
dataloaders = {
"train" : ZipDataloader([
torch.utils.data.DataLoader(
dataset=train_data,
batch_size=32,
sampler=torch.utils.data.sampler.SubsetRandomSampler(train_indices),
),
torch.utils.data.DataLoader(
dataset=train_data,
batch_size=32,
sampler=torch.utils.data.sampler.SubsetRandomSampler(search_indices),
)
]),
"test" : DataLoader(
with tf.variable_scope("embedding"):
A = VariableEmbedder(params, wd=wd, initializer=initializer, name='A')
Aq = A(q, name='Aq') # [N, S, J, d]
Ax = A(x, name='Ax') # [N, S, J, d]
with tf.name_scope("encoding"):
encoder = PositionEncoder(J, d)
u = encoder(Aq, q_mask) # [N, d]
m = encoder(Ax, x_mask) # [N, M, d]
with tf.variable_scope("networks"):
m_mask = tf.reduce_max(tf.cast(x_mask, 'int64'), 2, name='m_mask') # [N, M]
gate_mask = tf.expand_dims(m_mask, -1)
m_length = tf.reduce_sum(m_mask, 1, name='m_length') # [N]
prev_u = tf.tile(tf.expand_dims(u, 1), [1, M, 1]) # [N, M, d]
reg_layer = VectorReductionLayer(N, M, d) if use_vector_gate else ReductionLayer(N, M, d)
gate_size = d if use_vector_gate else 1
h = None # [N, M, d]
as_, rfs, rbs = [], [], []
hs = []
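    # One iteration per memory hop: compute a candidate state u_t from the previous
    # state and the memories, a gate a, and blend them via the reduction layer.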
for layer_idx in range(L):
with tf.name_scope("layer_{}".format(layer_idx)):
dr_prev_u = tf.nn.dropout(prev_u, 0.7) if params.use_dropout else prev_u
u_t = tf.tanh(linear([dr_prev_u, m], d, True, wd=wd, scope='u_t'))
a = tf.cast(gate_mask, 'float') * tf.sigmoid(linear([dr_prev_u * m], gate_size, True, initializer=initializer, wd=wd, scope='a') - att_forget_bias)
h = reg_layer(u_t, a, 1.0-a, scope='h')
if layer_idx + 1 < L:
if params.use_reset:
rf, rb = tf.split(2, 2, tf.cast(gate_mask, 'float') *
tf.sigmoid(linear([dr_prev_u * m], 2 * gate_size, True, initializer=initializer, wd=wd, scope='r')))
else:
def train(train_data_path, test_data_path, args):
device = chainer.get_device(args.device)
device.use()
vocab = collections.defaultdict(lambda: len(vocab))
vocab[''] = 0
train_data = babi.read_data(vocab, train_data_path)
test_data = babi.read_data(vocab, test_data_path)
print('Training data: %s: %d' % (train_data_path, len(train_data)))
print('Test data: %s: %d' % (test_data_path, len(test_data)))
train_data = memnn.convert_data(train_data, args.max_memory)
test_data = memnn.convert_data(test_data, args.max_memory)
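    # Build the end-to-end memory network and wrap it in a Classifier that reads
    # the label from the 'answer' key of each example.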
encoder = memnn.make_encoder(args.sentence_repr)
network = memnn.MemNN(
args.unit, len(vocab), encoder, args.max_memory, args.hop)
model = chainer.links.Classifier(network, label_key='answer')
opt = chainer.optimizers.Adam()
model.to_device(device)
opt.setup(model)