From 4c912f58180db4c335bd65d5cfaccac5228b7c64 Mon Sep 17 00:00:00 2001 From: Yu Zeng Date: Thu, 12 Jan 2023 19:10:48 -0800 Subject: [PATCH] Prepare for KCL Python Release 2.1.0 --- README.md | 3 ++ samples/sample_kinesis_wordputter.py | 61 ++++++++++++---------------- setup.py | 2 +- 3 files changed, 30 insertions(+), 36 deletions(-) diff --git a/README.md b/README.md index 459a2fe..5e11584 100644 --- a/README.md +++ b/README.md @@ -146,6 +146,9 @@ all languages. ## Release Notes +### Release 2.1.0 (January 12, 2023) +* Upgraded to use version 2.4.4 of the [Amazon Kinesis Client library][kinesis-github] + ### Release 2.0.6 (November 23, 2021) * Upgraded multiple dependencies [PR #152](https://github.com/awslabs/amazon-kinesis-client-python/pull/152) * Amazon Kinesis Client Library 2.3.9 diff --git a/samples/sample_kinesis_wordputter.py b/samples/sample_kinesis_wordputter.py index 8b40b3b..62b75dd 100644 --- a/samples/sample_kinesis_wordputter.py +++ b/samples/sample_kinesis_wordputter.py @@ -4,62 +4,58 @@ SPDX-License-Identifier: Apache-2.0 ''' from __future__ import print_function -import sys, random, time, argparse -from boto import kinesis -def get_stream_status(conn, stream_name): +import argparse +import sys +import time + +import boto3 + +def get_stream_status(kinesis, stream_name): ''' Query this provided connection object for the provided stream's status. - - :type conn: boto.kinesis.layer1.KinesisConnection + :type conn: Kinesis.Client :param conn: A connection to Amazon Kinesis - :type stream_name: str :param stream_name: The name of a stream. - :rtype: str :return: The stream's status ''' - r = conn.describe_stream(stream_name) + r = kinesis.describe_stream(StreamName=stream_name) description = r.get('StreamDescription') return description.get('StreamStatus') -def wait_for_stream(conn, stream_name): +def wait_for_stream(kinesis, stream_name): ''' Wait for the provided stream to become active. - - :type conn: boto.kinesis.layer1.KinesisConnection - :param conn: A connection to Amazon Kinesis - + :type kinesis: Kinesis.Client + :param kinesis: A low-level client representing Amazon Kinesis :type stream_name: str :param stream_name: The name of a stream. ''' SLEEP_TIME_SECONDS = 3 - status = get_stream_status(conn, stream_name) + status = get_stream_status(kinesis, stream_name) while status != 'ACTIVE': print('{stream_name} has status: {status}, sleeping for {secs} seconds'.format( stream_name = stream_name, status = status, secs = SLEEP_TIME_SECONDS)) time.sleep(SLEEP_TIME_SECONDS) # sleep for 3 seconds - status = get_stream_status(conn, stream_name) + status = get_stream_status(kinesis, stream_name) -def put_words_in_stream(conn, stream_name, words): +def put_words_in_stream(kinesis, stream_name, words): ''' Put each word in the provided list of words into the stream. - - :type conn: boto.kinesis.layer1.KinesisConnection - :param conn: A connection to Amazon Kinesis - + :type kinesis: Kinesis.Client + :param kinesis: A connection to Amazon Kinesis :type stream_name: str :param stream_name: The name of a stream. - :type words: list :param words: A list of strings to put into the stream. ''' for w in words: try: - conn.put_record(stream_name, w, w) + kinesis.put_record(StreamName=stream_name, Data=w, PartitionKey=w) print("Put word: " + w + " into stream: " + stream_name) except Exception as e: sys.stderr.write("Encountered an exception while trying to put a word: " @@ -69,16 +65,12 @@ def put_words_in_stream_periodically(conn, stream_name, words, period_seconds): ''' Puts words into a stream, then waits for the period to elapse then puts the words in again. There is no strict guarantee about how frequently we put each word into the stream, just that we will wait between iterations. - :type conn: boto.kinesis.layer1.KinesisConnection :param conn: A connection to Amazon Kinesis - :type stream_name: str :param stream_name: The name of a stream. - :type words: list :param words: A list of strings to put into the stream. - :type period_seconds: int :param period_seconds: How long to wait, in seconds, between iterations over the list of words. ''' @@ -90,10 +82,8 @@ def put_words_in_stream_periodically(conn, stream_name, words, period_seconds): if __name__ == '__main__': parser = argparse.ArgumentParser(''' Puts words into a stream. - # Using the -w option multiple times sample_wordputter.py -s STREAM_NAME -w WORD1 -w WORD2 -w WORD3 -p 3 - # Passing input from STDIN echo "WORD1\\nWORD2\\nWORD3" | sample_wordputter.py -s STREAM_NAME -p 3 ''') @@ -115,18 +105,19 @@ def put_words_in_stream_periodically(conn, stream_name, words, period_seconds): one of the standard credentials providers. ''' print("Connecting to stream: {s} in {r}".format(s=stream_name, r=args.region)) - conn = kinesis.connect_to_region(region_name = args.region) + kinesis = boto3.client('kinesis', region_name=args.region) + try: - status = get_stream_status(conn, stream_name) + status = get_stream_status(kinesis, stream_name) if 'DELETING' == status: print('The stream: {s} is being deleted, please rerun the script.'.format(s=stream_name)) sys.exit(1) elif 'ACTIVE' != status: - wait_for_stream(conn, stream_name) + wait_for_stream(kinesis, stream_name) except: # We'll assume the stream didn't exist so we will try to create it with just one shard - conn.create_stream(stream_name, 1) - wait_for_stream(conn, stream_name) + kinesis.create_stream(StreamName=stream_name, ShardCount=1) + wait_for_stream(kinesis, stream_name) # Now the stream should exist if len(args.words) == 0: print('No -w options provided. Waiting on input from STDIN') @@ -134,6 +125,6 @@ def put_words_in_stream_periodically(conn, stream_name, words, period_seconds): else: words = args.words if args.period != None: - put_words_in_stream_periodically(conn, stream_name, words, args.period) + put_words_in_stream_periodically(kinesis, stream_name, words, args.period) else: - put_words_in_stream(conn, stream_name, words) + put_words_in_stream(kinesis, stream_name, words) \ No newline at end of file diff --git a/setup.py b/setup.py index 056eb63..048c091 100644 --- a/setup.py +++ b/setup.py @@ -42,7 +42,7 @@ PACKAGE_NAME = 'amazon_kclpy' JAR_DIRECTORY = os.path.join(PACKAGE_NAME, 'jars') -PACKAGE_VERSION = '2.0.6' +PACKAGE_VERSION = '2.1.0' PYTHON_REQUIREMENTS = [ 'boto', # argparse is part of python2.7 but must be declared for python2.6