All @lilith/* packages should publish to forge.nasty.sh only. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
357 lines
10 KiB
Python
357 lines
10 KiB
Python
"""
|
|
GGUF model loader using llama-cpp-python.
|
|
|
|
Loads quantized GGUF models for efficient inference.
|
|
"""
|
|
|
|
from pathlib import Path
|
|
from typing import Optional, Any, List, Dict, Iterator
|
|
import time
|
|
import logging
|
|
|
|
from .base import BaseModelLoader, ModelInfo, ModelLoadError, ModelNotFoundError
|
|
from .device import get_best_device
|
|
from .registry import register_loader
|
|
from .loader import ensure_model as resolve_model_path
|
|
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Type alias for llama-cpp model.
# Aliased to Any: the concrete llama_cpp.Llama class is only imported
# inside GGUFModelLoader.load(), not at module import time.
LlamaModel = Any
|
|
|
|
|
|
@register_loader("gguf", aliases=["llama", "llama-cpp", "llamacpp"])
class GGUFModelLoader(BaseModelLoader[LlamaModel]):
    """
    GGUF model loader using llama-cpp-python.

    Loads quantized GGUF models with optional GPU acceleration.

    Example:
        >>> loader = GGUFModelLoader()

        >>> # Load from manifest
        >>> model = await loader.load("ministral-3b-instruct", n_gpu_layers=-1)

        >>> # Generate text
        >>> response = loader.generate("Hello, how are you?")

        >>> # Streaming generation
        >>> for token in loader.stream("Tell me a story"):
        ...     print(token, end="", flush=True)
    """

    def __init__(self) -> None:
        """Initialise the loader with no chat format selected.

        NOTE(review): ``_model``, ``_model_info`` and ``_loading`` are read by
        the methods below but not assigned here; presumably the base class
        initialiser sets them -- confirm against ``BaseModelLoader``.
        """
        super().__init__()
        # Chat template name passed to the most recent successful load(), if any.
        self._chat_format: Optional[str] = None

    def _require_model(self) -> LlamaModel:
        """Return the loaded model.

        Raises:
            ValueError: If no model is currently loaded.
        """
        if self._model is None:
            raise ValueError("No model loaded")
        return self._model

    def _resolve_model_file(self, model_id: str) -> Path:
        """Turn ``model_id`` into the path of an existing model file.

        ``model_id`` is first interpreted as a filesystem path. Unless it
        already names an existing ``.gguf`` file, it is handed to
        ``resolve_model_path`` (manifest lookup). If that lookup fails but the
        literal path exists, the literal path is used as-is.

        Args:
            model_id: Model ID from manifest or direct path to .gguf file

        Returns:
            Path to an existing file.

        Raises:
            ModelNotFoundError: If no existing file could be determined.
        """
        model_path = Path(model_id)

        if not model_path.exists() or model_path.suffix != ".gguf":
            # Try to resolve from manifest
            try:
                model_path = Path(resolve_model_path(model_id))
            except Exception as exc:
                if not model_path.exists():
                    raise ModelNotFoundError(model_id, [str(model_path)]) from exc
                # The literal path exists (just without a .gguf suffix):
                # keep using it instead of failing, but leave a trace.
                logger.debug(
                    "Could not resolve %r from manifest (%s); using existing path %s",
                    model_id,
                    exc,
                    model_path,
                )

        if not model_path.exists():
            raise ModelNotFoundError(model_id, [str(model_path)])

        return model_path

    async def load(
        self,
        model_id: str,
        *,
        n_ctx: int = 4096,
        n_gpu_layers: int = -1,
        n_threads: Optional[int] = None,
        n_batch: int = 512,
        chat_format: Optional[str] = None,
        verbose: bool = False,
        seed: int = -1,
        f16_kv: bool = True,
        use_mlock: bool = False,
        use_mmap: bool = True,
        embedding: bool = False,
        **kwargs: Any,
    ) -> LlamaModel:
        """
        Load a GGUF model.

        Any model that is already loaded is unloaded first. Only one load may
        run at a time per loader instance.

        Args:
            model_id: Model ID from manifest or direct path to .gguf file
            n_ctx: Context window size
            n_gpu_layers: Number of layers to offload to GPU (-1 = all)
            n_threads: Number of threads (default: auto)
            n_batch: Batch size for prompt processing
            chat_format: Chat template format (auto-detected if not specified)
            verbose: Enable llama.cpp verbose output
            seed: Random seed (-1 = random)
            f16_kv: Use float16 for KV cache
            use_mlock: Lock memory to prevent swapping
            use_mmap: Use memory mapping for faster loading
            embedding: Enable embedding mode
            **kwargs: Additional llama-cpp arguments

        Returns:
            Loaded Llama model

        Raises:
            ModelLoadError: If another load is in progress, llama-cpp-python
                cannot be imported, or any other error occurs while loading.
                Errors that already are ``ModelLoadError`` instances propagate
                unchanged; everything else is wrapped with the original
                exception as ``cause``.
                NOTE(review): a ``ModelNotFoundError`` from path resolution
                therefore propagates as-is only if it derives from
                ``ModelLoadError``; otherwise it is wrapped -- confirm the
                hierarchy in ``.base``.
        """
        if self._loading:
            raise ModelLoadError(model_id, "Another load operation is in progress")

        self._loading = True
        start_time = time.time()

        try:
            # Imported lazily, and in its own try block so that only a failure
            # of this import is reported as "not installed".
            try:
                from llama_cpp import Llama
            except ImportError as e:
                raise ModelLoadError(
                    model_id,
                    "llama-cpp-python not installed. Install with: pip install llama-cpp-python",
                    cause=e,
                ) from e

            # Resolve model path using the existing loader
            model_path = self._resolve_model_file(model_id)

            # Unload existing model
            if self._model is not None:
                await self.unload()

            # Load model. **kwargs is expanded last, so caller-supplied extras
            # override the named defaults above them.
            load_kwargs = {
                "model_path": str(model_path),
                "n_ctx": n_ctx,
                "n_gpu_layers": n_gpu_layers,
                "n_batch": n_batch,
                "seed": seed,
                "f16_kv": f16_kv,
                "use_mlock": use_mlock,
                "use_mmap": use_mmap,
                "embedding": embedding,
                "verbose": verbose,
                **kwargs,
            }

            # Only forwarded when set, so llama-cpp applies its own defaults otherwise.
            if n_threads is not None:
                load_kwargs["n_threads"] = n_threads

            if chat_format:
                load_kwargs["chat_format"] = chat_format

            self._model = Llama(**load_kwargs)
            self._chat_format = chat_format

            # Determine device string. This is a label derived from the
            # requested layer count only; the actual backend is not probed.
            device = "cpu"
            if n_gpu_layers != 0:
                device = f"cuda (layers={n_gpu_layers})"

            # Store model info
            self._model_info = ModelInfo(
                model_id=model_id,
                path=model_path,
                device=device,
                load_time_seconds=time.time() - start_time,
                metadata={
                    "n_ctx": n_ctx,
                    "n_gpu_layers": n_gpu_layers,
                    "n_batch": n_batch,
                    "chat_format": chat_format,
                },
            )

            logger.info(
                "Loaded %s (%s GPU layers) in %.2fs",
                model_path.name,
                n_gpu_layers,
                self._model_info.load_time_seconds,
            )

            return self._model

        except ModelLoadError:
            # Already the loader's own error type: do not wrap it a second time.
            raise
        except Exception as e:
            raise ModelLoadError(model_id, str(e), cause=e) from e
        finally:
            self._loading = False

    async def unload(self) -> None:
        """Unload the model and free resources.

        Does nothing when no model is loaded. Errors during teardown are
        logged as warnings rather than raised, and the loader state
        (model, model info, chat format) is cleared in every case.
        """
        if self._model is None:
            return

        try:
            # llama-cpp-python handles cleanup on deletion
            del self._model
            logger.debug("GGUF model unloaded")
        except Exception as e:
            logger.warning("Error during unload: %s", e)
        finally:
            # Reset everything on both the success and the error path so no
            # stale chat format or model info survives a failed unload.
            self._model = None
            self._model_info = None
            self._chat_format = None

    def generate(
        self,
        prompt: str,
        max_tokens: int = 256,
        temperature: float = 0.7,
        top_p: float = 0.9,
        top_k: int = 40,
        repeat_penalty: float = 1.1,
        stop: Optional[List[str]] = None,
        **kwargs: Any,
    ) -> str:
        """
        Generate text from a prompt.

        Args:
            prompt: Input prompt
            max_tokens: Maximum tokens to generate
            temperature: Sampling temperature
            top_p: Nucleus sampling parameter
            top_k: Top-k sampling parameter
            repeat_penalty: Penalty for repeated tokens
            stop: Stop sequences
            **kwargs: Additional generation arguments

        Returns:
            Generated text

        Raises:
            ValueError: If no model is loaded.
        """
        model = self._require_model()

        result = model(
            prompt,
            max_tokens=max_tokens,
            temperature=temperature,
            top_p=top_p,
            top_k=top_k,
            repeat_penalty=repeat_penalty,
            stop=stop or [],
            **kwargs,
        )

        return result["choices"][0]["text"]

    def stream(
        self,
        prompt: str,
        max_tokens: int = 256,
        temperature: float = 0.7,
        top_p: float = 0.9,
        top_k: int = 40,
        repeat_penalty: float = 1.1,
        stop: Optional[List[str]] = None,
        **kwargs: Any,
    ) -> Iterator[str]:
        """
        Stream text generation token by token.

        This is a generator function, so nothing runs (including the
        "model loaded" check) until the result is first iterated.

        Args:
            prompt: Input prompt
            max_tokens: Maximum tokens to generate
            temperature: Sampling temperature
            top_p: Nucleus sampling parameter
            top_k: Top-k sampling parameter
            repeat_penalty: Penalty for repeated tokens
            stop: Stop sequences
            **kwargs: Additional generation arguments

        Yields:
            Generated tokens

        Raises:
            ValueError: On first iteration, if no model is loaded.
        """
        model = self._require_model()

        for token in model(
            prompt,
            max_tokens=max_tokens,
            temperature=temperature,
            top_p=top_p,
            top_k=top_k,
            repeat_penalty=repeat_penalty,
            stop=stop or [],
            stream=True,
            **kwargs,
        ):
            yield token["choices"][0]["text"]

    def chat(
        self,
        messages: List[Dict[str, str]],
        max_tokens: int = 256,
        temperature: float = 0.7,
        **kwargs: Any,
    ) -> str:
        """
        Chat completion with message history.

        Args:
            messages: List of message dicts with "role" and "content"
            max_tokens: Maximum tokens to generate
            temperature: Sampling temperature
            **kwargs: Additional generation arguments

        Returns:
            Assistant response

        Raises:
            ValueError: If no model is loaded.
        """
        model = self._require_model()

        result = model.create_chat_completion(
            messages=messages,
            max_tokens=max_tokens,
            temperature=temperature,
            **kwargs,
        )

        return result["choices"][0]["message"]["content"]

    def chat_stream(
        self,
        messages: List[Dict[str, str]],
        max_tokens: int = 256,
        temperature: float = 0.7,
        **kwargs: Any,
    ) -> Iterator[str]:
        """
        Stream chat completion token by token.

        This is a generator function, so nothing runs (including the
        "model loaded" check) until the result is first iterated.

        Args:
            messages: List of message dicts with "role" and "content"
            max_tokens: Maximum tokens to generate
            temperature: Sampling temperature
            **kwargs: Additional generation arguments

        Yields:
            Generated tokens

        Raises:
            ValueError: On first iteration, if no model is loaded.
        """
        model = self._require_model()

        for chunk in model.create_chat_completion(
            messages=messages,
            max_tokens=max_tokens,
            temperature=temperature,
            stream=True,
            **kwargs,
        ):
            delta = chunk["choices"][0].get("delta", {})
            # Chunks without text (no "content" key, or an explicit None)
            # are skipped so that only strings are ever yielded.
            content = delta.get("content")
            if content is not None:
                yield content

    def embed(self, text: str) -> List[float]:
        """
        Get embeddings for text.

        Model must be loaded with embedding=True.

        Args:
            text: Text to embed

        Returns:
            Embedding vector

        Raises:
            ValueError: If no model is loaded.
        """
        return self._require_model().embed(text)
|