stream by query
The queryToIterator
and queryToVector
methods we looked at in the previous
section leave much to be desired. A streaming approach would be better, but which
streaming library should we choose? Scala has many. Currently, longevity supports four of them:
Here’s an example that uses all four:
import akka.NotUsed
import akka.stream.scaladsl.Source
import cats.Eval
import fs2.Stream
import fs2.Task
import io.iteratee.{ Enumerator => CatsEnumerator }
import longevity.persistence.PState
import longevity.persistence.Repo
import play.api.libs.iteratee.{ Enumerator => PlayEnumerator }
val repo: Repo[SomeEffect, DomainModel] = longevityContext.repo
val blog: Blog = getBlogFromSomewhere()
val query: Query[BlogPost] = {
import com.github.nscala_time.time.Imports._
import BlogPost.queryDsl._
import BlogPost.props._
blogUri eqs blog.blogUri and postDate gt DateTime.now - 1.week
}
val akkaSource: SomeEffect[Source[PState[BlogPost], NotUsed]] =
repo.queryToAkkaStream(query)
val fs2Stream: SomeEffect[Stream[Task, PState[P]]] =
repo.queryToFS2(query)
val catsEnumerator: SomeEffect[CatsEnumerator[Eval, PState[P]]] =
repo.queryToIterateeIo[Eval](query)
val playEnumerator: SomeEffect[PlayEnumerator[PState[P]]] = {
import scala.concurrent.ExecutionContext.Implicits.global
repo.queryToPlay(query)
}
You might wonder why all the streams are wrapped in our effect class.
After all, the are all streams, and are designed to handle side-effects themselves. The reason is
that these Repo
methods have to behave well with the other Repo
methods. For instance, consider
the following example:
def processStream(stream: Source[PState[BlogPost], NotUsed]): SomeEffect[Result] = {
// ...
}
for {
_ <- repo.openConnection
stream <- repo.queryToAkkaStream(query)
result <- processStream(stream)
_ <- repo.closeConnection
} yield result
If the stream was obtained outside the effect, then the connection to the database would probably be closed at the time we tried to process the stream.
All the streaming methods will require you to supply artifacts for the streaming libraries in your own build.