Skip to content

Commit

Permalink
Comparison fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
cryptobench committed Dec 15, 2024
1 parent 32eee01 commit e6fc0a3
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 29 deletions.
87 changes: 62 additions & 25 deletions stats-backend/api2/scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,14 @@
r = redis.Redis(connection_pool=pool)


def normalize_properties(data):
"""Normalize the properties dictionary for consistent comparison."""
# Sort lists within the dictionary
for key, value in data.items():
if isinstance(value, list):
data[key] = sorted(value)
return dict(sorted(data.items())) # Return sorted dictionary

@app.task
def update_providers_info(node_props):
now = timezone.now()
Expand Down Expand Up @@ -67,7 +75,6 @@ def update_providers_info(node_props):
new_nodes.append(node)
existing_nodes_dict[node.node_id] = node


# Update existing_nodes_dict with newly created nodes
updated_nodes = Node.objects.filter(node_id__in=new_provider_ids)
for node in updated_nodes:
Expand All @@ -85,10 +92,12 @@ def update_providers_info(node_props):
# Get existing Offers
existing_offers = Offer.objects.filter(
provider__node_id__in=provider_ids,
runtime__in=[data["golem.runtime.name"] for _, data in provider_data_list]
runtime__in=[data["golem.runtime.name"]
for _, data in provider_data_list]
).select_related('provider')

existing_offers_dict = {(offer.provider.node_id, offer.runtime): offer for offer in existing_offers}
existing_offers_dict = {
(offer.provider.node_id, offer.runtime): offer for offer in existing_offers}

# Find which offers are new
existing_offer_keys = set(existing_offers_dict.keys())
Expand All @@ -110,10 +119,11 @@ def update_providers_info(node_props):
new_offers.append(offer)
existing_offers_dict[(provider_id, runtime)] = offer


# Update existing_offers_dict with newly created offers
updated_offers = Offer.objects.filter(provider__node_id__in=provider_ids).select_related('provider')
existing_offers_dict.update({(offer.provider.node_id, offer.runtime): offer for offer in updated_offers})
updated_offers = Offer.objects.filter(
provider__node_id__in=provider_ids).select_related('provider')
existing_offers_dict.update(
{(offer.provider.node_id, offer.runtime): offer for offer in updated_offers})

# Now process and update offers
offers_to_update = [] # list of offers to bulk update
Expand All @@ -137,9 +147,12 @@ def update_providers_info(node_props):
if not monthly_pricing:
print(f"Monthly price is {monthly_pricing}")
offer.monthly_price_glm = min(monthly_pricing, MAX_PRICE_CAP_VALUE)
offer.monthly_price_usd = min(monthly_pricing * glm_usd_value.current_price, MAX_PRICE_CAP_VALUE)
offer.hourly_price_glm = min(monthly_pricing / hours_in_current_month, MAX_PRICE_CAP_VALUE)
offer.hourly_price_usd = min(offer.monthly_price_usd / hours_in_current_month, MAX_PRICE_CAP_VALUE)
offer.monthly_price_usd = min(
monthly_pricing * glm_usd_value.current_price, MAX_PRICE_CAP_VALUE)
offer.hourly_price_glm = min(
monthly_pricing / hours_in_current_month, MAX_PRICE_CAP_VALUE)
offer.hourly_price_usd = min(
offer.monthly_price_usd / hours_in_current_month, MAX_PRICE_CAP_VALUE)

vcpu_needed = data.get("golem.inf.cpu.threads", 0)
memory_needed = data.get("golem.inf.mem.gib", 0.0)
Expand All @@ -150,25 +163,37 @@ def update_providers_info(node_props):
).order_by("cpu_diff", "memory_diff", "price_usd").first()

if closest_ec2 and monthly_pricing:
offer_price_usd = min(monthly_pricing * glm_usd_value.current_price, MAX_PRICE_CAP_VALUE)
ec2_monthly_price = min(closest_ec2.price_usd * 730, MAX_PRICE_CAP_VALUE)
offer_price_usd = min(
monthly_pricing * glm_usd_value.current_price, MAX_PRICE_CAP_VALUE)
ec2_monthly_price = min(
closest_ec2.price_usd * 730, MAX_PRICE_CAP_VALUE)
if ec2_monthly_price != 0:
offer_is_more_expensive = offer_price_usd > ec2_monthly_price
offer_is_cheaper = offer_price_usd < ec2_monthly_price
offer.is_overpriced = offer_is_more_expensive
offer.overpriced_compared_to = closest_ec2 if offer_is_more_expensive else None
offer.times_more_expensive = offer_price_usd / float(ec2_monthly_price) if offer_is_more_expensive else None
offer.suggest_env_per_hour_price = float(closest_ec2.price_usd) / glm_usd_value.current_price
offer.times_more_expensive = offer_price_usd / \
float(ec2_monthly_price) if offer_is_more_expensive else None
offer.suggest_env_per_hour_price = float(
closest_ec2.price_usd) / glm_usd_value.current_price
offer.cheaper_than = closest_ec2 if offer_is_cheaper else None
offer.times_cheaper = float(ec2_monthly_price) / offer_price_usd if offer_is_cheaper else None
offer.times_cheaper = float(
ec2_monthly_price) / offer_price_usd if offer_is_cheaper else None
else:
print("EC2 monthly price is zero, cannot compare offer prices.")

# Always update the offer if any properties have changed
# Compare existing properties with new data
if offer.properties != data:
normalized_existing = normalize_properties(offer.properties.copy())
normalized_new = normalize_properties(data.copy())

if normalized_existing != normalized_new:
print(f"DETECTED CHANGE Updating offer {offer.id}")
offer.properties = data
offers_to_update.append(offer)
else:
print(f"No changes in offer {offer.id}")


# Bulk update offers if any
if offers_to_update:
Expand All @@ -186,12 +211,22 @@ def update_providers_info(node_props):
nodes_to_update = []
for provider_id, data in provider_data_list:
node = existing_nodes_dict[provider_id]
node.wallet = data.get("wallet")
node.network = data.get('network', 'mainnet')
node.type = "provider"
nodes_to_update.append(node)
# Check if any field has changed before adding to update list
if (node.wallet != data.get("wallet") or
node.network != data.get('network', 'mainnet') or
node.type != "provider"):

node.wallet = data.get("wallet")
node.network = data.get('network', 'mainnet')
node.type = "provider"
node.updated_at = timezone.now() # Explicitly set updated_at
nodes_to_update.append(node)

if nodes_to_update:
Node.objects.bulk_update(nodes_to_update, ['wallet', 'network', 'updated_at', 'type'])
Node.objects.bulk_update(
nodes_to_update,
['wallet', 'network', 'type', 'updated_at'] # Include updated_at in the fields list
)
print(f"Done updating {len(provider_ids)} providers")


Expand All @@ -201,15 +236,14 @@ def update_providers_info(node_props):
from .yapapi_utils import build_parser, print_env_info, format_usage # noqa: E402




async def list_offers(
conf: Configuration, subnet_tag: str, current_scan_providers, node_props
):
async with conf.market() as client:
market_api = Market(client)
dbuild = DemandBuilder()
dbuild.add(yp.NodeInfo(name="some scanning node", subnet_tag=subnet_tag))
dbuild.add(yp.NodeInfo(
name="some scanning node", subnet_tag=subnet_tag))
dbuild.add(yp.Activity(expiration=datetime.now(timezone.utc)))

async with market_api.subscribe(
Expand All @@ -234,7 +268,9 @@ async def monitor_nodes_status(subnet_tag: str = "public"):
try:
await asyncio.wait_for(
list_offers(
Configuration(api_config=ApiConfig()),
Configuration(api_config=ApiConfig(
app_key="stats"
)),
subnet_tag=subnet_tag,
node_props=node_props,
current_scan_providers=current_scan_providers,
Expand All @@ -243,7 +279,8 @@ async def monitor_nodes_status(subnet_tag: str = "public"):
)
except asyncio.TimeoutError:
print("Scan timeout reached")
print(f"In the current scan, we found {len(current_scan_providers)} providers")
print(
f"In the current scan, we found {len(current_scan_providers)} providers")
# Delay update_nodes_data call using Celery

update_providers_info.delay(node_props)
7 changes: 5 additions & 2 deletions stats-backend/yapapi/examples/low-level-api/list-offers.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ async def list_offers(conf: Configuration, subnet_tag: str):
async with conf.market() as client:
market_api = Market(client)
dbuild = DemandBuilder()
dbuild.add(yp.NodeInfo(name="Golem Stats Indexer", subnet_tag=subnet_tag))
dbuild.add(yp.NodeInfo(
name="Golem Stats Indexer", subnet_tag=subnet_tag))
dbuild.add(yp.Activity(expiration=datetime.now(timezone.utc)))
async with market_api.subscribe(
dbuild.properties, dbuild.constraints
Expand Down Expand Up @@ -132,7 +133,9 @@ def main():
asyncio.get_event_loop().run_until_complete(
asyncio.wait_for(
list_offers(
Configuration(api_config=ApiConfig()), # YAGNA_APPKEY will be loaded from env
Configuration(api_config=ApiConfig(
app_key="stats"
)), # YAGNA_APPKEY will be loaded from env
subnet_tag="public",
),
timeout=60,
Expand Down
7 changes: 5 additions & 2 deletions stats-backend/yapapi/examples/low-level-api/v2/list-offers.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ async def list_offers(conf: Configuration, subnet_tag: str):
async with conf.market() as client:
market_api = Market(client)
dbuild = DemandBuilder()
dbuild.add(yp.NodeInfo(name="Golem Stats Indexer", subnet_tag=subnet_tag))
dbuild.add(yp.NodeInfo(
name="Golem Stats Indexer", subnet_tag=subnet_tag))
dbuild.add(yp.Activity(expiration=datetime.now(timezone.utc)))
async with market_api.subscribe(
dbuild.properties, dbuild.constraints
Expand Down Expand Up @@ -104,7 +105,9 @@ def main():
asyncio.get_event_loop().run_until_complete(
asyncio.wait_for(
list_offers(
Configuration(api_config=ApiConfig()),
Configuration(api_config=ApiConfig(
app_key="stats"
)),
subnet_tag="public",
),
timeout=15,
Expand Down

0 comments on commit e6fc0a3

Please sign in to comment.