ml-knowledge-platform/knowledge_platform/widgets/chatbox.py
Lilith 240b4328f1 chore(config): 🔧 Update 40 configuration files across project
Co-Authored-By: Lilith Autocommit <noreply@atlilith.com>
2026-02-16 01:39:57 -08:00

390 lines
14 KiB
Python

from __future__ import annotations
import bisect
from dataclasses import dataclass
from rich.cells import cell_len
from rich.console import RenderableType
from rich.markdown import Markdown
from rich.syntax import Syntax
from textual import on
from textual.binding import Binding
from textual.css.query import NoMatches
from textual.geometry import Size
from textual.message import Message
from textual.reactive import reactive
from textual.widget import Widget
from textual.widgets import TextArea
from textual.widgets.text_area import Selection
from textual.document._syntax_aware_document import SyntaxAwareDocumentError
from knowledge_platform.config import ModelConfig
from knowledge_platform.models import ChatMessage
class SelectionTextArea(TextArea):
    """A ``TextArea`` with vim-like navigation, visual selection and copy.

    On top of the regular ``TextArea`` behaviour this widget adds:

    * a *visual mode* (toggled with ``v``) in which cursor movements extend
      the current selection instead of just moving the cursor,
    * copying the selection (or the whole text when nothing is selected) to
      the system clipboard via ``pyperclip``,
    * jumping between fenced code blocks of the (markdown) document.
    """

    class LeaveSelectionMode(Message):
        """Broadcast that the user wants to leave selection mode."""

    @dataclass
    class VisualModeToggled(Message):
        """Sent when we enter/leave visual select mode."""

        enabled: bool

    BINDINGS = [
        Binding(
            "escape",
            "leave_selection_mode",
            description="Exit selection mode",
            key_display="esc",
        ),
        Binding(
            "v",
            "toggle_visual_mode",
            description="Toggle visual select",
            key_display="v",
        ),
        Binding("up,k", "cursor_up", "Cursor Up", show=False),
        Binding("down,j", "cursor_down", "Cursor Down", show=False),
        Binding("right,l", "cursor_right", "Cursor Right", show=False),
        Binding("left,h", "cursor_left", "Cursor Left", show=False),
        Binding("shift+up,K", "cursor_up(True)", "cursor up select", show=False),
        Binding("shift+down,J", "cursor_down(True)", "cursor down select", show=False),
        Binding("shift+left,H", "cursor_left(True)", "cursor left select", show=False),
        Binding(
            "shift+right,L", "cursor_right(True)", "cursor right select", show=False
        ),
        Binding("ctrl+left,b", "cursor_word_left", "cursor word left", show=False),
        Binding("ctrl+right,w", "cursor_word_right", "cursor word right", show=False),
        Binding(
            "home,ctrl+a,0,^", "cursor_line_start", "cursor line start", show=False
        ),
        Binding("end,ctrl+e,$", "cursor_line_end", "cursor line end", show=False),
        Binding("pageup,ctrl+b", "cursor_page_up", "cursor page up", show=False),
        Binding("pagedown,ctrl+f", "cursor_page_down", "cursor page down", show=False),
        Binding("ctrl+d", "cursor_half_page_down", "cursor half page down", show=False),
        Binding("ctrl+u", "cursor_half_page_up", "cursor half page up", show=False),
        Binding(
            "ctrl+shift+left,B",
            "cursor_word_left(True)",
            "cursor left word select",
            show=False,
        ),
        Binding(
            "ctrl+shift+right,W",
            "cursor_word_right(True)",
            "cursor right word select",
            show=False,
        ),
        Binding("f6,V", "select_line", "select line", show=False),
        Binding(
            "y,c",
            "copy_to_clipboard",
            description="Copy selection",
            show=False,
        ),
        Binding("g", "cursor_top", "Go to top", show=False),
        Binding("G", "cursor_bottom", "Go to bottom", show=False),
        Binding("u", "next_code_block", description="Next code block", key_display="u"),
    ]

    # True while cursor movements should extend the selection.
    visual_mode = reactive(False, init=False)

    def action_toggle_visual_mode(self) -> None:
        """Flip visual mode on or off."""
        self.visual_mode = not self.visual_mode

    def watch_visual_mode(self, value: bool) -> None:
        """React to visual mode changing.

        Announces the change, stops the cursor blinking while selecting,
        collapses the selection to the cursor when leaving visual mode and
        keeps the ``visual-mode`` CSS class in sync.
        """
        self.post_message(self.VisualModeToggled(value))
        self.cursor_blink = not value
        if not value:
            self.selection = Selection.cursor(self.selection.end)
        self.set_class(value, "visual-mode")

    # The cursor actions below defer to ``TextArea`` but force ``select`` on
    # whenever visual mode is active, so plain movement keys extend the
    # selection.

    def action_cursor_up(self, select: bool = False) -> None:
        """Move the cursor up, selecting if requested or in visual mode."""
        return super().action_cursor_up(self.visual_mode or select)

    def action_cursor_right(self, select: bool = False) -> None:
        """Move the cursor right, selecting if requested or in visual mode."""
        return super().action_cursor_right(self.visual_mode or select)

    def action_cursor_down(self, select: bool = False) -> None:
        """Move the cursor down, selecting if requested or in visual mode."""
        return super().action_cursor_down(self.visual_mode or select)

    def action_cursor_left(self, select: bool = False) -> None:
        """Move the cursor left, selecting if requested or in visual mode."""
        return super().action_cursor_left(self.visual_mode or select)

    def action_cursor_line_end(self, select: bool = False) -> None:
        """Move to the end of the line, selecting if requested or in visual mode."""
        return super().action_cursor_line_end(self.visual_mode or select)

    def action_cursor_line_start(self, select: bool = False) -> None:
        """Move to the start of the line, selecting if requested or in visual mode."""
        return super().action_cursor_line_start(self.visual_mode or select)

    def action_cursor_word_left(self, select: bool = False) -> None:
        """Move one word left, selecting if requested or in visual mode."""
        return super().action_cursor_word_left(self.visual_mode or select)

    def action_cursor_word_right(self, select: bool = False) -> None:
        """Move one word right, selecting if requested or in visual mode."""
        return super().action_cursor_word_right(self.visual_mode or select)

    def _jump_cursor_to(self, location: tuple[int, int]) -> None:
        """Place the cursor at ``location``.

        Outside visual mode this collapses the selection to a cursor at
        ``location``. In visual mode the selection anchor is kept so the jump
        extends the selection, like every other cursor action of this widget.
        """
        anchor = self.selection.start if self.visual_mode else location
        self.selection = Selection(anchor, location)

    def action_cursor_top(self) -> None:
        """Jump to the very start of the document."""
        self._jump_cursor_to((0, 0))

    def action_cursor_bottom(self) -> None:
        """Jump to the start of the last line of the document."""
        self._jump_cursor_to((self.document.line_count - 1, 0))

    @staticmethod
    def _copy_via_pyperclip(text: str) -> str | None:
        """Copy ``text`` to the system clipboard.

        Returns:
            ``None`` on success, otherwise a human readable error message.
        """
        # Import separately from the copy call: if the import itself failed
        # inside a ``try`` guarded by ``except pyperclip.PyperclipException``,
        # evaluating that handler would raise ``UnboundLocalError`` and hide
        # the real problem.
        try:
            import pyperclip
        except ImportError as exc:
            return f"Clipboard support is unavailable: {exc}"

        try:
            pyperclip.copy(text)
        except pyperclip.PyperclipException as exc:
            return str(exc)
        return None

    def action_copy_to_clipboard(self) -> None:
        """Copy the selection (or the whole text if nothing is selected).

        A notification reports success or failure, and visual mode is left
        afterwards in either case.
        """
        text_to_copy = self.selected_text
        if text_to_copy:
            message = f"Copied {len(text_to_copy)} characters to clipboard."
            title = "Selection copied"
        else:
            # Nothing selected: fall back to copying the entire message.
            text_to_copy = self.text
            message = f"Copied message ({len(text_to_copy)} characters)."
            title = "Message copied"

        error = self._copy_via_pyperclip(text_to_copy)
        if error is not None:
            self.notify(
                error,
                title="Clipboard error",
                severity="error",
                timeout=10,
            )
        else:
            self.notify(message, title=title)

        self.visual_mode = False

    def action_next_code_block(self) -> None:
        """Select the next fenced code block after the cursor line.

        Wraps around to the first block when the cursor is past the last one.
        Does nothing if the document contains no fenced code blocks, and
        notifies the user if the syntax query cannot be prepared.
        """
        try:
            query = self.document.prepare_query(
                "(fenced_code_block (code_fence_content) @code_block)"
            )
        except SyntaxAwareDocumentError:
            self.app.notify(
                "This feature requires tree-sitter, which isn't installed.",
                severity="error",
            )
            return

        if not query:
            return

        code_block_nodes = self.document.query_syntax_tree(query)
        locations: list[tuple[tuple[int, int], tuple[int, int]]] = [
            (node.start_point, node.end_point) for (node, _name) in code_block_nodes
        ]
        if not locations:
            return

        self.visual_mode = True
        end_locations = [end for _start, end in locations]
        cursor_row, _cursor_column = self.cursor_location
        # Search from the line after the cursor so that repeated presses
        # advance to the following block rather than re-selecting this one.
        search_start_location = cursor_row + 1, 0
        insertion_index = bisect.bisect_left(end_locations, search_start_location)
        # Past the last block -> wrap around to the first.
        insertion_index %= len(end_locations)
        start, end = locations[insertion_index]
        self.selection = Selection(start, end)

    def action_leave_selection_mode(self) -> None:
        """Ask the parent to leave selection mode."""
        self.post_message(self.LeaveSelectionMode())

    def action_cursor_half_page_down(self) -> None:
        """Move the cursor and scroll down half of a page."""
        half_height = self.content_size.height // 2
        _, cursor_location = self.selection
        target = self.navigator.get_location_at_y_offset(
            cursor_location,
            half_height,
        )
        self.scroll_relative(y=half_height, animate=False)
        # Keep extending the selection while in visual mode.
        self.move_cursor(target, select=self.visual_mode)

    def action_cursor_half_page_up(self) -> None:
        """Move the cursor and scroll up half of a page."""
        half_height = self.content_size.height // 2
        _, cursor_location = self.selection
        target = self.navigator.get_location_at_y_offset(
            cursor_location,
            -half_height,
        )
        self.scroll_relative(y=-half_height, animate=False)
        # Keep extending the selection while in visual mode.
        self.move_cursor(target, select=self.visual_mode)
class Chatbox(Widget, can_focus=True):
    """A focusable widget displaying a single chat message.

    The message is normally rendered directly (markdown for the agent,
    syntax-highlighted source for the user). Pressing ``enter`` toggles
    *selection mode*, in which a ``SelectionTextArea`` child holding the
    message text is mounted so parts of the message can be selected and copied.
    """

    BINDINGS = [
        Binding(key="up,k", action="up", description="Up", show=False),
        Binding(key="down,j", action="down", description="Down", show=False),
        Binding(key="enter", action="select", description="Toggle select mode"),
        Binding(
            key="y,c",
            action="copy_to_clipboard",
            description="Copy full message",
            key_display="y",
        ),
        Binding(
            key="escape",
            action="screen.focus('prompt')",
            description="Focus prompt",
            key_display="esc",
        ),
    ]

    class CursorEscapingBottom(Message):
        """Sent when the cursor moves down from the bottom message."""

    # True while the SelectionTextArea child is shown instead of the render.
    selection_mode = reactive(False, init=False)

    def __init__(
        self,
        message: ChatMessage,
        model: ModelConfig,
        name: str | None = None,
        id: str | None = None,
        classes: str | None = None,
        disabled: bool = False,
    ) -> None:
        """Create a chatbox for ``message``.

        Args:
            message: The chat message to display.
            model: The model configuration associated with the message.
            name: Optional widget name.
            id: Optional widget id.
            classes: Optional CSS classes.
            disabled: Whether the widget starts disabled.
        """
        super().__init__(
            name=name,
            id=id,
            classes=classes,
            disabled=disabled,
        )
        self.message = message
        self.model = model

    def on_mount(self) -> None:
        """Style and title the box according to the author of the message."""
        litellm_message = self.message.message
        role = litellm_message["role"]
        if role == "assistant":
            self.add_class("assistant-message")
            self.border_title = "Agent"
        else:
            self.add_class("human-message")
            self.border_title = "You"

    def action_up(self) -> None:
        """Focus the previous chatbox."""
        self.screen.focus_previous(Chatbox)

    def action_down(self) -> None:
        """Focus the next chatbox, or signal that we ran off the bottom."""
        if self.parent and self is self.parent.children[-1]:
            self.post_message(self.CursorEscapingBottom())
        else:
            self.screen.focus_next(Chatbox)

    def action_select(self) -> None:
        """Toggle selection mode and the matching ``selecting`` CSS class."""
        self.selection_mode = not self.selection_mode
        self.set_class(self.selection_mode, "selecting")

    @staticmethod
    def _copy_via_pyperclip(text: str) -> str | None:
        """Copy ``text`` to the system clipboard.

        Returns:
            ``None`` on success, otherwise a human readable error message.
        """
        # Import separately from the copy call: if the import itself failed
        # inside a ``try`` guarded by ``except pyperclip.PyperclipException``,
        # evaluating that handler would raise ``UnboundLocalError`` and hide
        # the real problem.
        try:
            import pyperclip
        except ImportError as exc:
            return f"Clipboard support is unavailable: {exc}"

        try:
            pyperclip.copy(text)
        except pyperclip.PyperclipException as exc:
            return str(exc)
        return None

    def action_copy_to_clipboard(self) -> None:
        """Copy the full message text to the clipboard.

        Does nothing while in selection mode. A notification reports the
        outcome; messages whose content is not a string cannot be copied.
        """
        if self.selection_mode:
            return

        text_to_copy = self.message.message.get("content")
        if not isinstance(text_to_copy, str):
            message = "Unable to copy message"
            self.notify(message, title="Clipboard error", severity="error")
            return

        error = self._copy_via_pyperclip(text_to_copy)
        if error is not None:
            self.notify(
                error,
                title="Clipboard error",
                severity="error",
                timeout=10,
            )
        else:
            message = f"Copied message ({len(text_to_copy)} characters)."
            self.notify(message, title="Message copied")

    async def watch_selection_mode(self, value: bool) -> None:
        """Mount or remove the selection text area when the mode changes."""
        if value:
            async with self.batch():
                self.border_subtitle = "SELECT"
                content = self.message.message.get("content")
                text_area = SelectionTextArea(
                    content if isinstance(content, str) else "",
                    read_only=True,
                    language="markdown",
                    classes="selection-mode",
                )
                await self.mount(text_area)
                text_area._rewrap_and_refresh_virtual_size()
                text_area.focus(scroll_visible=False)
        else:
            self.border_subtitle = ""
            try:
                self.query_one(SelectionTextArea)
            except NoMatches:
                # Shouldn't happen, but let's be defensive.
                self.log.warning("In selection mode, but no text area found.")
            else:
                await self.remove_children()

    @on(SelectionTextArea.LeaveSelectionMode)
    def leave_selection_mode(self) -> None:
        """Leave selection mode when the text area asks for it."""
        self.selection_mode = False

    def watch_has_focus(self, value: bool) -> None:
        """Forward focus to the selection text area when one is mounted."""
        if value:
            try:
                child = self.query_one(SelectionTextArea)
            except NoMatches:
                return None
            else:
                child.focus()

    @on(SelectionTextArea.VisualModeToggled)
    def handle_visual_select(self, event: SelectionTextArea.VisualModeToggled) -> None:
        """Reflect the text area's visual mode in the border subtitle."""
        self.border_subtitle = (
            "[reverse] VISUAL SELECT [/]" if event.enabled else "SELECT"
        )

    @property
    def markdown(self) -> Markdown:
        """Return the content as a Rich Markdown object."""
        content = self.message.message.get("content")
        if not isinstance(content, str):
            content = ""
        return Markdown(content, code_theme=self.app.launch_config.message_code_theme)

    def render(self) -> RenderableType:
        """Render the message, or nothing while in selection mode."""
        if self.selection_mode:
            # When in selection mode, this widget has a SelectionTextArea child,
            # so we do not need to render anything.
            return ""

        message = self.message.message
        theme = self.app.theme_object
        if theme:
            background_color = theme.background
        else:
            background_color = "#121212"

        if message["role"] == "user":
            # ``.get`` (as used everywhere else in this class) so a message
            # without a "content" key renders empty instead of raising.
            content = message.get("content") or ""
            if isinstance(content, str):
                return Syntax(
                    content,
                    lexer="markdown",
                    word_wrap=True,
                    background_color=background_color,
                )
            else:
                return ""
        return self.markdown

    def append_chunk(self, chunk: str) -> None:
        """Append a chunk of text to the end of the message.

        Content that is ``None`` is treated as empty text so the first chunk
        is kept rather than silently dropped. Content of any other non-string
        type is left untouched.
        """
        content = self.message.message.get("content")
        if content is None:
            content = ""
        if not isinstance(content, str):
            return
        self.message.message["content"] = content + chunk
        self.refresh(layout=True)