From 3cef83950c3a9d6ee867a39f5dddea406c558bb2 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Daniel=20Grie=C3=9Fhaber?= <griesshaber@hdm-stuttgart.de>
Date: Tue, 21 May 2024 13:42:10 +0200
Subject: [PATCH] move to thread based separation of backend and optimizer

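Replace the ProcessPoolExecutor that hosted the models in a separate
worker process with a single-worker ThreadPoolExecutor, and move the
model setup/call/release helpers from module-level functions into
methods on MultiProcessOptimizer, so model state now lives on the
optimizer instance instead of the worker's global namespace. The
lifespan handler in api/main.py no longer needs to release resources
explicitly; __exit__ now queues _release_models and shuts the executor
down, and main.py only keeps the typed module-level backend handle.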
---
 api/main.py         |  7 +---
 api/optimization.py | 96 ++++++++++++++++++---------------------------
 2 files changed, 39 insertions(+), 64 deletions(-)

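Sketch of the intended call pattern (assumption: api/main.py keeps
entering/exiting the optimizer via a `with` block inside the FastAPI
lifespan; the lifespan body itself lies outside the hunks of this patch,
so the excerpt below is hypothetical and only mirrors the names visible
in the diff):

    # hypothetical excerpt mirroring api/main.py after this change
    from contextlib import asynccontextmanager

    from fastapi import FastAPI

    from api.optimization import MultiProcessOptimizer

    backend: MultiProcessOptimizer | None = None

    @asynccontextmanager
    async def lifespan(app: FastAPI):
        global backend
        backend = MultiProcessOptimizer()
        # __enter__ builds the single-worker ThreadPoolExecutor and wires
        # up the models and optimizer; __exit__ queues _release_models and
        # shuts the worker thread down when the application stops.
        with backend:
            yield
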
diff --git a/api/main.py b/api/main.py
index 007d38a..47752df 100644
--- a/api/main.py
+++ b/api/main.py
@@ -10,7 +10,7 @@ from api.routers import runs
 # see https://github.com/tiangolo/fastapi/issues/3091#issuecomment-821522932 and https://github.com/encode/starlette/issues/1094#issuecomment-730346075 for heavy-load computation
 
 DEBUG = True
-backend = None
+backend: MultiProcessOptimizer | None = None
 
 
 @asynccontextmanager
@@ -28,11 +28,6 @@ async def lifespan(app: FastAPI):
         app.openapi()
 
         yield
-        # Unload the backend freeing used ressources by the separate process
-        # automatically done when with block is exited
-        print("Releasing resources")
-    # release remaining allocations
-    del backend
     # TODO somehow not all ressources are released upon uvicorn reload, need to investigate further..
 
 
diff --git a/api/optimization.py b/api/optimization.py
index 1119810..de5908d 100644
--- a/api/optimization.py
+++ b/api/optimization.py
@@ -1,58 +1,19 @@
 from argparse import Namespace
-from concurrent.futures import ProcessPoolExecutor
+from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
+from functools import partial
+from typing import ClassVar
 
 from evolution import GeneticAlgorithm
 from models import Llama2, LLMModel
 from task import SentimentAnalysis
 
-# def create_model():
-#     global optimizer
-#     optimizer = Optimizer(debug=DEBUG)
-
-_evolution_model: LLMModel | None = None
-_evaluation_model: LLMModel | None = None
-# _model_call_type: get_type_hints(LLMModel).get("__call__")
-
-
-def _setup_models() -> None:
-    global _evolution_model, _evaluation_model
-
-    if _evolution_model is not None:
-        raise Exception("Evolution model has already been initialized.")
-
-    # currently fix model
-    options = Namespace(llama_path="./models/llama-2-13b-chat.Q5_K_M.gguf", chat=True)
-    _evolution_model = Llama2(options)
-
-    if _evaluation_model is not None:
-        raise Exception("Evaluation model has already been initialized.")
-
-    # currently fix model
-    _evaluation_model = _evolution_model
-
-
-def _release_models() -> None:
-    global _evolution_model, _evaluation_model
-    del _evolution_model
-    del _evaluation_model
-
-
-def _call_evolution_model(*args, **kwargs):
-    return _evolution_model(*args, **kwargs)
-
-
-def _call_evaluation_model(*args, **kwargs):
-    return _evaluation_model(*args, **kwargs)
-
-
-def f():
-    pass
-
 
 class MultiProcessOptimizer:
-    _instance: "MultiProcessOptimizer" = None
+    _instance: ClassVar["MultiProcessOptimizer | None"] = None
     _running: bool = False
-    model_exec: ProcessPoolExecutor = None
+    model_exec: ThreadPoolExecutor | None = None
+    _evolution_model: LLMModel | None = None
+    _evaluation_model: LLMModel | None = None
 
     def __new__(cls, *args, **kwargs):
         # only allow to create one instance (singleton pattern)
@@ -67,17 +28,17 @@ class MultiProcessOptimizer:
     def __enter__(self):
         # TODO allow to customize optimizer
 
-        # create necessary models
-        # initialize worker processes; only 1 worker since prediction is memory-intensive
-        # since we only have 1 worker we just save the state in the global namespace which the single worker accesses
-        self.model_exec = ProcessPoolExecutor(max_workers=1, initializer=_setup_models)
+        # initialize the single worker thread; only 1 worker since prediction is memory-intensive
+        # since we only have 1 worker, model state is kept on this instance and accessed by that worker
+        self.model_exec = ThreadPoolExecutor(
+            max_workers=1, initializer=self._setup_models
+        )
-        # make sure that initializer is called
+        # the worker thread (and thus _setup_models) is started lazily on the first submit
-        self.model_exec.submit(f).result()
         evolution_model = lambda *args, **kwargs: self.model_exec.submit(
-            _call_evolution_model, *args, **kwargs
+            self._call_evolution_model, *args, **kwargs
         ).result()
         evaluation_model = lambda *args, **kwargs: self.model_exec.submit(
-            _call_evaluation_model, *args, **kwargs
+            self._call_evaluation_model, *args, **kwargs
         ).result()
 
         # currently fix task
@@ -95,17 +56,36 @@ class MultiProcessOptimizer:
         )
 
     def __exit__(self, exc_type, exc_value, exc_tb):
-        print(f"Shutting down")
-        self._submit(_release_models).result()
+        self.model_exec.submit(self._release_models)
         self.model_exec.shutdown(False)
         self.model_exec = None
 
-    def _submit(self, fn, *fn_args, **fn_kwargs):
-        if self.model_exec is None:
-            raise RuntimeError(
-                "Cannot access model executor - you have to use this class as a context manager with the with statement first."
-            )
-        return self.model_exec.submit(fn, *fn_args, **fn_kwargs)
+    def _setup_models(self) -> None:
+        print("setup models")
+        if self._evolution_model is not None:
+            raise Exception("Evolution model has already been initialized.")
+
+        # currently fix model
+        options = Namespace(
+            llama_path="./models/llama-2-13b-chat.Q5_K_M.gguf", chat=True
+        )
+        self._evolution_model = Llama2(options)
+
+        if self._evaluation_model is not None:
+            raise Exception("Evaluation model has already been initialized.")
+
+        # currently fix model
+        self._evaluation_model = self._evolution_model
+
+    def _release_models(self) -> None:
+        del self._evolution_model
+        del self._evaluation_model
+
+    def _call_evolution_model(self, *args, **kwargs):
+        return self._evolution_model(*args, **kwargs)
+
+    def _call_evaluation_model(self, *args, **kwargs):
+        return self._evaluation_model(*args, **kwargs)
 
     def run_optimization(self, num_iterations: int) -> str:
         self._running = True
-- 
GitLab