val ratings: DStream[Rating[Long]] =
inputFile.map(line => Rating(line.split(",")(0).toInt,
line.split(",")(1).toInt,
line.split(",")(2).toFloat)) // Your input stream of Ratings
val algorithm = new StreamingLatentMatrixFactorization(streamingMFParams)
algorithm.trainOn(ratings)
val pInputFile = ssc.socketTextStream("10.2.3.7", 10088)
val testStream: DStream[(Long, Long)] = // stream of (user, product) pairs to predict on
pInputFile.map(line => (line.split(",")(0).toInt, line.split(",")(1).toInt))
val predictions: DStream[Rating[Long]] = algorithm.predictOn(testStream)
predictions.print()
ssc.start()
ssc.awaitTermination()
My code is as follows, mostly just like the code given in the Usage of README.md. I find there is a
ifto justify whether themodelis empty inLatentMatrixFactorization.scala. Currently I don't know to run this prediction since it seems hard to make the training and prediction in sequential.