Skip to content
This repository was archived by the owner on Jun 28, 2019. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions client/src/main/scala/tmt/views/SubscriptionView.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import rx._
import tmt.framework.Framework._
import tmt.framework.Helpers._
import tmt.shared.models.{Connection, ConnectionSet, RoleMappings}

import monifu.concurrent.Implicits.globalScheduler
import scalatags.JsDom.all._

class SubscriptionView(roleMappings: RoleMappings, connectionSet: ConnectionSet) {
Expand Down Expand Up @@ -50,8 +50,9 @@ class SubscriptionView(roleMappings: RoleMappings, connectionSet: ConnectionSet)
}

def addConnection() = {
subscribe(connection())
connections() = connections() + connection()
subscribe(connection()).onSuccess {
case _ => connections() = connections() + connection()
}
}

def removeConnection(connection: Connection) = {
Expand Down
2 changes: 1 addition & 1 deletion common/src/main/scala/tmt/library/Role.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package tmt.library

import enumeratum.{Enum, EnumEntry}

sealed abstract class Role(maybeConsumes: Option[ItemType], maybeProduces: Option[ItemType]) extends EnumEntry
sealed abstract class Role(val maybeConsumes: Option[ItemType], val maybeProduces: Option[ItemType]) extends EnumEntry
sealed abstract class SourceRole(override val entryName: String, val produces: ItemType) extends Role(None, Some(produces))
sealed abstract class SinkRole(override val entryName: String, val consumes: ItemType) extends Role(Some(consumes), None)
sealed abstract class FlowRole(override val entryName: String, val consumes: ItemType, val produces: ItemType) extends Role(Some(consumes), Some(produces))
Expand Down
7 changes: 5 additions & 2 deletions frontend/app/controllers/StreamController.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import javax.inject.{Inject, Singleton}

import common.AppSettings
import play.api.mvc.{Action, Controller}
import services.{ClusterClientService, ConnectionSetService, RoleMappingsService}
import services.{ValidationFailedException, ClusterClientService, ConnectionSetService, RoleMappingsService}
import templates.Page
import upickle.default._

Expand Down Expand Up @@ -46,7 +46,10 @@ class StreamController @Inject()(

def subscribe(serverName: String, topic: String) = Action {
clusterClientService.subscribe(serverName, topic)
Accepted("ok")
.map(_ => Accepted("ok"))
.recover {
case ValidationFailedException(msg) => BadGateway(msg)
}.get
}

def unsubscribe(serverName: String, topic: String) = Action {
Expand Down
26 changes: 23 additions & 3 deletions frontend/app/services/ClusterClientService.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,15 @@ import akka.cluster.pubsub.DistributedPubSubMediator.Publish
import akka.pattern.ask
import akka.util.Timeout
import tmt.common.Messages
import tmt.library.Role
import tmt.shared.Topics
import tmt.shared.models.ConnectionSet

import scala.concurrent.duration.{DurationInt, FiniteDuration}
import scala.util.{Failure, Success, Try}

@Singleton
class ClusterClientService @Inject()(system: ActorSystem) {
class ClusterClientService @Inject()(system: ActorSystem, roleMappingsService: RoleMappingsService) {

implicit val timeout = Timeout(2.seconds)

Expand All @@ -26,13 +28,31 @@ class ClusterClientService @Inject()(system: ActorSystem) {
mediator ! Publish(Topics.Throttle, Messages.UpdateDelay(serverName, delay))
}

def subscribe(serverName: String, topic: String) = {
mediator ! Publish(Topics.Subscription, Messages.Subscribe(serverName, topic))
def subscribe(serverName: String, topic: String): Try[Unit] = {
validate(serverName, topic) match {
case Some(true) =>
Success(mediator ! Publish(Topics.Subscription, Messages.Subscribe(serverName, topic)))
case _ =>
Failure(new ValidationFailedException("Bad request"))
}
}

def unsubscribe(serverName: String, topic: String) = {
mediator ! Publish(Topics.Subscription, Messages.Unsubscribe(serverName, topic))
}

def allConnections = (connectionStore ? ConnectionStore.GetConnections).mapTo[ConnectionSet]

private def validate(sourceServerName: String, destinationServerName: String) = {
val roleMappings = roleMappingsService.onlineRoleMappings

for {
sourceRoleName <- roleMappings.roleOf(sourceServerName)
destinationRoleName <- roleMappings.roleOf(destinationServerName)
sourceRole = Role.withName(sourceRoleName)
destinationRole = Role.withName(destinationRoleName)
} yield sourceRole.maybeConsumes.isDefined && sourceRole.maybeConsumes == destinationRole.maybeProduces
}
}

case class ValidationFailedException(msg: String) extends RuntimeException(msg)
6 changes: 6 additions & 0 deletions shared/src/main/scala/tmt/shared/models/RoleMappings.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,10 @@ case class RoleMappings(mappings: Map[String, Seq[String]]) {
role -> serverNames.filter(onlineRoles)
}
}
def roleOf(serverName: String) = {
mappings.find { mapping =>
val servers = mapping._2
servers.contains(serverName)
}.map(_._1)
}
}