
Merge callback tests to speed them up

tags/v1.0.0alpha
yh, 3 years ago
parent commit 6b56d8a082
4 changed files with 512 additions and 523 deletions:

  1. tests/core/callbacks/test_checkpoint_callback_torch.py (+310, -315)
  2. tests/core/callbacks/test_load_best_model_callback_torch.py (+34, -36)
  3. tests/core/callbacks/test_more_evaluate_callback.py (+142, -146)
  4. tests/core/callbacks/test_progress_callback_torch.py (+26, -26)
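All four files make the same change: pytest parameters that only multiplied the number of test cases (version, only_state_dict, save_folder, progress_bar) become plain for-loops inside a single test body, so the per-case launch overhead (fixtures, spawning distributed workers, process-group setup and teardown) is paid once instead of once per combination. A minimal, self-contained sketch of the pattern; launch_environment and run_one_case are hypothetical stand-ins, not fastNLP APIs:

import pytest

def launch_environment():
    # Stand-in for the per-test-case overhead (fixtures, DDP process-group setup).
    return {"ready": True}

def run_one_case(env, version, only_state_dict):
    # Stand-in for the actual training / checkpointing checks of one combination.
    assert env["ready"] and version in (0, 1) and only_state_dict in (True, False)

# Before: 2 x 2 = 4 separate test cases, each paying the launch overhead.
@pytest.mark.parametrize("version", [0, 1])
@pytest.mark.parametrize("only_state_dict", [True, False])
def test_checkpoint_before(version, only_state_dict):
    env = launch_environment()
    run_one_case(env, version, only_state_dict)

# After: one test case; the same combinations are covered by loops inside it.
def test_checkpoint_after():
    env = launch_environment()
    for version in [0, 1]:
        for only_state_dict in [True, False]:
            run_one_case(env, version, only_state_dict)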

tests/core/callbacks/test_checkpoint_callback_torch.py (+310, -315)

@@ -75,131 +75,129 @@ def model_and_optimizers(request):


@pytest.mark.torch
@pytest.mark.parametrize("driver,device", [("torch", [0, 1])])  # ("torch", "cpu"), ("torch", [0, 1]), ("torch", 1)
-@pytest.mark.parametrize("version", [0, 1])
-@pytest.mark.parametrize("only_state_dict", [True, False])
@magic_argv_env_context(timeout=100)
def test_model_checkpoint_callback_1(
        model_and_optimizers: TrainerParameters,
        driver,
-        device,
-        version,
-        only_state_dict
+        device
):
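# Old body (removed): ran once per parametrized (version, only_state_dict) combination.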
try:
path = Path.cwd().joinpath(f"test_model_checkpoint")
path.mkdir(exist_ok=True, parents=True)

if version == 0:
callbacks = [
CheckpointCallback(folder=path, every_n_epochs=1, every_n_batches=123, last=False, on_exceptions=None, topk=0,
monitor=None, only_state_dict=only_state_dict, save_object='model')
]
elif version == 1:
callbacks = [
CheckpointCallback(folder=path, every_n_epochs=3, every_n_batches=None, last=True, on_exceptions=None, topk=2,
monitor="acc", only_state_dict=only_state_dict, save_object='model')
]

trainer = Trainer(
model=model_and_optimizers.model,
driver=driver,
device=device,
optimizers=model_and_optimizers.optimizers,
train_dataloader=model_and_optimizers.train_dataloader,
evaluate_dataloaders=model_and_optimizers.evaluate_dataloaders,
input_mapping=model_and_optimizers.input_mapping,
output_mapping=model_and_optimizers.output_mapping,
metrics=model_and_optimizers.metrics,
n_epochs=10,
callbacks=callbacks,
output_from_new_proc="all"
)

trainer.run()
print("Finish train")
all_saved_model_paths = {w.name: w for w in path.joinpath(os.environ[FASTNLP_LAUNCH_TIME]).iterdir()}
# Check that the number of saved model files is correct;
if version == 0:

if not isinstance(device, list):
assert "model-epoch_10" in all_saved_model_paths
assert "model-epoch_4-batch_123" in all_saved_model_paths

epoch_save_path = all_saved_model_paths["model-epoch_10"]
step_save_path = all_saved_model_paths["model-epoch_4-batch_123"]

assert len(all_saved_model_paths) == 12
# File names differ under ddp, because the same data takes fewer steps to finish;
else:
assert "model-epoch_6" in all_saved_model_paths
assert "model-epoch_9-batch_123" in all_saved_model_paths

epoch_save_path = all_saved_model_paths["model-epoch_6"]
step_save_path = all_saved_model_paths["model-epoch_9-batch_123"]

assert len(all_saved_model_paths) == 11
all_state_dicts = [epoch_save_path, step_save_path]

elif version == 1:

pattern = re.compile("model-epoch_[0-9]+-batch_[0-9]+-[a-zA-Z#]+_[0-9]*.?[0-9]*")

if not isinstance(device, list):
assert "model-epoch_9" in all_saved_model_paths
assert "model-last" in all_saved_model_paths
aLL_topk_folders = []
for each_folder_name in all_saved_model_paths:
each_folder_name = pattern.findall(each_folder_name)
if len(each_folder_name) != 0:
aLL_topk_folders.append(each_folder_name[0])
assert len(aLL_topk_folders) == 2

epoch_save_path = all_saved_model_paths["model-epoch_9"]
last_save_path = all_saved_model_paths["model-last"]
topk_save_path = all_saved_model_paths[aLL_topk_folders[0]]

assert len(all_saved_model_paths) == 6
# File names differ under ddp, because the same data takes fewer steps to finish;
else:
assert "model-epoch_9" in all_saved_model_paths
assert "model-last" in all_saved_model_paths

aLL_topk_folders = []
for each_folder_name in all_saved_model_paths:
each_folder_name = pattern.findall(each_folder_name)
if len(each_folder_name) != 0:
aLL_topk_folders.append(each_folder_name[0])
assert len(aLL_topk_folders) == 2

epoch_save_path = all_saved_model_paths["model-epoch_9"]
last_save_path = all_saved_model_paths["model-last"]
topk_save_path = all_saved_model_paths[aLL_topk_folders[0]]

assert len(all_saved_model_paths) == 6

all_state_dicts = [epoch_save_path, last_save_path, topk_save_path]

for folder in all_state_dicts:
trainer = Trainer(
model=model_and_optimizers.model,
driver=driver,
device=device,
optimizers=model_and_optimizers.optimizers,
train_dataloader=model_and_optimizers.train_dataloader,
evaluate_dataloaders=model_and_optimizers.evaluate_dataloaders,
input_mapping=model_and_optimizers.input_mapping,
output_mapping=model_and_optimizers.output_mapping,
metrics=model_and_optimizers.metrics,

n_epochs=2,
output_from_new_proc="all"
)
trainer.load_model(folder, only_state_dict=only_state_dict)

trainer.run()
trainer.driver.barrier()
finally:
rank_zero_rm(path)
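# New body (added): version and only_state_dict are looped over inside the single test case.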
for version in [0, 1]:
for only_state_dict in [True, False]:
try:
path = Path.cwd().joinpath(f"test_model_checkpoint")
path.mkdir(exist_ok=True, parents=True)

if version == 0:
callbacks = [
CheckpointCallback(folder=path, every_n_epochs=1, every_n_batches=123, last=False, on_exceptions=None, topk=0,
monitor=None, only_state_dict=only_state_dict, save_object='model')
]
elif version == 1:
callbacks = [
CheckpointCallback(folder=path, every_n_epochs=3, every_n_batches=None, last=True, on_exceptions=None, topk=2,
monitor="acc", only_state_dict=only_state_dict, save_object='model')
]

trainer = Trainer(
model=model_and_optimizers.model,
driver=driver,
device=device,
optimizers=model_and_optimizers.optimizers,
train_dataloader=model_and_optimizers.train_dataloader,
evaluate_dataloaders=model_and_optimizers.evaluate_dataloaders,
input_mapping=model_and_optimizers.input_mapping,
output_mapping=model_and_optimizers.output_mapping,
metrics=model_and_optimizers.metrics,
n_epochs=10,
callbacks=callbacks,
output_from_new_proc="all"
)

trainer.run()
print("Finish train")
all_saved_model_paths = {w.name: w for w in path.joinpath(os.environ[FASTNLP_LAUNCH_TIME]).iterdir()}
# Check that the number of saved model files is correct;
if version == 0:

if not isinstance(device, list):
assert "model-epoch_10" in all_saved_model_paths
assert "model-epoch_4-batch_123" in all_saved_model_paths

epoch_save_path = all_saved_model_paths["model-epoch_10"]
step_save_path = all_saved_model_paths["model-epoch_4-batch_123"]

assert len(all_saved_model_paths) == 12
# File names differ under ddp, because the same data takes fewer steps to finish;
else:
assert "model-epoch_6" in all_saved_model_paths
assert "model-epoch_9-batch_123" in all_saved_model_paths

epoch_save_path = all_saved_model_paths["model-epoch_6"]
step_save_path = all_saved_model_paths["model-epoch_9-batch_123"]

assert len(all_saved_model_paths) == 11
all_state_dicts = [epoch_save_path, step_save_path]

elif version == 1:

pattern = re.compile("model-epoch_[0-9]+-batch_[0-9]+-[a-zA-Z#]+_[0-9]*.?[0-9]*")

if not isinstance(device, list):
assert "model-epoch_9" in all_saved_model_paths
assert "model-last" in all_saved_model_paths
aLL_topk_folders = []
for each_folder_name in all_saved_model_paths:
each_folder_name = pattern.findall(each_folder_name)
if len(each_folder_name) != 0:
aLL_topk_folders.append(each_folder_name[0])
assert len(aLL_topk_folders) == 2

epoch_save_path = all_saved_model_paths["model-epoch_9"]
last_save_path = all_saved_model_paths["model-last"]
topk_save_path = all_saved_model_paths[aLL_topk_folders[0]]

assert len(all_saved_model_paths) == 6
# File names differ under ddp, because the same data takes fewer steps to finish;
else:
assert "model-epoch_9" in all_saved_model_paths
assert "model-last" in all_saved_model_paths

aLL_topk_folders = []
for each_folder_name in all_saved_model_paths:
each_folder_name = pattern.findall(each_folder_name)
if len(each_folder_name) != 0:
aLL_topk_folders.append(each_folder_name[0])
assert len(aLL_topk_folders) == 2

epoch_save_path = all_saved_model_paths["model-epoch_9"]
last_save_path = all_saved_model_paths["model-last"]
topk_save_path = all_saved_model_paths[aLL_topk_folders[0]]

assert len(all_saved_model_paths) == 6

all_state_dicts = [epoch_save_path, last_save_path, topk_save_path]

for folder in all_state_dicts:
trainer = Trainer(
model=model_and_optimizers.model,
driver=driver,
device=device,
optimizers=model_and_optimizers.optimizers,
train_dataloader=model_and_optimizers.train_dataloader,
evaluate_dataloaders=model_and_optimizers.evaluate_dataloaders,
input_mapping=model_and_optimizers.input_mapping,
output_mapping=model_and_optimizers.output_mapping,
metrics=model_and_optimizers.metrics,

n_epochs=2,
output_from_new_proc="all"
)
trainer.load_model(folder, only_state_dict=only_state_dict)

trainer.run()
trainer.driver.barrier()
finally:
rank_zero_rm(path)


if dist.is_initialized():
    dist.destroy_process_group()
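The version == 1 branch above identifies topk checkpoints purely by folder name. A small stand-alone check of the same regular expression; the concrete folder names are illustrative, not taken from a real run:

import re

# Same pattern the test uses to pick out topk checkpoint folders.
pattern = re.compile("model-epoch_[0-9]+-batch_[0-9]+-[a-zA-Z#]+_[0-9]*.?[0-9]*")

names = [
    "model-epoch_9",                    # plain every-n-epochs checkpoint: no match
    "model-last",                       # "last" checkpoint: no match
    "model-epoch_3-batch_45-acc_0.87",  # topk checkpoint (monitor value in the name): match
]
topk_folders = [name for name in names if len(pattern.findall(name)) != 0]
assert topk_folders == ["model-epoch_3-batch_45-acc_0.87"]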
@@ -207,92 +205,91 @@ def test_model_checkpoint_callback_1(


@pytest.mark.torch
@pytest.mark.parametrize("driver,device", [("torch", "cpu"), ("torch", [0, 1]), ("torch", 1)])  # ("torch", "cpu"), ("torch", [0, 1]), ("torch", 1)
-@pytest.mark.parametrize("only_state_dict", [True])
@magic_argv_env_context(timeout=100)
def test_model_checkpoint_callback_2(
        model_and_optimizers: TrainerParameters,
        driver,
-        device,
-        only_state_dict
+        device
):
# Old body (removed): a single only_state_dict value came from the removed parametrize decorator.
    try:
        path = Path.cwd().joinpath("test_model_checkpoint")
        path.mkdir(exist_ok=True, parents=True)

        from fastNLP.core.callbacks.callback_event import Event

        @Trainer.on(Event.on_train_epoch_end())
        def raise_exception(trainer):
            if trainer.driver.get_local_rank() == 0 and trainer.cur_epoch_idx == 4:
                raise NotImplementedError

        callbacks = [
            CheckpointCallback(folder=path, every_n_epochs=None, every_n_batches=None, last=False,
                               on_exceptions=NotImplementedError, topk=None, monitor=None, only_state_dict=only_state_dict,
                               save_object='model'),
        ]

        with pytest.raises(NotImplementedError):
            trainer = Trainer(
                model=model_and_optimizers.model,
                driver=driver,
                device=device,
                optimizers=model_and_optimizers.optimizers,
                train_dataloader=model_and_optimizers.train_dataloader,
                evaluate_dataloaders=model_and_optimizers.evaluate_dataloaders,
                input_mapping=model_and_optimizers.input_mapping,
                output_mapping=model_and_optimizers.output_mapping,
                metrics=model_and_optimizers.metrics,

                n_epochs=10,
                callbacks=callbacks,
                output_from_new_proc="all"
            )

            trainer.run()

        if dist.is_initialized():
            dist.destroy_process_group()
        if FASTNLP_DISTRIBUTED_CHECK in os.environ:
            os.environ.pop(FASTNLP_DISTRIBUTED_CHECK)

        # Check that the number of saved model files is correct;
        all_saved_model_paths = {w.name: w for w in path.joinpath(os.environ[FASTNLP_LAUNCH_TIME]).iterdir()}

        if not isinstance(device, list):
            assert "model-epoch_4-batch_100-exception_NotImplementedError" in all_saved_model_paths
            exception_model_path = all_saved_model_paths["model-epoch_4-batch_100-exception_NotImplementedError"]
        # File names differ under ddp, because the same data takes fewer steps to finish;
        else:
            assert "model-epoch_4-batch_52-exception_NotImplementedError" in all_saved_model_paths
            exception_model_path = all_saved_model_paths["model-epoch_4-batch_52-exception_NotImplementedError"]

        assert len(all_saved_model_paths) == 1
        all_state_dicts = [exception_model_path]

        for folder in all_state_dicts:
            trainer = Trainer(
                model=model_and_optimizers.model,
                driver="torch",
                device=4,
                optimizers=model_and_optimizers.optimizers,
                train_dataloader=model_and_optimizers.train_dataloader,
                evaluate_dataloaders=model_and_optimizers.evaluate_dataloaders,
                input_mapping=model_and_optimizers.input_mapping,
                output_mapping=model_and_optimizers.output_mapping,
                metrics=model_and_optimizers.metrics,

                n_epochs=2,
                output_from_new_proc="all"
            )

            trainer.load_model(folder, only_state_dict=only_state_dict)
            trainer.run()
            trainer.driver.barrier()

    finally:
        rank_zero_rm(path)
        # pass

# New body (added): only_state_dict is looped over inside the single test case.
    for only_state_dict in [True, False]:
        try:
            path = Path.cwd().joinpath("test_model_checkpoint")
            path.mkdir(exist_ok=True, parents=True)

            from fastNLP.core.callbacks.callback_event import Event

            @Trainer.on(Event.on_train_epoch_end())
            def raise_exception(trainer):
                if trainer.driver.get_local_rank() == 0 and trainer.cur_epoch_idx == 4:
                    raise NotImplementedError

            callbacks = [
                CheckpointCallback(folder=path, every_n_epochs=None, every_n_batches=None, last=False,
                                   on_exceptions=NotImplementedError, topk=None, monitor=None, only_state_dict=only_state_dict,
                                   save_object='model'),
            ]

            with pytest.raises(NotImplementedError):
                trainer = Trainer(
                    model=model_and_optimizers.model,
                    driver=driver,
                    device=device,
                    optimizers=model_and_optimizers.optimizers,
                    train_dataloader=model_and_optimizers.train_dataloader,
                    evaluate_dataloaders=model_and_optimizers.evaluate_dataloaders,
                    input_mapping=model_and_optimizers.input_mapping,
                    output_mapping=model_and_optimizers.output_mapping,
                    metrics=model_and_optimizers.metrics,

                    n_epochs=10,
                    callbacks=callbacks,
                    output_from_new_proc="all"
                )

                trainer.run()

            if dist.is_initialized():
                dist.destroy_process_group()
            if FASTNLP_DISTRIBUTED_CHECK in os.environ:
                os.environ.pop(FASTNLP_DISTRIBUTED_CHECK)

            # Check that the number of saved model files is correct;
            all_saved_model_paths = {w.name: w for w in path.joinpath(os.environ[FASTNLP_LAUNCH_TIME]).iterdir()}

            if not isinstance(device, list):
                assert "model-epoch_4-batch_100-exception_NotImplementedError" in all_saved_model_paths
                exception_model_path = all_saved_model_paths["model-epoch_4-batch_100-exception_NotImplementedError"]
            # File names differ under ddp, because the same data takes fewer steps to finish;
            else:
                assert "model-epoch_4-batch_52-exception_NotImplementedError" in all_saved_model_paths
                exception_model_path = all_saved_model_paths["model-epoch_4-batch_52-exception_NotImplementedError"]

            assert len(all_saved_model_paths) == 1
            all_state_dicts = [exception_model_path]

            for folder in all_state_dicts:
                trainer = Trainer(
                    model=model_and_optimizers.model,
                    driver="torch",
                    device=4,
                    optimizers=model_and_optimizers.optimizers,
                    train_dataloader=model_and_optimizers.train_dataloader,
                    evaluate_dataloaders=model_and_optimizers.evaluate_dataloaders,
                    input_mapping=model_and_optimizers.input_mapping,
                    output_mapping=model_and_optimizers.output_mapping,
                    metrics=model_and_optimizers.metrics,

                    n_epochs=2,
                    output_from_new_proc="all"
                )

                trainer.load_model(folder, only_state_dict=only_state_dict)
                trainer.run()
                trainer.driver.barrier()

        finally:
            rank_zero_rm(path)
            # pass


if dist.is_initialized():
    dist.destroy_process_group()
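test_model_checkpoint_callback_2 checks that a checkpoint is written when one of the exception types named in on_exceptions escapes the training loop, and that it can be loaded afterwards. A framework-free sketch of that behaviour; CheckpointOnException and train_one_epoch are illustrative, not fastNLP code:

class CheckpointOnException:
    # Toy runner: record a checkpoint name when a watched exception type is raised.
    def __init__(self, on_exceptions):
        self.on_exceptions = on_exceptions
        self.saved = []

    def run(self, train_one_epoch, n_epochs):
        for epoch in range(n_epochs):
            try:
                train_one_epoch(epoch)
            except self.on_exceptions as exc:
                self.saved.append(f"model-epoch_{epoch}-exception_{type(exc).__name__}")
                raise

def train_one_epoch(epoch):
    if epoch == 4:
        raise NotImplementedError

runner = CheckpointOnException(on_exceptions=NotImplementedError)
try:
    runner.run(train_one_epoch, n_epochs=10)
except NotImplementedError:
    pass
assert runner.saved == ["model-epoch_4-exception_NotImplementedError"]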
@@ -300,130 +297,128 @@ def test_model_checkpoint_callback_2(


@pytest.mark.torch
@pytest.mark.parametrize("driver,device", [("torch", "cpu"), ("torch", [0, 1]), ("torch", 0)])  # ("torch", "cpu"), ("torch", [0, 1]), ("torch", 1)
-@pytest.mark.parametrize("version", [0, 1])
-@pytest.mark.parametrize("only_state_dict", [True, False])
@magic_argv_env_context(timeout=100)
def test_trainer_checkpoint_callback_1(
        model_and_optimizers: TrainerParameters,
        driver,
-        device,
-        version,
-        only_state_dict
+        device
):
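# Old body (removed): ran once per parametrized (version, only_state_dict) combination.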
try:
path = Path.cwd().joinpath(f"test_model_checkpoint")
path.mkdir(exist_ok=True, parents=True)

if version == 0:
callbacks = [
CheckpointCallback(folder=path, every_n_epochs=7, every_n_batches=123, last=False, on_exceptions=None, topk=0,
monitor=None, only_state_dict=only_state_dict, save_object='trainer')
]
elif version == 1:
callbacks = [
CheckpointCallback(folder=path, every_n_epochs=None, every_n_batches=None, last=True, on_exceptions=None,
topk=2, monitor="acc", only_state_dict=only_state_dict, save_object='trainer')
]

trainer = Trainer(
model=model_and_optimizers.model,
driver=driver,
device=device,
optimizers=model_and_optimizers.optimizers,
train_dataloader=model_and_optimizers.train_dataloader,
evaluate_dataloaders=model_and_optimizers.evaluate_dataloaders,
input_mapping=model_and_optimizers.input_mapping,
output_mapping=model_and_optimizers.output_mapping,
metrics=model_and_optimizers.metrics,

n_epochs=10,
callbacks=callbacks,
output_from_new_proc="all"
)

trainer.run()

all_saved_model_paths = {w.name: w for w in path.joinpath(os.environ[FASTNLP_LAUNCH_TIME]).iterdir()}
# Check that the number of saved model files is correct;
if version == 0:

if not isinstance(device, list):
assert "trainer-epoch_7" in all_saved_model_paths
assert "trainer-epoch_4-batch_123" in all_saved_model_paths

epoch_save_path = all_saved_model_paths["trainer-epoch_7"]
step_save_path = all_saved_model_paths["trainer-epoch_4-batch_123"]

assert len(all_saved_model_paths) == 3
# File names differ under ddp, because the same data takes fewer steps to finish;
else:
assert "trainer-epoch_7" in all_saved_model_paths
assert "trainer-epoch_9-batch_123" in all_saved_model_paths

epoch_save_path = all_saved_model_paths["trainer-epoch_7"]
step_save_path = all_saved_model_paths["trainer-epoch_9-batch_123"]

assert len(all_saved_model_paths) == 2
all_state_dicts = [epoch_save_path, step_save_path]

elif version == 1:

pattern = re.compile("trainer-epoch_[0-9]+-batch_[0-9]+-[a-zA-Z#]+_[0-9]*.?[0-9]*")

# all_saved_model_paths = {w.name: w for w in path.joinpath(os.environ[FASTNLP_LAUNCH_TIME]).iterdir()}
if not isinstance(device, list):
assert "trainer-last" in all_saved_model_paths
aLL_topk_folders = []
for each_folder_name in all_saved_model_paths:
each_folder_name = pattern.findall(each_folder_name)
if len(each_folder_name) != 0:
aLL_topk_folders.append(each_folder_name[0])
assert len(aLL_topk_folders) == 2

last_save_path = all_saved_model_paths["trainer-last"]
topk_save_path = all_saved_model_paths[aLL_topk_folders[0]]

assert len(all_saved_model_paths) == 3
# File names differ under ddp, because the same data takes fewer steps to finish;
else:
assert "trainer-last" in all_saved_model_paths

aLL_topk_folders = []
for each_folder_name in all_saved_model_paths:
each_folder_name = pattern.findall(each_folder_name)
if len(each_folder_name) != 0:
aLL_topk_folders.append(each_folder_name[0])
assert len(aLL_topk_folders) == 2

last_save_path = all_saved_model_paths["trainer-last"]
topk_save_path = all_saved_model_paths[aLL_topk_folders[0]]

assert len(all_saved_model_paths) == 3

all_state_dicts = [last_save_path, topk_save_path]

for folder in all_state_dicts:
trainer = Trainer(
model=model_and_optimizers.model,
driver=driver,
device=device,
optimizers=model_and_optimizers.optimizers,
train_dataloader=model_and_optimizers.train_dataloader,
evaluate_dataloaders=model_and_optimizers.evaluate_dataloaders,
input_mapping=model_and_optimizers.input_mapping,
output_mapping=model_and_optimizers.output_mapping,
metrics=model_and_optimizers.metrics,

n_epochs=13,
output_from_new_proc="all"
)
trainer.load_checkpoint(folder, only_state_dict=only_state_dict)

trainer.run()
trainer.driver.barrier()

finally:
rank_zero_rm(path)
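# New body (added): version and only_state_dict are looped over inside the single test case.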
for version in [0, 1]:
for only_state_dict in [True, False]:
try:
path = Path.cwd().joinpath(f"test_model_checkpoint")
path.mkdir(exist_ok=True, parents=True)

if version == 0:
callbacks = [
CheckpointCallback(folder=path, every_n_epochs=7, every_n_batches=123, last=False, on_exceptions=None, topk=0,
monitor=None, only_state_dict=only_state_dict, save_object='trainer')
]
elif version == 1:
callbacks = [
CheckpointCallback(folder=path, every_n_epochs=None, every_n_batches=None, last=True, on_exceptions=None,
topk=2, monitor="acc", only_state_dict=only_state_dict, save_object='trainer')
]

trainer = Trainer(
model=model_and_optimizers.model,
driver=driver,
device=device,
optimizers=model_and_optimizers.optimizers,
train_dataloader=model_and_optimizers.train_dataloader,
evaluate_dataloaders=model_and_optimizers.evaluate_dataloaders,
input_mapping=model_and_optimizers.input_mapping,
output_mapping=model_and_optimizers.output_mapping,
metrics=model_and_optimizers.metrics,

n_epochs=10,
callbacks=callbacks,
output_from_new_proc="all"
)

trainer.run()

all_saved_model_paths = {w.name: w for w in path.joinpath(os.environ[FASTNLP_LAUNCH_TIME]).iterdir()}
# Check that the number of saved model files is correct;
if version == 0:

if not isinstance(device, list):
assert "trainer-epoch_7" in all_saved_model_paths
assert "trainer-epoch_4-batch_123" in all_saved_model_paths

epoch_save_path = all_saved_model_paths["trainer-epoch_7"]
step_save_path = all_saved_model_paths["trainer-epoch_4-batch_123"]

assert len(all_saved_model_paths) == 3
# File names differ under ddp, because the same data takes fewer steps to finish;
else:
assert "trainer-epoch_7" in all_saved_model_paths
assert "trainer-epoch_9-batch_123" in all_saved_model_paths

epoch_save_path = all_saved_model_paths["trainer-epoch_7"]
step_save_path = all_saved_model_paths["trainer-epoch_9-batch_123"]

assert len(all_saved_model_paths) == 2
all_state_dicts = [epoch_save_path, step_save_path]

elif version == 1:

pattern = re.compile("trainer-epoch_[0-9]+-batch_[0-9]+-[a-zA-Z#]+_[0-9]*.?[0-9]*")

# all_saved_model_paths = {w.name: w for w in path.joinpath(os.environ[FASTNLP_LAUNCH_TIME]).iterdir()}
if not isinstance(device, list):
assert "trainer-last" in all_saved_model_paths
aLL_topk_folders = []
for each_folder_name in all_saved_model_paths:
each_folder_name = pattern.findall(each_folder_name)
if len(each_folder_name) != 0:
aLL_topk_folders.append(each_folder_name[0])
assert len(aLL_topk_folders) == 2

last_save_path = all_saved_model_paths["trainer-last"]
topk_save_path = all_saved_model_paths[aLL_topk_folders[0]]

assert len(all_saved_model_paths) == 3
# File names differ under ddp, because the same data takes fewer steps to finish;
else:
assert "trainer-last" in all_saved_model_paths

aLL_topk_folders = []
for each_folder_name in all_saved_model_paths:
each_folder_name = pattern.findall(each_folder_name)
if len(each_folder_name) != 0:
aLL_topk_folders.append(each_folder_name[0])
assert len(aLL_topk_folders) == 2

last_save_path = all_saved_model_paths["trainer-last"]
topk_save_path = all_saved_model_paths[aLL_topk_folders[0]]

assert len(all_saved_model_paths) == 3

all_state_dicts = [last_save_path, topk_save_path]

for folder in all_state_dicts:
trainer = Trainer(
model=model_and_optimizers.model,
driver=driver,
device=device,
optimizers=model_and_optimizers.optimizers,
train_dataloader=model_and_optimizers.train_dataloader,
evaluate_dataloaders=model_and_optimizers.evaluate_dataloaders,
input_mapping=model_and_optimizers.input_mapping,
output_mapping=model_and_optimizers.output_mapping,
metrics=model_and_optimizers.metrics,

n_epochs=13,
output_from_new_proc="all"
)
trainer.load_checkpoint(folder, only_state_dict=only_state_dict)

trainer.run()
trainer.driver.barrier()

finally:
rank_zero_rm(path)


if dist.is_initialized():
    dist.destroy_process_group()


tests/core/callbacks/test_load_best_model_callback_torch.py (+34, -36)

@@ -72,47 +72,45 @@ def model_and_optimizers(request):




@pytest.mark.torch
-@pytest.mark.parametrize("driver,device", [("torch", [4, 5]), ("torch", 1), ("torch", "cpu")])  # ("torch", "cpu"), ("torch", [0, 1]), ("torch", 1)
-@pytest.mark.parametrize("save_folder", ['save_models', None])
-@pytest.mark.parametrize("only_state_dict", [True, False])
+@pytest.mark.parametrize("driver,device", [("torch", [0, 1]), ("torch", 1), ("torch", "cpu")])  # ("torch", "cpu"), ("torch", [0, 1]), ("torch", 1)
@magic_argv_env_context
def test_load_best_model_callback(
        model_and_optimizers: TrainerParameters,
        driver,
-        device,
-        save_folder,
-        only_state_dict
+        device
):
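# Old body (removed): save_folder and only_state_dict came from the removed parametrize decorators.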
callbacks = [LoadBestModelCallback(monitor='acc')]

trainer = Trainer(
model=model_and_optimizers.model,
driver=driver,
device=device,
optimizers=model_and_optimizers.optimizers,
train_dataloader=model_and_optimizers.train_dataloader,
evaluate_dataloaders=model_and_optimizers.evaluate_dataloaders,
input_mapping=model_and_optimizers.input_mapping,
output_mapping=lambda output: output if ('loss' in output) else {'pred':output['preds'], 'target': output['target']},
metrics=model_and_optimizers.metrics,
n_epochs=3,
callbacks=callbacks,
output_from_new_proc="all"
)

trainer.run(num_eval_sanity_batch=0)

driver = TorchSingleDriver(model_and_optimizers.model, device=torch.device('cuda'))
evaluator = Evaluator(model_and_optimizers.model, driver=driver, device=device,
dataloaders={'dl1': model_and_optimizers.evaluate_dataloaders},
metrics={'acc': Accuracy(aggregate_when_get_metric=False)},
output_mapping=lambda output: output if ('loss' in output) else {'pred':output['preds'], 'target': output['target']},
progress_bar='rich', use_dist_sampler=False)
results = evaluator.run()
assert np.allclose(callbacks[0].monitor_value, results['acc#acc#dl1'])
if save_folder:
import shutil
shutil.rmtree(save_folder, ignore_errors=True)
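# New body (added): save_folder and only_state_dict are looped over inside the single test case.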
for save_folder in ['save_models', None]:
for only_state_dict in [True, False]:
callbacks = [LoadBestModelCallback(monitor='acc')]

trainer = Trainer(
model=model_and_optimizers.model,
driver=driver,
device=device,
optimizers=model_and_optimizers.optimizers,
train_dataloader=model_and_optimizers.train_dataloader,
evaluate_dataloaders=model_and_optimizers.evaluate_dataloaders,
input_mapping=model_and_optimizers.input_mapping,
output_mapping=lambda output: output if ('loss' in output) else {'pred':output['preds'], 'target': output['target']},
metrics=model_and_optimizers.metrics,
n_epochs=3,
callbacks=callbacks,
output_from_new_proc="all"
)

trainer.run(num_eval_sanity_batch=0)

driver = TorchSingleDriver(model_and_optimizers.model, device=torch.device('cuda'))
evaluator = Evaluator(model_and_optimizers.model, driver=driver, device=device,
dataloaders={'dl1': model_and_optimizers.evaluate_dataloaders},
metrics={'acc': Accuracy(aggregate_when_get_metric=False)},
output_mapping=lambda output: output if ('loss' in output) else {'pred':output['preds'], 'target': output['target']},
progress_bar='rich', use_dist_sampler=False)
results = evaluator.run()
assert np.allclose(callbacks[0].monitor_value, results['acc#acc#dl1'])
if save_folder:
import shutil
shutil.rmtree(save_folder, ignore_errors=True)
if dist.is_initialized():
    dist.destroy_process_group()
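The assertion np.allclose(callbacks[0].monitor_value, results['acc#acc#dl1']) relies on LoadBestModelCallback both remembering the best monitor value and restoring the matching weights at the end of training. A framework-free sketch of that contract; LoadBestModel here is illustrative, not the fastNLP implementation:

import copy

class LoadBestModel:
    # Toy callback: keep the state and score of the best evaluation seen so far.
    def __init__(self):
        self.monitor_value = float("-inf")
        self.best_state = None

    def on_evaluate_end(self, model_state, score):
        if score > self.monitor_value:
            self.monitor_value = score
            self.best_state = copy.deepcopy(model_state)

    def on_train_end(self, model_state):
        model_state.clear()
        model_state.update(self.best_state)  # load the best weights back

cb = LoadBestModel()
state = {"w": 0.0}
for epoch, acc in enumerate([0.3, 0.8, 0.6]):
    state["w"] = float(epoch)
    cb.on_evaluate_end(state, acc)

cb.on_train_end(state)
assert state == {"w": 1.0} and cb.monitor_value == 0.8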




tests/core/callbacks/test_more_evaluate_callback.py (+142, -146)

@@ -93,84 +93,82 @@ def model_and_optimizers(request):


@pytest.mark.torch
@pytest.mark.parametrize("driver,device", [("torch", "cpu"), ("torch", [0, 1]), ("torch", 1)])  # ("torch", "cpu"), ("torch", [0, 1]), ("torch", 1)
-@pytest.mark.parametrize("version", [0, 1])
-@pytest.mark.parametrize("only_state_dict", [True, False])
@magic_argv_env_context
def test_model_more_evaluate_callback_1(
        model_and_optimizers: TrainerParameters,
        driver,
        device,
-        version,
-        only_state_dict
):
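# Old body (removed): ran once per parametrized (version, only_state_dict) combination.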
try:
path = Path.cwd().joinpath(f"test_model_checkpoint")
path.mkdir(exist_ok=True, parents=True)

if version == 0:
callbacks = [
MoreEvaluateCallback(dataloaders=model_and_optimizers.evaluate_dataloaders,
metrics=model_and_optimizers.more_metrics,
evaluate_every=-1,
folder=path, topk=-1,
topk_monitor='acc', only_state_dict=only_state_dict, save_object='model')
]
elif version == 1:
callbacks = [
MoreEvaluateCallback(dataloaders=model_and_optimizers.evaluate_dataloaders,
metrics=model_and_optimizers.more_metrics,
evaluate_every=None, watch_monitor='loss', watch_monitor_larger_better=False,
folder=path, topk=1, topk_monitor='acc', only_state_dict=only_state_dict,
save_object='model')
]
n_epochs = 5
trainer = Trainer(
model=model_and_optimizers.model,
driver=driver,
device=device,
optimizers=model_and_optimizers.optimizers,
train_dataloader=model_and_optimizers.train_dataloader,
evaluate_dataloaders=model_and_optimizers.evaluate_dataloaders,
input_mapping=model_and_optimizers.input_mapping,
output_mapping=model_and_optimizers.output_mapping,
metrics=model_and_optimizers.metrics,
n_epochs=n_epochs,
callbacks=callbacks,
output_from_new_proc="all",
evaluate_fn='train_step'
)

trainer.run()

all_saved_model_paths = {w.name: w for w in path.joinpath(os.environ[FASTNLP_LAUNCH_TIME]).iterdir()}
# Check that the number of saved model files is correct;
if version == 0:
assert len(all_saved_model_paths) == n_epochs
elif version == 1:
assert len(all_saved_model_paths) == 1

for folder in all_saved_model_paths:
trainer = Trainer(
model=model_and_optimizers.model,
driver=driver,
device=device,
optimizers=model_and_optimizers.optimizers,
train_dataloader=model_and_optimizers.train_dataloader,
evaluate_dataloaders=model_and_optimizers.evaluate_dataloaders,
input_mapping=model_and_optimizers.input_mapping,
output_mapping=model_and_optimizers.output_mapping,
metrics=model_and_optimizers.metrics,
n_epochs=2,
output_from_new_proc="all",
evaluate_fn='train_step'
)
folder = path.joinpath(os.environ[FASTNLP_LAUNCH_TIME]).joinpath(folder)
trainer.load_model(folder, only_state_dict=only_state_dict)

trainer.run()
trainer.driver.barrier()
finally:
rank_zero_rm(path)
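# New body (added): only_state_dict and version are looped over inside the single test case.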
for only_state_dict in [True, False]:
for version in [0, 1]:
try:
path = Path.cwd().joinpath(f"test_model_checkpoint")
path.mkdir(exist_ok=True, parents=True)

if version == 0:
callbacks = [
MoreEvaluateCallback(dataloaders=model_and_optimizers.evaluate_dataloaders,
metrics=model_and_optimizers.more_metrics,
evaluate_every=-1,
folder=path, topk=-1,
topk_monitor='acc', only_state_dict=only_state_dict, save_object='model')
]
elif version == 1:
callbacks = [
MoreEvaluateCallback(dataloaders=model_and_optimizers.evaluate_dataloaders,
metrics=model_and_optimizers.more_metrics,
evaluate_every=None, watch_monitor='loss', watch_monitor_larger_better=False,
folder=path, topk=1, topk_monitor='acc', only_state_dict=only_state_dict,
save_object='model')
]
n_epochs = 5
trainer = Trainer(
model=model_and_optimizers.model,
driver=driver,
device=device,
optimizers=model_and_optimizers.optimizers,
train_dataloader=model_and_optimizers.train_dataloader,
evaluate_dataloaders=model_and_optimizers.evaluate_dataloaders,
input_mapping=model_and_optimizers.input_mapping,
output_mapping=model_and_optimizers.output_mapping,
metrics=model_and_optimizers.metrics,
n_epochs=n_epochs,
callbacks=callbacks,
output_from_new_proc="all",
evaluate_fn='train_step'
)

trainer.run()

all_saved_model_paths = {w.name: w for w in path.joinpath(os.environ[FASTNLP_LAUNCH_TIME]).iterdir()}
# Check that the number of saved model files is correct;
if version == 0:
assert len(all_saved_model_paths) == n_epochs
elif version == 1:
assert len(all_saved_model_paths) == 1

for folder in all_saved_model_paths:
trainer = Trainer(
model=model_and_optimizers.model,
driver=driver,
device=device,
optimizers=model_and_optimizers.optimizers,
train_dataloader=model_and_optimizers.train_dataloader,
evaluate_dataloaders=model_and_optimizers.evaluate_dataloaders,
input_mapping=model_and_optimizers.input_mapping,
output_mapping=model_and_optimizers.output_mapping,
metrics=model_and_optimizers.metrics,
n_epochs=2,
output_from_new_proc="all",
evaluate_fn='train_step'
)
folder = path.joinpath(os.environ[FASTNLP_LAUNCH_TIME]).joinpath(folder)
trainer.load_model(folder, only_state_dict=only_state_dict)

trainer.run()
trainer.driver.barrier()
finally:
rank_zero_rm(path)


if dist.is_initialized():
    dist.destroy_process_group()
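The expected counts follow from the topk setting of MoreEvaluateCallback: topk=-1 keeps a checkpoint for every evaluation (one per epoch here, hence n_epochs folders), while topk=1 keeps only the single best one. A toy sketch of that bookkeeping; TopkSaver is illustrative, not the fastNLP implementation:

import heapq

class TopkSaver:
    # Keep checkpoints for the top-k monitor values; topk=-1 keeps everything.
    def __init__(self, topk):
        self.topk = topk
        self.kept = []  # (score, folder_name) pairs

    def save(self, name, score):
        self.kept.append((score, name))
        if self.topk != -1:
            self.kept = heapq.nlargest(self.topk, self.kept)  # drop everything below the top k

keep_all = TopkSaver(topk=-1)
keep_best = TopkSaver(topk=1)
for epoch, acc in enumerate([0.2, 0.9, 0.5, 0.7, 0.6]):
    keep_all.save(f"model-epoch_{epoch}", acc)
    keep_best.save(f"model-epoch_{epoch}", acc)

assert len(keep_all.kept) == 5                               # like topk=-1: one folder per epoch
assert [n for _, n in keep_best.kept] == ["model-epoch_1"]   # like topk=1: only the best one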
@@ -178,85 +176,83 @@ def test_model_more_evaluate_callback_1(


@pytest.mark.torch
@pytest.mark.parametrize("driver,device", [("torch", "cpu"), ("torch", [0, 1]), ("torch", 0)])  # ("torch", "cpu"), ("torch", [0, 1]), ("torch", 1)
-@pytest.mark.parametrize("version", [0, 1])
-@pytest.mark.parametrize("only_state_dict", [True, False])
@magic_argv_env_context
def test_trainer_checkpoint_callback_1(
        model_and_optimizers: TrainerParameters,
        driver,
-        device,
-        version,
-        only_state_dict
+        device
):
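# Old body (removed): ran once per parametrized (version, only_state_dict) combination.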
try:
path = Path.cwd().joinpath(f"test_model_checkpoint")
path.mkdir(exist_ok=True, parents=True)

if version == 0:
callbacks = [
MoreEvaluateCallback(dataloaders=model_and_optimizers.evaluate_dataloaders,
metrics=model_and_optimizers.more_metrics,
evaluate_every=-1,
folder=path, topk=-1,
topk_monitor='acc', only_state_dict=only_state_dict, save_object='trainer')
]
elif version == 1:
callbacks = [
MoreEvaluateCallback(dataloaders=model_and_optimizers.evaluate_dataloaders,
metrics=model_and_optimizers.more_metrics,
evaluate_every=None, watch_monitor='loss', watch_monitor_larger_better=False,
folder=path, topk=1, topk_monitor='acc', only_state_dict=only_state_dict,
save_object='trainer')
]
n_epochs = 5
trainer = Trainer(
model=model_and_optimizers.model,
driver=driver,
device=device,
optimizers=model_and_optimizers.optimizers,
train_dataloader=model_and_optimizers.train_dataloader,
evaluate_dataloaders=model_and_optimizers.evaluate_dataloaders,
input_mapping=model_and_optimizers.input_mapping,
output_mapping=model_and_optimizers.output_mapping,
metrics=model_and_optimizers.metrics,
n_epochs=n_epochs,
callbacks=callbacks,
output_from_new_proc="all",
evaluate_fn='train_step'
)

trainer.run()

all_saved_model_paths = {w.name: w for w in path.joinpath(os.environ[FASTNLP_LAUNCH_TIME]).iterdir()}
# Check that the number of saved model files is correct;
if version == 0:
assert len(all_saved_model_paths) == n_epochs
elif version == 1:
assert len(all_saved_model_paths) == 1

for folder in all_saved_model_paths:
trainer = Trainer(
model=model_and_optimizers.model,
driver=driver,
device=device,
optimizers=model_and_optimizers.optimizers,
train_dataloader=model_and_optimizers.train_dataloader,
evaluate_dataloaders=model_and_optimizers.evaluate_dataloaders,
input_mapping=model_and_optimizers.input_mapping,
output_mapping=model_and_optimizers.output_mapping,
metrics=model_and_optimizers.metrics,
n_epochs=7,
output_from_new_proc="all",
evaluate_fn='train_step'
)
folder = path.joinpath(os.environ[FASTNLP_LAUNCH_TIME]).joinpath(folder)
trainer.load_checkpoint(folder, only_state_dict=only_state_dict)

trainer.run()
trainer.driver.barrier()

finally:
rank_zero_rm(path)
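# New body (added): version and only_state_dict are looped over inside the single test case.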
for version in [0, 1]:
for only_state_dict in [True, False]:
try:
path = Path.cwd().joinpath(f"test_model_checkpoint")
path.mkdir(exist_ok=True, parents=True)

if version == 0:
callbacks = [
MoreEvaluateCallback(dataloaders=model_and_optimizers.evaluate_dataloaders,
metrics=model_and_optimizers.more_metrics,
evaluate_every=-1,
folder=path, topk=-1,
topk_monitor='acc', only_state_dict=only_state_dict, save_object='trainer')
]
elif version == 1:
callbacks = [
MoreEvaluateCallback(dataloaders=model_and_optimizers.evaluate_dataloaders,
metrics=model_and_optimizers.more_metrics,
evaluate_every=None, watch_monitor='loss', watch_monitor_larger_better=False,
folder=path, topk=1, topk_monitor='acc', only_state_dict=only_state_dict,
save_object='trainer')
]
n_epochs = 5
trainer = Trainer(
model=model_and_optimizers.model,
driver=driver,
device=device,
optimizers=model_and_optimizers.optimizers,
train_dataloader=model_and_optimizers.train_dataloader,
evaluate_dataloaders=model_and_optimizers.evaluate_dataloaders,
input_mapping=model_and_optimizers.input_mapping,
output_mapping=model_and_optimizers.output_mapping,
metrics=model_and_optimizers.metrics,
n_epochs=n_epochs,
callbacks=callbacks,
output_from_new_proc="all",
evaluate_fn='train_step'
)

trainer.run()

all_saved_model_paths = {w.name: w for w in path.joinpath(os.environ[FASTNLP_LAUNCH_TIME]).iterdir()}
# Check that the number of saved model files is correct;
if version == 0:
assert len(all_saved_model_paths) == n_epochs
elif version == 1:
assert len(all_saved_model_paths) == 1

for folder in all_saved_model_paths:
trainer = Trainer(
model=model_and_optimizers.model,
driver=driver,
device=device,
optimizers=model_and_optimizers.optimizers,
train_dataloader=model_and_optimizers.train_dataloader,
evaluate_dataloaders=model_and_optimizers.evaluate_dataloaders,
input_mapping=model_and_optimizers.input_mapping,
output_mapping=model_and_optimizers.output_mapping,
metrics=model_and_optimizers.metrics,
n_epochs=7,
output_from_new_proc="all",
evaluate_fn='train_step'
)
folder = path.joinpath(os.environ[FASTNLP_LAUNCH_TIME]).joinpath(folder)
trainer.load_checkpoint(folder, only_state_dict=only_state_dict)

trainer.run()
trainer.driver.barrier()

finally:
rank_zero_rm(path)


if dist.is_initialized():
    dist.destroy_process_group()

tests/core/callbacks/test_progress_callback_torch.py (+26, -26)

@@ -82,37 +82,37 @@ def model_and_optimizers(request):


@pytest.mark.torch
@pytest.mark.parametrize('device', ['cpu', [0, 1]])
-@pytest.mark.parametrize('progress_bar', ['rich', 'auto', None, 'raw', 'tqdm'])
@magic_argv_env_context
-def test_run( model_and_optimizers: TrainerParameters, device, progress_bar):
+def test_run( model_and_optimizers: TrainerParameters, device):

    if device != 'cpu' and not torch.cuda.is_available():
        pytest.skip(f"No cuda for device:{device}")
    n_epochs = 5
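# Old body (removed): ran once per parametrized progress_bar value.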
trainer = Trainer(
model=model_and_optimizers.model,
driver='torch',
device=device,
optimizers=model_and_optimizers.optimizers,
train_dataloader=model_and_optimizers.train_dataloader,
evaluate_dataloaders=model_and_optimizers.evaluate_dataloaders,
input_mapping=model_and_optimizers.input_mapping,
output_mapping=model_and_optimizers.output_mapping,
metrics=model_and_optimizers.metrics,
n_epochs=n_epochs,
callbacks=None,
progress_bar=progress_bar,
output_from_new_proc="all",
evaluate_fn='train_step',
larger_better=False
)

trainer.run()

evaluator = Evaluator(model=model_and_optimizers.model, dataloaders=model_and_optimizers.train_dataloader,
driver=trainer.driver, metrics=model_and_optimizers.metrics,
progress_bar=progress_bar, evaluate_fn='train_step')
evaluator.run()
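# New body (added): progress_bar values are looped over inside the single test case.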
for progress_bar in ['rich', 'auto', None, 'raw', 'tqdm']:
trainer = Trainer(
model=model_and_optimizers.model,
driver='torch',
device=device,
optimizers=model_and_optimizers.optimizers,
train_dataloader=model_and_optimizers.train_dataloader,
evaluate_dataloaders=model_and_optimizers.evaluate_dataloaders,
input_mapping=model_and_optimizers.input_mapping,
output_mapping=model_and_optimizers.output_mapping,
metrics=model_and_optimizers.metrics,
n_epochs=n_epochs,
callbacks=None,
progress_bar=progress_bar,
output_from_new_proc="all",
evaluate_fn='train_step',
larger_better=False
)

trainer.run()

evaluator = Evaluator(model=model_and_optimizers.model, dataloaders=model_and_optimizers.train_dataloader,
driver=trainer.driver, metrics=model_and_optimizers.metrics,
progress_bar=progress_bar, evaluate_fn='train_step')
evaluator.run()


if dist.is_initialized():
    dist.destroy_process_group()
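Here too, looping over progress_bar inside one test exercises every display option (rich, auto, None, raw, tqdm) with a single launch of the trainer fixtures. A tiny stand-alone illustration of such a dispatch; make_progress_bar is hypothetical and not a fastNLP API:

def make_progress_bar(kind):
    # Hypothetical factory mirroring the options the test iterates over.
    if kind is None:
        return "no progress output"
    if kind == "auto":
        return "rich"  # e.g. prefer rich when it is available
    if kind in ("rich", "raw", "tqdm"):
        return kind
    raise ValueError(f"unknown progress bar: {kind!r}")

for progress_bar in ['rich', 'auto', None, 'raw', 'tqdm']:
    assert make_progress_bar(progress_bar) in {"rich", "raw", "tqdm", "no progress output"}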

