scala - Akka HTTP: il blocco in un futuro blocca il server




future akka-http (2)

Sto cercando di usare Akka HTTP per autenticare la mia richiesta di base. Accade così che io abbia una risorsa esterna per autenticarmi, quindi devo fare una chiamata di riposo a questa risorsa.

Ciò richiede un po 'di tempo e, mentre è in fase di elaborazione, sembra che il resto della mia API sia bloccato, in attesa di questa chiamata. Ho riprodotto questo con un esempio molto semplice:

// used dispatcher:
implicit val system = ActorSystem()
implicit val executor = system.dispatcher
implicit val materializer = ActorMaterializer()


val routes = 
  (post & entity(as[String])) { e =>
    complete {
      Future{
        Thread.sleep(5000)
        e
      }
    }
  } ~
  (get & path(Segment)) { r =>
    complete {
      "get"
    }
  }

Se pubblico l'endpoint del registro, anche il mio endpoint get viene bloccato in attesa dei 5 secondi, che l'endpoint del registro ha dettato.

È questo comportamento previsto, e se lo è, come faccio a fare operazioni di blocco senza bloccare la mia intera API?


Quello che osservi è un comportamento previsto, ma ovviamente è molto brutto. Bene, esistono soluzioni conosciute e buone pratiche per proteggerci. In questa risposta mi piacerebbe passare un po 'di tempo per spiegare il problema a breve, a lungo, e poi in profondità - buona lettura!

Risposta breve : " non bloccare l'infrastruttura di routing! ", Utilizzare sempre un dispatcher dedicato per le operazioni di blocco!

Causa del sintomo osservato: il problema è che stai usando context.dispatcher come il dispatcher su cui si context.dispatcher i futuri di blocco. Lo stesso dispatcher (che in termini semplici è solo un "grappolo di thread") viene utilizzato dall'infrastruttura di routing per gestire effettivamente le richieste in arrivo, quindi se blocchi tutti i thread disponibili, finisci per affamare l'infrastruttura di routing. (Una cosa su per il dibattito e il benchmarking è che se Akka HTTP potrebbe proteggerlo, lo aggiungerò alla mia lista di ricerche).

Il blocco deve essere trattato con particolare attenzione per non influire sugli altri utenti dello stesso dispatcher (motivo per cui rendiamo così semplice separare l'esecuzione su diversi), come spiegato nella sezione Documenti di Akka: Il blocco richiede un'attenta gestione .

Qualcos'altro che volevo attirare l'attenzione qui è che si dovrebbe evitare di bloccare le API, se possibile - se l'operazione a lungo termine non è in realtà una sola operazione, ma una serie di esse, si potrebbe averle separate su diversi attori, o futuri sequenziati. In ogni caso, volevo solo sottolineare - se possibile, evitare tali chiamate di blocco, ma se necessario - allora la seguente spiega come gestirle adeguatamente.

Analisi e soluzioni approfondite :

Ora che sappiamo cosa è sbagliato, concettualmente, diamo un'occhiata a cosa è esattamente infranto nel codice sopra, e come appare la soluzione giusta a questo problema:

Colore = stato del filo:

  • turchese - DORMIRE
  • arancione - ATTESA
  • verde - RUNNABLE

Ora esaminiamo 3 parti di codice e come l'impatto degli spedizionieri e le prestazioni dell'app. Per forzare questo comportamento l'app è stata messa sotto il seguente carico:

  • [a] continua a richiedere richieste GET (vedi il codice sopra nella prima domanda per quello), non sta bloccando lì
  • [b] poi dopo un po 'di fuoco POST 2000 richieste, che causerà il blocco di 5 secondi prima di restituire il futuro

1) [bad] comportamento del Dispatcher su codice errato :

// BAD! (due to the blocking in Future):
implicit val defaultDispatcher = system.dispatcher

val routes: Route = post { 
  complete {
    Future { // uses defaultDispatcher
      Thread.sleep(5000)                    // will block on the default dispatcher,
      System.currentTimeMillis().toString   // starving the routing infra
    }
  }
}

Quindi esporremo la nostra app a [un] caricamento, e puoi già vedere un numero di thread akka.actor.default-dispatcher - stanno gestendo le richieste - piccolo snippet verde, e arancione significa che gli altri sono effettivamente inattivi lì.

Quindi iniziamo il caricamento [b], che causa il blocco di questi thread: è possibile vedere un thread iniziale "default-dispatcher-2,3,4" che va in blocco dopo essere stato inattivo prima. Osserviamo anche che il pool cresce: vengono avviati nuovi thread "default-dispatcher-18,19,20,21 ..." tuttavia vanno a dormire immediatamente (!) - stiamo sprecando risorse preziose qui!

Il numero di tali thread avviati dipende dalla configurazione predefinita del dispatcher, ma probabilmente non supererà i 50 circa. Dato che abbiamo appena attivato 2k blocking ops, facciamo morire di fame l'intero threadpool - le operazioni di blocco dominano in modo tale che l'infra del routing non ha thread disponibili per gestire le altre richieste - molto male!

Facciamo qualcosa a riguardo (che è una Akka best practice btw - isolare sempre il comportamento di blocco come mostrato di seguito):

2) [good!] Comportamento del dispatcher codice / dispatcher ben strutturati :

Nel tuo application.conf configura questo dispatcher dedicato al comportamento di blocco:

my-blocking-dispatcher {
  type = Dispatcher
  executor = "thread-pool-executor"
  thread-pool-executor {
    // in Akka previous to 2.4.2:
    core-pool-size-min = 16
    core-pool-size-max = 16
    max-pool-size-min = 16
    max-pool-size-max = 16
    // or in Akka 2.4.2+
    fixed-pool-size = 16
  }
  throughput = 100
}

Dovresti leggere di più nella documentazione di Akka Dispatchers , per comprendere le varie opzioni qui. Il punto principale è che abbiamo scelto un ThreadPoolExecutor che ha un limite rigido di thread che mantiene disponibile per le operazioni di blocco. Le impostazioni delle dimensioni dipendono da cosa fa la tua app e da quanti core ha il tuo server.

Quindi dobbiamo usarlo, al posto di quello predefinito:

// GOOD (due to the blocking in Future):
implicit val blockingDispatcher = system.dispatchers.lookup("my-blocking-dispatcher")

val routes: Route = post { 
  complete {
    Future { // uses the good "blocking dispatcher" that we configured, 
             // instead of the default dispatcher – the blocking is isolated.
      Thread.sleep(5000)
      System.currentTimeMillis().toString
    }
  }
}

Facciamo pressione sull'app utilizzando lo stesso carico, prima un po 'di richieste normali e poi aggiungiamo quelle di blocco. Questo è il modo in cui ThreadPools si comporterà in questo caso:

Quindi inizialmente le normali richieste sono facilmente gestite dal dispatcher predefinito, è possibile vedere alcune linee verdi lì - questa è l'effettiva esecuzione (non sto davvero mettendo il server sotto carico pesante, quindi è in gran parte inattivo).

Ora, quando iniziamo a pubblicare le operazioni di blocco, my-blocking-dispatcher-* attiva e avvia il numero di thread configurati. Gestisce tutto il sonno in là. Inoltre, dopo un certo periodo di non accadendo nulla su quei fili, li chiude. Se dovessimo colpire il server con un altro gruppo di blocchi, la piscina inizierà nuovi thread che si prenderanno cura del sonno (), ma nel frattempo - non stiamo sprecando le nostre preziose discussioni su "resta lì e fare niente".

Quando si utilizza questa configurazione, il throughput delle normali richieste GET non è stato influenzato, erano ancora felicemente serviti sul dispatcher predefinito (ancora piuttosto gratuito).

Questo è il modo consigliato di affrontare qualsiasi tipo di blocco nelle applicazioni reattive. Spesso viene definito "bulkheading" (o "isolando") le parti che si comportano male di un'app, in questo caso il comportamento errato sta dormendo / bloccando.

3) [workaround-ish] Comportamento del dispatcher durante il blocking applicato correttamente :

In questo esempio utilizziamo lo scaladoc per il metodo scala.concurrent.blocking che può essere d'aiuto quando si affrontano i blocchi di operazioni. In genere, è necessario eseguire un maggior numero di thread per sopravvivere alle operazioni di blocco.

// OK, default dispatcher but we'll use `blocking`
implicit val dispatcher = system.dispatcher

val routes: Route = post { 
  complete {
    Future { // uses the default dispatcher (it's a Fork-Join Pool)
      blocking { // will cause much more threads to be spun-up, avoiding starvation somewhat, 
                 // but at the cost of exploding the number of threads (which eventually
                 // may also lead to starvation problems, but on a different layer)
        Thread.sleep(5000)
        System.currentTimeMillis().toString
       }
    }
  }
}

L'app si comporterà in questo modo:

Noterai che sono stati creati MOLTI nuovi thread, questo perché i suggerimenti di blocco su "oh, questo verrà bloccato, quindi abbiamo bisogno di più thread". Questo fa sì che il tempo totale in cui siamo bloccati sia minore rispetto all'esempio 1), tuttavia abbiamo centinaia di thread che non fanno nulla dopo che le operazioni di blocco sono finite ... Certo, alla fine verranno chiuse (l'FJP fa questo ), ma per un po 'avremo una grande quantità (incontrollata) di thread in esecuzione, in contrasto con la 2) soluzione, in cui sappiamo esattamente quanti thread stiamo dedicando per i comportamenti di blocco.

Riassunto : non bloccare mai il dispatcher predefinito :-)

La migliore pratica è usare il modello mostrato in 2) , avere un dispatcher per le operazioni di blocco disponibili ed eseguirli lì.

Spero che questo aiuti, felice hakking !

Versione di Akka HTTP discussa : 2.0.1

Profiler utilizzato: molte persone mi hanno chiesto in risposta a questa risposta in privato quale profiler ho usato per visualizzare gli stati Thread nelle foto sopra, quindi aggiungendo queste informazioni qui: Ho usato YourKit che è un fantastico profiler commerciale (gratuito per OSS), sebbene puoi ottenere gli stessi risultati usando il VisualVM gratuito di OpenJDK .


Strano, ma per me tutto funziona bene (nessun blocco). Ecco il codice:

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route
import akka.stream.ActorMaterializer

import scala.concurrent.Future


object Main {

  implicit val system = ActorSystem()
  implicit val executor = system.dispatcher
  implicit val materializer = ActorMaterializer()

  val routes: Route = (post & entity(as[String])) { e =>
    complete {
      Future {
        Thread.sleep(5000)
        e
      }
    }
  } ~
    (get & path(Segment)) { r =>
      complete {
        "get"
      }
    }

  def main(args: Array[String]) {

    Http().bindAndHandle(routes, "0.0.0.0", 9000).onFailure {
      case e =>
        system.shutdown()
    }
  }
}

Inoltre puoi avvolgere il codice asincrono nella direttiva onComplete o onSuccess :

onComplete(Future{Thread.sleep(5000)}){e} 

onSuccess(Future{Thread.sleep(5000)}){complete(e)}




akka-http