Commit 3cef8395 authored by Grießhaber Daniel

move to thread-based separation of backend and optimizer

parent 8226548c
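The commit swaps the optimizer's worker pool from processes to threads. A minimal sketch of the trade-off, with a hypothetical run_model standing in for the Llama2 calls (not part of this commit): with a process pool, the worker owns the model in a separate address space and every argument and result crosses a pickle boundary; with a thread pool, the single worker shares the interpreter's memory, so the models can live as attributes on the optimizer object itself.

from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

# Hypothetical stand-in for a model invocation; illustrative only.
def run_model(prompt: str) -> str:
    return f"echo: {prompt}"

if __name__ == "__main__":
    # Process-based: worker state lives in another process; arguments and
    # results must be picklable, and the parent cannot touch the model.
    with ProcessPoolExecutor(max_workers=1) as pool:
        print(pool.submit(run_model, "hello").result())

    # Thread-based: the worker shares memory with the caller, so model
    # objects can be stored on an instance (the approach this commit takes),
    # at the cost of sharing the GIL with the web server.
    with ThreadPoolExecutor(max_workers=1) as pool:
        print(pool.submit(run_model, "hello").result())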
@@ -10,7 +10,7 @@ from api.routers import runs
 # see https://github.com/tiangolo/fastapi/issues/3091#issuecomment-821522932 and https://github.com/encode/starlette/issues/1094#issuecomment-730346075 for heavy-load computation
 DEBUG = True

-backend = None
+backend: MultiProcessOptimizer | None = None

 @asynccontextmanager
@@ -28,11 +28,6 @@ async def lifespan(app: FastAPI):
     app.openapi()
     yield

-    # Unload the backend freeing used ressources by the separate process
-    # automatically done when with block is exited
-    print("Releasing resources")
-    # release remaining allocations
-    del backend
     # TODO somehow not all ressources are released upon uvicorn reload, need to investigate further..
...
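For context, the hunk above edits FastAPI's lifespan hook. A minimal sketch of the pattern it moves toward, assuming the backend is used as a context manager so teardown no longer needs a manual del (the Backend class and app.state usage here are hypothetical stand-ins, not the repository's code):

from contextlib import asynccontextmanager

from fastapi import FastAPI

class Backend:
    # hypothetical stand-in for MultiProcessOptimizer
    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, exc_tb):
        # resources are released here, so lifespan needs no explicit cleanup
        print("Releasing resources")

@asynccontextmanager
async def lifespan(app: FastAPI):
    with Backend() as backend:
        app.state.backend = backend
        yield  # teardown runs when the with block exits

app = FastAPI(lifespan=lifespan)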
 from argparse import Namespace
-from concurrent.futures import ProcessPoolExecutor
-from functools import partial
+from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
+from typing import ClassVar

 from evolution import GeneticAlgorithm
 from models import Llama2, LLMModel
 from task import SentimentAnalysis

-# def create_model():
-#     global optimizer
-#     optimizer = Optimizer(debug=DEBUG)
-
-_evolution_model: LLMModel | None = None
-_evaluation_model: LLMModel | None = None
-# _model_call_type: get_type_hints(LLMModel).get("__call__")
-
-
-def _setup_models() -> None:
-    global _evolution_model, _evaluation_model
-
-    if _evolution_model is not None:
-        raise Exception("Evolution model has already been initialized.")
-    # currently fix model
-    options = Namespace(llama_path="./models/llama-2-13b-chat.Q5_K_M.gguf", chat=True)
-    _evolution_model = Llama2(options)
-
-    if _evaluation_model is not None:
-        raise Exception("Evaluation model has already been initialized.")
-    # currently fix model
-    _evaluation_model = _evolution_model
-
-
-def _release_models() -> None:
-    global _evolution_model, _evaluation_model
-    del _evolution_model
-    del _evaluation_model
-
-
-def _call_evolution_model(*args, **kwargs):
-    return _evolution_model(*args, **kwargs)
-
-
-def _call_evaluation_model(*args, **kwargs):
-    return _evaluation_model(*args, **kwargs)
-
-
-def f():
-    pass
-

 class MultiProcessOptimizer:
-    _instance: "MultiProcessOptimizer" = None
+    _instance: ClassVar["MultiProcessOptimizer"] = None
     _running: bool = False
     model_exec: ProcessPoolExecutor = None
+    _evolution_model: LLMModel | None = None
+    _evaluation_model: LLMModel | None = None

     def __new__(cls, *args, **kwargs):
         # only allow to create one instance (singleton pattern)
@@ -67,17 +28,17 @@
     def __enter__(self):
         # TODO allow to customize optimizer
-        # create necessary models
         # initialize worker processes; only 1 worker since prediction is memory-intensive
         # since we only have 1 worker we just save the state in the global namespace which the single worker accesses
-        self.model_exec = ProcessPoolExecutor(max_workers=1, initializer=_setup_models)
+        self.model_exec = ThreadPoolExecutor(
+            max_workers=1, initializer=self._setup_models
+        )
         # make sure that initializer is called
-        self.model_exec.submit(f).result()
         evolution_model = lambda *args, **kwargs: self.model_exec.submit(
-            _call_evolution_model, *args, **kwargs
+            self._call_evolution_model, *args, **kwargs
         ).result()
         evaluation_model = lambda *args, **kwargs: self.model_exec.submit(
-            _call_evaluation_model, *args, **kwargs
+            self._call_evaluation_model, *args, **kwargs
         ).result()

         # currently fix task
@@ -95,17 +56,36 @@
         )

     def __exit__(self, exc_type, exc_value, exc_tb):
-        print(f"Shutting down")
-        self._submit(_release_models).result()
+        self.model_exec.submit(self._release_models)
         self.model_exec.shutdown(False)
         self.model_exec = None

-    def _submit(self, fn, *fn_args, **fn_kwargs):
-        if self.model_exec is None:
-            raise RuntimeError(
-                "Cannot access model executor - you have to use this class as a context manager with the with statement first."
-            )
-        return self.model_exec.submit(fn, *fn_args, **fn_kwargs)
+    def _setup_models(self) -> None:
+        print("setup models")
+        if self._evolution_model is not None:
+            raise Exception("Evolution model has already been initialized.")
+        # currently fix model
+        options = Namespace(
+            llama_path="./models/llama-2-13b-chat.Q5_K_M.gguf", chat=True
+        )
+        self._evolution_model = Llama2(options)
+
+        if self._evaluation_model is not None:
+            raise Exception("Evaluation model has already been initialized.")
+        # currently fix model
+        self._evaluation_model = self._evolution_model
+
+    def _release_models(self) -> None:
+        del self._evolution_model
+        del self._evaluation_model
+
+    def _call_evolution_model(self, *args, **kwargs):
+        return self._evolution_model(*args, **kwargs)
+
+    def _call_evaluation_model(self, *args, **kwargs):
+        return self._evaluation_model(*args, **kwargs)

     def run_optimization(self, num_iterations: int) -> str:
         self._running = True
...
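The core pattern in the new __enter__: a single ThreadPoolExecutor worker both initializes the models and serializes every call to them, because each submit is immediately followed by a blocking .result(). A condensed, self-contained sketch of that pattern (the class and names are illustrative, not this module's API):

from concurrent.futures import ThreadPoolExecutor

class SerializedModel:
    """Funnels setup and every call through one worker thread."""

    def __init__(self) -> None:
        self._model = None
        # the initializer runs on the worker thread before any submitted task
        self._exec = ThreadPoolExecutor(max_workers=1, initializer=self._setup)
        # worker threads are spawned lazily: submitting a no-op and waiting on
        # it forces startup, so initialization errors surface immediately
        self._exec.submit(lambda: None).result()

    def _setup(self) -> None:
        self._model = lambda prompt: f"echo: {prompt}"  # stand-in for Llama2

    def __call__(self, prompt: str) -> str:
        # with a single worker, concurrent callers queue up here; .result()
        # blocks, so model invocations can never overlap
        return self._exec.submit(self._model, prompt).result()

    def close(self) -> None:
        self._exec.shutdown(wait=False)

model = SerializedModel()
print(model("hello"))
model.close()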
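The __new__ override touched above is, per its own comment, the standard singleton idiom; its body is hidden in a collapsed hunk, so this is a generic sketch of that idiom rather than the class's exact code:

class Singleton:
    _instance = None

    def __new__(cls, *args, **kwargs):
        # only allow one instance; later constructor calls return the first
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

a, b = Singleton(), Singleton()
assert a is b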