Class HyperBall

java.lang.Object
it.unimi.dsi.util.HyperLogLogCounterArray
it.unimi.dsi.big.webgraph.algo.HyperBall
All Implemented Interfaces:
SafelyCloseable, Closeable, Serializable, AutoCloseable

public class HyperBall extends HyperLogLogCounterArray implements SafelyCloseable

Computes an approximation of the neighbourhood function, of the size of the reachable sets, and of (discounted) positive geometric centralities of a graph using HyperBall.

HyperBall is an algorithm computing by dynamic programming an approximation of the sizes of the balls of growing radius around the nodes of a graph. Starting from these data, it can approximate the neighbourhood function of a graph, that is, the function returning for each t the number of pairs of nodes at distance at most t, the number of nodes reachable from each node, Bavelas's closeness centrality, Lin's index, and harmonic centrality (studied by Paolo Boldi and Sebastiano Vigna in “Axioms for Centrality”, Internet Math., 2014). HyperBall can also compute discounted centralities, in which the weight assigned to a node is some specified function of its distance. All centralities are computed in their positive version (i.e., using distance from the source: see below how to compute the more usual, and useful, negative version).

HyperBall has been described by Paolo Boldi and Sebastiano Vigna in “In-Core Computation of Geometric Centralities with HyperBall: A Hundred Billion Nodes and Beyond”, Proc. of 2013 IEEE 13th International Conference on Data Mining Workshops (ICDMW 2013), IEEE, 2013, and it is a generalization of the method described in “HyperANF: Approximating the Neighbourhood Function of Very Large Graphs on a Budget”, by Paolo Boldi, Marco Rosa and Sebastiano Vigna, Proceedings of the 20th international conference on World Wide Web, pages 625−634, ACM, (2011).

Incidentally, HyperBall (actually, HyperANF) has been used to show that Facebook has just four degrees of separation.

At step t, for each node we (approximately) keep track (using HyperLogLog counters) of the set of nodes at distance at most t. At each iteration, the sets associated with the successors of each node are merged, thus obtaining the new sets. A crucial component in making this process efficient and scalable is the usage of broadword programming to implement the join (merge) phase, which requires maximising in parallel the list of registers associated with each successor (the implementation is geared towards 64-bits processors).

Using the approximate sets, for each t we estimate the number of pairs of nodes (x,y) such that the distance from x to y is at most t. Since during the computation we are also in possession of the number of nodes at distance t − 1, we can also perform computations using the number of nodes at distance exactly t (e.g., centralities).

To use this class, you must first create an instance. Then, you call init() (once) and then iterate() as much as needed (you can init/iterate several times, if you want so). A commodity method will do everything for you. Finally, you must close() the instance. The method modified() will tell you whether the internal state of the algorithm has changed.

If you additionally pass to the constructor (or on the command line) the transpose of your graph (you can compute it using Transform.transposeOffline(ImmutableGraph,int) or Transform.transposeOffline(ImmutableGraph, int)), when three quarters of the nodes stop changing their value HyperBall will switch to a systolic computation: using the transpose, when a node changes it will signal back to its predecessors that at the next iteration they could change. At the next scan, only the successors of signalled nodes will be scanned. In particular, when a very small number of nodes is modified by an iteration, HyperBall will switch to a systolic local mode, in which all information about modified nodes is kept in (traditional) dictionaries, rather than being represented as arrays of booleans. This strategy makes the last phases of the computation orders of magnitude faster, and makes in practice the running time of HyperBall proportional to the theoretical bound O(m log n), where n is the number of nodes and m is the number of the arcs of the graph. Note that graphs with a large diameter require a correspondingly large number of iterations, and these iterations will have to pass over all nodes if you do not provide the tranpose.

Deciding when to stop iterating is a rather delicate issue. The only safe way is to iterate until modified() is zero, and systolic (local) computation makes this goal easily attainable. However, in some cases one can assume that the graph is not pathological, and stop when the relative increment of the number of pairs goes below some threshold.

Computing Centralities

Note that usually one is interested in the negative version of a centrality measure, that is, the version that depends on the incoming arcs. HyperBall can compute only positive centralities: if you are interested (as it usually happens) in the negative version, you must pass to HyperBall the transpose of the graph (and if you want to run in systolic mode, the original graph, which is the transpose of the transpose). Note that the neighbourhood function of the transpose is identical to the neighbourhood function of the original graph, so the exchange does not alter its computation.

Configuring the JVM

HyperBall computations go against all basic assumptions of Java garbage collection. It is thus essential that you reconfigure your JVM properly. A good starting point is the following command line:

 java -server -Xss256K -Xms100G -XX:PretenureSizeThreshold=512M -XX:MaxNewSize=4G \
      -XX:+UseNUMA -XX:+UseTLAB -XX:+ResizeTLAB \
      -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=99 -XX:+UseCMSInitiatingOccupancyOnly \
      -verbose:gc -Xloggc:gc.log ...
 
  • -Xss256K reduces the stack memory used by each thread.
  • -Xms100G size the heap: the more memory, the more counter per registers you can use (the amount, of course, depends on your hardware); please note that we set the starting heap size as expansion of large heaps is very expensive.
  • -XX:PretenureSizeThreshold=512M forces the allocation of registers directly into the old generation.
  • -XX:MaxNewSize=4G leaves almost all memory for the old generation.
  • -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=99 -XX:+UseCMSInitiatingOccupancyOnly set the concurrent garbage collector, and impose that no collection is performed until 99% of the permanent generation is filled.
  • -XX:+UseNUMA -XX:+UseTLAB -XX:+ResizeTLAB usually improve performance, but your mileage may vary.

Check the garbage collector logs (gc.log) to be sure that your minor and major collections are very infrequent (as they should be).

Performance issues

To use HyperBall effectively, you should aim at filling a large percentage of the available core memory. This requires, of course, to size properly the heap, but also to configure some parameters.

Most of the memory goes into storing HyperLogLog registers. By tuning the number of registers per counter, you can modify the memory allocated for them. The amount of memory is logged, and you should check that the number of registers you chose almost fills up the heap memory you allocated, possibly leaving space for the graph(s) (but read below). Note that you can only choose a number of registers per counter that is a power of two, so your latitude in adjusting the memory used for registers is somewhat limited.

If you have little memory, this class can perform external computations: instead of keeping in core memory an old and a new copy of the counters, it can dump on disk an update list containing pairs <nodecounter>. At the end of an iteration, the update list is loaded and applied to the counters in memory. The process is of course slower, but the core memory used is halved.

Then, some memory is necessary to load the graph (and possibly its tranpose). We suggest to check the offline option, which will map the graph into memory, rather than loading it. If you map the graph into memory, take care of leaving some free memory, beside that allocated for the heap, as the operating system will need some space to buffer the memory-mapped graph(s).

If there are several available cores, the runs of iterate() will be decomposed into relatively small tasks (small blocks of nodes) and each task will be assigned to the first available core. Since all tasks are completely independent, this behaviour ensures a very high degree of parallelism. Be careful, however, because this feature requires a graph with a reasonably fast random access (e.g., in the case of a BVGraph, short reference chains), as many calls to ImmutableGraph.nodeIterator(long) will be made. The granularity of the decomposition is the number of nodes assigned to each task.

In any case, when attacking very large graphs (in particular, in external mode) some system tuning (e.g., increasing the filesystem commit time) is a good idea. Also experimenting with granularity and buffer sizes can be useful. Smaller buffers reduce the waits on I/O calls, but increase the time spent in disk seeks. Large buffers improve I/O, but they use a lot of memory. The best possible setup is the one in which the cores are 100% busy during the graph scan, and the I/O time logged at the end of a scan is roughly equal to the time that is necessary to reload the counters from disk: in such a case, essentially, you are computing as fast as possible.

Author:
Sebastiano Vigna, Paolo Boldi, Marco Rosa
See Also:
  • Field Details

    • ASSERTS

      public static final boolean ASSERTS
      See Also:
    • DEFAULT_GRANULARITY

      public static final int DEFAULT_GRANULARITY
      The default granularity of a task.
      See Also:
    • DEFAULT_BUFFER_SIZE

      public static final int DEFAULT_BUFFER_SIZE
      The default size of a buffer in bytes.
      See Also:
    • gotTranspose

      protected final boolean gotTranspose
      True if we have the transpose graph.
    • systolic

      protected boolean systolic
      True if we started a systolic computation.
    • preLocal

      protected boolean preLocal
      True if we are preparing a local computation (we are systolic and less than 1% nodes were modified).
    • local

      protected boolean local
      True if we started a local computation.
    • doSumOfDistances

      protected final boolean doSumOfDistances
      Whether the sum of distances from each node (inverse of positive closeness centrality) should be computed; if false, sumOfDistances is null.
    • doSumOfInverseDistances

      protected boolean doSumOfInverseDistances
      Whether the sum of inverse distances from each node (positive harmonic centrality) should be computed; if false, sumOfInverseDistances is null.
    • neighbourhoodFunction

      public final DoubleArrayList neighbourhoodFunction
      The neighbourhood function, if requested.
    • sumOfDistances

      public final float[][] sumOfDistances
      The sum of the distances from every given node, if requested.
    • sumOfInverseDistances

      public final float[][] sumOfInverseDistances
      The sum of inverse distances from each given node, if requested.
    • discountFunction

      public final Int2DoubleFunction[] discountFunction
      A number of discounted centralities to be computed, possibly none.
    • discountedCentrality

      public final float[][][] discountedCentrality
      The overall discounted centrality, for every discountFunction.
    • numNodes

      protected final long numNodes
      The number of nodes of the graph, cached.
    • numArcs

      protected long numArcs
      The number of arcs of the graph, cached.
    • squareNumNodes

      protected final double squareNumNodes
      The square of numNodes, cached.
    • numberOfThreads

      protected final int numberOfThreads
      The number of cores used in the computation.
    • bufferSize

      protected final int bufferSize
      The size of an I/O buffer, in counters.
    • granularity

      protected final long granularity
      The number of actually scanned nodes per task in a multithreaded environment. Must be a multiple of Long.SIZE.
    • adaptiveGranularity

      protected long adaptiveGranularity
      The number of nodes per task (obtained by adapting granularity to the current ratio of modified nodes). Must be a multiple of Long.SIZE.
    • last

      protected double last
      The value computed by the last iteration.
    • current

      protected double current
      The value computed by the current iteration.
    • iteration

      protected int iteration
      The current iteration.
    • updateFile

      protected final File updateFile
      If external is true, the name of the temporary file that will be used to write the update list.
    • fileChannel

      protected final FileChannel fileChannel
      If external is true, a file channel used to write to the update list.
    • randomAccessFile

      protected RandomAccessFile randomAccessFile
      If external is true, the random-access file underlying fileChannel.
    • cumulativeOutdegrees

      protected final EliasFanoCumulativeOutdegreeList cumulativeOutdegrees
      The cumulative list of outdegrees.
    • pl

      protected final ProgressLogger pl
      A progress logger, or null.
    • lock

      protected final ReentrantLock lock
      The lock protecting all critical sections.
    • allWaiting

      protected final Condition allWaiting
      A condition that is notified when all iteration threads are waiting to be started.
    • start

      protected final Condition start
      The condition on which all iteration threads wait before starting a new phase.
    • phase

      public int phase
      The current computation phase.
    • closed

      protected boolean closed
      Whether this approximator has been already closed.
    • thread

      protected final it.unimi.dsi.big.webgraph.algo.HyperBall.IterationThread[] thread
      The threads performing the computation.
    • nodes

      protected final AtomicLong nodes
      An atomic integer keeping track of the number of node processed so far.
    • arcs

      protected final AtomicLong arcs
      An atomic integer keeping track of the number of arcs processed so far.
    • aliveThreads

      protected volatile int aliveThreads
      A variable used to wait for all threads to complete their iteration.
    • completed

      protected volatile boolean completed
      True if the computation is over.
    • numberOfWrites

      protected volatile long numberOfWrites
      Total number of write operation performed on fileChannel.
    • totalIoMillis

      protected volatile long totalIoMillis
      Total wait time in milliseconds of I/O activity on fileChannel.
    • nextNode

      protected long nextNode
      The starting node of the next chunk of nodes to be processed.
    • nextArcs

      protected long nextArcs
      The number of arcs before nextNode.
    • modified

      protected final AtomicLong modified
      The number of register modified by the last call to iterate().
    • unwritten

      protected final AtomicLong unwritten
      Counts the number of unwritten entries when external is true, or the number of counters that did not change their value.
    • relativeIncrement

      protected double relativeIncrement
      The relative increment of the neighbourhood function for the last iteration.
    • external

      protected boolean external
      Whether we should used an update list on disk, instead of computing results in core memory.
    • resultBits

      protected final long[][] resultBits
      If external is false, the arrays where results are stored.
    • resultRegisters

      protected final LongBigList[] resultRegisters
    • modifiedCounter

      protected boolean[][] modifiedCounter
      For each counter, whether it has changed its value. We use an array of boolean (instead of a LongArrayBitVector) just for access speed.
    • modifiedResultCounter

      protected boolean[][] modifiedResultCounter
      For each newly computed counter, whether it has changed its value. modifiedCounter will be updated with the content of this bit vector by the end of the iteration.
    • nextMustBeChecked

      protected boolean[][] nextMustBeChecked
      For each counter, whether it has changed its value. We use an array of boolean (instead of a LongArrayBitVector) just for access speed.
    • mustBeChecked

      protected boolean[][] mustBeChecked
      For each newly computed counter, whether it has changed its value. modifiedCounter will be updated with the content of this bit vector by the end of the iteration.
    • localCheckList

      protected long[] localCheckList
      If local is true, the sorted list of nodes that should be scanned.
    • localNextMustBeChecked

      protected final LongSet localNextMustBeChecked
      If preLocal is true, the list of nodes that should be scanned on the next iteration. Note that this set is synchronized.
    • threadThrowable

      protected volatile Throwable threadThrowable
      One of the throwables thrown by some of the threads, if at least one thread has thrown a throwable.
  • Constructor Details

    • HyperBall

      public HyperBall(ImmutableGraph g, ImmutableGraph gt, int log2m, ProgressLogger pl, int numberOfThreads, int bufferSize, int granularity, boolean external) throws IOException
      Creates a new HyperBall instance.
      Parameters:
      g - the graph whose neighbourhood function you want to compute.
      gt - the transpose of g in case you want to perform systolic computations, or null.
      log2m - the logarithm of the number of registers per counter.
      pl - a progress logger, or null.
      numberOfThreads - the number of threads to be used (0 for automatic sizing).
      bufferSize - the size of an I/O buffer in bytes (0 for DEFAULT_BUFFER_SIZE).
      granularity - the number of node per task in a multicore environment (it will be rounded to the next multiple of 64), or 0 for DEFAULT_GRANULARITY.
      external - if true, results of an iteration will be stored on disk.
      Throws:
      IOException
    • HyperBall

      public HyperBall(ImmutableGraph g, ImmutableGraph gt, int log2m) throws IOException
      Creates a new HyperBall instance using default values.
      Parameters:
      g - the graph whose neighbourhood function you want to compute.
      gt - the transpose of g in case you want to perform systolic computations, or null.
      log2m - the logarithm of the number of registers per counter.
      Throws:
      IOException
    • HyperBall

      public HyperBall(ImmutableGraph g, ImmutableGraph gt, int log2m, ProgressLogger pl) throws IOException
      Creates a new HyperBall instance using default values.
      Parameters:
      g - the graph whose neighbourhood function you want to compute.
      gt - the transpose of g in case you want to perform systolic computations, or null.
      log2m - the logarithm of the number of registers per counter.
      pl - a progress logger, or null.
      Throws:
      IOException
    • HyperBall

      public HyperBall(ImmutableGraph g, int log2m) throws IOException
      Creates a new HyperBall instance using default values and disabling systolic computation.
      Parameters:
      g - the graph whose neighbourhood function you want to compute.
      log2m - the logarithm of the number of registers per counter.
      Throws:
      IOException
    • HyperBall

      public HyperBall(ImmutableGraph g, int log2m, long seed) throws IOException
      Creates a new HyperBall instance using default values and disabling systolic computation.
      Parameters:
      g - the graph whose neighbourhood function you want to compute.
      log2m - the logarithm of the number of registers per counter.
      seed - the random seed passed to HyperLogLogCounterArray(long, long, int, long).
      Throws:
      IOException
    • HyperBall

      public HyperBall(ImmutableGraph g, int log2m, ProgressLogger pl) throws IOException
      Creates a new HyperBall instance using default values and disabling systolic computation.
      Parameters:
      g - the graph whose neighbourhood function you want to compute.
      log2m - the logarithm of the number of registers per counter.
      pl - a progress logger, or null.
      Throws:
      IOException
    • HyperBall

      public HyperBall(ImmutableGraph g, ImmutableGraph gt, int log2m, ProgressLogger pl, int numberOfThreads, int bufferSize, int granularity, boolean external, boolean doSumOfDistances, boolean doSumOfInverseDistances, Int2DoubleFunction[] discountFunction, long seed) throws IOException
      Creates a new HyperBall instance.
      Parameters:
      g - the graph whose neighbourhood function you want to compute.
      gt - the transpose of g, or null.
      log2m - the logarithm of the number of registers per counter.
      pl - a progress logger, or null.
      numberOfThreads - the number of threads to be used (0 for automatic sizing).
      bufferSize - the size of an I/O buffer in bytes (0 for DEFAULT_BUFFER_SIZE).
      granularity - the number of node per task in a multicore environment (it will be rounded to the next multiple of 64), or 0 for DEFAULT_GRANULARITY.
      external - if true, results of an iteration will be stored on disk.
      doSumOfDistances - whether the sum of distances from each node should be computed.
      doSumOfInverseDistances - whether the sum of inverse distances from each node should be computed.
      discountFunction - an array (possibly null) of discount functions.
      seed - the random seed passed to HyperLogLogCounterArray(long, long, int, long).
      Throws:
      IOException
  • Method Details