Eenvoudig ontwerp van Kotlin Flow

Flow door Grant Tarrant

In een eerder verhaal over koude stromen, hete kanalen¹ heb ik koude en hete gegevensstromen gedefinieerd en een use-case voor Kotlin Flows getoond - koude asynchrone stromen. Laten we nu eens een kijkje nemen onder de motorkap, hun ontwerp bekijken en zien hoe een combinatie van taalfuncties en een bibliotheek een krachtige abstractie met eenvoudig ontwerp mogelijk maakt.

Een stroom in Kotlin wordt vertegenwoordigd door een interface²:

interface Flow  {
    suspend fun collect (verzamelaar: FlowCollector )
}

Het enige dat een flow bevat, is een enkele verzamelfunctie die een exemplaar van de FlowCollector-interface accepteert met een enkele emit-methode:

interface FlowCollector  {
    suspend fun emit (waarde: T)
}

Een uitzendnaam moet bekend voorkomen bij een lezer van "Koude stromen, hete kanalen". Inderdaad, daar heb ik een voorbeeld getoond van de volgende stroomdefinitie:

val ints: Flow  = flow {
    voor (i in 1..10) {
        vertraging (100)
        emit (i) // <- emit wordt hier genoemd
    }
}

Een handtekening van de flowbuilder gebruikt ook een FlowCollector-interface als ontvanger³, zodat we rechtstreeks kunnen uitzenden vanuit de body van de overeenkomstige lambda:

leuke  flow (blok: suspend FlowCollector . () -> Unit): Flow 

Voor een eenvoudig gebruik van een stroom, wanneer de stroom wordt verzameld, als volgt:

ints.collect {println (it)} // duurt 1 seconde, drukt 10 int af

wat er gebeurt is dat een instantie van FlowCollector is gemaakt op basis van de lambda die is doorgegeven om de functie {…} te verzamelen en deze instantie wordt vervolgens doorgegeven aan de stroom {…} body⁴.

Een interactie tussen een flow-emitter en een flow-collector is dus die van een eenvoudige functieaanroep - een call of emit-functie. Als we deze functieaanroep mentaal inline plaatsen, kunnen we onmiddellijk begrijpen wat er gebeurt als we deze code uitvoeren⁵ - deze komt overeen met:

voor (i in 1..10) {
    vertraging (100)
    println (i) // <- emit werd hier genoemd
}

operators

Een flow builder en een verzamel terminal operator is alles wat we moeten weten om te beginnen met het schrijven van operators die stromen op verschillende manieren transformeren. Een eenvoudige kaartoperator die een gespecificeerde transformatie toepast op elke uitgezonden waarde kan als volgt worden geïmplementeerd:

fun  Flow  .map (transform: suspend (waarde: T) -> R) = flow {
    collect {emit (transform (it))}
}

Met deze operator kunnen we nu ints.map {it * it} doen om een ​​stroom te definiëren met vierkanten van de oorspronkelijke gehele getallen. Elementen stromen nog steeds van de zender naar de collector via functieaanroepen. Er zit nu gewoon nog een functie tussen.

De kotlinx.coroutines-bibliotheek definieert eigenlijk al map en een groot aantal andere algemene operatoren als uitbreidingen op het Flow-type, volgens een extensiegerichte ontwerpaanpak. Wat belangrijk is in dit ontwerp, is dat het vrij eenvoudig is om domein-specifieke operatoren te definiëren. Er is geen onderscheid tussen "ingebouwde" en "door de gebruiker gedefinieerde" operators - alle operators zijn eersteklas.

Tegendruk

Tegendruk in softwaretechnologie wordt gedefinieerd als het vermogen van een dataconsument die inkomende gegevens niet kan bijhouden om een ​​signaal naar de gegevensproducent te sturen om de snelheid van de gegevenselementen te vertragen.

Het traditionele reactieve streams-ontwerp omvat een back-channel om indien nodig meer gegevens van producenten op te vragen. Het beheer van dit aanvraagprotocol leidt tot notoir moeilijke implementaties, zelfs voor eenvoudige operators. We zien deze complexiteit niet in het ontwerp van Kotlin-stromen, noch in de implementatie van operators voor hen, maar Kotlin-stromen ondersteunen wel tegendruk. Hoe komt?

Transparant tegendrukbeheer wordt bereikt in Kotlin-stromen via het gebruik van Kotlin-opschortingsfuncties. Het is je misschien opgevallen dat alle functies en functionele typen in het Kotlin-stroomontwerp zijn gemarkeerd met de modifier voor onderbreken - deze functies hebben een superkracht om de uitvoering van de beller op te schorten zonder een thread te blokkeren⁹. Dus wanneer de verzamelaar van de stroom wordt overweldigd, kan deze de zender eenvoudig onderbreken en later hervatten wanneer deze klaar is om meer elementen te accepteren.

Dit is vrij gelijkaardig aan tegendrukbeheer in traditionele thread-gebaseerde synchrone datapijplijnen, waar een langzame consument automatisch tegendruk op de producent uitoefent door de thread van de producent te blokkeren. Opschortende functies gaan verder dan een enkele thread en komen in het rijk van asynchrone programmering, door transparant tegendruk over de threads te beheren zonder ze te blokkeren. Maar dat moet in een ander verhaal worden verteld.

Verder lezen en voetnoten

  1. ^ Koude stromen, hete kanalen
  2. ^ Flow en gerelateerde typen en functies zijn nog steeds in preview vanaf versie 1.2.1 van de bibliotheek kotlinx.coroutines. Lees hier meer.
  3. ^ Functietypen in Kotlin
  4. ^ Dit is een kleine vereenvoudiging. Er wordt geen rekening gehouden met extra controles om de context te behouden, maar dat onderwerp valt buiten het bestek van dit verhaal. Meer details in de uitvoeringscontext van Kotlin Flows.
  5. ^ Je kunt deze code hier via Kotlin Playground uitvoeren.
  6. ^ Op uitbreiding gericht ontwerp
  7. ^ Reactieve streams
  8. ^ Implementatie van operatoren voor [RxJava] 2.0
  9. ^ Blokkerende draden, hangende coroutines