How-to: Manage Time-Dependent Multilayer Networks in Apache Hadoop

Using an appropriate network representation and the right tool set are the key factors in successfully merging structured and time-series data for analysis.

In Part 1 of this series, you took your first steps with Apache Giraph, the highly scalable graph-processing system, alongside Apache Hadoop. In this installment, you’ll explore a general use case for analyzing time-dependent, Big Data graphs using data from multiple sources. You’ll learn how to generate large random graphs and small-world networks using Giraph – as well as play with several parameters to probe the limits of your cluster.

As you’ll see, using an appropriate network representation, and the right tools for the job (such as Gephi for visualization, a Hadoop-Gephi connector, and CDH), can make this process much easier. (Note: Although neither Giraph nor Gephi are currently supported/shipping inside CDH, you may find this exercise interesting for learning, testing, and development purposes.)

The Use Case

Hadoop is a relatively recent innovation that is attracting more and more attention (and even a lot of money), so I was interested in the question: What can Wikipedia tell us about the emerging Hadoop market?

Surprisingly, no single Wikipedia page on this topic exists. A quick search for the terms “Hadoop” and “market”, however, shows 49 results. (That is, 49 Wikipedia pages contain both words.) I did not consider non-English pages, which introduces a strong, unquantified bias, but nobody is equipped to analyze all topic pages in every language, so I had to define a purely data-driven approach.

The solution is simple: Wikipedia pages, like other webpages, can easily be ranked with the classic PageRank algorithm, which is implemented in Giraph. But to do that, we would need the full page graph for every page in Wikipedia – and that is definitely not an option. This raises the question: What is a large graph? Or rather, how large are “large graphs”?

Large Graphs and Efficient Representation

To quantify the size of a graph represented in a format suitable for Apache Giraph, one either specifies the amount of memory needed to store the graph in RAM or on disk (the latter is interesting for long-term storage), or counts the number of nodes and edges. Either approach will reveal something about possible limitations, whether on one machine or on a cluster, because in all cases resources are limited. Let’s look at two examples:

  • Wikipedia has about 30,000,000 pages (as of January 2014). If the average number of links per page is assumed to be around eight, which was (according to S. N. Dorogovtsev, p. 8) the estimated number of hyperlinks per page in the WWW in 1999, we would need around 230MB to store the roughly 240 million links at one byte per link. To track the dynamics of such a network over five years based on daily snapshots of the network links, one would need around 407GB, but that’s without content – no words, images, edit history, or Java objects. Instead, we could calculate a correlation matrix — which contains the correlation link strength for all node pairs — from multiple node properties, but that would require around 410TB per snapshot (and 145PB for a full year of them!).
  • The social graph used by Facebook has about 1 billion input vectors with 100 features. Facebook claims, among other things, that its “performance and scalability on a 1-trillion edge social graph is two orders of magnitude beyond that scale of other public benchmarks.”
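
The back-of-the-envelope arithmetic behind the Wikipedia estimates above is easy to reproduce. Here is a minimal sketch in plain Java (my own illustration, assuming one byte per stored link, one byte per correlation value, and only the upper triangle of the symmetric correlation matrix; the printed figures land close to the 230MB, 407GB, 410TB, and 145PB quoted above):

/** Back-of-the-envelope estimates for the Wikipedia figures quoted above.
 *  Assumptions (mine): one byte per stored link, one byte per correlation
 *  value, and only the upper triangle of the symmetric correlation matrix. */
public class GraphSizeEstimates {

  private static final double MB = 1024d * 1024d;
  private static final double GB = MB * 1024d;
  private static final double TB = GB * 1024d;
  private static final double PB = TB * 1024d;

  public static void main(String[] args) {
    long pages = 30000000L;           // Wikipedia pages, January 2014
    long linksPerPage = 8;            // average links per page (estimate)

    double linkBytes = pages * linksPerPage;  // ~240 million links, 1 byte each
    System.out.printf("Link structure:        %.0f MB%n", linkBytes / MB);

    long dailySnapshotsFiveYears = 5 * 365;
    System.out.printf("5 years of snapshots:  %.0f GB%n",
        linkBytes * dailySnapshotsFiveYears / GB);

    // Correlation matrix: one value per node pair, upper triangle only.
    double correlationBytes = (double) pages * pages / 2.0;
    System.out.printf("Correlation snapshot:  %.0f TB%n", correlationBytes / TB);
    System.out.printf("One year of snapshots: %.0f PB%n",
        correlationBytes * 365 / PB);
  }
}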

These massive numbers are cause for concern with respect to representation — especially in Hadoop, where data is distributed across many nodes. Whether the graph is stored as a file in Apache Hive, or even in Apache HBase tables, this consideration is critical because not all file formats can be split.

Usually a graph is represented as a matrix. If a link has more than one property, or the nodes have more than one interaction mode (multiple connectivity), a tensor representation is necessary. Sometimes the nodes are of different types; such networks are called n-partite networks, and if multiple link types between the nodes are possible, multiplex networks – in which each individual link type defines a single network layer. Another option is a “hyper-graph” containing hyper-edges, which are characterized by the three or more nodes they tie together.

Other appropriate representations — like adjacency lists, or node and edge lists — are available. Such graph representations differ from the fundamental matrix representation and often have a smaller memory footprint; compressed link lists, for example, do not contain any values for non-existent links, whereas a matrix requires a stored value even for a link strength of zero. Node identifiers can be encoded efficiently, for example with Huffman coding: instead of the complete, sometimes quite long node name (such as a URI, which is often used in semantic graphs), only a number is used during computation, which also lowers memory requirements. In other cases, like the tabular representation of a vertex cut used in GraphX on Apache Spark, the data model contains additional information about the neighborhood of a node.
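
To make the difference concrete, here is a small, self-contained Java sketch (my own illustration, not Giraph code; the page names and link weights are made up) that stores the same tiny graph both as a dense adjacency matrix and as an adjacency list keyed by integer node IDs:

import java.util.HashMap;
import java.util.Map;

/** Illustration only (not Giraph code): the same tiny graph stored as a
 *  dense adjacency matrix and as an adjacency list with integer node IDs. */
public class GraphRepresentations {

  public static void main(String[] args) {
    // Long node names (e.g. URIs in a semantic graph) are mapped to
    // compact integer IDs before the computation starts.
    Map<String, Integer> ids = new HashMap<String, Integer>();
    String[] pages = {"en:Apache_Hadoop", "en:MapReduce", "en:Apache_Solr"};
    for (String page : pages) {
      ids.put(page, ids.size());
    }
    int n = ids.size();

    // Dense adjacency matrix: n * n cells, zeros stored explicitly.
    double[][] matrix = new double[n][n];
    matrix[ids.get("en:Apache_Hadoop")][ids.get("en:MapReduce")] = 1.0;
    matrix[ids.get("en:Apache_Solr")][ids.get("en:Apache_Hadoop")] = 1.0;

    // Adjacency list: only the existing links are stored.
    Map<Integer, Map<Integer, Double>> adjacency =
        new HashMap<Integer, Map<Integer, Double>>();
    addLink(adjacency, ids.get("en:Apache_Hadoop"), ids.get("en:MapReduce"), 1.0);
    addLink(adjacency, ids.get("en:Apache_Solr"), ids.get("en:Apache_Hadoop"), 1.0);

    System.out.println("Matrix cells stored:    " + n * n);
    System.out.println("Adjacency entries kept: " + countLinks(adjacency));
  }

  private static void addLink(Map<Integer, Map<Integer, Double>> adjacency,
                              int src, int dst, double weight) {
    Map<Integer, Double> targets = adjacency.get(src);
    if (targets == null) {
      targets = new HashMap<Integer, Double>();
      adjacency.put(src, targets);
    }
    targets.put(dst, weight);
  }

  private static int countLinks(Map<Integer, Map<Integer, Double>> adjacency) {
    int count = 0;
    for (Map<Integer, Double> targets : adjacency.values()) {
      count += targets.size();
    }
    return count;
  }
}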

This brings us back to our example. We want to analyze how information about the Hadoop market is covered or represented in Wikipedia. Instead of working with the full page graph, I downloaded the neighborhood graphs shown in Figure 1, in which a) each language is drawn in its own color and b) the local neighborhood is highlighted. All pages in the same language as the initial page (here, English) form the local neighborhood LN (green), while the remaining pages in all other languages form the global neighborhood GN (blue).

Figure 1. Preparation of interconnected local networks for multilingual Wikipedia analysis.

The dark-colored nodes have a special meaning: The green one is our initial page, called central node CN. This node has so-called inter-wiki links to nodes in other languages, which describe the same semantic concept. All dark nodes form the core and all nodes in light colors are the “hull” of the local network.

I personally like the comparison with the model of an atom. Just as multiple atoms form molecules, several such local networks form a neighborhood network.

Such an intermediate result is visualized with Gephi in Figure 2. The neighborhood network for the “Hadoop market” consists of manually selected pages (a set of 26 central nodes) and also includes their local and global neighborhoods, which means all directly linked pages in all Wikipedia languages are included.

Figure 2. The local neighborhood network for the Wikipedia pages that represent the Hadoop market shows three major clusters: the green nodes at the top are all pages about open-source software projects. Companies in the Hadoop market are shown in the middle, and a cluster that contains more fundamental topics, like Java programming, is shown at the bottom.

This approach has two major advantages:

  • We can reduce the amount of data we have to handle without losing the embedding of the pages, and
  • We can easily separate and compare sub-data sets by language for language-dependent analysis.

But we have to be careful: a PageRank computation on this graph would give a different result than one obtained from the full Wikipedia link graph. Comparing the global ranking (full page graph) and the local ranking (combined neighborhood graph) would tell us how the extraction of the local neighborhood network influences the results. This process is similar to an external disturbance of the system — one can expect that the absolute values of each node’s PageRank will differ between the two networks, but the ranked list of PageRanks should not differ much (otherwise, the results may not be meaningful).
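
One way to quantify “should not differ much” is a rank correlation between the two orderings. The following hypothetical helper (my own sketch, not part of Giraph; it assumes both result maps cover the same nodes and contain no tied scores) computes Spearman’s rank correlation for two PageRank results; values close to 1 mean the extraction barely disturbed the ranking:

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Spearman rank correlation between two PageRank results (hypothetical
 *  helper; assumes both maps contain the same nodes and no tied scores). */
public class RankComparison {

  public static double spearman(Map<Long, Double> globalPr,
                                Map<Long, Double> localPr) {
    Map<Long, Integer> globalRank = ranks(globalPr);
    Map<Long, Integer> localRank = ranks(localPr);

    long n = globalRank.size();
    double sumSquaredDiff = 0.0;
    for (Map.Entry<Long, Integer> entry : globalRank.entrySet()) {
      double d = entry.getValue() - localRank.get(entry.getKey());
      sumSquaredDiff += d * d;
    }
    return 1.0 - (6.0 * sumSquaredDiff) / (n * (n * n - 1));
  }

  /** Rank 0 = highest PageRank. */
  private static Map<Long, Integer> ranks(Map<Long, Double> pageRanks) {
    List<Map.Entry<Long, Double>> entries =
        new ArrayList<Map.Entry<Long, Double>>(pageRanks.entrySet());
    Collections.sort(entries, new Comparator<Map.Entry<Long, Double>>() {
      public int compare(Map.Entry<Long, Double> a, Map.Entry<Long, Double> b) {
        return Double.compare(b.getValue(), a.getValue());
      }
    });
    Map<Long, Integer> result = new HashMap<Long, Integer>();
    for (int i = 0; i < entries.size(); i++) {
      result.put(entries.get(i).getKey(), i);
    }
    return result;
  }
}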

Now we have an approach that allows efficient extraction of sub-data sets. (Network nodes like Wikipedia pages have many different facets.) We can measure the amount of information on each page (text volume or number of words or sentences), the number of images, or links to other pages. Even a more dynamic view is possible; we just have to create correlation networks from user activity data, like access time series or edit history. All this data should be part of the data set, even if it is not used every time in every computation.
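
As a minimal illustration of how a correlation link can be derived from such activity data, here is a single-machine Java sketch with made-up access-rate numbers and an arbitrary 0.8 link threshold (the at-scale version of this step is the Apache Crunch job mentioned in the conclusion):

/** Pearson correlation between two access-rate time series (my own helper;
 *  at scale this step is done with Apache Crunch, as described later). */
public class AccessRateCorrelation {

  public static double pearson(double[] x, double[] y) {
    double meanX = mean(x);
    double meanY = mean(y);
    double cov = 0, varX = 0, varY = 0;
    for (int i = 0; i < x.length; i++) {
      double dx = x[i] - meanX;
      double dy = y[i] - meanY;
      cov += dx * dy;
      varX += dx * dx;
      varY += dy * dy;
    }
    return cov / Math.sqrt(varX * varY);
  }

  private static double mean(double[] values) {
    double sum = 0;
    for (double v : values) {
      sum += v;
    }
    return sum / values.length;
  }

  public static void main(String[] args) {
    // Toy daily access rates for two Wikipedia pages (made-up numbers).
    double[] hadoopPage = {120, 135, 150, 160, 180, 210, 230};
    double[] solrPage   = { 80,  90,  95, 110, 120, 140, 150};

    double c = pearson(hadoopPage, solrPage);
    // A link is created in the correlation network only above a threshold.
    System.out.println("correlation = " + c + ", create link: " + (c > 0.8));
  }
}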

Visualization of Time-Dependent Multilayer Graphs

Before we get into more Giraph-related coding details, we need to add some tools to our graph-analysis workbench for quickly analyzing intermediate results and handling multilayer graphs. (Fortunately, such multilayer networks can be stored in Hive tables using partitions for each layer or for each temporal snapshot.)

We need multiple tables or multiple queries to extract edge and node data separately from one table. One sub-data set contains all the node properties of a neighborhood network. The links are also organized in multiple sub-data sets, one per link type. We can also store all links in just one table, but in this case each link type needs a classifier for filtering the list. (In the case of time-dependent networks, I recommend partitioned tables – this makes loading data into Gephi much faster.)
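
To sketch what loading one layer could look like, the following Java snippet queries a partitioned Hive table over JDBC; the table name, columns, and partition keys (layer, snapshot_day) are hypothetical and only illustrate the layout described above:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

/** Minimal sketch: load one network layer / temporal snapshot from a
 *  partitioned Hive table via JDBC. Table name, columns, and partition
 *  keys are hypothetical; adjust them to your own schema. */
public class LayerLoader {

  public static void main(String[] args) throws Exception {
    Class.forName("org.apache.hive.jdbc.HiveDriver");

    // Pseudo-distributed CDH: HiveServer2 on the local machine.
    Connection con = DriverManager.getConnection(
        "jdbc:hive2://127.0.0.1:10000/default", "cloudera", "");

    // Partition pruning: only the requested layer and snapshot are read.
    String query = "SELECT src, dst, weight FROM wiki_edges "
        + "WHERE layer = 'hyperlink' AND snapshot_day = '2014-01-01'";

    Statement stmt = con.createStatement();
    ResultSet rs = stmt.executeQuery(query);
    while (rs.next()) {
      System.out.println(rs.getLong("src") + " -> " + rs.getLong("dst")
          + " : " + rs.getDouble("weight"));
    }
    rs.close();
    stmt.close();
    con.close();
  }
}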

Figure 3. Gephi visualization of a correlation network.

Two features are needed here: semantic graph metadata management and “one-click” import from Hadoop. Both are implemented in the Gephi-Hadoop-Connector (see Figure 4). The Hive Metastore already stores some details about the data in our tables, like the table name, column names and types, the SerDe with its relevant parameters, and partitions. But in many research projects, one has to track yet more data that describes the meaning of a given table or even a column. Whether for selecting filter criteria or for consistency checks, such information is highly relevant.

Figure 4. Metadata management for multiple time-dependent network layers is done via the Etosha plugin, which allows a comprehensive description of each single layer.

Here, we will store all facts about a data set as a page in a Semantic MediaWiki (SMW). This page allows human interaction, collaboration, and even machine access in parallel. Semantic annotations tell the Gephi-Hadoop-Connector all details about the multilayer network, such as which layer belongs to which network and which Hive or Impala query is required to load that data set from Hadoop. This information is retrieved via the query API of the SMW. (A future version of the Etosha Semantic Metastore, or ESM, will work with any type of triple store, which allows a more generic approach based on SPARQL queries.)

Our data-driven market study starts with an extraction of sub-data sets, which are defined by the local neighborhood graphs for a set of manually selected central nodes. We extract the access-rate and edit-event time series for those pages and calculate some characteristic values, like the local relevance index, based on access-rate data. This is shown in Figure 5, and the global relevance index for each page is shown in Figure 6. (Some more details about the analysis procedure are shown in this conference poster, presented during the Spring 2014 meeting of the DPG in Germany.)

Figure 5. Hadoop attracted continuously increasing attention from 2009 up to its “public” breakthrough in 2011 in the English Wikipedia.

Figure 6. The increase in attention people give to Hadoop-related topics strongly depends on the language. In non-English Wikipedia projects, the Hadoop market is not as well recognized as in the English Wikipedia, but the upward trend is clearly visible for Apache Solr (orange) and Hadoop (violet).

To summarize, our graph-analysis toolbox now contains:

  • Giraph 1.1.0 (built from trunk, as described below)
  • A CDH development cluster in pseudo-distributed mode (here, the CDH 5 QuickStart VM). We store data in Hive tables, and the JDBC connector allows fast data access via Impala.
  • Gephi for visualization, on an external client machine. With the Gephi toolkit library, we can load and manipulate any graph file without showing a GUI. If we have to process a very large number of relatively small graphs, we can use this library within a map-only MapReduce job (a skeleton is sketched below), but for large-scale processing we will use Giraph.
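
For the map-only case mentioned in the last bullet, the job skeleton is plain MapReduce; the Gephi-toolkit calls inside the mapper are left as a placeholder comment, since they depend on the toolkit version in use. A minimal sketch (class and path arguments are my own):

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

/** Map-only job skeleton for processing many small graph files in parallel.
 *  The Gephi-toolkit calls themselves are left as a placeholder. */
public class SmallGraphProcessor {

  public static class GraphMapper
      extends Mapper<LongWritable, Text, Text, Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      // Placeholder: each input line points to one small graph file;
      // load it with the Gephi toolkit, compute metrics, emit results.
      context.write(value, new Text("processed"));
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "small-graph-processing");
    job.setJarByClass(SmallGraphProcessor.class);
    job.setMapperClass(GraphMapper.class);
    job.setNumReduceTasks(0);              // map-only: no shuffle, no reduce
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    job.setOutputFormatClass(TextOutputFormat.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}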

Hands-On

Now, we can focus on practical things. First, we have to prepare the analysis workbench and then generate test data sets.

Two InputFormats for random graph generation are available in Giraph. Let’s calculate the PageRank for some random sample graphs.

Build and Deploy Giraph 1.1.0

1. Clone giraphl from Github into directory /home/cloudera/GIRAPH.

mkdir GIRAPH
cd GIRAPH
git clone https://github.com/kamir/giraphl.git
cd giraphl/bin
gedit bsg.sh


2. Check and if necessary modify settings: 

#####################################################
# Cloudera Quickstart VM C5    #    current VERSION !!!!!!
#############################
CDHV=5.0.0
USER=cloudera
HADOOP_MAPRED_HOME=/opt/cloudera/parcels/CDH/lib/hadoop-mapreduce
EXJARS=hadoop-mapreduce-examples-2.2.0-cdh5.0.0-beta-2.jar
GIRAPH_HOME=/usr/local/giraph
JTADDRESS=127.0.0.1
ZKSERVER=127.0.0.1
ZKPORT=2181


3. Run the “bootstrap giraph script” in ./giraphl/bin/bsg.sh:

./bsg.sh deploy


4. The build procedure will take some time. After about three to ten minutes, you should see this result:

[INFO] ------------------------------------------------------------------------
[INFO] Reactor Summary:
[INFO]
[INFO] Apache Giraph Parent .............................. SUCCESS [4.174s]
[INFO] Apache Giraph Core ................................ SUCCESS [29.513s]
[INFO] Apache Giraph Examples ............................ SUCCESS [14.484s]
[INFO] Apache Giraph Accumulo I/O ........................ SUCCESS [14.126s]
[INFO] Apache Giraph HBase I/O ........................... SUCCESS [14.439s]
[INFO] Apache Giraph HCatalog I/O ........................ SUCCESS [20.972s]
[INFO] Apache Giraph Hive I/O ............................ SUCCESS [22.264s]
[INFO] Apache Giraph Gora I/O ............................ SUCCESS [16.835s]
[INFO] Apache Giraph Rexster I/O ......................... SUCCESS [0.089s]
[INFO] Apache Giraph Rexster Kibble ...................... SUCCESS [3.227s]
[INFO] Apache Giraph Rexster I/O Formats ................. SUCCESS [13.861s]
[INFO] Apache Giraph Distribution ........................ SUCCESS [31.572s]
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 3:06.224s
[INFO] Finished at: Mon Apr 07 20:43:43 PDT 2014
[INFO] Final Memory: 118M/848M
[INFO] ------------------------------------------------------------------------


5. Start a Giraph benchmark with the following command:

hadoop jar giraph-ex.jar org.apache.giraph.benchmark.PageRankBenchmark -Dgiraph.zkList=127.0.0.1:2181 -Dmapreduce.jobtracker.address=127.0.0.1 -libjars giraph-core.jar -e 1 -s 3 -v -V 50 -w 1


and it should give output comparable to the following:

14/04/07 23:39:32 INFO zookeeper.ZooKeeper: Client environment:java.io.tmpdir=/tmp
14/04/07 23:39:32 INFO zookeeper.ZooKeeper: Client environment:java.compiler=
14/04/07 23:39:32 INFO zookeeper.ZooKeeper: Client environment:os.name=Linux
14/04/07 23:39:32 INFO zookeeper.ZooKeeper: Client environment:os.arch=amd64
14/04/07 23:39:32 INFO zookeeper.ZooKeeper: Client environment:os.version=2.6.32-220.el6.x86_64
14/04/07 23:39:32 INFO zookeeper.ZooKeeper: Client environment:user.name=cloudera
14/04/07 23:39:32 INFO zookeeper.ZooKeeper: Client environment:user.home=/home/cloudera
14/04/07 23:39:32 INFO zookeeper.ZooKeeper: Client environment:user.dir=/usr/local/giraph
14/04/07 23:39:32 INFO zookeeper.ZooKeeper: Initiating client connection, connectString=127.0.0.1:2181 sessionTimeout=60000 watcher=org.apache.giraph.job.JobProgressTracker@3a1d1cf7
14/04/07 23:39:33 INFO mapreduce.Job: Running job: job_1396880118210_0033
14/04/07 23:39:33 INFO zookeeper.ClientCnxn: Opening socket connection to server localhost.localdomain/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)
14/04/07 23:39:33 INFO zookeeper.ClientCnxn: Socket connection established to localhost.localdomain/127.0.0.1:2181, initiating session
14/04/07 23:39:33 INFO zookeeper.ClientCnxn: Session establishment complete on server localhost.localdomain/127.0.0.1:2181, sessionid = 0x1453c8ace24080c, negotiated timeout = 60000
14/04/07 23:39:33 INFO job.JobProgressTracker: Data from 1 workers - Compute superstep 3: 50 out of 50 vertices computed; 1 out of 1 partitions computed; min free memory on worker 1 - 91.06MB, average 91.06MB
14/04/07 23:39:33 INFO zookeeper.ClientCnxn: EventThread shut down
14/04/07 23:39:33 INFO zookeeper.ZooKeeper: Session: 0x1453c8ace24080c closed
14/04/07 23:39:34 INFO mapreduce.Job: Job job_1396880118210_0033 running in uber mode : false
14/04/07 23:39:34 INFO mapreduce.Job:  map 50% reduce 0%
14/04/07 23:39:36 INFO mapreduce.Job:  map 100% reduce 0%
14/04/07 23:39:42 INFO mapreduce.Job: Job job_1396880118210_0033 completed successfully
14/04/07 23:39:42 INFO mapreduce.Job: Counters: 50
    File System Counters
      FILE: Number of bytes read=0
      FILE: Number of bytes written=182040
      FILE: Number of read operations=0
      FILE: Number of large read operations=0
      FILE: Number of write operations=0
      HDFS: Number of bytes read=88
      HDFS: Number of bytes written=0
      HDFS: Number of read operations=2
      HDFS: Number of large read operations=0
      HDFS: Number of write operations=1
    Job Counters
      Launched map tasks=2
      Other local map tasks=2
      Total time spent by all maps in occupied slots (ms)=7168000
      Total time spent by all reduces in occupied slots (ms)=0
    Map-Reduce Framework
      Map input records=2
      Map output records=0
      Input split bytes=88
      Spilled Records=0
      Failed Shuffles=0
      Merged Map outputs=0
      GC time elapsed (ms)=110
      CPU time spent (ms)=1130
      Physical memory (bytes) snapshot=370827264
      Virtual memory (bytes) snapshot=1842810880
      Total committed heap usage (bytes)=300941312
    Giraph Stats
      Aggregate edges=50
      Aggregate finished vertices=50
      Aggregate sent message message bytes=1275
      Aggregate sent messages=150
      Aggregate vertices=50
      Current master task partition=0
      Current workers=1
      Last checkpointed superstep=0
      Sent message bytes=0
      Sent messages=0
      Superstep=4
    Giraph Timers
      Initialize (ms)=1867
      Input superstep (ms)=192
      Setup (ms)=25
      Shutdown (ms)=9942
      Superstep 0 PageRankComputation (ms)=56
      Superstep 1 PageRankComputation (ms)=87
      Superstep 2 PageRankComputation (ms)=38
      Superstep 3 PageRankComputation (ms)=36
      Total (ms)=10377
    Zookeeper base path
      /_hadoopBsp/job_1396880118210_0033=0
    Zookeeper halt node
      /_hadoopBsp/job_1396880118210_0033/_haltComputation=0
    Zookeeper server:port
      127.0.0.1:2181=0
    File Input Format Counters
      Bytes Read=0
    File Output Format Counters
      Bytes Written=0


Congratulations, you have successfully built Giraph (branch: trunk) and tested it for CDH!

Generating Random Graphs with Giraph

Graph classification can be based on measurable properties. “How ordered or how random is the graph structure?” is an important question in graph analysis, and many researchers like to investigate the degree distribution of a given graph. Two networks with very different properties are shown in Figures 8 and 9, together with the corresponding degree distributions.
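
As a quick reminder of what a degree distribution is, here is a tiny Java helper (my own illustration, not Giraph code) that builds a degree histogram from an undirected edge list:

import java.util.HashMap;
import java.util.Map;

/** Tiny helper (not Giraph code): degree histogram from an undirected edge list. */
public class DegreeDistribution {

  public static Map<Integer, Integer> histogram(int[][] edges, int numNodes) {
    int[] degree = new int[numNodes];
    for (int[] edge : edges) {
      degree[edge[0]]++;
      degree[edge[1]]++;
    }
    // Count how many nodes have each degree value.
    Map<Integer, Integer> hist = new HashMap<Integer, Integer>();
    for (int d : degree) {
      Integer count = hist.get(d);
      hist.put(d, count == null ? 1 : count + 1);
    }
    return hist;
  }

  public static void main(String[] args) {
    // A small ring with one shortcut, in the spirit of a Watts-Strogatz graph.
    int[][] edges = {{0, 1}, {1, 2}, {2, 3}, {3, 4}, {4, 0}, {0, 2}};
    System.out.println(histogram(edges, 5)); // e.g. {2=3, 3=2}
  }
}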

Giraph calculates several properties of a given graph, which is normally loaded before the computation starts, but the graph can also be generated on the fly. Table 1 shows two VertexInputFormats that are used for random graph generation.

Table 1. VertexInputFormats are used as “on the fly” graph-generators.

Figure 8. Small-world network generated with WattsStrogatzVertexInputFormat

Figure 9. Random graph generated with PseudoRandomVertexInputFormat

Analysis of such a generated graph is done with one of many algorithms; if the graph should only be stored, use IdentityComputation with an appropriate EdgeOutputFormat (such as GraphvizOutputFormat).
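
For orientation, the SimplePageRankComputation2 class used in the commands below is a lightly modified copy of Giraph’s bundled PageRank example. The core of such a computation looks roughly like the abridged sketch below (class name is mine; the “2” variant referenced in the commands additionally wires in master-compute and worker-context classes):

import java.io.IOException;

import org.apache.giraph.graph.BasicComputation;
import org.apache.giraph.graph.Vertex;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.LongWritable;

/** Abridged PageRank computation in the style of Giraph's bundled example;
 *  the SimplePageRankComputation2 variant adds master/worker context classes. */
public class PageRankSketch extends
    BasicComputation<LongWritable, DoubleWritable, FloatWritable, DoubleWritable> {

  private static final int MAX_SUPERSTEPS = 30;

  @Override
  public void compute(Vertex<LongWritable, DoubleWritable, FloatWritable> vertex,
                      Iterable<DoubleWritable> messages) throws IOException {
    if (getSuperstep() >= 1) {
      double sum = 0;
      for (DoubleWritable message : messages) {
        sum += message.get();
      }
      // Damped PageRank update with the usual 0.15 / 0.85 split.
      vertex.setValue(new DoubleWritable(
          0.15d / getTotalNumVertices() + 0.85d * sum));
    }
    if (getSuperstep() < MAX_SUPERSTEPS) {
      // Spread the current rank evenly over all outgoing edges.
      sendMessageToAllEdges(vertex,
          new DoubleWritable(vertex.getValue().get() / vertex.getNumEdges()));
    } else {
      vertex.voteToHalt();
    }
  }
}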

Because of a limitation in Gephi, it is necessary to install Graphviz to plot graphs defined in DOT format. (OmniGraffle is another option; it can import a subset of DOT data.) On CentOS, the Graphviz tools are installed via:

sudo yum install graphviz


The following two examples illustrate how the graphs in Figures 8 and 9 can be created with Giraph. Both commands are available as functions in the giraphl bsg.sh script. For Giraph library paths, the variables GEX and GEC are defined as follows:

GEX=/usr/local/giraph/giraph-examples/target/giraph-examples-1.1.0-SNAPSHOT-for-hadoop-2.2.0-jar-with-dependencies.jar
GEC=/usr/local/giraph/giraph-core/target/giraph-1.1.0-SNAPSHOT-for-hadoop-2.2.0-jar-with-dependencies.jar


Generate and calculate PageRank for a random network, store it in DOT format:

########################################################################
# RUN PAGERANK on generated graph  (PseudoRandom generator InputFormat)
# and store for visualization in Graphviz
########################################################################
hadoop jar $GEX org.apache.giraph.GiraphRunner \
  -Dgiraph.zkList=$ZKSERVER:$ZKPORT -Dmapreduce.jobtracker.address=$JTADDRESS \
  -libjars giraph-core.jar org.apache.giraph.examples.SimplePageRankComputation2 \
  -mc org.apache.giraph.examples.SimplePageRankComputation2\$SimplePageRankMasterCompute2 \
  -wc org.apache.giraph.examples.SimplePageRankComputation2\$SimplePageRankWorkerContext2 \
  -vif org.apache.giraph.io.formats.PseudoRandomVertexInputFormat \
  -ca giraph.pseudoRandomInputFormat.aggregateVertices=60 \
  -ca giraph.pseudoRandomInputFormat.edgesPerVertex=3 \
  -ca giraph.pseudoRandomInputFormat.localEdgesMinRatio=0.2 \
  -vof org.apache.giraph.io.formats.GraphvizOutputFormat \
  -op /user/$USER/goutput/graphviz_pseudo_$NOW \
  -w 4

hadoop fs -getmerge /user/$USER/goutput/graphviz_pseudo_$NOW graphviz_pseudo_$NOW.dot
dot -Tps graphviz_pseudo_$NOW.dot -o graph_pseudo_$NOW.ps
#######################################################


Generate and calculate PageRank for a small-world network, store it in DOT format:

#######################################################
# RUN PAGERANK on generated WattsStrogatzGraph => needs modified sample code
# and store for visualization in Graphviz
#######################################################
hadoop jar $GEX org.apache.giraph.GiraphRunner \
  -Dgiraph.zkList=$ZKSERVER:$ZKPORT -Dmapreduce.jobtracker.address=$JTADDRESS \
  -libjars $GEC org.apache.giraph.examples.SimplePageRankComputation2 \
  -mc org.apache.giraph.examples.SimplePageRankComputation2\$SimplePageRankMasterCompute2 \
  -wc org.apache.giraph.examples.SimplePageRankComputation2\$SimplePageRankWorkerContext2 \
  -vif org.apache.giraph.io.formats.WattsStrogatzVertexInputFormat \
  -vof org.apache.giraph.io.formats.GraphvizOutputFormat \
  -ca wattsStrogatz.aggregateVertices=160 \
  -ca wattsStrogatz.edgesPerVertex=4 \
  -ca wattsStrogatz.beta=0.2 \
  -ca wattsStrogatz.seed=1 \
  -op /user/$USER/goutput/graphviz_watts_$NOW \
  -w 4

hadoop fs -getmerge /user/$USER/goutput/graphviz_watts_$NOW graphviz_watts_$NOW.dot
dot -Tps graphviz_watts_$NOW.dot -o graph_ws_$NOW.ps
#######################################################


Conclusion

You have now learned how to manage time-dependent multilayer networks in Hadoop using Giraph and other tools. To conclude this series, we will compute the correlation matrix from access-rate time series using Apache Crunch, and then calculate the PageRank with Giraph and GraphLab to compare results for static networks, correlation networks, and dependency networks created by two competing open source graph-processing tools that integrate well into the Hadoop ecosystem.

See you soon, and have fun with Giraph!

Mirko Kämpf is the lead instructor for the Cloudera Administrator Training for Apache Hadoop for Cloudera University.

4 Responses
  • Danny / May 27, 2014 / 8:04 AM

    Thanks for the great article. I have a question: Do you have a working example of the Giraph/Hive connector for Giraph 1.0.0? I’d like to read my edge graph directly from Hive and store the result in Hive, without needing to generate an intermediate CSV.

  • Mirko / May 27, 2014 / 9:38 AM

    Hi Danny,
    the project page is on Github (http://kamir.github.io/ghc/) and here is the source code (also on Github: https://github.com/kamir/ghc).
    The tool works with Impala and Hive.

    Giraph comes with a Hive-based InputFormat, so you do not need the connector. The connector is used for integration with Gephi for graph visualization.

  • Naveen / May 27, 2014 / 10:20 AM

    Can you please provide any info on the Etosha metastore? Thnx

  • Mirko / May 28, 2014 / 3:49 AM

    Naveen, the “Etosha metastore” is a partial result of an ongoing research project. We are close to a public presentation, and the first release will be ready in August. Finally, an application as an Apache Incubator project is on the agenda.

    The core idea is the following: the meta information about datasets, algorithms (even about implementations, which might depend on some cluster specifics), references to research papers, etc. is managed in a traditional knowledge management system. “Etosha tools” will connect these knowledge management tools (e.g., Google Docs, JIRA & Confluence, or MediaWikis) with cluster components. This means documentation by the data analysts can be done implicitly, and data set discovery will be possible across cluster boundaries.

    Our first use-case shows the multi-layer-network management procedure for Gephi.
