Skip to content
This repository has been archived by the owner on Sep 27, 2019. It is now read-only.

Sequences #1407

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
172 changes: 172 additions & 0 deletions script/testing/junit/SequenceTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
//===----------------------------------------------------------------------===//
//
// Peloton
//
// SequenceTest.java
//
// Identification: script/testing/junit/SequenceTest.java
//
// Copyright (c) 2015-2018, Carnegie Mellon University Database Group
//
//===----------------------------------------------------------------------===//

import java.sql.*;
import org.junit.*;
import org.postgresql.util.PSQLException;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static junit.framework.TestCase.fail;

public class SequenceTest extends PLTestBase {
private Connection conn1;
private Connection conn2;

private static final String SQL_DROP_SEQ =
"DROP SEQUENCE seq;";

private static final String SQL_CREATE_SEQ =
"CREATE SEQUENCE seq;";

private static final String SQL_NEXTVAL =
"SELECT NEXTVAL('seq')";

private static final String SQL_CURRVAL =
"SELECT CURRVAL('seq')";

/**
* Test sequence functions for single-statment transactions
*/
@Test
public void test_SingleStmtTxn() throws SQLException {
conn1 = makeDefaultConnection();
conn1.setAutoCommit(true);
Statement stmt1 = conn1.createStatement();

conn2 = makeDefaultConnection();
conn2.setAutoCommit(true);
Statement stmt2 = conn2.createStatement();

// Create a sequence
stmt1.execute(SQL_CREATE_SEQ);

// Check the sequence is visible by others
try {
stmt2.execute(SQL_CREATE_SEQ);
fail();
} catch (PSQLException e) { }

// Check currval cannot be called before nextval
try {
stmt1.execute(SQL_CURRVAL);
fail();
} catch (PSQLException e) { }

// Check functionality with conn1
stmt1.execute(SQL_NEXTVAL);
ResultSet res1 = stmt1.executeQuery(SQL_CURRVAL);
res1.next();
assertEquals(1, res1.getInt(1));
assertNoMoreRows(res1);

// Update should be visible to conn2
stmt2.execute(SQL_NEXTVAL);
ResultSet res2 = stmt2.executeQuery(SQL_CURRVAL);
res2.next();
assertEquals(2, res2.getInt(1));
assertNoMoreRows(res2);

// Currval should be session consistent
res1 = stmt1.executeQuery(SQL_CURRVAL);
res1.next();
assertEquals(1, res1.getInt(1));
assertNoMoreRows(res1);

// Clean up
stmt1.close();
conn1.close();
stmt2.close();
conn2.close();
}

/**
* Test sequence functions for multi-statment transactions
*/
@Test
public void test_MultiStmtTxn() throws SQLException {
conn1 = makeDefaultConnection();
conn1.setAutoCommit(false);
Statement stmt1 = conn1.createStatement();

conn2 = makeDefaultConnection();
conn2.setAutoCommit(false);
Statement stmt2 = conn2.createStatement();

// Check functionality with conn1
stmt1.execute(SQL_NEXTVAL);
ResultSet res1 = stmt1.executeQuery(SQL_CURRVAL);
res1.next();
assertEquals(3, res1.getInt(1));
assertNoMoreRows(res1);

// Update should be visible to conn2
stmt2.execute(SQL_NEXTVAL);
ResultSet res2 = stmt2.executeQuery(SQL_CURRVAL);
res2.next();
assertEquals(4, res2.getInt(1));
assertNoMoreRows(res2);

// Rollback transactions
conn1.rollback();
conn2.rollback();

// Check sequence incremental will not rollback
conn1.setAutoCommit(true);
stmt1.execute(SQL_NEXTVAL);
res1 = stmt1.executeQuery(SQL_CURRVAL);
res1.next();
assertEquals(5, res1.getInt(1));
assertNoMoreRows(res1);

// Clean up
stmt1.close();
conn1.close();
stmt2.close();
conn2.close();
}

/**
* Test dropping sequence
*/
@Test
public void test_Drop_Seq() throws SQLException {
conn1 = makeDefaultConnection();
conn1.setAutoCommit(true);
Statement stmt1 = conn1.createStatement();

conn2 = makeDefaultConnection();
conn2.setAutoCommit(true);
Statement stmt2 = conn2.createStatement();

// Drop the sequence
stmt1.execute(SQL_DROP_SEQ);

// Check the sequence is invisible to all conns
try {
stmt1.execute(SQL_CURRVAL);
fail();
} catch (PSQLException e) { }
try {
stmt2.execute(SQL_CURRVAL);
fail();
} catch (PSQLException e) { }

// Check the same sequence can be created w/o exception
stmt2.execute(SQL_CREATE_SEQ);

// Clean up
stmt1.close();
conn1.close();
stmt2.close();
conn2.close();
}
}
76 changes: 21 additions & 55 deletions src/catalog/catalog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
#include "function/date_functions.h"
#include "function/numeric_functions.h"
#include "function/old_engine_string_functions.h"
#include "function/string_functions.h"
#include "function/sequence_functions.h"
#include "function/timestamp_functions.h"
#include "index/index_factory.h"
#include "settings/settings_manager.h"
Expand All @@ -42,18 +44,11 @@
namespace peloton {
namespace catalog {

// Get instance of the global catalog
Catalog *Catalog::GetInstance() {
static Catalog global_catalog;
return &global_catalog;
}

/* Initialization of catalog, including:
* 1) create peloton database, create catalog tables, add them into
* peloton database, insert columns into pg_attribute
* 2) create necessary indexes, insert into pg_index
* 3) insert peloton into pg_database, catalog tables into pg_table
*/
Catalog::Catalog() : pool_(new type::EphemeralPool()) {
// Begin transaction for catalog initialization
auto &txn_manager = concurrency::TransactionManagerFactory::GetInstance();
Expand All @@ -76,13 +71,6 @@ Catalog::Catalog() : pool_(new type::EphemeralPool()) {
txn_manager.CommitTransaction(txn);
}

/*@brief This function *MUST* be called after a new database is created to
* bootstrap all system catalog tables for that database. The system catalog
* tables must be created in certain order to make sure all tuples are indexed
*
* @param database database which this system catalogs belong to
* @param txn transaction context
*/
void Catalog::BootstrapSystemCatalogs(storage::Database *database,
concurrency::TransactionContext *txn) {
oid_t database_oid = database->GetOid();
Expand Down Expand Up @@ -254,6 +242,10 @@ ResultType Catalog::CreateDatabase(const std::string &database_name,
catalog_map_[database_oid]->Bootstrap(database_name, txn);
LOG_TRACE("Database %s created. Returning RESULT_SUCCESS.",
database_name.c_str());

// Then create the default namespace for our new database
// CreateSchema(database_name, DEFAULT_SCHEMA_NAME, txn);

return ResultType::SUCCESS;
}

Expand Down Expand Up @@ -281,7 +273,7 @@ ResultType Catalog::CreateSchema(const std::string &database_name,
catalog_map_[database_object->GetDatabaseOid()]->GetSchemaCatalog();
auto schema_object = pg_namespace->GetSchemaObject(schema_name, txn);
if (schema_object != nullptr)
throw CatalogException("Schema(namespace) " + schema_name +
throw CatalogException("Namespace " + schema_name +
" already exists");
// Since there isn't physical class corresponds to schema(namespace), the only
// thing needs to be done is inserting record into pg_namespace
Expand Down Expand Up @@ -872,10 +864,6 @@ ResultType Catalog::DropLayout(oid_t database_oid, oid_t table_oid,
// GET WITH NAME - CHECK FROM CATALOG TABLES, USING TRANSACTION
//===--------------------------------------------------------------------===//

/* Check database from pg_database with database_name using txn,
* get it from storage layer using database_oid,
* throw exception and abort txn if not exists/invisible
* */
storage::Database *Catalog::GetDatabaseWithName(
const std::string &database_name,
concurrency::TransactionContext *txn) const {
Expand All @@ -893,10 +881,6 @@ storage::Database *Catalog::GetDatabaseWithName(
return storage_manager->GetDatabaseWithOid(database_object->GetDatabaseOid());
}

/* Check table from pg_table with table_name & schema_name using txn,
* get it from storage layer using table_oid,
* throw exception and abort txn if not exists/invisible
* */
storage::DataTable *Catalog::GetTableWithName(
const std::string &database_name, const std::string &schema_name,
const std::string &table_name, concurrency::TransactionContext *txn) {
Expand Down Expand Up @@ -959,10 +943,6 @@ std::shared_ptr<DatabaseCatalogObject> Catalog::GetDatabaseObject(
return database_object;
}

/* Check table from pg_table with table_name & schema_name using txn,
* get it from storage layer using table_oid,
* throw exception and abort txn if not exists/invisible
* */
std::shared_ptr<TableCatalogObject> Catalog::GetTableObject(
const std::string &database_name, const std::string &schema_name,
const std::string &table_name, concurrency::TransactionContext *txn) {
Expand Down Expand Up @@ -1034,24 +1014,6 @@ std::shared_ptr<SystemCatalogs> Catalog::GetSystemCatalogs(
return catalog_map_[database_oid];
}

//===--------------------------------------------------------------------===//
// DEPRECATED
//===--------------------------------------------------------------------===//

// This should be deprecated! this can screw up the database oid system
void Catalog::AddDatabase(storage::Database *database) {
std::lock_guard<std::mutex> lock(catalog_mutex);
auto storage_manager = storage::StorageManager::GetInstance();
storage_manager->AddDatabaseToStorageManager(database);
auto &txn_manager = concurrency::TransactionManagerFactory::GetInstance();
auto txn = txn_manager.BeginTransaction();
BootstrapSystemCatalogs(database, txn);
DatabaseCatalog::GetInstance()->InsertDatabase(
database->GetOid(), database->GetDBName(), pool_.get(),
txn); // I guess this can pass tests
txn_manager.CommitTransaction(txn);
}

//===--------------------------------------------------------------------===//
// HELPERS
//===--------------------------------------------------------------------===//
Expand All @@ -1064,16 +1026,6 @@ Catalog::~Catalog() {
// FUNCTION
//===--------------------------------------------------------------------===//

/* @brief
* Add a new built-in function. This proceeds in two steps:
* 1. Add the function information into pg_catalog.pg_proc
* 2. Register the function pointer in function::BuiltinFunction
* @param name & argument_types function name and arg types used in SQL
* @param return_type the return type
* @param prolang the oid of which language the function is
* @param func_name the function name in C++ source (should be unique)
* @param func_ptr the pointer to the function
*/
void Catalog::AddBuiltinFunction(
const std::string &name, const std::vector<type::TypeId> &argument_types,
const type::TypeId return_type, oid_t prolang, const std::string &func_name,
Expand Down Expand Up @@ -1308,6 +1260,20 @@ void Catalog::InitializeFunctions() {
function::BuiltInFuncType{OperatorId::Like,
function::OldEngineStringFunctions::Like},
txn);
// Sequence
AddBuiltinFunction(
"nextval", {type::TypeId::VARCHAR}, type::TypeId::INTEGER,
internal_lang, "Nextval",
function::BuiltInFuncType{OperatorId::Nextval,
function::SequenceFunctions::_Nextval},
txn);
AddBuiltinFunction(
"currval", {type::TypeId::VARCHAR}, type::TypeId::INTEGER,
internal_lang, "Currval",
function::BuiltInFuncType{OperatorId::Currval,
function::SequenceFunctions::_Currval},
txn);


/**
* decimal functions
Expand Down
Loading