Skip to content
kebetsi edited this page Feb 19, 2015 · 7 revisions

The framework is designed in a modular fashion on top of akka's Actor definition, each block must extend the Component class. Component subclasses must implement the receiver() method which describes the processing done when receiving a message, similar to an Actor's receive() method.
Here is an example of the BackLoop component which receives Transactions and delta Orders from the MarketSimulator and sends them on all the outgoing edges that were defined in the use case:

class BackLoop (marketId: Long, p: Persistance[Transaction]) extends Component {
  override def receiver = {
    case t: Transaction => {
      send(t)
      p.save(t) }
    case la: LimitAskOrder => send(la)
    case lb: LimitBidOrder => send(lb)
    case d:  DelOrder      => send(d)
}}

When building a system, a ComponentBuilder instance is used to build the components using the createRef() method. Components are connected together by outgoing edges with the Component.addDestination() method which takes as arguments the destination component and the type of data that will be transmitted on this edge.

addDestination(destination: ComponentRef, data: Class[_])(implicit cb: ComponentBuilder)

There exists two methods to send data on these edges:

send[T: ClassTag](t: T)
send[T](name: String, t: T)

The first one sends the argument of the method to all the edges defined for this type of data, the second one sends te argument to a specific component identified by the name it was built with.
There exists also another version of these methods that sends a list of objects:

send[T: ClassTag](t: List[T])
send[T](name: String, t: List[T])

The ComponentBuilder has a start() and stop() methods that, respectively, send StartSignal and StopSignal to all the components it built. It is commonly used to launch and stop fetchers as well as components that use a scheduler to execute periodic tasks.

A full example of a system build from the TwitterFlowTesterWithStorage class is shown below:

// instantiate builder
implicit val builder = new ComponentBuilder("TwitterPrintSystem")

// Initialize the Interface to DB
val tweetPersistor = new TweetPersistor("twitter-db")

// Create Components
val printer = builder.createRef(Props(classOf[Printer]), "printer")
val persistor = builder.createRef(Props(classOf[Persistor[Tweet]], tweetPersistor, implicitly[ClassTag[Tweet]]), "tweet-persistor")
val fetcher = builder.createRef(Props(classOf[TwitterFetchComponent]), "twitter-fetcher")

// Create the connections
fetcher.addDestination(printer, classOf[Tweet])
fetcher.addDestination(persistor, classOf[Tweet])

// Start the system
builder.start

Clone this wiki locally