public class HyperBall extends HyperLogLogCounterArray implements SafelyCloseable
Computes an approximation of the neighbourhood function, of the size of the reachable sets, and of (discounted) positive geometric centralities of a graph using HyperBall.
HyperBall is an algorithm computing by dynamic programming an approximation of the sizes of the balls of growing radius around the nodes of a graph. Starting from these data, it can approximate the neighbourhood function of a graph, that is, the function returning for each t the number of pairs of nodes at distance at most t, the number of nodes reachable from each node, Bavelas's closeness centrality, Lin's index, and harmonic centrality (studied by Paolo Boldi and Sebastiano Vigna in “Axioms for Centrality”, Internet Math., 2014). HyperBall can also compute discounted centralities, in which the weight assigned to a node is some specified function of its distance. All centralities are computed in their positive version (i.e., using distance from the source: see below how to compute the more usual, and useful, negative version).
HyperBall has been described by Paolo Boldi and Sebastiano Vigna in “In-Core Computation of Geometric Centralities with HyperBall: A Hundred Billion Nodes and Beyond”, Proc. of 2013 IEEE 13th International Conference on Data Mining Workshops (ICDMW 2013), IEEE, 2013, and it is a generalization of the method described in “HyperANF: Approximating the Neighbourhood Function of Very Large Graphs on a Budget”, by Paolo Boldi, Marco Rosa and Sebastiano Vigna, Proceedings of the 20th international conference on World Wide Web, pages 625-634, ACM, 2011.
Incidentally, HyperBall (actually, HyperANF) has been used to show that Facebook has just four degrees of separation.
At step t, for each node we (approximately) keep track (using HyperLogLog counters) of the set of nodes at distance at most t. At each iteration, the sets associated with the successors of each node are merged, thus obtaining the new sets. A crucial component in making this process efficient and scalable is the use of broadword programming to implement the join (merge) phase, which requires maximising in parallel the list of registers associated with each successor (the implementation is geared towards 64-bit processors).
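The register-wise maximum at the heart of the merge phase can be illustrated with a small broadword sketch. It is not the layout used by HyperLogLogCounterArray: as a simplifying assumption, each register here occupies one byte of a long and holds a value between 0 and 127, so that the top bit of every byte is free to act as a comparison flag. The class name BroadwordMax and the helper pack are hypothetical.

```java
/** Illustrative broadword maximum over eight 7-bit registers packed one per byte (assumed layout). */
final class BroadwordMax {
    /** The top bit of every byte. */
    private static final long H = 0x8080808080808080L;

    /** Returns the byte-wise maximum of x and y; every byte must be in the range 0..127. */
    static long max(long x, long y) {
        // Setting the top bit of each byte of x prevents borrows between bytes;
        // after the subtraction the top bit survives exactly where x's byte >= y's byte.
        long greaterOrEqual = ((x | H) - y) & H;
        // Expand each surviving flag into a full-byte mask (0xFF or 0x00).
        long mask = (greaterOrEqual >>> 7) * 0xFFL;
        // Take x where it wins, y elsewhere.
        return (x & mask) | (y & ~mask);
    }

    /** Packs up to eight values in 0..127 into a long, value i going into byte i. */
    static long pack(int... values) {
        if (values.length > 8) throw new IllegalArgumentException("at most eight registers");
        long result = 0;
        for (int i = 0; i < values.length; i++) {
            if (values[i] < 0 || values[i] > 127) throw new IllegalArgumentException("register out of range");
            result |= ((long) values[i]) << (8 * i);
        }
        return result;
    }
}
```

All eight registers are compared and selected with a handful of word-level operations and no branches, which is the point of the broadword approach.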
Using the approximate sets, for each t we estimate the number of pairs of nodes (x,y) such that the distance from x to y is at most t. Since during the computation we are also in possession of the number of nodes at distance t − 1, we can also perform computations using the number of nodes at distance exactly t (e.g., centralities).
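As a minimal sketch of how the number of nodes at distance exactly t feeds the centralities, the following code takes the sequence of ball sizes of a single node and accumulates a discounted sum. The class BallSizeCentralities and its method names are hypothetical, and the ball sizes are passed in as plain numbers, whereas the class obtains them as estimates from HyperLogLog counters.

```java
import java.util.function.IntToDoubleFunction;

/** Illustrative computation of distance-based sums from the ball sizes of one node. */
final class BallSizeCentralities {
    /**
     * ballSize[t] is the number of nodes at distance at most t from the node
     * (ballSize[0] is 1, the node itself). The difference between consecutive
     * entries is the number of nodes at distance exactly t.
     */
    static double discounted(double[] ballSize, IntToDoubleFunction discount) {
        double sum = 0;
        for (int t = 1; t < ballSize.length; t++) {
            double exactlyT = ballSize[t] - ballSize[t - 1];
            sum += discount.applyAsDouble(t) * exactlyT;
        }
        return sum;
    }

    /** Sum of distances: each node at distance t contributes t. */
    static double sumOfDistances(double[] ballSize) {
        return discounted(ballSize, t -> t);
    }

    /** Sum of inverse distances: each node at distance t contributes 1/t. */
    static double sumOfInverseDistances(double[] ballSize) {
        return discounted(ballSize, t -> 1.0 / t);
    }
}
```

For a node with ball sizes 1, 2, 3 (one node at distance 1 and one at distance 2), the sum of distances is 3 and the sum of inverse distances is 1.5. Any other discount function can be plugged in the same way.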
To use this class, you must first create an instance. Then, you call init() (once) and then iterate() as many times as needed (you can repeat the init/iterate sequence several times, if you wish). A convenience method, run(), will do everything for you. Finally, you must close() the instance. The method modified() will tell you whether the internal state of the algorithm has changed.
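The calling protocol can be sketched with a toy stand-in. ToyBall below is not the library class: it is a hypothetical, self-contained class that keeps exact sets (BitSet instances) instead of HyperLogLog counters and takes the graph as an array of successor lists, but it exposes the same init, iterate, modified and close steps so that the driver loop can be shown end to end.

```java
import java.util.BitSet;

/** Toy stand-in (NOT the library class) mimicking the init/iterate/modified/close protocol. */
final class ToyBall implements AutoCloseable {
    private final int[][] succ; // succ[x] lists the successors of node x
    private BitSet[] ball;      // ball[x] is the exact set of nodes within the current radius
    private long modified;
    private boolean closed;

    ToyBall(int[][] succ) {
        this.succ = succ;
    }

    /** Resets every ball to radius zero (each node reaches only itself). */
    void init() {
        ball = new BitSet[succ.length];
        for (int x = 0; x < ball.length; x++) {
            ball[x] = new BitSet();
            ball[x].set(x);
        }
        modified = 0;
    }

    /** Grows every ball by one step, merging the balls of the successors. */
    void iterate() {
        if (closed) throw new IllegalStateException("already closed");
        BitSet[] next = new BitSet[ball.length];
        modified = 0;
        for (int x = 0; x < ball.length; x++) {
            next[x] = (BitSet) ball[x].clone();
            for (int y : succ[x]) next[x].or(ball[y]);
            if (!next[x].equals(ball[x])) modified++;
        }
        ball = next;
    }

    /** Returns the number of balls changed by the last call to iterate(). */
    long modified() {
        return modified;
    }

    @Override
    public void close() {
        closed = true;
    }

    /** Driver loop: create, init once, iterate until nothing changes, close. */
    static int iterationsToFixedPoint(int[][] succ) {
        try (ToyBall toyBall = new ToyBall(succ)) {
            toyBall.init();
            int iterations = 0;
            do {
                toyBall.iterate();
                iterations++;
            } while (toyBall.modified() != 0);
            return iterations;
        }
    }
}
```

The try-with-resources statement guarantees that close() is called even if an iteration throws, which is the reason for making the toy class AutoCloseable.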
If you additionally pass to the constructor (or on the command line) the transpose of your graph (you can compute it using Transform.transposeOffline(ImmutableGraph, int)), when three quarters of the nodes stop changing their value HyperBall will switch to a systolic computation: using the transpose, when a node changes it will signal back to its predecessors that at the next iteration they could change. At the next scan, only the successors of signalled nodes will be scanned. In particular, when a very small number of nodes is modified by an iteration, HyperBall will switch to a systolic local mode, in which all information about modified nodes is kept in (traditional) dictionaries, rather than being represented as arrays of booleans. This strategy makes the last phases of the computation orders of magnitude faster, and makes in practice the running time of HyperBall proportional to the theoretical bound O(m log n), where n is the number of nodes and m is the number of arcs of the graph. Note that graphs with a large diameter require a correspondingly large number of iterations, and these iterations will have to pass over all nodes if you do not provide the transpose.
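The saving brought by signalling predecessors can be made concrete with a small sketch. It uses exact sets instead of HyperLogLog counters and, as a simplification, applies the systolic rule from the very first iteration rather than switching once three quarters of the nodes have stopped changing. The class SystolicSketch is hypothetical; the array names only echo the fields of this class.

```java
import java.util.Arrays;
import java.util.BitSet;

/** Illustrative comparison of full scans and systolic scans on exact balls. */
final class SystolicSketch {
    /**
     * Runs the ball iteration to its fixed point and returns the total number of node scans.
     * succ[x] lists the successors of x and pred[x] its predecessors (the transpose).
     */
    static long countScans(int[][] succ, int[][] pred, boolean systolic) {
        int n = succ.length;
        BitSet[] ball = new BitSet[n];
        for (int x = 0; x < n; x++) {
            ball[x] = new BitSet(n);
            ball[x].set(x);
        }
        boolean[] mustBeChecked = new boolean[n];
        Arrays.fill(mustBeChecked, true); // the first iteration scans every node
        long scans = 0;
        boolean modified;
        do {
            modified = false;
            // Shallow copy: nodes that are not rescanned keep their old (never mutated) set.
            BitSet[] next = ball.clone();
            boolean[] nextMustBeChecked = new boolean[n];
            for (int x = 0; x < n; x++) {
                if (systolic && !mustBeChecked[x]) continue;
                scans++;
                BitSet merged = (BitSet) ball[x].clone();
                for (int y : succ[x]) merged.or(ball[y]);
                if (!merged.equals(ball[x])) {
                    modified = true;
                    next[x] = merged;
                    // A changed node signals its predecessors: they might change next time.
                    for (int p : pred[x]) nextMustBeChecked[p] = true;
                }
            }
            ball = next;
            mustBeChecked = nextMustBeChecked;
        } while (modified);
        return scans;
    }
}
```

On the three-node path 0 → 1 → 2 the full version performs 9 node scans and the systolic version 4, with identical final balls; on graphs with a long tail of slowly converging nodes the gap grows accordingly.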
Deciding when to stop iterating is a rather delicate issue. The only safe way is to iterate until modified() is zero, and systolic (local) computation makes this goal easily attainable. However, in some cases one can assume that the graph is not pathological, and stop when the relative increment of the number of pairs goes below some threshold.
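A threshold-based stopping rule might look like the sketch below. It assumes that the relative increment at step t is (N(t) − N(t−1)) / N(t−1), where N is the neighbourhood function; the exact definition used by this class may differ, and the class and method names are hypothetical.

```java
/** Illustrative stopping rule based on the relative increment of the neighbourhood function. */
final class RelativeIncrementStop {
    /**
     * Returns the first step t >= 1 at which the relative increment of nf drops below
     * threshold, or -1 if that never happens on the given values.
     * Assumption: every value of nf is positive.
     */
    static int stopIteration(double[] nf, double threshold) {
        for (int t = 1; t < nf.length; t++) {
            double relativeIncrement = (nf[t] - nf[t - 1]) / nf[t - 1];
            if (relativeIncrement < threshold) return t;
        }
        return -1;
    }
}
```

With the values 3, 5, 6, 6.01 a threshold of 0.25 stops at step 2, whereas a threshold of 0.01 waits until step 3. On a pathological graph the function may look flat for a while before growing again, which is why only the modified() criterion is safe.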
Note that usually one is interested in the negative version of a centrality measure, that is, the version that depends on the incoming arcs. HyperBall can compute only positive centralities: if you are interested (as it usually happens) in the negative version, you must pass to HyperBall the transpose of the graph (and if you want to run in systolic mode, the original graph, which is the transpose of the transpose). Note that the neighbourhood function of the transpose is identical to the neighbourhood function of the original graph, so the exchange does not alter its computation.
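The exchange of a graph with its transpose can be pictured on a toy adjacency-list representation. The sketch below is only an illustration on int arrays; the hypothetical class Transposer has nothing to do with the library methods used for real graphs.

```java
/** Illustrative transposition of a graph given as arrays of successors. */
final class Transposer {
    /** Returns pred, where pred[y] lists every x such that y is a successor of x. */
    static int[][] transpose(int[][] succ) {
        int n = succ.length;
        // First pass: count the incoming arcs of every node.
        int[] indegree = new int[n];
        for (int[] successors : succ) {
            for (int y : successors) indegree[y]++;
        }
        int[][] pred = new int[n][];
        for (int y = 0; y < n; y++) pred[y] = new int[indegree[y]];
        // Second pass: reverse every arc x -> y into y -> x.
        int[] filled = new int[n];
        for (int x = 0; x < n; x++) {
            for (int y : succ[x]) pred[y][filled[y]++] = x;
        }
        return pred;
    }
}
```

Running a positive centrality on the output of transpose gives the negative centrality of the input, and transposing twice gives back the original arcs, which is why the original graph can play the role of the transpose in systolic mode.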
HyperBall computations go against all basic assumptions of Java garbage collection. It is thus essential that you reconfigure your JVM properly. A good starting point is the following command line:
java -server -Xss256K -Xms100G -XX:PretenureSizeThreshold=512M -XX:MaxNewSize=4G \
    -XX:+UseNUMA -XX:+UseTLAB -XX:+ResizeTLAB \
    -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=99 -XX:+UseCMSInitiatingOccupancyOnly \
    -verbose:gc -Xloggc:gc.log ...
-Xss256K reduces the stack memory used by each thread.
-Xms100G sizes the heap: the more memory, the more registers per counter you can use (the amount, of course, depends on your hardware); please note that we set the starting heap size, as expansion of large heaps is very expensive.
-XX:PretenureSizeThreshold=512M forces the allocation of registers directly into the old generation.
-XX:MaxNewSize=4G leaves almost all memory for the old generation.
-XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=99 -XX:+UseCMSInitiatingOccupancyOnly set the concurrent garbage collector, and impose that no collection is performed until 99% of the old generation is filled.
-XX:+UseNUMA -XX:+UseTLAB -XX:+ResizeTLAB usually improve performance, but your mileage may vary.
Check the garbage collector logs (gc.log) to be sure that your minor and major collections are very infrequent (as they should be).
To use HyperBall effectively, you should aim at filling a large percentage of the available core memory. This requires, of course, sizing the heap properly, but also configuring some parameters.
Most of the memory goes into storing HyperLogLog registers. By tuning the number of registers per counter, you can modify the memory allocated for them. The amount of memory is logged, and you should check that the number of registers you chose almost fills up the heap memory you allocated, possibly leaving space for the graph(s) (but read below). Note that you can only choose a number of registers per counter that is a power of two, so your latitude in adjusting the memory used for registers is somewhat limited.
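A back-of-the-envelope estimate can help in choosing the number of registers before launching a long run. The sketch below assumes that register memory is roughly the number of nodes times the number of registers per counter times the register size in bits, doubled when an old and a new copy of the counters are both kept in core memory. The register size is passed as a parameter because its actual value is decided by HyperLogLogCounterArray; the class RegisterMemory, its method names and the search range 1..20 are hypothetical, and the memory figure logged by the class remains the authoritative one.

```java
/** Illustrative estimate of the memory taken by HyperLogLog registers. */
final class RegisterMemory {
    /**
     * Estimated bytes used by the registers (assumption: no per-counter overhead).
     * The arithmetic is done on longs and may overflow for extremely large inputs.
     */
    static long registerBytes(long numNodes, int log2m, int registerBits, boolean external) {
        long copies = external ? 1 : 2; // in-core computation keeps an old and a new copy
        return copies * numNodes * (1L << log2m) * registerBits / 8;
    }

    /**
     * Returns the largest log2m in 1..20 whose estimate fits in budgetBytes, or -1 if none does.
     * Only powers of two are available, so the budget is usually not filled exactly.
     */
    static int largestLog2m(long numNodes, int registerBits, long budgetBytes, boolean external) {
        int best = -1;
        for (int log2m = 1; log2m <= 20; log2m++) {
            if (registerBytes(numNodes, log2m, registerBits, external) > budgetBytes) break;
            best = log2m;
        }
        return best;
    }
}
```

For one million nodes, 5-bit registers and a budget of 1 GiB, the estimate allows 512 registers per counter (log2m = 9) in core and 1024 (log2m = 10) in external mode. Remember to leave room for the graph when setting the budget.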
If you have little memory, this class can perform external computations: instead of keeping in core memory an old and a new copy of the counters, it can dump on disk an update list containing pairs <node, counter>. At the end of an iteration, the update list is loaded and applied to the counters in memory. The process is of course slower, but the core memory used is halved.
Then, some memory is necessary to load the graph (and possibly its transpose). We suggest checking the offline option, which will map the graph into memory, rather than loading it. If you map the graph into memory, take care to leave some free memory, besides that allocated for the heap, as the operating system will need some space to buffer the memory-mapped graph(s).
If there are several available cores, the runs of iterate() will be decomposed into relatively small tasks (small blocks of nodes) and each task will be assigned to the first available core. Since all tasks are completely independent, this behaviour ensures a very high degree of parallelism. Be careful, however, because this feature requires a graph with a reasonably fast random access (e.g., in the case of a BVGraph, short reference chains), as many calls to ImmutableGraph.nodeIterator(long) will be made. The granularity of the decomposition is the number of nodes assigned to each task.
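The decomposition into blocks can be sketched as follows. Each thread repeatedly claims the next block of nodes from a shared counter until the nodes are exhausted, so a free core always picks up the next pending task. This is an illustration only: the class BlockScheduler is hypothetical, the per-block work is a placeholder, and the actual implementation coordinates its threads differently (for example, through a lock and conditions).

```java
import java.util.concurrent.atomic.AtomicLong;

/** Illustrative block-wise work distribution among threads. */
final class BlockScheduler {
    /** Rounds a granularity up to the next multiple of Long.SIZE (64). */
    static long roundToLongSize(long granularity) {
        return (granularity + Long.SIZE - 1) & -Long.SIZE;
    }

    /** Processes numNodes nodes in blocks and returns how many nodes were processed overall. */
    static long scan(long numNodes, long granularity, int numberOfThreads) {
        final long block = roundToLongSize(granularity);
        final AtomicLong nextNode = new AtomicLong();  // first node of the next unclaimed block
        final AtomicLong processed = new AtomicLong();
        Thread[] thread = new Thread[numberOfThreads];
        for (int i = 0; i < thread.length; i++) {
            thread[i] = new Thread(() -> {
                for (;;) {
                    long start = nextNode.getAndAdd(block); // claim a block atomically
                    if (start >= numNodes) return;          // nothing left to do
                    long end = Math.min(numNodes, start + block);
                    // Placeholder for the real work on nodes start..end-1.
                    processed.addAndGet(end - start);
                }
            });
            thread[i].start();
        }
        for (Thread t : thread) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interruption and stop waiting
                break;
            }
        }
        return processed.get();
    }
}
```

A small granularity balances the load better but causes more claims and more random accesses to the graph, whereas a large one does the opposite; this is the trade-off to explore when tuning.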
In any case, when attacking very large graphs (in particular, in external mode) some system tuning (e.g., increasing the filesystem commit time) is a good idea. Also experimenting with granularity and buffer sizes can be useful. Smaller buffers reduce the waits on I/O calls, but increase the time spent in disk seeks. Large buffers improve I/O, but they use a lot of memory. The best possible setup is the one in which the cores are 100% busy during the graph scan, and the I/O time logged at the end of a scan is roughly equal to the time that is necessary to reload the counters from disk: in such a case, essentially, you are computing as fast as possible.
Modifier and Type  Field and Description 

protected long 
adaptiveGranularity
The number of nodes per task (obtained by adapting
granularity to the current ratio of modified nodes). 
protected int 
aliveThreads
A variable used to wait for all threads to complete their iteration.

protected java.util.concurrent.locks.Condition 
allWaiting
A condition that is notified when all iteration threads are waiting to be started.

protected java.util.concurrent.atomic.AtomicLong 
arcs
An atomic integer keeping track of the number of arcs processed so far.

static boolean 
ASSERTS 
protected int 
bufferSize
The size of an I/O buffer, in counters.

protected boolean 
closed
Whether this approximator has been already closed.

protected boolean 
completed
True if the computation is over.

protected EliasFanoCumulativeOutdegreeList 
cumulativeOutdegrees
The cumulative list of outdegrees.

protected double 
current
The value computed by the current iteration.

static int 
DEFAULT_BUFFER_SIZE
The default size of a buffer in bytes.

static int 
DEFAULT_GRANULARITY
The default granularity of a task.

float[][][] 
discountedCentrality
The overall discounted centrality, for every
discountFunction . 
Int2DoubleFunction[] 
discountFunction
A number of discounted centralities to be computed, possibly none.

protected boolean 
doSumOfDistances
Whether the sum of distances from each node (inverse of positive closeness centrality) should be computed; if false,
sumOfDistances is null . 
protected boolean 
doSumOfInverseDistances
Whether the sum of inverse distances from each node (positive harmonic centrality) should be computed; if false,
sumOfInverseDistances is null . 
protected boolean 
external
Whether we should use an update list on disk, instead of computing results in core memory.

protected java.nio.channels.FileChannel 
fileChannel
If
external is true, a file channel used to write to the update list. 
protected boolean 
gotTranspose
True if we have the transpose graph.

protected long 
granularity
The number of actually scanned nodes per task in a multithreaded environment.

protected int 
iteration
The current iteration.

protected double 
last
The value computed by the last iteration.

protected boolean 
local
True if we started a local computation.

protected long[] 
localCheckList
If
local is true, the sorted list of nodes that should be scanned. 
protected LongSet 
localNextMustBeChecked
If
preLocal is true, the list of nodes that should be scanned on the next iteration. 
protected java.util.concurrent.locks.ReentrantLock 
lock
The lock protecting all critical sections.

protected java.util.concurrent.atomic.AtomicLong 
modified
The number of registers modified by the last call to iterate().
protected boolean[][] 
modifiedCounter
For each counter, whether it has changed its value.

protected boolean[][] 
modifiedResultCounter
For each newly computed counter, whether it has changed its value.

protected boolean[][] 
mustBeChecked
For each newly computed counter, whether it has changed its value.

DoubleArrayList 
neighbourhoodFunction
The neighbourhood function, if requested.

protected long 
nextArcs
The number of arcs before
nextNode . 
protected boolean[][] 
nextMustBeChecked
For each counter, whether it has changed its value.

protected long 
nextNode
The starting node of the next chunk of nodes to be processed.

protected java.util.concurrent.atomic.AtomicLong 
nodes
An atomic integer keeping track of the number of nodes processed so far.

protected long 
numArcs
The number of arcs of the graph, cached.

protected int 
numberOfThreads
The number of cores used in the computation.

protected long 
numberOfWrites
Total number of write operations performed on fileChannel.
protected long 
numNodes
The number of nodes of the graph, cached.

int 
phase
The current computation phase.

protected ProgressLogger 
pl
A progress logger, or
null . 
protected boolean 
preLocal
True if we are preparing a local computation (we are systolic and fewer than 1% of the nodes were modified).
protected java.io.RandomAccessFile 
randomAccessFile
If external is true, the random-access file underlying fileChannel.
protected double 
relativeIncrement
The relative increment of the neighbourhood function for the last iteration.

protected long[][] 
resultBits
If
external is false, the arrays where results are stored. 
protected LongBigList[] 
resultRegisters

protected double 
squareNumNodes
The square of
numNodes , cached. 
protected java.util.concurrent.locks.Condition 
start
The condition on which all iteration threads wait before starting a new phase.

float[][] 
sumOfDistances
The sum of the distances from every given node, if requested.

float[][] 
sumOfInverseDistances
The sum of inverse distances from each given node, if requested.

protected boolean 
systolic
True if we started a systolic computation.

protected it.unimi.dsi.big.webgraph.algo.HyperBall.IterationThread[] 
thread
The threads performing the computation.

protected java.lang.Throwable 
threadThrowable
One of the throwables thrown by some of the threads, if at least one thread has thrown a throwable.

protected long 
totalIoMillis
Total wait time in milliseconds of I/O activity on
fileChannel . 
protected java.util.concurrent.atomic.AtomicLong 
unwritten
Counts the number of unwritten entries when
external is true, or
the number of counters that did not change their value. 
protected java.io.File 
updateFile
If
external is true, the name of the temporary file that will be used to write the update list. 
Fields inherited from class HyperLogLogCounterArray: bits, CHUNK_MASK, CHUNK_SHIFT, CHUNK_SIZE, counterLongwords, counterResidualMask, counterShift, counterSize, log2m, longwordAligned, lsbMask, m, mMinus1, msbMask, registers, registerSize, seed
Constructor and Description 

HyperBall(ImmutableGraph g,
ImmutableGraph gt,
int log2m)
Creates a new HyperBall instance using default values.

HyperBall(ImmutableGraph g,
ImmutableGraph gt,
int log2m,
ProgressLogger pl)
Creates a new HyperBall instance using default values.

HyperBall(ImmutableGraph g,
ImmutableGraph gt,
int log2m,
ProgressLogger pl,
int numberOfThreads,
int bufferSize,
int granularity,
boolean external)
Creates a new HyperBall instance.

HyperBall(ImmutableGraph g,
ImmutableGraph gt,
int log2m,
ProgressLogger pl,
int numberOfThreads,
int bufferSize,
int granularity,
boolean external,
boolean doSumOfDistances,
boolean doSumOfInverseDistances,
Int2DoubleFunction[] discountFunction,
long seed)
Creates a new HyperBall instance.

HyperBall(ImmutableGraph g,
int log2m)
Creates a new HyperBall instance using default values and disabling systolic computation.

HyperBall(ImmutableGraph g,
int log2m,
long seed)
Creates a new HyperBall instance using default values and disabling systolic computation.

HyperBall(ImmutableGraph g,
int log2m,
ProgressLogger pl)
Creates a new HyperBall instance using default values and disabling systolic computation.

Modifier and Type  Method and Description 

void 
close() 
protected static int 
ensureRegisters(int log2m) 
protected void 
finalize() 
void 
init()
Initialises the approximator.

void 
init(long seed)
Initialises the approximator, providing a new seed to the underlying
HyperLogLogCounterArray . 
void 
iterate()
Performs a new iteration of HyperBall.

static void 
main(java.lang.String[] arg) 
long 
modified()
Returns the number of HyperLogLog counters that were modified by the last call to
iterate() . 
void 
run()
Runs HyperBall.

void 
run(long upperBound)
Runs HyperBall.

void 
run(long upperBound,
double threshold)
Runs HyperBall.

void 
run(long upperBound,
double threshold,
long seed)
Runs HyperBall.

Methods inherited from class HyperLogLogCounterArray: add, chunk, clear, clear, count, count, getCounter, getCounter, log2NumberOfRegisters, max, max, offset, registers, registerSize, relativeStandardDeviation, setCounter, setCounter, transfer
public static final boolean ASSERTS
public static final int DEFAULT_GRANULARITY
public static final int DEFAULT_BUFFER_SIZE
protected final boolean gotTranspose
protected boolean systolic
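protected boolean preLocal
True if we are preparing a local computation (we are systolic and fewer than 1% of the nodes were modified).
protected boolean local
protected final boolean doSumOfDistances
Whether the sum of distances from each node (inverse of positive closeness centrality) should be computed; if false, sumOfDistances is null.
protected boolean doSumOfInverseDistances
Whether the sum of inverse distances from each node (positive harmonic centrality) should be computed; if false, sumOfInverseDistances is null.
public final DoubleArrayList neighbourhoodFunction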
public final float[][] sumOfDistances
public final float[][] sumOfInverseDistances
public final Int2DoubleFunction[] discountFunction
public final float[][][] discountedCentrality
The overall discounted centrality, for every discountFunction.
protected final long numNodes
protected long numArcs
protected final double squareNumNodes
The square of numNodes, cached.
protected final int numberOfThreads
protected final int bufferSize
protected final long granularity
The number of actually scanned nodes per task in a multithreaded environment. Must be a multiple of Long.SIZE.
protected long adaptiveGranularity
The number of nodes per task (obtained by adapting granularity to the current ratio of modified nodes). Must be a multiple of Long.SIZE.
protected double last
protected double current
protected int iteration
protected final java.io.File updateFile
If external is true, the name of the temporary file that will be used to write the update list.
protected final java.nio.channels.FileChannel fileChannel
If external is true, a file channel used to write to the update list.
protected java.io.RandomAccessFile randomAccessFile
If external is true, the random-access file underlying fileChannel.
protected final EliasFanoCumulativeOutdegreeList cumulativeOutdegrees
protected final ProgressLogger pl
A progress logger, or null.
protected final java.util.concurrent.locks.ReentrantLock lock
protected final java.util.concurrent.locks.Condition allWaiting
protected final java.util.concurrent.locks.Condition start
public int phase
protected boolean closed
protected final it.unimi.dsi.big.webgraph.algo.HyperBall.IterationThread[] thread
protected final java.util.concurrent.atomic.AtomicLong nodes
protected final java.util.concurrent.atomic.AtomicLong arcs
protected volatile int aliveThreads
protected volatile boolean completed
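protected volatile long numberOfWrites
Total number of write operations performed on fileChannel.
protected volatile long totalIoMillis
Total wait time in milliseconds of I/O activity on fileChannel.
protected long nextNode
protected long nextArcs
The number of arcs before nextNode.
protected final java.util.concurrent.atomic.AtomicLong modified
The number of registers modified by the last call to iterate().
protected final java.util.concurrent.atomic.AtomicLong unwritten
Counts the number of unwritten entries when external is true, or the number of counters that did not change their value.
protected double relativeIncrement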
protected boolean external
protected final long[][] resultBits
If external is false, the arrays where results are stored.
protected final LongBigList[] resultRegisters
protected boolean[][] modifiedCounter
For each counter, whether it has changed its value. An array of booleans is used (instead of a LongArrayBitVector) just for access speed.
protected boolean[][] modifiedResultCounter
For each newly computed counter, whether it has changed its value. modifiedCounter will be updated with the content of this bit vector by the end of the iteration.
protected boolean[][] nextMustBeChecked
For each counter, whether it has changed its value. An array of booleans is used (instead of a LongArrayBitVector) just for access speed.
protected boolean[][] mustBeChecked
For each newly computed counter, whether it has changed its value. modifiedCounter will be updated with the content of this bit vector by the end of the iteration.
protected long[] localCheckList
If local is true, the sorted list of nodes that should be scanned.
protected final LongSet localNextMustBeChecked
If preLocal is true, the list of nodes that should be scanned on the next iteration. Note that this set is synchronized.
protected volatile java.lang.Throwable threadThrowable
public HyperBall(ImmutableGraph g, ImmutableGraph gt, int log2m, ProgressLogger pl, int numberOfThreads, int bufferSize, int granularity, boolean external) throws java.io.IOException
Parameters:
g - the graph whose neighbourhood function you want to compute.
gt - the transpose of g in case you want to perform systolic computations, or null.
log2m - the logarithm of the number of registers per counter.
pl - a progress logger, or null.
numberOfThreads - the number of threads to be used (0 for automatic sizing).
bufferSize - the size of an I/O buffer in bytes (0 for DEFAULT_BUFFER_SIZE).
granularity - the number of nodes per task in a multicore environment (it will be rounded to the next multiple of 64), or 0 for DEFAULT_GRANULARITY.
external - if true, results of an iteration will be stored on disk.
Throws:
java.io.IOException
public HyperBall(ImmutableGraph g, ImmutableGraph gt, int log2m) throws java.io.IOException
Parameters:
g - the graph whose neighbourhood function you want to compute.
gt - the transpose of g in case you want to perform systolic computations, or null.
log2m - the logarithm of the number of registers per counter.
Throws:
java.io.IOException
public HyperBall(ImmutableGraph g, ImmutableGraph gt, int log2m, ProgressLogger pl) throws java.io.IOException
Parameters:
g - the graph whose neighbourhood function you want to compute.
gt - the transpose of g in case you want to perform systolic computations, or null.
log2m - the logarithm of the number of registers per counter.
pl - a progress logger, or null.
Throws:
java.io.IOException
public HyperBall(ImmutableGraph g, int log2m) throws java.io.IOException
Parameters:
g - the graph whose neighbourhood function you want to compute.
log2m - the logarithm of the number of registers per counter.
Throws:
java.io.IOException
public HyperBall(ImmutableGraph g, int log2m, long seed) throws java.io.IOException
Parameters:
g - the graph whose neighbourhood function you want to compute.
log2m - the logarithm of the number of registers per counter.
seed - the random seed passed to HyperLogLogCounterArray.HyperLogLogCounterArray(long, long, int, long).
Throws:
java.io.IOException
public HyperBall(ImmutableGraph g, int log2m, ProgressLogger pl) throws java.io.IOException
Parameters:
g - the graph whose neighbourhood function you want to compute.
log2m - the logarithm of the number of registers per counter.
pl - a progress logger, or null.
Throws:
java.io.IOException
public HyperBall(ImmutableGraph g, ImmutableGraph gt, int log2m, ProgressLogger pl, int numberOfThreads, int bufferSize, int granularity, boolean external, boolean doSumOfDistances, boolean doSumOfInverseDistances, Int2DoubleFunction[] discountFunction, long seed) throws java.io.IOException
Parameters:
g - the graph whose neighbourhood function you want to compute.
gt - the transpose of g, or null.
log2m - the logarithm of the number of registers per counter.
pl - a progress logger, or null.
numberOfThreads - the number of threads to be used (0 for automatic sizing).
bufferSize - the size of an I/O buffer in bytes (0 for DEFAULT_BUFFER_SIZE).
granularity - the number of nodes per task in a multicore environment (it will be rounded to the next multiple of 64), or 0 for DEFAULT_GRANULARITY.
external - if true, results of an iteration will be stored on disk.
doSumOfDistances - whether the sum of distances from each node should be computed.
doSumOfInverseDistances - whether the sum of inverse distances from each node should be computed.
discountFunction - an array (possibly null) of discount functions.
seed - the random seed passed to HyperLogLogCounterArray.HyperLogLogCounterArray(long, long, int, long).
Throws:
java.io.IOException
protected static final int ensureRegisters(int log2m)
public void init()
Initialises the approximator. This method must be called before a series of iterations. Note that it will not change the seed used by the underlying HyperLogLogCounterArray.
See also: init(long)
public void init(long seed)
Initialises the approximator, providing a new seed to the underlying HyperLogLogCounterArray. This method must be called before a series of iterations.
Parameters:
seed - passed to HyperLogLogCounterArray.clear(long).
public void close() throws java.io.IOException
Specified by:
close in interface java.io.Closeable
close in interface java.lang.AutoCloseable
Throws:
java.io.IOException
protected void finalize() throws java.lang.Throwable
Overrides:
finalize in class java.lang.Object
Throws:
java.lang.Throwable
public void iterate() throws java.io.IOException
Throws:
java.io.IOException
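public long modified()
Returns the number of HyperLogLog counters that were modified by the last call to iterate().
public void run() throws java.io.IOException
Runs HyperBall. The computation will stop when modified() returns zero.
Throws:
java.io.IOException
public void run(long upperBound) throws java.io.IOException
Parameters:
upperBound - an upper bound to the number of iterations.
Throws:
java.io.IOException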
public void run(long upperBound, double threshold) throws java.io.IOException
Parameters:
upperBound - an upper bound to the number of iterations.
threshold - a value that will be used to stop the computation by relative increment if the neighbourhood function is being computed; if you specify -1, the computation will stop when modified() returns zero.
Throws:
java.io.IOException
public void run(long upperBound, double threshold, long seed) throws java.io.IOException
Parameters:
upperBound - an upper bound to the number of iterations.
threshold - a value that will be used to stop the computation by relative increment if the neighbourhood function is being computed; if you specify -1, the computation will stop when modified() returns zero.
seed - the random seed passed to HyperLogLogCounterArray.HyperLogLogCounterArray(long, long, int, long).
Throws:
java.io.IOException
public static void main(java.lang.String[] arg) throws java.io.IOException, JSAPException, java.lang.IllegalArgumentException, java.lang.ClassNotFoundException, java.lang.IllegalAccessException, java.lang.reflect.InvocationTargetException, java.lang.InstantiationException, java.lang.NoSuchMethodException
Throws:
java.io.IOException
JSAPException
java.lang.IllegalArgumentException
java.lang.ClassNotFoundException
java.lang.IllegalAccessException
java.lang.reflect.InvocationTargetException
java.lang.InstantiationException
java.lang.NoSuchMethodException