freeleaps-ops/apps/gitea-webhook-ambassador-python/test_auth.py

170 lines
6.3 KiB
Python

#!/usr/bin/env python3
"""
Authentication feature test script
Demonstrates how to properly use JWT and API key authentication
"""
import asyncio
import aiohttp
import json
from datetime import datetime
BASE_URL = "http://localhost:8000"
def print_divider():
print("-" * 50)
async def test_jwt_authentication():
"""Test JWT authentication"""
print("🔐 Testing JWT authentication")
print_divider()
# Note: In actual applications, JWT tokens should be obtained via the login endpoint
# Here we use a sample token (in real environments, obtain from login endpoint)
# Simulate JWT token (should be obtained from login endpoint in real use)
jwt_token = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJhZG1pbiIsImV4cCI6MTczMjAwMDAwMH0.test"
async with aiohttp.ClientSession() as session:
# Use JWT token to access admin endpoint
headers = {"Authorization": f"Bearer {jwt_token}"}
# Test access to logs endpoint
async with session.get(f"{BASE_URL}/api/logs", headers=headers) as response:
if response.status == 200:
logs = await response.json()
print("✅ JWT authentication succeeded - logs access")
print(f" Retrieved {len(logs)} logs")
else:
print(f"❌ JWT authentication failed - logs access: {response.status}")
if response.status == 401:
print(" Reason: JWT token is invalid or expired")
print()
async def test_api_key_authentication():
"""Test API key authentication"""
print("🔑 Testing API key authentication")
print_divider()
async with aiohttp.ClientSession() as session:
# First, create an API key (requires admin privileges)
# Note: Here we use a temporary authentication method
# Method 1: Use in-memory API key (for demo only)
# In real applications, API keys should be created via the admin interface
# Simulate a valid API key
api_key = "test_api_key_12345"
headers = {"Authorization": f"Bearer {api_key}"}
# Test access to logs endpoint
async with session.get(f"{BASE_URL}/api/logs", headers=headers) as response:
if response.status == 200:
logs = await response.json()
print("✅ API key authentication succeeded - logs access")
print(f" Retrieved {len(logs)} logs")
else:
print(f"❌ API key authentication failed - logs access: {response.status}")
if response.status == 401:
print(" Reason: API key is invalid or revoked")
print()
async def test_public_endpoints():
"""Test public endpoints (no authentication required)"""
print("🌐 Testing public endpoints")
print_divider()
async with aiohttp.ClientSession() as session:
# Health check endpoint (no authentication required)
async with session.get(f"{BASE_URL}/health") as response:
if response.status == 200:
data = await response.json()
print("✅ Health check endpoint accessed successfully")
print(f" Status: {data['status']}")
else:
print(f"❌ Health check endpoint access failed: {response.status}")
# Webhook endpoint (no authentication required)
webhook_data = {"test": "webhook_data"}
async with session.post(f"{BASE_URL}/webhook/gitea", json=webhook_data) as response:
if response.status == 200:
data = await response.json()
print("✅ Webhook endpoint accessed successfully")
print(f" Response: {data['message']}")
else:
print(f"❌ Webhook endpoint access failed: {response.status}")
print()
async def test_authentication_flow():
"""Test the complete authentication flow"""
print("🔄 Testing complete authentication flow")
print_divider()
print("📋 Authentication flow description:")
print("1. Public endpoints: /health, /webhook/gitea - no authentication required")
print("2. Admin endpoints: /api/admin/* - JWT or API key required")
print("3. Logs endpoints: /api/logs/* - JWT or API key required")
print()
print("🔧 How to obtain authentication tokens:")
print("1. JWT token: Obtain via login endpoint (login feature required)")
print("2. API key: Create via admin interface (admin privileges required)")
print()
print("⚠️ Demo limitations:")
print("- Using simulated authentication tokens")
print("- In real applications, implement full login and key management")
print("- It is recommended to use real authentication systems in production")
print()
async def create_demo_api_key():
"""Create a demo API key"""
print("🔧 Creating demo API key")
print_divider()
# Note: This is a simplified demo
# In real applications, API keys should be created and stored securely
demo_api_key = "demo_api_key_" + str(int(datetime.now().timestamp()))
print(f"✅ Demo API key created: {demo_api_key}")
print("📝 Usage:")
print(f" curl -H 'Authorization: Bearer {demo_api_key}' {BASE_URL}/api/logs")
print()
return demo_api_key
async def main():
"""Main test function"""
print("🚀 Starting authentication feature tests")
print("=" * 60)
print()
try:
# Wait for service to start
await asyncio.sleep(2)
await test_public_endpoints()
await test_jwt_authentication()
await test_api_key_authentication()
await test_authentication_flow()
# Create demo API key
demo_key = await create_demo_api_key()
print("=" * 60)
print("🎉 Authentication feature tests completed!")
print()
print("📚 Next steps:")
print("1. Implement a full login system")
print("2. Add user management features")
print("3. Implement secure API key storage")
print("4. Add permission control mechanisms")
print("5. Implement session management")
except Exception as e:
print(f"❌ Error occurred during testing: {str(e)}")
if __name__ == "__main__":
asyncio.run(main())