import os
import time
import argparse
import logging

import numpy as np

import hetu as ht
from hetu import stream
from hetu import init

np.random.seed(123)


def convert_to_one_hot(vals, max_val=0):
    """Helper method to convert a label array to a one-hot array."""
    if max_val == 0:
        max_val = vals.max() + 1
    one_hot_vals = np.zeros((vals.size, max_val))
    one_hot_vals[np.arange(vals.size), vals] = 1
    return one_hot_vals
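

# A fully connected layer: x @ W + b with an optional ReLU. Parameters are
# created on `ctx`, so each pipeline stage allocates and owns its own weights.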
def fc(x, shape, name, with_relu=True, ctx=None):
    weight = init.random_normal(
        shape=shape, stddev=0.04, name=name + '_weight', ctx=ctx)
    bias = init.random_normal(
        shape=shape[-1:], stddev=0.04, name=name + '_bias', ctx=ctx)
    x = ht.matmul_op(x, weight)
    x = x + ht.broadcastto_op(bias, x)
    if with_relu:
        x = ht.relu_op(x)
    return x


if __name__ == "__main__":
    # argument parser
    parser = argparse.ArgumentParser()
    parser.add_argument('--steps', type=int, default=8, help='training steps')
    parser.add_argument('--warmup', type=int, default=2,
                        help='warm-up steps excluded from timing')
    parser.add_argument('--batch-size', type=int, default=8, help='batch size')
    parser.add_argument('--learning-rate', type=float,
                        default=0.00001, help='learning rate')
    args = parser.parse_args()
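
    # NOTE: this script assumes exactly 8 MPI processes (one pipeline stage
    # per GPU), launched with something like `mpirun -np 8 python <script>`;
    # the exact launcher depends on your Hetu installation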

    # initialize the MPI/NCCL communicator and the optimizer on every rank
    comm = ht.wrapped_mpi_nccl_init()
    device_id = comm.dev_id
    print("mpi_nccl init for gpu device: {}".format(device_id))
    executor_ctx = ht.gpu(device_id)
    opt = ht.optim.SGDOptimizer(learning_rate=args.learning_rate)

    # init logger: prefix every message with the rank and PID
    logger = logging.getLogger()
    ch = logging.StreamHandler()
    formatter = logging.Formatter('[rank{}, PID{}]'.format(
        device_id, os.getpid()) + ' %(asctime)s: %(message)s')
    ch.setLevel(logging.DEBUG)
    ch.setFormatter(formatter)
    logger.addHandler(ch)
    # the root logger defaults to WARNING, so warning() always prints
    log = logger.warning

    # dedicated NCCL stream for the pipeline_send/pipeline_receive ops
    communicate_stream = stream.create_stream_handle(executor_ctx)

    # dataset
    datasets = ht.data.mnist()
    train_set_x, train_set_y = datasets[0]
    valid_set_x, valid_set_y = datasets[1]
    test_set_x, test_set_y = datasets[2]
    batch_size = 10000
    batch_num = 5
    value_x_list = []
    value_y_list = []
    for i in range(batch_num):
        start = i * batch_size
        ending = (i + 1) * batch_size
        value_x_list.append(train_set_x[start:ending])
        value_y_list.append(train_set_y[start:ending])

    # placeholder variables fed through feed_dict at run time
    x = ht.Variable(name="dataloader_x", trainable=False)
    y_ = ht.Variable(name="dataloader_y", trainable=False)
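
    # pipeline-parallel MLP over 8 ranks, one GPU stage each: rank 0 feeds the
    # input images, ranks 1-6 run the middle layers, and rank 7 computes the
    # loss; activations flow forward through pipeline_send/receive ops and
    # gradients flow back along the same route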
    if comm.myRank.value == 0:
        # rank 0: first pipeline stage

        # forward
        activation = fc(x, (784, 1024), 'mlp_fc1', with_relu=True,
                        ctx=ht.gpu(comm.localRank.value))
        activation = fc(activation, (1024, 2048), 'mlp_fc2',
                        with_relu=True, ctx=ht.gpu(comm.localRank.value))
        activation = fc(activation, (2048, 1024), 'mlp_fc3',
                        with_relu=True, ctx=ht.gpu(comm.localRank.value))
        activation_send_op = ht.pipeline_send_op(
            activation, 1, comm, stream=communicate_stream)
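
        # backward: the gradient w.r.t. the activation sent above comes back
        # from rank 1 and seeds this stage's backward pass via insert_grad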
        gradient_receive_op = ht.pipeline_receive_op(
            1, comm, ctx=executor_ctx, stream=communicate_stream)
        required_vars = opt.get_var_list(activation)
        opt.params = required_vars
        grads = ht.gradients(activation, required_vars,
                             insert_grad=gradient_receive_op)
        train_op = ht.optim.OptimizerOp(grads, opt)

        executor = ht.Executor(
            [activation_send_op, train_op], ctx=executor_ctx)

    elif comm.myRank.value != 7:
        # ranks 1 to 6: middle pipeline stages
        previous_rank = comm.myRank.value - 1
        next_rank = comm.myRank.value + 1
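
        # every middle stage follows the same four-step pattern: receive the
        # activation, run its layers, forward the result, then receive the
        # output gradient and send back the input gradient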
        # 1. receive activation from the previous rank
        activation_receive_op = ht.pipeline_receive_op(
            previous_rank, comm, ctx=executor_ctx, stream=communicate_stream)
        # forward
        activation = fc(activation_receive_op, (1024, 2048), 'mlp_fc1',
                        with_relu=True, ctx=ht.gpu(comm.localRank.value))
        activation = fc(activation, (2048, 2048), 'mlp_fc2',
                        with_relu=True, ctx=ht.gpu(comm.localRank.value))
        activation = fc(activation, (2048, 1024), 'mlp_fc3',
                        with_relu=True, ctx=ht.gpu(comm.localRank.value))

        # 2. send activation to the next rank
        activation_send_op = ht.pipeline_send_op(
            activation, next_rank, comm, ctx=executor_ctx, stream=communicate_stream)

        # 3. receive gradients from the next rank
        gradient_receive_op = ht.pipeline_receive_op(
            next_rank, comm, ctx=executor_ctx, stream=communicate_stream)
        # backward: differentiate w.r.t. the received activation too, so that
        # grads[0] is the gradient to send back and grads[1:] update the
        # parameters of this stage
        required_vars = opt.get_var_list(activation)
        opt.params = required_vars
        required_vars = [activation_receive_op] + required_vars
        grads = ht.gradients(activation, required_vars,
                             insert_grad=gradient_receive_op)
        train_op = ht.optim.OptimizerOp(grads[1:], opt)

        # 4. send the input gradient back to the previous rank
        sendback_grad_op = ht.pipeline_send_op(
            grads[0], previous_rank, comm, stream=communicate_stream)

        executor = ht.Executor(
            [activation_send_op, sendback_grad_op, train_op], ctx=executor_ctx)

    else:
        # rank 7: last pipeline stage, computes the loss
        activation_receive_op = ht.pipeline_receive_op(
            6, comm, ctx=executor_ctx, stream=communicate_stream)

        # forward
        activation = fc(activation_receive_op, (1024, 2048), 'mlp_fc1',
                        with_relu=True, ctx=ht.gpu(comm.localRank.value))
        activation = fc(activation, (2048, 1024), 'mlp_fc2',
                        with_relu=True, ctx=ht.gpu(comm.localRank.value))
        y_pred = fc(activation, (1024, 10), 'mlp_fc3', with_relu=False,
                    ctx=ht.gpu(comm.localRank.value))
        loss = ht.softmaxcrossentropy_op(y_pred, y_)
        loss = ht.reduce_mean_op(loss, [0])

        # backward: the loss is the root of the graph here, so no insert_grad
        # is needed; grads[0] (w.r.t. the received activation) goes to rank 6
        required_vars = opt.get_var_list(loss)
        opt.params = required_vars
        required_vars = [activation_receive_op] + required_vars
        grads = ht.gradients(loss, required_vars)
        train_op = ht.optim.OptimizerOp(grads[1:], opt)

        sendback_grad_op = ht.pipeline_send_op(
            grads[0], 6, comm, stream=communicate_stream)
        executor = ht.Executor(
            [loss, sendback_grad_op, train_op], ctx=executor_ctx)

    # training
    for step in range(args.steps):
        if step == args.warmup:
            start = time.time()
        if comm.myRank.value == 0:
            log("step {}:".format(step))
            executor.run(feed_dict={x: value_x_list[step % batch_num]})
            log("gpu0 ok")
        elif comm.myRank.value == 7:
            loss_val, _, _ = executor.run(
                feed_dict={y_: value_y_list[step % batch_num]},
                convert_to_numpy_ret_vals=True)
            log("gpu7 ok, loss: {}".format(loss_val[0]))
        else:
            executor.run()
            log("gpu{} ok".format(comm.myRank.value))

        # wait for outstanding pipeline sends/receives before the next step
        if communicate_stream:
            communicate_stream.sync()

    end = time.time()
    log("time elapsed for {} steps: {}s".format(
        args.steps - args.warmup, round(end - start, 3)))