diff --git a/examples/multigpu/graphbolt/node_classification.py b/examples/multigpu/graphbolt/node_classification.py
index b144fdb1d5cc..952b5b355c48 100644
--- a/examples/multigpu/graphbolt/node_classification.py
+++ b/examples/multigpu/graphbolt/node_classification.py
@@ -151,9 +151,7 @@ def evaluate(rank, model, dataloader, num_classes, device):
     y = []
     y_hats = []
 
-    for step, data in (
-        tqdm.tqdm(enumerate(dataloader)) if rank == 0 else enumerate(dataloader)
-    ):
+    for data in tqdm.tqdm(dataloader) if rank == 0 else dataloader:
         blocks = data.blocks
         x = data.node_features["feat"]
         y.append(data.labels)
@@ -271,8 +269,11 @@ def run(rank, world_size, args, devices, dataset):
 
     # Pin the graph and features to enable GPU access.
     if args.storage_device == "pinned":
-        dataset.graph.pin_memory_()
-        dataset.feature.pin_memory_()
+        graph = dataset.graph.pin_memory_()
+        feature = dataset.feature.pin_memory_()
+    else:
+        graph = dataset.graph.to(args.storage_device)
+        feature = dataset.feature.to(args.storage_device)
 
     train_set = dataset.tasks[0].train_set
     valid_set = dataset.tasks[0].validation_set
@@ -280,13 +281,13 @@ def run(rank, world_size, args, devices, dataset):
     args.fanout = list(map(int, args.fanout.split(",")))
 
     num_classes = dataset.tasks[0].metadata["num_classes"]
-    in_size = dataset.feature.size("node", None, "feat")[0]
+    in_size = feature.size("node", None, "feat")[0]
     hidden_size = 256
     out_size = num_classes
 
-    if args.gpu_cache_size > 0:
-        dataset.feature._features[("node", None, "feat")] = gb.GPUCachedFeature(
-            dataset.feature._features[("node", None, "feat")],
+    if args.gpu_cache_size > 0 and args.storage_device != "cuda":
+        feature._features[("node", None, "feat")] = gb.GPUCachedFeature(
+            feature._features[("node", None, "feat")],
             args.gpu_cache_size,
         )
 
@@ -297,24 +298,24 @@ def run(rank, world_size, args, devices, dataset):
     # Create data loaders.
     train_dataloader = create_dataloader(
         args,
-        dataset.graph,
-        dataset.feature,
+        graph,
+        feature,
         train_set,
         device,
         is_train=True,
     )
     valid_dataloader = create_dataloader(
         args,
-        dataset.graph,
-        dataset.feature,
+        graph,
+        feature,
         valid_set,
         device,
         is_train=False,
     )
     test_dataloader = create_dataloader(
         args,
-        dataset.graph,
-        dataset.feature,
+        graph,
+        feature,
         test_set,
         device,
         is_train=False,
@@ -396,9 +397,9 @@ def parse_args():
     parser.add_argument(
         "--mode",
         default="pinned-cuda",
-        choices=["cpu-cuda", "pinned-cuda"],
-        help="Dataset storage placement and Train device: 'cpu' for CPU and RAM,"
-        " 'pinned' for pinned memory in RAM, 'cuda' for GPU and GPU memory.",
+        choices=["cpu-cuda", "pinned-cuda", "cuda-cuda"],
+        help="Dataset storage placement and Train device: 'cpu' for CPU and RAM"
+        ", 'pinned' for pinned memory in RAM, 'cuda' for GPU and GPU memory.",
     )
 
     return parser.parse_args()
diff --git a/python/dgl/graphbolt/impl/fused_csc_sampling_graph.py b/python/dgl/graphbolt/impl/fused_csc_sampling_graph.py
index de81c137833b..d30a3fbdfa83 100644
--- a/python/dgl/graphbolt/impl/fused_csc_sampling_graph.py
+++ b/python/dgl/graphbolt/impl/fused_csc_sampling_graph.py
@@ -1092,7 +1092,8 @@ def _pin(x):
         return self2._apply_to_members(_pin if device == "pinned" else _to)
 
     def pin_memory_(self):
-        """Copy `FusedCSCSamplingGraph` to the pinned memory in-place."""
+        """Copy `FusedCSCSamplingGraph` to the pinned memory in-place. Returns
+        the same object modified in-place."""
         # torch.Tensor.pin_memory() is not an inplace operation. To make it
         # truly in-place, we need to use cudaHostRegister. Then, we need to use
         # cudaHostUnregister to unpin the tensor in the destructor.
@@ -1123,7 +1124,7 @@ def _pin(x):
 
             return x
 
-        self._apply_to_members(_pin)
+        return self._apply_to_members(_pin)
 
 
 def fused_csc_sampling_graph(
diff --git a/python/dgl/graphbolt/impl/torch_based_feature_store.py b/python/dgl/graphbolt/impl/torch_based_feature_store.py
index 577e29b7325b..9fd3c5f45f04 100644
--- a/python/dgl/graphbolt/impl/torch_based_feature_store.py
+++ b/python/dgl/graphbolt/impl/torch_based_feature_store.py
@@ -175,7 +175,8 @@ def metadata(self):
         )
 
     def pin_memory_(self):
-        """In-place operation to copy the feature to pinned memory."""
+        """In-place operation to copy the feature to pinned memory. Returns the
+        same object modified in-place."""
         # torch.Tensor.pin_memory() is not an inplace operation. To make it
         # truly in-place, we need to use cudaHostRegister. Then, we need to use
         # cudaHostUnregister to unpin the tensor in the destructor.
@@ -194,6 +195,8 @@ def pin_memory_(self):
 
         self._is_inplace_pinned.add(x)
 
+        return self
+
     def is_pinned(self):
         """Returns True if the stored feature is pinned."""
         return self._tensor.is_pinned()
@@ -289,10 +292,13 @@ def __init__(self, feat_data: List[OnDiskFeatureData]):
         super().__init__(features)
 
     def pin_memory_(self):
-        """In-place operation to copy the feature store to pinned memory."""
+        """In-place operation to copy the feature store to pinned memory.
+        Returns the same object modified in-place."""
        for feature in self._features.values():
             feature.pin_memory_()
 
+        return self
+
     def is_pinned(self):
         """Returns True if all the stored features are pinned."""
         return all(feature.is_pinned() for feature in self._features.values())
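
A minimal usage sketch (not part of the patch) of how the revised pin_memory_() return value is meant to be consumed, mirroring the updated example script; the names dataset, storage_device, graph, and feature are assumptions taken from that example rather than library API:

    # Illustrative only: pin_memory_() now returns the object it pinned
    # in-place, so the result can be assigned directly; for non-pinned
    # placement, move the storage with .to() instead.
    if storage_device == "pinned":
        graph = dataset.graph.pin_memory_()      # pinned in RAM, returns self
        feature = dataset.feature.pin_memory_()  # pinned in RAM, returns self
    else:
        graph = dataset.graph.to(storage_device)      # e.g. "cpu" or "cuda"
        feature = dataset.feature.to(storage_device)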