Use pre-collected BucketRegion stats instead of region iteration in stats service #1404

Open · wants to merge 7 commits into master
@@ -27,10 +27,8 @@ import com.gemstone.gemfire.internal.cache._
 import com.gemstone.gemfire.internal.cache.control.InternalResourceManager
 import com.gemstone.gemfire.internal.cache.control.InternalResourceManager.ResourceType
 import com.gemstone.gemfire.internal.i18n.LocalizedStrings
-import com.pivotal.gemfirexd.internal.engine.Misc
 
 import org.apache.spark.Logging
-import org.apache.spark.sql.execution.columnar.impl.ColumnFormatRelation
 
 
 class SnappyStorageEvictor extends Logging {
@@ -126,12 +124,11 @@ class SnappyStorageEvictor extends Logging {
       offHeap: Boolean, hasOffHeap: Boolean): Boolean = {
     val hasLRU = (region.getEvictionAttributes.getAlgorithm.isLRUHeap
         && (region.getDataStore != null)
-        && !region.getAttributes.getEnableOffHeapMemory && !region.isRowBuffer())
+        && !region.getAttributes.getEnableOffHeapMemory && !region.isRowBuffer)
     if (hasOffHeap) {
       // when off-heap is enabled then all column tables use off-heap
-      val regionPath = Misc.getFullTableNameFromRegionPath(region.getFullPath)
-      if (offHeap) hasLRU && ColumnFormatRelation.isColumnTable(regionPath)
-      else hasLRU && !ColumnFormatRelation.isColumnTable(regionPath)
+      if (offHeap) hasLRU && region.isInternalColumnTable
+      else hasLRU && !region.isInternalColumnTable
     } else {
       assert(!offHeap,
         "unexpected invocation for hasOffHeap=false and offHeap=true")
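Reviewer note: the evictor used to classify a region as a column table by rebuilding the table name from the region path and pattern-matching it against the shadow-table naming scheme (the helper deleted from ColumnFormatRelation further down). It now reads the answer directly off the region. Below is a minimal sketch of the two checks, assuming the new `isInternalColumnTable` flag is a precomputed attribute on the region provided by the store submodule; the trait and helper names here are illustrative, not the real API surface.

```scala
object ColumnTableCheckSketch {
  // Old style: reconstruct the full table name, then match the shadow-table pattern.
  def byName(tableName: String, shadowSchemaPrefix: String, shadowSuffix: String): Boolean =
    tableName.contains(shadowSchemaPrefix) && tableName.endsWith(shadowSuffix)

  // New style (assumed shape): the region carries a precomputed flag, so the hot
  // eviction path does no string building or matching.
  trait RegionWithFlag { def isInternalColumnTable: Boolean }
  def byFlag(region: RegionWithFlag): Boolean = region.isInternalColumnTable
}
```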
@@ -20,7 +20,6 @@
 package io.snappydata
 
 import java.util.concurrent.TimeUnit
-import java.util.function.BiFunction
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
@@ -32,20 +31,18 @@ import com.gemstone.gemfire.CancelException
 import com.gemstone.gemfire.cache.execute.FunctionService
 import com.gemstone.gemfire.i18n.LogWriterI18n
 import com.gemstone.gemfire.internal.SystemTimer
-import com.gemstone.gemfire.internal.cache.{AbstractRegionEntry, PartitionedRegion, RegionEntry}
+import com.gemstone.gemfire.internal.cache.PartitionedRegion
 import com.pivotal.gemfirexd.internal.engine.Misc
 import com.pivotal.gemfirexd.internal.engine.distributed.GfxdListResultCollector.ListResultCollectorValue
 import com.pivotal.gemfirexd.internal.engine.distributed.{GfxdListResultCollector, GfxdMessage}
 import com.pivotal.gemfirexd.internal.engine.sql.execute.MemberStatisticsMessage
-import com.pivotal.gemfirexd.internal.engine.store.GemFireContainer
 import com.pivotal.gemfirexd.internal.engine.ui._
 import io.snappydata.Constant._
 import io.snappydata.sql.catalog.CatalogObjectType
 import org.eclipse.collections.impl.map.mutable.UnifiedMap
 
 import org.apache.spark.SparkContext
 import org.apache.spark.sql.collection.Utils
-import org.apache.spark.sql.execution.columnar.impl.{ColumnFormatKey, ColumnFormatRelation, ColumnFormatValue, RemoteEntriesIterator}
 import org.apache.spark.sql.{SnappyContext, ThinClientConnectorMode}
 
 /*
@@ -275,56 +272,22 @@ object SnappyEmbeddedTableStatsProviderService extends TableStatsProviderService
     }
   }
 
-  type PRIterator = PartitionedRegion#PRLocalScanIterator
-
-  /**
-   * Allows pulling stats rows efficiently if required. For the corner case
-   * of bucket moving away while iterating other buckets.
-   */
-  private val createRemoteIterator = new BiFunction[java.lang.Integer, PRIterator,
-      java.util.Iterator[RegionEntry]] {
-    override def apply(bucketId: Integer,
-        iter: PRIterator): java.util.Iterator[RegionEntry] = {
-      new RemoteEntriesIterator(bucketId, Array.emptyIntArray,
-        iter.getPartitionedRegion, null)
-    }
-  }
-
   def publishColumnTableRowCountStats(): Unit = {
     val regions = Misc.getGemFireCache.getApplicationRegions.asScala
     for (region <- regions) {
       if (region.getDataPolicy.withPartitioning()) {
-        val table = Misc.getFullTableNameFromRegionPath(region.getFullPath)
         val pr = region.asInstanceOf[PartitionedRegion]
-        val container = pr.getUserAttribute.asInstanceOf[GemFireContainer]
-        if (ColumnFormatRelation.isColumnTable(table) &&
-            pr.getLocalMaxMemory > 0) {
-          var numColumnsInTable = -1
+        if (pr.isInternalColumnTable && pr.getLocalMaxMemory > 0) {
           // Resetting PR numRows in cached batch as this will be calculated every time.
           var rowsInColumnBatch = 0L
           var offHeapSize = 0L
-          if (container ne null) {
-            // TODO: SW: this should avoid iteration and use BucketRegion to get the sizes
-            val itr = new pr.PRLocalScanIterator(false /* primaryOnly */ , null /* no TX */ ,
-              null /* not required since includeValues is false */ ,
-              createRemoteIterator, false /* forUpdate */ , false /* includeValues */)
-            // using direct region operations
-            while (itr.hasNext) {
-              val re = itr.next().asInstanceOf[AbstractRegionEntry]
-              val key = re.getRawKey.asInstanceOf[ColumnFormatKey]
-              val bucketRegion = itr.getHostedBucketRegion
-              if (bucketRegion.getBucketAdvisor.isPrimary) {
-                if (numColumnsInTable < 0) {
-                  numColumnsInTable = key.getNumColumnsInTable(table)
-                }
-                rowsInColumnBatch += key.getColumnBatchRowCount(bucketRegion, re,
-                  numColumnsInTable)
-              }
-              re._getValue() match {
-                case v: ColumnFormatValue => offHeapSize += v.getOffHeapSizeInBytes
-                case _ =>
-              }
+          val buckets = pr.getDataStore.getAllLocalBucketRegions.iterator()
+          while (buckets.hasNext) {
+            val bucket = buckets.next()
+            if (bucket.getBucketAdvisor.isPrimary) {
+              rowsInColumnBatch += bucket.getNumRowsInColumnTable
            }
+            offHeapSize += bucket.getDirectBytesSizeInMemory
          }
           val stats = pr.getPrStats
           stats.setPRNumRowsInColumnBatches(rowsInColumnBatch)
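Reviewer note: this hunk is the core of the PR. The old code walked every entry of every local bucket with a PRLocalScanIterator and decoded each stats row just to recompute the row count and off-heap size; the new code simply sums counters that each BucketRegion already keeps up to date. Those counters (`getNumRowsInColumnTable`, `getDirectBytesSizeInMemory`) live in the store submodule that is bumped at the end of this diff, so their implementation is not visible here. The sketch below is only a guess at their shape, with hypothetical field and callback names, to show why the read side becomes O(buckets) instead of O(entries).

```scala
import java.util.concurrent.atomic.{AtomicLong, LongAdder}

// Hypothetical sketch of per-bucket pre-collected stats (not the actual store code).
class BucketStatsSketch {
  private val numRowsInColumnTable = new LongAdder
  private val directBytesInMemory = new AtomicLong

  // Maintained inline on the write path instead of being recomputed by a scan.
  def onColumnBatchStored(rowCount: Int, offHeapBytes: Long): Unit = {
    numRowsInColumnTable.add(rowCount)
    directBytesInMemory.addAndGet(offHeapBytes)
  }

  def onColumnBatchDestroyed(rowCount: Int, offHeapBytes: Long): Unit = {
    numRowsInColumnTable.add(-rowCount)
    directBytesInMemory.addAndGet(-offHeapBytes)
  }

  // What publishColumnTableRowCountStats now reads per bucket: O(1) per bucket.
  def getNumRowsInColumnTable: Long = numRowsInColumnTable.sum()
  def getDirectBytesSizeInMemory: Long = directBytesInMemory.get()
}
```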
@@ -284,7 +284,7 @@ class StoreHiveCatalog extends ExternalCatalog with Logging {
       }
       // exclude policies also from the list of hive tables
       val metaData = new ExternalTableMetaData(table.identifier.table,
-        table.database, tableType.toString, null, -1,
+        table.database, tableType.toString, null, table.schema.length, -1,
         -1, null, null, null, null,
         tblDataSourcePath, driverClass)
       metaData.provider = table.provider match {
@@ -387,8 +387,8 @@
       }
       new ExternalTableMetaData(qualifiedName, schema, tableType.toString,
         ExternalStoreUtils.getExternalStoreOnExecutor(parameters, partitions, qualifiedName,
-          schema), columnBatchSize, columnMaxDeltaRows, compressionCodec, baseTable, dmls,
-          dependentRelations, tblDataSourcePath, driverClass).asInstanceOf[R]
+          schema), schema.length, columnBatchSize, columnMaxDeltaRows, compressionCodec,
+          baseTable, dmls, dependentRelations, tblDataSourcePath, driverClass).asInstanceOf[R]
       }
 
     case GET_METADATA =>
@@ -18,13 +18,14 @@ package org.apache.spark.sql.execution.columnar.impl
 
 import java.sql.{Connection, PreparedStatement}
 
-import com.gemstone.gemfire.internal.cache.PartitionedRegion.RegionLock
-
 import scala.util.control.NonFatal
-import com.gemstone.gemfire.internal.cache.{ExternalTableMetaData, GemFireCacheImpl, LocalRegion, PartitionedRegion}
+
+import com.gemstone.gemfire.internal.cache.PartitionedRegion.RegionLock
+import com.gemstone.gemfire.internal.cache.{ExternalTableMetaData, LocalRegion, PartitionedRegion}
 import com.pivotal.gemfirexd.internal.engine.Misc
 import io.snappydata.sql.catalog.{RelationInfo, SnappyExternalCatalog}
 import io.snappydata.{Constant, Property}
+
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Descending, Expression, SortDirection}
@@ -236,7 +237,7 @@ abstract class BaseColumnFormatRelation(
   override def getUpdatePlan(relation: LogicalRelation, child: SparkPlan,
       updateColumns: Seq[Attribute], updateExpressions: Seq[Expression],
       keyColumns: Seq[Attribute]): SparkPlan = {
-    withTableWriteLock() {() =>
+    withTableWriteLock() { () =>
       ColumnUpdateExec(child, externalColumnTableName, partitionColumns,
         partitionExpressions(relation), numBuckets, isPartitioned, schema, externalStore, this,
         updateColumns, updateExpressions, keyColumns, connProperties, onExecutor = false)
@@ -276,7 +277,7 @@ abstract class BaseColumnFormatRelation(
     val lock = snc.getContextObject[(Option[TableIdentifier], PartitionedRegion.RegionLock)](
       SnappySession.PUTINTO_LOCK) match {
       case None => snc.grabLock(table, schemaName, connProperties)
-      case Some(a) => null // Do nothing as putInto will release lock
+      case _ => null // Do nothing as putInto will release lock
     }
     if ((lock != null) && lock.isInstanceOf[RegionLock]) lock.asInstanceOf[RegionLock].lock()
     try {
@@ -301,7 +302,7 @@
         }
       }
       finally {
-        logDebug(s"Added the ${lock} object to the context. in InsertRows")
+        logDebug(s"Added the $lock object to the context. in InsertRows")
         if (lock != null) {
           snc.releaseLock(lock)
         }
@@ -320,7 +321,7 @@
       f()
     }
     finally {
-      logDebug(s"Added the ${lock} object to the context.")
+      logDebug(s"Added the $lock object to the context.")
       if (lock != null) {
         snc.addContextObject(
           SnappySession.BULKWRITE_LOCK, lock)
@@ -712,11 +713,6 @@ object ColumnFormatRelation extends Logging with StoreCallback {
     tableName + Constant.SHADOW_TABLE_SUFFIX
   }
 
-  final def isColumnTable(tableName: String): Boolean = {
-    tableName.contains(Constant.SHADOW_SCHEMA_NAME_WITH_PREFIX) &&
-        tableName.endsWith(Constant.SHADOW_TABLE_SUFFIX)
-  }
-
   def getIndexUpdateStruct(indexEntry: ExternalTableMetaData,
       connectedExternalStore: ConnectedExternalStore):
     ColumnFormatRelation.IndexUpdateStruct = {
@@ -271,7 +271,7 @@ class JDBCSourceAsColumnarStore(private var _connProperties: ConnectionPropertie
     connectionType match {
       case ConnectionType.Embedded =>
         val region = Misc.getRegionForTable[ColumnFormatKey, ColumnFormatValue](
-          columnTableName, true)
+          columnTableName, true).asInstanceOf[PartitionedRegion]
         val key = new ColumnFormatKey(batchId, partitionId,
           ColumnFormatEntry.DELETE_MASK_COL_INDEX)
 
@@ -170,9 +170,6 @@ object StoreCallbacksImpl extends StoreCallbacks with Logging with Serializable
     schemas
   }
 
-  override def isColumnTable(qualifiedName: String): Boolean =
-    ColumnFormatRelation.isColumnTable(qualifiedName)
-
   override def skipEvictionForEntry(entry: LRUEntry): Boolean = {
     // skip eviction of stats rows (SNAP-2102)
     entry.getRawKey match {
@@ -22,7 +22,7 @@ import java.nio.ByteBuffer
 import com.gemstone.gemfire.cache.{EntryEvent, EntryNotFoundException, Region}
 import com.gemstone.gemfire.internal.cache.delta.Delta
 import com.gemstone.gemfire.internal.cache.versions.{VersionSource, VersionTag}
-import com.gemstone.gemfire.internal.cache.{DiskEntry, EntryEventImpl, GemFireCacheImpl}
+import com.gemstone.gemfire.internal.cache.{DiskEntry, EntryEventImpl, GemFireCacheImpl, PartitionedRegion}
 import com.gemstone.gemfire.internal.shared.FetchRequest
 import com.pivotal.gemfirexd.internal.engine.GfxdSerializable
 import com.pivotal.gemfirexd.internal.engine.store.GemFireContainer
@@ -300,7 +300,7 @@ object ColumnDelta {
    * Delete entire batch from column store for the batchId and partitionId
    * matching those of given key.
    */
-  private[columnar] def deleteBatch(key: ColumnFormatKey, columnRegion: Region[_, _],
+  private[columnar] def deleteBatch(key: ColumnFormatKey, columnRegion: PartitionedRegion,
       columnTableName: String): Unit = {
 
     // delete all the rows with matching batchId
@@ -312,7 +312,7 @@
       }
     }
 
-    val numColumns = key.getNumColumnsInTable(columnTableName)
+    val numColumns = columnRegion.getNumColumns
     // delete the stats rows first
     destroyKey(key.withColumnIndex(ColumnFormatEntry.STATROW_COL_INDEX))
     destroyKey(key.withColumnIndex(ColumnFormatEntry.DELTA_STATROW_COL_INDEX))
@@ -27,17 +27,15 @@ import com.gemstone.gemfire.internal.DSFIDFactory.GfxdDSFID
 import com.gemstone.gemfire.internal.cache._
 import com.gemstone.gemfire.internal.cache.lru.Sizeable
 import com.gemstone.gemfire.internal.cache.persistence.DiskRegionView
-import com.gemstone.gemfire.internal.cache.store.SerializedDiskBuffer
+import com.gemstone.gemfire.internal.cache.store.{ColumnBatchKey, SerializedDiskBuffer}
 import com.gemstone.gemfire.internal.shared._
 import com.gemstone.gemfire.internal.shared.unsafe.DirectBufferAllocator
 import com.gemstone.gemfire.internal.size.ReflectionSingleObjectSizer.REFERENCE_SIZE
 import com.gemstone.gemfire.internal.{ByteBufferDataInput, DSCODE, DSFIDFactory, DataSerializableFixedID, HeapDataOutputStream}
-import com.pivotal.gemfirexd.internal.engine.distributed.utils.GemFireXDUtils
-import com.pivotal.gemfirexd.internal.engine.store.{GemFireContainer, RegionKey}
-import com.pivotal.gemfirexd.internal.engine.{GfxdDataSerializable, GfxdSerializable, Misc}
+import com.pivotal.gemfirexd.internal.engine.store.RegionKey
+import com.pivotal.gemfirexd.internal.engine.{GfxdSerializable, Misc}
 import com.pivotal.gemfirexd.internal.iapi.types.{DataValueDescriptor, SQLInteger, SQLLongint}
 import com.pivotal.gemfirexd.internal.impl.sql.compile.TableName
-import com.pivotal.gemfirexd.internal.snappy.ColumnBatchKey
 
 import org.apache.spark.memory.MemoryManagerCallback.{allocateExecutionMemory, memoryManager, releaseExecutionMemory}
 import org.apache.spark.sql.collection.SharedUtils
@@ -99,46 +97,37 @@ object ColumnFormatEntry {
 final class ColumnFormatKey(private[columnar] var uuid: Long,
     private[columnar] var partitionId: Int,
     private[columnar] var columnIndex: Int)
-    extends GfxdDataSerializable with ColumnBatchKey with RegionKey with Serializable {
+    extends ColumnBatchKey with GfxdSerializable with RegionKey with Serializable {
 
   // to be used only by deserialization
   def this() = this(-1L, -1, -1)
 
-  override def getNumColumnsInTable(columnTableName: String): Int = {
-    val bufferTable = GemFireContainer.getRowBufferTableName(columnTableName)
-    GemFireXDUtils.getGemFireContainer(bufferTable, true).getNumColumns - 1
-  }
-
   override def getColumnBatchRowCount(bucketRegion: BucketRegion,
-      re: AbstractRegionEntry, numColumnsInTable: Int): Int = {
-    val currentBucketRegion = bucketRegion.getHostedBucketRegion
-    if ((columnIndex == ColumnFormatEntry.STATROW_COL_INDEX ||
+      value: SerializedDiskBuffer): Int = {
+    if (columnIndex == ColumnFormatEntry.STATROW_COL_INDEX ||
         columnIndex == ColumnFormatEntry.DELTA_STATROW_COL_INDEX ||
-        columnIndex == ColumnFormatEntry.DELETE_MASK_COL_INDEX) &&
-        !re.isDestroyedOrRemoved) {
-      val statsOrDeleteVal = re.getValue(currentBucketRegion)
-      if (statsOrDeleteVal ne null) {
-        val statsOrDelete = statsOrDeleteVal.asInstanceOf[ColumnFormatValue]
-            .getValueRetain(FetchRequest.DECOMPRESS)
-        val buffer = statsOrDelete.getBuffer
-        try {
-          if (buffer.remaining() > 0) {
-            if (columnIndex == ColumnFormatEntry.STATROW_COL_INDEX ||
-                columnIndex == ColumnFormatEntry.DELTA_STATROW_COL_INDEX) {
-              val numColumns = ColumnStatsSchema.numStatsColumns(numColumnsInTable)
-              val unsafeRow = SharedUtils.toUnsafeRow(buffer, numColumns)
-              unsafeRow.getInt(ColumnStatsSchema.COUNT_INDEX_IN_SCHEMA)
-            } else {
-              val allocator = ColumnEncoding.getAllocator(buffer)
-              // decrement by deleted row count
-              -ColumnEncoding.readInt(allocator.baseObject(buffer),
-                allocator.baseOffset(buffer) + buffer.position() + 8)
-            }
-          } else 0
-        } finally {
-          statsOrDelete.release()
-        }
-      } else 0
+        columnIndex == ColumnFormatEntry.DELETE_MASK_COL_INDEX) {
+      val statsOrDelete = value.asInstanceOf[ColumnFormatValue]
+          .getValueRetain(FetchRequest.DECOMPRESS)
+      val buffer = statsOrDelete.getBuffer
+      try {
+        if (buffer.remaining() > 0) {
+          if (columnIndex == ColumnFormatEntry.STATROW_COL_INDEX ||
+              columnIndex == ColumnFormatEntry.DELTA_STATROW_COL_INDEX) {
+            val numColumns = ColumnStatsSchema.numStatsColumns(
+              bucketRegion.getPartitionedRegion.getNumColumns)
+            val unsafeRow = SharedUtils.toUnsafeRow(buffer, numColumns)
+            unsafeRow.getInt(ColumnStatsSchema.COUNT_INDEX_IN_SCHEMA)
+          } else {
+            val allocator = ColumnEncoding.getAllocator(buffer)
+            // decrement by deleted row count
+            -ColumnEncoding.readInt(allocator.baseObject(buffer),
+              allocator.baseOffset(buffer) + buffer.position() + 8)
+          }
+        } else 0
+      } finally {
+        statsOrDelete.release()
+      }
     } else 0
   }
 
@@ -159,6 +148,8 @@ final class ColumnFormatKey(private[columnar] var uuid: Long,
     case _ => false
   }
 
+  override def getDSFID: Int = DataSerializableFixedID.GFXD_TYPE
+
   override def getGfxdID: Byte = GfxdSerializable.COLUMN_FORMAT_KEY
 
   override def toData(out: DataOutput): Unit = {
@@ -173,6 +164,8 @@
     columnIndex = in.readInt()
   }
 
+  override def getSerializationVersions: Array[Version] = null
+
   override def getSizeInBytes: Int = {
     alignedSize(Sizeable.PER_OBJECT_OVERHEAD +
         8 /* uuid */ + 4 /* columnIndex */ + 4 /* partitionId */)
@@ -483,7 +476,7 @@ class ColumnFormatValue extends SerializedDiskBuffer
         if (this.refCount > 1 && isInRegion(context)) {
           // update the statistics before changing self
           val newVal = copy(newBuffer, isCompressed, changeOwnerToStorage = false)
-          context.updateMemoryStats(this, newVal)
+          context.updateMemoryStats(this, newVal, this.entry)
         }
         this.columnBuffer = newBuffer
         this.decompressionState = state
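Reviewer note: with the new signature, `getColumnBatchRowCount` no longer touches the region entry or looks up the row-buffer container; the caller hands it the already-fetched value, and the column count comes from the PartitionedRegion. A hedged usage sketch follows, assuming the method is declared on the relocated `ColumnBatchKey` trait and that the real call site is BucketRegion's accounting in the store submodule; the wrapper below is illustrative, not the actual call site.

```scala
import com.gemstone.gemfire.internal.cache.BucketRegion
import com.gemstone.gemfire.internal.cache.store.{ColumnBatchKey, SerializedDiskBuffer}

object RowCountSketch {
  // Only stats rows and delete masks yield a non-zero count; the key decides that
  // internally from its columnIndex, as shown in the hunk above.
  def rowCountDelta(key: ColumnBatchKey, value: AnyRef, bucket: BucketRegion): Int =
    value match {
      case buffer: SerializedDiskBuffer => key.getColumnBatchRowCount(bucket, buffer)
      case _ => 0
    }
}
```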
store (submodule): 2 changes (1 addition, 1 deletion)
Submodule store updated from 26b2ad to e2410e