Files
CHORUS/test-resetdata-integration.py
anthonyrawlins 9bdcbe0447 Integrate BACKBEAT SDK and resolve KACHING license validation
Major integrations and fixes:
- Added BACKBEAT SDK integration for P2P operation timing
- Implemented beat-aware status tracking for distributed operations
- Added Docker secrets support for secure license management
- Resolved KACHING license validation via HTTPS/TLS
- Updated docker-compose configuration for clean stack deployment
- Disabled rollback policies to prevent deployment failures
- Added license credential storage (CHORUS-DEV-MULTI-001)

Technical improvements:
- BACKBEAT P2P operation tracking with phase management
- Enhanced configuration system with file-based secrets
- Improved error handling for license validation
- Clean separation of KACHING and CHORUS deployment stacks

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-09-06 07:56:26 +10:00

145 lines
4.7 KiB
Python
Raw Permalink Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
Test script to validate CHORUS integration with the ResetData API.
It checks the ResetData API directly, waits for the CHORUS health endpoint,
and then probes the CHORUS reasoning endpoint to confirm ResetData is working.
"""
import requests
import time
import json
import os
import sys
def wait_for_chorus_health(base_url="http://localhost:8081", timeout=120):
    """Poll the CHORUS health endpoint until it answers HTTP 200 or time runs out.

    Args:
        base_url: Root URL of the CHORUS health server; ``/health`` is appended.
        timeout: Maximum number of seconds to keep polling.

    Returns:
        True as soon as a poll returns status 200, False once ``timeout``
        seconds have elapsed without a healthy answer.
    """
    poll_interval = 5  # seconds between polls; also used as the per-request timeout
    health_url = f"{base_url}/health"
    print(f"🏥 Waiting for CHORUS health endpoint at {health_url}...")

    # A monotonic deadline is unaffected by wall-clock adjustments (NTP, DST).
    deadline = time.monotonic() + timeout
    last_problem = None
    while time.monotonic() < deadline:
        try:
            response = requests.get(health_url, timeout=poll_interval)
        except requests.exceptions.RequestException as exc:
            # Expected while the service is still starting; remember the reason
            # so a timeout can be diagnosed instead of failing silently.
            last_problem = str(exc)
        else:
            if response.status_code == 200:
                print("✅ CHORUS is healthy!")
                return True
            last_problem = f"HTTP {response.status_code}"

        print("⏳ Waiting for CHORUS to start...")
        # Never sleep past the deadline.
        remaining = deadline - time.monotonic()
        if remaining > 0:
            time.sleep(min(poll_interval, remaining))

    print("❌ Timeout waiting for CHORUS health endpoint")
    if last_problem is not None:
        print(f"   Last health check result: {last_problem}")
    return False
def test_reasoning_endpoint(base_url="http://localhost:8080"):
    """Send one prompt to the CHORUS reasoning endpoint and report the outcome.

    POSTs a JSON body with ``prompt`` and ``model`` to
    ``{base_url}/api/reasoning/generate`` with a 30 second timeout.

    Args:
        base_url: Root URL of the CHORUS API server.

    Returns:
        True when the endpoint answers HTTP 200 with a JSON body, False on any
        request error, non-200 status, or unparseable body.
    """
    # Test prompt for ResetData
    test_prompt = "What is the capital of Australia? Please provide a brief answer."
    print(f"🧠 Testing reasoning endpoint with prompt: '{test_prompt}'")

    try:
        # Note: This endpoint may not exist yet, this is a placeholder for testing
        response = requests.post(
            f"{base_url}/api/reasoning/generate",
            json={
                "prompt": test_prompt,
                "model": "meta/llama-3.1-8b-instruct",
            },
            timeout=30,
        )
    except requests.exceptions.RequestException as e:
        print(f"❌ Request failed: {e}")
        return False

    if response.status_code != 200:
        print(f"❌ API returned status {response.status_code}: {response.text}")
        return False

    try:
        result = response.json()
    except ValueError as exc:
        # A 200 with a non-JSON body (e.g. an HTML placeholder page) raises
        # ValueError; depending on the requests version this is not always a
        # RequestException, so it is handled explicitly.
        print(f"❌ API returned a non-JSON body: {exc}")
        return False

    print("✅ ResetData integration test successful!")
    # Only JSON objects have fields; show any other payload (list, string) as-is.
    if isinstance(result, dict):
        answer = result.get('response', 'No response field')
    else:
        answer = result
    print(f"📝 Response: {answer}")
    return True
def test_direct_resetdata_api():
    """Call the ResetData chat-completions API directly to verify the API key.

    Reads the key from the ``RESETDATA_API_KEY`` environment variable and sends
    a single short chat request through the OpenAI client.

    Returns:
        True when the reply carries non-empty message content; False when the
        key is missing, the reply is empty, or any exception is raised.
    """
    print("🌐 Testing ResetData API directly...")

    resetdata_key = os.getenv('RESETDATA_API_KEY')
    if not resetdata_key:
        print("❌ RESETDATA_API_KEY not set")
        return False

    chat_messages = [
        {"role": "system", "content": "You are a helpful assistant."},
        {"role": "user", "content": "What is 2+2? Answer briefly."},
    ]

    try:
        # Imported inside the try block so a missing package is reported as a
        # failed test rather than an import-time crash.
        from openai import OpenAI

        resetdata_client = OpenAI(
            base_url="https://models.au-syd.resetdata.ai/v1",
            api_key=resetdata_key,
        )
        completion = resetdata_client.chat.completions.create(
            model="meta/llama-3.1-8b-instruct:ptu-9f3627a0-4909-4561-8996-272774e91fc8",
            messages=chat_messages,
            temperature=0.2,
            top_p=0.7,
            max_tokens=50,
        )

        answer = completion.choices[0].message.content if completion.choices else None
        if not answer:
            print("❌ No valid response from ResetData API")
            return False

        print("✅ Direct ResetData API test successful!")
        print(f"📝 Response: {answer}")
        return True
    except Exception as e:
        print(f"❌ Direct ResetData API test failed: {e}")
        return False
def main():
    """Run the integration checks in order and print a summary.

    Sequence: direct ResetData API test, CHORUS health check, then the CHORUS
    reasoning endpoint test.

    Returns:
        False if the direct API test or the health check fails. Otherwise
        True; the reasoning endpoint result is reported in the summary but
        does not affect the return value, because the endpoint is optional.
    """
    separator = "=" * 50

    print("🚀 CHORUS ResetData Integration Test")
    print(separator)

    # First test ResetData API directly
    if not test_direct_resetdata_api():
        print("❌ Direct ResetData API test failed - check your API key")
        return False

    print("\n" + separator)

    # Wait for CHORUS to be healthy
    if not wait_for_chorus_health():
        print("❌ CHORUS health check failed")
        return False

    print("\n" + separator)

    # Test CHORUS reasoning endpoint (if it exists)
    # Note: This may not be implemented yet, so it is treated as optional
    print("🧠 Testing CHORUS reasoning integration...")
    print(" Note: Reasoning endpoint may not be implemented yet")
    reasoning_ok = test_reasoning_endpoint()

    print("\n" + separator)
    print("🎯 Integration Test Summary:")
    # Reaching this point means both earlier checks passed.
    print(" Direct ResetData API: ✅")
    print(" CHORUS Health Check: ✅")
    # Show an explicit marker on success; the summary previously printed an
    # empty string here, leaving the line blank when the test passed.
    reasoning_status = '✅' if reasoning_ok else '❓ (endpoint may not exist)'
    print(f" CHORUS Reasoning: {reasoning_status}")
    return True
if __name__ == "__main__":
    # Translate main()'s boolean result into a process exit status:
    # 0 on success, 1 on failure or when the user interrupts the run.
    try:
        exit_status = 0 if main() else 1
    except KeyboardInterrupt:
        print("\n⚠️ Test interrupted by user")
        exit_status = 1
    sys.exit(exit_status)