Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

修复了python3.8及以上版本非windows系统不能监听配置的BUG #125

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
22 changes: 20 additions & 2 deletions nacos/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

from multiprocessing import Process, Manager, Queue, pool
from threading import RLock, Thread
from multiprocessing import RLock as PRLock

try:
# python3.6
Expand Down Expand Up @@ -251,13 +252,19 @@ def __init__(self, server_addresses, endpoint=None, namespace=None, ak=None, sk=
self.username = username
self.password = password

self.server_list_lock = RLock()
if platform.system() == "windows":
self.server_list_lock = RLock()
else:
self.server_list_lock = PRLock()
self.server_offset = 0

self.watcher_mapping = dict()
self.subscribed_local_manager = SubscribedLocalManager()
self.subscribe_timer_manager = NacosTimerManager()
self.pulling_lock = RLock()
if platform.system() == "windows":
self.pulling_lock = RLock()
else:
self.pulling_lock = PRLock()
self.puller_mapping = None
self.notify_queue = None
self.callback_tread_pool = None
Expand Down Expand Up @@ -1163,6 +1170,17 @@ def stop_subscribe(self):
"""
self.subscribe_timer_manager.stop()

def __getstate__(self):
self_dict = self.__dict__.copy()
# pool object cannot be passed
del self_dict['callback_tread_pool']
# weak-ref object cannot be pickled
del self_dict['process_mgr']
return self_dict

def __setstate__(self, state):
self.__dict__.update(state)


if DEBUG:
NacosClient.set_debugging()
94 changes: 49 additions & 45 deletions test/client_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,10 @@
import time
import shutil

SERVER_1 = "100.69.207.65"
SERVER_1 = "192.168.0.104"
SERVER_ADDRESSES = "%s:8848, 100.69.207.66:8848" % SERVER_1
SERVER_ADDRESSES = "127.0.0.1:8848"
NAMESPACE = "6cface1f-2f1b-4744-a59d-fd818b91a799"
NAMESPACE = ""
SERVER_ADDRESSES = "192.168.0.104:8848"
NAMESPACE = "28162559-3344-4bec-adc2-cc7dc8fa13f2"

# Set the following values if authentication mode is enabled on the server
USERNAME = None
Expand All @@ -25,6 +24,27 @@
# Set the following option if http requests need through by proxy
# client.set_options(proxies={"http":"192.168.56.1:809"})


class ShareFakeWatcher:
content = None
count = 0


def test_cb_fake_watcher(args):
print(args)
ShareFakeWatcher.count += 1
ShareFakeWatcher.content = args["content"]


class ShareLongPulling:
content = None


def cb_long_pulling(x):
ShareLongPulling.content = x["content"]
print(ShareLongPulling.content)


class TestClient(unittest.TestCase):
def test_get_server(self):
self.assertEqual(client.get_server(), (SERVER_1, 8848))
Expand Down Expand Up @@ -57,81 +77,65 @@ def test_fake_watcher(self):
d = "test"
g = "DEFAULT_GROUP"

class Share:
content = None
count = 0

cache_key = "+".join([d, g, NAMESPACE])

def test_cb(args):
print(args)
Share.count += 1
Share.content = args["content"]

client.add_config_watcher(d, g, test_cb)
client.add_config_watcher(d, g, test_cb)
client.add_config_watcher(d, g, test_cb)
client.add_config_watcher(d, g, test_cb_fake_watcher)
client.add_config_watcher(d, g, test_cb_fake_watcher)
client.add_config_watcher(d, g, test_cb_fake_watcher)
time.sleep(1)
client.notify_queue.put((cache_key, "xxx", "md51"))
time.sleep(1)
self.assertEqual(Share.content, "xxx")
self.assertEqual(Share.count, 3)
self.assertEqual(ShareFakeWatcher.content, "xxx")
self.assertEqual(ShareFakeWatcher.count, 3)

client.remove_config_watcher(d, g, test_cb)
Share.count = 0
client.remove_config_watcher(d, g, test_cb_fake_watcher)
ShareFakeWatcher.count = 0
client.notify_queue.put((cache_key, "yyy", "md52"))
time.sleep(1)
self.assertEqual(Share.content, "yyy")
self.assertEqual(Share.count, 2)
self.assertEqual(ShareFakeWatcher.content, "yyy")
self.assertEqual(ShareFakeWatcher.count, 2)

client.remove_config_watcher(d, g, test_cb, True)
Share.count = 0
client.remove_config_watcher(d, g, test_cb_fake_watcher, True)
ShareFakeWatcher.count = 0
client.notify_queue.put((cache_key, "not effective, no watchers", "md53"))
time.sleep(1)
self.assertEqual(Share.content, "yyy")
self.assertEqual(Share.count, 0)
self.assertEqual(ShareFakeWatcher.content, "yyy")
self.assertEqual(ShareFakeWatcher.count, 0)

Share.count = 0
client.add_config_watcher(d, g, test_cb)
ShareFakeWatcher.count = 0
client.add_config_watcher(d, g, test_cb_fake_watcher)
time.sleep(1)
client.notify_queue.put((cache_key, "zzz", "md54"))
time.sleep(1)
self.assertEqual(Share.content, "zzz")
self.assertEqual(Share.count, 1)
self.assertEqual(ShareFakeWatcher.content, "zzz")
self.assertEqual(ShareFakeWatcher.count, 1)

Share.count = 0
ShareFakeWatcher.count = 0
client.notify_queue.put((cache_key, "not effective, md5 no changes", "md54"))
time.sleep(1)
self.assertEqual(Share.content, "zzz")
self.assertEqual(Share.count, 0)
self.assertEqual(ShareFakeWatcher.content, "zzz")
self.assertEqual(ShareFakeWatcher.count, 0)

def test_long_pulling(self):
client2 = nacos.NacosClient(SERVER_ADDRESSES, username=USERNAME, password=PASSWORD)
d = "test1_pulling"
g = "Group1"
g2 = "Group2"

class Share:
content = None

def cb(x):
Share.content = x["content"]
print(Share.content)

client2.publish_config(d, g, "test2")
client2.publish_config(d, g2, "test2")
time.sleep(0.5)
# test common
client2.add_config_watcher(d, g, cb)
client2.add_config_watcher(d, g2, cb)
client2.add_config_watcher(d, g, cb_long_pulling)
client2.add_config_watcher(d, g2, cb_long_pulling)
time.sleep(0.5)
client2.publish_config(d, g, "test")
client2.publish_config(d, g2, "test")
time.sleep(1)
self.assertEqual(Share.content, "test")
self.assertEqual(ShareLongPulling.content, "test")
client2.publish_config(d, g2, u"test2中文")
time.sleep(1)
self.assertEqual(Share.content, u"test2中文")
self.assertEqual(ShareLongPulling.content, u"test2中文")

def test_get_from_failover(self):
d = "test_fo"
Expand Down Expand Up @@ -179,7 +183,7 @@ def test_list_naming_instance_offline(self):
self.assertEqual(len(client.list_naming_instance("test.service")["hosts"]), 0)

def test_list_naming_instance_online(self):
client.add_naming_instance("test.service", "1.0.0.1", 8080, "testCluster2", 0.1, "{}", True, True)
client.add_naming_instance("test.service", "127.0.0.1", 9876, "testCluster2", 0.1, "{}", True, True)
self.assertEqual(len(client.list_naming_instance("test.service")["hosts"]), 1)

def test_get_naming_instance(self):
Expand Down