λÆS Data Structures

The yaes-data module provides a collection of functional data structures that complement the λÆS effects system. These data structures are designed to work seamlessly with λÆS effects and provide efficient, functional alternatives to traditional imperative data structures.

Maven Central javadoc

Installation

Add the following dependency to your build.sbt:

libraryDependencies += "in.rcard.yaes" %% "yaes-data" % "0.5.0"

Available Data Structures

Flow

A cold asynchronous data stream that sequentially emits values, similar to reactive streams but designed for functional programming.


Flow

A Flow is a cold asynchronous data stream that sequentially emits values and completes normally or with an exception. It’s conceptually similar to Iterators from the Collections framework but designed for asynchronous operation.

Key Characteristics

Key Differences from Iterator

Feature Iterator Flow
Execution Synchronous Asynchronous
Transformation Limited Rich operator set
Observation Direct access Collect method
Composition Basic Highly composable

Creating Flows

From Explicit Emissions

import in.rcard.yaes.Flow

val numbersFlow: Flow[Int] = Flow.flow[Int] {
  Flow.emit(1)
  Flow.emit(2)
  Flow.emit(3)
}

From Collections

import in.rcard.yaes.Flow

val listFlow: Flow[Int] = List(1, 2, 3).asFlow()
val setFlow: Flow[String] = Set("a", "b", "c").asFlow()

From Varargs

import in.rcard.yaes.Flow

val flow: Flow[String] = Flow("hello", "world", "!")

Empty and Single Value Flows

import in.rcard.yaes.Flow

val emptyFlow: Flow[Int] = Flow.empty[Int]
val singleFlow: Flow[String] = Flow.just("single value")

Collecting Flow Values

Use the collect method to observe and consume flow values:

import in.rcard.yaes.Flow
import scala.collection.mutable.ArrayBuffer

val result = ArrayBuffer[Int]()
Flow(1, 2, 3).collect { value =>
  result += value
}
// result now contains: [1, 2, 3]

Transformation Operators

map

Transform each emitted value:

import in.rcard.yaes.Flow

val stringFlow = Flow(1, 2, 3)
  .map(_.toString)
  .map("Number: " + _)

// Emits: "Number: 1", "Number: 2", "Number: 3"

filter

Keep only values matching a predicate:

import in.rcard.yaes.Flow

val evenNumbers = Flow(1, 2, 3, 4, 5, 6)
  .filter(_ % 2 == 0)

// Emits: 2, 4, 6

transform

Apply complex transformations with full control:

import in.rcard.yaes.Flow

val expandedFlow = Flow(1, 2, 3)
  .transform { value =>
    Flow.emit(value)
    Flow.emit(value * 10)
  }

// Emits: 1, 10, 2, 20, 3, 30

take

Limit the number of emitted values:

import in.rcard.yaes.Flow

val limitedFlow = Flow(1, 2, 3, 4, 5)
  .take(3)

// Emits: 1, 2, 3

drop

Skip the first n values:

import in.rcard.yaes.Flow

val skippedFlow = Flow(1, 2, 3, 4, 5)
  .drop(2)

// Emits: 3, 4, 5

onEach

Perform side effects without changing the flow:

import in.rcard.yaes.Flow
import scala.collection.mutable.ArrayBuffer

val sideEffects = ArrayBuffer[String]()

val monitoredFlow = Flow(1, 2, 3)
  .onEach { value =>
    sideEffects += s"Processing: $value"
  }
  .map(_ * 2)

// Side effects: ["Processing: 1", "Processing: 2", "Processing: 3"]
// Flow emits: 2, 4, 6

onStart

Execute an action before flow collection starts:

import in.rcard.yaes.Flow

val initFlow = Flow(1, 2, 3)
  .onStart {
    Flow.emit(0) // Prepend 0 to the flow
  }

// Emits: 0, 1, 2, 3

Terminal Operators

fold

Accumulate values to a single result:

import in.rcard.yaes.Flow

val sum = Flow(1, 2, 3, 4, 5)
  .fold(0) { (acc, value) => acc + value }

// Result: 15

val product = Flow(1, 2, 3, 4)
  .fold(1) { (acc, value) => acc * value }

// Result: 24

count

Count the number of emitted values:

import in.rcard.yaes.Flow

val count = Flow("a", "b", "c", "d")
  .filter(_.length > 0)
  .count()

// Result: 4

Practical Examples

Data Processing Pipeline

import in.rcard.yaes.Flow
import scala.collection.mutable.ArrayBuffer

case class User(id: Int, name: String, age: Int)

val users = List(
  User(1, "Alice", 25),
  User(2, "Bob", 17),
  User(3, "Charlie", 30),
  User(4, "Diana", 16)
)

val results = ArrayBuffer[String]()

users.asFlow()
  .filter(_.age >= 18) // Only adults
  .map(user => s"${user.name} (${user.age})")
  .onEach(user => println(s"Processing: $user"))
  .collect { userInfo =>
    results += userInfo
  }

// Results: ["Alice (25)", "Charlie (30)"]

Async Data Transformation

import in.rcard.yaes.Flow
import in.rcard.yaes.Async.*

def processDataAsync(data: List[Int])(using Async): List[String] = {
  val results = scala.collection.mutable.ArrayBuffer[String]()
  
  data.asFlow()
    .map { value =>
      // Simulate async processing
      Async.delay(100.millis)
      s"Processed: $value"
    }
    .collect { result =>
      results += result
    }
  
  results.toList
}

val result = Async.run {
  processDataAsync(List(1, 2, 3, 4, 5))
}

Event Stream Processing

import in.rcard.yaes.Flow
import in.rcard.yaes.Log.*

sealed trait Event
case class UserLogin(userId: Int, timestamp: Long) extends Event
case class UserLogout(userId: Int, timestamp: Long) extends Event
case class PageView(userId: Int, page: String) extends Event

def processEvents(events: List[Event])(using Log): Map[Int, Int] = {
  val logger = Log.getLogger("EventProcessor")
  val userPageViews = scala.collection.mutable.Map[Int, Int]()
  
  events.asFlow()
    .onStart {
      logger.info("Starting event processing")
    }
    .filter {
      case _: PageView => true
      case _ => false
    }
    .map(_.asInstanceOf[PageView])
    .onEach { pageView =>
      logger.debug(s"Processing page view: ${pageView.page} for user ${pageView.userId}")
    }
    .collect { pageView =>
      userPageViews.updateWith(pageView.userId) {
        case Some(count) => Some(count + 1)
        case None => Some(1)
      }
    }
  
  logger.info(s"Processed ${userPageViews.values.sum} page views")
  userPageViews.toMap
}

Complex Data Pipeline

import in.rcard.yaes.Flow
import in.rcard.yaes.Raise.*

case class RawData(value: String)
case class ParsedData(number: Int)
case class ProcessedData(result: String)

def dataProcessingPipeline(
  rawData: List[RawData]
)(using Raise[String]): List[ProcessedData] = {
  val results = scala.collection.mutable.ArrayBuffer[ProcessedData]()
  
  rawData.asFlow()
    .transform { raw =>
      // Parse with error handling
      try {
        val number = raw.value.toInt
        Flow.emit(ParsedData(number))
      } catch {
        case _: NumberFormatException =>
          Raise.raise(s"Invalid number format: ${raw.value}")
      }
    }
    .filter(_.number > 0) // Only positive numbers
    .map { parsed =>
      ProcessedData(s"Result: ${parsed.number * 2}")
    }
    .take(10) // Limit output
    .collect { processed =>
      results += processed
    }
  
  results.toList
}

// Usage
val input = List(RawData("1"), RawData("2"), RawData("invalid"), RawData("3"))

val result = Raise.either {
  dataProcessingPipeline(input)
}

result match {
  case Right(data) => println(s"Processed: $data")
  case Left(error) => println(s"Error: $error")
}

Integration with λÆS Effects

Flow works seamlessly with λÆS effects:

import in.rcard.yaes.Flow
import in.rcard.yaes.Random.*
import in.rcard.yaes.Output.*
import in.rcard.yaes.Log.*

def randomDataProcessor(using Random, Output, Log): List[Int] = {
  val logger = Log.getLogger("RandomProcessor")
  val results = scala.collection.mutable.ArrayBuffer[Int]()
  
  // Generate random data flow
  val randomFlow = Flow.flow[Int] {
    for (i <- 1 to 10) {
      val randomValue = Random.nextInt(100)
      Flow.emit(randomValue)
    }
  }
  
  randomFlow
    .onStart {
      logger.info("Starting random data processing")
      Output.printLn("Processing random data...")
    }
    .filter(_ > 50) // Only values > 50
    .onEach { value =>
      Output.printLn(s"Processing value: $value")
    }
    .map(_ * 2)
    .collect { result =>
      results += result
    }
  
  logger.info(s"Processed ${results.length} values")
  results.toList
}

// Run with effects
val result = Log.run {
  Output.run {
    Random.run {
      randomDataProcessor
    }
  }
}

Best Practices

1. Use Appropriate Operators

Choose the right operator for your use case:

2. Handle Side Effects Properly

Use onEach for side effects that don’t change the flow:

import in.rcard.yaes.Flow

Flow(1, 2, 3)
  .onEach(value => println(s"Debug: $value")) // Side effect
  .map(_ * 2) // Transformation

3. Combine with Error Handling

Use Flow with the Raise effect for robust error handling:

import in.rcard.yaes.Flow
import in.rcard.yaes.Raise.*

def safeProcessing(data: List[String])(using Raise[String]): List[Int] = {
  val results = scala.collection.mutable.ArrayBuffer[Int]()
  
  data.asFlow()
    .transform { str =>
      try {
        Flow.emit(str.toInt)
      } catch {
        case _: NumberFormatException =>
          Raise.raise(s"Invalid number: $str")
      }
    }
    .collect { number =>
      results += number
    }
  
  results.toList
}

4. Performance Considerations

Future Enhancements

The yaes-data module is actively developed. Future versions may include:

Contributing

Contributions to the λÆS Data module are welcome! Areas where help is needed:

See the contributing guide for more information.