diff --git a/python/dgl/distributed/partition.py b/python/dgl/distributed/partition.py
index 08343df5505a..079ed8806a96 100644
--- a/python/dgl/distributed/partition.py
+++ b/python/dgl/distributed/partition.py
@@ -88,24 +88,26 @@ def _dump_part_config(part_config, part_metadata):
         json.dump(part_metadata, outfile, sort_keys=False, indent=4)
 
 
-def _save_graphs(filename, g_list, formats=None, sort_etypes=False):
+def process_partitions(g, formats=None, sort_etypes=False):
     """Preprocess partitions before saving:
    1. format data types.
    2. sort csc/csr by tag.
    """
-    for g in g_list:
-        for k, dtype in RESERVED_FIELD_DTYPE.items():
-            if k in g.ndata:
-                g.ndata[k] = F.astype(g.ndata[k], dtype)
-            if k in g.edata:
-                g.edata[k] = F.astype(g.edata[k], dtype)
-    for g in g_list:
-        if (not sort_etypes) or (formats is None):
-            continue
+    for k, dtype in RESERVED_FIELD_DTYPE.items():
+        if k in g.ndata:
+            g.ndata[k] = F.astype(g.ndata[k], dtype)
+        if k in g.edata:
+            g.edata[k] = F.astype(g.edata[k], dtype)
+
+    if (sort_etypes) and (formats is not None):
         if "csr" in formats:
             g = sort_csr_by_tag(g, tag=g.edata[ETYPE], tag_type="edge")
         if "csc" in formats:
             g = sort_csc_by_tag(g, tag=g.edata[ETYPE], tag_type="edge")
+    return g
+
+
+def _save_dgl_graphs(filename, g_list, formats=None):
     save_graphs(filename, g_list, formats=formats)
 
 
@@ -332,9 +334,10 @@ def load_partition(part_config, part_id, load_feats=True, use_graphbolt=False):
         "part-{}".format(part_id) in part_metadata
     ), "part-{} does not exist".format(part_id)
     part_files = part_metadata["part-{}".format(part_id)]
-    part_graph_field = "part_graph"
     if use_graphbolt:
         part_graph_field = "part_graph_graphbolt"
+    else:
+        part_graph_field = "part_graph"
     assert (
         part_graph_field in part_files
     ), f"the partition does not contain graph structure: {part_graph_field}"
@@ -461,7 +464,7 @@ def load_partition_feats(
     return node_feats, edge_feats
 
 
-def load_partition_book(part_config, part_id):
+def load_partition_book(part_config, part_id, part_metadata=None):
     """Load a graph partition book from the partition config file.
 
     Parameters
@@ -470,6 +473,8 @@
         The path of the partition config file.
     part_id : int
         The partition ID.
+    part_metadata : dict
+        The metadata of the partition.
 
     Returns
     -------
@@ -482,7 +487,8 @@
     dict
         The edge types
     """
-    part_metadata = _load_part_config(part_config)
+    if part_metadata is None:
+        part_metadata = _load_part_config(part_config)
     assert "num_parts" in part_metadata, "num_parts does not exist."
     assert (
         part_metadata["num_parts"] > part_id
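Note on the hunk above: the new optional part_metadata argument lets a caller that already holds the parsed partition config skip re-reading the JSON file on every call. A minimal usage sketch (hypothetical paths, assuming a config produced by partition_graph()):

    import json

    from dgl.distributed import load_partition_book

    part_config = "out/my_graph.json"  # hypothetical partition config
    with open(part_config) as f:
        part_metadata = json.load(f)

    # Reuse the already-parsed metadata instead of reloading the JSON
    # once per partition.
    for part_id in range(part_metadata["num_parts"]):
        gpb, graph_name, ntypes, etypes = load_partition_book(
            part_config, part_id, part_metadata=part_metadata
        )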
@@ -666,6 +672,38 @@ def _set_trainer_ids(g, sim_g, node_parts):
             g.edges[c_etype].data["trainer_id"] = trainer_id
 
 
+def _partition_to_graphbolt(
+    parts,
+    part_i,
+    part_config,
+    part_metadata,
+    *,
+    store_eids=True,
+    store_inner_node=False,
+    store_inner_edge=False,
+    graph_formats=None,
+):
+    gpb, _, ntypes, etypes = load_partition_book(
+        part_config=part_config, part_id=part_i, part_metadata=part_metadata
+    )
+    graph = parts[part_i]
+    csc_graph = gb_convert_single_dgl_partition(
+        ntypes=ntypes,
+        etypes=etypes,
+        gpb=gpb,
+        part_meta=part_metadata,
+        graph=graph,
+        store_eids=store_eids,
+        store_inner_edge=store_inner_edge,
+        store_inner_node=store_inner_node,
+        graph_formats=graph_formats,
+    )
+    rel_path_result = _save_graph_gb(
+        part_config=part_config, part_id=part_i, csc_graph=csc_graph
+    )
+    part_metadata[f"part-{part_i}"]["part_graph_graphbolt"] = rel_path_result
+
+
 def _update_node_edge_map(node_map_val, edge_map_val, g, num_parts):
     """
     If the original graph contains few nodes or edges for specific node/edge
@@ -1303,6 +1341,7 @@ def get_homogeneous(g, balance_ntypes):
         "ntypes": ntypes,
         "etypes": etypes,
     }
+    part_config = os.path.join(out_path, graph_name + ".json")
 
     for part_id in range(num_parts):
         part = parts[part_id]
@@ -1425,30 +1464,54 @@ def get_homogeneous(g, balance_ntypes):
         part_dir = os.path.join(out_path, "part" + str(part_id))
         node_feat_file = os.path.join(part_dir, "node_feat.dgl")
         edge_feat_file = os.path.join(part_dir, "edge_feat.dgl")
-        part_graph_file = os.path.join(part_dir, "graph.dgl")
-        part_metadata["part-{}".format(part_id)] = {
-            "node_feats": os.path.relpath(node_feat_file, out_path),
-            "edge_feats": os.path.relpath(edge_feat_file, out_path),
-            "part_graph": os.path.relpath(part_graph_file, out_path),
-        }
+        os.makedirs(part_dir, mode=0o775, exist_ok=True)
         save_tensors(node_feat_file, node_feats)
         save_tensors(edge_feat_file, edge_feats)
 
+        part_metadata["part-{}".format(part_id)] = {
+            "node_feats": os.path.relpath(node_feat_file, out_path),
+            "edge_feats": os.path.relpath(edge_feat_file, out_path),
+        }
         sort_etypes = len(g.etypes) > 1
-        _save_graphs(
-            part_graph_file,
-            [part],
-            formats=graph_formats,
-            sort_etypes=sort_etypes,
-        )
-    print(
-        "Save partitions: {:.3f} seconds, peak memory: {:.3f} GB".format(
-            time.time() - start, get_peak_mem()
-        )
-    )
+        part = process_partitions(part, graph_formats, sort_etypes)
+
+    # convert to GraphBolt and save graph
+    if use_graphbolt:
+        # save FusedCSCSamplingGraph
+        kwargs["graph_formats"] = graph_formats
+        n_jobs = kwargs.pop("n_jobs", 1)
+        mp_ctx = mp.get_context("spawn")
+        with concurrent.futures.ProcessPoolExecutor(  # pylint: disable=unexpected-keyword-arg
+            max_workers=min(num_parts, n_jobs),
+            mp_context=mp_ctx,
+        ) as executor:
+            for part_id in range(num_parts):
+                executor.submit(
+                    _partition_to_graphbolt(
+                        part_i=part_id,
+                        part_config=part_config,
+                        part_metadata=part_metadata,
+                        parts=parts,
+                        **kwargs,
+                    )
+                )
+        part_metadata["node_map_dtype"] = "int64"
+        part_metadata["edge_map_dtype"] = "int64"
+    else:
+        for part_id, part in parts.items():
+            part_dir = os.path.join(out_path, "part" + str(part_id))
+            part_graph_file = os.path.join(part_dir, "graph.dgl")
+            part_metadata["part-{}".format(part_id)][
+                "part_graph"
+            ] = os.path.relpath(part_graph_file, out_path)
+            # save DGLGraph
+            _save_dgl_graphs(
+                part_graph_file,
+                [part],
+                formats=graph_formats,
+            )
 
-    part_config = os.path.join(out_path, graph_name + ".json")
     _dump_part_config(part_config, part_metadata)
 
     num_cuts = sim_g.num_edges() - tot_num_inner_edges
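With this hunk, GraphBolt conversion happens inside partition_graph() itself instead of requiring a separate dgl_partition_to_graphbolt() pass. A rough sketch of the intended call, assuming store_eids and n_jobs are forwarded through **kwargs as in the hunk above (toy graph, hypothetical output path):

    import dgl
    import torch

    g = dgl.rand_graph(1000, 5000)  # toy homogeneous graph
    g.ndata["feat"] = torch.randn(1000, 16)

    # use_graphbolt=True converts each partition to FusedCSCSamplingGraph
    # right after the partitions are written; n_jobs controls how many
    # partitions are converted in parallel.
    dgl.distributed.partition_graph(
        g,
        graph_name="toy",
        num_parts=2,
        out_path="out_toy",
        use_graphbolt=True,
        store_eids=True,
        n_jobs=2,
    )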
@@ -1460,12 +1523,11 @@ def get_homogeneous(g, balance_ntypes):
         )
     )
 
-    if use_graphbolt:
-        kwargs["graph_formats"] = graph_formats
-        dgl_partition_to_graphbolt(
-            part_config,
-            **kwargs,
+    print(
+        "Save partitions: {:.3f} seconds, peak memory: {:.3f} GB".format(
+            time.time() - start, get_peak_mem()
         )
+    )
 
     if return_mapping:
         return orig_nids, orig_eids
@@ -1513,20 +1575,142 @@ def init_type_per_edge(graph, gpb):
     return etype_ids
 
 
-def gb_convert_single_dgl_partition(
-    part_id,
-    graph_formats,
-    part_config,
-    store_eids,
+def _load_part(part_config, part_id, parts=None):
+    """Load a partition from ``parts`` if provided, otherwise from disk."""
+    if parts is None:
+        graph, _, _, _, _, _, _ = load_partition(
+            part_config, part_id, load_feats=False
+        )
+    else:
+        graph = parts[part_id]
+    return graph
+
+
+def _save_graph_gb(part_config, part_id, csc_graph):
+    csc_graph_save_dir = os.path.join(
+        os.path.dirname(part_config),
+        f"part{part_id}",
+    )
+    csc_graph_path = os.path.join(
+        csc_graph_save_dir, "fused_csc_sampling_graph.pt"
+    )
+    torch.save(csc_graph, csc_graph_path)
+
+    return os.path.relpath(csc_graph_path, os.path.dirname(part_config))
+
+
+def cast_various_to_minimum_dtype_gb(
+    graph,
+    part_meta,
+    num_parts,
+    indptr,
+    indices,
+    type_per_edge,
+    etypes,
+    ntypes,
+    node_attributes,
+    edge_attributes,
+):
+    """Cast various data to minimum dtype."""
+    # Cast 1: indptr.
+    indptr = _cast_to_minimum_dtype(graph.num_edges(), indptr)
+    # Cast 2: indices.
+    indices = _cast_to_minimum_dtype(graph.num_nodes(), indices)
+    # Cast 3: type_per_edge.
+    type_per_edge = _cast_to_minimum_dtype(
+        len(etypes), type_per_edge, field=ETYPE
+    )
+    # Cast 4: node/edge_attributes.
+    predicates = {
+        NID: part_meta["num_nodes"],
+        "part_id": num_parts,
+        NTYPE: len(ntypes),
+        EID: part_meta["num_edges"],
+        ETYPE: len(etypes),
+        DGL2GB_EID: part_meta["num_edges"],
+        GB_DST_ID: part_meta["num_nodes"],
+    }
+    for attributes in [node_attributes, edge_attributes]:
+        for key in attributes:
+            if key not in predicates:
+                continue
+            attributes[key] = _cast_to_minimum_dtype(
+                predicates[key], attributes[key], field=key
+            )
+    return indptr, indices, type_per_edge
+
+
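The four casts in cast_various_to_minimum_dtype_gb all follow one rule: if the largest value a field can hold (number of nodes, edges or types) fits into a narrower integer type, the tensor is downcast to save memory. _cast_to_minimum_dtype itself is not part of this diff; the standalone sketch below only illustrates the idea and is not its actual implementation:

    import torch

    def cast_to_minimum_dtype(max_value, tensor):
        # Illustrative only: pick the narrowest signed integer dtype that
        # can represent max_value and downcast the tensor to it.
        if tensor is None:
            return None
        for dtype in (torch.int8, torch.int16, torch.int32, torch.int64):
            if max_value <= torch.iinfo(dtype).max:
                return tensor.to(dtype)
        return tensor

    indices = torch.arange(10_000, dtype=torch.int64)
    indices = cast_to_minimum_dtype(9_999, indices)
    print(indices.dtype)  # torch.int16, since 9_999 fits into int16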
+def _create_attributes_gb(
+    graph,
+    gpb,
+    edge_ids,
+    is_homo,
     store_inner_node,
     store_inner_edge,
+    store_eids,
+    debug_mode,
+):
+    # Save node attributes. Detailed attributes are shown below.
+    # DGL_GB\Attributes  dgl.NID("_ID")  dgl.NTYPE("_TYPE")  "inner_node"  "part_id"
+    # DGL_Homograph           ✅                🚫                ✅             ✅
+    # GB_Homograph            ✅                🚫             optional          🚫
+    # DGL_Heterograph         ✅                ✅                ✅             ✅
+    # GB_Heterograph          ✅                🚫             optional          🚫
+    required_node_attrs = [NID]
+    if store_inner_node:
+        required_node_attrs.append("inner_node")
+    if debug_mode:
+        required_node_attrs = list(graph.ndata.keys())
+    node_attributes = {attr: graph.ndata[attr] for attr in required_node_attrs}
+
+    # Save edge attributes. Detailed attributes are shown below.
+    # DGL_GB\Attributes  dgl.EID("_ID")  dgl.ETYPE("_TYPE")  "inner_edge"
+    # DGL_Homograph           ✅                🚫                ✅
+    # GB_Homograph         optional             🚫             optional
+    # DGL_Heterograph         ✅                ✅                ✅
+    # GB_Heterograph       optional             ✅             optional
+    type_per_edge = None
+    if not is_homo:
+        type_per_edge = init_type_per_edge(graph, gpb)[edge_ids]
+        type_per_edge = type_per_edge.to(RESERVED_FIELD_DTYPE[ETYPE])
+    required_edge_attrs = []
+    if store_eids:
+        required_edge_attrs.append(EID)
+    if store_inner_edge:
+        required_edge_attrs.append("inner_edge")
+    if debug_mode:
+        required_edge_attrs = list(graph.edata.keys())
+    edge_attributes = {
+        attr: graph.edata[attr][edge_ids] for attr in required_edge_attrs
+    }
+    return node_attributes, edge_attributes, type_per_edge
+
+
+def gb_convert_single_dgl_partition(
+    ntypes,
+    etypes,
+    gpb,
+    part_meta,
+    graph,
+    graph_formats=None,
+    store_eids=False,
+    store_inner_node=False,
+    store_inner_edge=False,
 ):
     """Converts a single DGL partition to GraphBolt.
 
     Parameters
     ----------
-    part_id : int
-        The numerical ID of the partition to convert.
+    ntypes : dict
+        The node types.
+    etypes : dict
+        The edge types.
+    gpb : GraphPartitionBook
+        The global partition information.
+    part_meta : dict
+        The metadata of the partition.
+    graph : DGLGraph
+        The graph to be converted to the GraphBolt format.
     graph_formats : str or list[str], optional
         Save partitions in specified formats. It could be any combination of
         `coo`, `csc`. As `csc` format is mandatory for `FusedCSCSamplingGraph`,
         it is not necessary to specify this argument. It's mainly for
         specifying `coo` format to save edge ID mapping and destination node
         IDs. If not specified, whether to save `coo` format is determined by
         the availability of the format in DGL partitions. Default: None.
@@ -1547,14 +1731,8 @@
             "Running in debug mode which means all attributes of DGL partitions"
             " will be saved to the new format."
         )
 
-    part_meta = _load_part_config(part_config)
     num_parts = part_meta["num_parts"]
-
-    graph, _, _, gpb, _, _, _ = load_partition(
-        part_config, part_id, load_feats=False
-    )
-    _, _, ntypes, etypes = load_partition_book(part_config, part_id)
     is_homo = is_homogeneous(ntypes, etypes)
     node_type_to_id = (
         None if is_homo else {ntype: ntid for ntid, ntype in enumerate(ntypes)}
     )
@@ -1569,39 +1747,16 @@
     # Obtain CSC indtpr and indices.
     indptr, indices, edge_ids = graph.adj_tensors("csc")
-    # Save node attributes. Detailed attributes are shown below.
-    # DGL_GB\Attributes  dgl.NID("_ID")  dgl.NTYPE("_TYPE")  "inner_node"  "part_id"
-    # DGL_Homograph           ✅                🚫                ✅             ✅
-    # GB_Homograph            ✅                🚫             optional          🚫
-    # DGL_Heterograph         ✅                ✅                ✅             ✅
-    # GB_Heterograph          ✅                🚫             optional          🚫
-    required_node_attrs = [NID]
-    if store_inner_node:
-        required_node_attrs.append("inner_node")
-    if debug_mode:
-        required_node_attrs = list(graph.ndata.keys())
-    node_attributes = {attr: graph.ndata[attr] for attr in required_node_attrs}
-
-    # Save edge attributes. Detailed attributes are shown below.
-    # DGL_GB\Attributes  dgl.EID("_ID")  dgl.ETYPE("_TYPE")  "inner_edge"
-    # DGL_Homograph           ✅                🚫                ✅
-    # GB_Homograph         optional             🚫             optional
-    # DGL_Heterograph         ✅                ✅                ✅
-    # GB_Heterograph       optional             ✅             optional
-    type_per_edge = None
-    if not is_homo:
-        type_per_edge = init_type_per_edge(graph, gpb)[edge_ids]
-        type_per_edge = type_per_edge.to(RESERVED_FIELD_DTYPE[ETYPE])
-    required_edge_attrs = []
-    if store_eids:
-        required_edge_attrs.append(EID)
-    if store_inner_edge:
-        required_edge_attrs.append("inner_edge")
-    if debug_mode:
-        required_edge_attrs = list(graph.edata.keys())
-    edge_attributes = {
-        attr: graph.edata[attr][edge_ids] for attr in required_edge_attrs
-    }
+    node_attributes, edge_attributes, type_per_edge = _create_attributes_gb(
+        graph,
+        gpb,
+        edge_ids,
+        is_homo,
+        store_inner_node,
+        store_inner_edge,
+        store_eids,
+        debug_mode,
+    )
     # When converting DGLGraph to FusedCSCSamplingGraph, edge IDs are
     # re-ordered(actually FusedCSCSamplingGraph does not have edge IDs
     # in nature). So we need to save such re-order info for any
@@ -1623,32 +1778,18 @@
             indptr, dtype=indices.dtype
         )
 
-    # Cast various data to minimum dtype.
-    # Cast 1: indptr.
-    indptr = _cast_to_minimum_dtype(graph.num_edges(), indptr)
-    # Cast 2: indices.
-    indices = _cast_to_minimum_dtype(graph.num_nodes(), indices)
-    # Cast 3: type_per_edge.
-    type_per_edge = _cast_to_minimum_dtype(
-        len(etypes), type_per_edge, field=ETYPE
+    indptr, indices, type_per_edge = cast_various_to_minimum_dtype_gb(
+        graph,
+        part_meta,
+        num_parts,
+        indptr,
+        indices,
+        type_per_edge,
+        etypes,
+        ntypes,
+        node_attributes,
+        edge_attributes,
     )
-    # Cast 4: node/edge_attributes.
-    predicates = {
-        NID: part_meta["num_nodes"],
-        "part_id": num_parts,
-        NTYPE: len(ntypes),
-        EID: part_meta["num_edges"],
-        ETYPE: len(etypes),
-        DGL2GB_EID: part_meta["num_edges"],
-        GB_DST_ID: part_meta["num_nodes"],
-    }
-    for attributes in [node_attributes, edge_attributes]:
-        for key in attributes:
-            if key not in predicates:
-                continue
-            attributes[key] = _cast_to_minimum_dtype(
-                predicates[key], attributes[key], field=key
-            )
 
     csc_graph = gb.fused_csc_sampling_graph(
         indptr,
         indices,
@@ -1660,17 +1801,128 @@
         node_type_to_id=node_type_to_id,
         edge_type_to_id=edge_type_to_id,
     )
-    orig_graph_path = os.path.join(
-        os.path.dirname(part_config),
-        part_meta[f"part-{part_id}"]["part_graph"],
+    return csc_graph
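The returned object is a dgl.graphbolt FusedCSCSamplingGraph built from the partition's CSC tensors. A tiny self-contained sketch of that construction, with toy tensors and none of the partition bookkeeping:

    import torch
    import dgl.graphbolt as gb

    # 3-node toy graph in CSC form: node 0 has no in-edges, node 1 has one
    # in-edge (from node 0), node 2 has two (from nodes 0 and 1).
    indptr = torch.tensor([0, 0, 1, 3])
    indices = torch.tensor([0, 0, 1])

    csc_graph = gb.fused_csc_sampling_graph(indptr, indices)
    print(csc_graph.total_num_nodes, csc_graph.total_num_edges)  # 3 3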
+
+
+def _convert_partition_to_graphbolt(
+    part_config,
+    part_id,
+    graph_formats=None,
+    store_eids=False,
+    store_inner_node=False,
+    store_inner_edge=False,
+):
+    """
+    The pipeline for converting a single partition to GraphBolt.
+
+    Parameters
+    ----------
+    part_config : str
+        The path of the partition config file.
+    part_id : int
+        The partition ID.
+    graph_formats : str or list[str], optional
+        Save partitions in specified formats. It could be any combination of
+        `coo`, `csc`. As `csc` format is mandatory for `FusedCSCSamplingGraph`,
+        it is not necessary to specify this argument. It's mainly for
+        specifying `coo` format to save edge ID mapping and destination node
+        IDs. If not specified, whether to save `coo` format is determined by
+        the availability of the format in DGL partitions. Default: None.
+    store_eids : bool, optional
+        Whether to store edge IDs in the new graph. Default: False.
+    store_inner_node : bool, optional
+        Whether to store inner node mask in the new graph. Default: False.
+    store_inner_edge : bool, optional
+        Whether to store inner edge mask in the new graph. Default: False.
+
+    Returns
+    -------
+    str
+        The relative path at which the converted csc_graph is saved.
+    """
+    gpb, _, ntypes, etypes = load_partition_book(
+        part_config=part_config, part_id=part_id
     )
-    csc_graph_path = os.path.join(
-        os.path.dirname(orig_graph_path), "fused_csc_sampling_graph.pt"
+    part = _load_part(part_config, part_id)
+    part_meta = copy.deepcopy(_load_part_config(part_config))
+    csc_graph = gb_convert_single_dgl_partition(
+        graph=part,
+        ntypes=ntypes,
+        etypes=etypes,
+        gpb=gpb,
+        part_meta=part_meta,
+        graph_formats=graph_formats,
+        store_eids=store_eids,
+        store_inner_node=store_inner_node,
+        store_inner_edge=store_inner_edge,
     )
-    torch.save(csc_graph, csc_graph_path)
+    rel_path = _save_graph_gb(part_config, part_id, csc_graph)
+    return rel_path
 
-    return os.path.relpath(csc_graph_path, os.path.dirname(part_config))
-    # Update graph path.
+
+def _convert_partition_to_graphbolt_wrapper(
+    graph_formats,
+    part_config,
+    store_eids,
+    store_inner_node,
+    store_inner_edge,
+    n_jobs,
+    num_parts,
+):
+    # [Rui] DGL partitions are always saved as homogeneous graphs even though
+    # the original graph is heterogeneous. But heterogeneous information like
+    # node/edge types are saved as node/edge data alongside with partitions.
+    # What needs more attention is that due to the existence of HALO nodes in
+    # each partition, the local node IDs are not sorted according to the node
+    # types. So we fail to assign ``node_type_offset`` as required by GraphBolt.
+    # But this is not a problem since such information is not used in sampling.
+    # We can simply pass None to it.
+
+    # Iterate over partitions.
+    convert_with_format = partial(
+        _convert_partition_to_graphbolt,
+        part_config=part_config,
+        graph_formats=graph_formats,
+        store_eids=store_eids,
+        store_inner_node=store_inner_node,
+        store_inner_edge=store_inner_edge,
+    )
+    # Need to create entirely new interpreters, because we call C++ downstream
+    # See https://docs.python.org/3.12/library/multiprocessing.html#contexts-and-start-methods
+    # and https://pybind11.readthedocs.io/en/stable/advanced/misc.html#global-interpreter-lock-gil
+    rel_path_results = []
+    if n_jobs > 1 and num_parts > 1:
+        mp_ctx = mp.get_context("spawn")
+        with concurrent.futures.ProcessPoolExecutor(  # pylint: disable=unexpected-keyword-arg
+            max_workers=min(num_parts, n_jobs),
+            mp_context=mp_ctx,
+        ) as executor:
+            for part_id in range(num_parts):
+                rel_path_results.append(
+                    executor.submit(
+                        convert_with_format, part_id=part_id
+                    ).result()
+                )
+
+    else:
+        # If running single-threaded, avoid spawning new interpreter, which is slow
+        for part_id in range(num_parts):
+            rel_path = convert_with_format(part_id=part_id)
+            rel_path_results.append(rel_path)
+    part_meta = _load_part_config(part_config)
+    for part_id in range(num_parts):
+        # Update graph path.
+        part_meta[f"part-{part_id}"]["part_graph_graphbolt"] = rel_path_results[
+            part_id
+        ]
+
+    # Save dtype info into partition config.
+    # [TODO][Rui] Always use int64_t for node/edge IDs in GraphBolt. See more
+    # details in #7175.
+    part_meta["node_map_dtype"] = "int64"
+    part_meta["edge_map_dtype"] = "int64"
+
+    return part_meta
 
 
 def dgl_partition_to_graphbolt(
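The wrapper above submits one conversion per partition to a process pool created with the "spawn" start method, because the conversion calls into C++ and each worker needs a fresh interpreter rather than a forked copy of the parent. A stripped-down sketch of that submission pattern, with a placeholder work function standing in for the real conversion:

    import concurrent.futures
    import multiprocessing as mp

    def convert_one(part_id):
        # Placeholder for the per-partition conversion work.
        return f"part{part_id}/fused_csc_sampling_graph.pt"

    if __name__ == "__main__":
        num_parts, n_jobs = 4, 2
        mp_ctx = mp.get_context("spawn")  # fresh interpreter per worker
        with concurrent.futures.ProcessPoolExecutor(
            max_workers=min(num_parts, n_jobs), mp_context=mp_ctx
        ) as executor:
            rel_paths = list(executor.map(convert_one, range(num_parts)))
        print(rel_paths)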
@@ -1719,59 +1971,14 @@
             " will be saved to the new format."
         )
     part_meta = _load_part_config(part_config)
-    new_part_meta = copy.deepcopy(part_meta)
     num_parts = part_meta["num_parts"]
-
-    # [Rui] DGL partitions are always saved as homogeneous graphs even though
-    # the original graph is heterogeneous. But heterogeneous information like
-    # node/edge types are saved as node/edge data alongside with partitions.
-    # What needs more attention is that due to the existence of HALO nodes in
-    # each partition, the local node IDs are not sorted according to the node
-    # types. So we fail to assign ``node_type_offset`` as required by GraphBolt.
-    # But this is not a problem since such information is not used in sampling.
-    # We can simply pass None to it.
-
-    # Iterate over partitions.
-    convert_with_format = partial(
-        gb_convert_single_dgl_partition,
+    part_meta = _convert_partition_to_graphbolt_wrapper(
         graph_formats=graph_formats,
         part_config=part_config,
         store_eids=store_eids,
         store_inner_node=store_inner_node,
         store_inner_edge=store_inner_edge,
+        n_jobs=n_jobs,
+        num_parts=num_parts,
     )
-    # Need to create entirely new interpreters, because we call C++ downstream
-    # See https://docs.python.org/3.12/library/multiprocessing.html#contexts-and-start-methods
-    # and https://pybind11.readthedocs.io/en/stable/advanced/misc.html#global-interpreter-lock-gil
-    rel_path_results = []
-    if n_jobs > 1 and num_parts > 1:
-        mp_ctx = mp.get_context("spawn")
-        with concurrent.futures.ProcessPoolExecutor(  # pylint: disable=unexpected-keyword-arg
-            max_workers=min(num_parts, n_jobs),
-            mp_context=mp_ctx,
-        ) as executor:
-            futures = []
-            for part_id in range(num_parts):
-                futures.append(executor.submit(convert_with_format, part_id))
-
-            for part_id in range(num_parts):
-                rel_path_results.append(futures[part_id].result())
-    else:
-        # If running single-threaded, avoid spawning new interpreter, which is slow
-        for part_id in range(num_parts):
-            rel_path_results.append(convert_with_format(part_id))
-
-    for part_id in range(num_parts):
-        # Update graph path.
-        new_part_meta[f"part-{part_id}"][
-            "part_graph_graphbolt"
-        ] = rel_path_results[part_id]
-
-    # Save dtype info into partition config.
-    # [TODO][Rui] Always use int64_t for node/edge IDs in GraphBolt. See more
-    # details in #7175.
-    new_part_meta["node_map_dtype"] = "int64"
-    new_part_meta["edge_map_dtype"] = "int64"
-
-    _dump_part_config(part_config, new_part_meta)
-    print(f"Converted partitions to GraphBolt format into {part_config}")
+    _dump_part_config(part_config, part_meta)
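For partitions that were written without use_graphbolt=True, the standalone conversion entry point is kept. A minimal sketch of invoking it on an existing partition config (path hypothetical, keyword arguments as in the hunks above):

    from dgl.distributed import dgl_partition_to_graphbolt

    # Converts previously saved DGL partitions and updates the partition
    # config with "part_graph_graphbolt" entries and int64 map dtypes.
    dgl_partition_to_graphbolt(
        "out/my_graph.json",
        store_eids=True,
        n_jobs=4,
    )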