Skip to content

Commit

Permalink
Remove comments, more delay for test
Browse files Browse the repository at this point in the history
Signed-off-by: Nijat K <[email protected]>
  • Loading branch information
NeejWeej committed Nov 29, 2024
1 parent 628ceb6 commit 4d64618
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 114 deletions.
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ build-debug: ## build the library ( DEBUG ) - May need a make clean when switch
SKBUILD_CONFIGURE_OPTIONS="" DEBUG=1 python setup.py build build_ext --inplace

build-conda: ## build the library in Conda
CSP_USE_CCACHE=0 python setup.py build build_ext --csp-no-vcpkg --inplace
python setup.py build build_ext --csp-no-vcpkg --inplace

install: ## install library
python -m pip install .
Expand Down Expand Up @@ -83,7 +83,7 @@ checks: check

TEST_ARGS :=
test-py: ## Clean and Make unit tests
python -m pytest -vv -s csp/tests --junitxml=junit.xml $(TEST_ARGS)
python -m pytest -v csp/tests --junitxml=junit.xml $(TEST_ARGS)

test-cpp: ## Make C++ unit tests
ifneq ($(OS),Windows_NT)
Expand Down
2 changes: 1 addition & 1 deletion cpp/csp/engine/StatusAdapter.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ class StatusAdapter : public PushInputAdapter
m_statusAccess.meta = meta;
m_statusAccess.level = meta -> getMetaField<int64_t>( "level", "Status" );
m_statusAccess.statusCode = meta -> getMetaField<int64_t>( "status_code", "Status" );
m_statusAccess.msg = meta -> getMetaField<typename csp::StringStructField::CType>( "msg", "Status" );
m_statusAccess.msg = meta -> getMetaField<std::string>( "msg", "Status" );
}

void pushStatus( int64_t level, int64_t statusCode, const std::string & msg, PushBatch *batch = nullptr )
Expand Down
99 changes: 18 additions & 81 deletions cpp/csp/engine/Struct.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
#include <string>
#include <vector>
#include <unordered_map>
#include <iostream>

namespace csp
{
Expand Down Expand Up @@ -669,69 +668,16 @@ class StructMeta : public std::enable_shared_from_this<StructMeta>
};

template<typename T>
std::shared_ptr<typename StructField::upcast<T>::type> StructMeta::getMetaField(const char* fieldname, const char* expectedtype) {
std::cout << "\n=== getMetaField Debug ===\n";
std::cout << "1. Looking for field: " << fieldname << "\n";

auto field_ = field(fieldname);
if(!field_) {
std::cout << "2. Field not found!\n";
CSP_THROW(TypeError, "Struct type " << name() << " missing required field " << fieldname);
}

std::cout << "2. Field found\n";
std::cout << "3. Field name from object: " << field_->fieldname() << "\n";
std::cout << "4. Field type from CspType: " << field_->type()->type() << "\n";
std::cout << "5. Expected type: " << CspType::Type::fromCType<T>::type << "\n";

// Memory layout & pointer checks
const StructField* field_ptr = field_.get();
std::cout << "6. Field ptr value: " << field_ptr << "\n";
std::cout << "7. Field use count: " << field_.use_count() << "\n";

// Detailed field information
if(field_ptr) {
std::cout << "8. Field metadata:\n";
std::cout << " - Field offset: " << field_ptr->offset() << "\n";
std::cout << " - Field size: " << field_ptr->size() << "\n";
std::cout << " - Field alignment: " << field_ptr->alignment() << "\n";
std::cout << " - Field mask offset: " << field_ptr->maskOffset() << "\n";
std::cout << " - Field mask bit: " << static_cast<int>(field_ptr->maskBit()) << "\n";

// Type verification
std::cout << "9. Type checks:\n";
std::cout << " - Original type: " << typeid(field_).name() << "\n";
std::cout << " - Target type: " << typeid(typename StructField::upcast<T>::type).name() << "\n";
std::cout << " - Is native: " << field_ptr->isNative() << "\n";

// Test various casts
std::cout << "10. Detailed cast tests:\n";
std::cout << " Base classes:\n";
std::cout << " - As StructField*: " << (dynamic_cast<const StructField*>(field_ptr) != nullptr) << "\n";
std::cout << " - As NonNativeStructField*: " << (dynamic_cast<const NonNativeStructField*>(field_ptr) != nullptr) << "\n";
std::cout << " Non-native implementations:\n";
std::cout << " - As StringStructField*: " << (dynamic_cast<const StringStructField*>(field_ptr) != nullptr) << "\n";
std::cout << " - As DialectGenericStructField*: " << (dynamic_cast<const DialectGenericStructField*>(field_ptr) != nullptr) << "\n";
std::cout << " - As ArrayStructField<std::string>*: " << (dynamic_cast<const ArrayStructField<std::vector<std::string>>*>(field_ptr) != nullptr) << "\n";
std::cout << " Native field test:\n";
std::cout << " - As NativeStructField<int64_t>*: " << (dynamic_cast<const NativeStructField<int64_t>*>(field_ptr) != nullptr) << "\n";
}

using TargetType = typename StructField::upcast<T>::type;
auto typedfield = std::dynamic_pointer_cast<TargetType>(field_);
std::cout << "11. Final dynamic_cast result: " << (typedfield ? "success" : "failure") << "\n";
std::shared_ptr<typename StructField::upcast<T>::type> StructMeta::getMetaField( const char * fieldname, const char * expectedtype )
{
auto field_ = field( fieldname );
if( !field_ )
CSP_THROW( TypeError, "Struct type " << name() << " missing required field " << fieldname << " for " << expectedtype );

if(!typedfield) {
std::cout << "12. FAILED CAST DETAILS:\n";
std::cout << " - Source type: " << typeid(StructField).name() << "\n";
std::cout << " - Target type: " << typeid(TargetType).name() << "\n";

CSP_THROW(TypeError, expectedtype << " - provided struct type " << name()
<< " expected type " << CspType::Type::fromCType<T>::type
<< " for field " << fieldname
<< " but got type " << field_->type()->type()
<< " for " << expectedtype);
}
std::shared_ptr<typename StructField::upcast<T>::type> typedfield = std::dynamic_pointer_cast<typename StructField::upcast<T>::type>( field_ );
if( !typedfield )
CSP_THROW( TypeError, expectedtype << " - provided struct type " << name() << " expected type " << CspType::Type::fromCType<T>::type << " for field " << fieldname
<< " but got type " << field_ -> type() -> type() << " for " << expectedtype );

return typedfield;
}
Expand Down Expand Up @@ -827,31 +773,22 @@ class Struct
friend class StructMeta;

//Note these members are not included on size(), they're stored before "this" ptr ( see operator new / delete )
struct alignas(8) HiddenData {
alignas(8) size_t refcount; // 8 bytes at 0x0
alignas(8) std::shared_ptr<const StructMeta> meta; // 16 bytes at 0x8
alignas(8) void* dialectPtr; // 8 bytes at 0x18
// Total: 32 bytes
struct HiddenData
{
size_t refcount;
std::shared_ptr<const StructMeta> meta;
void * dialectPtr;
};

const HiddenData * hidden() const
{
return const_cast<Struct *>( this ) -> hidden();
}

static constexpr size_t HIDDEN_OFFSET = 32; // sizeof(HiddenData) aligned to 8 bytes

HiddenData* hidden() {
std::byte* base = reinterpret_cast<std::byte*>(this);
// Force alignment to match shared_ptr requirements
static_assert(alignof(HiddenData) >= alignof(std::shared_ptr<void>),
"HiddenData must be aligned for shared_ptr");
return reinterpret_cast<HiddenData*>(base - HIDDEN_OFFSET);
}
// HiddenData * hidden()
// {
// return reinterpret_cast<HiddenData *>( reinterpret_cast<uint8_t *>( this ) - sizeof( HiddenData ) );
// }
HiddenData * hidden()
{
return reinterpret_cast<HiddenData *>( reinterpret_cast<uint8_t *>( this ) - sizeof( HiddenData ) );
}

//actual data is allocated past this point
};
Expand Down
16 changes: 0 additions & 16 deletions cpp/csp/python/adapters/websocketadapterimpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
#include <csp/python/PyInputAdapterWrapper.h>
#include <csp/python/PyOutputAdapterWrapper.h>

#include <iostream>
using namespace csp::adapters::websocket;

namespace csp::python
Expand Down Expand Up @@ -63,30 +62,15 @@ static OutputAdapter * create_websocket_header_update_adapter( csp::AdapterManag

static OutputAdapter * create_websocket_connection_request_adapter( csp::AdapterManager * manager, PyEngine * pyengine, PyObject * args )
{
// std::cout << "hereeeee33ee" << "\n";
PyObject * pyProperties;
// PyObject * type;
auto * websocketManager = dynamic_cast<ClientAdapterManager*>( manager );
if( !websocketManager )
CSP_THROW( TypeError, "Expected WebsocketClientAdapterManager" );

if( !PyArg_ParseTuple( args, "O!",
&PyDict_Type, &pyProperties ) )
CSP_THROW( PythonPassthrough, "" );
// std::cout << "hereeeee334444ee" << "\n";
return websocketManager -> getConnectionRequestAdapter( fromPython<Dictionary>( pyProperties ) );


// TODO
// Here I think we should have a websocket connection manager
// that will handle the connections and endpoint management
// It will create the connection request output adapter
// That output adapter, when it ticks, with a list of python Dictionary
// will then use the boost beast 'post' function to schedule, on the
// io context, a callback to process that dict (on the websocket connection manager!!!) and handle the endpoint manipulation appropriately

// that websocket connection manager will run the thread with the io context
// being run. Move it away from clientAdapterManager
}

REGISTER_ADAPTER_MANAGER( _websocket_adapter_manager, create_websocket_adapter_manager );
Expand Down
21 changes: 7 additions & 14 deletions csp/tests/adapters/test_websocket.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ def create_tornado_server(port: int = None):
def run_io_loop():
nonlocal io_loop, app
io_loop = tornado.ioloop.IOLoop()
io_loop.make_current()
app = tornado.web.Application([(r"/", EchoWebsocketHandler)])
try:
app.listen(port)
Expand Down Expand Up @@ -301,9 +300,9 @@ def g():
)
]
)
val = csp.curve(int, [(timedelta(milliseconds=50), 0), (timedelta(milliseconds=500), 1)])
val = csp.curve(int, [(timedelta(milliseconds=100), 0), (timedelta(milliseconds=500), 1)])
hello = csp.apply(val, lambda x: f"hi world{x}", str)
delayed_conn_req = csp.delay(conn_request, delay=timedelta(milliseconds=100))
delayed_conn_req = csp.delay(conn_request, delay=timedelta(milliseconds=300))

# We connect immediately and send out the hello message
ws.send(hello, connection_request=conn_request)
Expand All @@ -328,12 +327,6 @@ def g():

@pytest.mark.parametrize("reconnect_immeditately", [False, True])
def test_dynamic_disconnect_connect_pruned_subscribe(self, reconnect_immeditately):
@csp.node
def prevent_prune(objs: ts[str]):
if csp.ticked(objs):
# Does nothing but makes sure it's not pruned
...

@csp.graph
def g():
ws = WebsocketAdapterManager(dynamic=True)
Expand All @@ -354,7 +347,7 @@ def g():
disconnect_reqs,
),
(
timedelta(milliseconds=350),
timedelta(milliseconds=700),
[
ConnectionRequest(
uri=f"ws://localhost:{self.port}/",
Expand All @@ -365,7 +358,7 @@ def g():
],
)
const_conn_request = csp.const([ConnectionRequest(uri=f"ws://localhost:{self.port}/")])
val = csp.curve(int, [(timedelta(milliseconds=100, microseconds=1), 0), (timedelta(milliseconds=500), 1)])
val = csp.curve(int, [(timedelta(milliseconds=300, microseconds=1), 0), (timedelta(milliseconds=900), 1)])
hello = csp.apply(val, lambda x: f"hi world{x}", str)

# We connect immediately and send out the hello message
Expand All @@ -382,7 +375,7 @@ def g():
recv4 = ws.subscribe(
str,
RawTextMessageMapper(),
connection_request=csp.const([no_persist_conn], delay=timedelta(milliseconds=250)),
connection_request=csp.const([no_persist_conn], delay=timedelta(milliseconds=500)),
)

csp.add_graph_output("recv", recv)
Expand All @@ -391,7 +384,7 @@ def g():
end = csp.filter(csp.count(recv3) == 3, recv3)
csp.stop_engine(end)

msgs = csp.run(g, starttime=datetime.now(pytz.UTC), endtime=timedelta(seconds=1), realtime=True)
msgs = csp.run(g, starttime=datetime.now(pytz.UTC), endtime=timedelta(seconds=2), realtime=True)
# Did not persist, so did not receive any messages
assert len(msgs["recv4"]) == 0
# Only the second message is received, since we disonnect before the first one is sent
Expand Down Expand Up @@ -431,7 +424,7 @@ def g():
)
val = csp.curve(int, [(timedelta(milliseconds=50), 0), (timedelta(milliseconds=500), 1)])
hello = csp.apply(val, lambda x: f"hi world{x}", str)
delayed_conn_req = csp.delay(conn_request, delay=timedelta(milliseconds=100))
delayed_conn_req = csp.delay(conn_request, delay=timedelta(milliseconds=200))

# We connect immediately and send out the hello message
ws.send(hello, connection_request=conn_request)
Expand Down

0 comments on commit 4d64618

Please sign in to comment.