The engine includes short-term memory capabilities for maintaining context during conversations:
class ContextStore:
    """Bounded short-term memory of recent user/model exchanges.

    Each exchange is stored in ``history`` as a dict with the keys
    ``"user"`` and ``"model"``. Once the number of stored exchanges
    exceeds ``max_length``, the oldest ones are discarded.
    """

    def __init__(self, max_length=5):
        """
        Initialize the context store.

        :param max_length: Maximum number of exchanges to keep in history
        """
        self.max_length = max_length
        self.history = []

    def add(self, user_input, model_output):
        """
        Add a new exchange to the history, evicting the oldest if needed.

        :param user_input: User input
        :param model_output: Model output
        """
        self.history.append({"user": user_input, "model": model_output})
        # Drop every surplus entry (not just one) so the bound still holds
        # when max_length was lowered after entries had been stored.
        excess = len(self.history) - self.max_length
        if excess > 0:
            del self.history[:excess]

    def get_context(self):
        """
        Get the current conversation context formatted as a string.

        Each exchange contributes a ``User: ...`` line followed by a
        ``Model: ...`` line, oldest exchange first.

        :return: Current conversation context ("" when history is empty)
        """
        return "".join(
            f"User: {exchange['user']}\nModel: {exchange['model']}\n"
            for exchange in self.history
        )

    def clear(self):
        """
        Clear all history.
        """
        # Rebind rather than clear in place, so a list a caller obtained
        # earlier from ``history`` is left untouched.
        self.history = []
Stream Processing
The engine supports multiple output streams including HTTP, WebSocket, and local file logging:
class Stream:
    """
    Stream class that manages multiple output streams.
    Supports HTTP, local file, and WebSocket output streams.
    """

    def __init__(self, stream_type="local"):
        """
        Initialize Stream with specified stream type.

        The primary stream chosen by ``stream_type`` is registered first,
        followed by a ``LogStream`` as a secondary output.

        Args:
            stream_type (str): Type of stream to initialize ("http", "local", or "websocket")

        Raises:
            ValueError: If ``stream_type`` is not one of the supported values.
        """
        # Registered outputs; presumably filled by add_stream(), which is
        # defined outside this snippet -- confirm against its implementation.
        self.streams = []

        if stream_type == "http":
            self.add_stream(HTTPStream("http://localhost:8000"))
        elif stream_type == "local":
            self.add_stream(LocalStream())
        elif stream_type == "websocket":
            self.add_stream(WebsocketStream("ws://localhost:8765"))
        else:
            raise ValueError(f"Invalid stream type: {stream_type}")

        # Always add log stream as secondary output
        self.add_stream(LogStream())