X

Mein Profil. Hier einloggen oder registrieren.

25. Januar 2019

Mob Programming: Stateful Streaming

Mob Programming: Stateful Streaming

Am 29.09.18 fand auf dem Otto Campus ein Never Code Alone Event statt, bei dem sich über 30 Scala Enthusiasten trafen, um in interaktiven Mob Programming Sessions gemeinsam zu programmieren.

Insgesamt gab es vier Sessions über den ganzen Tag verteilt. Am Anfang jeder Session gab es eine kurze Einleitung zum Thema. Anschließend wurde dem ersten Mutigen im Publikum eine Funktastatur überreicht und es ging los mit dem Mob Programming.

Die Gestaltung der Sessions war den Speakern frei überlassen, die einzige Prämisse: Die Tastatur soll möglichst schnell in die Reihen der Besucher gehen.

Einer der vier Talks war von uns: Sebastian Schröder und Frederik Mars

mob01

Unsere Session

Das Ziel unseres Talks war es, den Teilnehmern einen Einblick in die Arbeit mit Apache Kafka zu geben und wie man darauf aufbauend einen stateful-streaming Service bauen kann. Als Beispiel haben wir uns die Berechnung der Artikelseiten-Aufrufe pro Sekunde von Otto.de herausgesucht. Diese Metrik sollte dann in ein Metrics-Topic geschrieben werden. Ein besonderes Detail dabei war die Sicherstellung einer at-least-once Delivery Semantic (dazu später mehr).

Ein paar Worte zu Kafka
Die offizielle Kafka-Dokumentation erklärt in ihrem Intro gut und 
knapp die Grundkonzepte von Apache Kafka.

Um unserer Implementierung folgen zu können, sollte man in der Kafka-Dokumentation etwas über Partitionen und Topics gelesen haben.

Der Einfachheit halber arbeiteten wir in unserer Session nur mit einer Partition. Es wären aber nur kleine Anpassungen notwendig, um zum Beispiel 24 Partitionen eines Topics zu konsumieren.

Unsere Session haben wir mit einer Kurzeinführung in Kafka begonnen und sind dann in die gemeinsame Implementierung übergegangen. Basis hierfür war ein von uns vorbereiteter Microservice, welcher Daten aus einem Kafka Topic konsumiert und diese auf der Konsole ausgibt. Die Daten liegen in folgender Struktur vor:

case class Click(
  clickType: ClickType,    // Page || Action
  name: String,
  timestamp: Long,
  browser: String,
  tabId: Option[String] = None
){
    val epochTimestamp: Long = timestamp / 1000
}

Um die Daten möglichst einfach zu konsumieren, haben wir eine Poll-Loop vorbereitet.

while (running.get()) {
  consumer
    .poll(100)
    .flatMap(record => Click.parse(record.value()))
    .foreach(click => 
      // Hier wird im Folgenden die Logik implementiert.
      logger.info(s"$click")
    )
}

Die erste Aufgabe war nun das einfache Zählen der Artikelseiten-Aufrufe. An dieser Stelle haben wir die Tastatur nun ins Publikum gereicht.

mob02

.Zusammen entschieden wir uns zunächst für das einfache Erstellen einer globalen Variable, deren Wert wir in jedem Durchlauf des Poll-Loops aktualisiert haben

var pageViewCounter: Long = 0
[...]
.foreach(click =>
  if(click.clickType == Page) {
    pageViewCounter += 1
  }
)

Mittels einer Log-Ausgabe konnten wir nun mit ansehen, wie unser Counter die Seitenaufrufe von Otto.de mitzählt. Die Ausgangsfragestellung erforderte jedoch das Messen der Seitenaufrufe pro Sekunde, also als Rate.

Im nächsten Schritt haben wir den Microservice deswegen so modifiziert, dass  wir diese Rate messen konnten. Dazu haben wir eine Map von Sekunde auf Anzahl Nachrichten (Long => Int) erzeugt, die wir mit jeder Nachricht aktualisiert haben. Hierzu haben wir die Anzahl der Nachrichten unter dem Timestamp der Nachricht inkrementiert.

var pageViewMap = Map.empty[Long, Int]
var currentTime: Long = 0
[...] 
.foreach(click =>
  if (click.clickType == Page) {
    val time = click.epochTimestamp
    val count = pageViewMap.getOrElse(time, 0)
    pageViewMap = pageViewMap + (time -> (count + 1)) // inkrementieren
    if (time != currentTime) { // abgeschlossene Sekunde
      val pageViewsOfSecond = pageViewMap.getOrElse(currentTime, 0)
      logger.info(pageViewsOfSecond)
      pageViewMap = pageViewMap - currentTime
      currentTime = time
    }
  }
)

Die ermittelten Metriken wollen wir nun anderen Konsumenten bereitstellen. Dafür schreiben wir die Raten der jeweils abgeschlossenen Sekunde in ein weiteres Kafka-Topic. Um den Wechsel zu einer neuen Sekunde festzustellen, prüfen wir, ob die Sekunde der aktuellen Nachricht eine Sekunden-Grenze überschreitet. Hierzu nutzen wir die persistierte Zeit der Nachricht aus dem vorherigen Durchlauf.

Wenn nun eine neue Sekunde auftritt, bedeutet das also, dass wir alle Seitenaufrufe der letzten Sekunde gezählt haben. In dem Moment wollen wir die Metrik in das andere Kafka-Topic schreiben. Das Senden der Nachricht erfolgt mittels Kafka-Producer-API:

producer.send(
  new ProducerRecord[String, String](
    metricsTopic,
    KafkaConfig.Partition,
    currentTime.toString, // Key
    pageViewsOfSecond.toString // Value
))

Damit haben wir das Messen der Daten bereits in der Qualität vorliegen, wie wir es uns gewünscht haben. Nun müssen wir noch sicherstellen, dass wir jede Nachricht erfolgreich konsumieren und in unsere Messung einfließen lassen.

Bei einem unerwarteten Absturz der Anwendung kann es jedoch dazu führen, dass wir Daten verlieren: Angenommen wir holen der einfachheitshalber pro poll je eine Nachricht, dann kriegen wir mit dem ersten poll die Nachricht n0. Der nächste poll-loop holt die Nachricht n1 und committed die Offsets fürn0 (die Standardeinstellung). n1 könnte nun ein producer.send auslösen. Angenommen unsere App crashed bei diesemsendwürde sie nach ihrem Neustart bei n1w ieder aufsetzen, da die Offsets vorher committed wurden. Somit hätten wir die Daten aus n0 verloren.

Um dieses Problem aufzulösen, werden wir die automatische Verwaltung der Offsets deaktivieren und diese manuell durchführen.

producer.send(
  new ProducerRecord[String, String](
    metricsTopic, 
    KafkaConfig.Partition, 
    currentTime.toString, 
    pageViewsOfSecond.toString),
  new Callback {
    override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {						           Option(exception) match {
        case None => 
          consumer.commitAsync(Map(new TopicPartition(sourceTopic, KafkaConfig.Partition) -> savedOffset))
        case Some(e) => logger.error(e.getMessage)
      }
    }
  })

Sobald eine Sekunde abgeschlossen ist, senden wir die Nachricht in unser Ziel-Topic. War das Schreiben erfolgreich committen wir die Leseposition der vorherigen Nachricht. Somit fangen wir bei einem Neustart immer an einer Sekunden-Grenze an und ermitteln dadurch korrekte Daten.

Stürzt unser Service nach dem erfolgreichen Absenden der Nachricht und vor dem Committen des Offsets, werden Nachrichten potentiell doppelt geschrieben (at least once semantics), wir werden aber niemals Daten verlieren.

Nach 90 Minuten sind wir dann am Ende unserer Session angekommen. Das Endergebnis konnte sich sehen lassen: Ein Microservice, der die Artikelseiten-Aufrufe von Otto.de misst und die gemessenen Werte in ein weiteres Topic schreibt, so dass weitere Services diese Metriken benutzen könnten.

mob03

Conclusion

Uns hat die Session sehr viel Spaß gemacht. Alle haben sehr aktiv mitgemacht, mitprogrammiert und Fragen gestellt. Es führt immer wieder zu einem coolen Ergebnis, wenn viele Leute, die aus unterschiedlichen Richtungen kommen, auf dem gleichen Problem rum denken und sich austauschen. Gerade die entstandenen Diskussionen haben uns nochmal verdeutlicht, dass eine Mob-Programming-Session sehr geeignet ist, um Wissen zu vermitteln und gemeinsam auf Problemen herum zu denken. Diese Erfahrung nehmen wir auch mit in unsere Teams, mit dem Ziel, Mob-Programming in solchen Situationen in Zukunft öfters einzusetzen.

Ein paar Stimmen zu dem Event wurden hier eingefangen.

Hier ist der Code, den wir in der Session zusammen geschrieben haben.

0Noch keine Kommentare

Dein Kommentar
Antwort auf:  Direkt auf das Thema antworten
1590 - 4
Geschrieben von
Frederik Mars
Development

Diese Webseite verwendet Cookies. Durch die Nutzung der Webseite stimmen Sie der Verwendung von Cookies zu. Weitere Informationen: Datenschutzinformationen