longevity

A Persistence Framework for Scala and NoSQL


stream by query

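The queryToIterator and queryToFutureVec methods we looked at in the previous section leave much to be desired: an iterator is a blocking interface, and a future vector holds the entire result set in memory at once. A streaming approach would be better, but which streaming library should we choose? Scala has many. Currently, longevity supports four of them: Akka Streams, FS2, iteratee.io, and Play enumerators.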

Here’s an example that uses all four:

import akka.NotUsed
import akka.stream.scaladsl.Source
import cats.Eval
import fs2.Stream
import fs2.Task
import io.iteratee.{ Enumerator => CatsEnumerator }
import longevity.persistence.PState
import play.api.libs.iteratee.{ Enumerator => PlayEnumerator }
 
val blog: Blog = getBlogFromSomewhere()

val query: Query[BlogPost] = {
  import com.github.nscala_time.time.Imports._
  import BlogPost.queryDsl._
  import BlogPost.props._
  blogUri eqs blog.blogUri and postDate gt DateTime.now - 1.week
}

val akkaSource: Source[PState[BlogPost], NotUsed] =
  blogPostRepo.queryToAkkaStream(query)

val fs2Stream: Stream[Task, PState[BlogPost]] =
  blogPostRepo.queryToFS2(query)

val catsEnumerator: CatsEnumerator[Eval, PState[BlogPost]] =
  blogPostRepo.queryToIterateeIo[Eval](query)

val playEnumerator: PlayEnumerator[PState[BlogPost]] = {
  import scala.concurrent.ExecutionContext.Implicits.global
  blogPostRepo.queryToPlay(query)
}

Each of these four methods requires you to add the artifacts for the corresponding streaming library to your own build.
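As a rough sketch of what supplying those artifacts might look like in an sbt build, the fragment below lists the artifacts that provide the packages imported in the example above. The version strings are placeholders, not recommendations: choose versions that are compatible with the longevity release you are using, and include only the libraries you actually stream with.

```scala
// build.sbt (sketch): the version strings are placeholders, not tested values
libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-stream"    % "<akka-version>",     // akka.stream.scaladsl.Source
  "co.fs2"            %% "fs2-core"       % "<fs2-version>",      // fs2.Stream, fs2.Task
  "io.iteratee"       %% "iteratee-core"  % "<iteratee-version>", // io.iteratee.Enumerator
  "com.typesafe.play" %% "play-iteratees" % "<play-version>"      // play.api.libs.iteratee.Enumerator
)
```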
