Skip to content
This repository was archived by the owner on Oct 2, 2021. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 5 additions & 7 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,11 @@ project/boot/
project/plugins/project/

# Scala-IDE specific
really-core/.cache
really-core/.classpath
really-core/.project
really-core/.settings
really-io/.classpath
really-io/.project
really-io/.settings
**/.cache
**/.classpath
**/.project
**/.settings
**/.classpath
.really
really-io/.really
really-core/.really
Expand Down
4 changes: 2 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import com.typesafe.sbt.SbtMultiJvm.MultiJvmKeys.MultiJvm

version in Global := "0.1-SNAPSHOT"

scalaVersion in Global := "2.11.4"
scalaVersion in Global := "2.11.5"

scalacOptions in Global ++= Seq("-feature", "-deprecation")

Expand All @@ -19,4 +19,4 @@ lazy val `really-io` = project in file("really-io") settings (IOBuild.settings:

lazy val `really-simple-auth` = project in file("really-simple-auth") settings (AuthBuild.settings: _*) settings (scalariformSettings: _*) enablePlugins(PlayScala) dependsOn `really-utils`

lazy val `really-docs` = project in file("really-docs") settings (DocsBuild.settings: _*)
lazy val `really-docs` = project in file("really-docs") settings (DocsBuild.settings: _*)
2 changes: 0 additions & 2 deletions project/plugins.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ resolvers += "Sonatype Snapshots" at "http://oss.sonatype.org/content/repositori

resolvers += "Typesafe repository" at "http://repo.typesafe.com/typesafe/releases/"

addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "2.5.0")

addSbtPlugin("com.typesafe.sbt" % "sbt-multi-jvm" % "0.3.8")

addSbtPlugin("com.typesafe.play" % "sbt-plugin" % "2.4.0-M2")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,14 @@ import akka.contrib.pattern.DistributedPubSubExtension
import _root_.io.really.gorilla._
import _root_.io.really.model.materializer.{ MaterializerSharding, CollectionViewMaterializer }
import akka.event.Logging
import _root_.io.really.model.ReadHandler
import io.really.model.{ Model, ReadHandler, CollectionSharding, CollectionActor }
import _root_.io.really.model.persistent.{ ModelRegistry, RequestRouter, PersistentModelStore }
import reactivemongo.api.{ DefaultDB, MongoDriver }
import scala.collection.JavaConversions._

import scala.slick.driver.H2Driver.simple._
import akka.contrib.pattern.ClusterSharding
import _root_.io.really._
import _root_.io.really.model.{ CollectionSharding, CollectionActor }
import play.api.libs.json.JsObject
import _root_.io.really.quickSand.QuickSand

Expand Down Expand Up @@ -121,6 +120,9 @@ class DefaultReallyGlobals(override val config: ReallyConfig) extends ReallyGlob
override def objectSubscriberProps(rSubscription: RSubscription): Props =
Props(classOf[ObjectSubscriber], rSubscription, this)

def querySubscriberProps(subscriptionId: String, model: Model, querySubscription: QuerySubscription): Props =
Props(classOf[QuerySubscriber], this, subscriptionId, model, querySubscription)

def replayerProps(rSubscription: RSubscription, objectSubscriber: ActorRef, maxMarker: Option[Revision]): Props =
Props(classOf[Replayer], this, objectSubscriber, rSubscription, maxMarker)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import akka.actor._
import scala.slick.driver.H2Driver.simple._
import akka.contrib.pattern.DistributedPubSubMediator.Subscribe
import akka.contrib.pattern.{ DistributedPubSubMediator, ShardRegion }
import io.really.gorilla.SubscriptionManager.ObjectSubscribed
import io.really.gorilla.SubscriptionManager.{ QuerySubscribed, ObjectSubscribed }
import io.really.model.{ Model, Helpers }
import io.really._
import scala.slick.jdbc.meta.MTable
Expand Down Expand Up @@ -55,6 +55,12 @@ class GorillaEventCenter(globals: ReallyGlobals)(implicit session: Session) exte
globals.mediator ! Subscribe(rSub.r.toString, replayer)
objectSubscriber ! ReplayerSubscribed(replayer)
sender() ! ObjectSubscribed(rSub, replyTo, objectSubscriber)
case NewQuerySubscription(subscriptionId, pushChannel, ctx, r, query, model, fields) =>
//create the query subscription actor
val querySubscriber = context.actorOf(globals.querySubscriberProps(subscriptionId, model, QuerySubscription(ctx, r, fields, query, pushChannel)))
globals.mediator ! Subscribe(r.noId.toString, querySubscriber)
sender() ! QuerySubscribed(subscriptionId, querySubscriber)

}

private def persistEvent(persistentEvent: PersistentEvent): Unit =
Expand Down
19 changes: 19 additions & 0 deletions really-core/src/main/scala/io/really/gorilla/QuerySubscriber.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package io.really.gorilla

import akka.actor.{ ActorLogging, Actor }
import io.really.model.Model
import io.really.ReallyGlobals

class QuerySubscriber(globals: ReallyGlobals, subscriptionId: String,
model: Model,
querySubscription: QuerySubscription) extends Actor with ActorLogging {
//todo on start register on model modifications

def receive = {
case PersistentCreatedEvent(event) =>
//validate by query filter
//apply onGet for the object
//filter fields
//push on pushChannel
}
}
3 changes: 0 additions & 3 deletions really-core/src/main/scala/io/really/gorilla/Replayer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -145,9 +145,6 @@ class Replayer(globals: ReallyGlobals, objectSubscriber: ActorRef, rSubscription
def servePushUpdates: Receive = _servePushUpdates orElse commonHandler

def _servePushUpdates: Receive = {
case PersistentCreatedEvent(event) =>
//The only case a push update about created event should happen if the client subscribed on an object before
// creation with a previous knowledge about the object ID, Do nothing for now!
case PersistentUpdatedEvent(event, obj) =>
objectSubscriber ! GorillaLogUpdatedEntry(event.r, obj, event.rev, event.modelVersion, event.context.auth,
event.ops)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import akka.actor._
import _root_.io.really.{ R, ReallyGlobals, RequestContext }
import _root_.io.really.rql.RQL.Query
import _root_.io.really.Result
import _root_.io.really.model.FieldKey
import _root_.io.really.model.{ FieldKey, Model }
import _root_.io.really.protocol.SubscriptionFailure
import _root_.io.really.{ R, ReallyGlobals }
import _root_.io.really.Request.{ SubscribeOnObject, UnsubscribeFromObject }
Expand All @@ -19,13 +19,11 @@ import _root_.io.really.Request.{ SubscribeOnObject, UnsubscribeFromObject }
* @param globals
*/
class SubscriptionManager(globals: ReallyGlobals) extends Actor with ActorLogging {

type SubscriberIdentifier = ActorPath

import SubscriptionManager._

private[gorilla] var rSubscriptions: Map[SubscriberIdentifier, InternalRSubscription] = Map.empty
private[gorilla] var roomSubscriptions: Map[SubscriberIdentifier, InternalRSubscription] = Map.empty
private[gorilla] var querySubscriptions: Map[SubscriptionID, ActorRef] = Map.empty

def failedToRegisterNewSubscription(originalSender: ActorRef, r: R, newSubscriber: ActorRef, reason: String) = {
newSubscriber ! SubscriptionFailure(r, 500, reason)
Expand Down Expand Up @@ -67,6 +65,7 @@ class SubscriptionManager(globals: ReallyGlobals) extends Actor with ActorLoggin

case request: UnsubscribeFromObject =>
???

case SubscribeOnR(subData) =>
val replyTo = sender()
rSubscriptions.get(subData.pushChannel.path).map {
Expand All @@ -75,6 +74,7 @@ class SubscriptionManager(globals: ReallyGlobals) extends Actor with ActorLoggin
}.getOrElse {
globals.gorillaEventCenter ! NewSubscription(replyTo, subData)
}

case ObjectSubscribed(subData, replyTo, objectSubscriber) =>
rSubscriptions += subData.pushChannel.path -> InternalRSubscription(objectSubscriber, subData.r)
context.watch(objectSubscriber) //TODO handle death
Expand All @@ -84,12 +84,25 @@ class SubscriptionManager(globals: ReallyGlobals) extends Actor with ActorLoggin
} else {
replyTo ! SubscriptionDone(subData.r)
}

case UnsubscribeFromR(subData) => //TODO Ack the delegate
rSubscriptions.get(subData.pushChannel.path).map {
rSub =>
rSub.objectSubscriber ! Unsubscribe
rSubscriptions -= subData.pushChannel.path
}

case SubscribeOnQuery(requester, pushChannel, ctx, r, query, model, fields, results) =>
// create an id for this subscription
val subscriptionId = globals.quickSand.nextId().toString()
// initiate subscriber actor
globals.gorillaEventCenter ! NewQuerySubscription(subscriptionId, pushChannel, ctx, r, query, model, fields)
// ship results to the requester along with the subscription id
requester ! results.copy(subscription = Some(subscriptionId))

case QuerySubscribed(subscriptionId, querySubscriber) =>
querySubscriptions += (subscriptionId -> querySubscriber)
context.watch(querySubscriber)
}

def roomSubscriptionsHandler: Receive = {
Expand All @@ -112,7 +125,10 @@ object SubscriptionManager {

case class SubscribeOnR(rSubscription: RSubscription)

case class SubscribeOnQuery(requester: ActorRef, ctx: RequestContext, query: Query, passOnResults: Result.ReadResult)
case class SubscribeOnQuery(requester: ActorRef, pushChannel: ActorRef,
ctx: RequestContext, r: R, query: Query,
model: Model, fields: Set[FieldKey],
passOnResults: Result.ReadResult)

case class SubscribeOnRoom(rSubscription: RoomSubscription)

Expand All @@ -126,6 +142,8 @@ object SubscriptionManager {

case class ObjectSubscribed(subData: RSubscription, replyTo: ActorRef, objectSubscriber: ActorRef)

case class QuerySubscribed(subscriptionId: String, querySubscriber: ActorRef)

case class SubscriptionDone(r: R)

}
8 changes: 6 additions & 2 deletions really-core/src/main/scala/io/really/gorilla/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,20 @@ package io.really
import _root_.io.really.model._
import _root_.io.really.protocol.UpdateOp
import _root_.io.really.model.CollectionActor.CollectionActorEvent
import akka.actor.ActorRef
import akka.actor.{ ActorPath, ActorRef }
import _root_.io.really.rql.RQL.Query
import play.api.libs.json.{ Json, JsObject }

package object gorilla {

type SubscriberIdentifier = ActorPath
type SubscriptionID = String
type PushEventType = String

case class RSubscription(ctx: RequestContext, r: R, fields: Set[FieldKey], rev: Revision,
requestDelegate: ActorRef, pushChannel: ActorRef)

case class QuerySubscription(ctx: RequestContext, r: R, fields: Set[FieldKey], query: Query, pushChannel: ActorRef)

case class RoomSubscription(ctx: RequestContext, r: R, requestDelegate: ActorRef,
pushChannel: ActorRef)

Expand All @@ -26,6 +29,7 @@ package object gorilla {
val r = rSubscription.r
}

case class NewQuerySubscription(subscriptionId: String, pushChannel: ActorRef, ctx: RequestContext, val r: R, query: Query, model: Model, fields: Set[FieldKey]) extends RoutableToGorillaCenter
trait PersistentEvent extends RoutableToGorillaCenter {
def event: CollectionActorEvent
val r = event.r
Expand Down
3 changes: 2 additions & 1 deletion really-core/src/main/scala/io/really/model/ReadHandler.scala
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ class ReadHandler(globals: ReallyGlobals) extends Actor with Stash with ActorLog
val readResult = ReadResult(r, result, None)
if (cmdOpts.subscribe)
//forward results to subscription manager
globals.subscriptionManager ! SubscribeOnQuery(requester, ctx, getQuery(r, cmdOpts), readResult)
globals.subscriptionManager ! SubscribeOnQuery(requester, pushChannel,
ctx, r, getQuery(r, cmdOpts), model, getRequestFields(cmdOpts.fields, model), readResult)
else
requester ! readResult
} recover {
Expand Down
6 changes: 4 additions & 2 deletions really-core/src/main/scala/io/really/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ package io {
import play.api.data.validation.ValidationError
import reactivemongo.api.DefaultDB
import akka.actor.{ Props, ActorSystem, ActorRef }
import io.really.model.{ FieldKey, DataObject }
import io.really.model.{ FieldKey, DataObject, Model }
import io.really.quickSand.QuickSand
import io.really.protocol._
import io.really.gorilla.RSubscription
import io.really.gorilla.{ QuerySubscription, RSubscription }
import org.joda.time.DateTime
import play.api.libs.json._

Expand Down Expand Up @@ -56,6 +56,8 @@ package io {

def objectSubscriberProps(rSubscription: RSubscription): Props

def querySubscriberProps(subscriptionId: String, model: Model, querySubscription: QuerySubscription): Props

def replayerProps(rSubscription: RSubscription, objectSubscriber: ActorRef, maxMarker: Option[Revision]): Props

def receptionistProps: Props
Expand Down
7 changes: 7 additions & 0 deletions really-core/src/main/scala/io/really/rql/RQLParser.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,13 @@ import play.api.libs.json.{ JsValue, JsObject }
import scala.util.parsing.combinator._

object RQLParser {
/**
* The main function in this object, takes a query string and a the json object containing the values and returns
* the Query instance.
* @param filter the query string
* @param values a `JsObject` that holds the values {fieldName -> value}
* @return
*/
def parse(filter: String, values: JsObject): Either[RQL.ParseError, Query] = {
val parser = new RQLParser(values)
try {
Expand Down
6 changes: 3 additions & 3 deletions really-core/src/main/scala/io/really/rql/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ package io.really.rql

import io.really.model.Field
import play.api.data.validation.ValidationError
import play.api.libs.json._
import play.api.libs.functional.syntax._
import play.api.libs.json._
import scala.util.parsing.input.Positional

object RQL {
Expand Down Expand Up @@ -111,7 +111,6 @@ object RQL {
def isValid: Either[ValidationError, RQLSuccess] = Right(RQLSuccess(this))

def validateObject(obj: JsObject, fields: Set[Field[_]]): Boolean = true

}

case class SimpleQuery(key: Term, op: Operator, termValue: TermValue) extends Query with Positional {
Expand Down Expand Up @@ -164,7 +163,8 @@ object RQL {

private def parseAndValidate(filter: String, values: JsObject): JsResult[Query] =
RQLParser.parse(filter, values) match {
case Right(q) =>
case Right(q) => //q.validate

q.isValid match {
case Right(_) => JsSuccess(q)
case Left(error) => JsError(Seq(JsPath() -> Seq(error)))
Expand Down
36 changes: 36 additions & 0 deletions really-simple-auth/bin/application-logger.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
<configuration>
<conversionRule conversionWord="coloredLevel" converterClass="play.api.Logger$ColoredLevel" />

<appender name="FILE" class="ch.qos.logback.core.FileAppender">
<file>${application.home}/logs/application.log</file>
<encoder>
<pattern>%date - [%level] - from %logger in %thread %n%message%n%xException%n</pattern>
</encoder>
</appender>

<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%coloredLevel %logger{15} - %message%n%xException{5}</pattern>
</encoder>
</appender>

<appender name="AKKASTDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%X{akkaTimestamp} %highlight(%-5level) %highlight(%logger{36}) %X{akkaSource} - %msg%n</pattern>
</encoder>
</appender>

<!-- Off these ones as they are annoying, and anyway we manage configuration ourself -->
<logger name="com.avaje.ebean.config.PropertyMapLoader" level="OFF" />
<logger name="com.avaje.ebeaninternal.server.core.XmlConfigLoader" level="OFF" />
<logger name="com.avaje.ebeaninternal.server.lib.BackgroundThread" level="OFF" />
<logger name="com.gargoylesoftware.htmlunit.javascript" level="OFF" />

<!-- Our Own Loggers -->
<logger name="play" level="INFO" />
<logger name="application" level="DEBUG" />

<root level="WARN">
<appender-ref ref="STDOUT"/>
</root>
</configuration>
Loading