longevity

A Persistence Framework for Scala and NoSQL

View project on GitHub

stream by query

The queryToIterator and queryToVector methods we looked at in the previous section leave much to be desired. A streaming approach would be better, but which streaming library should we choose? Scala has many. Currently, longevity supports four of them:

Here’s an example that uses all four:

import akka.NotUsed
import akka.stream.scaladsl.Source
import cats.Eval
import fs2.Stream
import fs2.Task
import io.iteratee.{ Enumerator => CatsEnumerator }
import longevity.persistence.PState
import longevity.persistence.Repo
import play.api.libs.iteratee.{ Enumerator => PlayEnumerator }

val repo: Repo[SomeEffect, DomainModel] = longevityContext.repo

val blog: Blog = getBlogFromSomewhere()

val query: Query[BlogPost] = {
  import com.github.nscala_time.time.Imports._
  import BlogPost.queryDsl._
  import BlogPost.props._
  blogUri eqs blog.blogUri and postDate gt DateTime.now - 1.week
}

val akkaSource: SomeEffect[Source[PState[BlogPost], NotUsed]] =
  repo.queryToAkkaStream(query)

val fs2Stream: SomeEffect[Stream[Task, PState[P]]] =
  repo.queryToFS2(query)

val catsEnumerator: SomeEffect[CatsEnumerator[Eval, PState[P]]] =
  repo.queryToIterateeIo[Eval](query)

val playEnumerator: SomeEffect[PlayEnumerator[PState[P]]] = {
  import scala.concurrent.ExecutionContext.Implicits.global
  repo.queryToPlay(query)
}

You might wonder why all the streams are wrapped in our effect class. After all, the are all streams, and are designed to handle side-effects themselves. The reason is that these Repo methods have to behave well with the other Repo methods. For instance, consider the following example:

def processStream(stream: Source[PState[BlogPost], NotUsed]): SomeEffect[Result] = {
  // ...
}

for {
  _      <- repo.openConnection
  stream <- repo.queryToAkkaStream(query)
  result <- processStream(stream)
  _      <- repo.closeConnection
} yield result

If the stream was obtained outside the effect, then the connection to the database would probably be closed at the time we tried to process the stream.

All the streaming methods will require you to supply artifacts for the streaming libraries in your own build.

prev: retrieval by query
up: queries
next: cassandra query limitations