|
- from nvidia.dali.pipeline import Pipeline
- from nvidia.dali import ops
- from nvidia.dali import types
- from nvidia.dali.plugin.pytorch import DALIClassificationIterator
-
- import numpy as np
- import torch
- from torch import nn
-
-
class HybridTrainPipeline(Pipeline):
    """DALI pipeline that reads, decodes and augments training images.

    With ``num_shards == 0`` every operator runs on the CPU; otherwise the
    decoder runs on the "mixed" (CPU+GPU) backend and the remaining
    operators run on the GPU, and the reader is sharded across GPUs.
    """

    def __init__(self, batch_size, file_root, num_threads, device_id, num_shards, shard_id):
        super(HybridTrainPipeline, self).__init__(batch_size, num_threads, device_id)

        cpu_only = (num_shards == 0)

        reader_kwargs = {"file_root": file_root}
        if not cpu_only:
            # Shard the file reader so each GPU sees a distinct slice.
            reader_kwargs.update(num_shards=num_shards, shard_id=shard_id)
        self.input = ops.FileReader(**reader_kwargs)

        # ##### freely adjustable ###################################
        decode_device = "cpu" if cpu_only else "mixed"
        op_device = "cpu" if cpu_only else "gpu"
        self.decode = ops.ImageDecoder(device=decode_device, output_type=types.RGB)
        self.res = ops.RandomResizedCrop(device=op_device, size=224)
        self.cmnp = ops.CropMirrorNormalize(device=op_device,
                                            dtype=types.FLOAT,
                                            output_layout=types.NCHW,
                                            # mean=0 / std=1: no normalization
                                            # (SPOS-style preprocessing)
                                            mean=0.,
                                            std=1.)
        # ####################################################

    def define_graph(self):
        jpegs, labels = self.input(name="Reader")
        decoded = self.decode(jpegs)
        cropped = self.res(decoded)
        normalized = self.cmnp(cropped)
        return normalized, labels
-
-
class HybridValPipeline(Pipeline):
    """DALI pipeline that reads, decodes and crops validation images.

    When ``num_shards`` is 0 all operators stay on the CPU; any other value
    decodes on the "mixed" backend, processes on the GPU, and shards the
    reader across GPUs.
    """

    def __init__(self, batch_size, file_root, num_threads, device_id, num_shards, shard_id):
        super(HybridValPipeline, self).__init__(batch_size, num_threads, device_id)

        if num_shards == 0:
            self.input = ops.FileReader(file_root=file_root)
            decoder_dev, pipe_dev = "cpu", "cpu"
        else:
            self.input = ops.FileReader(file_root=file_root,
                                        num_shards=num_shards,
                                        shard_id=shard_id)
            decoder_dev, pipe_dev = "mixed", "gpu"

        # ##### freely adjustable ###################################
        # NOTE(review): RandomResizedCrop on the validation split is unusual
        # (Resize + center crop is more common) — confirm this is intended.
        self.decode = ops.ImageDecoder(device=decoder_dev, output_type=types.RGB)
        self.res = ops.RandomResizedCrop(device=pipe_dev, size=224)
        self.cmnp = ops.CropMirrorNormalize(device=pipe_dev,
                                            dtype=types.FLOAT,
                                            output_layout=types.NCHW,
                                            # mean=0 / std=1: no normalization
                                            mean=0.,
                                            std=1.)
        # ####################################################

    def define_graph(self):
        jpegs, labels = self.input(name="Reader")
        images = self.decode(jpegs)
        images = self.res(images)
        images = self.cmnp(images)
        return images, labels
-
-
class TorchWrapper:
    """Wrap one or several DALI iterators into a single PyTorch-style iterator.

    Parameters
    ----------
    num_shards : int
        Number of GPU shards used for data loading. ``0`` means CPU-only,
        in which case ``data_loader`` is a single iterator rather than a
        list of per-shard iterators.
    data_loader : iterator or list of iterators
        DALI iterator(s) producing batches of the form
        ``[{"data": tensor, "label": tensor}]``.
    iter_mode : str
        How multiple shard iterators are merged: ``"recursion"``
        (round-robin across shards) or ``"iter"`` (exhaust one shard,
        then move on to the next).

    Raises
    ------
    ValueError
        If ``iter_mode`` is not one of ``"recursion"`` / ``"iter"``.
    """

    def __init__(self, num_shards, data_loader, iter_mode="recursion"):
        self.index = 0          # which shard iterator is currently active
        self.count = 0          # number of batches served so far
        self.num_shards = num_shards
        self.data_loader = data_loader
        self.iter_mode = iter_mode
        if self.iter_mode not in {"recursion", "iter"}:
            raise ValueError("iter_mode should be either 'recursion' or 'iter'")

    def __iter__(self):
        return self

    def __len__(self):
        # Returns the total number of samples, not the number of batches.
        # BUG FIX: the original read the undefined global ``num_shards``;
        # it must use the instance attribute.
        if self.num_shards == 0:
            return self.data_loader.size
        # All shards are assumed to have the same size as shard 0.
        return len(self.data_loader) * self.data_loader[0].size

    def __next__(self):
        # BUG FIX: same undefined-global fix as in __len__.
        if self.num_shards == 0:
            # CPU-only path: a single iterator.
            data = next(self.data_loader)
            return data[0]["data"], data[0]["label"].view(-1).long()

        # One or more GPUs: __init__ guarantees iter_mode is valid.
        if self.iter_mode == "recursion":
            return self._get_next_recursion()
        return self._get_next_iter(self.data_loader[0])

    def _get_next_iter(self, data_loader):
        # Exhaust one shard, then advance to the next.
        # NOTE(review): ``self.count`` is never reset when switching shards,
        # so the switch condition only fires once — verify before relying on
        # "iter" mode with more than two shards.
        if self.count == data_loader.size:
            self.index += 1
            data_loader = self.data_loader[self.index]

        self.count += 1
        data = next(data_loader)
        return data[0]["data"], data[0]["label"].view(-1).long()

    def _get_next_recursion(self):
        # Round-robin: batch k comes from shard (k % num_shards).
        self.index = self.count % self.num_shards
        self.count += 1

        data_loader = self.data_loader[self.index]
        data = next(data_loader)
        return data[0]["data"], data[0]["label"].view(-1).long()
-
-
def get_iter_dali_cuda(batch_size=256, train_file_root="", val_file_root="", num_threads=4, device_id=[-1], num_shards=0, shard_id=[-1]):

    """
    Build data iterators usable for PyTorch training.
    Reading and preprocessing can be spread across several GPUs.

    Steps:
    1. create one DALI pipeline per shard
    2. wrap each pipeline in a PyTorch-compatible iterator
    3. bundle the per-GPU iterators into a single wrapper
    4. batches are produced on the GPU (CUDA) side

    The data directory must be laid out as:
        images
        |-file_list.txt
        |-images/dog
            |-dog_4.jpg
            |-dog_5.jpg
            ...
        |-images/kitten
            |-cat_10.jpg
            |-cat_5.jpg
            ...

    parameters:

        batch_size : int        samples per batch
        train_file_root : str   root directory of the training images
        val_file_root : str     root directory of the validation images
        num_threads : int       CPU threads used for reading per pipeline
        device_id : list of int physical GPU ids
        shard_id : list of int  logical shard ids
        num_shards : int        number of shards (GPUs); 0 means CPU only

    inner helpers:

        get_train_pipeline(shard_id, device_id) : DALI pipeline for the training data
        get_val_pipeline(shard_id, device_id)   : DALI pipeline for the validation data
        get_dali_iter_for_torch(pipelines, data_num) : wrap pipelines as PyTorch iterators
        get_data_size(pipeline) : number of samples a pipeline actually emits per
            epoch — the dataset size rounded down to a multiple of batch_size
            (the trailing partial batch is dropped)

    Example:
        # Read training and validation data from TRAIN_PATH / VAL_PATH with a
        # batch size of 256, 4 reader threads and two GPUs (physical ids 0 and 4).
        # For a single GPU use num_shards=1, shard_id=[0], and keep device_id a list.
        # For CPU-only loading use get_iter_dali_cpu().
        train_data_iter, val_data_iter = get_iter_dali_cuda(batch_size=256,
                                                            train_file_root=TRAIN_PATH,
                                                            val_file_root=VAL_PATH,
                                                            num_threads=4,
                                                            device_id=[0, 4],
                                                            num_shards=2,
                                                            shard_id=[0, 1])

        # Training loop in PyTorch:
        torch_model = TorchModel(para)
        criterion = nn.CrossEntropyLoss()
        optimizer = torch.optim.Adam(torch_model.parameters())

        for epoch in range(epochs):
            for step, (x, y) in enumerate(train_data_iter):
                x = x.to("cuda:0")
                y = y.to("cuda:0")
                output = torch_model(x)

                optimizer.zero_grad()
                loss = criterion(output, y)
                loss.backward()
                optimizer.step()

    NOTE(review): the mutable default arguments (device_id=[-1], shard_id=[-1])
    are shared across calls — harmless here since they are only iterated, but
    worth replacing with None defaults.
    """

    def get_train_pipeline(shard_id, device_id):
        # One training pipeline bound to a single (shard, GPU) pair.
        pipeline = HybridTrainPipeline(batch_size = batch_size,
                                       file_root = train_file_root,
                                       num_threads = num_threads,
                                       num_shards = num_shards,
                                       shard_id = shard_id,
                                       device_id = device_id)
        return pipeline

    def get_val_pipeline(shard_id, device_id):
        # One validation pipeline bound to a single (shard, GPU) pair.
        pipeline = HybridValPipeline(batch_size = batch_size,
                                     file_root = val_file_root,
                                     num_threads = num_threads,
                                     num_shards = num_shards,
                                     shard_id = shard_id,
                                     device_id = device_id)
        return pipeline

    # One pipeline per (shard_id, device_id) pair.
    pipeline_for_train = [get_train_pipeline(shard_id = shard_id_index, device_id = device_id_index) \
                          for shard_id_index, device_id_index in zip(shard_id, device_id)]
    pipeline_for_val = [get_val_pipeline(shard_id = shard_id_index, device_id = device_id_index) \
                        for shard_id_index, device_id_index in zip(shard_id, device_id)]

    # Pipelines must be built before they can be wrapped in iterators.
    [pipeline.build() for pipeline in pipeline_for_train]
    [pipeline.build() for pipeline in pipeline_for_val]

    def get_data_size(pipeline):
        # Samples actually emitted per epoch: dataset size rounded down to a
        # whole number of batches.
        data_num = pipeline.epoch_size()["Reader"]
        batch_size = pipeline.batch_size
        return data_num//batch_size*batch_size

    data_num_train = get_data_size(pipeline_for_train[0])
    data_num_val = get_data_size(pipeline_for_val[0])

    def get_dali_iter_for_torch(pipelines, data_num):
        # NOTE(review): recent DALI versions expect the LastBatchPolicy.DROP
        # enum rather than the string "drop", and deprecate the ``size``
        # argument — confirm against the installed DALI version.
        return [DALIClassificationIterator(pipelines=pipeline,
                                           last_batch_policy="drop",size = data_num) for pipeline in pipelines]

    data_loader_train = get_dali_iter_for_torch(pipeline_for_train, data_num_train)
    data_loader_val = get_dali_iter_for_torch(pipeline_for_val, data_num_val)

    # Merge the per-shard iterators into one iterator each.
    train_data_iter = TorchWrapper(num_shards, data_loader_train)
    val_data_iter = TorchWrapper(num_shards, data_loader_val)

    return train_data_iter, val_data_iter
-
-
def get_iter_dali_cpu(batch_size=256, train_file_root="", val_file_root="", num_threads=4):
    """
    Build CPU-only data iterators usable for PyTorch training.

    Same idea as get_iter_dali_cuda(), but every DALI operator runs on the
    CPU (num_shards=0), so a single pipeline is built per split.

    parameters:
        batch_size : int       samples per batch
        train_file_root : str  root directory of the training images
        val_file_root : str    root directory of the validation images
        num_threads : int      CPU threads used by each pipeline

    returns:
        (train_data_iter, val_data_iter) : TorchWrapper instances yielding
        (data, label) tuples.
    """

    pipeline_train = HybridTrainPipeline(batch_size = batch_size,
                                         file_root = train_file_root,
                                         num_threads = num_threads,
                                         num_shards = 0,
                                         shard_id = -1,
                                         device_id = 0)

    # BUG FIX: the validation pipeline was built with HybridTrainPipeline;
    # use HybridValPipeline, consistent with get_iter_dali_cuda().
    pipeline_val = HybridValPipeline(batch_size = batch_size,
                                     file_root = val_file_root,
                                     num_threads = num_threads,
                                     num_shards = 0,
                                     shard_id = -1,
                                     device_id = 0)

    pipeline_train.build()
    pipeline_val.build()

    def get_data_size(pipeline):
        # Samples actually emitted per epoch: dataset size rounded down to a
        # whole number of batches (partial batches are dropped below).
        data_num = pipeline.epoch_size()["Reader"]
        batch_size = pipeline.batch_size
        return data_num // batch_size * batch_size

    data_num_train = get_data_size(pipeline_train)
    data_num_val = get_data_size(pipeline_val)

    # NOTE(review): recent DALI versions expect the LastBatchPolicy.DROP enum
    # rather than the string "drop" — confirm against the installed version.
    data_loader_train = DALIClassificationIterator(pipelines=pipeline_train,
                                                   last_batch_policy="drop", size=data_num_train)
    data_loader_val = DALIClassificationIterator(pipelines=pipeline_val,
                                                 last_batch_policy="drop", size=data_num_val)

    train_data_iter = TorchWrapper(0, data_loader_train)
    val_data_iter = TorchWrapper(0, data_loader_val)

    return train_data_iter, val_data_iter
-
-
-
if __name__ == "__main__":

    PATH = "./imagenet"
    TRAIN_PATH = "./imagenet/train"
    VALID_PATH = "./imagenet/val"

    # BUG FIX: the validation iterators were fed TRAIN_PATH while VALID_PATH
    # was defined but never used; point val_file_root at VALID_PATH.
    train_data_iter_cuda, val_data_iter_cuda = get_iter_dali_cuda(batch_size=256,
                                                                  train_file_root=TRAIN_PATH,
                                                                  val_file_root=VALID_PATH,
                                                                  num_threads=4,
                                                                  device_id=[0, 4],
                                                                  num_shards=2,
                                                                  shard_id=[0, 1])

    train_data_iter_cpu, val_data_iter_cpu = get_iter_dali_cpu(batch_size=256,
                                                               train_file_root=TRAIN_PATH,
                                                               val_file_root=VALID_PATH,
                                                               num_threads=4)
|