diff --git a/Lewis.py b/Lewis.py new file mode 100644 index 0000000..05e54bd --- /dev/null +++ b/Lewis.py @@ -0,0 +1,423 @@ +import openai +from openai import AzureOpenAI +import requests +import time +import os +import json +import requests +import subprocess +from openai import OpenAI +import random +from typing import List, Tuple, Dict, Any, Optional + +KEYS_DIR = 'keys' +if not os.path.isdir(KEYS_DIR): + os.makedirs(KEYS_DIR,exist_ok=True) + +def convert_openai_tools_to_claude(openai_tools: list) -> list: + claude_tools = [] + for tool in openai_tools: + if tool.get("type") != "function": + raise ValueError(f"Unsupported tool type: {tool.get('type')}") + + fn = tool["function"] + claude_tools.append({ + "name": fn["name"], + "description": fn.get("description", ""), + "input_schema": fn.get("parameters", {"type": "object", "properties": {}}) + }) + return claude_tools + +def normalize_messages_for_tools( + messages: List[Dict[str, Any]], + tools: Optional[List[Dict[str, Any]]] = None, +) -> Tuple[List[Dict[str, Any]], List[str]]: + """ + Detects and corrects common Chat Completions tool-message issues: + 1) In assistant messages, each entry in `tool_calls` must have: + { + "id": "...", + "type": "function", + "function": {"name": "", "arguments": ""} + } + - Moves top-level `name` / `arguments` into `function`. + - Ensures `type == "function"`. + - JSON-serializes non-string `arguments`. + + 2) In tool messages: + - Ensures `content` is a string; JSON-serializes if dict/list. + - Ensures `tool_call_id` exists. If missing, tries to pair with the + most recent unmatched assistant tool_call ID (by order). + + 3) Removes illegal extra fields at `tool_calls` top level. + + Returns: + (fixed_messages, issues) + - fixed_messages: deep-copied, corrected messages list + - issues: human-readable list of detected/corrected problems + """ + fixed = deepcopy(messages) + issues = [] + + # Build a set of valid function names from `tools` (optional validation) + valid_fn_names = set() + if tools: + for t in tools: + try: + if t.get("type") == "function": + fn = t.get("function", {}) + name = fn.get("name") + if isinstance(name, str): + valid_fn_names.add(name) + except Exception: + pass + + # Track assistant tool_calls -> to match subsequent tool results + pending_tool_call_ids = [] + + # First pass: fix assistant tool_calls and record pending IDs + for i, msg in enumerate(fixed): + role = msg.get("role") + if role == "assistant" and isinstance(msg.get("tool_calls"), list): + for j, tc in enumerate(msg["tool_calls"]): + # Ensure container objects exist + if not isinstance(tc, dict): + issues.append(f"[assistant#{i}] tool_calls[{j}] is not an object; replaced with empty object.") + msg["tool_calls"][j] = tc = {} + + # Move name/arguments into function + fn_obj = tc.get("function") or {} + moved = False + + if "name" in tc: + fn_obj["name"] = tc.pop("name") + moved = True + issues.append(f"[assistant#{i}] tool_calls[{j}]: moved top-level 'name' into 'function.name'.") + + if "arguments" in tc: + fn_obj["arguments"] = tc.pop("arguments") + moved = True + issues.append(f"[assistant#{i}] tool_calls[{j}]: moved top-level 'arguments' into 'function.arguments'.") + + # Ensure function object present + if "function" not in tc: + tc["function"] = fn_obj if fn_obj else {} + elif moved: + tc["function"].update(fn_obj) + + # Ensure type is "function" + if tc.get("type") != "function": + tc["type"] = "function" + issues.append(f"[assistant#{i}] tool_calls[{j}]: set 'type' to 'function'.") + + # Ensure arguments is a string + if "arguments" in tc["function"]: + args_val = tc["function"]["arguments"] + if not isinstance(args_val, str): + try: + tc["function"]["arguments"] = json.dumps(args_val, ensure_ascii=False) + issues.append(f"[assistant#{i}] tool_calls[{j}]: JSON-serialized non-string 'function.arguments'.") + except Exception: + tc["function"]["arguments"] = "{}" + issues.append(f"[assistant#{i}] tool_calls[{j}]: failed to serialize arguments; defaulted to '{{}}'.") + + else: + # Provide default empty JSON object + tc["function"]["arguments"] = "{}" + issues.append(f"[assistant#{i}] tool_calls[{j}]: added default empty 'function.arguments'.") + + # Validate function name if possible + fn_name = tc.get("function", {}).get("name") + if isinstance(fn_name, str): + if valid_fn_names and fn_name not in valid_fn_names: + issues.append(f"[assistant#{i}] tool_calls[{j}]: unknown function '{fn_name}' (not in tools).") + else: + issues.append(f"[assistant#{i}] tool_calls[{j}]: missing 'function.name'.") + + # Track pending tool_call_id for pairing + tc_id = tc.get("id") + if isinstance(tc_id, str): + pending_tool_call_ids.append(tc_id) + else: + # If missing id, synthesize a stable one + tc_id = f"call_{i}_{j}" + tc["id"] = tc_id + pending_tool_call_ids.append(tc_id) + issues.append(f"[assistant#{i}] tool_calls[{j}]: synthesized missing 'id' -> '{tc_id}'.") + + # Remove illegal top-level keys except allowed + allowed = {"id", "type", "function"} + extraneous = [k for k in list(tc.keys()) if k not in allowed] + for k in extraneous: + tc.pop(k, None) + issues.append(f"[assistant#{i}] tool_calls[{j}]: removed unsupported top-level field '{k}'.") + + # Second pass: fix tool messages (pair to pending assistant calls) + # We'll consume from the front of pending_tool_call_ids in order. + for i, msg in enumerate(fixed): + if msg.get("role") == "tool": + # tool_call_id + if not msg.get("tool_call_id"): + if pending_tool_call_ids: + inferred = pending_tool_call_ids.pop(0) + msg["tool_call_id"] = inferred + issues.append(f"[tool#{i}]: added missing 'tool_call_id' -> '{inferred}'.") + else: + issues.append(f"[tool#{i}]: missing 'tool_call_id' and none could be inferred.") + + # content must be string + content = msg.get("content") + if not isinstance(content, str): + try: + msg["content"] = json.dumps(content, ensure_ascii=False) + issues.append(f"[tool#{i}]: JSON-serialized non-string 'content'.") + except Exception: + msg["content"] = "" + issues.append(f"[tool#{i}]: failed to serialize content; set to empty string.") + + # Remove fields illegal for tool role (defensive) + for bad in ("name", "type", "function"): + if bad in msg: + msg.pop(bad, None) + issues.append(f"[tool#{i}]: removed illegal field '{bad}'.") + + # If someone mistakenly returned a tool result as role='assistant' with tool_call_id, + # quietly convert it to role='tool' (optional but handy). + if msg.get("role") == "assistant" and "tool_call_id" in msg: + msg["role"] = "tool" + issues.append(f"[assistant#{i}]: message had 'tool_call_id'; converted role to 'tool'.") + + return fixed, issues + +def convert_openai_messages_to_claude(openai_messages): + claude_messages = [] + for m in openai_messages: + if "tool_calls" in m: + m['content'] += '\n\n'+str(m["tool_calls"]) + m.pop("tool_calls") + claude_messages.append(m) + elif m['role']=='tool': + claude_messages.append({ + "role": 'user', + "content": "Tool call result: "+m['content'] + }) + else: + claude_messages.append(m) + return claude_messages + +def get_openai_token(p_token_url, p_client_id, p_client_secret, p_scope, **kwargs): + try: + with open(os.path.join(KEYS_DIR,f'openai_key.json')) as f: + key = json.load(f) + if time.time()