diff --git a/.gitignore b/.gitignore index 2540b4c..ed59e6c 100644 --- a/.gitignore +++ b/.gitignore @@ -15,13 +15,11 @@ project/boot/ project/plugins/project/ # Scala-IDE specific -really-core/.cache -really-core/.classpath -really-core/.project -really-core/.settings -really-io/.classpath -really-io/.project -really-io/.settings +**/.cache +**/.classpath +**/.project +**/.settings +**/.classpath .really really-io/.really really-core/.really diff --git a/build.sbt b/build.sbt index ca3992d..87cf1f1 100644 --- a/build.sbt +++ b/build.sbt @@ -5,7 +5,7 @@ import com.typesafe.sbt.SbtMultiJvm.MultiJvmKeys.MultiJvm version in Global := "0.1-SNAPSHOT" -scalaVersion in Global := "2.11.4" +scalaVersion in Global := "2.11.5" scalacOptions in Global ++= Seq("-feature", "-deprecation") @@ -19,4 +19,4 @@ lazy val `really-io` = project in file("really-io") settings (IOBuild.settings: lazy val `really-simple-auth` = project in file("really-simple-auth") settings (AuthBuild.settings: _*) settings (scalariformSettings: _*) enablePlugins(PlayScala) dependsOn `really-utils` -lazy val `really-docs` = project in file("really-docs") settings (DocsBuild.settings: _*) \ No newline at end of file +lazy val `really-docs` = project in file("really-docs") settings (DocsBuild.settings: _*) diff --git a/project/plugins.sbt b/project/plugins.sbt index bbf7a92..a3d28cf 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -6,8 +6,6 @@ resolvers += "Sonatype Snapshots" at "http://oss.sonatype.org/content/repositori resolvers += "Typesafe repository" at "http://repo.typesafe.com/typesafe/releases/" -addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "2.5.0") - addSbtPlugin("com.typesafe.sbt" % "sbt-multi-jvm" % "0.3.8") addSbtPlugin("com.typesafe.play" % "sbt-plugin" % "2.4.0-M2") diff --git a/really-core/src/main/scala/io/really/boot/DefaultReallyGlobals.scala b/really-core/src/main/scala/io/really/boot/DefaultReallyGlobals.scala index e9aa317..f575545 100644 --- a/really-core/src/main/scala/io/really/boot/DefaultReallyGlobals.scala +++ b/really-core/src/main/scala/io/really/boot/DefaultReallyGlobals.scala @@ -9,7 +9,7 @@ import akka.contrib.pattern.DistributedPubSubExtension import _root_.io.really.gorilla._ import _root_.io.really.model.materializer.{ MaterializerSharding, CollectionViewMaterializer } import akka.event.Logging -import _root_.io.really.model.ReadHandler +import io.really.model.{ Model, ReadHandler, CollectionSharding, CollectionActor } import _root_.io.really.model.persistent.{ ModelRegistry, RequestRouter, PersistentModelStore } import reactivemongo.api.{ DefaultDB, MongoDriver } import scala.collection.JavaConversions._ @@ -17,7 +17,6 @@ import scala.collection.JavaConversions._ import scala.slick.driver.H2Driver.simple._ import akka.contrib.pattern.ClusterSharding import _root_.io.really._ -import _root_.io.really.model.{ CollectionSharding, CollectionActor } import play.api.libs.json.JsObject import _root_.io.really.quickSand.QuickSand @@ -121,6 +120,9 @@ class DefaultReallyGlobals(override val config: ReallyConfig) extends ReallyGlob override def objectSubscriberProps(rSubscription: RSubscription): Props = Props(classOf[ObjectSubscriber], rSubscription, this) + def querySubscriberProps(subscriptionId: String, model: Model, querySubscription: QuerySubscription): Props = + Props(classOf[QuerySubscriber], this, subscriptionId, model, querySubscription) + def replayerProps(rSubscription: RSubscription, objectSubscriber: ActorRef, maxMarker: Option[Revision]): Props = Props(classOf[Replayer], this, objectSubscriber, rSubscription, maxMarker) diff --git a/really-core/src/main/scala/io/really/gorilla/GorillaEventCenter.scala b/really-core/src/main/scala/io/really/gorilla/GorillaEventCenter.scala index 3977367..5338028 100644 --- a/really-core/src/main/scala/io/really/gorilla/GorillaEventCenter.scala +++ b/really-core/src/main/scala/io/really/gorilla/GorillaEventCenter.scala @@ -7,7 +7,7 @@ import akka.actor._ import scala.slick.driver.H2Driver.simple._ import akka.contrib.pattern.DistributedPubSubMediator.Subscribe import akka.contrib.pattern.{ DistributedPubSubMediator, ShardRegion } -import io.really.gorilla.SubscriptionManager.ObjectSubscribed +import io.really.gorilla.SubscriptionManager.{ QuerySubscribed, ObjectSubscribed } import io.really.model.{ Model, Helpers } import io.really._ import scala.slick.jdbc.meta.MTable @@ -55,6 +55,12 @@ class GorillaEventCenter(globals: ReallyGlobals)(implicit session: Session) exte globals.mediator ! Subscribe(rSub.r.toString, replayer) objectSubscriber ! ReplayerSubscribed(replayer) sender() ! ObjectSubscribed(rSub, replyTo, objectSubscriber) + case NewQuerySubscription(subscriptionId, pushChannel, ctx, r, query, model, fields) => + //create the query subscription actor + val querySubscriber = context.actorOf(globals.querySubscriberProps(subscriptionId, model, QuerySubscription(ctx, r, fields, query, pushChannel))) + globals.mediator ! Subscribe(r.noId.toString, querySubscriber) + sender() ! QuerySubscribed(subscriptionId, querySubscriber) + } private def persistEvent(persistentEvent: PersistentEvent): Unit = diff --git a/really-core/src/main/scala/io/really/gorilla/QuerySubscriber.scala b/really-core/src/main/scala/io/really/gorilla/QuerySubscriber.scala new file mode 100644 index 0000000..c529166 --- /dev/null +++ b/really-core/src/main/scala/io/really/gorilla/QuerySubscriber.scala @@ -0,0 +1,19 @@ +package io.really.gorilla + +import akka.actor.{ ActorLogging, Actor } +import io.really.model.Model +import io.really.ReallyGlobals + +class QuerySubscriber(globals: ReallyGlobals, subscriptionId: String, + model: Model, + querySubscription: QuerySubscription) extends Actor with ActorLogging { + //todo on start register on model modifications + + def receive = { + case PersistentCreatedEvent(event) => + //validate by query filter + //apply onGet for the object + //filter fields + //push on pushChannel + } +} \ No newline at end of file diff --git a/really-core/src/main/scala/io/really/gorilla/Replayer.scala b/really-core/src/main/scala/io/really/gorilla/Replayer.scala index 94d0216..ec42d4a 100644 --- a/really-core/src/main/scala/io/really/gorilla/Replayer.scala +++ b/really-core/src/main/scala/io/really/gorilla/Replayer.scala @@ -145,9 +145,6 @@ class Replayer(globals: ReallyGlobals, objectSubscriber: ActorRef, rSubscription def servePushUpdates: Receive = _servePushUpdates orElse commonHandler def _servePushUpdates: Receive = { - case PersistentCreatedEvent(event) => - //The only case a push update about created event should happen if the client subscribed on an object before - // creation with a previous knowledge about the object ID, Do nothing for now! case PersistentUpdatedEvent(event, obj) => objectSubscriber ! GorillaLogUpdatedEntry(event.r, obj, event.rev, event.modelVersion, event.context.auth, event.ops) diff --git a/really-core/src/main/scala/io/really/gorilla/SubscriptionManager.scala b/really-core/src/main/scala/io/really/gorilla/SubscriptionManager.scala index a0c2ad8..118f58b 100644 --- a/really-core/src/main/scala/io/really/gorilla/SubscriptionManager.scala +++ b/really-core/src/main/scala/io/really/gorilla/SubscriptionManager.scala @@ -9,7 +9,7 @@ import akka.actor._ import _root_.io.really.{ R, ReallyGlobals, RequestContext } import _root_.io.really.rql.RQL.Query import _root_.io.really.Result -import _root_.io.really.model.FieldKey +import _root_.io.really.model.{ FieldKey, Model } import _root_.io.really.protocol.SubscriptionFailure import _root_.io.really.{ R, ReallyGlobals } import _root_.io.really.Request.{ SubscribeOnObject, UnsubscribeFromObject } @@ -19,13 +19,11 @@ import _root_.io.really.Request.{ SubscribeOnObject, UnsubscribeFromObject } * @param globals */ class SubscriptionManager(globals: ReallyGlobals) extends Actor with ActorLogging { - - type SubscriberIdentifier = ActorPath - import SubscriptionManager._ private[gorilla] var rSubscriptions: Map[SubscriberIdentifier, InternalRSubscription] = Map.empty private[gorilla] var roomSubscriptions: Map[SubscriberIdentifier, InternalRSubscription] = Map.empty + private[gorilla] var querySubscriptions: Map[SubscriptionID, ActorRef] = Map.empty def failedToRegisterNewSubscription(originalSender: ActorRef, r: R, newSubscriber: ActorRef, reason: String) = { newSubscriber ! SubscriptionFailure(r, 500, reason) @@ -67,6 +65,7 @@ class SubscriptionManager(globals: ReallyGlobals) extends Actor with ActorLoggin case request: UnsubscribeFromObject => ??? + case SubscribeOnR(subData) => val replyTo = sender() rSubscriptions.get(subData.pushChannel.path).map { @@ -75,6 +74,7 @@ class SubscriptionManager(globals: ReallyGlobals) extends Actor with ActorLoggin }.getOrElse { globals.gorillaEventCenter ! NewSubscription(replyTo, subData) } + case ObjectSubscribed(subData, replyTo, objectSubscriber) => rSubscriptions += subData.pushChannel.path -> InternalRSubscription(objectSubscriber, subData.r) context.watch(objectSubscriber) //TODO handle death @@ -84,12 +84,25 @@ class SubscriptionManager(globals: ReallyGlobals) extends Actor with ActorLoggin } else { replyTo ! SubscriptionDone(subData.r) } + case UnsubscribeFromR(subData) => //TODO Ack the delegate rSubscriptions.get(subData.pushChannel.path).map { rSub => rSub.objectSubscriber ! Unsubscribe rSubscriptions -= subData.pushChannel.path } + + case SubscribeOnQuery(requester, pushChannel, ctx, r, query, model, fields, results) => + // create an id for this subscription + val subscriptionId = globals.quickSand.nextId().toString() + // initiate subscriber actor + globals.gorillaEventCenter ! NewQuerySubscription(subscriptionId, pushChannel, ctx, r, query, model, fields) + // ship results to the requester along with the subscription id + requester ! results.copy(subscription = Some(subscriptionId)) + + case QuerySubscribed(subscriptionId, querySubscriber) => + querySubscriptions += (subscriptionId -> querySubscriber) + context.watch(querySubscriber) } def roomSubscriptionsHandler: Receive = { @@ -112,7 +125,10 @@ object SubscriptionManager { case class SubscribeOnR(rSubscription: RSubscription) - case class SubscribeOnQuery(requester: ActorRef, ctx: RequestContext, query: Query, passOnResults: Result.ReadResult) + case class SubscribeOnQuery(requester: ActorRef, pushChannel: ActorRef, + ctx: RequestContext, r: R, query: Query, + model: Model, fields: Set[FieldKey], + passOnResults: Result.ReadResult) case class SubscribeOnRoom(rSubscription: RoomSubscription) @@ -126,6 +142,8 @@ object SubscriptionManager { case class ObjectSubscribed(subData: RSubscription, replyTo: ActorRef, objectSubscriber: ActorRef) + case class QuerySubscribed(subscriptionId: String, querySubscriber: ActorRef) + case class SubscriptionDone(r: R) } diff --git a/really-core/src/main/scala/io/really/gorilla/package.scala b/really-core/src/main/scala/io/really/gorilla/package.scala index c3e2767..517a568 100644 --- a/really-core/src/main/scala/io/really/gorilla/package.scala +++ b/really-core/src/main/scala/io/really/gorilla/package.scala @@ -6,17 +6,20 @@ package io.really import _root_.io.really.model._ import _root_.io.really.protocol.UpdateOp import _root_.io.really.model.CollectionActor.CollectionActorEvent -import akka.actor.ActorRef +import akka.actor.{ ActorPath, ActorRef } +import _root_.io.really.rql.RQL.Query import play.api.libs.json.{ Json, JsObject } package object gorilla { - + type SubscriberIdentifier = ActorPath type SubscriptionID = String type PushEventType = String case class RSubscription(ctx: RequestContext, r: R, fields: Set[FieldKey], rev: Revision, requestDelegate: ActorRef, pushChannel: ActorRef) + case class QuerySubscription(ctx: RequestContext, r: R, fields: Set[FieldKey], query: Query, pushChannel: ActorRef) + case class RoomSubscription(ctx: RequestContext, r: R, requestDelegate: ActorRef, pushChannel: ActorRef) @@ -26,6 +29,7 @@ package object gorilla { val r = rSubscription.r } + case class NewQuerySubscription(subscriptionId: String, pushChannel: ActorRef, ctx: RequestContext, val r: R, query: Query, model: Model, fields: Set[FieldKey]) extends RoutableToGorillaCenter trait PersistentEvent extends RoutableToGorillaCenter { def event: CollectionActorEvent val r = event.r diff --git a/really-core/src/main/scala/io/really/model/ReadHandler.scala b/really-core/src/main/scala/io/really/model/ReadHandler.scala index 49ea2b0..efde8fc 100644 --- a/really-core/src/main/scala/io/really/model/ReadHandler.scala +++ b/really-core/src/main/scala/io/really/model/ReadHandler.scala @@ -57,7 +57,8 @@ class ReadHandler(globals: ReallyGlobals) extends Actor with Stash with ActorLog val readResult = ReadResult(r, result, None) if (cmdOpts.subscribe) //forward results to subscription manager - globals.subscriptionManager ! SubscribeOnQuery(requester, ctx, getQuery(r, cmdOpts), readResult) + globals.subscriptionManager ! SubscribeOnQuery(requester, pushChannel, + ctx, r, getQuery(r, cmdOpts), model, getRequestFields(cmdOpts.fields, model), readResult) else requester ! readResult } recover { diff --git a/really-core/src/main/scala/io/really/package.scala b/really-core/src/main/scala/io/really/package.scala index ca2810b..b78a4f5 100644 --- a/really-core/src/main/scala/io/really/package.scala +++ b/really-core/src/main/scala/io/really/package.scala @@ -10,10 +10,10 @@ package io { import play.api.data.validation.ValidationError import reactivemongo.api.DefaultDB import akka.actor.{ Props, ActorSystem, ActorRef } - import io.really.model.{ FieldKey, DataObject } + import io.really.model.{ FieldKey, DataObject, Model } import io.really.quickSand.QuickSand import io.really.protocol._ - import io.really.gorilla.RSubscription + import io.really.gorilla.{ QuerySubscription, RSubscription } import org.joda.time.DateTime import play.api.libs.json._ @@ -56,6 +56,8 @@ package io { def objectSubscriberProps(rSubscription: RSubscription): Props + def querySubscriberProps(subscriptionId: String, model: Model, querySubscription: QuerySubscription): Props + def replayerProps(rSubscription: RSubscription, objectSubscriber: ActorRef, maxMarker: Option[Revision]): Props def receptionistProps: Props diff --git a/really-core/src/main/scala/io/really/rql/RQLParser.scala b/really-core/src/main/scala/io/really/rql/RQLParser.scala index d6e1bbc..0874866 100644 --- a/really-core/src/main/scala/io/really/rql/RQLParser.scala +++ b/really-core/src/main/scala/io/really/rql/RQLParser.scala @@ -9,6 +9,13 @@ import play.api.libs.json.{ JsValue, JsObject } import scala.util.parsing.combinator._ object RQLParser { + /** + * The main function in this object, takes a query string and a the json object containing the values and returns + * the Query instance. + * @param filter the query string + * @param values a `JsObject` that holds the values {fieldName -> value} + * @return + */ def parse(filter: String, values: JsObject): Either[RQL.ParseError, Query] = { val parser = new RQLParser(values) try { diff --git a/really-core/src/main/scala/io/really/rql/package.scala b/really-core/src/main/scala/io/really/rql/package.scala index a93a7c2..122fcf2 100644 --- a/really-core/src/main/scala/io/really/rql/package.scala +++ b/really-core/src/main/scala/io/really/rql/package.scala @@ -5,8 +5,8 @@ package io.really.rql import io.really.model.Field import play.api.data.validation.ValidationError -import play.api.libs.json._ import play.api.libs.functional.syntax._ +import play.api.libs.json._ import scala.util.parsing.input.Positional object RQL { @@ -111,7 +111,6 @@ object RQL { def isValid: Either[ValidationError, RQLSuccess] = Right(RQLSuccess(this)) def validateObject(obj: JsObject, fields: Set[Field[_]]): Boolean = true - } case class SimpleQuery(key: Term, op: Operator, termValue: TermValue) extends Query with Positional { @@ -164,7 +163,8 @@ object RQL { private def parseAndValidate(filter: String, values: JsObject): JsResult[Query] = RQLParser.parse(filter, values) match { - case Right(q) => + case Right(q) => //q.validate + q.isValid match { case Right(_) => JsSuccess(q) case Left(error) => JsError(Seq(JsPath() -> Seq(error))) diff --git a/really-simple-auth/bin/application-logger.xml b/really-simple-auth/bin/application-logger.xml new file mode 100644 index 0000000..82d52f3 --- /dev/null +++ b/really-simple-auth/bin/application-logger.xml @@ -0,0 +1,36 @@ + + + + + ${application.home}/logs/application.log + + %date - [%level] - from %logger in %thread %n%message%n%xException%n + + + + + + %coloredLevel %logger{15} - %message%n%xException{5} + + + + + + %X{akkaTimestamp} %highlight(%-5level) %highlight(%logger{36}) %X{akkaSource} - %msg%n + + + + + + + + + + + + + + + + + diff --git a/really-simple-auth/bin/application.conf b/really-simple-auth/bin/application.conf new file mode 100644 index 0000000..66f20ed --- /dev/null +++ b/really-simple-auth/bin/application.conf @@ -0,0 +1,74 @@ +# This is the main configuration file for the application. +# ~~~~~ + +# Secret key +# ~~~~~ +# The secret key is used to secure cryptographics functions. +# +# This must be changed for production, but we recommend not changing it in this file. +# +# See http://www.playframework.com/documentation/latest/ApplicationSecret for more details. +application.secret="0M>W=R5aaBSuUhiDjkXorP5Ow + + + + + + %coloredLevel %logger{15} - %X{akkaTimestamp} - %X{akkaSource}- %message%n%xException{5} + + + + + + %X{akkaTimestamp} %highlight(%-5level) %highlight(%logger{36}) %X{akkaSource} - %msg%n + + + + + + + + + + + + + + + + +