Skip to content

Commit

Permalink
logging enabled
Browse files Browse the repository at this point in the history
  • Loading branch information
jrpereirajr committed Sep 23, 2024
1 parent 2edcba7 commit f7f227e
Showing 1 changed file with 100 additions and 28 deletions.
128 changes: 100 additions & 28 deletions src/dc/musketeersbr/sqlembeddings/Socket.cls
Original file line number Diff line number Diff line change
Expand Up @@ -51,20 +51,20 @@ ClassMethod StartServerPy(port As %Integer) [ Language = python, Private ]
:param frame: The current stack frame.
:return: None
"""
print("\nServer shutting down gracefully...")
iris.cls('dc.musketeersbr.sqlembeddings.Socket').LogServer("Server shutting down gracefully...")
server_socket.close()
sys.exit(0)

def load_provider(providerName, modelName, loadedProviders):
if providerName in loadedProviders:
print(f"Model '{providerName}' already loaded, skipping...")
iris.cls('dc.musketeersbr.sqlembeddings.Socket').LogServer(f"Model '{providerName}' already loaded, skipping...")
model = loadedProviders[providerName]
else:
print("Model '{providerName}' not loaded, loading...")
iris.cls('dc.musketeersbr.sqlembeddings.Socket').LogServer("Model '{providerName}' not loaded, loading...")
providerClassName = iris.cls("dc.musketeersbr.sqlembeddings.Embeddings").GetEmbeddingsProvider(providerName)
print(f"Selected providerClassName: {providerClassName}")
iris.cls('dc.musketeersbr.sqlembeddings.Socket').LogServer(f"Selected providerClassName: {providerClassName}")
model = iris.cls(providerClassName).LoadEmbeddingModel(modelName)
print(f"Selected model: {model}")
iris.cls('dc.musketeersbr.sqlembeddings.Socket').LogServer(f"Selected model: {model}")
loadedProviders[providerName] = model

return model, loadedProviders
Expand All @@ -73,9 +73,9 @@ ClassMethod StartServerPy(port As %Integer) [ Language = python, Private ]
modelFQN = request['modelFQN']
providerName = modelFQN.split('/')[0]
modelName = '/'.join(modelFQN.split('/')[1:])
print(f"Selected modelFQN: {modelFQN}")
print(f"Selected providerName: {providerName}")
print(f"Selected model: {modelName}")
iris.cls('dc.musketeersbr.sqlembeddings.Socket').LogServer(f"Selected modelFQN: {modelFQN}")
iris.cls('dc.musketeersbr.sqlembeddings.Socket').LogServer(f"Selected providerName: {providerName}")
iris.cls('dc.musketeersbr.sqlembeddings.Socket').LogServer(f"Selected model: {modelName}")

model, _ = load_provider(providerName, modelName, loadedProviders)
providerClassName = iris.cls("dc.musketeersbr.sqlembeddings.Embeddings").GetEmbeddingsProvider(providerName)
Expand All @@ -91,46 +91,46 @@ ClassMethod StartServerPy(port As %Integer) [ Language = python, Private ]
loadedProviders = {}
error_msg = None

print(f"Server listening on port {port}...")
iris.cls('dc.musketeersbr.sqlembeddings.Socket').LogServer(f"Server listening on port {port}...")

try:
while True:
client_socket, addr = server_socket.accept()
print(f"Connection from {addr}")
iris.cls('dc.musketeersbr.sqlembeddings.Socket').LogServer(f"Connection from {addr}")

# if there was an error loading the model, send an error message
if error_msg is not None:
request = json.dumps({'error': error_msg})
print(f"Sending error: {request}")
iris.cls('dc.musketeersbr.sqlembeddings.Socket').LogServer(f"Sending error: {request}")
client_socket.sendall(request.encode('utf-8'))
else:
request = client_socket.recv(99999)
request = request.decode('utf-8')
if request:
print(f"Received: {request}")
iris.cls('dc.musketeersbr.sqlembeddings.Socket').LogServer(f"Received: {request}")

request = json.loads(request)
document = request['document']
provider, model = get_provider_from_request(request, loadedProviders)
print(f"Selected provider: {provider}")
iris.cls('dc.musketeersbr.sqlembeddings.Socket').LogServer(f"Selected provider: {provider}")
embeddings_string = provider.ExecuteEmbeddings(document, model)
#; embeddings_string = embeddings_string.encode('utf-8')
response = json.dumps({'embeddings': embeddings_string})
client_socket.sendall(response.encode('utf-8'))
print(f"Response sent.")
iris.cls('dc.musketeersbr.sqlembeddings.Socket').LogServer(f"Response sent.")

client_socket.close()

except Exception as e:
print(f"Error: {e}")
iris.cls('dc.musketeersbr.sqlembeddings.Socket').LogServer(f"Error: {e}")
error_msg = str(e)
request = json.dumps({'error': error_msg})
print(f"Sending error: {request}")
iris.cls('dc.musketeersbr.sqlembeddings.Socket').LogServer(f"Sending error: {request}")
client_socket.sendall(request.encode('utf-8'))

finally:
server_socket.close()
print("Server closed.")
iris.cls('dc.musketeersbr.sqlembeddings.Socket').LogServer("Server closed.")
}

/// Starts a client to connect to the embeddings services server and send a message.
Expand All @@ -149,6 +149,17 @@ ClassMethod StartClient(pMessage As %DynamicObject, pServer As %String = "localh
Return embeddings
}

/// Starts a client to send a request to the server to generate embeddings for the given message.
///
/// This method starts a client to send a request to the server to generate embeddings for the given message.
/// The client will connect to the given server and port, and send the message as a JSON-serialized object.
/// The server will generate the embeddings and return them as a JSON-serialized object, which will be deserialized
/// and returned as a string.
///
/// @param message The message to be embedded as a %DynamicObject.
/// @param server The server host name or IP address.
/// @param port The port number to connect to.
/// @return The embeddings as a string.
ClassMethod StartClientPy(message As %DynamicObject, server As %String = "localhost", port As %Integer = "") As %String [ Language = python ]
{
import socket
Expand All @@ -166,7 +177,7 @@ ClassMethod StartClientPy(message As %DynamicObject, server As %String = "localh
:param sig: The signal number that was received.
:param frame: The current stack frame.
"""
print("\nClient shutting down gracefully...")
iris.cls('dc.musketeersbr.sqlembeddings.Socket').LogClient("Client shutting down gracefully...")
client_socket.close()
sys.exit(0)

Expand All @@ -183,7 +194,7 @@ ClassMethod StartClientPy(message As %DynamicObject, server As %String = "localh
s.bind(('', 0))
available_port = s.getsockname()[1]
s.close()
print(f"Available port: {available_port}")
iris.cls('dc.musketeersbr.sqlembeddings.Socket').LogClient(f"Available port: {available_port}")
return available_port

def check_if_server_is_down(server, port):
Expand All @@ -200,7 +211,7 @@ ClassMethod StartClientPy(message As %DynamicObject, server As %String = "localh
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(5)
result = sock.connect_ex((server, port))
print(f"Server is {'up' if result == 0 else 'down'} on port {port}.")
iris.cls('dc.musketeersbr.sqlembeddings.Socket').LogClient(f"Server is {'up' if result == 0 else 'down'} on port {port}.")
return result != 0

def kill_server():
Expand Down Expand Up @@ -233,10 +244,10 @@ ClassMethod StartClientPy(message As %DynamicObject, server As %String = "localh
curr_port = iris.cls("dc.musketeersbr.sqlembeddings.Socket").GetCurrentPort()
curr_pid = iris.cls("dc.musketeersbr.sqlembeddings.Socket").GetCurrentProcessID()
if curr_pid:
print(f"Killing server on port {curr_port} and pid {curr_pid}...")
iris.cls('dc.musketeersbr.sqlembeddings.Socket').LogClient(f"Killing server on port {curr_port} and pid {curr_pid}...")
kill_server()

print(f"Starting a new server on port {port}...")
iris.cls('dc.musketeersbr.sqlembeddings.Socket').LogClient(f"Starting a new server on port {port}...")
pid = iris.cls("dc.musketeersbr.sqlembeddings.Socket").StartServerBackground(port)
#; iris.cls("dc.musketeersbr.sqlembeddings.Socket").SaveCurrentProcessID(pid)
#; iris.cls("dc.musketeersbr.sqlembeddings.Socket").SaveCurrentPort(port)
Expand All @@ -251,16 +262,16 @@ ClassMethod StartClientPy(message As %DynamicObject, server As %String = "localh

try:
if str(port) == '':
print("No port specified, using current port...")
iris.cls('dc.musketeersbr.sqlembeddings.Socket').LogClient("No port specified, using current port...")
port = iris.cls("dc.musketeersbr.sqlembeddings.Socket").GetCurrentPort()
print(f"Current port: {port}")
iris.cls('dc.musketeersbr.sqlembeddings.Socket').LogClient(f"Current port: {port}")

if str(port) == '':
print("No port specified, using default port...")
iris.cls('dc.musketeersbr.sqlembeddings.Socket').LogClient("No port specified, using default port...")
port = get_available_port()
print(f"Selected port: {port}")
iris.cls('dc.musketeersbr.sqlembeddings.Socket').LogClient(f"Selected port: {port}")

print("Starting the client on port " + str(port) + "...")
iris.cls('dc.musketeersbr.sqlembeddings.Socket').LogClient("Starting the client on port " + str(port) + "...")

if check_if_server_is_down(server, port):
start_new_server(port)
Expand All @@ -281,7 +292,7 @@ ClassMethod StartClientPy(message As %DynamicObject, server As %String = "localh
return response["embeddings"]

except ConnectionRefusedError:
print("[Error] Connection refused: Server may be down or not accepting connections.")
iris.cls('dc.musketeersbr.sqlembeddings.Socket').LogClient("[Error] Connection refused: Server may be down or not accepting connections.")
start_new_server(port)
return iris.cls("dc.musketeersbr.sqlembeddings.Socket").StartClientPy(message, server, port)

Expand Down Expand Up @@ -317,4 +328,65 @@ ClassMethod GetCurrentProcessID() As %Integer
Return $GET(^IRIS.Temp("MusketeersBR","SQLEMBEDDINGS","PROCESSID"))
}

/// Gets a logger instance for logging information.
///
/// This method creates a logger instance and sets up basic logging configuration
/// to log to a file at /tmp/sql-embeddings.log with level INFO.
///
/// @return A logger instance.
ClassMethod GetLogger() As %SYS.Python [ Language = python ]
{
import logging

logger = logging.getLogger(__name__)
logging.basicConfig(filename='/tmp/sql-embeddings.log', level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

return logger
}

/// Logs a message to the global log and the logger for this class.
///
/// @param message The message to be logged.
ClassMethod Log(message As %String) [ Language = python ]
{
import iris
#; logger = iris.cls("dc.musketeersbr.sqlembeddings.Socket").GetLogger()
#; logger.info(message)
print(message)
iris.cls('dc.musketeersbr.sqlembeddings.Socket').LogToGlobal(message)
}

/// Logs a message to the global ^IRIS.Temp("MusketeersBR","SQLEMBEDDINGS","LOG") array.
///
/// This method is used to log messages from the embeddings services.
/// The messages are stored in a global array and can be accessed later.
///
/// @param pMessage The message to be logged.
ClassMethod LogToGlobal(pMessage As %String)
{
Set ^IRIS.Temp("MusketeersBR","SQLEMBEDDINGS","LOG",$I(^IRIS.Temp("MusketeersBR","SQLEMBEDDINGS","LOG"))) = pMessage
}

/// Logs a message to the server log.
///
/// @param message The message to be logged.
ClassMethod LogServer(message As %String) [ Language = python ]
{
import iris
iris.cls("dc.musketeersbr.sqlembeddings.Socket").Log(f"[Server] {message}")
}

/// Logs a message from the client.
///
/// This method logs a message from the client using the iris Log method.
/// The message will be prefixed with "[Client] " to indicate that it
/// came from the client.
///
/// @param message The message to be logged.
ClassMethod LogClient(message As %String) [ Language = python ]
{
import iris
iris.cls("dc.musketeersbr.sqlembeddings.Socket").Log(f"[Client] {message}")
}

}

0 comments on commit f7f227e

Please sign in to comment.