Skip to content

Commit

Permalink
Merge pull request #6 from venkateshragi/cas-support-cql
Browse files Browse the repository at this point in the history
Added support to mention table properties while creating table support.
  • Loading branch information
Rohit Rai committed Aug 29, 2013
2 parents fee9a26 + 8c79317 commit 7c4dbe1
Show file tree
Hide file tree
Showing 3 changed files with 115 additions and 105 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,13 @@
import org.apache.hadoop.hive.cassandra.CassandraException;
import org.apache.hadoop.hive.cassandra.CassandraProxyClient;
import org.apache.hadoop.hive.cassandra.serde.AbstractColumnSerDe;
import org.apache.hadoop.hive.cassandra.serde.cql.AbstractCqlSerDe;
import org.apache.hadoop.hive.metastore.MetaStoreUtils;
import org.apache.hadoop.hive.metastore.api.*;
import org.apache.hadoop.hive.metastore.api.Constants;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;

Expand All @@ -19,6 +22,7 @@
public class CqlManager {
final static public int DEFAULT_REPLICATION_FACTOR = 1;
final static public String DEFAULT_STRATEGY = "org.apache.cassandra.locator.SimpleStrategy";
private static final Logger logger = LoggerFactory.getLogger(CqlManager.class);

final static Map<String, String> hiveTypeToCqlType = new HashMap<String, String>();

Expand Down Expand Up @@ -107,65 +111,63 @@ public void closeConnection() {
}
}

/**
* Get CfDef based on the configuration in the table.
*/
private CfDef getCfDef() throws MetaException {
CfDef cf = new CfDef();
cf.setKeyspace(keyspace);
cf.setName(columnFamilyName);

cf.setColumn_type(getColumnType());

return cf;
}

/**
* Create a keyspace with columns defined in the table.
*/
public KsDef createKeyspaceWithColumns()
throws MetaException {
try {
KsDef ks = new KsDef();
ks.setName(getCassandraKeyspace());
ks.setStrategy_class(getStrategy());

if (!ks.isSetStrategy_options())
ks.setStrategy_options(new HashMap<String, String>());

ks.putToStrategy_options("replication_factor", Integer.toString(getReplicationFactor()));

ks.addToCf_defs(getCfDef());
public boolean doesKeyspaceExist() throws MetaException {
String getKeyspaceQuery = "select * from system.schema_keyspaces where keyspace_name='%s'";
try {
CqlResult result = cch.getClient().execute_cql3_query(ByteBufferUtil.bytes(String.format(getKeyspaceQuery, keyspace)), Compression.NONE, ConsistencyLevel.ONE);
List<CqlRow> rows = result.getRows();
//there can be only be one keyspace with the given name or no keyspace at all
assert rows.size() <= 1;
return rows.size() == 1;
} catch (InvalidRequestException e) {
throw new MetaException("Unable to create keyspace " + keyspace + ". Error:" + e.getWhy());
} catch (Exception e) {
throw new MetaException("Unable to create keyspace " + keyspace + ". Error:" + e.getMessage());
}
}

cch.getClient().system_add_keyspace(ks);
cch.getClient().set_keyspace(keyspace);
return ks;
} catch (TException e) {
throw new MetaException("Unable to create key space '" + keyspace + "'. Error:"
+ e.getMessage());
} catch (InvalidRequestException e) {
throw new MetaException("Unable to create key space '" + keyspace + "'. Error:"
+ e.getMessage());
} catch (SchemaDisagreementException e) {
throw new MetaException("Unable to create key space '" + keyspace + "'. Error:"
+ e.getMessage());
public void createKeyspace() throws MetaException {
String createKeyspaceQuery = "create keyspace %s WITH replication = { 'class' : %s, %s } AND durable_writes = %s";
String durableWrites = getPropertyFromTable(AbstractCqlSerDe.DURABLE_WRITES);
if(durableWrites == null){
durableWrites = "true";
}
String strategy = "'" + getStrategy() + "'";
String query = String.format(createKeyspaceQuery, keyspace, strategy, getStrategyOptions(), durableWrites);
try {
cch.getClient().execute_cql3_query(ByteBufferUtil.bytes(query), Compression.NONE, ConsistencyLevel.ONE);
} catch (InvalidRequestException e) {
throw new MetaException("Unable to create keyspace '" + keyspace + "'. Error:" + e.getWhy());
} catch (Exception e) {
throw new MetaException("Unable to create keyspace '" + keyspace + "'. Error:" + e.getMessage());
}
}

}
public String getStrategyOptions() throws MetaException {
String replicationFactor = getPropertyFromTable(AbstractCqlSerDe.CASSANDRA_KEYSPACE_REPFACTOR);
String strategyOptions = getPropertyFromTable(AbstractCqlSerDe.CASSANDRA_KEYSPACE_STRATEGY_OPTIONS);
if(replicationFactor != null) {
if(strategyOptions != null){
throw new MetaException("Unable to create keyspace '" + keyspace + "' Specify only one of 'cassandra.ks.repfactor' or 'cassandra.ks.stratOptions'");
}
return "'replication_factor' : " + replicationFactor;
}
if(strategyOptions == null) {
throw new MetaException("Unable to create keyspace '" + keyspace + "' Specify either 'cassandra.ks.repfactor' or 'cassandra.ks.stratOptions'");
}
return strategyOptions;
}

/**
* Create the column family if it doesn't exist.
*
* @return
* @throws MetaException
*/
public boolean createCFIfNotFound() throws MetaException {
public void createCFIfNotFound() throws MetaException {
boolean cfExists = checkColumnFamily();

if (!cfExists) {
return (createColumnFamily() != null);
} else {
return cfExists;
createColumnFamily();
}
}

Expand Down Expand Up @@ -211,8 +213,7 @@ private boolean checkColumnFamily() throws MetaException {
/**
* Create column family based on the configuration in the table.
*/
public CfDef createColumnFamily() throws MetaException {
CfDef cf = getCfDef();
public void createColumnFamily() throws MetaException {
try {
cch.getClient().set_keyspace(keyspace);
Properties properties = MetaStoreUtils.getSchema(tbl);
Expand All @@ -237,20 +238,42 @@ public CfDef createColumnFamily() throws MetaException {
queryBuilder.append(hiveTypeToCqlType.get(columnTypes[i]));
queryBuilder.append(",");
}
String keyStr = getPropertyFromTable(AbstractCqlSerDe.CASSANDRA_COLUMN_FAMILY_PRIMARY_KEY);
if(keyStr == null || keyStr.isEmpty()) {
keyStr = columnNames[0];
}
queryBuilder.append(" primary key (");
//todo how do we specify composite keys ?
queryBuilder.append(columnNames[0]);
queryBuilder.append(keyStr);
queryBuilder.append(")");
queryBuilder.append(")");

Map<String, String> options = constructTableOptions();
if(!options.isEmpty()){
queryBuilder.append(" WITH ");
Iterator<Map.Entry<String, String>> optionIterator = options.entrySet().iterator();
while (optionIterator.hasNext()){
Map.Entry<String, String> entry = optionIterator.next();

queryBuilder.append(entry.getKey());

queryBuilder.append(" = ");

queryBuilder.append(entry.getValue());


if(optionIterator.hasNext()){
queryBuilder.append(" AND ");
}
}
}

cch.getClient().execute_cql3_query(ByteBufferUtil.bytes(queryBuilder.toString()), Compression.NONE, ConsistencyLevel.ONE);
return cf;
} catch (TException e) {
throw new MetaException("Unable to create column family '" + columnFamilyName + "'. Error:"
+ e.getMessage());
} catch (InvalidRequestException e) {
throw new MetaException("Unable to create column family '" + columnFamilyName + "'. Error:"
+ e.getMessage());
+ e.getWhy());
} catch (SchemaDisagreementException e) {
throw new MetaException("Unable to create column family '" + columnFamilyName + "'. Error:"
+ e.getMessage());
Expand All @@ -263,56 +286,31 @@ public CfDef createColumnFamily() throws MetaException {
}

}

private String getColumnType() throws MetaException {
String prop = getPropertyFromTable(AbstractColumnSerDe.CASSANDRA_COL_MAPPING);
List<String> mapping;
if (prop != null) {
mapping = AbstractColumnSerDe.parseColumnMapping(prop);
} else {
List<FieldSchema> schema = tbl.getSd().getCols();
if (schema.size() == 0) {
throw new MetaException("Can't find table column definitions");
}

String[] colNames = new String[schema.size()];
for (int i = 0; i < schema.size(); i++) {
colNames[i] = schema.get(i).getName();
}

String mappingStr = AbstractColumnSerDe.createColumnMappingString(colNames);
mapping = Arrays.asList(mappingStr.split(","));
private Map<String, String> constructTableOptions(){

Map<String, String> options = new HashMap<String, String>();
addIfNotEmpty(AbstractCqlSerDe.COLUMN_FAMILY_COMMENT, options, true);
addIfNotEmpty(AbstractCqlSerDe.READ_REPAIR_CHANCE, options, false);
addIfNotEmpty(AbstractCqlSerDe.DCLOCAL_READ_REPAIR_CHANCE, options, false);
addIfNotEmpty(AbstractCqlSerDe.GC_GRACE_SECONDS, options, false);
addIfNotEmpty(AbstractCqlSerDe.BLOOM_FILTER_FP_CHANCE, options, false);
addIfNotEmpty(AbstractCqlSerDe.COMPACTION, options, false);
addIfNotEmpty(AbstractCqlSerDe.COMPRESSION, options, false);
addIfNotEmpty(AbstractCqlSerDe.REPLICATE_ON_WRITE, options, false);
addIfNotEmpty(AbstractCqlSerDe.CACHING, options, false);

return options;
}

boolean hasKey = false;
boolean hasColumn = false;
boolean hasValue = false;
boolean hasSubColumn = false;

for (String column : mapping) {
if (column.equalsIgnoreCase(AbstractColumnSerDe.CASSANDRA_KEY_COLUMN)) {
hasKey = true;
} else if (column.equalsIgnoreCase(AbstractColumnSerDe.CASSANDRA_COLUMN_COLUMN)) {
hasColumn = true;
} else if (column.equalsIgnoreCase(AbstractColumnSerDe.CASSANDRA_SUBCOLUMN_COLUMN)) {
hasSubColumn = true;
} else if (column.equalsIgnoreCase(AbstractColumnSerDe.CASSANDRA_VALUE_COLUMN)) {
hasValue = true;
} else {
return "Standard";
}
}

if (hasKey && hasColumn && hasValue) {
if (hasSubColumn) {
return "Super";
} else {
return "Standard";
}
} else {
return "Standard";
private void addIfNotEmpty(String property, Map<String, String> options, boolean wrapQuotes){
String temp = getPropertyFromTable(property);
if(wrapQuotes) {
temp = "'" + temp + "'";
}
if(temp != null && !temp.isEmpty()){
options.put(property, temp);
}
}
}

/**
* Get replication factor from the table property.
Expand All @@ -339,7 +337,7 @@ private int getReplicationFactor() throws MetaException {
* @return strategy
*/
private String getStrategy() {
String prop = getPropertyFromTable(AbstractColumnSerDe.CASSANDRA_KEYSPACE_STRATEGY);
String prop = getPropertyFromTable(AbstractCqlSerDe.CASSANDRA_KEYSPACE_STRATEGY);
if (prop == null) {
return DEFAULT_STRATEGY;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,10 @@ public void preCreateTable(Table table) throws MetaException {
try {
//open connection to cassandra
manager.openConnection();
if(!manager.doesKeyspaceExist()){
logger.info("Keyspace doesnot exist. Creating keyspace {}", table.getDbName());
manager.createKeyspace();
}
//create the column family if it doesn't exist.
manager.createCFIfNotFound();
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ public abstract class AbstractCqlSerDe implements SerDe {
public static final String CASSANDRA_KEYSPACE_NAME = "cassandra.ks.name"; // keyspace
public static final String CASSANDRA_KEYSPACE_REPFACTOR = "cassandra.ks.repfactor"; //keyspace replication factor
public static final String CASSANDRA_KEYSPACE_STRATEGY = "cassandra.ks.strategy"; //keyspace replica placement strategy
public static final String CASSANDRA_KEYSPACE_STRATEGY_OPTIONS = "cassandra.ks.stratOptions";
public static final String DURABLE_WRITES = "durable.writes";

public static final String CASSANDRA_CF_NAME = "cassandra.cf.name"; // column family
public static final String CASSANDRA_CF_COUNTERS = "cassandra.cf.counters"; // flag this as a counter CF
Expand All @@ -56,10 +58,16 @@ public abstract class AbstractCqlSerDe implements SerDe {
public static final String CASSANDRA_SLICE_PREDICATE_RANGE_COUNT = "cassandra.slice.predicate.range.count";
public static final String CASSANDRA_ENABLE_WIDEROW_ITERATOR = "cassandra.enable.widerow.iterator";

public static final String CASSANDRA_SPECIAL_COLUMN_KEY = "row_key";
public static final String CASSANDRA_SPECIAL_COLUMN_COL = "column_name";
public static final String CASSANDRA_SPECIAL_COLUMN_SCOL = "sub_column_name";
public static final String CASSANDRA_SPECIAL_COLUMN_VAL = "value";
public static final String CASSANDRA_COLUMN_FAMILY_PRIMARY_KEY = "cql.primarykey";
public static final String COLUMN_FAMILY_COMMENT = "comment";
public static final String READ_REPAIR_CHANCE = "read_repair_chance";
public static final String DCLOCAL_READ_REPAIR_CHANCE = "dclocal_read_repair_chance";
public static final String GC_GRACE_SECONDS = "gc_grace_seconds";
public static final String BLOOM_FILTER_FP_CHANCE = "bloom_filter_fp_chance";
public static final String COMPACTION = "compaction";
public static final String COMPRESSION = "compression";
public static final String REPLICATE_ON_WRITE = "replicate_on_write";
public static final String CACHING = "caching";

public static final String CASSANDRA_KEY_COLUMN = ":key";
public static final String CASSANDRA_COLUMN_COLUMN = ":column";
Expand Down

0 comments on commit 7c4dbe1

Please sign in to comment.