How to init RayDP resources #408

Open
yasaorder opened this issue May 23, 2024 · 1 comment


yasaorder commented May 23, 2024

My environment: Python 3.9, RayDP 1.6, Java 8, PySpark 3.1.1.
When I run the code further down, I get the following warnings. How can I initialize the Spark resources? And is it necessary to install the pyspark package or to set environment variables such as the Scala, Java, or Spark home?

WARNING:
2024-05-23 21:36:07,404	INFO worker.py:1519 -- Started a local Ray instance. View the dashboard at 127.0.0.1:8265 
(pid=370364) 
(RayDPSparkMaster pid=371139) 2024-05-23 21:36:12,467 Thread-2 INFO Log4j appears to be running in a Servlet environment, but there's no log4j-web module available. If you want better web container support, please add the log4j-web JAR to your web archive or server lib directory.
(RayDPSparkMaster pid=371139) 2024-05-23 21:36:12,471 Thread-2 INFO Log4j appears to be running in a Servlet environment, but there's no log4j-web module available. If you want better web container support, please add the log4j-web JAR to your web archive or server lib directory.
(RayDPSparkMaster pid=371139) 2024-05-23 21:36:12,485 Thread-2 INFO Log4j appears to be running in a Servlet environment, but there's no log4j-web module available. If you want better web container support, please add the log4j-web JAR to your web archive or server lib directory.
(RayDPSparkMaster pid=371139) 2024-05-23 21:36:12,502 Thread-2 INFO Log4j appears to be running in a Servlet environment, but there's no log4j-web module available. If you want better web container support, please add the log4j-web JAR to your web archive or server lib directory.
(pid=370363) 
24/05/23 17:29:53 WARN Utils: Your hostname, whu-All-Series resolves to a loopback address: 127.0.1.1; using 192.168.0.103 instead (on interface eno1)
24/05/23 17:29:53 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/05/23 17:29:54 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

24/05/23 17:30:17 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
24/05/23 17:30:32 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
24/05/23 17:30:47 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
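The repeated `TaskSchedulerImpl` warning indicates that no executor has registered with enough resources to run the job. One possible cause, assuming each RayDP executor runs as a Ray actor that reserves `executor_cores` CPUs, is that the `raydp.init_spark` call in the code (4 executors × 4 cores, i.e. 16 CPUs) asks for more than the local Ray instance offers, so the executors stay pending. The helper below is a hypothetical sketch (`max_executors` and its `reserved_cpus` knob are illustrative, not RayDP API) of sizing `num_executors` to fit the CPUs that are available.

```python
def max_executors(available_cpus: float, executor_cores: int, requested: int,
                  reserved_cpus: float = 0.0) -> int:
    """Hypothetical helper: how many executors of `executor_cores` CPUs each
    fit into `available_cpus` after setting aside `reserved_cpus`."""
    if executor_cores <= 0:
        raise ValueError("executor_cores must be positive")
    # CPUs left over for Spark executors (never negative)
    usable = max(available_cpus - reserved_cpus, 0)
    # whole executors that fit, capped at the number originally requested
    return min(requested, int(usable // executor_cores))


if __name__ == "__main__":
    # Example: an 8-CPU machine, leaving 2 CPUs for other actors
    print(max_executors(8, executor_cores=4, requested=4, reserved_cpus=2))
```

After `ray.init()`, the CPU total can be read with `ray.cluster_resources().get("CPU", 0)` and passed in as `available_cpus`; `reserved_cpus` could cover, for example, the two `TorchEstimator` workers, if those also take CPUs on the same machine.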

My code is below:

CODE:
import ray
ray.init()

import raydp

spark = raydp.init_spark(app_name='RayDP Example 2',
                         num_executors=4,
                         executor_cores=4,
                         executor_memory='2GB')

# # normal data processing with Spark
# df = spark.createDataFrame([('look',), ('spark',), ('tutorial',), ('spark',), ('look', ), ('python', )], ['word'])
# df.show()
# word_count = df.groupBy('word').count()
# word_count.show()
#
# # stop the spark cluster
# raydp.stop_spark()

from pyspark.sql.functions import col
df = spark.range(1, 1000)
# calculate z = x + 2y + 1000
df = df.withColumn("x", col("id")*2)\
  .withColumn("y", col("id") + 200)\
  .withColumn("z", col("x") + 2*col("y") + 1000)

from raydp.utils import random_split
train_df, test_df = random_split(df, [0.7, 0.3])

# PyTorch Code
import torch
class LinearModel(torch.nn.Module):
    def __init__(self):
        super(LinearModel, self).__init__()
        self.linear = torch.nn.Linear(2, 1)

    def forward(self, x, y):
        x = torch.cat([x, y], dim=1)
        return self.linear(x)

model = LinearModel()
optimizer = torch.optim.Adam(model.parameters())
loss_fn = torch.nn.MSELoss()

def lr_scheduler_creator(optimizer, config):
    return torch.optim.lr_scheduler.MultiStepLR(
      optimizer, milestones=[150, 250, 350], gamma=0.1)

# You can use the RayDP Estimator API or libraries like Ray Train for distributed training.
from raydp.torch import TorchEstimator
estimator = TorchEstimator(
  num_workers = 2,
  model = model,
  optimizer = optimizer,
  loss = loss_fn,
  lr_scheduler_creator=lr_scheduler_creator,
  feature_columns = ["x", "y"],
  label_column = "z",
  batch_size = 1000,
  num_epochs = 2
)
# raydp.torch.TorchEstimator
estimator.fit_on_spark(train_df, test_df)

pytorch_model = estimator.get_model()

estimator.shutdown()

# release the Spark executors and the local Ray instance
raydp.stop_spark()
ray.shutdown()
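The synthetic data in the code is exactly linear in the two feature columns: substituting x = 2·id and y = id + 200 into z = x + 2y + 1000 gives z = 4·id + 1400. Because y = x/2 + 200, the two features are perfectly collinear, so `torch.nn.Linear(2, 1)` can fit the data exactly but its weights are not unique. The pure-Python sketch below (`build_columns` is a hypothetical stand-in for the Spark columns, not part of RayDP) checks this arithmetic.

```python
def build_columns(start: int = 1, end: int = 1000):
    """Mirror the Spark columns in plain Python. spark.range(start, end)
    excludes `end`, so ids run from start to end - 1."""
    return [(2 * i, i + 200, (2 * i) + 2 * (i + 200) + 1000)
            for i in range(start, end)]


rows = build_columns()

# Substituting x = 2*id and y = id + 200 gives z = 4*id + 1400.
assert all(z == 4 * (x // 2) + 1400 for x, y, z in rows)

# y = x/2 + 200, so the features are collinear and several
# (weight_x, weight_y, bias) settings fit the data exactly.
for w_x, w_y, bias in [(1, 2, 1000), (2, 0, 1400), (0, 4, 600)]:
    assert all(z == w_x * x + w_y * y + bias for x, y, z in rows)

print(len(rows), rows[0], rows[-1])
```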

yasaorder commented May 23, 2024

Actually, what I mean is whether it is necessary to install pyspark-bin-hadoop as described at https://spark.apache.org/docs/3.3.1/api/python/getting_started/install.html, or whether I just need to install raydp and Java.
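To see what the Python process actually finds, a small diagnostic may help. It is a hypothetical sketch (`check_raydp_prereqs` is not a RayDP function) that reports whether `java` is on the PATH, whether `JAVA_HOME` and `SPARK_HOME` are set, and whether `pyspark` and `raydp` are importable. The assumption here, to be checked against the RayDP README, is that installing `raydp` with pip also installs `pyspark` as a dependency, so a separate Spark download and `SPARK_HOME` are usually not required, while a Java runtime is.

```python
import importlib.util
import os
import shutil


def check_raydp_prereqs(env=None, which=shutil.which,
                        find_spec=importlib.util.find_spec):
    """Report which prerequisites are visible to this Python process.
    The injectable arguments exist only so the check can be tested."""
    env = os.environ if env is None else env
    return {
        "java_on_path": which("java") is not None,
        "JAVA_HOME": env.get("JAVA_HOME"),
        "SPARK_HOME": env.get("SPARK_HOME"),
        "pyspark_importable": find_spec("pyspark") is not None,
        "raydp_importable": find_spec("raydp") is not None,
    }


if __name__ == "__main__":
    # print one line per prerequisite for the current environment
    for key, value in check_raydp_prereqs().items():
        print(f"{key}: {value}")
```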
