Skip to content

Commit

Permalink
update proto, improve cache handling
Browse files Browse the repository at this point in the history
  • Loading branch information
riccardobl committed Apr 21, 2024
1 parent 1dfb615 commit 3d409c0
Show file tree
Hide file tree
Showing 3 changed files with 113 additions and 82 deletions.
54 changes: 40 additions & 14 deletions src/OpenAgentsNode.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import os
import traceback
import json

import asyncio

class BlobStorage:
def __init__(self, id, url, node):
Expand Down Expand Up @@ -145,7 +145,10 @@ def canRun(self,job):
def preRun(self):
pass

def run(self, job):
async def loop(self):
pass

async def run(self, job):
pass

class OpenAgentsNode:
Expand All @@ -159,7 +162,7 @@ class OpenAgentsNode:
poolAddress = None
poolPort = None
failedJobsTracker = []

isLooping = False
def __init__(self, nameOrMeta=None, icon=None, description=None):
name = ""
if isinstance(nameOrMeta, str):
Expand Down Expand Up @@ -190,7 +193,7 @@ def getClient(self):
return self.rpcClient


def reannounce(self):
async def reannounce(self):
# Announce node
time_ms=int(time.time()*1000)
if time_ms >= self.nextNodeAnnounce:
Expand Down Expand Up @@ -223,7 +226,7 @@ def reannounce(self):
runner._nextAnnouncementTimestamp = int(time.time()*1000) + 5000


def executePendingJob(self ):
async def executePendingJob(self ):
client = self.getClient()
for runner in self.runners:
jobs=[]
Expand Down Expand Up @@ -253,10 +256,19 @@ def executePendingJob(self ):
runner._setNode(self)
runner._setJob(job)
runner.preRun()
output=runner.run(job)
runner.postRun()
self.log("Job completed in "+str(time.time()-t)+" seconds on node "+self.nodeName, job.id)
client.completeJob(rpc_pb2.RpcJobOutput(jobId=job.id, output=output))
async def task():
try:
output=await runner.run(job)
runner.postRun()
self.log("Job completed in "+str(time.time()-t)+" seconds on node "+self.nodeName, job.id)
client.completeJob(rpc_pb2.RpcJobOutput(jobId=job.id, output=output))
except Exception as e:
self.failedJobsTracker.append([job.id, time.time()])
self.log("Job failed in "+str(time.time()-t)+" seconds on node "+self.nodeName+" with error "+str(e), job.id)
if wasAccepted:
client.cancelJob(rpc_pb2.RpcCancelJob(jobId=job.id, reason=str(e)))
traceback.print_exc()
asyncio.create_task(task())
except Exception as e:
self.failedJobsTracker.append([job.id, time.time()])
self.log("Job failed in "+str(time.time()-t)+" seconds on node "+self.nodeName+" with error "+str(e), job.id)
Expand All @@ -269,17 +281,31 @@ def log(self,message, jobId=None):
if jobId:
self.getClient().logForJob(rpc_pb2.RpcJobLog(jobId=jobId, log=message))

def run(self, poolAddress=None, poolPort=None):

async def loop(self):
if not self.isLooping:
self.isLooping = True
promises = [runner.loop() for runner in self.runners]
await asyncio.gather(*promises)
self.isLooping = False

def start(self, poolAddress=None, poolPort=None):
asyncio.run(self.run(poolAddress, poolPort))

async def run(self, poolAddress=None, poolPort=None):
self.poolAddress = poolAddress or os.getenv('POOL_ADDRESS', "127.0.0.1")
self.poolPort = poolPort or int(os.getenv('POOL_PORT', "5000"))
while True:
try:
self.reannounce()
self.executePendingJob()
time.sleep(10.0/1000.0)
asyncio.create_task(self.loop())
asyncio.create_task(self.reannounce())
asyncio.create_task(self.executePendingJob())
# time.sleep(10.0/1000.0)
await asyncio.sleep(10.0/1000.0)
except Exception as e:
self.log("Error in main loop "+str(e))
traceback.print_exc()
time.sleep(5000.0/1000.0)
await asyncio.sleep(5)
# time.sleep(5)
except KeyboardInterrupt:
break
2 changes: 1 addition & 1 deletion src/events/embeddings.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@
"kind": {{meta.kind}},
"created_at": {{sys.timestamp_seconds}},
"tags": [
["output", "application/hyperblob"]
["output", "application/hyperdrive+bundle"]
["param","run-on", "openagents/embeddings" ],
["param", "max-tokens", "{{in.max_tokens}}"],
["param", "overlap", "{{in.overlap}}"],
Expand Down
139 changes: 72 additions & 67 deletions src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
class Runner (JobRunner):
openai = None
nlpcloud = None

def __init__(self, filters, meta, template, sockets):
super().__init__(filters, meta, template, sockets)
self.device = int(os.getenv('TRANSFORMERS_DEVICE', "-1"))
Expand All @@ -42,73 +43,80 @@ def __init__(self, filters, meta, template, sockets):
if not os.path.exists(self.cachePath):
os.makedirs(self.cachePath)

def split(self, text, chunk_size, overlap , marker, out):
def cacheSet(self, key, value):
with open(self.cachePath+"/"+key+".dat", "wb") as f:
pickle.dump(value, f)

def cacheGet(self, key):
if os.path.exists(self.cachePath+"/"+key+".dat"):
with open(self.cachePath+"/"+key+".dat", "rb") as f:
return pickle.load(f)
return None


def prepare(self, text, chunk_size, overlap , marker, out):
text = text.strip()
enc = tiktoken.get_encoding("cl100k_base")
tokenized_text = enc.encode(text)

for i in range(0, len(tokenized_text), chunk_size-overlap):
chunk_tokens = tokenized_text[i:min(i+chunk_size, len(tokenized_text))]
chunk = enc.decode(chunk_tokens)
out.append([chunk, marker])
chunk = enc.decode(chunk_tokens).strip()
if len(chunk)>0:
out.append([chunk, marker])


# tokens = self.pipe.tokenizer.tokenize(text)
# for i in range(0, len(tokens), chunk_size-overlap):
# chunk_tokens = tokens[i:min(i+chunk_size, len(tokens))]
# chunk = self.pipe.tokenizer.convert_tokens_to_string(chunk_tokens)
# out.append([chunk, marker])

def encode(self, sentences):


out = []
to_encode = []
to_encode_index=[]
out = []

for s in sentences:
hash = hashlib.sha256((self.modelName+":"+s).encode()).hexdigest()
cache_file = self.cachePath+"/"+hash+".dat"
if not os.path.exists(cache_file):
to_encode.append(s)
to_encode_index.append(len(out))
out.append(None)
cached = self.cacheGet(hash)
if cached is not None:
out.append(cached)
else:
with open(cache_file, "rb") as f:
out.append(pickle.load(f))
# use openai for encoding
if self.nlpcloud :
if len(to_encode)>0:
embeddingsvs=self.nlpcloud.embeddings(to_encode).embeddings
to_encode.append([s,hash,len(out)])
out.append(None)

if len(to_encode)>0:
encoded=None
if self.nlpcloud :
encodedRaw=self.nlpcloud.embeddings([x[0] for x in to_encode]).embeddings
encoded = []
for i in range(len(embeddingsvs)):
embeddings = embeddingsvs[i]
embeddings = np.array(embeddings)
encoded.append(embeddings)
elif self.openai:
if len(to_encode)>0:
embeddingsvs=self.openai.embeddings.create(
input=to_encode,
for i in range(len(encodedRaw)):
embeddings = encodedRaw[i]
encoded.append([np.array(embeddings),to_encode[i][1],to_encode[i][2]])
elif self.openai:
encodedRaw=self.openai.embeddings.create(
input=[x[0] for x in to_encode],
model=self.openaiModelName
)
encoded = []
for i in range(len(to_encode)):
embeddings = embeddingsvs.data[i].embedding
embeddings = np.array(embeddings)
encoded.append(embeddings)


# TODO: more apis?
# Use local model
else:
if len(to_encode)>0:
encoded = self.pipe.encode(to_encode)
embeddings = encodedRaw.data[i].embedding
encoded.append([np.array(embeddings),to_encode[i][1],to_encode[i][2]])
# TODO: more apis?
else: # Use local models
encodedRaw = self.pipe.encode([x[0] for x in to_encode], show_progress_bar=True)
encoded = []
for i in range(len(to_encode)):
embeddings = encodedRaw[i]
encoded.append([embeddings,to_encode[i][1],to_encode[i][2]])

for i in range(len(to_encode_index)):
out[to_encode_index[i]] = encoded[i]
hash = hashlib.sha256((self.modelName+":"+to_encode[i]).encode()).hexdigest()
with open(self.cachePath+"/"+hash+".dat", "wb") as f:
pickle.dump(encoded[i], f)
for i in range(len(encoded)):
embeddings = encoded[i][0]
hash = encoded[i][1]
index = encoded[i][2]
out[index] = embeddings
self.cacheSet(hash, embeddings)

return out

def quantize(self, embeddings):
if len(embeddings) == 0:
return embeddings
binary_embeddings = quantize_embeddings(embeddings, precision="binary")
return binary_embeddings

Expand All @@ -119,7 +127,7 @@ def getParamValue(key,default=None):
model = getParamValue("model", self.modelName)
return model == self.modelName

def run(self,job):
async def run(self,job):
def getParamValue(key,default=None):
param = [x for x in job.param if x.key == key]
return param[0].value[0] if len(param) > 0 else default
Expand All @@ -142,7 +150,7 @@ def getParamValue(key,default=None):
if marker != "query": marker="passage"
if data_type == "text":
sentences.append([data,marker])
elif data_type=="application/hyperblob":
elif data_type=="application/hyperdrive+bundle":
blobDisk = self.openStorage(data)
files = blobDisk.list()
supportedExts = ["html","txt","htm","md"]
Expand All @@ -155,33 +163,32 @@ def getParamValue(key,default=None):

# Check local cache
self.log("Check cache...")
cacheId = str( self.modelName) + str(outputFormat) + str(max_tokens) + str(overlap) + str(quantize) + "".join([sentences[i][0] + ":" + sentences[i][1] for i in range(len(sentences))])
cacheId = hashlib.sha256(cacheId.encode("utf-8")).hexdigest()
cacheFile = os.path.join(self.cachePath, cacheId+".dat")
if os.path.exists(cacheFile):
with open(cacheFile, "r") as f:
return f.read()
cacheId = hashlib.sha256(
(str( self.modelName) + str(outputFormat) + str(max_tokens) + str(overlap) + str(quantize) + "".join([sentences[i][0] + ":" + sentences[i][1] for i in range(len(sentences))])).encode("utf-8")).hexdigest()
cached = self.cacheGet(cacheId)
if cached is not None:
self.log("Cache hit")
return cached

# Split long sentences
self.log("Split long sentences...")
self.log("Prepare sentences...")
sentences_chunks=[]
for sentence in sentences:
self.split(sentence[0], max_tokens, overlap, sentence[1], sentences_chunks)
self.prepare(sentence[0], max_tokens, overlap, sentence[1], sentences_chunks)
sentences = sentences_chunks


# Create embeddings
self.log("Create embeddings for "+str(len(sentences))+" excerpts. max_tokens="+str(max_tokens)+", overlap="+str(overlap))
self.log("Create embeddings for "+str(len(sentences))+" excerpts. max_tokens="+str(max_tokens)+", overlap="+str(overlap)+", quantize="+str(quantize)+", model="+str(self.modelName))
embeddings = self.encode([(sentences[i][1]+": "+sentences[i][0] if self.addMarkersToSentences else sentences[i][0]) for i in range(len(sentences))])
if quantize:
self.log("Quantize embeddings")
embeddings = self.quantize(embeddings)


# Serialize to an output format and return as string
self.log("Embeddings ready. Serialize for output...")
output = ""
if outputFormat=="application/hyperblob":
if outputFormat=="application/hyperdrive+bundle":
blobDisk = self.createStorage()
for i in range(len(sentences)):
dtype = embeddings[i].dtype
Expand All @@ -194,10 +201,9 @@ def getParamValue(key,default=None):
blobDisk.writeBytes(str(i)+".embeddings", sentences_bytes)
blobDisk.writeBytes(str(i)+".embeddings.kind", marker.encode("utf-8"))
blobDisk.writeBytes(str(i)+".embeddings.vectors", embeddings_bytes)
output = blobDisk.getUrl()
output = blobDisk.getUrl()
blobDisk.close()
with open(cacheFile, "w") as f:
f.write(output)

else:
jsonOut = []
for i in range(len(sentences)):
Expand All @@ -209,11 +215,10 @@ def getParamValue(key,default=None):
[sentences[i][0], embeddings_b64, str(dtype), shape , sentences[i][1]]
)
output=json.dumps(jsonOut)
with open(cacheFile, "w") as f:
f.write(output)


self.cacheSet(cacheId, output)
return output

node = OpenAgentsNode(NodeConfig.meta)
node.registerRunner(Runner(filters=EmbeddingsEvent.filters,sockets=EmbeddingsEvent.sockets,meta=EmbeddingsEvent.meta,template=EmbeddingsEvent.template))
node.run()
node.start()

0 comments on commit 3d409c0

Please sign in to comment.