From 7afddaae937e13f5b9659ff85088298b2ec847e8 Mon Sep 17 00:00:00 2001
From: Luca Foppiano
Date: Tue, 22 Aug 2023 10:57:50 +0900
Subject: [PATCH 1/9] quick and dirty multi-gpu support

---
 delft/applications/grobidTagger.py | 26 +++++++++++++++++++++-----
 1 file changed, 21 insertions(+), 5 deletions(-)

diff --git a/delft/applications/grobidTagger.py b/delft/applications/grobidTagger.py
index 0aee465c..0d1d7313 100644
--- a/delft/applications/grobidTagger.py
+++ b/delft/applications/grobidTagger.py
@@ -135,9 +135,9 @@ def configure(model, architecture, output_path=None, max_sequence_length=-1, bat
 
 
 # train a GROBID model with all available data
-def train(model, embeddings_name=None, architecture=None, transformer=None, input_path=None,
-          output_path=None, features_indices=None, max_sequence_length=-1, batch_size=-1, max_epoch=-1,
-          use_ELMo=False, incremental=False, input_model_path=None, patience=-1, learning_rate=None):
+def train(model, embeddings_name=None, architecture=None, transformer=None, input_path=None,
+          output_path=None, features_indices=None, max_sequence_length=-1, batch_size=-1, max_epoch=-1,
+          use_ELMo=False, incremental=False, input_model_path=None, patience=-1, learning_rate=None, multi_gpu=False):
     print('Loading data...')
     if input_path == None:
@@ -188,7 +188,16 @@ def train(model, embeddings_name=None, architecture=None, transformer=None, inpu
         model.load()
 
     start_time = time.time()
-    model.train(x_train, y_train, f_train, x_valid, y_valid, f_valid, incremental=incremental)
+    if multi_gpu:
+        import tensorflow as tf
+        strategy = tf.distribute.MirroredStrategy()
+        print('Number of devices: {}'.format(strategy.num_replicas_in_sync))
+
+        with strategy.scope():
+            model.train(x_train, y_train, f_train, x_valid, y_valid, f_valid, incremental=incremental)
+    else:
+        model.train(x_train, y_train, f_train, x_valid, y_valid, f_valid, incremental=incremental)
+
     runtime = round(time.time() - start_time, 3)
     print("training runtime: %s seconds " % (runtime))
@@ -396,6 +405,10 @@ class Tasks:
     parser.add_argument("--patience", type=int, default=-1, help="patience, number of extra epochs to perform after "
                                                                  "the best epoch before stopping a training.")
     parser.add_argument("--learning-rate", type=float, default=None, help="Initial learning rate")
+    parser.add_argument("--multi-gpu", default=False, help="Enable the support for distributed "
+                                                           "computing (the batch size needs to be set "
+                                                           "accordingly using --batch-size)",
+                        action="store_true")
 
 
@@ -415,6 +428,7 @@ class Tasks:
     incremental = args.incremental
     patience = args.patience
     learning_rate = args.learning_rate
+    multi_gpu = args.multi_gpu
 
     if architecture is None:
         raise ValueError("A model architecture has to be specified: " + str(architectures))
@@ -436,7 +450,8 @@ class Tasks:
             incremental=incremental,
             input_model_path=input_model_path,
             patience=patience,
-            learning_rate=learning_rate)
+            learning_rate=learning_rate,
+            multi_gpu=multi_gpu)
 
     if action == Tasks.EVAL:
         if args.fold_count is not None and args.fold_count > 1:
@@ -471,6 +486,7 @@ class Tasks:
             someTexts.append("March the 27th, 2001")
             someTexts.append(" on April 27, 2001. . ")
             someTexts.append('2018')
+            someTexts.append('2023 July the 22nd')
         elif model == 'citation':
             someTexts.append("N. Al-Dhahir and J. Cioffi, \"On the uniform ADC bit precision and clip level computation for a Gaussian signal,\" IEEE Trans. Signal Processing, pp. 434–438, Feb. 1996.")
             someTexts.append("T. Steinherz, E. Rivlin, N. Intrator, Off-line cursive script word recognition—a survey, Int. J. Doc. Anal. Recognition 2(3) (1999) 1–33.")

From 54b115d7ce7016732d664883bec8ebe4ffc8ca3b Mon Sep 17 00:00:00 2001
From: Luca Foppiano
Date: Tue, 22 Aug 2023 11:43:03 +0900
Subject: [PATCH 2/9] cleanup

---
 delft/applications/grobidTagger.py | 10 +---------
 delft/sequenceLabelling/wrapper.py | 13 ++++++++++++-
 2 files changed, 13 insertions(+), 10 deletions(-)

diff --git a/delft/applications/grobidTagger.py b/delft/applications/grobidTagger.py
index 0d1d7313..716fc43d 100644
--- a/delft/applications/grobidTagger.py
+++ b/delft/applications/grobidTagger.py
@@ -188,15 +188,7 @@ def train(model, embeddings_name=None, architecture=None, transformer=None, inpu
         model.load()
 
     start_time = time.time()
-    if multi_gpu:
-        import tensorflow as tf
-        strategy = tf.distribute.MirroredStrategy()
-        print('Number of devices: {}'.format(strategy.num_replicas_in_sync))
-
-        with strategy.scope():
-            model.train(x_train, y_train, f_train, x_valid, y_valid, f_valid, incremental=incremental)
-    else:
-        model.train(x_train, y_train, f_train, x_valid, y_valid, f_valid, incremental=incremental)
+    model.train(x_train, y_train, f_train, x_valid, y_valid, f_valid, incremental=incremental, multi_gpu=multi_gpu)
 
     runtime = round(time.time() - start_time, 3)
     print("training runtime: %s seconds " % (runtime))
diff --git a/delft/sequenceLabelling/wrapper.py b/delft/sequenceLabelling/wrapper.py
index 29e09111..1793d64e 100644
--- a/delft/sequenceLabelling/wrapper.py
+++ b/delft/sequenceLabelling/wrapper.py
@@ -141,7 +141,18 @@ def __init__(self,
                                               early_stop,
                                               patience,
                                               max_checkpoints_to_keep,
                                               multiprocessing)
 
-    def train(self, x_train, y_train, f_train=None, x_valid=None, y_valid=None, f_valid=None, incremental=False, callbacks=None):
+    def train(self, x_train, y_train, f_train=None, x_valid=None, y_valid=None, f_valid=None, incremental=False, callbacks=None, multi_gpu=False):
+        # TBD if valid is None, segment train to get one if early_stop is True
+        if multi_gpu:
+            strategy = tf.distribute.MirroredStrategy()
+            print('Number of devices: {}'.format(strategy.num_replicas_in_sync))
+
+            with strategy.scope():
+                self.train_(x_train, y_train, f_train, x_valid, y_valid, f_valid, incremental, callbacks)
+        else:
+            self.train_(x_train, y_train, f_train, x_valid, y_valid, f_valid, incremental, callbacks)
+
+    def train_(self, x_train, y_train, f_train=None, x_valid=None, y_valid=None, f_valid=None, incremental=False, callbacks=None):
         # TBD if valid is None, segment train to get one if early_stop is True
 
         # we concatenate all the training+validation data to create the model vocabulary

From 83264582b40436de1c034cdd86c70e7c3f231d8a Mon Sep 17 00:00:00 2001
From: Luca Foppiano
Date: Tue, 22 Aug 2023 14:35:19 +0900
Subject: [PATCH 3/9] extend the multi-gpu to fold cross-validation

---
 delft/applications/grobidTagger.py |  6 +++---
 delft/sequenceLabelling/wrapper.py | 12 +++++++++++-
 2 files changed, 14 insertions(+), 4 deletions(-)

diff --git a/delft/applications/grobidTagger.py b/delft/applications/grobidTagger.py
index 716fc43d..b5965ecc 100644
--- a/delft/applications/grobidTagger.py
+++ b/delft/applications/grobidTagger.py
@@ -204,7 +204,7 @@ def train(model, embeddings_name=None, architecture=None, transformer=None, inpu
 def train_eval(model, embeddings_name=None, architecture='BidLSTM_CRF', transformer=None,
                input_path=None, output_path=None, fold_count=1,
                features_indices=None, max_sequence_length=-1, batch_size=-1, max_epoch=-1,
-               use_ELMo=False, incremental=False, input_model_path=None, patience=-1, learning_rate=None):
+               use_ELMo=False, incremental=False, input_model_path=None, patience=-1, learning_rate=None, multi_gpu=False):
     print('Loading data...')
     if input_path is None:
         x_all, y_all, f_all = load_data_and_labels_crf_file('data/sequenceLabelling/grobid/'+model+'/'+model+'-060518.train')
@@ -258,9 +258,9 @@ def train_eval(model, embeddings_name=None, architecture='BidLSTM_CRF', transfor
     start_time = time.time()
 
     if fold_count == 1:
-        model.train(x_train, y_train, f_train=f_train, x_valid=x_valid, y_valid=y_valid, f_valid=f_valid, incremental=incremental)
+        model.train(x_train, y_train, f_train=f_train, x_valid=x_valid, y_valid=y_valid, f_valid=f_valid, incremental=incremental, multi_gpu=multi_gpu)
     else:
-        model.train_nfold(x_train, y_train, f_train=f_train, x_valid=x_valid, y_valid=y_valid, f_valid=f_valid, incremental=incremental)
+        model.train_nfold(x_train, y_train, f_train=f_train, x_valid=x_valid, y_valid=y_valid, f_valid=f_valid, incremental=incremental, multi_gpu=multi_gpu)
 
     runtime = round(time.time() - start_time, 3)
     print("training runtime: %s seconds " % runtime)
diff --git a/delft/sequenceLabelling/wrapper.py b/delft/sequenceLabelling/wrapper.py
index 1793d64e..57945991 100644
--- a/delft/sequenceLabelling/wrapper.py
+++ b/delft/sequenceLabelling/wrapper.py
@@ -205,7 +205,17 @@ def train_(self, x_train, y_train, f_train=None, x_valid=None, y_valid=None, f_v
         if self.embeddings and self.embeddings.use_ELMo:
             self.embeddings.clean_ELMo_cache()
 
-    def train_nfold(self, x_train, y_train, x_valid=None, y_valid=None, f_train=None, f_valid=None, incremental=False, callbacks=None):
+    def train_nfold(self, x_train, y_train, x_valid=None, y_valid=None, f_train=None, f_valid=None, incremental=False, callbacks=None, multi_gpu=False):
+        if multi_gpu:
+            strategy = tf.distribute.MirroredStrategy()
+            print('Number of devices: {}'.format(strategy.num_replicas_in_sync))
+
+            with strategy.scope():
+                self.train_nfold_(x_train, y_train, x_valid, y_valid, f_train, f_valid, incremental, callbacks)
+        else:
+            self.train_nfold_(x_train, y_train, x_valid, y_valid, f_train, f_valid, incremental, callbacks)
+
+    def train_nfold_(self, x_train, y_train, x_valid=None, y_valid=None, f_train=None, f_valid=None, incremental=False, callbacks=None):
         x_all = np.concatenate((x_train, x_valid), axis=0) if x_valid is not None else x_train
         y_all = np.concatenate((y_train, y_valid), axis=0) if y_valid is not None else y_train
         features_all = concatenate_or_none((f_train, f_valid), axis=0)

From 226622a400da39f0f8c993fd3bafd355ac804d0f Mon Sep 17 00:00:00 2001
From: Luca Foppiano
Date: Thu, 24 Aug 2023 14:02:57 +0900
Subject: [PATCH 4/9] extend multi-gpu parameter to other scripts

---
 delft/applications/datasetTagger.py | 32 ++++++++++++++++-----------
 delft/applications/grobidTagger.py  | 34 ++++++++++++++---------------
 delft/applications/insultTagger.py  | 17 +++++++++++----
 delft/applications/nerTagger.py     | 17 ++++++++++-----
 4 files changed, 61 insertions(+), 39 deletions(-)

diff --git a/delft/applications/datasetTagger.py b/delft/applications/datasetTagger.py
index c414b62a..c4dfa890 100644
--- a/delft/applications/datasetTagger.py
+++ b/delft/applications/datasetTagger.py
@@ -68,7 +68,7 @@ def train(embeddings_name=None, architecture='BidLSTM_CRF', transformer=None,
           input_path=None, output_path=None, fold_count=1,
           features_indices=None, max_sequence_length=-1, batch_size=-1, max_epoch=-1,
           use_ELMo=False, patience=-1,
-          learning_rate=None):
+          learning_rate=None, multi_gpu=False):
     print('Loading data...')
     if input_path is None:
         x_all1 = y_all1 = x_all2 = y_all2 = x_all3 = y_all3 = []
@@ -116,7 +116,7 @@ def train(embeddings_name=None, architecture='BidLSTM_CRF', transformer=None,
                      learning_rate=learning_rate)
 
     start_time = time.time()
-    model.train(x_train, y_train, x_valid=x_valid, y_valid=y_valid)
+    model.train(x_train, y_train, x_valid=x_valid, y_valid=y_valid, multi_gpu=multi_gpu)
     runtime = round(time.time() - start_time, 3)
     print("training runtime: %s seconds " % runtime)
@@ -132,7 +132,7 @@ def train(embeddings_name=None, architecture='BidLSTM_CRF', transformer=None,
 def train_eval(embeddings_name=None, architecture='BidLSTM_CRF', transformer=None,
                input_path=None, output_path=None, fold_count=1,
                features_indices=None, max_sequence_length=-1, batch_size=-1, max_epoch=-1, use_ELMo=False,
-               patience=-1, learning_rate=None):
+               patience=-1, learning_rate=None, multi_gpu=False):
     print('Loading data...')
     if input_path is None:
         x_all1 = y_all1 = x_all2 = y_all2 = x_all3 = y_all3 = []
@@ -285,6 +285,9 @@ def annotate_text(texts, output_format, architecture='BidLSTM_CRF', features=Non
     parser.add_argument("--patience", type=int, default=-1, help="patience, number of extra epochs to perform after "
                                                                  "the best epoch before stopping a training.")
     parser.add_argument("--learning-rate", type=float, default=None, help="Initial learning rate")
+    parser.add_argument("--multi-gpu", default=False,
+                        help="Enable the support for distributed computing (the batch size needs to be set accordingly using --batch-size)",
+                        action="store_true")
 
     args = parser.parse_args()
@@ -299,6 +302,7 @@ def annotate_text(texts, output_format, architecture='BidLSTM_CRF', features=Non
     use_ELMo = args.use_ELMo
     patience = args.patience
     learning_rate = args.learning_rate
+    multi_gpu = args.multi_gpu
 
     if transformer is None and embeddings_name is None:
         # default word embeddings
@@ -306,15 +310,16 @@ def annotate_text(texts, output_format, architecture='BidLSTM_CRF', features=Non
 
     if action == "train":
         train(embeddings_name=embeddings_name,
-            architecture=architecture,
-            transformer=transformer,
-            input_path=input_path,
-            output_path=output,
-            max_sequence_length=max_sequence_length,
-            batch_size=batch_size,
-            use_ELMo=use_ELMo,
-            patience=patience,
-            learning_rate=learning_rate)
+              architecture=architecture,
+              transformer=transformer,
+              input_path=input_path,
+              output_path=output,
+              max_sequence_length=max_sequence_length,
+              batch_size=batch_size,
+              use_ELMo=use_ELMo,
+              patience=patience,
+              learning_rate=learning_rate,
+              multi_gpu=multi_gpu)
 
     if action == "eval":
         if args.fold_count is not None and args.fold_count > 1:
@@ -337,7 +342,8 @@ def annotate_text(texts, output_format, architecture='BidLSTM_CRF', features=Non
                    batch_size=batch_size,
                    use_ELMo=use_ELMo,
                    patience=patience,
-                   learning_rate=learning_rate)
+                   learning_rate=learning_rate,
+                   multi_gpu=multi_gpu)
 
     if action == "tag":
         someTexts = []
diff --git a/delft/applications/grobidTagger.py b/delft/applications/grobidTagger.py
index b5965ecc..e2c39047 100644
--- a/delft/applications/grobidTagger.py
+++ b/delft/applications/grobidTagger.py
@@ -397,9 +397,8 @@ class Tasks:
     parser.add_argument("--patience", type=int, default=-1, help="patience, number of extra epochs to perform after "
                                                                  "the best epoch before stopping a training.")
     parser.add_argument("--learning-rate", type=float, default=None, help="Initial learning rate")
-    parser.add_argument("--multi-gpu", default=False, help="Enable the support for distributed "
-                                                           "computing (the batch size needs to be set "
-                                                           "accordingly using --batch-size)",
+    parser.add_argument("--multi-gpu", default=False,
+                        help="Enable the support for distributed computing (the batch size needs to be set accordingly using --batch-size)",
                         action="store_true")
 
 
@@ -431,19 +430,19 @@ class Tasks:
 
     if action == Tasks.TRAIN:
         train(model,
-            embeddings_name=embeddings_name,
-            architecture=architecture,
-            transformer=transformer,
-            input_path=input_path,
-            output_path=output,
-            max_sequence_length=max_sequence_length,
-            batch_size=batch_size,
-            use_ELMo=use_ELMo,
-            incremental=incremental,
-            input_model_path=input_model_path,
-            patience=patience,
-            learning_rate=learning_rate,
-            multi_gpu=multi_gpu)
+              embeddings_name=embeddings_name,
+              architecture=architecture,
+              transformer=transformer,
+              input_path=input_path,
+              output_path=output,
+              max_sequence_length=max_sequence_length,
+              batch_size=batch_size,
+              use_ELMo=use_ELMo,
+              incremental=incremental,
+              input_model_path=input_model_path,
+              patience=patience,
+              learning_rate=learning_rate,
+              multi_gpu=multi_gpu)
 
     if action == Tasks.EVAL:
         if args.fold_count is not None and args.fold_count > 1:
@@ -468,7 +467,8 @@ class Tasks:
                    use_ELMo=use_ELMo,
                    incremental=incremental,
                    input_model_path=input_model_path,
-                   learning_rate=learning_rate)
+                   learning_rate=learning_rate,
+                   multi_gpu=multi_gpu)
 
     if action == Tasks.TAG:
         someTexts = []
diff --git a/delft/applications/insultTagger.py b/delft/applications/insultTagger.py
index 0d00a1b2..f904ca5f 100644
--- a/delft/applications/insultTagger.py
+++ b/delft/applications/insultTagger.py
@@ -21,7 +21,8 @@ def configure(architecture, embeddings_name):
     return batch_size, maxlen, patience, early_stop, max_epoch, embeddings_name
 
 
-def train(embeddings_name=None, architecture='BidLSTM_CRF', transformer=None, use_ELMo=False, learning_rate=None):
+def train(embeddings_name=None, architecture='BidLSTM_CRF', transformer=None, use_ELMo=False, learning_rate=None,
+          multi_gpu=False):
     batch_size, maxlen, patience, early_stop, max_epoch, embeddings_name = configure(architecture, embeddings_name)
 
     root = 'data/sequenceLabelling/toxic/'
@@ -42,7 +43,7 @@ def train(embeddings_name=None, architecture='BidLSTM_CRF', transformer=None, us
     model = Sequence(model_name, max_epoch=max_epoch, batch_size=batch_size, max_sequence_length=maxlen,
                      embeddings_name=embeddings_name, architecture=architecture, patience=patience,
                      early_stop=early_stop, transformer_name=transformer, use_ELMo=use_ELMo, learning_rate=learning_rate)
-    model.train(x_train, y_train, x_valid=x_valid, y_valid=y_valid)
+    model.train(x_train, y_train, x_valid=x_valid, y_valid=y_valid, multi_gpu=multi_gpu)
     print('training done')
 
     # saving the model (must be called after eval for multiple fold training)
@@ -115,7 +116,10 @@ def annotate(texts, output_format, architecture='BidLSTM_CRF', transformer=None,
     )
     parser.add_argument("--use-ELMo", action="store_true", help="Use ELMo contextual embeddings")
     parser.add_argument("--learning-rate", type=float, default=None, help="Initial learning rate")
-
+    parser.add_argument("--multi-gpu", default=False,
+                        help="Enable the support for distributed computing (the batch size needs to be set accordingly using --batch-size)",
+                        action="store_true")
+
     args = parser.parse_args()
 
     if args.action not in ('train', 'tag'):
@@ -126,13 +130,18 @@ def annotate(texts, output_format, architecture='BidLSTM_CRF', transformer=None,
     transformer = args.transformer
     use_ELMo = args.use_ELMo
     learning_rate = args.learning_rate
+    multi_gpu = args.multi_gpu
 
     if transformer == None and embeddings_name == None:
         # default word embeddings
         embeddings_name = "glove-840B"
 
     if args.action == 'train':
-        train(embeddings_name=embeddings_name, architecture=architecture, transformer=transformer, use_ELMo=use_ELMo, learning_rate=learning_rate)
+        train(embeddings_name=embeddings_name,
+              architecture=architecture,
+              transformer=transformer, use_ELMo=use_ELMo,
+              learning_rate=learning_rate,
+              multi_gpu=multi_gpu)
 
     if args.action == 'tag':
         someTexts = ['This is a gentle test.',
diff --git a/delft/applications/nerTagger.py b/delft/applications/nerTagger.py
index 36a4deab..d7095171 100644
--- a/delft/applications/nerTagger.py
+++ b/delft/applications/nerTagger.py
@@ -195,7 +195,8 @@ def train_eval(embeddings_name=None,
                patience=-1,
                batch_size=-1,
                max_sequence_length=-1,
-               learning_rate=None):
+               learning_rate=None,
+               multi_gpu = False):
 
     batch_size, max_sequence_length, patience, recurrent_dropout, early_stop, max_epoch, embeddings_name, word_lstm_units, multiprocessing = \
         configure(architecture, dataset_type, lang, embeddings_name, use_ELMo,
@@ -453,9 +454,9 @@ def train_eval(embeddings_name=None,
     start_time = time.time()
 
     if fold_count == 1:
-        model.train(x_train, y_train, x_valid=x_valid, y_valid=y_valid)
+        model.train(x_train, y_train, x_valid=x_valid, y_valid=y_valid, multi_gpu=multi_gpu)
     else:
-        model.train_nfold(x_train, y_train, x_valid=x_valid, y_valid=y_valid)
+        model.train_nfold(x_train, y_train, x_valid=x_valid, y_valid=y_valid, multi_gpu=multi_gpu)
 
     runtime = round(time.time() - start_time, 3)
     print("training runtime: %s seconds " % (runtime))
@@ -613,6 +614,9 @@ def annotate(output_format,
     parser.add_argument("--patience", type=int, default=-1, help="patience, number of extra epochs to perform after "
                                                                  "the best epoch before stopping a training.")
     parser.add_argument("--learning-rate", type=float, default=None, help="Initial learning rate")
+    parser.add_argument("--multi-gpu", default=False,
+                        help="Enable the support for distributed computing (the batch size needs to be set accordingly using --batch-size)",
+                        action="store_true")
 
     args = parser.parse_args()
@@ -634,6 +638,7 @@ def annotate(output_format,
     max_sequence_length = args.max_sequence_length
     batch_size = args.batch_size
     learning_rate = args.learning_rate
+    multi_gpu = args.multi_gpu
 
     # name of embeddings refers to the file delft/resources-registry.json
     # be sure to use here the same name as in the registry ('glove-840B', 'fasttext-crawl', 'word2vec'),
@@ -653,7 +658,8 @@ def annotate(output_format,
               max_sequence_length=max_sequence_length,
               batch_size=batch_size,
               patience=patience,
-              learning_rate=learning_rate
+              learning_rate=learning_rate,
+              multi_gpu=multi_gpu
               )
 
     if action == 'train_eval':
@@ -672,7 +678,8 @@ def annotate(output_format,
                    max_sequence_length=max_sequence_length,
                    batch_size=batch_size,
                    patience=patience,
-                   learning_rate=learning_rate
+                   learning_rate=learning_rate,
+                   multi_gpu=multi_gpu
                    )
 
     if action == 'eval':

From e169867f6911bb934147077beb0f1dcab4ca7a19 Mon Sep 17 00:00:00 2001
From: Luca Foppiano
Date: Fri, 1 Sep 2023 12:26:25 +0900
Subject: [PATCH 5/9] fix exception when multi-gpu is used on a single gpu,
 improve message

---
 delft/sequenceLabelling/wrapper.py | 16 +++++++++++++---
 1 file changed, 13 insertions(+), 3 deletions(-)

diff --git a/delft/sequenceLabelling/wrapper.py b/delft/sequenceLabelling/wrapper.py
index 57945991..7e30e68b 100644
--- a/delft/sequenceLabelling/wrapper.py
+++ b/delft/sequenceLabelling/wrapper.py
@@ -145,7 +145,12 @@ def train(self, x_train, y_train, f_train=None, x_valid=None, y_valid=None, f_va
         # TBD if valid is None, segment train to get one if early_stop is True
         if multi_gpu:
             strategy = tf.distribute.MirroredStrategy()
-            print('Number of devices: {}'.format(strategy.num_replicas_in_sync))
+            print('Running with multi-gpu. Number of devices: {}'.format(strategy.num_replicas_in_sync))
+
+            # This trick avoids an exception being thrown when the --multi-gpu approach is used on a single GPU system.
+            # It might be removed with TF 2.10 https://github.com/tensorflow/tensorflow/issues/50487
+            import atexit
+            atexit.register(strategy._extended._collective_ops._pool.close) # type: ignore
 
             with strategy.scope():
                 self.train_(x_train, y_train, f_train, x_valid, y_valid, f_valid, incremental, callbacks)
@@ -208,7 +213,12 @@ def train_(self, x_train, y_train, f_train=None, x_valid=None, y_valid=None, f_v
     def train_nfold(self, x_train, y_train, x_valid=None, y_valid=None, f_train=None, f_valid=None, incremental=False, callbacks=None, multi_gpu=False):
         if multi_gpu:
             strategy = tf.distribute.MirroredStrategy()
-            print('Number of devices: {}'.format(strategy.num_replicas_in_sync))
+            print('Running with multi-gpu. Number of devices: {}'.format(strategy.num_replicas_in_sync))
+
+            # This trick avoids an exception being thrown when the --multi-gpu approach is used on a single GPU system.
+            # It might be removed with TF 2.10 https://github.com/tensorflow/tensorflow/issues/50487
+            import atexit
+            atexit.register(strategy._extended._collective_ops._pool.close) # type: ignore
 
             with strategy.scope():
                 self.train_nfold_(x_train, y_train, x_valid, y_valid, f_train, f_valid, incremental, callbacks)
@@ -221,7 +231,7 @@ def train_nfold_(self, x_train, y_train, x_valid=None, y_valid=None, f_train=Non
         x_all = np.concatenate((x_train, x_valid), axis=0) if x_valid is not None else x_train
         y_all = np.concatenate((y_train, y_valid), axis=0) if y_valid is not None else y_train
         features_all = concatenate_or_none((f_train, f_valid), axis=0)
 
         if incremental:
-            if self.model == None and self.models == None:
+            if self.model is None and self.models is None:
                 print("error: you must load a model first for an incremental training")
                 return

From d05ed043405f23ca439a0a7bfef56acea86bc761 Mon Sep 17 00:00:00 2001
From: Luca Foppiano
Date: Fri, 1 Sep 2023 12:33:27 +0900
Subject: [PATCH 6/9] enable multi-gpu for inference in sequence labelling

---
 delft/applications/datasetTagger.py |  6 +++---
 delft/applications/grobidTagger.py  |  6 +++---
 delft/applications/insultTagger.py  |  4 ++--
 delft/applications/nerTagger.py     | 23 +++++++++++++----------
 delft/sequenceLabelling/tagger.py   | 19 ++++++++++++++++++-
 delft/sequenceLabelling/wrapper.py  | 25 ++++++++++++++++++++-----
 6 files changed, 59 insertions(+), 24 deletions(-)

diff --git a/delft/applications/datasetTagger.py b/delft/applications/datasetTagger.py
index c4dfa890..1ff7b663 100644
--- a/delft/applications/datasetTagger.py
+++ b/delft/applications/datasetTagger.py
@@ -207,7 +207,7 @@ def eval_(input_path=None, architecture=None):
 
 
 # annotate a list of texts
-def annotate_text(texts, output_format, architecture='BidLSTM_CRF', features=None, use_ELMo=False):
+def annotate_text(texts, output_format, architecture='BidLSTM_CRF', features=None, use_ELMo=False, multi_gpu=False):
     annotations = []
 
     # load model
@@ -221,7 +221,7 @@ def annotate_text(texts, output_format, architecture='BidLSTM_CRF', features=Non
 
     start_time = time.time()
 
-    annotations = model.tag(texts, output_format, features=features)
+    annotations = model.tag(texts, output_format, features=features, multi_gpu=multi_gpu)
     runtime = round(time.time() - start_time, 3)
 
     if output_format == 'json':
@@ -353,7 +353,7 @@ def annotate_text(texts, output_format, architecture='BidLSTM_CRF', features=Non
         someTexts.append("We also compare ShanghaiTechRGBD with other RGB-D crowd counting datasets in , and we can see that ShanghaiTechRGBD is the most challenging RGB-D crowd counting dataset in terms of the number of images and heads.")
         someTexts.append("Insulin levels of all samples were measured by ELISA kit (Mercodia)")
 
-        result = annotate_text(someTexts, "json", architecture=architecture, use_ELMo=use_ELMo)
+        result = annotate_text(someTexts, "json", architecture=architecture, use_ELMo=use_ELMo, multi_gpu=multi_gpu)
         print(json.dumps(result, sort_keys=False, indent=4, ensure_ascii=False))
diff --git a/delft/applications/grobidTagger.py b/delft/applications/grobidTagger.py
index e2c39047..ebda33a0 100644
--- a/delft/applications/grobidTagger.py
+++ b/delft/applications/grobidTagger.py
@@ -309,7 +309,7 @@ def eval_(model, input_path=None, architecture='BidLSTM_CRF', use_ELMo=False):
 
 # annotate a list of texts, this is relevant only of models taking only text as input
 # (so not text with layout information)
-def annotate_text(texts, model, output_format, architecture='BidLSTM_CRF', features=None, use_ELMo=False):
+def annotate_text(texts, model, output_format, architecture='BidLSTM_CRF', features=None, use_ELMo=False, multi_gpu=False):
     annotations = []
 
     # load model
@@ -323,7 +323,7 @@ def annotate_text(texts, model, output_format, architecture='BidLSTM_CRF', featu
 
     start_time = time.time()
 
-    annotations = model.tag(texts, output_format, features=features)
+    annotations = model.tag(texts, output_format, features=features, multi_gpu=multi_gpu)
     runtime = round(time.time() - start_time, 3)
 
     if output_format == 'json':
@@ -495,7 +495,7 @@ class Tasks:
             someTexts.append("The statistical analysis was performed using IBM SPSS Statistics v. 20 (SPSS Inc, 2003, Chicago, USA).")
 
         if architecture.find("FEATURE") == -1:
-            result = annotate_text(someTexts, model, "json", architecture=architecture, use_ELMo=use_ELMo)
+            result = annotate_text(someTexts, model, "json", architecture=architecture, use_ELMo=use_ELMo, multi_gpu=multi_gpu)
             print(json.dumps(result, sort_keys=False, indent=4, ensure_ascii=False))
         else:
             print("The model " + architecture + " cannot be used without supplying features as input and it's disabled. "
diff --git a/delft/applications/insultTagger.py b/delft/applications/insultTagger.py
index f904ca5f..310646e0 100644
--- a/delft/applications/insultTagger.py
+++ b/delft/applications/insultTagger.py
@@ -51,7 +51,7 @@ def train(embeddings_name=None, architecture='BidLSTM_CRF', transformer=None, us
 
 
 # annotate a list of texts, provides results in a list of offset mentions
-def annotate(texts, output_format, architecture='BidLSTM_CRF', transformer=None, use_ELMo=False):
+def annotate(texts, output_format, architecture='BidLSTM_CRF', transformer=None, use_ELMo=False, multi_gpu=False):
     annotations = []
 
     model_name = 'insult-' + architecture
@@ -64,7 +64,7 @@ def annotate(texts, output_format, architecture='BidLSTM_CRF', transformer=None,
 
     start_time = time.time()
 
-    annotations = model.tag(texts, output_format)
+    annotations = model.tag(texts, output_format, multi_gpu=multi_gpu)
     runtime = round(time.time() - start_time, 3)
 
     if output_format == 'json':
diff --git a/delft/applications/nerTagger.py b/delft/applications/nerTagger.py
index d7095171..747aaf6b 100644
--- a/delft/applications/nerTagger.py
+++ b/delft/applications/nerTagger.py
@@ -67,7 +67,8 @@ def configure(architecture, dataset_type, lang, embeddings_name, use_ELMo, max_s
 
 # train a model with all available for a given dataset
 def train(dataset_type='conll2003', lang='en', embeddings_name=None, architecture='BidLSTM_CRF',
-          transformer=None, data_path=None, use_ELMo=False, max_sequence_length=-1, batch_size=-1, patience=-1, learning_rate=None):
+          transformer=None, data_path=None, use_ELMo=False, max_sequence_length=-1, batch_size=-1, patience=-1,
+          learning_rate=None, multi_gpu=False):
 
     batch_size, max_sequence_length, patience, recurrent_dropout, early_stop, max_epoch, embeddings_name, word_lstm_units, multiprocessing = \
         configure(architecture, dataset_type, lang, embeddings_name, use_ELMo, max_sequence_length, batch_size, patience)
@@ -196,7 +197,7 @@ def train_eval(embeddings_name=None,
                batch_size=-1,
                max_sequence_length=-1,
                learning_rate=None,
-               multi_gpu = False):
+               multi_gpu=False):
 
     batch_size, max_sequence_length, patience, recurrent_dropout, early_stop, max_epoch, embeddings_name, word_lstm_units, multiprocessing = \
         configure(architecture, dataset_type, lang, embeddings_name, use_ELMo,
@@ -519,7 +520,8 @@ def annotate(output_format,
              architecture='BidLSTM_CRF',
              file_in=None,
              file_out=None,
-             use_ELMo=False):
+             use_ELMo=False,
+             multi_gpu=False):
 
     if file_in is None:
         raise ValueError("an input file to be annotated must be provided")
@@ -556,7 +558,7 @@ def annotate(output_format,
 
     start_time = time.time()
 
-    model.tag_file(file_in=file_in, output_format=output_format, file_out=file_out)
+    model.tag_file(file_in=file_in, output_format=output_format, file_out=file_out, multi_gpu=multi_gpu)
     runtime = round(time.time() - start_time, 3)
 
     print("runtime: %s seconds " % (runtime))
@@ -695,14 +697,15 @@ def annotate(output_format,
                 print("Language not supported:", lang)
             else:
                 print(file_in)
-                result = annotate("json",
-                                  dataset_type,
-                                  lang,
-                                  architecture=architecture,
+                annotate("json",
+                         dataset_type,
+                         lang,
+                         architecture=architecture,
                                   #transformer=transformer,
-                                  file_in=file_in,
+                         file_in=file_in,
                                   file_out=file_out,
-                                  use_ELMo=use_ELMo)
+                         use_ELMo=use_ELMo,
+                         multi_gpu=multi_gpu)
 
     """
     if result is not None:
         if file_out is None:
diff --git a/delft/sequenceLabelling/tagger.py b/delft/sequenceLabelling/tagger.py
index 71f6a2f9..e59c1c37 100644
--- a/delft/sequenceLabelling/tagger.py
+++ b/delft/sequenceLabelling/tagger.py
@@ -1,6 +1,7 @@
 import datetime
 
 import numpy as np
+import tensorflow as tf
 
 from delft.sequenceLabelling.data_generator import DataGeneratorTransformers
 from delft.sequenceLabelling.preprocess import Preprocessor
@@ -22,7 +23,23 @@ def __init__(self,
         self.model_config = model_config
         self.embeddings = embeddings
 
-    def tag(self, texts, output_format, features=None):
+
+    def tag(self, texts, output_format, features=None, multi_gpu=False):
+        if multi_gpu:
+            strategy = tf.distribute.MirroredStrategy()
+            print('Running with multi-gpu. Number of devices: {}'.format(strategy.num_replicas_in_sync))
+
+            # This trick avoids an exception being thrown when the --multi-gpu approach is used on a single GPU system.
+            # It might be removed with TF 2.10 https://github.com/tensorflow/tensorflow/issues/50487
+            import atexit
+            atexit.register(strategy._extended._collective_ops._pool.close) # type: ignore
+
+            with strategy.scope():
+                self.tag_(texts, output_format, features)
+        else:
+            return self.tag(texts, output_format, features)
+
+    def tag_(self, texts, output_format, features=None):
 
         if output_format == 'json':
             res = {
diff --git a/delft/sequenceLabelling/wrapper.py b/delft/sequenceLabelling/wrapper.py
index 7e30e68b..ec6f7483 100644
--- a/delft/sequenceLabelling/wrapper.py
+++ b/delft/sequenceLabelling/wrapper.py
@@ -466,7 +466,22 @@ def eval_nfold(self, x_test, y_test, features=None):
 
             print(get_report(fold_average_evaluation, digits=4, include_avgs=['micro']))
 
-    def tag(self, texts, output_format, features=None, batch_size=None):
+    def tag(self, texts, output_format, features=None, batch_size=None, multi_gpu=False):
+        if multi_gpu:
+            strategy = tf.distribute.MirroredStrategy()
+            print('Running with multi-gpu. Number of devices: {}'.format(strategy.num_replicas_in_sync))
+
+            # This trick avoids an exception being thrown when the --multi-gpu approach is used on a single GPU system.
+            # It might be removed with TF 2.10 https://github.com/tensorflow/tensorflow/issues/50487
+            import atexit
+            atexit.register(strategy._extended._collective_ops._pool.close) # type: ignore
+
+            with strategy.scope():
+                self.tag_(texts, output_format, features, batch_size)
+        else:
+            return self.tag(texts, output_format, features, batch_size)
+
+    def tag_(self, texts, output_format, features=None, batch_size=None):
         # annotate a list of sentences, return the list of annotations in the
         # specified output_format
@@ -493,13 +508,13 @@ def tag_(self, texts, output_format, features=None, batch_size=None):
         else:
             raise (OSError('Could not find a model.' + str(self.model)))
 
-    def tag_file(self, file_in, output_format, file_out, batch_size=None):
+    def tag_file(self, file_in, output_format, file_out, batch_size=None, multi_gpu=False):
         # Annotate a text file containing one sentence per line, the annotations are
         # written in the output file if not None, in the standard output otherwise.
         # Processing is streamed by batches so that we can process huge files without
        # memory issues
 
-        if batch_size != None:
+        if batch_size is not None:
            self.model_config.batch_size = batch_size
            print("---")
            print("batch_size (prediction):", self.model_config.batch_size)
@@ -517,10 +532,10 @@ def tag_file(self, file_in, output_format, file_out, batch_size=None):
         first = True
         with open(file_in, 'r') as f:
             texts = None
-            while texts == None or len(texts) == self.model_config.batch_size * self.nb_workers:
+            while texts is None or len(texts) == self.model_config.batch_size * self.nb_workers:
 
                 texts = next_n_lines(f, self.model_config.batch_size * self.nb_workers)
-                annotations = tagger.tag(texts, output_format)
+                annotations = tagger.tag(texts, output_format, multi_gpu=multi_gpu)
                 # if the following is true, we just output the JSON returned by the tagger without any modification
                 directDump = False
                 if first:

From 0b066adcb94c0e445f3ed6a22697e0ad2a7aec5d Mon Sep 17 00:00:00 2001
From: Luca Foppiano
Date: Fri, 1 Sep 2023 12:50:01 +0900
Subject: [PATCH 7/9] fix wrong method name call

---
 delft/sequenceLabelling/tagger.py  | 2 +-
 delft/sequenceLabelling/wrapper.py | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/delft/sequenceLabelling/tagger.py b/delft/sequenceLabelling/tagger.py
index e59c1c37..95bfc2f0 100644
--- a/delft/sequenceLabelling/tagger.py
+++ b/delft/sequenceLabelling/tagger.py
@@ -37,7 +37,7 @@ def tag(self, texts, output_format, features=None, multi_gpu=False):
             with strategy.scope():
                 self.tag_(texts, output_format, features)
         else:
-            return self.tag(texts, output_format, features)
+            return self.tag_(texts, output_format, features)
 
     def tag_(self, texts, output_format, features=None):
 
diff --git a/delft/sequenceLabelling/wrapper.py b/delft/sequenceLabelling/wrapper.py
index ec6f7483..3c26db2e 100644
--- a/delft/sequenceLabelling/wrapper.py
+++ b/delft/sequenceLabelling/wrapper.py
@@ -479,7 +479,7 @@ def tag(self, texts, output_format, features=None, batch_size=None, multi_gpu=Fa
             with strategy.scope():
                 self.tag_(texts, output_format, features, batch_size)
         else:
-            return self.tag(texts, output_format, features, batch_size)
+            return self.tag_(texts, output_format, features, batch_size)
 
     def tag_(self, texts, output_format, features=None, batch_size=None):
         # annotate a list of sentences, return the list of annotations in the

From 8a03944a25595a5547e8b67bce40a65d4871dab8 Mon Sep 17 00:00:00 2001
From: Luca Foppiano
Date: Fri, 1 Sep 2023 12:57:58 +0900
Subject: [PATCH 8/9] fix missing return

---
 delft/sequenceLabelling/tagger.py  | 2 +-
 delft/sequenceLabelling/wrapper.py | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/delft/sequenceLabelling/tagger.py b/delft/sequenceLabelling/tagger.py
index 95bfc2f0..d7fa4822 100644
--- a/delft/sequenceLabelling/tagger.py
+++ b/delft/sequenceLabelling/tagger.py
@@ -35,7 +35,7 @@ def tag(self, texts, output_format, features=None, multi_gpu=False):
             atexit.register(strategy._extended._collective_ops._pool.close) # type: ignore
 
             with strategy.scope():
-                self.tag_(texts, output_format, features)
+                return self.tag_(texts, output_format, features)
         else:
             return self.tag_(texts, output_format, features)
 
diff --git a/delft/sequenceLabelling/wrapper.py b/delft/sequenceLabelling/wrapper.py
index 3c26db2e..3ea20f20 100644
--- a/delft/sequenceLabelling/wrapper.py
+++ b/delft/sequenceLabelling/wrapper.py
@@ -477,7 +477,7 @@ def tag(self, texts, output_format, features=None, batch_size=None, multi_gpu=Fa
             atexit.register(strategy._extended._collective_ops._pool.close) # type: ignore
 
             with strategy.scope():
-                self.tag_(texts, output_format, features, batch_size)
+                return self.tag_(texts, output_format, features, batch_size)
         else:
             return self.tag_(texts, output_format, features, batch_size)

From f8df0131f31238bbf617667e53c27e42267ab52d Mon Sep 17 00:00:00 2001
From: Luca Foppiano
Date: Fri, 1 Sep 2023 12:59:09 +0900
Subject: [PATCH 9/9] fix the closing pool only for tf < 2.10.0

---
 delft/sequenceLabelling/tagger.py  | 7 ++++---
 delft/sequenceLabelling/wrapper.py | 7 +++++--
 2 files changed, 9 insertions(+), 5 deletions(-)

diff --git a/delft/sequenceLabelling/tagger.py b/delft/sequenceLabelling/tagger.py
index d7fa4822..cf389f54 100644
--- a/delft/sequenceLabelling/tagger.py
+++ b/delft/sequenceLabelling/tagger.py
@@ -2,6 +2,7 @@
 
 import numpy as np
 import tensorflow as tf
+from packaging import version
 
 from delft.sequenceLabelling.data_generator import DataGeneratorTransformers
 from delft.sequenceLabelling.preprocess import Preprocessor
@@ -23,7 +24,6 @@ def __init__(self,
         self.model_config = model_config
         self.embeddings = embeddings
 
-
     def tag(self, texts, output_format, features=None, multi_gpu=False):
         if multi_gpu:
             strategy = tf.distribute.MirroredStrategy()
@@ -31,8 +31,9 @@ def tag(self, texts, output_format, features=None, multi_gpu=False):
 
             # This trick avoids an exception being thrown when the --multi-gpu approach is used on a single GPU system.
             # It might be removed with TF 2.10 https://github.com/tensorflow/tensorflow/issues/50487
-            import atexit
-            atexit.register(strategy._extended._collective_ops._pool.close) # type: ignore
+            if version.parse(tf.__version__) < version.parse('2.10.0'):
+                import atexit
+                atexit.register(strategy._extended._collective_ops._pool.close) # type: ignore
 
             with strategy.scope():
                 return self.tag_(texts, output_format, features)
diff --git a/delft/sequenceLabelling/wrapper.py b/delft/sequenceLabelling/wrapper.py
index 3ea20f20..f2b2d802 100644
--- a/delft/sequenceLabelling/wrapper.py
+++ b/delft/sequenceLabelling/wrapper.py
@@ -1,5 +1,7 @@
 import os
 
+from packaging import version
+
 # ask tensorflow to be quiet and not print hundred lines of logs
 from delft.utilities.Transformer import TRANSFORMER_CONFIG_FILE_NAME, DEFAULT_TRANSFORMER_TOKENIZER_DIR
 from delft.utilities.misc import print_parameters
@@ -473,8 +475,9 @@ def tag(self, texts, output_format, features=None, batch_size=None, multi_gpu=Fa
 
             # This trick avoids an exception being thrown when the --multi-gpu approach is used on a single GPU system.
             # It might be removed with TF 2.10 https://github.com/tensorflow/tensorflow/issues/50487
-            import atexit
-            atexit.register(strategy._extended._collective_ops._pool.close) # type: ignore
+            if version.parse(tf.__version__) < version.parse('2.10.0'):
+                import atexit
+                atexit.register(strategy._extended._collective_ops._pool.close) # type: ignore
 
             with strategy.scope():
                 return self.tag_(texts, output_format, features, batch_size)
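
Taken together, patches 2 through 9 converge on a single pattern: a thin public method (train, train_nfold, tag) that optionally builds a tf.distribute.MirroredStrategy and runs the real implementation (train_, train_nfold_, tag_) inside its scope. The sketch below condenses that pattern into one standalone helper for reference. It is a minimal sketch, assuming TensorFlow 2.x and the packaging package; the helper name run_with_mirrored_strategy and the work callable are illustrative, not part of DeLFT's API, while the private collective-ops cleanup is the workaround taken verbatim from the patches above.

    import atexit

    import tensorflow as tf
    from packaging import version


    def run_with_mirrored_strategy(work, multi_gpu=False):
        # Same control flow as the train()/train_nfold()/tag() wrappers above:
        # without multi-gpu, just run the implementation directly.
        if not multi_gpu:
            return work()

        strategy = tf.distribute.MirroredStrategy()
        print('Running with multi-gpu. Number of devices: {}'.format(strategy.num_replicas_in_sync))

        # Workaround from patches 5 and 9: on TF < 2.10, close the collective-ops
        # pool at exit, otherwise using MirroredStrategy on a single-GPU system
        # raises an exception on shutdown
        # (https://github.com/tensorflow/tensorflow/issues/50487).
        if version.parse(tf.__version__) < version.parse('2.10.0'):
            atexit.register(strategy._extended._collective_ops._pool.close)  # type: ignore

        # The model's variables must be created inside the scope, and the result
        # must be propagated out; the missing return is the bug fixed in patch 8.
        with strategy.scope():
            return work()

Keeping the strategy inside a wrapper, rather than at each call site as in patch 1, means every entry point gets distribution support from one code path, which is why the series moves the logic into wrapper.py and tagger.py.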
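On the command line, each tagger then exposes the same opt-in flag. The invocation below is illustrative only: it assumes grobidTagger.py's usual model and action positional arguments (here the date model), and an arbitrary batch size. As the --multi-gpu help text notes, the batch size should be raised to match the number of replicas, since MirroredStrategy splits each global batch across the available GPUs:

    python3 delft/applications/grobidTagger.py date train --architecture BidLSTM_CRF --multi-gpu --batch-size 64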