Sorry, you need to enable JavaScript to visit this website.

The exponential growth in the study of graph-based data dependencies is fueling the need for large scale machine learning (ML) frameworks that can analyze these relationships. Recently, frameworks, such as Google’s Pregel, Apache’s Hama, and CMU’s GraphLab, have emerged to perform these often iterative and data-dependent computations in a distributed manner at commercial scale [1], [2], [3].  But in order for data scientists to use these frameworks, they must have the tools to construct very large graphs with arbitrary edge and vertex relationships and data structures.

Unfortunately, tools do not exist today to efficiently and easily construct graphs with billions or trillions of elements from unstructured or semi-structured data sources.  While one may program MapReduce frameworks, like Apache Hadoop, to do this, the programmer must possess a deep understanding of not only their application and the relevant ML algorithms but also how to parallelize graph construction while conserving system resources. And, they must then partition the graph effectively for distributed computation and mining.  As a result of this burden, many data scientists spend most of their time preparing data using scripts or application-specific MapReduce programs, with little time left over for analysis [4].

To address the problem, we developed GraphBuilder, a scalable graph construction library for Hadoop MapReduce, which provides a simple Java library with algorithms for parallel graph construction, transformation, and verification that are useful for graph mining.  GraphBuilder may also partition and serialize large-scale graphs for ingest by GraphLab and other distributed graph analytics frameworks. The GraphBuilder library implements Extract, Transform, and Load (ETL) functions for graph-analytics computing systems. The library is responsible for getting the data in a correct graph format for graph-based computation and ensuring balanced resource utilization in a distributed environment. The graph construction data flow is described in the figure below.

Extract - Graph formation and tabulation of network information

The extraction of information relevant to graph formation from large datasets is a very application-specific task, requiring support for a wide variety of data sources and types.  The library exposes an interface for custom feature extractors (i.e., GraphTokenizer) that may parse and extract the important features for subsequent analysis.  The library's interfaces don't require the user to understand parallel-programing paradigms, like MapReduce; basic sequential programming expertise is sufficient.  

The library generates an edge list for the graph, connecting one or more classes of features to each other.  Edges are defined using a vertex adjacency list and vertices are identified by user-specified identifiers.

The library supplies an interface to define custom tabulation functions and a set of built-in functions, such as TF (term frequency), TFIDF, WC, ADD, MUL, and DIV, that may be used to tabulate vertex values and edge weights.

Transform - Graph transformation and checking

Once the graph is constructed, library filters may be selectively applied to clean up the graph or even transform it into a different type.  Depending on the problem, the data scientist may want to remove duplicate, dangling, and/or self edges.  Library map tasks use distributed hashing algorithms to ensure that all edges appearing between the same two vertices are assigned to the same reduce task while load balancing the reducers.  This scheme permits the easy removal of duplicate edges and conversion of a directed graph into an undirected one, if desired.

Load - Graph compression and partitioning

The library offers load optimization for graphs that will be subsequently used in distributed graph-analytics frameworks like GraphLab. Memory and storage may be conserved by compressing the graph. The library includes the distributed graph normalization algorithm for compression that exploits data locality for high-performance compression. A MapReduce task creates a key-value dictionary, indexed by ordered integers and alphabetically, of all vertex identifiers.  The dictionary is then sharded across machines and co-located with shards of the unconverted edge list sorted alphabetically by source vertex.  The local dictionaries are then used to convert the source vertex labels to integers.  The edge list is then shuffled to alphabetically shard it by destination vertex and the integer substitution repeated.

Following compression, the graph is partitioned.  Balanced partitioning of arbitrarily connected graphs, like power-law graphs, is an NP hard problem. Furthermore, traditional graph partitioning algorithms perform poorly on power-law graphs.  We implement the balanced p-way vertex cut scheme using random and heuristic MapReduce algorithms. These algorithms partition the graph in a way that will:

  • Minimize communications by minimizing the number of machines each vertex spans
  • Load balance the work by placing roughly the same number of edges on each machine.

The partitions are serialized and stored into files in HDFS using JSON, an extensible self-describing data format that offers compression.  JSON makes it easy for computational frameworks and data mining tools to interpret the partition files.

References

[1] Malewicz, G., et al., Pregel: A system for large-scale graph processing, in ACM SIGMOD 2010, ACM: Indianapolis, Indiana.

[2] Seo, S., et al., HAMA: An efficient matrix computation with the MapReduce framework, in 2nd IEEE International Conference on Cloud Computing Technology and Science 2010: Indianapolis, Indiana USA.

[3] Gonzalez, J.E., et al., PowerGraph: Distributed Graph-Parallel Computation on Natural Graphs, in 10th USENIX Symposium on Operating Systems Design and Implementation 2012, USENIX: Hollywood, California USA.

[4] Anonymous, 2012: Quote from Data Scientist in Jeffrey Heer's Stanford (Interview) Study

Project: