diff --git a/.gitignore b/.gitignore index 0d20b64..69fa6ec 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,14 @@ *.pyc +.DS_Store +target +bin +build +.gradle +*.iml +*.ipr +*.iws +*.log +.classpath +.project +.settings +.idea \ No newline at end of file diff --git a/LICENSE.md b/LICENSE.md new file mode 100644 index 0000000..af4a91f --- /dev/null +++ b/LICENSE.md @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2016 Aditya Grover + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/node2vec_spark/README.md b/node2vec_spark/README.md new file mode 100644 index 0000000..43f6684 --- /dev/null +++ b/node2vec_spark/README.md @@ -0,0 +1,139 @@ +# node2vec on spark + +This library is a Scala implementation of *node2vec* for Apache Spark, as described in the paper: +> node2vec: Scalable Feature Learning for Networks. +> Aditya Grover and Jure Leskovec. +> Knowledge Discovery and Data Mining, 2016. +> + +The *node2vec* algorithm learns continuous representations for nodes in any (un)directed, (un)weighted graph. Please check the [project page](https://snap.stanford.edu/node2vec/) for more details. + + +### Building node2vec_spark +**To build node2vec_spark, run the following:** + +``` +$ git clone https://github.com/Skarface-/node2vec.git +$ mvn clean package +``` + +**Building requires:**
+Maven 3.0.5 or newer
+Java 7+
+Scala 2.10 or newer. + +This will produce a jar file in "node2vec_spark/target/". + +### Examples +This library has two functions: *randomwalk* and *embedding*.
+These are described in the papers [node2vec: Scalable Feature Learning for Networks](http://arxiv.org/abs/1607.00653) and [Efficient Estimation of Word Representations in Vector Space](https://arxiv.org/abs/1301.3781). + +### Random walk +Example: + + ./spark-submit --class com.navercorp.Main \ + ./node2vec_spark/target/node2vec-0.0.1-SNAPSHOT.jar \ + --cmd randomwalk --p 100.0 --q 100.0 --walkLength 40 \ + --input <input> --output <output> + +#### Options +Invoke a command without arguments to list available arguments and their default values: + +``` +--cmd COMMAND + Function to run: randomwalk or embedding. To run both "randomwalk" and "embedding" sequentially, use "node2vec". Default is "node2vec". +--input [INPUT] + Input edgelist path. The supported input format is an edgelist: "node1_id_int node2_id_int <weight_float, optional>" +--output [OUTPUT] + Output path for the random paths. +--walkLength WALK_LENGTH + Length of each walk per source. Default is 80. +--numWalks NUM_WALKS + Number of walks per source. Default is 10. +--p P + Return hyperparameter. Default is 1.0. +--q Q + In-out hyperparameter. Default is 1.0. +--weighted Boolean + Whether the graph is weighted. Default is true. +--directed Boolean + Whether the graph is directed. Default is false. +--degree UPPER_BOUND_OF_NUMBER_OF_NEIGHBORS + Upper bound on the number of neighbors kept per node. Default is 30. +--indexed Boolean + Whether nodes in the edgelist are already indexed. Default is true. +``` + +* If "indexed" is set to false, *node2vec_spark* indexes the nodes in the input edgelist, for example:
+ **unindexed edgelist:**
+ node1 node2 1.0
+ node2 node7 1.0
+ + **indexed:**
+ 1 2 1.0
+ 2 3 1.0
+ + 1 node1
+ 2 node2
+ 3 node7 + +#### Input +The supported input format is an edgelist: + + node1_id_int node2_id_int <weight_float, optional> + or + node1_str node2_str <weight_float, optional> (set the option "indexed" to false) + + +#### Output +The output file contains (number of nodes) * numWalks random paths, one per line, in the following format: + + src_node_id_int node1_id_int node2_id_int ... noden_id_int + + +### Embedding random paths +Example: + + ./spark-submit --class com.navercorp.Main \ + ./node2vec_spark/target/node2vec-0.0.1-SNAPSHOT.jar \ + --cmd embedding --dim 50 --iter 20 \ + --input <random_paths> --nodePath <node2index> --output <output> + +#### Options +Invoke a command without arguments to list available arguments and their default values: + +``` +--cmd COMMAND + embedding. To run both "randomwalk" and "embedding" sequentially, use "node2vec". Default is "node2vec". +--input [INPUT] + Input random paths. The supported input format is: "src_node_id_int node1_id_int ... noden_id_int" +--output [OUTPUT] + Output path for the word2vec model (.bin) and the embeddings (.emb). +--nodePath [NODE\_PATH] + Input node2index path. The supported input format: "node1_str node1_id_int" +--iter ITERATION + Number of epochs in SGD. Default is 10. +--dim DIMENSION + Number of dimensions. Default is 128. +--window WINDOW_SIZE + Context size for optimization. Default is 10. + +``` + +#### Input +The supported input format is random paths: + + src_node_id_int node1_id_int ... noden_id_int + +#### Output +The output files are the **embeddings and the word2vec model.** The embeddings file has the following format: + + node1_str dim1 dim2 ... dimd + +where dim1, ..., dimd is the d-dimensional representation learned by word2vec. + +The *word2vec model* output file uses the Spark word2vec model format; for details see https://spark.apache.org/docs/1.5.2/mllib-feature-extraction.html#word2vec + +## References +1. [node2vec: Scalable Feature Learning for Networks](http://arxiv.org/abs/1607.00653) +2. 
[Efficient Estimation of Word Representations in Vector Space](https://arxiv.org/abs/1301.3781) \ No newline at end of file diff --git a/node2vec_spark/pom.xml b/node2vec_spark/pom.xml new file mode 100644 index 0000000..b958576 --- /dev/null +++ b/node2vec_spark/pom.xml @@ -0,0 +1,129 @@ + + + + 4.0.0 + + com.navercorp + node2vec + jar + 0.0.1-SNAPSHOT + + node2vec_spark + http://snap.stanford.edu/node2vec/ + + + UTF-8 + bin + 2.4.3 + 1.4.0 + 1.7 + 2.10 + + + + + + org.scala-tools + maven-scala-plugin + 2.15.2 + + + + compile + + + + + + org.apache.maven.plugins + maven-dependency-plugin + 2.4 + + + copy-dependencies + package + + copy-dependencies + + + ${project.build.directory}/lib + + + + + + org.apache.maven.plugins + maven-shade-plugin + 1.6 + + + package + + shade + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 2.3.2 + + 1.7 + 1.7 + UTF-8 + + + + org.apache.maven.plugins + maven-surefire-plugin + + false + + + + + + + + org.apache.hadoop + hadoop-hdfs + 2.7.1 + + + org.scala-lang + scala-library + ${scala.binary.version}.5 + provided + + + org.apache.spark + spark-core_${scala.binary.version} + 1.6.1 + provided + + + org.apache.spark + spark-mllib_${scala.binary.version} + 1.6.1 + provided + + + com.github.scopt + scopt_${scala.binary.version} + 3.3.0 + + + org.scala-lang + scala-library + + + + + com.google.guava + guava + 19.0 + + + + diff --git a/node2vec_spark/src/main/resources/log4j2.properties b/node2vec_spark/src/main/resources/log4j2.properties new file mode 100644 index 0000000..d941e1b --- /dev/null +++ b/node2vec_spark/src/main/resources/log4j2.properties @@ -0,0 +1,9 @@ + +appender.out.type = Console +appender.out.name = out +appender.out.layout.type = PatternLayout +appender.out.layout.pattern = [%30.30t] %-30.30c{1} %-5p %m%n +logger.springframework.name = org.springframework +logger.springframework.level = WARN +rootLogger.level = INFO +rootLogger.appenderRef.out.ref = out diff --git a/node2vec_spark/src/main/scala/com/navercorp/Main.scala b/node2vec_spark/src/main/scala/com/navercorp/Main.scala new file mode 100644 index 0000000..f3494e5 --- /dev/null +++ b/node2vec_spark/src/main/scala/com/navercorp/Main.scala @@ -0,0 +1,119 @@ +package com.navercorp + +import java.io.Serializable +import org.apache.spark.{SparkContext, SparkConf} +import scopt.OptionParser +import com.navercorp.lib.AbstractParams + +object Main { + object Command extends Enumeration { + type Command = Value + val node2vec, randomwalk, embedding = Value + } + import Command._ + + case class Params(iter: Int = 10, + lr: Double = 0.025, + numPartition: Int = 10, + dim: Int = 128, + window: Int = 10, + walkLength: Int = 80, + numWalks: Int = 10, + p: Double = 1.0, + q: Double = 1.0, + weighted: Boolean = true, + directed: Boolean = false, + degree: Int = 30, + indexed: Boolean = true, + nodePath: String = null, + input: String = null, + output: String = null, + cmd: Command = Command.node2vec) extends AbstractParams[Params] with Serializable + val defaultParams = Params() + + val parser = new OptionParser[Params]("Node2Vec_Spark") { + head("Main") + opt[Int]("walkLength") + .text(s"walkLength: ${defaultParams.walkLength}") + .action((x, c) => c.copy(walkLength = x)) + opt[Int]("numWalks") + .text(s"numWalks: ${defaultParams.numWalks}") + .action((x, c) => c.copy(numWalks = x)) + opt[Double]("p") + .text(s"return parameter p: ${defaultParams.p}") + .action((x, c) => c.copy(p = x)) + opt[Double]("q") + .text(s"in-out parameter q: ${defaultParams.q}") + .action((x, c) => c.copy(q = x)) + 
opt[Boolean]("weighted") + .text(s"weighted: ${defaultParams.weighted}") + .action((x, c) => c.copy(weighted = x)) + opt[Boolean]("directed") + .text(s"directed: ${defaultParams.directed}") + .action((x, c) => c.copy(directed = x)) + opt[Int]("degree") + .text(s"degree: ${defaultParams.degree}") + .action((x, c) => c.copy(degree = x)) + opt[Boolean]("indexed") + .text(s"Whether nodes are indexed or not: ${defaultParams.indexed}") + .action((x, c) => c.copy(indexed = x)) + opt[String]("nodePath") + .text("Input node2index file path: empty") + .action((x, c) => c.copy(nodePath = x)) + opt[String]("input") + .required() + .text("Input edge file path: empty") + .action((x, c) => c.copy(input = x)) + opt[String]("output") + .required() + .text("Output path: empty") + .action((x, c) => c.copy(output = x)) + opt[String]("cmd") + .required() + .text(s"command: ${defaultParams.cmd.toString}") + .action((x, c) => c.copy(cmd = Command.withName(x))) + note( + """ + |For example, the following command runs this app on a synthetic dataset: + | + | bin/spark-submit --class com.nhn.sunny.vegapunk.ml.model.Node2vec \ + """.stripMargin + + s"| --lr ${defaultParams.lr}" + + s"| --iter ${defaultParams.iter}" + + s"| --numPartition ${defaultParams.numPartition}" + + s"| --dim ${defaultParams.dim}" + + s"| --window ${defaultParams.window}" + + s"| --input " + + s"| --node " + + s"| --output " + ) + } + + def main(args: Array[String]) = { + parser.parse(args, defaultParams).map { param => + val conf = new SparkConf().setAppName("Node2Vec") + val context: SparkContext = new SparkContext(conf) + + Node2vec.setup(context, param) + + param.cmd match { + case Command.node2vec => Node2vec.load() + .initTransitionProb() + .randomWalk() + .embedding() + .save() + case Command.randomwalk => Node2vec.load() + .initTransitionProb() + .randomWalk() + .saveRandomPath() + case Command.embedding => { + val randomPaths = Word2vec.setup(context, param).read(param.input) + Word2vec.fit(randomPaths).save(param.output) + Node2vec.loadNode2Id(param.nodePath).saveVectors() + } + } + } getOrElse { + sys.exit(1) + } + } +} diff --git a/node2vec_spark/src/main/scala/com/navercorp/Node2vec.scala b/node2vec_spark/src/main/scala/com/navercorp/Node2vec.scala new file mode 100644 index 0000000..07ec21a --- /dev/null +++ b/node2vec_spark/src/main/scala/com/navercorp/Node2vec.scala @@ -0,0 +1,281 @@ +package com.navercorp + + +import java.io.Serializable +import scala.util.Try +import scala.collection.mutable.ArrayBuffer +import org.slf4j.{Logger, LoggerFactory} +import org.apache.spark.SparkContext +import org.apache.spark.rdd.RDD +import org.apache.spark.graphx.{EdgeTriplet, Graph, _} +import com.navercorp.graph.{GraphOps, EdgeAttr, NodeAttr} + +object Node2vec extends Serializable { + lazy val logger: Logger = LoggerFactory.getLogger(getClass.getName); + + var context: SparkContext = null + var config: Main.Params = null + var node2id: RDD[(String, Long)] = null + var indexedEdges: RDD[Edge[EdgeAttr]] = _ + var indexedNodes: RDD[(VertexId, NodeAttr)] = _ + var graph: Graph[NodeAttr, EdgeAttr] = _ + var randomWalkPaths: RDD[(Long, ArrayBuffer[Long])] = null + + def setup(context: SparkContext, param: Main.Params): this.type = { + this.context = context + this.config = param + + this + } + + def load(): this.type = { + val bcMaxDegree = context.broadcast(config.degree) + val bcEdgeCreator = config.directed match { + case true => context.broadcast(GraphOps.createDirectedEdge) + case false => context.broadcast(GraphOps.createUndirectedEdge) + } 
+ + val inputTriplets: RDD[(Long, Long, Double)] = config.indexed match { + case true => readIndexedGraph(config.input) + case false => indexingGraph(config.input) + } + + indexedNodes = inputTriplets.flatMap { case (srcId, dstId, weight) => + bcEdgeCreator.value.apply(srcId, dstId, weight) + }.reduceByKey(_++_).map { case (nodeId, neighbors: Array[(VertexId, Double)]) => + var neighbors_ = neighbors + if (neighbors_.length > bcMaxDegree.value) { + neighbors_ = neighbors.sortWith{ case (left, right) => left._2 > right._2 }.slice(0, bcMaxDegree.value) + } + + (nodeId, NodeAttr(neighbors = neighbors_.distinct)) + }.repartition(200).cache + + indexedEdges = indexedNodes.flatMap { case (srcId, clickNode) => + clickNode.neighbors.map { case (dstId, weight) => + Edge(srcId, dstId, EdgeAttr()) + } + }.repartition(200).cache + + this + } + + def initTransitionProb(): this.type = { + val bcP = context.broadcast(config.p) + val bcQ = context.broadcast(config.q) + + graph = Graph(indexedNodes, indexedEdges) + .mapVertices[NodeAttr] { case (vertexId, clickNode) => + val (j, q) = GraphOps.setupAlias(clickNode.neighbors) + val nextNodeIndex = GraphOps.drawAlias(j, q) + clickNode.path = Array(vertexId, clickNode.neighbors(nextNodeIndex)._1) + + clickNode + } + .mapTriplets { edgeTriplet: EdgeTriplet[NodeAttr, EdgeAttr] => + val (j, q) = GraphOps.setupEdgeAlias(bcP.value, bcQ.value)(edgeTriplet.srcId, edgeTriplet.srcAttr.neighbors, edgeTriplet.dstAttr.neighbors) + edgeTriplet.attr.J = j + edgeTriplet.attr.q = q + edgeTriplet.attr.dstNeighbors = edgeTriplet.dstAttr.neighbors.map(_._1) + + edgeTriplet.attr + }.cache + + this + } + + def randomWalk(): this.type = { + val edge2attr = graph.triplets.map { edgeTriplet => + (s"${edgeTriplet.srcId}${edgeTriplet.dstId}", edgeTriplet.attr) + }.repartition(200).cache + edge2attr.first + + for (iter <- 0 until config.numWalks) { + var prevWalk: RDD[(Long, ArrayBuffer[Long])] = null + var randomWalk = graph.vertices.map { case (nodeId, clickNode) => + val pathBuffer = new ArrayBuffer[Long]() + pathBuffer.append(clickNode.path:_*) + (nodeId, pathBuffer) + }.cache + var activeWalks = randomWalk.first + graph.unpersist(blocking = false) + graph.edges.unpersist(blocking = false) + for (walkCount <- 0 until config.walkLength) { + prevWalk = randomWalk + randomWalk = randomWalk.map { case (srcNodeId, pathBuffer) => + val prevNodeId = pathBuffer(pathBuffer.length - 2) + val currentNodeId = pathBuffer.last + + (s"$prevNodeId$currentNodeId", (srcNodeId, pathBuffer)) + }.join(edge2attr).map { case (edge, ((srcNodeId, pathBuffer), attr)) => + try { + val nextNodeIndex = GraphOps.drawAlias(attr.J, attr.q) + val nextNodeId = attr.dstNeighbors(nextNodeIndex) + pathBuffer.append(nextNodeId) + + (srcNodeId, pathBuffer) + } catch { + case e: Exception => throw new RuntimeException(e.getMessage) + } + }.cache + + activeWalks = randomWalk.first() + prevWalk.unpersist(blocking=false) + } + + + if (randomWalkPaths != null) { + val prevRandomWalkPaths = randomWalkPaths + randomWalkPaths = randomWalkPaths.union(randomWalk).cache() + randomWalkPaths.first + prevRandomWalkPaths.unpersist(blocking = false) + } else { + randomWalkPaths = randomWalk + } + } + + this + } + + def embedding(): this.type = { + val randomPaths = randomWalkPaths.map { case (vertexId, pathBuffer) => + Try(pathBuffer.map(_.toString).toIterable).getOrElse(null) + }.filter(_!=null) + + Word2vec.setup(context, config).fit(randomPaths) + + this + } + + def save(): this.type = { + this.saveRandomPath() + .saveModel() + 
.saveVectors() + } + + def saveRandomPath(): this.type = { + randomWalkPaths + .map { case (vertexId, pathBuffer) => + Try(pathBuffer.mkString("\t")).getOrElse(null) + } + .filter(x => x != null && x.replaceAll("\\s", "").length > 0) + .repartition(200) + .saveAsTextFile(config.output) + + this + } + + def saveModel(): this.type = { + Word2vec.save(config.output) + + this + } + + def saveVectors(): this.type = { + val node2vector = context.parallelize(Word2vec.getVectors.toList) + .map { case (nodeId, vector) => + (nodeId.toLong, vector.mkString(",")) + } + + if (this.node2id != null) { + val id2Node = this.node2id.map{ case (strNode, index) => + (index, strNode) + } + + node2vector.join(id2Node) + .map { case (nodeId, (vector, name)) => s"$name\t$vector" } + .repartition(200) + .saveAsTextFile(s"${config.output}.emb") + } else { + node2vector.map { case (nodeId, vector) => s"$nodeId\t$vector" } + .repartition(200) + .saveAsTextFile(s"${config.output}.emb") + } + + this + } + + def cleanup(): this.type = { + node2id.unpersist(blocking = false) + indexedEdges.unpersist(blocking = false) + indexedNodes.unpersist(blocking = false) + graph.unpersist(blocking = false) + randomWalkPaths.unpersist(blocking = false) + + this + } + + def loadNode2Id(node2idPath: String): this.type = { + try { + this.node2id = context.textFile(config.nodePath).map { node2index => + val Array(strNode, index) = node2index.split("\\s") + (strNode, index.toLong) + } + } catch { + case e: Exception => logger.info("Failed to read node2index file.") + this.node2id = null + } + + this + } + + def readIndexedGraph(tripletPath: String) = { + val bcWeighted = context.broadcast(config.weighted) + + val rawTriplets = context.textFile(tripletPath) + if (config.nodePath == null) { + this.node2id = createNode2Id(rawTriplets.map { triplet => + val parts = triplet.split("\\s") + (parts.head, parts(1), -1) + }) + } else { + loadNode2Id(config.nodePath) + } + + rawTriplets.map { triplet => + val parts = triplet.split("\\s") + val weight = bcWeighted.value match { + case true => Try(parts.last.toDouble).getOrElse(1.0) + case false => 1.0 + } + + (parts.head.toLong, parts(1).toLong, weight) + } + } + + + def indexingGraph(rawTripletPath: String): RDD[(Long, Long, Double)] = { + val rawEdges = context.textFile(rawTripletPath).map { triplet => + val parts = triplet.split("\\s") + + Try { + (parts.head, parts(1), Try(parts.last.toDouble).getOrElse(1.0)) + }.getOrElse(null) + }.filter(_!=null) + + this.node2id = createNode2Id(rawEdges) + + rawEdges.map { case (src, dst, weight) => + (src, (dst, weight)) + }.join(node2id).map { case (src, (edge: (String, Double), srcIndex: Long)) => + try { + val (dst: String, weight: Double) = edge + (dst, (srcIndex, weight)) + } catch { + case e: Exception => null + } + }.filter(_!=null).join(node2id).map { case (dst, (edge: (Long, Double), dstIndex: Long)) => + try { + val (srcIndex, weight) = edge + (srcIndex, dstIndex, weight) + } catch { + case e: Exception => null + } + }.filter(_!=null) + } + + def createNode2Id[T <: Any](triplets: RDD[(String, String, T)]) = triplets.flatMap { case (src, dst, weight) => + Try(Array(src, dst)).getOrElse(Array.empty[String]) + }.distinct().zipWithIndex() + +} diff --git a/node2vec_spark/src/main/scala/com/navercorp/Word2vec.scala b/node2vec_spark/src/main/scala/com/navercorp/Word2vec.scala new file mode 100644 index 0000000..aa209cf --- /dev/null +++ b/node2vec_spark/src/main/scala/com/navercorp/Word2vec.scala @@ -0,0 +1,55 @@ +package com.navercorp + +import 
org.apache.spark.SparkContext +import org.apache.spark.mllib.feature.{Word2Vec, Word2VecModel} +import org.apache.spark.rdd.RDD + +object Word2vec extends Serializable { + var context: SparkContext = null + var word2vec = new Word2Vec() + var model: Word2VecModel = null + + def setup(context: SparkContext, param: Main.Params): this.type = { + this.context = context + /** + * model = sg + * update = hs + */ + word2vec.setLearningRate(param.lr) + .setNumIterations(param.iter) + .setNumPartitions(param.numPartition) + .setMinCount(0) + .setVectorSize(param.dim) + + val word2vecWindowField = word2vec.getClass.getDeclaredField("org$apache$spark$mllib$feature$Word2Vec$$window") + word2vecWindowField.setAccessible(true) + word2vecWindowField.setInt(word2vec, param.window) + + this + } + + def read(path: String): RDD[Iterable[String]] = { + context.textFile(path).repartition(200).map(_.split("\\s").toSeq) + } + + def fit(input: RDD[Iterable[String]]): this.type = { + model = word2vec.fit(input) + + this + } + + def save(outputPath: String): this.type = { + model.save(context, s"$outputPath.bin") + this + } + + def load(path: String): this.type = { + model = Word2VecModel.load(context, path) + + this + } + + def getVectors = this.model.getVectors + +} + diff --git a/node2vec_spark/src/main/scala/com/navercorp/graph/GraphOps.scala b/node2vec_spark/src/main/scala/com/navercorp/graph/GraphOps.scala new file mode 100644 index 0000000..960fa8c --- /dev/null +++ b/node2vec_spark/src/main/scala/com/navercorp/graph/GraphOps.scala @@ -0,0 +1,69 @@ +package com.navercorp.graph + +import scala.collection.mutable.ArrayBuffer + +object GraphOps { + def setupAlias(nodeWeights: Array[(Long, Double)]): (Array[Int], Array[Double]) = { + val K = nodeWeights.length + val J = Array.fill(K)(0) + val q = Array.fill(K)(0.0) + + val smaller = new ArrayBuffer[Int]() + val larger = new ArrayBuffer[Int]() + + val sum = nodeWeights.map(_._2).sum + nodeWeights.zipWithIndex.foreach { case ((nodeId, weight), i) => + q(i) = K * weight / sum + if (q(i) < 1.0) { + smaller.append(i) + } else { + larger.append(i) + } + } + + while (smaller.nonEmpty && larger.nonEmpty) { + val small = smaller.remove(smaller.length - 1) + val large = larger.remove(larger.length - 1) + + J(small) = large + q(large) = q(large) + q(small) - 1.0 + if (q(large) < 1.0) smaller.append(large) + else larger.append(large) + } + + (J, q) + } + + def setupEdgeAlias(p: Double = 1.0, q: Double = 1.0)(srcId: Long, srcNeighbors: Array[(Long, Double)], dstNeighbors: Array[(Long, Double)]): (Array[Int], Array[Double]) = { + val neighbors_ = dstNeighbors.map { case (dstNeighborId, weight) => + var unnormProb = weight / q + if (srcId == dstNeighborId) unnormProb = weight / p + else if (srcNeighbors.exists(_._1 == dstNeighborId)) unnormProb = weight + + (dstNeighborId, unnormProb) + } + + setupAlias(neighbors_) + } + + def drawAlias(J: Array[Int], q: Array[Double]): Int = { + val K = J.length + val kk = math.floor(math.random * K).toInt + + if (math.random < q(kk)) kk + else J(kk) + } + + lazy val createUndirectedEdge = (srcId: Long, dstId: Long, weight: Double) => { + Array( + (srcId, Array((dstId, weight))), + (dstId, Array((srcId, weight))) + ) + } + + lazy val createDirectedEdge = (srcId: Long, dstId: Long, weight: Double) => { + Array( + (srcId, Array((dstId, weight))) + ) + } +} diff --git a/node2vec_spark/src/main/scala/com/navercorp/graph/package.scala b/node2vec_spark/src/main/scala/com/navercorp/graph/package.scala new file mode 100644 index 0000000..1b83969 --- 
/dev/null +++ b/node2vec_spark/src/main/scala/com/navercorp/graph/package.scala @@ -0,0 +1,12 @@ +package com.navercorp + +import java.io.Serializable + +package object graph { + case class NodeAttr(var neighbors: Array[(Long, Double)] = Array.empty[(Long, Double)], + var path: Array[Long] = Array.empty[Long]) extends Serializable + + case class EdgeAttr(var dstNeighbors: Array[Long] = Array.empty[Long], + var J: Array[Int] = Array.empty[Int], + var q: Array[Double] = Array.empty[Double]) extends Serializable +} diff --git a/node2vec_spark/src/main/scala/com/navercorp/lib/AbstractParams.scala b/node2vec_spark/src/main/scala/com/navercorp/lib/AbstractParams.scala new file mode 100644 index 0000000..0790ab9 --- /dev/null +++ b/node2vec_spark/src/main/scala/com/navercorp/lib/AbstractParams.scala @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.navercorp.lib + +import scala.reflect.runtime.universe._ + +/** + * Abstract class for parameter case classes. + * This overrides the [[toString]] method to print all case class fields by name and value. + * @tparam T Concrete parameter class. + */ +abstract class AbstractParams[T: TypeTag] { + + private def tag: TypeTag[T] = typeTag[T] + + /** + * Finds all case class fields in concrete class instance, and outputs them in JSON-style format: + * { + * [field name]:\t[field value]\n + * [field name]:\t[field value]\n + * ... + * } + */ + override def toString: String = { + val tpe = tag.tpe + val allAccessors = tpe.declarations.collect { + case m: MethodSymbol if m.isCaseAccessor => m + } + val mirror = runtimeMirror(getClass.getClassLoader) + val instanceMirror = mirror.reflect(this) + allAccessors.map { f => + val paramName = f.name.toString + val fieldMirror = instanceMirror.reflectField(f) + val paramValue = fieldMirror.get + s" $paramName:\t$paramValue" + }.mkString("{\n", ",\n", "\n}") + } +} \ No newline at end of file diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..faa710c --- /dev/null +++ b/requirements.txt @@ -0,0 +1,3 @@ +networkx==1.11 +numpy==1.11.2 +gensim==0.13.3 diff --git a/src/main.py b/src/main.py index 4a95fd6..82ac735 100644 --- a/src/main.py +++ b/src/main.py @@ -99,9 +99,6 @@ def main(args): walks = G.simulate_walks(args.num_walks, args.walk_length) learn_embeddings(walks) -args = parse_args() -main(args) - - - - +if __name__ == "__main__": + args = parse_args() + main(args)
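Reviewer note: the per-step transition sampling in randomWalk() relies on the alias tables that GraphOps.scala builds in setupAlias/setupEdgeAlias and samples from in drawAlias, which gives O(1) draws from a weighted neighbor list. Below is a minimal standalone sketch of that sampling scheme outside Spark; the object name AliasSamplingSketch and the example weights are hypothetical, and only setupAlias/drawAlias mirror the patched code.

```
import scala.collection.mutable.ArrayBuffer
import scala.util.Random

object AliasSamplingSketch {
  // Build the alias table (J, q) for a list of weights, mirroring GraphOps.setupAlias.
  def setupAlias(weights: Array[Double]): (Array[Int], Array[Double]) = {
    val K = weights.length
    val J = Array.fill(K)(0)
    val q = Array.fill(K)(0.0)
    val smaller = new ArrayBuffer[Int]()
    val larger = new ArrayBuffer[Int]()

    val sum = weights.sum
    weights.zipWithIndex.foreach { case (w, i) =>
      q(i) = K * w / sum                      // scaled probability of bucket i
      if (q(i) < 1.0) smaller.append(i) else larger.append(i)
    }

    // Pair each under-full bucket with an over-full one.
    while (smaller.nonEmpty && larger.nonEmpty) {
      val small = smaller.remove(smaller.length - 1)
      val large = larger.remove(larger.length - 1)
      J(small) = large
      q(large) = q(large) + q(small) - 1.0
      if (q(large) < 1.0) smaller.append(large) else larger.append(large)
    }
    (J, q)
  }

  // Draw one index in O(1), mirroring GraphOps.drawAlias.
  def drawAlias(J: Array[Int], q: Array[Double]): Int = {
    val kk = Random.nextInt(J.length)
    if (Random.nextDouble() < q(kk)) kk else J(kk)
  }

  def main(args: Array[String]): Unit = {
    val weights = Array(1.0, 2.0, 7.0)        // hypothetical edge weights
    val (j, q) = setupAlias(weights)
    val draws = Array.fill(100000)(drawAlias(j, q))
    // Empirical frequencies should be close to 0.1, 0.2, 0.7.
    weights.indices.foreach { i =>
      println(s"index $i: ${draws.count(_ == i) / 100000.0}")
    }
  }
}
```

Sampling frequencies converge to the normalized weights, which is what keeps each step of the walk constant-time regardless of how many neighbors a node has (up to the --degree cap).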