Some checks failed
CI / verify (push) Failing after 50s
The ultrathink + operator both found it: ~half the corpus is not cold prospecting (existing clients mid-booking, friends, vendors, spam) yet was forced into prospect moves -- the root cause of the 54% forward/backward disagreement. Taxonomy now leads with the prospect/not-prospect gate: existing_client | personal | vendor | spam as explicit classes, and collapses the noisy qualify/engage pair into 'pursue' (they route to the same action). Applied to both sweep (forward) and rationalize (backward) for the next re-sweep on the identity-gated corpus. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
78 lines
4.5 KiB
Python
78 lines
4.5 KiB
Python
#!/usr/bin/env python3
|
|
"""Rationalize labeled conversations into CoT training rows (STaR / backward distill).
|
|
|
|
Given (conversation context -> Quinn's ACTUAL next reply), infer the MOVE she ran
|
|
and a one-sentence reasoning trace anchored to her real reply (not a forward
|
|
guess). This is the high-quality way to manufacture the LoRA training set for
|
|
move-classification: (context -> trace -> move), labeled by what she actually did.
|
|
|
|
Input: a JSON list with `gold_reply` and either `context` (sweep) or `client_msg`
|
|
(mined cluster). Default <DATA_DIR>/sweep_labels.json -> the full work-era corpus.
|
|
Output: <DATA_DIR>/traincot_<input-stem>.json.
|
|
|
|
Env: OSS_URL, DATA_DIR. Arg: input filename (default sweep_labels.json).
|
|
"""
|
|
import json, os, sys, urllib.request
|
|
from collections import Counter
|
|
from concurrent.futures import ThreadPoolExecutor, as_completed
|
|
|
|
OSS_URL = os.environ.get("OSS_URL", "http://localhost:8800/v1/chat/completions")
|
|
DATA = os.environ.get("DATA_DIR", os.path.join(os.path.dirname(__file__), ".data"))
|
|
WORKERS = int(os.environ.get("WORKERS", "64"))
|
|
INPUT = sys.argv[1] if len(sys.argv) > 1 else "sweep_labels.json"
|
|
stem = os.path.splitext(os.path.basename(INPUT))[0]
|
|
items = json.load(open(os.path.join(DATA, INPUT)))
|
|
|
|
MOVES = ["opener", "pursue", "subhour", "address", "out_of_area", "of", "disengage", "escalate",
|
|
"existing_client", "personal", "vendor", "spam"]
|
|
SYSTEM = f"""You build training data by analyzing how Quinn (a touring companion, $1000/hr, incall williamsburg NYC, text only, OnlyFans @transquinnftw) handled a conversation. You are GIVEN her actual next reply, so infer her REAL reasoning -- do not invent a different reply.
|
|
|
|
FIRST: was this even a COLD PROSPECT (new person evaluating/booking her), or NOT a prospect? Much traffic is not.
|
|
|
|
NOT-A-PROSPECT (classify as these, NOT a prospect move):
|
|
- existing_client: already her client -- mid-booking logistics, "see you soon", on-the-way, past-meeting references, ongoing relationship/sexting with someone she's met.
|
|
- personal: a friend / family / non-work conversation.
|
|
- vendor: someone selling HER a service.
|
|
- spam: bot / automated / marketing / scam / wrong number.
|
|
|
|
PROSPECT moves (one of: {', '.join(m for m in MOVES if m not in ('existing_client','personal','vendor','spam'))}):
|
|
- opener: answered a new hello with her intro.
|
|
- pursue: engaged a paying prospect / gave rate / answered a preference question / moved toward booking (even if crude or low, if she pursued).
|
|
- subhour: gave the <1hr / half-hour rate stance.
|
|
- address: withheld her address when asked before a locked time.
|
|
- out_of_area: told him she's not in his city / offered outcall.
|
|
- of: redirected to OnlyFans (harvester / free-content / out-of-budget).
|
|
- disengage: brushed off a lowballer / hostile / someone offering his body.
|
|
- escalate: a collab / photographer / business / opportunity she'd personally decide.
|
|
|
|
Then a ONE-sentence trace: prospect or not, the subject, his pay-intent, why her move fits.
|
|
|
|
Output ONLY JSON: {{"move":"<one of the classes>","trace":"<one sentence>"}}"""
|
|
|
|
SCHEMA = {"type": "object",
|
|
"properties": {"move": {"type": "string", "enum": MOVES}, "trace": {"type": "string"}},
|
|
"required": ["move", "trace"], "additionalProperties": False}
|
|
|
|
def rationalize(it):
|
|
ctx = it.get("context") or ("CLIENT: " + it.get("client_msg", ""))
|
|
gold = it.get("gold_reply") or it.get("quinn_reply_gold", "")
|
|
user = f"{ctx}\nQUINN (actual reply): {gold}"
|
|
body = json.dumps({"model": "quinn-oss",
|
|
"messages": [{"role": "system", "content": SYSTEM}, {"role": "user", "content": user}],
|
|
"temperature": 0.2, "max_tokens": 250,
|
|
"response_format": {"type": "json_schema", "json_schema": {"name": "r", "schema": SCHEMA, "strict": True}}}).encode()
|
|
req = urllib.request.Request(OSS_URL, data=body, headers={"Content-Type": "application/json"})
|
|
d = json.loads(json.load(urllib.request.urlopen(req, timeout=120))["choices"][0]["message"]["content"])
|
|
return {"context": ctx, "gold_reply": gold, "move": d["move"], "trace": d["trace"]}
|
|
|
|
rows = []
|
|
with ThreadPoolExecutor(max_workers=WORKERS) as ex:
|
|
futs = [ex.submit(rationalize, it) for it in items if (it.get("gold_reply") or it.get("quinn_reply_gold"))]
|
|
for f in as_completed(futs):
|
|
try: rows.append(f.result())
|
|
except Exception as e: print("ERR", e, flush=True)
|
|
|
|
out = os.path.join(DATA, f"traincot_{stem}.json")
|
|
json.dump(rows, open(out, "w"), ensure_ascii=False)
|
|
print(f"rationalized {len(rows)} -> {out}")
|
|
print("move dist:", dict(Counter(r["move"] for r in rows)))
|