prospector/tooling/eval/sweep.py
Natalie 882a46e812
feat(eval): explicit prospect-first CoT step (is_prospect)
Per operator: the FIRST CoT step must be 'is this a prospect or not?' — someone
saying 'hi' is usually a prospect but a friend/vendor/existing-client/bot is not.
Added is_prospect as the enforced first output field (json_schema strict
generates in property order, so the model commits to prospect-or-not before the
move). is_prospect=false IFF move in existing_client/personal/vendor/spam.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-30 12:52:02 -04:00
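The commit message relies on the request schema listing is_prospect before move and trace. The sketch below uses only the standard library to check the client side of that claim. It copies the shape of SCHEMA from sweep.py with MOVES abbreviated, and it round-trips the schema through JSON text so you can see the key order the server actually receives. Whether decoding then follows that order depends on the serving backend's structured-output implementation, which this check cannot confirm.

```python
import json

# Same shape as SCHEMA in sweep.py; MOVES is abbreviated here for illustration only.
MOVES = ["opener", "pursue", "existing_client", "spam"]
SCHEMA = {"type": "object",
          "properties": {"is_prospect": {"type": "boolean"},
                         "move": {"type": "string", "enum": MOVES},
                         "trace": {"type": "string"}},
          "required": ["is_prospect", "move", "trace"], "additionalProperties": False}

def property_order(schema):
    """Return property names in the order they appear in the serialized request body."""
    # Dicts keep insertion order (Python 3.7+), and json.dumps/json.loads preserve it,
    # so this round-trip reflects exactly what goes over the wire.
    wire = json.loads(json.dumps(schema))
    return list(wire["properties"])

print(property_order(SCHEMA))
```

If the schema is ever built in a different order, for example by merging dicts, this check is a cheap way to catch it.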


#!/usr/bin/env python3
"""Semantic sweep: classify chat.db decision points into the move taxonomy at scale.
Uses the shared burst-aware, 1:1-only extraction (lib.py). For each CLIENT->QUINN
decision point, the OSS CoT classifier first decides is_prospect, then picks the move, plus a
one-sentence trace; Quinn's actual reply is kept as gold for rationalization. The sparse
escalate class (photographer/collab/business opportunities) is flagged for review.
Scale: WORKERS (client concurrency, vertical), MAX_PER_HANDLE (decision points per
conversation), LIMIT (overall cap). PII stays under gitignored DATA_DIR.
Env: OSS_URL, DATA_DIR, WORKERS, MAX_PER_HANDLE, LIMIT, COLD_ONLY (default 1 = cold prospects only; 0 disables the gate).
"""
import os, json, urllib.request
from collections import Counter
from concurrent.futures import ThreadPoolExecutor, as_completed
import lib
OSS_URL = os.environ.get("OSS_URL", "http://localhost:8800/v1/chat/completions")
DATA = os.environ.get("DATA_DIR", os.path.join(os.path.dirname(__file__), ".data"))
WORKERS = int(os.environ.get("WORKERS", "64"))
MAX_PER_HANDLE = int(os.environ.get("MAX_PER_HANDLE", "1"))
LIMIT = int(os.environ.get("LIMIT", "0")) or None
COLD_ONLY = os.environ.get("COLD_ONLY", "1") != "0" # identity gate: cold prospects only (default on)
os.makedirs(DATA, exist_ok=True)
threads = []
for t in lib.load_threads(cold_only=COLD_ONLY):
    for ctx, gold in lib.decision_points(t["turns"], max_per_handle=MAX_PER_HANDLE):
        threads.append({"context": ctx, "gold_reply": gold})
if LIMIT:
    threads = threads[:LIMIT]
print(f"decision points to sweep: {len(threads)} (1:1, burst-collapsed, max/handle={MAX_PER_HANDLE})", flush=True)
MOVES = ["opener", "pursue", "subhour", "address", "out_of_area", "of", "disengage", "escalate",
         "existing_client", "personal", "vendor", "spam"]
SYSTEM = """Classify the last message to Quinn (a touring companion, $1000/hr, incall williamsburg NYC, text only, OnlyFans @transquinnftw).
FIRST decide: is this a COLD PROSPECT (a new/unknown person evaluating or booking her for the first time), or NOT a prospect at all? Most non-prospect traffic must NOT get a prospecting move.
NOT-A-PROSPECT classes (detect these first):
- existing_client: already a client — mid-booking logistics, "see you soon", on-the-way coordination, references to past meetings, ongoing relationship/sexting with someone she's met. NOT a new-prospect decision.
- personal: a friend / family / non-work conversation (her real life, plans, people she knows).
- vendor: someone selling HER a service (ad platform, hotel, rideshare, salon) — not a collab she'd want.
- spam: bot / automated / marketing / scam / wrong number.
PROSPECT moves (only when it really is a cold prospect):
- opener: new hello / general interest, no specifics yet.
- pursue: real booking interest / preference question / asks rate / scheduling -> engage warmly and move toward booking (even if crude or starting low, if he'd pay).
- subhour: asks for a <1hr / half-hour rate.
- address: asks her address before a time is locked -> withhold.
- out_of_area: asks if she's in another city (she pursues with outcall/FMTY if viable).
- of: harvester (wants free pics) or clearly out of budget -> OnlyFans. (Note: she sometimes sells sexting herself = pursue, not of.)
- disengage: lowballer insulting the rate / someone offering HIS body for free / hostile -> brief brush off.
- escalate: a photographer/collab/business/opportunity she'd want to personally decide -> hold and surface to her.
FIRST decide is_prospect — someone saying "hi" is USUALLY a prospect, but a friend, vendor, existing client, or bot saying "hi" is NOT. is_prospect is false IFF move is one of existing_client/personal/vendor/spam.
Output ONLY JSON, is_prospect FIRST: {"is_prospect": <bool>, "move":"<one of the classes>", "trace":"<one sentence: prospect or not, and why this class>"}"""
SCHEMA = {"type": "object",
          "properties": {"is_prospect": {"type": "boolean"},
                         "move": {"type": "string", "enum": MOVES},
                         "trace": {"type": "string"}},
          "required": ["is_prospect", "move", "trace"], "additionalProperties": False}
def classify(t):
    """Label one decision point; returns the input dict plus is_prospect, move and trace."""
    body = json.dumps({"model": "quinn-oss",
                       "messages": [{"role": "system", "content": SYSTEM}, {"role": "user", "content": t["context"]}],
                       "temperature": 0.2, "max_tokens": 250,
                       "response_format": {"type": "json_schema", "json_schema": {"name": "m", "schema": SCHEMA, "strict": True}}}).encode()
    req = urllib.request.Request(OSS_URL, data=body, headers={"Content-Type": "application/json"})
    # Close the HTTP response explicitly instead of relying on garbage collection.
    with urllib.request.urlopen(req, timeout=120) as resp:
        reply = json.load(resp)
    # The structured output arrives as a JSON string inside message.content.
    d = json.loads(reply["choices"][0]["message"]["content"])
    return {**t, "is_prospect": d["is_prospect"], "move": d["move"], "trace": d["trace"]}
labels = []
with ThreadPoolExecutor(max_workers=WORKERS) as ex:
    futs = [ex.submit(classify, t) for t in threads]
    for f in as_completed(futs):
        try:
            labels.append(f.result())
        except Exception as e:
            # A failed request or malformed reply drops that decision point; the sweep continues.
            print("ERR", e, flush=True)
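# Explicit UTF-8 so ensure_ascii=False cannot fail on a non-UTF-8 default locale.
with open(os.path.join(DATA, "sweep_labels.json"), "w", encoding="utf-8") as fh:
    json.dump(labels, fh, ensure_ascii=False)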
print(f"swept {len(labels)} decision points", flush=True)
print("move dist:", dict(Counter(x["move"] for x in labels)))
flagged = [x for x in labels if x["move"] == "escalate"]
print(f"\nescalate/photographer flagged: {len(flagged)}")
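for x in flagged[:8]:
    # Last CLIENT line of the context, skipping the "CLIENT" label and its separator.
    client_lines = [l for l in x["context"].split("\n") if l.startswith("CLIENT")]
    last = client_lines[-1] if client_lines else ""
    print(f" {last[7:97]} | {x['trace'][:60]}")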
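SCHEMA constrains each field separately, so the rule "is_prospect is false IFF move is existing_client/personal/vendor/spam" is enforced only by the prompt. The sketch below is a hypothetical post-hoc audit and is not part of sweep.py. It counts labels whose flag contradicts their move and cross-tabulates (is_prospect, move). It assumes the label dicts carry the is_prospect and move keys that classify() writes. `audit` and `load_labels` are illustrative names.

```python
import json
from collections import Counter

# Mirrors the NOT-A-PROSPECT classes listed in SYSTEM.
NON_PROSPECT = {"existing_client", "personal", "vendor", "spam"}

def audit(labels):
    """Return (violations, crosstab) for 'is_prospect is false IFF move is a non-prospect class'."""
    # Consistent means is_prospect == (move not in NON_PROSPECT),
    # so a label violates the rule exactly when is_prospect == (move in NON_PROSPECT).
    violations = [x for x in labels if x["is_prospect"] == (x["move"] in NON_PROSPECT)]
    crosstab = Counter((x["is_prospect"], x["move"]) for x in labels)
    return violations, crosstab

def load_labels(path):
    """Read the UTF-8 JSON array of labels written by the sweep (hypothetical helper)."""
    with open(path, encoding="utf-8") as fh:
        return json.load(fh)

if __name__ == "__main__":
    sample = [
        {"is_prospect": True, "move": "opener"},
        {"is_prospect": False, "move": "spam"},
        {"is_prospect": True, "move": "vendor"},   # flag says prospect, move says non-prospect
        {"is_prospect": False, "move": "pursue"},  # flag says non-prospect, move is a prospect move
    ]
    bad, table = audit(sample)
    print(f"{len(bad)}/{len(sample)} labels break the rule")
    print(table.most_common())
```

To audit a real run, call `audit(load_labels(os.path.join(DATA_DIR, "sweep_labels.json")))` with DATA_DIR set as for the sweep. A nonzero violation count suggests the model's first-step prospect decision and its move choice are drifting apart.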