[Python] Tornado mit ThreadPoolExecutor


Answers

Question

Ich habe Setup, das Tornado als http-Server und benutzerdefinierte http-Framework verwendet. Idee ist es, einen einzelnen Tornado-Handler zu haben und jede Anfrage, die ankommt, sollte nur an ThreadPoolExecutor und Tornado , um auf neue Anfragen zu hören. Sobald der Thread die Verarbeitungsanforderung beendet, wird ein Rückruf aufgerufen, der eine Antwort an den Client im selben Thread sendet, in dem die E / A-Schleife ausgeführt wird.

Stripped, Code sieht in etwa so aus. Basis-HTTP-Serverklasse:

class HttpServer():
    def __init__(self, router, port, max_workers):
        self.router = router
        self.port = port
        self.max_workers = max_workers

    def run(self):
        raise NotImplementedError()

Tornado unterstützte Implementierung von HttpServer:

class TornadoServer(HttpServer):
    def run(self):
        executor = futures.ThreadPoolExecutor(max_workers=self.max_workers)

        def submit(callback, **kwargs):
            future = executor.submit(Request(**kwargs))
            future.add_done_callback(callback)
            return future

        application = web.Application([
            (r'(.*)', MainHandler, {
                'submit': submit,
                'router': self.router   
            })
        ])

        application.listen(self.port)

        ioloop.IOLoop.instance().start()

Haupt-Handler, der alle Tornado-Anfragen behandelt (implementiert nur GET, aber andere wären gleich):

class MainHandler():
    def initialize(self, submit, router):
        self.submit = submit
        self.router = router

    def worker(self, request):
        responder, kwargs = self.router.resolve(request)
        response = responder(**kwargs)
        return res

    def on_response(self, response):
        # when this is called response should already have result
        if isinstance(response, Future):
            response = response.result()
        # response is my own class, just write returned content to client
        self.write(response.data)
        self.flush()
        self.finish()

    def _on_response_ready(self, response):
        # schedule response processing in ioloop, to be on ioloop thread
        ioloop.IOLoop.current().add_callback(
            partial(self.on_response, response)
        )

    @web.asynchronous
    def get(self, url):
        self.submit(
            self._on_response_ready, # callback
            url=url, method='post', original_request=self.request
        )

Der Server wird gestartet mit:

router = Router()
server = TornadoServer(router, 1111, max_workers=50)
server.run()

Wie Sie sehen können, _on_response_ready der Haupt-Handler jede Anfrage an den Thread-Pool und wenn die Verarbeitung abgeschlossen ist, wird der Callback aufgerufen ( _on_response_ready ), der die _on_response_ready der Anfrage für die IO-Schleife plant (um sicherzustellen, dass sie auf demselben Thread ausgeführt wird) wo die IO-Schleife ausgeführt wird).

Das funktioniert. Zumindest sieht es so aus.

Mein Problem hier ist die Leistung in Bezug auf maximale Worker in ThreadPoolExecutor.

Alle Handler sind IO-gebunden, es gibt keine Berechnungen (sie warten meistens auf DB oder externe Dienste), so würde ich bei 50 Arbeitern 50 concurent-Anfragen 50-mal schneller als 50 concurent-Anfragen mit nur einem Worker beenden.

Aber das ist nicht der Fall. Was ich sehe, ist fast identische Anfragen pro Sekunde, wenn ich 50 Arbeiter im Thread-Pool und 1 Arbeiter habe.

Zum Messen verwende ich Apache-Bench mit etwas wie:

ab -n 100 -c 10 http://localhost:1111/some_url

Hat jemand eine Idee, was mache ich falsch? Habe ich falsch verstanden, wie Tornado oder ThreadPool funktioniert? Oder Kombination?