diff --git a/api/main.py b/api/main.py
index 3226ba95d2547110125ef04b94ddcbb3cc3cd698..0be4e5ddfd7d606ef121ca4f450c4774d6d3a6f5 100644
--- a/api/main.py
+++ b/api/main.py
@@ -1,19 +1,82 @@
-from fastapi import FastAPI, Request, Response
-from fastapi.staticfiles import StaticFiles
-from requests import request as make_request
+from contextlib import asynccontextmanager
+
+from api.optimization import MultiProcessOptimizer
 from api.routers import runs
+from fastapi import BackgroundTasks, FastAPI, Request, Response
+from fastapi.staticfiles import StaticFiles
+from requests import request as make_request
 
-# from api.optimization import Backend
+# see https://github.com/tiangolo/fastapi/issues/3091#issuecomment-821522932 and https://github.com/encode/starlette/issues/1094#issuecomment-730346075 for handling heavy-load computation
 
 DEBUG = True
-
-app = FastAPI(debug=DEBUG, title="Prompt Optimization Backend")
+backend = None
+
+
+@asynccontextmanager
+async def lifespan(app: FastAPI):
+    global backend
+    # Load the backend (which runs models in a separate process)
+    backend = MultiProcessOptimizer(debug=DEBUG)
+    with backend:
+        # add endpoints from backend
+        # TODO allow to get these dynamically
+        actions = [("/api/action/evolve", backend.optimizer.evolve)]
+        for path, target in actions:
+            app.add_api_route(path, target)
+        app.openapi_schema = None
+        app.openapi()
+
+        yield
+    # Unload the backend, freeing the resources used by the separate process
+    # (done automatically when the with block is exited)
+    print("Releasing resources")
+    # release remaining allocations
+    del backend
+    # TODO somehow not all resources are released upon uvicorn reload, need to investigate further
+
+
+app = FastAPI(debug=DEBUG, title="Prompt Optimization Backend", lifespan=lifespan)
 app.include_router(runs.router, prefix="/api/runs")
+
+
+# start optimization
+@app.get("/api/run")
+def run(num_iterations: int, background_tasks: BackgroundTasks) -> str:
+    background_tasks.add_task(backend.run_optimization, num_iterations)
+    return f"Running optimization with {num_iterations} iterations."
+
+
+# get progress
+@app.get("/api/progress")
+def get_run_progress() -> str:
+    result = backend.get_progress()
+    return result
+
+
+# get run state
+@app.get("/api/status")
+async def get_run_status() -> bool:
+    return backend._running
+
+
+# get current genealogy of prompts
+@app.get("/api/get_tree")
+async def get_family() -> dict:
+    return backend.optimizer.family_tree
+
+
 if DEBUG:
 
-    @app.get("/{_:path}")
+    @app.get("/")
+    @app.get("/static/{_:path}")
     async def frontent_dev(request: Request):
+        # response = make_request(
+        #     method=request.method,
+        #     url="http://localhost:3000",
+        #     headers=request.headers,
+        #     stream=True,
+        # )
+        # return Response(content=response.content, status_code=response.status_code)
         response = make_request(
             method=request.method,
             url=f"{request.url.replace(port=3000)}",
@@ -29,34 +92,3 @@ if DEBUG:
 else:
     app.mount("/", StaticFiles(directory="frontend/build/", html=True), name="frontend")
-
-
-def test():
-    pass
-
-
-# @app.get("/test")
-# async def test_long_operation(request: Request):
-#     loop = asyncio.get_event_loop()
-#     result = await loop.run_in_executor(pool, test)
-#     return "ok"
-
-
-# start optimization
-@app.get("/run/{num_iterations}")
-async def run(num_iterations: int) -> str:
-    # api.run_optimization(num_iterations)
-    return "ok"
-
-
-# TODO turn actions into router and allow to set actions dynamically
-# perform optimizer-specific action
-@app.get("/action/evolve/")
-async def evolve(prompt1: str, prompt2: str) -> str:
-    return f"This is the evolved prompt taking prompts {prompt1} and {prompt2} into account."
-
-
-# get current genealogy of prompts
-@app.get("/family_tree/get")
-async def get_family() -> dict:
-    return dict()
diff --git a/api/optimization.py b/api/optimization.py
new file mode 100644
index 0000000000000000000000000000000000000000..39cf021cd38760bc6a7dfc874e8350d8773d706e
--- /dev/null
+++ b/api/optimization.py
@@ -0,0 +1,123 @@
+from argparse import Namespace
+from concurrent.futures import ProcessPoolExecutor
+
+from evolution import GeneticAlgorithm
+from models import Llama2, LLMModel
+from task import SentimentAnalysis
+
+# def create_model():
+#     global optimizer
+#     optimizer = Optimizer(debug=DEBUG)
+
+_evolution_model: LLMModel = None
+_evaluation_model: LLMModel = None
+# _model_call_type: get_type_hints(LLMModel).get("__call__")
+
+
+def _setup_models() -> None:
+    global _evolution_model, _evaluation_model
+
+    if _evolution_model is not None:
+        raise Exception("Evolution model has already been initialized.")
+
+    # model is currently fixed
+    options = Namespace(llama_path="./models/llama-2-13b-chat.Q5_K_M.gguf", chat=True)
+    _evolution_model = Llama2(options)
+
+    if _evaluation_model is not None:
+        raise Exception("Evaluation model has already been initialized.")
+
+    # model is currently fixed
+    _evaluation_model = _evolution_model
+
+
+def _release_models() -> None:
+    global _evolution_model, _evaluation_model
+    del _evolution_model
+    del _evaluation_model
+
+
+def _call_evolution_model(*args, **kwargs):
+    return _evolution_model(*args, **kwargs)
+
+
+def _call_evaluation_model(*args, **kwargs):
+    return _evaluation_model(*args, **kwargs)
+
+
+def f():
+    pass
+
+
+class MultiProcessOptimizer:
+    _instance: "MultiProcessOptimizer" = None
+    # a flag indicating whether the optimizer is currently running
+    _running: bool = False
+    model_exec: ProcessPoolExecutor = None
+
+    def __new__(cls, *args, **kwargs):
+        # only allow creating one instance (singleton pattern)
+        if cls._instance is None:
+            cls._instance = super(MultiProcessOptimizer, cls).__new__(cls)
+        return cls._instance
+
+    def __init__(self, *, debug: bool = False) -> None:
+        self.debug = debug
+
+    def __enter__(self):
+        # TODO allow customizing the optimizer
+
+        # create the necessary models
+        # initialize worker process; only 1 worker since prediction is memory-intensive
+        # since we only have 1 worker we simply keep the state in the global namespace, which the single worker accesses
+        self.model_exec = ProcessPoolExecutor(max_workers=1, initializer=_setup_models)
+        # make sure that the initializer is called
+        self.model_exec.submit(f).result()
+        evolution_model = lambda *args, **kwargs: self.model_exec.submit(
+            _call_evolution_model, *args, **kwargs
+        ).result()
+        evaluation_model = lambda *args, **kwargs: self.model_exec.submit(
+            _call_evaluation_model, *args, **kwargs
+        ).result()
+
+        # task is currently fixed
+        options = Namespace(use_grammar=False, debug=self.debug)
+        task = SentimentAnalysis(
+            evaluation_model,
+            options
+        )
+
+        optimizer_class = GeneticAlgorithm
+        # optimizer_class = DifferentialEvolution
+
+        self.optimizer = optimizer_class(
+            population_size=10,
+            task=task,
+            evolution_model=evolution_model,
+            evaluation_model=evaluation_model,
+        )
+
+    def __exit__(self, exc_type, exc_value, exc_tb):
+        print("Shutting down")
+        self._submit(_release_models).result()
+        self.model_exec.shutdown(False)
+        self.model_exec = None
+
+    def _submit(self, fn, *fn_args, **fn_kwargs):
+        if self.model_exec is None:
+            raise RuntimeError(
+                "Cannot access model executor - you have to use this class as a context manager (with statement) first."
+            )
+        return self.model_exec.submit(fn, *fn_args, **fn_kwargs)
+
+    def run_optimization(self, num_iterations: int) -> None:
+        self._running = True
+        self.optimizer.run(num_iterations, debug=self.debug, add_snapshot_dict={})
+        self._running = False
+
+    def get_progress(self):
+        if hasattr(self.optimizer, "iterations_pbar"):
+            result = str(self.optimizer.iterations_pbar)
+        else:
+            result = "Optimization has not run yet."
+        return result
diff --git a/evolution.py b/evolution.py
index d2c3e1fcef407ebc3c70e23c6420d9aeb7230062..1dc8f38cdcc1663b665d7dd4a9b0c45e086dd298 100644
--- a/evolution.py
+++ b/evolution.py
@@ -115,7 +115,10 @@ class EvolutionAlgorithm(PromptOptimization):
         P = [initial_prompts]
 
         # Line 2:
-        for t in trange(1, num_iterations + 1, desc="iterations", leave=True):
+        self.iterations_pbar = trange(
+            1, num_iterations + 1, desc="iterations", leave=True
+        )
+        for t in self.iterations_pbar:
             # Line 3: Selection: select a certain number of prompts from current population as parent prompts
             # pr1,...,prk ∼ Pt−1
             prompts_current_evolution = P[t - 1]
@@ -186,15 +189,15 @@ class GeneticAlgorithm(EvolutionAlgorithm):
 
     shorthand = "ga"
 
+    # kwargs is just there for convenience, as the evolve functions of other optimizers might have different inputs
+    # @register_action(ignore_args=["kwargs"])
     @log_calls("Performing prompt evolution using GA")
     def evolve(
         self,
         prompt_1: str,
         prompt_2: str,
-        *,
-        prompts_current_evolution: list[Prompt],
-        current_iteration: int,
-    ):
+        **kwargs,
+    ) -> tuple[str, ModelUsage]:
         # Following the evolutionary operators in GA, a new candidate prompt is generated through
         # a two-step process based on the selected two parents:
         # 1) The parent prompts undergo crossover, resulting in a new prompt that
diff --git a/models.py b/models.py
index bf0d49cad99956612e3ca2a060b8b3bc562d2a54..5a89761db3d83428fde076152b7227694180dd06 100644
--- a/models.py
+++ b/models.py
@@ -133,7 +133,6 @@ class LlamaModel(LLMModel):
         response_text = response["choices"][0]["text"]
         # input(f"Response: {response_text}")
         usage = ModelUsage(**response["usage"])
-        self.usage += usage
         return response_text, usage
 
     @property
@@ -215,7 +214,6 @@ class OpenAI(LLMModel):
                 **kwargs,
             )
             usage = ModelUsage(**response.usage.__dict__)
-            self.usage += usage
             return response.choices[0].message.content, usage
         else:
             response = self.openai_client.completions.create(
@@ -230,7 +228,6 @@ class OpenAI(LLMModel):
                 **kwargs,
            )
             usage = ModelUsage(**response.usage.__dict__)
-            self.usage += usage
             return response.choices[0].text, usage
 
     @classmethod
diff --git a/utils.py b/utils.py
index c6c014bc5eaf766535fc452c89fefab3db2785c4..ca7eb854c3b9b7f295fb72638f10d0144a3ea1ec 100644
--- a/utils.py
+++ b/utils.py
@@ -34,7 +34,6 @@ RUNS_DIR = current_directory / "runs"
 
 def initialize_run_directory(model: Callable):
     response, usage = model(None, run_name_prompt, chat=True)
-    model.usage -= usage
     run_name_match = re.search(r"^\w+$", response, re.MULTILINE)
     existing_run_names = os.listdir(RUNS_DIR)
     if run_name_match is None or run_name_match.group(0) in existing_run_names:
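For reference, a minimal sketch of how the new endpoints added in api/main.py might be exercised from a client. The base URL and serving the app via uvicorn are assumptions for illustration and are not part of this changeset.

```python
import time

import requests

BASE_URL = "http://localhost:8000"  # assumed; depends on how the app is served (e.g. `uvicorn api.main:app`)

# start an optimization run; /api/run schedules backend.run_optimization as a background task
print(requests.get(f"{BASE_URL}/api/run", params={"num_iterations": 10}).text)

# poll run state (/api/status returns backend._running) and progress while the run is active
while requests.get(f"{BASE_URL}/api/status").json():
    print(requests.get(f"{BASE_URL}/api/progress").text)
    time.sleep(5)

# fetch the current genealogy of prompts once the run has finished
print(requests.get(f"{BASE_URL}/api/get_tree").json())
```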