diff --git a/main.py b/main.py
new file mode 100644
index 0000000..af89fd5
--- /dev/null
+++ b/main.py
@@ -0,0 +1,97 @@
+"""
+Reference implementation of node2vec for text data.
+
+Author: Hanseok Jo
+
+For more details, refer to the paper:
+node2vec: Scalable Feature Learning for Networks
+Aditya Grover and Jure Leskovec
+Knowledge Discovery and Data Mining (KDD), 2016
+
+and the GitHub page of the original node2vec:
+https://github.com/aditya-grover/node2vec
+"""
+
+import argparse
+import node2vec
+
+
+def parse_args():
+    """
+    Parses the node2vec arguments.
+    """
+    parser = argparse.ArgumentParser(description="Run node2vec.")
+
+    parser.add_argument('--input', nargs='?', default='graph/karate.edgelist',
+                        help='Input graph path')
+
+    parser.add_argument('--output', nargs='?', default='emb/karate.emb',
+                        help='Embeddings path')
+
+    parser.add_argument('--model_type', type=str, default='w2v',
+                        help="Embedding model: 'w2v' (word2vec) or 'ft' (fastText). Default is 'w2v'.")
+
+    parser.add_argument('--dimensions', type=int, default=128,
+                        help='Number of dimensions. Default is 128.')
+
+    parser.add_argument('--walk-length', type=int, default=40,
+                        help='Length of walk per source. Default is 40.')
+
+    parser.add_argument('--num-walks', type=int, default=10,
+                        help='Number of walks per source. Default is 10.')
+
+    parser.add_argument('--window-size', type=int, default=2,
+                        help='Context size for optimization. Default is 2.')
+
+    parser.add_argument('--iter', default=10, type=int,
+                        help='Number of epochs in SGD. Default is 10.')
+
+    parser.add_argument('--workers', type=int, default=8,
+                        help='Number of parallel workers. Default is 8.')
+
+    parser.add_argument('--p', type=float, default=1,
+                        help='Return hyperparameter. Default is 1.')
+
+    parser.add_argument('--q', type=float, default=1,
+                        help='Inout hyperparameter. Default is 1.')
+
+    parser.add_argument('--weighted', dest='weighted', action='store_true',
+                        help='Boolean specifying (un)weighted. Default is unweighted.')
+    parser.add_argument('--unweighted', dest='weighted', action='store_false')
+    parser.set_defaults(weighted=False)
+
+    parser.add_argument('--directed', dest='directed', action='store_true',
+                        help='Graph is (un)directed. Default is undirected.')
+    parser.add_argument('--undirected', dest='directed', action='store_false')
+    parser.set_defaults(directed=False)
+
+    return parser.parse_args()
+
+
+def main(args):
+    """
+    Pipeline for representation learning for all nodes in a graph.
+    """
+    input_path = args.input
+    output_path = args.output
+    model_type = args.model_type
+    dimensions = args.dimensions
+    walk_length = args.walk_length
+    num_walks = args.num_walks
+    window_size = args.window_size
+    itr = args.iter
+    workers = args.workers
+    p = args.p
+    q = args.q
+    is_weighted = args.weighted
+    is_directed = args.directed
+
+    graph = node2vec.Node2Vec(dimensions=dimensions, walk_length=walk_length, num_walks=num_walks,
+                              window_size=window_size, itr=itr, workers=workers, p=p, q=q,
+                              is_weighted=is_weighted, is_directed=is_directed)
+    graph.train(inputs=input_path, model_type=model_type)
+    graph.save(output_path=output_path)
+
+if __name__ == "__main__":
+    args = parse_args()
+    main(args)
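For reviewers who want to exercise the pipeline without the CLI, the sketch below shows roughly what `main(parse_args())` does with the defaults above. It is illustrative only: it assumes the `node2vec.py` module added below is importable and that the default `graph/karate.edgelist` input and the `emb/` output directory exist.

```python
import node2vec

# Mirrors main() with the argparse defaults; the paths are the defaults, not requirements.
n2v = node2vec.Node2Vec(dimensions=128, walk_length=40, num_walks=10, window_size=2,
                        itr=10, workers=8, p=1, q=1, is_weighted=False, is_directed=False)
n2v.train(inputs='graph/karate.edgelist', model_type='w2v')  # edge list lines: "node1 node2 [weight]"
n2v.save(output_path='emb/karate.emb')                       # stored via gensim's model.save()
```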
+ """ + input_path = args.input + output_path = args.output + model_type = args.model_type + dimensions = args.dimensions + walk_length = args.walk_length + num_walks = args.num_walks + window_size = args.window_size + itr = args.iter + workers = args.workers + p = args.p + q = args.q + is_weighted = args.weighted + is_directed = args.directed + + graph = node2vec.Node2Vec(dimensions=dimensions, walk_length=walk_length, num_walks=num_walks, + window_size=window_size, itr=itr, workers=workers, p=p, q=q, + is_weighted=is_weighted, is_directed=is_directed) + graph.train(inputs=input_path, model_type=model_type) + graph.save(output_path=output_path) + +if __name__ == "__main__": + args = parse_args() + main(args) diff --git a/node2vec.py b/node2vec.py new file mode 100644 index 0000000..172138f --- /dev/null +++ b/node2vec.py @@ -0,0 +1,223 @@ +import os +import random +import networkx as nx +from datetime import datetime + +import numpy as np +from gensim.models.word2vec import Word2Vec +from gensim.models.fasttext import FastText + + +class Node2Vec: + def __init__(self, dimensions=128, walk_length=40, num_walks=10, window_size=2, min_count=0, sg=1, itr=1, workers=8, + p=1, q=1, is_weighted=False, is_directed=False): + self.dimensions = dimensions + self.walk_length = walk_length + self.num_walks = num_walks + self.window_size = window_size + self.min_count = min_count + self.sg = sg + self.iter = itr + self.workers = workers + self.p = p + self.q = q + self.is_weighted = is_weighted + self.is_directed = is_directed + + self.graph = None + self.alias_nodes = None + self.alias_edges = None + self.emb_model = None + + def _read_graph(self, edgelist): + """ + Reads the input network in networkx + """ + graph = self._parse_edgelist(edgelist) + + if not self.is_directed: + graph = graph.to_undirected() + + self.graph = graph + + def _parse_edgelist(self, edgelist): + parse = None + if isinstance(edgelist, list): + parse = nx.parse_edgelist + elif isinstance(edgelist, str): + if os.path.isfile(edgelist): + parse = nx.read_edgelist + else: + raise FileNotFoundError('Edgelist file is not found.') + + if self.is_weighted: + graph = parse(edgelist, nodetype=str, data=(('weight', float),), create_using=nx.DiGraph()) + else: + graph = parse(edgelist, nodetype=str, create_using=nx.DiGraph()) + for edge in graph.edges(): + graph[edge[0]][edge[1]]['weight'] = 1 + return graph + + def _node2vec_walk(self, start_node): + """ + Simulate a random walk starting from start node. + """ + graph = self.graph + alias_nodes = self.alias_nodes + alias_edges = self.alias_edges + + walk = [start_node] + + while len(walk) < self.walk_length: + cur = walk[-1] + cur_nbrs = sorted(graph.neighbors(cur)) + if len(cur_nbrs) > 0: + if len(walk) == 1: + walk.append(cur_nbrs[alias_draw(alias_nodes[cur][0], alias_nodes[cur][1])]) + else: + prev = walk[-2] + next = cur_nbrs[alias_draw(alias_edges[(prev, cur)][0], alias_edges[(prev, cur)][1])] + walk.append(next) + else: + break + + return [str(i) for i in walk] + + def _simulate_walks(self): + """ + Repeatedly simulate random walks from each node. + """ + graph = self.graph + walks = [] + nodes = list(graph.nodes()) + print('Walk iteration:') + for walk_iter in range(self.num_walks): + print(datetime.now(), str(walk_iter+1), '/', str(self.num_walks)) + random.shuffle(nodes) + for node in nodes: + walks.append(self._node2vec_walk(start_node=node)) + + return walks + + def _get_alias_edge(self, src, dst): + """ + Get the alias edge setup lists for a given edge. 
+ """ + graph = self.graph + p = self.p + q = self.q + + unnormalized_probs = [] + for dst_nbr in sorted(graph.neighbors(dst)): + if dst_nbr == src: + unnormalized_probs.append(graph[dst][dst_nbr]['weight']/p) + elif graph.has_edge(dst_nbr, src): + unnormalized_probs.append(graph[dst][dst_nbr]['weight']) + else: + unnormalized_probs.append(graph[dst][dst_nbr]['weight']/q) + norm_const = sum(unnormalized_probs) + normalized_probs = [float(u_prob)/norm_const for u_prob in unnormalized_probs] + + return alias_setup(normalized_probs) + + def _preprocess_transition_probs(self): + """ + Preprocessing of transition probabilities for guiding the random walks. + """ + graph = self.graph + is_directed = self.is_directed + + alias_nodes = {} + for node in graph.nodes(): + unnormalized_probs = [graph[node][nbr]['weight'] for nbr in sorted(graph.neighbors(node))] + norm_const = sum(unnormalized_probs) + normalized_probs = [float(u_prob)/norm_const for u_prob in unnormalized_probs] + alias_nodes[node] = alias_setup(normalized_probs) + + alias_edges = {} + + if is_directed: + for edge in graph.edges(): + alias_edges[edge] = self._get_alias_edge(edge[0], edge[1]) + else: + for edge in graph.edges(): + alias_edges[edge] = self._get_alias_edge(edge[0], edge[1]) + alias_edges[(edge[1], edge[0])] = self._get_alias_edge(edge[1], edge[0]) + + self.alias_nodes = alias_nodes + self.alias_edges = alias_edges + + def _learn_embeddings(self, walks, model_type): + if model_type == 'w2v': + model = Word2Vec(walks, size=self.dimensions, window=self.window_size, min_count=self.min_count, + sg=self.sg, workers=self.workers, iter=self.iter) + elif model_type == 'ft': + model = FastText(walks, size=self.dimensions, window=self.window_size, min_count=self.min_count, + sg=self.sg, workers=self.workers, iter=self.iter) + else: + raise TypeError('Model must be \'w2v\' or \'ft\'.') + return model + + def train(self, inputs, model_type, max_walks_length): + print(datetime.now(), 'Start reading graph') + self._read_graph(edgelist=inputs) + + print(datetime.now(), 'Preprocess transition probs') + self._preprocess_transition_probs() + + print(datetime.now(), 'Simulate walks') + walks = self._simulate_walks() + + print(datetime.now(), 'Start embedding. # of walk: %d' % len(walks)) + random.shuffle(walks) + self.emb_model = self._learn_embeddings(walks[:max_walks_length], model_type) + print(datetime.now(), 'End embedding') + + def save(self, output_path): + self.emb_model.save(output_path) + + +def alias_setup(probs): + """ + Compute utility lists for non-uniform sampling from discrete distributions. + Refer to https://hips.seas.harvard.edu/blog/2013/03/03/the-alias-method-efficient-sampling-with-many-discrete-outcomes/ + for details + """ + k = len(probs) + q = np.zeros(k) + j = np.zeros(k, dtype=np.int) + + smaller = [] + larger = [] + for kk, prob in enumerate(probs): + q[kk] = k*prob + if q[kk] < 1.0: + smaller.append(kk) + else: + larger.append(kk) + + while len(smaller) > 0 and len(larger) > 0: + small = smaller.pop() + large = larger.pop() + + j[small] = large + q[large] = q[large] + q[small] - 1.0 + if q[large] < 1.0: + smaller.append(large) + else: + larger.append(large) + + return j, q + + +def alias_draw(j, q): + """ + Draw sample from a non-uniform discrete distribution using alias sampling. 
+ """ + k = len(j) + + kk = int(np.floor(np.random.rand()*k)) + if np.random.rand() < q[kk]: + return kk + else: + return j[kk] diff --git a/node2vec_spark/README.md b/node2vec_spark/README.md deleted file mode 100644 index 43f6684..0000000 --- a/node2vec_spark/README.md +++ /dev/null @@ -1,139 +0,0 @@ -# node2vec on spark - -This library is a implementation using scala for running on spark of *node2vec* as described in the paper: -> node2vec: Scalable Feature Learning for Networks. -> Aditya Grover and Jure Leskovec. -> Knowledge Discovery and Data Mining, 2016. -> - -The *node2vec* algorithm learns continuous representations for nodes in any (un)directed, (un)weighted graph. Please check the [project page](https://snap.stanford.edu/node2vec/) for more details. - - -### Building node2vec_spark -**In order to build node2vec_spark, use the following:** - -``` -$ git clone https://github.com/Skarface-/node2vec.git -$ mvn clean package -``` - -**and requires:**
-Maven 3.0.5 or newer
-Java 7+
-Scala 2.10 or newer. - -This will produce jar file in "node2vec_spark/target/" - -### Examples -This library has two functions: *randomwalk* and *embedding*.
-These were described in these papers [node2vec: Scalable Feature Learning for Networks](http://arxiv.org/abs/1607.00653) and [Efficient Estimation of Word Representations in Vector Space](https://arxiv.org/abs/1301.3781). - -### Random walk -Example: - - ./spark-submit --class com.navercorp.Main \ - ./node2vec_spark/target/node2vec-0.0.1-SNAPSHOT.jar \ - --cmd randomwalk --p 100.0 --q 100.0 --walkLength 40 \ - --input --output - -#### Options -Invoke a command without arguments to list available arguments and their default values: - -``` ---cmd COMMAND - Functions: randomwalk or embedding. If you want to execute all functions "randomwalk" and "embedding" sequentially input "node2vec". Default "node2vec" ---input [INPUT] - Input edgelist path. The supported input format is an edgelist: "node1_id_int node2_id_int " ---output [OUTPUT] - Random paths path. ---walkLength WALK_LENGTH - Length of walk per source. Default is 80. ---numWalks NUM_WALKS - Number of walks per source. Default is 10. ---p P - Return hyperparaemter. Default is 1.0. ---q Q - Inout hyperparameter. Default is 1.0. ---weighted Boolean - Specifying (un)weighted. Default is true. ---directed Boolean - Specifying (un)directed. Default is false. ---degree UPPER_BOUND_OF_NUMBER_OF_NEIGHBORS - Specifying upper bound of number of neighbors. Default is 30. ---indexed Boolean - Specifying whether nodes in edgelist are indexed or not. Default is true. -``` - -* If "indexed" is set to false, *node2vec_spark* index nodes in input edgelist, example:
- **unindexed edgelist:**
- node1 node2 1.0
- node2 node7 1.0
- - **indexed:**
- 1 2 1.0
- 2 3 1.0
- - 1 node1
- 2 node2
- 3 node7 - -#### Input -The supported input format is an edgelist: - - node1_id_int node2_id_int - or - node1_str node2_str , Please set the option "indexed" to false - - -#### Output -The output file (number of nodes)*numWalks random paths as follows: - - src_node_id_int node1_id_int node2_id_int ... noden_id_int - - -### Embedding random paths -Example: - - ./spark-submit --class com.navercorp.Main \ - ./node2vec_spark/target/node2vec-0.0.1-SNAPSHOT.jar \ - --cmd embedding --dim 50 --iter 20 \ - --input --nodePath --output - -#### Options -Invoke a command without arguments to list available arguments and their default values: - -``` ---cmd COMMAND - embedding. If you want to execute sequentially all functions: "randomwalk" and "embedding", input "node2vec". default "node2vec" ---input [INPUT] - Input random paths. The supported input format is an random paths: "src_node_id_int node1_id_int ... noden_id_int" ---output [OUTPUT] - word2vec model(.bin) and embeddings(.emb). ---nodePath [NODE\_PATH] - Input node2index path. The supported input format: "node1_str node1_id_int" ---iter ITERATION - Number of epochs in SGD. Default 10. ---dim DIMENSION - Number of dimensions. Default is 128. ---window WINDOW_SIZE - Context size for optimization. Default is 10. - -``` - -#### Input -The supported input format is an random paths: - - src_node_id_int node1_id_int ... noden_id_int - -#### Output -The output files are **embeddings and word2vec model.** The embeddings file has the following format: - - node1_str dim1 dim2 ... dimd - -where dim1, ... , dimd is the d-dimensional representation learned by word2vec. - -the output file *word2vec model* has the spark word2vec model format. please reference to https://spark.apache.org/docs/1.5.2/mllib-feature-extraction.html#word2vec - -## References -1. [node2vec: Scalable Feature Learning for Networks](http://arxiv.org/abs/1607.00653) -2. 
[Efficient Estimation of Word Representations in Vector Space](https://arxiv.org/abs/1301.3781) \ No newline at end of file diff --git a/node2vec_spark/pom.xml b/node2vec_spark/pom.xml deleted file mode 100644 index b958576..0000000 --- a/node2vec_spark/pom.xml +++ /dev/null @@ -1,129 +0,0 @@ - - - - 4.0.0 - - com.navercorp - node2vec - jar - 0.0.1-SNAPSHOT - - node2vec_spark - http://snap.stanford.edu/node2vec/ - - - UTF-8 - bin - 2.4.3 - 1.4.0 - 1.7 - 2.10 - - - - - - org.scala-tools - maven-scala-plugin - 2.15.2 - - - - compile - - - - - - org.apache.maven.plugins - maven-dependency-plugin - 2.4 - - - copy-dependencies - package - - copy-dependencies - - - ${project.build.directory}/lib - - - - - - org.apache.maven.plugins - maven-shade-plugin - 1.6 - - - package - - shade - - - - - - org.apache.maven.plugins - maven-compiler-plugin - 2.3.2 - - 1.7 - 1.7 - UTF-8 - - - - org.apache.maven.plugins - maven-surefire-plugin - - false - - - - - - - - org.apache.hadoop - hadoop-hdfs - 2.7.1 - - - org.scala-lang - scala-library - ${scala.binary.version}.5 - provided - - - org.apache.spark - spark-core_${scala.binary.version} - 1.6.1 - provided - - - org.apache.spark - spark-mllib_${scala.binary.version} - 1.6.1 - provided - - - com.github.scopt - scopt_${scala.binary.version} - 3.3.0 - - - org.scala-lang - scala-library - - - - - com.google.guava - guava - 19.0 - - - - diff --git a/node2vec_spark/src/main/resources/log4j2.properties b/node2vec_spark/src/main/resources/log4j2.properties deleted file mode 100644 index d941e1b..0000000 --- a/node2vec_spark/src/main/resources/log4j2.properties +++ /dev/null @@ -1,9 +0,0 @@ - -appender.out.type = Console -appender.out.name = out -appender.out.layout.type = PatternLayout -appender.out.layout.pattern = [%30.30t] %-30.30c{1} %-5p %m%n -logger.springframework.name = org.springframework -logger.springframework.level = WARN -rootLogger.level = INFO -rootLogger.appenderRef.out.ref = out diff --git a/node2vec_spark/src/main/scala/com/navercorp/Main.scala b/node2vec_spark/src/main/scala/com/navercorp/Main.scala deleted file mode 100644 index f3494e5..0000000 --- a/node2vec_spark/src/main/scala/com/navercorp/Main.scala +++ /dev/null @@ -1,119 +0,0 @@ -package com.navercorp - -import java.io.Serializable -import org.apache.spark.{SparkContext, SparkConf} -import scopt.OptionParser -import com.navercorp.lib.AbstractParams - -object Main { - object Command extends Enumeration { - type Command = Value - val node2vec, randomwalk, embedding = Value - } - import Command._ - - case class Params(iter: Int = 10, - lr: Double = 0.025, - numPartition: Int = 10, - dim: Int = 128, - window: Int = 10, - walkLength: Int = 80, - numWalks: Int = 10, - p: Double = 1.0, - q: Double = 1.0, - weighted: Boolean = true, - directed: Boolean = false, - degree: Int = 30, - indexed: Boolean = true, - nodePath: String = null, - input: String = null, - output: String = null, - cmd: Command = Command.node2vec) extends AbstractParams[Params] with Serializable - val defaultParams = Params() - - val parser = new OptionParser[Params]("Node2Vec_Spark") { - head("Main") - opt[Int]("walkLength") - .text(s"walkLength: ${defaultParams.walkLength}") - .action((x, c) => c.copy(walkLength = x)) - opt[Int]("numWalks") - .text(s"numWalks: ${defaultParams.numWalks}") - .action((x, c) => c.copy(numWalks = x)) - opt[Double]("p") - .text(s"return parameter p: ${defaultParams.p}") - .action((x, c) => c.copy(p = x)) - opt[Double]("q") - .text(s"in-out parameter q: ${defaultParams.q}") - .action((x, c) => 
c.copy(q = x)) - opt[Boolean]("weighted") - .text(s"weighted: ${defaultParams.weighted}") - .action((x, c) => c.copy(weighted = x)) - opt[Boolean]("directed") - .text(s"directed: ${defaultParams.directed}") - .action((x, c) => c.copy(directed = x)) - opt[Int]("degree") - .text(s"degree: ${defaultParams.degree}") - .action((x, c) => c.copy(degree = x)) - opt[Boolean]("indexed") - .text(s"Whether nodes are indexed or not: ${defaultParams.indexed}") - .action((x, c) => c.copy(indexed = x)) - opt[String]("nodePath") - .text("Input node2index file path: empty") - .action((x, c) => c.copy(nodePath = x)) - opt[String]("input") - .required() - .text("Input edge file path: empty") - .action((x, c) => c.copy(input = x)) - opt[String]("output") - .required() - .text("Output path: empty") - .action((x, c) => c.copy(output = x)) - opt[String]("cmd") - .required() - .text(s"command: ${defaultParams.cmd.toString}") - .action((x, c) => c.copy(cmd = Command.withName(x))) - note( - """ - |For example, the following command runs this app on a synthetic dataset: - | - | bin/spark-submit --class com.nhn.sunny.vegapunk.ml.model.Node2vec \ - """.stripMargin + - s"| --lr ${defaultParams.lr}" + - s"| --iter ${defaultParams.iter}" + - s"| --numPartition ${defaultParams.numPartition}" + - s"| --dim ${defaultParams.dim}" + - s"| --window ${defaultParams.window}" + - s"| --input " + - s"| --node " + - s"| --output " - ) - } - - def main(args: Array[String]) = { - parser.parse(args, defaultParams).map { param => - val conf = new SparkConf().setAppName("Node2Vec") - val context: SparkContext = new SparkContext(conf) - - Node2vec.setup(context, param) - - param.cmd match { - case Command.node2vec => Node2vec.load() - .initTransitionProb() - .randomWalk() - .embedding() - .save() - case Command.randomwalk => Node2vec.load() - .initTransitionProb() - .randomWalk() - .saveRandomPath() - case Command.embedding => { - val randomPaths = Word2vec.setup(context, param).read(param.input) - Word2vec.fit(randomPaths).save(param.output) - Node2vec.loadNode2Id(param.nodePath).saveVectors() - } - } - } getOrElse { - sys.exit(1) - } - } -} diff --git a/node2vec_spark/src/main/scala/com/navercorp/Node2vec.scala b/node2vec_spark/src/main/scala/com/navercorp/Node2vec.scala deleted file mode 100644 index 07ec21a..0000000 --- a/node2vec_spark/src/main/scala/com/navercorp/Node2vec.scala +++ /dev/null @@ -1,281 +0,0 @@ -package com.navercorp - - -import java.io.Serializable -import scala.util.Try -import scala.collection.mutable.ArrayBuffer -import org.slf4j.{Logger, LoggerFactory} -import org.apache.spark.SparkContext -import org.apache.spark.rdd.RDD -import org.apache.spark.graphx.{EdgeTriplet, Graph, _} -import com.navercorp.graph.{GraphOps, EdgeAttr, NodeAttr} - -object Node2vec extends Serializable { - lazy val logger: Logger = LoggerFactory.getLogger(getClass.getName); - - var context: SparkContext = null - var config: Main.Params = null - var node2id: RDD[(String, Long)] = null - var indexedEdges: RDD[Edge[EdgeAttr]] = _ - var indexedNodes: RDD[(VertexId, NodeAttr)] = _ - var graph: Graph[NodeAttr, EdgeAttr] = _ - var randomWalkPaths: RDD[(Long, ArrayBuffer[Long])] = null - - def setup(context: SparkContext, param: Main.Params): this.type = { - this.context = context - this.config = param - - this - } - - def load(): this.type = { - val bcMaxDegree = context.broadcast(config.degree) - val bcEdgeCreator = config.directed match { - case true => context.broadcast(GraphOps.createDirectedEdge) - case false => 
context.broadcast(GraphOps.createUndirectedEdge) - } - - val inputTriplets: RDD[(Long, Long, Double)] = config.indexed match { - case true => readIndexedGraph(config.input) - case false => indexingGraph(config.input) - } - - indexedNodes = inputTriplets.flatMap { case (srcId, dstId, weight) => - bcEdgeCreator.value.apply(srcId, dstId, weight) - }.reduceByKey(_++_).map { case (nodeId, neighbors: Array[(VertexId, Double)]) => - var neighbors_ = neighbors - if (neighbors_.length > bcMaxDegree.value) { - neighbors_ = neighbors.sortWith{ case (left, right) => left._2 > right._2 }.slice(0, bcMaxDegree.value) - } - - (nodeId, NodeAttr(neighbors = neighbors_.distinct)) - }.repartition(200).cache - - indexedEdges = indexedNodes.flatMap { case (srcId, clickNode) => - clickNode.neighbors.map { case (dstId, weight) => - Edge(srcId, dstId, EdgeAttr()) - } - }.repartition(200).cache - - this - } - - def initTransitionProb(): this.type = { - val bcP = context.broadcast(config.p) - val bcQ = context.broadcast(config.q) - - graph = Graph(indexedNodes, indexedEdges) - .mapVertices[NodeAttr] { case (vertexId, clickNode) => - val (j, q) = GraphOps.setupAlias(clickNode.neighbors) - val nextNodeIndex = GraphOps.drawAlias(j, q) - clickNode.path = Array(vertexId, clickNode.neighbors(nextNodeIndex)._1) - - clickNode - } - .mapTriplets { edgeTriplet: EdgeTriplet[NodeAttr, EdgeAttr] => - val (j, q) = GraphOps.setupEdgeAlias(bcP.value, bcQ.value)(edgeTriplet.srcId, edgeTriplet.srcAttr.neighbors, edgeTriplet.dstAttr.neighbors) - edgeTriplet.attr.J = j - edgeTriplet.attr.q = q - edgeTriplet.attr.dstNeighbors = edgeTriplet.dstAttr.neighbors.map(_._1) - - edgeTriplet.attr - }.cache - - this - } - - def randomWalk(): this.type = { - val edge2attr = graph.triplets.map { edgeTriplet => - (s"${edgeTriplet.srcId}${edgeTriplet.dstId}", edgeTriplet.attr) - }.repartition(200).cache - edge2attr.first - - for (iter <- 0 until config.numWalks) { - var prevWalk: RDD[(Long, ArrayBuffer[Long])] = null - var randomWalk = graph.vertices.map { case (nodeId, clickNode) => - val pathBuffer = new ArrayBuffer[Long]() - pathBuffer.append(clickNode.path:_*) - (nodeId, pathBuffer) - }.cache - var activeWalks = randomWalk.first - graph.unpersist(blocking = false) - graph.edges.unpersist(blocking = false) - for (walkCount <- 0 until config.walkLength) { - prevWalk = randomWalk - randomWalk = randomWalk.map { case (srcNodeId, pathBuffer) => - val prevNodeId = pathBuffer(pathBuffer.length - 2) - val currentNodeId = pathBuffer.last - - (s"$prevNodeId$currentNodeId", (srcNodeId, pathBuffer)) - }.join(edge2attr).map { case (edge, ((srcNodeId, pathBuffer), attr)) => - try { - val nextNodeIndex = GraphOps.drawAlias(attr.J, attr.q) - val nextNodeId = attr.dstNeighbors(nextNodeIndex) - pathBuffer.append(nextNodeId) - - (srcNodeId, pathBuffer) - } catch { - case e: Exception => throw new RuntimeException(e.getMessage) - } - }.cache - - activeWalks = randomWalk.first() - prevWalk.unpersist(blocking=false) - } - - - if (randomWalkPaths != null) { - val prevRandomWalkPaths = randomWalkPaths - randomWalkPaths = randomWalkPaths.union(randomWalk).cache() - randomWalkPaths.first - prevRandomWalkPaths.unpersist(blocking = false) - } else { - randomWalkPaths = randomWalk - } - } - - this - } - - def embedding(): this.type = { - val randomPaths = randomWalkPaths.map { case (vertexId, pathBuffer) => - Try(pathBuffer.map(_.toString).toIterable).getOrElse(null) - }.filter(_!=null) - - Word2vec.setup(context, config).fit(randomPaths) - - this - } - - def save(): 
this.type = { - this.saveRandomPath() - .saveModel() - .saveVectors() - } - - def saveRandomPath(): this.type = { - randomWalkPaths - .map { case (vertexId, pathBuffer) => - Try(pathBuffer.mkString("\t")).getOrElse(null) - } - .filter(x => x != null && x.replaceAll("\\s", "").length > 0) - .repartition(200) - .saveAsTextFile(config.output) - - this - } - - def saveModel(): this.type = { - Word2vec.save(config.output) - - this - } - - def saveVectors(): this.type = { - val node2vector = context.parallelize(Word2vec.getVectors.toList) - .map { case (nodeId, vector) => - (nodeId.toLong, vector.mkString(",")) - } - - if (this.node2id != null) { - val id2Node = this.node2id.map{ case (strNode, index) => - (index, strNode) - } - - node2vector.join(id2Node) - .map { case (nodeId, (vector, name)) => s"$name\t$vector" } - .repartition(200) - .saveAsTextFile(s"${config.output}.emb") - } else { - node2vector.map { case (nodeId, vector) => s"$nodeId\t$vector" } - .repartition(200) - .saveAsTextFile(s"${config.output}.emb") - } - - this - } - - def cleanup(): this.type = { - node2id.unpersist(blocking = false) - indexedEdges.unpersist(blocking = false) - indexedNodes.unpersist(blocking = false) - graph.unpersist(blocking = false) - randomWalkPaths.unpersist(blocking = false) - - this - } - - def loadNode2Id(node2idPath: String): this.type = { - try { - this.node2id = context.textFile(config.nodePath).map { node2index => - val Array(strNode, index) = node2index.split("\\s") - (strNode, index.toLong) - } - } catch { - case e: Exception => logger.info("Failed to read node2index file.") - this.node2id = null - } - - this - } - - def readIndexedGraph(tripletPath: String) = { - val bcWeighted = context.broadcast(config.weighted) - - val rawTriplets = context.textFile(tripletPath) - if (config.nodePath == null) { - this.node2id = createNode2Id(rawTriplets.map { triplet => - val parts = triplet.split("\\s") - (parts.head, parts(1), -1) - }) - } else { - loadNode2Id(config.nodePath) - } - - rawTriplets.map { triplet => - val parts = triplet.split("\\s") - val weight = bcWeighted.value match { - case true => Try(parts.last.toDouble).getOrElse(1.0) - case false => 1.0 - } - - (parts.head.toLong, parts(1).toLong, weight) - } - } - - - def indexingGraph(rawTripletPath: String): RDD[(Long, Long, Double)] = { - val rawEdges = context.textFile(rawTripletPath).map { triplet => - val parts = triplet.split("\\s") - - Try { - (parts.head, parts(1), Try(parts.last.toDouble).getOrElse(1.0)) - }.getOrElse(null) - }.filter(_!=null) - - this.node2id = createNode2Id(rawEdges) - - rawEdges.map { case (src, dst, weight) => - (src, (dst, weight)) - }.join(node2id).map { case (src, (edge: (String, Double), srcIndex: Long)) => - try { - val (dst: String, weight: Double) = edge - (dst, (srcIndex, weight)) - } catch { - case e: Exception => null - } - }.filter(_!=null).join(node2id).map { case (dst, (edge: (Long, Double), dstIndex: Long)) => - try { - val (srcIndex, weight) = edge - (srcIndex, dstIndex, weight) - } catch { - case e: Exception => null - } - }.filter(_!=null) - } - - def createNode2Id[T <: Any](triplets: RDD[(String, String, T)]) = triplets.flatMap { case (src, dst, weight) => - Try(Array(src, dst)).getOrElse(Array.empty[String]) - }.distinct().zipWithIndex() - -} diff --git a/node2vec_spark/src/main/scala/com/navercorp/Word2vec.scala b/node2vec_spark/src/main/scala/com/navercorp/Word2vec.scala deleted file mode 100644 index aa209cf..0000000 --- a/node2vec_spark/src/main/scala/com/navercorp/Word2vec.scala +++ /dev/null 
@@ -1,55 +0,0 @@ -package com.navercorp - -import org.apache.spark.SparkContext -import org.apache.spark.mllib.feature.{Word2Vec, Word2VecModel} -import org.apache.spark.rdd.RDD - -object Word2vec extends Serializable { - var context: SparkContext = null - var word2vec = new Word2Vec() - var model: Word2VecModel = null - - def setup(context: SparkContext, param: Main.Params): this.type = { - this.context = context - /** - * model = sg - * update = hs - */ - word2vec.setLearningRate(param.lr) - .setNumIterations(param.iter) - .setNumPartitions(param.numPartition) - .setMinCount(0) - .setVectorSize(param.dim) - - val word2vecWindowField = word2vec.getClass.getDeclaredField("org$apache$spark$mllib$feature$Word2Vec$$window") - word2vecWindowField.setAccessible(true) - word2vecWindowField.setInt(word2vec, param.window) - - this - } - - def read(path: String): RDD[Iterable[String]] = { - context.textFile(path).repartition(200).map(_.split("\\s").toSeq) - } - - def fit(input: RDD[Iterable[String]]): this.type = { - model = word2vec.fit(input) - - this - } - - def save(outputPath: String): this.type = { - model.save(context, s"$outputPath.bin") - this - } - - def load(path: String): this.type = { - model = Word2VecModel.load(context, path) - - this - } - - def getVectors = this.model.getVectors - -} - diff --git a/node2vec_spark/src/main/scala/com/navercorp/graph/GraphOps.scala b/node2vec_spark/src/main/scala/com/navercorp/graph/GraphOps.scala deleted file mode 100644 index 960fa8c..0000000 --- a/node2vec_spark/src/main/scala/com/navercorp/graph/GraphOps.scala +++ /dev/null @@ -1,69 +0,0 @@ -package com.navercorp.graph - -import scala.collection.mutable.ArrayBuffer - -object GraphOps { - def setupAlias(nodeWeights: Array[(Long, Double)]): (Array[Int], Array[Double]) = { - val K = nodeWeights.length - val J = Array.fill(K)(0) - val q = Array.fill(K)(0.0) - - val smaller = new ArrayBuffer[Int]() - val larger = new ArrayBuffer[Int]() - - val sum = nodeWeights.map(_._2).sum - nodeWeights.zipWithIndex.foreach { case ((nodeId, weight), i) => - q(i) = K * weight / sum - if (q(i) < 1.0) { - smaller.append(i) - } else { - larger.append(i) - } - } - - while (smaller.nonEmpty && larger.nonEmpty) { - val small = smaller.remove(smaller.length - 1) - val large = larger.remove(larger.length - 1) - - J(small) = large - q(large) = q(large) + q(small) - 1.0 - if (q(large) < 1.0) smaller.append(large) - else larger.append(large) - } - - (J, q) - } - - def setupEdgeAlias(p: Double = 1.0, q: Double = 1.0)(srcId: Long, srcNeighbors: Array[(Long, Double)], dstNeighbors: Array[(Long, Double)]): (Array[Int], Array[Double]) = { - val neighbors_ = dstNeighbors.map { case (dstNeighborId, weight) => - var unnormProb = weight / q - if (srcId == dstNeighborId) unnormProb = weight / p - else if (srcNeighbors.exists(_._1 == dstNeighborId)) unnormProb = weight - - (dstNeighborId, unnormProb) - } - - setupAlias(neighbors_) - } - - def drawAlias(J: Array[Int], q: Array[Double]): Int = { - val K = J.length - val kk = math.floor(math.random * K).toInt - - if (math.random < q(kk)) kk - else J(kk) - } - - lazy val createUndirectedEdge = (srcId: Long, dstId: Long, weight: Double) => { - Array( - (srcId, Array((dstId, weight))), - (dstId, Array((srcId, weight))) - ) - } - - lazy val createDirectedEdge = (srcId: Long, dstId: Long, weight: Double) => { - Array( - (srcId, Array((dstId, weight))) - ) - } -} diff --git a/node2vec_spark/src/main/scala/com/navercorp/graph/package.scala 
b/node2vec_spark/src/main/scala/com/navercorp/graph/package.scala deleted file mode 100644 index 1b83969..0000000 --- a/node2vec_spark/src/main/scala/com/navercorp/graph/package.scala +++ /dev/null @@ -1,12 +0,0 @@ -package com.navercorp - -import java.io.Serializable - -package object graph { - case class NodeAttr(var neighbors: Array[(Long, Double)] = Array.empty[(Long, Double)], - var path: Array[Long] = Array.empty[Long]) extends Serializable - - case class EdgeAttr(var dstNeighbors: Array[Long] = Array.empty[Long], - var J: Array[Int] = Array.empty[Int], - var q: Array[Double] = Array.empty[Double]) extends Serializable -} diff --git a/node2vec_spark/src/main/scala/com/navercorp/lib/AbstractParams.scala b/node2vec_spark/src/main/scala/com/navercorp/lib/AbstractParams.scala deleted file mode 100644 index 0790ab9..0000000 --- a/node2vec_spark/src/main/scala/com/navercorp/lib/AbstractParams.scala +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.navercorp.lib - -import scala.reflect.runtime.universe._ - -/** - * Abstract class for parameter case classes. - * This overrides the [[toString]] method to print all case class fields by name and value. - * @tparam T Concrete parameter class. - */ -abstract class AbstractParams[T: TypeTag] { - - private def tag: TypeTag[T] = typeTag[T] - - /** - * Finds all case class fields in concrete class instance, and outputs them in JSON-style format: - * { - * [field name]:\t[field value]\n - * [field name]:\t[field value]\n - * ... - * } - */ - override def toString: String = { - val tpe = tag.tpe - val allAccessors = tpe.declarations.collect { - case m: MethodSymbol if m.isCaseAccessor => m - } - val mirror = runtimeMirror(getClass.getClassLoader) - val instanceMirror = mirror.reflect(this) - allAccessors.map { f => - val paramName = f.name.toString - val fieldMirror = instanceMirror.reflectField(f) - val paramValue = fieldMirror.get - s" $paramName:\t$paramValue" - }.mkString("{\n", ",\n", "\n}") - } -} \ No newline at end of file diff --git a/src/main.py b/src/main.py deleted file mode 100644 index 82ac735..0000000 --- a/src/main.py +++ /dev/null @@ -1,104 +0,0 @@ -''' -Reference implementation of node2vec. - -Author: Aditya Grover - -For more details, refer to the paper: -node2vec: Scalable Feature Learning for Networks -Aditya Grover and Jure Leskovec -Knowledge Discovery and Data Mining (KDD), 2016 -''' - -import argparse -import numpy as np -import networkx as nx -import node2vec -from gensim.models import Word2Vec - -def parse_args(): - ''' - Parses the node2vec arguments. 
- ''' - parser = argparse.ArgumentParser(description="Run node2vec.") - - parser.add_argument('--input', nargs='?', default='graph/karate.edgelist', - help='Input graph path') - - parser.add_argument('--output', nargs='?', default='emb/karate.emb', - help='Embeddings path') - - parser.add_argument('--dimensions', type=int, default=128, - help='Number of dimensions. Default is 128.') - - parser.add_argument('--walk-length', type=int, default=80, - help='Length of walk per source. Default is 80.') - - parser.add_argument('--num-walks', type=int, default=10, - help='Number of walks per source. Default is 10.') - - parser.add_argument('--window-size', type=int, default=10, - help='Context size for optimization. Default is 10.') - - parser.add_argument('--iter', default=1, type=int, - help='Number of epochs in SGD') - - parser.add_argument('--workers', type=int, default=8, - help='Number of parallel workers. Default is 8.') - - parser.add_argument('--p', type=float, default=1, - help='Return hyperparameter. Default is 1.') - - parser.add_argument('--q', type=float, default=1, - help='Inout hyperparameter. Default is 1.') - - parser.add_argument('--weighted', dest='weighted', action='store_true', - help='Boolean specifying (un)weighted. Default is unweighted.') - parser.add_argument('--unweighted', dest='unweighted', action='store_false') - parser.set_defaults(weighted=False) - - parser.add_argument('--directed', dest='directed', action='store_true', - help='Graph is (un)directed. Default is undirected.') - parser.add_argument('--undirected', dest='undirected', action='store_false') - parser.set_defaults(directed=False) - - return parser.parse_args() - -def read_graph(): - ''' - Reads the input network in networkx. - ''' - if args.weighted: - G = nx.read_edgelist(args.input, nodetype=int, data=(('weight',float),), create_using=nx.DiGraph()) - else: - G = nx.read_edgelist(args.input, nodetype=int, create_using=nx.DiGraph()) - for edge in G.edges(): - G[edge[0]][edge[1]]['weight'] = 1 - - if not args.directed: - G = G.to_undirected() - - return G - -def learn_embeddings(walks): - ''' - Learn embeddings by optimizing the Skipgram objective using SGD. - ''' - walks = [map(str, walk) for walk in walks] - model = Word2Vec(walks, size=args.dimensions, window=args.window_size, min_count=0, sg=1, workers=args.workers, iter=args.iter) - model.save_word2vec_format(args.output) - - return - -def main(args): - ''' - Pipeline for representational learning for all nodes in a graph. - ''' - nx_G = read_graph() - G = node2vec.Graph(nx_G, args.directed, args.p, args.q) - G.preprocess_transition_probs() - walks = G.simulate_walks(args.num_walks, args.walk_length) - learn_embeddings(walks) - -if __name__ == "__main__": - args = parse_args() - main(args) diff --git a/src/node2vec.py b/src/node2vec.py deleted file mode 100644 index 0293411..0000000 --- a/src/node2vec.py +++ /dev/null @@ -1,149 +0,0 @@ -import numpy as np -import networkx as nx -import random - - -class Graph(): - def __init__(self, nx_G, is_directed, p, q): - self.G = nx_G - self.is_directed = is_directed - self.p = p - self.q = q - - def node2vec_walk(self, walk_length, start_node): - ''' - Simulate a random walk starting from start node. 
- ''' - G = self.G - alias_nodes = self.alias_nodes - alias_edges = self.alias_edges - - walk = [start_node] - - while len(walk) < walk_length: - cur = walk[-1] - cur_nbrs = sorted(G.neighbors(cur)) - if len(cur_nbrs) > 0: - if len(walk) == 1: - walk.append(cur_nbrs[alias_draw(alias_nodes[cur][0], alias_nodes[cur][1])]) - else: - prev = walk[-2] - next = cur_nbrs[alias_draw(alias_edges[(prev, cur)][0], - alias_edges[(prev, cur)][1])] - walk.append(next) - else: - break - - return walk - - def simulate_walks(self, num_walks, walk_length): - ''' - Repeatedly simulate random walks from each node. - ''' - G = self.G - walks = [] - nodes = list(G.nodes()) - print 'Walk iteration:' - for walk_iter in range(num_walks): - print str(walk_iter+1), '/', str(num_walks) - random.shuffle(nodes) - for node in nodes: - walks.append(self.node2vec_walk(walk_length=walk_length, start_node=node)) - - return walks - - def get_alias_edge(self, src, dst): - ''' - Get the alias edge setup lists for a given edge. - ''' - G = self.G - p = self.p - q = self.q - - unnormalized_probs = [] - for dst_nbr in sorted(G.neighbors(dst)): - if dst_nbr == src: - unnormalized_probs.append(G[dst][dst_nbr]['weight']/p) - elif G.has_edge(dst_nbr, src): - unnormalized_probs.append(G[dst][dst_nbr]['weight']) - else: - unnormalized_probs.append(G[dst][dst_nbr]['weight']/q) - norm_const = sum(unnormalized_probs) - normalized_probs = [float(u_prob)/norm_const for u_prob in unnormalized_probs] - - return alias_setup(normalized_probs) - - def preprocess_transition_probs(self): - ''' - Preprocessing of transition probabilities for guiding the random walks. - ''' - G = self.G - is_directed = self.is_directed - - alias_nodes = {} - for node in G.nodes(): - unnormalized_probs = [G[node][nbr]['weight'] for nbr in sorted(G.neighbors(node))] - norm_const = sum(unnormalized_probs) - normalized_probs = [float(u_prob)/norm_const for u_prob in unnormalized_probs] - alias_nodes[node] = alias_setup(normalized_probs) - - alias_edges = {} - triads = {} - - if is_directed: - for edge in G.edges(): - alias_edges[edge] = self.get_alias_edge(edge[0], edge[1]) - else: - for edge in G.edges(): - alias_edges[edge] = self.get_alias_edge(edge[0], edge[1]) - alias_edges[(edge[1], edge[0])] = self.get_alias_edge(edge[1], edge[0]) - - self.alias_nodes = alias_nodes - self.alias_edges = alias_edges - - return - - -def alias_setup(probs): - ''' - Compute utility lists for non-uniform sampling from discrete distributions. - Refer to https://hips.seas.harvard.edu/blog/2013/03/03/the-alias-method-efficient-sampling-with-many-discrete-outcomes/ - for details - ''' - K = len(probs) - q = np.zeros(K) - J = np.zeros(K, dtype=np.int) - - smaller = [] - larger = [] - for kk, prob in enumerate(probs): - q[kk] = K*prob - if q[kk] < 1.0: - smaller.append(kk) - else: - larger.append(kk) - - while len(smaller) > 0 and len(larger) > 0: - small = smaller.pop() - large = larger.pop() - - J[small] = large - q[large] = q[large] + q[small] - 1.0 - if q[large] < 1.0: - smaller.append(large) - else: - larger.append(large) - - return J, q - -def alias_draw(J, q): - ''' - Draw sample from a non-uniform discrete distribution using alias sampling. - ''' - K = len(J) - - kk = int(np.floor(np.random.rand()*K)) - if np.random.rand() < q[kk]: - return kk - else: - return J[kk] \ No newline at end of file