Source code for libai.data.build

# coding=utf-8
# Copyright 2021 The OneFlow Authors. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import omegaconf
from oneflow.utils.data import DataLoader
from oneflow.utils.data.dataset import ConcatDataset

from libai.utils import distributed as dist

from .data_utils import split_ds
from .samplers import CyclicSampler, SingleRoundSampler
from .structures import Instance


[docs]def build_nlp_train_val_test_loader( dataset, splits, weights, train_batch_size, test_batch_size, sampler=None, num_workers=4, consumed_samples=0, seed=0, collate_fn=None, dataset_mixer=ConcatDataset, ): """ Build nlp train_val_test dataloader, it's used for dataset lack of valid/test dataset Returns: It will return train/valid/test dataloader * train_loader: dataloader for training * valid_loader: dataloader for validation * test_loader: dataloader for testing Arguments: dataset: dataset from which to load the data. e.g.: dataset or [dataset1, dataset2, ...] splits: ratio config for spliting dataset to train/valid/test. e.g.: [[7, 2, 1], ...] weights: ratio config for concate dataset list (Not Supported yet). e.g.: [1.0, ...] train_batch_size: how many samples per batch to load in training (micro-batch-size per GPU). test_batch_size: how many samples per batch to load in testing (micro-batch-size per GPU). sampler: defines the strategy to draw samples from the dataset. Can be any ``Iterable`` with ``__len__`` implemented. num_workers: how many subprocesses to use for data loading. ``0`` means that the data will be loaded in the main process. (default: ``4``). consumed_samples: the number of samples that have been trained at the current time, used for resuming training (default: ``0``). seed: random seed, used for reproducing experiments (default: ``0``). collate_fn: merges a list of samples to form a mini-batch of Tensor(s). Used when using batched loading from a map-style dataset. dataset_mixer: function for concating list dataset. """ # TODO: add dataset_weights sampler if isinstance(dataset, omegaconf.listconfig.ListConfig): dataset = list(dataset) elif not isinstance(dataset, list): dataset = [dataset] assert len(dataset) == len(splits), "datasets length must equal splits length" assert len(dataset) == len(weights), "datasets length must equal weights length" train_datasets, val_datasets, test_datasets = [], [], [] for dst, split in zip(dataset, splits): train_dataset, val_dataset, test_dataset = split_ds(dst, split) train_datasets.append(train_dataset) val_datasets.append(val_dataset) test_datasets.append(test_dataset) # [dataset, dataset] -> dataset -> dataloader train_dataset = dataset_mixer(train_datasets) val_dataset = dataset_mixer(val_datasets) test_dataset = dataset_mixer(test_datasets) collate_fn = trivial_batch_collator if collate_fn is None else collate_fn train_loader, _, _ = build_nlp_train_loader( dataset=train_dataset, train_batch_size=train_batch_size, test_batch_size=None, sampler=sampler, num_workers=num_workers, consumed_samples=consumed_samples, seed=seed, collate_fn=collate_fn, ) valid_loader = build_nlp_test_loader( dataset=val_dataset, test_batch_size=test_batch_size, sampler=sampler, num_workers=num_workers, seed=seed, collate_fn=collate_fn, ) test_loader = build_nlp_test_loader( dataset=test_dataset, test_batch_size=test_batch_size, sampler=sampler, num_workers=num_workers, seed=seed, collate_fn=collate_fn, ) return train_loader, valid_loader, test_loader
[docs]def build_nlp_train_loader( dataset, train_batch_size, test_batch_size=None, sampler=None, num_workers=4, consumed_samples=0, seed=0, collate_fn=None, dataset_mixer=ConcatDataset, **kwargs ): """ Build nlp train dataloader, it's used for train dataset Returns: It will return train dataloader, and Nonetype for valid/test dataloader * train_loader: dataloader for training * None: Nonetype * None: Nonetype Arguments: dataset: dataset from which to load the data. e.g.: dataset or [dataset1, dataset2, ...] train_batch_size: how many samples per batch to load in training (micro-batch-size per GPU). test_batch_size: no use, set it to None. sampler: defines the strategy to draw samples from the dataset. Can be any ``Iterable`` with ``__len__`` implemented. num_workers: how many subprocesses to use for data loading. ``0`` means that the data will be loaded in the main process. (default: ``4``). consumed_samples: the number of samples that have been trained at the current time, used for resuming training (default: ``0``). seed: random seed, used for reproducing experiments (default: ``0``). collate_fn: merges a list of samples to form a mini-batch of Tensor(s). Used when using batched loading from a map-style dataset. dataset_mixer: function for concating list dataset. """ if isinstance(dataset, omegaconf.listconfig.ListConfig): dataset = list(dataset) elif not isinstance(dataset, list): dataset = [dataset] if len(dataset) > 1: dataset = dataset_mixer(dataset) else: dataset = dataset[0] if sampler is None: sampler = CyclicSampler( dataset=dataset, micro_batch_size=train_batch_size, shuffle=True, consumed_samples=consumed_samples, data_parallel_rank=dist.get_data_parallel_rank(), data_parallel_size=dist.get_data_parallel_size(), seed=seed, ) dataloader = DataLoader( dataset, batch_sampler=sampler, num_workers=num_workers, collate_fn=trivial_batch_collator if collate_fn is None else collate_fn, **kwargs, ) return dataloader, None, None
[docs]def build_nlp_test_loader( dataset, test_batch_size, sampler=None, num_workers=4, seed=0, collate_fn=None, ): """ Build nlp test dataloader, it's used for test dataset Returns: It will return test dataloader * test_loader: dataloader for testing Arguments: dataset: dataset from which to load the data. e.g.: dataset or [dataset1, dataset2, ...] test_batch_size: how many samples per batch to load in testing (micro-batch-size per GPU). sampler: defines the strategy to draw samples from the dataset. Can be any ``Iterable`` with ``__len__`` implemented. num_workers: how many subprocesses to use for data loading. ``0`` means that the data will be loaded in the main process. (default: ``4``). seed: random seed, used for reproducing experiments (default: ``0``). collate_fn: merges a list of samples to form a mini-batch of Tensor(s). Used when using batched loading from a map-style dataset. """ collate_fn = trivial_batch_collator if collate_fn is None else collate_fn if sampler is None: sampler = SingleRoundSampler( dataset=dataset, micro_batch_size=test_batch_size, shuffle=False, data_parallel_rank=dist.get_data_parallel_rank(), data_parallel_size=dist.get_data_parallel_size(), seed=seed, drop_last=False, ) test_loader = DataLoader( dataset, batch_sampler=sampler, num_workers=num_workers, collate_fn=collate_fn ) return test_loader
[docs]def build_image_train_loader( dataset, train_batch_size, test_batch_size=None, sampler=None, num_workers=4, consumed_samples=0, seed=0, collate_fn=None, dataset_mixer=ConcatDataset, mixup_func=None, **kwargs ): """ Build image train dataloader, it's used for train dataset Returns: It will return train dataloader, and Nonetype for valid/test dataloader * train_loader: dataloader for training * None: Nonetype * None: Nonetype Arguments: dataset: dataset from which to load the data. e.g.: dataset or [dataset1, dataset2, ...] train_batch_size: how many samples per batch to load in training (micro-batch-size per GPU). test_batch_size: no use, set it to None. sampler: defines the strategy to draw samples from the dataset. Can be any ``Iterable`` with ``__len__`` implemented. num_workers: how many subprocesses to use for data loading. ``0`` means that the data will be loaded in the main process. (default: ``4``). consumed_samples: the number of samples that have been trained at the current time, used for resuming training (default: ``0``). seed: random seed, used for reproducing experiments (default: ``0``). collate_fn: merges a list of samples to form a mini-batch of Tensor(s). Used when using batched loading from a map-style dataset. dataset_mixer: function for concating list dataset. mixup_func: function for data argumentation. """ if isinstance(dataset, omegaconf.listconfig.ListConfig): dataset = list(dataset) elif not isinstance(dataset, list): dataset = [dataset] if len(dataset) > 1: dataset = dataset_mixer(dataset) else: dataset = dataset[0] if sampler is None: sampler = CyclicSampler( dataset=dataset, micro_batch_size=train_batch_size, shuffle=True, consumed_samples=consumed_samples, data_parallel_rank=dist.get_data_parallel_rank(), data_parallel_size=dist.get_data_parallel_size(), seed=seed, ) dataloader = DataLoader( dataset, batch_sampler=sampler, num_workers=num_workers, collate_fn=trivial_batch_collator if collate_fn is None else collate_fn, **kwargs, ) # Bind up mixup_func to dataloader, and this will be used in Trainer.step dataloader.mixup_func = mixup_func return dataloader, None, None
[docs]def build_image_test_loader( dataset, test_batch_size, sampler=None, num_workers=4, seed=0, collate_fn=None, **kwargs ): """ Build image test dataloader, it's used for test dataset Returns: It will return test dataloader * test_loader: dataloader for testing Arguments: dataset: dataset from which to load the data. e.g.: dataset or [dataset1, dataset2, ...] test_batch_size: how many samples per batch to load in testing (micro-batch-size per GPU). sampler: defines the strategy to draw samples from the dataset. Can be any ``Iterable`` with ``__len__`` implemented. num_workers: how many subprocesses to use for data loading. ``0`` means that the data will be loaded in the main process. (default: ``4``). seed: random seed, used for reproducing experiments (default: ``0``). collate_fn: merges a list of samples to form a mini-batch of Tensor(s). Used when using batched loading from a map-style dataset. """ if sampler is None: sampler = SingleRoundSampler( dataset=dataset, micro_batch_size=test_batch_size, shuffle=False, data_parallel_rank=dist.get_data_parallel_rank(), data_parallel_size=dist.get_data_parallel_size(), seed=seed, drop_last=False, ) return DataLoader( dataset, batch_sampler=sampler, num_workers=num_workers, collate_fn=trivial_batch_collator if collate_fn is None else collate_fn, **kwargs, )
def trivial_batch_collator(batch): assert isinstance(batch[0], Instance), "batch[0] must be `instance` for trivial batch collator" batch = Instance.stack(batch) return batch