Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added Missing old function && Fixed the error showing an error from kafka broker #81

Open
wants to merge 72 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
72 commits
Select commit Hold shift + click to select a range
9f8c4ad
Added issing old function && Fixed the error showing an error from ka…
mshimizu-kx Mar 18, 2021
f279905
Added timeout for cleating a new consumer/producer
mshimizu-kx Mar 18, 2021
4bec0fe
Fixed timeout type for example
mshimizu-kx Mar 18, 2021
b8e5f7b
Custom timestamp for publishWithHeaders
mshimizu-kx Mar 18, 2021
04c8ef4
Updated deprecated pubwithheaders for compatibility
mshimizu-kx Mar 18, 2021
e24e78b
Reverted the change for custom timestamp
mshimizu-kx Mar 18, 2021
5b826ce
Remove callback at deletion and unsubscribe
mshimizu-kx Mar 19, 2021
4600e3f
Corrected documents
mshimizu-kx Mar 19, 2021
7dc6070
Added missing poll_client declaration to a header file
mshimizu-kx Mar 19, 2021
28cb005
Implemented reload; still not working by kafka error
mshimizu-kx Mar 22, 2021
3a1f7a8
Reload needs unsubscribe && Restructured directories for qpacker
mshimizu-kx Mar 23, 2021
24e0edb
Modified install instruction && Added qp.json and configure
mshimizu-kx Mar 23, 2021
2c235e6
Added qp.json in the root
mshimizu-kx Mar 23, 2021
798ae2b
Select libpath in order
mshimizu-kx Mar 23, 2021
2769e90
Added thread
mshimizu-kx Mar 25, 2021
d0ea727
Launch thread for each client.
mshimizu-kx Mar 25, 2021
e51e800
Removed flush && Working
mshimizu-kx Mar 25, 2021
65ee594
Removed manual poll function && Changed document accordingly
mshimizu-kx Mar 26, 2021
2f7dada
Added benchmark
mshimizu-kx Mar 26, 2021
dd64c0b
Removed garbage of manual poll
mshimizu-kx Mar 26, 2021
3a3f47f
Added modified osthread for Windows && Modified thread code
mshimizu-kx Mar 29, 2021
f70204f
Restored non-blocking setting at init
mshimizu-kx Mar 29, 2021
85c95ba
Removed reload
mshimizu-kx Mar 29, 2021
9ff74ff
Used text for document
mshimizu-kx Mar 30, 2021
b137c02
Removed unnecessary loader code
mshimizu-kx Mar 30, 2021
0a55433
Added wrapper of callbacks
mshimizu-kx Mar 30, 2021
57c8a58
Handle deprecated functions in new default functions
mshimizu-kx Mar 30, 2021
377ca6c
Added old function names in reference
mshimizu-kx Mar 31, 2021
a57eb19
Use uj instead of , for statistics && Fixed worng configuration in pr…
mshimizu-kx Apr 2, 2021
142a010
Added qp.json in the root
mshimizu-kx Apr 7, 2021
5e8a48e
Fixed compile flag bug
mshimizu-kx Apr 9, 2021
df4403b
Fixed load path of exmple q file
mshimizu-kx Apr 9, 2021
08111e1
Added missing kafka_deprecated.q in kfk.q
mshimizu-kx Apr 12, 2021
00add15
Fixed some typos in document
mshimizu-kx Apr 19, 2021
4dcb570
Link transformer library
mshimizu-kx May 20, 2021
1f3b0fd
Completed encoder/decoder examples
mshimizu-kx May 20, 2021
3a602be
Fixed typo in decoding example
mshimizu-kx May 20, 2021
adacab8
Added transformer intgration && Cleaned up headers
mshimizu-kx Jun 7, 2021
d14efcb
Fixed wrong static
mshimizu-kx Jun 8, 2021
a5ce5cb
Build examples with a transformer
mshimizu-kx Jun 8, 2021
13299c7
Use qpmake.sh to build kafkakdb.so
mshimizu-kx Jun 8, 2021
fb8131c
Fixed copy path of transformer
mshimizu-kx Jun 9, 2021
6aac4b5
Bare kafka with transformer dependent flag
mshimizu-kx Jun 17, 2021
5742ac8
Build with release-build transformer qpk
mshimizu-kx Jun 28, 2021
5b24a02
Pull transformer qpk
mshimizu-kx Jun 28, 2021
7cf9209
Modified decoder example to filter out header
mshimizu-kx Jun 29, 2021
162923a
Fixed qpmake script
mshimizu-kx Jun 29, 2021
e7d1d48
Pipeline name should not be set to client without transformer
mshimizu-kx Jul 5, 2021
2049d9f
Use transformer ifelse for deleting client
mshimizu-kx Jul 5, 2021
d2f2bda
Fixed update in consumer example
mshimizu-kx Jul 13, 2021
bf52403
Added schema registeration and retrieval
mshimizu-kx Jul 14, 2021
5c90c51
Added schema delete method && Added idle client example
mshimizu-kx Jul 15, 2021
83ce90a
Load schema registry code && Modified schema registration code
mshimizu-kx Jul 15, 2021
96a8259
Added srializer and deserializer of schema registry message
mshimizu-kx Jul 16, 2021
a71eaf9
Fixed function signature of deprecated
mshimizu-kx Jul 16, 2021
d380923
Fixed function signature of deprecated version
mshimizu-kx Jul 19, 2021
8f8abea
Added examples for deprecated version
mshimizu-kx Jul 19, 2021
1415abe
Serialize/deserialize Avro schema
mshimizu-kx Jul 19, 2021
54e99a1
Added serializing/deserializing protobuf
mshimizu-kx Jul 19, 2021
8e02a04
Corrected file path of protobuf in docs of examples
mshimizu-kx Jul 20, 2021
664a31a
Pulled changes on bare kafkakdb from master
mshimizu-kx Jul 20, 2021
7506c58
Added schema registry page
mshimizu-kx Jul 20, 2021
e0ef0ee
Added type mapping
mshimizu-kx Jul 20, 2021
70afabe
Added reference of schema registry
mshimizu-kx Jul 21, 2021
f42fe46
Fixed typo
mshimizu-kx Jul 21, 2021
85472e2
Rewrote test using a barrier
mshimizu-kx Jul 27, 2021
25c69d1
Removed old test
mshimizu-kx Jul 27, 2021
8781cdc
Use kurl instead of curl
mshimizu-kx Jul 29, 2021
437df3c
Use transformer.qpk pulled from registry
mshimizu-kx Jul 29, 2021
d31adbf
Fixed retrieval of schema
mshimizu-kx Jul 29, 2021
65868ce
Modified append in examples
mshimizu-kx Jul 29, 2021
a86ae24
Added transformer related files
mshimizu-kx Aug 3, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,10 @@
include/k.h
build/
qpbuild/
clib/transformer*
clib/kurl*
clib/include/k.h
clib/build/
clib/qpbuild/
clib/kafkakdb.so
clib/libs/qtfm*
q/kurl*
tests/bench_producer.log
13 changes: 7 additions & 6 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -56,17 +56,18 @@ before_install:
fi
- export FILE_NAME=${FILE_ROOT}-${TRAVIS_OS_NAME}-${TRAVIS_BRANCH}.${FILE_TAIL}
# Build package
- cd clib
- mkdir build
- if [[ $BUILD == "True" && $TRAVIS_OS_NAME == "windows" ]]; then
cd build;
cmake -G "Visual Studio 15 2017 Win64" --config Release .. -DENABLE_SSL:BOOL=OFF;
cmake -G "Visual Studio 15 2017 Win64" --config Release ../.. -DENABLE_SSL:BOOL=OFF;
cmake --build . --config Release --target install;
cd ..;
cd ../..;
elif [[ $BUILD == "True" && ( $TRAVIS_OS_NAME == "linux" || $TRAVIS_OS_NAME == "osx" ) ]]; then
cd build;
cmake .. -DENABLE_SSL:BOOL=OFF;
cmake ../.. -DENABLE_SSL:BOOL=OFF;
cmake --build . --target install;
cd ..;
cd ../..;
fi

script:
Expand All @@ -75,9 +76,9 @@ script:
q test.q tests/ -q;
fi
- if [[ $TRAVIS_OS_NAME == "windows" ]]; then
7z a -tzip -r $FILE_NAME ./build/$FILE_ROOT/*;
7z a -tzip -r $FILE_NAME ./clib/build/$FILE_ROOT/*;
else
tar -zcvf $FILE_NAME -C build/$FILE_ROOT .;
tar -zcvf $FILE_NAME -C clib/build/$FILE_ROOT .;
fi

deploy:
Expand Down
16 changes: 8 additions & 8 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
##%% General Settings %%##vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv#

cmake_minimum_required(VERSION 3.1)
project(kafkakdb C)

# Set library name
set(MY_LIBRARY_NAME kafkakdb)
# Set project name
# Declaration for installing files.
project(kafkakdb C)

# Add src directry
add_subdirectory(src)
add_subdirectory(clib)

# Default option is Release
if(NOT CMAKE_BUILD_TYPE)
Expand All @@ -24,9 +24,9 @@ else()
endif()

# Build package always
file(COPY README.md LICENSE ${INSTALL_SCRIPT} DESTINATION ${PROJECT_BINARY_DIR}/${CMAKE_PROJECT_NAME})
file(COPY examples/ DESTINATION ${PROJECT_BINARY_DIR}/${CMAKE_PROJECT_NAME}/examples/)
file(COPY q/ DESTINATION ${PROJECT_BINARY_DIR}/${CMAKE_PROJECT_NAME}/q/)
file(COPY README.md LICENSE ${INSTALL_SCRIPT} DESTINATION clib/${CMAKE_PROJECT_NAME})
file(COPY examples/ DESTINATION clib/${CMAKE_PROJECT_NAME}/examples/)
file(COPY q/ DESTINATION clib/${CMAKE_PROJECT_NAME}/q/)

# Copy q files to QHOME
install(DIRECTORY q/ DESTINATION $ENV{QHOME}/ CONFIGURATIONS Release)
install(DIRECTORY q/ DESTINATION $ENV{QHOME}/ CONFIGURATIONS Release)
20 changes: 14 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,9 @@ Then add the path to `LD_LIBRARY_PATH` (Linux) or `DYLD_LIBRRAY_PATH` (MacOSX) a
```bash

# Linux
$ export LD_LIBRARY_PATH=${KAFKA_LINSTALL_DIR}/lib:${LD_LIBRARY_PATH}
$ export LD_LIBRARY_PATH=${KAFKA_INSTALL_DIR}/lib:${LD_LIBRARY_PATH}
# MacOSX
$ export DYLD_LIBRARY_PATH=${KAFKA_LINSTALL_DIR}/lib:${DYLD_LIBRARY_PATH}
$ export DYLD_LIBRARY_PATH=${KAFKA_INSTALL_DIR}/lib:${DYLD_LIBRARY_PATH}

```

Expand Down Expand Up @@ -107,7 +107,6 @@ $ make install
2. If using OpenSSL, remove `--disable-ssl` from configure command below
3. On macOS with OpenSSL you might need to set `export OPENSSL_ROOT_DIR=/usr/local/Cellar/openssl` before proceeding


#### Windows

##### Install rdkafka
Expand Down Expand Up @@ -209,6 +208,10 @@ w64> MKLINK rdkafka.dll %KAFKA_INSTALL_DIR%\bin\rdkafka.dll

```

### Install qtfm (optional)

Message transformer is supported only for Linux. If you intend to use transformer with kafkakdb, you need to install it to a location where q can serach it (for eaxmple, under `${QHOME}/[os]64`). After installing qtfm.so you need to copy `qtfm.h` to `clib/include` and `transformer.q` to `q/`.

### Install kafkakdb

To use OpenSSL set `OPENSSL_ROOT_DIR` to the install directory of OpenSSL.
Expand All @@ -219,24 +222,28 @@ To use OpenSSL set `OPENSSL_ROOT_DIR` to the install directory of OpenSSL.

]$ git clone https://github.com/KxSystems/kafka.git
]$ cd kafka
]$ mkdir build && cd build
build]$ cmake .. -DENABLE_SSL:BOOL=OFF
]$ mkdir -p clib/build && cd clib/build
build]$ cmake ../.. -DENABLE_SSL:BOOL=OFF
build]$ cmake --build . --target install

```

**Notes:**
1. `cmake --build . --target install` as used in the Linux/MacOS builds installs the required share object and q files to the `QHOME/[ml]64` and `QHOME` directories respectively. If you do not wish to install these files directly, you can execute `cmake --build .` instead of `cmake --build . --target install` and move the files from their build location at `build/kafkakdb`.
2. If you use TLS remove the flag `-DENABLE_SSL:BOOL=OFF`.
3. If you use a transformer library, use a flag `-DUSE_TRANSFORMER:BOOL=ON`, i.e., the build phase may become:

cmake ../../ -DUSE_TRANSFORMER:BOOL=ON

#### Windows

```bat

> git clone https://github.com/KxSystems/kafka.git
> cd kafka
> cd clib
> mkdir build && cd build
build> cmake --config Release .. -DENABLE_SSL:BOOL=OFF
build> cmake --config Release ..\.. -DENABLE_SSL:BOOL=OFF
build> cmake --build . --config Release --target install

```
Expand All @@ -246,6 +253,7 @@ build> cmake --build . --config Release --target install
1. `cmake --build . --config Release --target install` installs the required share object and q files to the `QHOME\w64` and `QHOME` directories respectively. If you do not wish to install these files directly, you can execute `cmake --build . --config Release` instead of `cmake --build . --config Release --target install` and move the files from their build location at `build/kafkakdb`.
2. You can use flag `cmake -G "Visual Studio 16 2019" -A Win32` if building 32-bit version.
3. If you use TLS remove the flag `-DENABLE_SSL:BOOL=OFF`.
4. Windows does not support transformer library.

## Quick Start

Expand Down
18 changes: 18 additions & 0 deletions clib/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
##%% General Settings %%##vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv#

cmake_minimum_required(VERSION 3.1)

# Set project name
# Don't move this declaration. `PROJECT_SOURCE_DIR` must be this directorty.
project(kafkakdb C)

# Set library name
set(MY_LIBRARY_NAME kafkakdb)

# Add src directry
add_subdirectory(src)

# Default option is Release
if(NOT CMAKE_BUILD_TYPE)
set(CMAKE_BUILD_TYPE Release)
endif()
89 changes: 89 additions & 0 deletions clib/include/kafkakdb_client.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
#ifndef __KAFKAKDB_CLIENT_H__
#define __KAFKAKDB_CLIENT_H__

//+++++++++++++++++++++++++++++++++++++++++++++++++++++++//
// Load Libraries //
//+++++++++++++++++++++++++++++++++++++++++++++++++++++++//

#include <rdkafka.h>
#include "socketpair.h"
#include "k.h"

//+++++++++++++++++++++++++++++++++++++++++++++++++++++++//
// Macros //
//+++++++++++++++++++++++++++++++++++++++++++++++++++++++//

#ifdef _WIN32
#define EXP __declspec(dllexport)
#else
#define EXP
#endif

//++++++++++++++++++++++++++++++++++++++++++++++++++++++++//
// Global Variables //
//++++++++++++++++++++++++++++++++++++++++++++++++++++++++//

//%% Number to String %%//vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv/

/**
* @brief Buffer to write out an integer number.
*/
static char NUMBER[11];

//%% Interface %%//vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv/

#ifdef _WIN32
extern SOCKET spair[2];
#else
#define SOCKET_ERROR -1
extern I spair[2];
#endif

/**
* @brief Thread pool for polling client.
*/
static K ALL_THREADS;

/**
* @brief Client handles expressed in symbol list
*/
extern K CLIENTS;

//+++++++++++++++++++++++++++++++++++++++++++++++++++++++//
// Private Functions //
//+++++++++++++++++++++++++++++++++++++++++++++++++++++++//

//%% Index Conversion %%//vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv/

/**
* @brief Retrieve client handle from a given index.
* @param client_idx: Index of client.
* @return
* - symbol: client handle if index is valid
* - null: error message if index is not valid
*/
rd_kafka_t *index_to_handle(K client_idx);

/**
* @brief Retrieve index from a given client handle.
* @param handle: Client handle.
* @return
* - int: Index of the given client in `CLIENTS`.
* - null int: if the client handle is not a registered one.
*/
I handle_to_index(const rd_kafka_t *handle);

//+++++++++++++++++++++++++++++++++++++++++++++++++++++++//
// Interface //
//+++++++++++++++++++++++++++++++++++++++++++++++++++++++//

//%% Create/Delete %%//vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv/

/**
* @brief Destroy client handle and remove from `CLIENTS`.
* @param client_idx: Index of client in `CLIENTS`.
*/
EXP K delete_client(K client_idx);

// __KAFKAKDB_CIENT_H__
#endif
34 changes: 34 additions & 0 deletions clib/include/kafkakdb_topic.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
#ifndef __KAFKAKDB_TOPIC_H__
#define __KAFKAKDB_TOPIC_H__

//+++++++++++++++++++++++++++++++++++++++++++++++++++++++//
// Load Libraries //
//+++++++++++++++++++++++++++++++++++++++++++++++++++++++//

#include <rdkafka.h>
#include "k.h"

//+++++++++++++++++++++++++++++++++++++++++++++++++++++++//
// Global Variables //
//+++++++++++++++++++++++++++++++++++++++++++++++++++++++//

/**
* @brief Topic names expressed in symbol list
*/
extern K TOPICS;

//+++++++++++++++++++++++++++++++++++++++++++++++++++++++//
// Private Functions //
//+++++++++++++++++++++++++++++++++++++++++++++++++++++++//

/**
* @brief Retrieve topic object by topic index
* @param index: Index of topic
* @return
* - symbol: Topic
* - error if index is out of range or topic for the index is null
*/
rd_kafka_topic_t *index_to_topic_handle(K topic_idx);

// __KAFKAKDB_TOPIC_H__
#endif
Loading