diff --git a/Source/ColorSummary.jl b/Source/ColorSummary.jl index 48bad42..c4d0b2a 100644 --- a/Source/ColorSummary.jl +++ b/Source/ColorSummary.jl @@ -245,8 +245,10 @@ function join_table_cycle_likelihoods(g::DataGraph, color_hash, cycle_size::Int, # start with a forward edge. # detailed_edges::Set{Tuple{Int, Int, Int, Int, Vector{Bool}}} = Set() + # [n1, n2, c1, c2, [d]] + DetailedEdge = Tuple{Int, Int, Color, Color, Vector{Bool}} # map start_node => vector of edges with that start node - detailed_edges::Dict{Int, Set{Tuple{Int, Int, Int, Int, Vector{Bool}}}} = Dict(i => Set() for i in vertices(g.graph)) + detailed_edges::Dict{Int, Vector{DetailedEdge}} = Dict(i => [] for i in vertices(g.graph)) for edge in edges(g.graph) # detailed edge = [n1, n2, c1, c2, [d]] detailed_edge = (src(edge), dst(edge), color_hash[src(edge)], color_hash[dst(edge)], [true]) @@ -258,31 +260,44 @@ function join_table_cycle_likelihoods(g::DataGraph, color_hash, cycle_size::Int, # create tables for each size of cycle/path stored_cycles::Dict{CyclePathAndColors, Float32} = Dict() # this stores summary info representing the path lengths we want to close stored_paths::Dict{CyclePathAndColors, Float32} = Dict() # this stores summary info representing the path lengths that actually closed - # summary info = [c1, c2, [d]] + unique_path_counts = counter(CyclePathAndColors) # initialize with size two data # start up the "current joins" vector - updated_paths::Dict{Tuple{Int, Int, Int, Int, Vector{Bool}}, Float64} = Dict() # stores our progress as we repeatedly join - for edge_set in values(detailed_edges) - for edge in edge_set - summary_info = CyclePathAndColors(edge[5], (edge[3], edge[4])) - updated_paths[edge] = 1.0 - stored_paths[summary_info] = get(stored_paths, summary_info, 0) + 1.0 - edge_dir = edge[5][1] - is_closed = (edge[1], edge[2], color_hash[edge[1]], color_hash[edge[2]], [!edge_dir]) in detailed_edges[edge[1]] - stored_cycles[summary_info] = get(stored_cycles, summary_info, 0) + (is_closed ? 1.0 : 0.0) - end + updated_paths::Dict{DetailedEdge, Float32} = Dict() # stores our progress as we repeatedly join + starting_edges = ne(g.graph) > max_partial_paths ? sample(collect(edges(g.graph)), max_partial_paths, replace=false) : collect(edges(g.graph)) + for edge in starting_edges + # summary info = [c1, c2, [d]] + src_node = src(edge) + dst_node = dst(edge) + c1 = color_hash[src_node] + c2 = color_hash[dst_node] + out_summary_info = CyclePathAndColors([true], (c1, c2)) + in_summary_info = CyclePathAndColors([false], (c2, c1)) + out_detailed_edge = (src_node, dst_node, c1, c2, [true]) + in_detailed_edge = (src_node, src_node, c2, c1, [false]) + updated_paths[out_detailed_edge] = 1.0 + updated_paths[in_detailed_edge] = 1.0 + stored_paths[out_summary_info] = get(stored_paths, out_summary_info, 0) + 1.0 + stored_paths[in_summary_info] = get(stored_paths, in_summary_info, 0) + 1.0 + inc!(unique_path_counts, out_summary_info) + inc!(unique_path_counts, in_summary_info) + + is_closed = has_edge(g.graph, dst_node, src_node) + stored_cycles[in_summary_info] = get(stored_cycles, in_summary_info, 0) + (is_closed ? 1.0 : 0.0) + stored_cycles[out_summary_info] = get(stored_cycles, out_summary_info, 0) + (is_closed ? 1.0 : 0.0) end + rng = Random.default_rng() # for each cycle size... - for current_cycle_size in 3: cycle_size + for current_cycle_size in 2 : cycle_size # new_paths stores the current detailed paths and their count - new_paths::Dict{Tuple{Int, Int, Int, Int, Vector{Bool}}, Float64} = Dict() - joined_path::Tuple{Int, Int, Int, Int, Vector{Bool}} = (0,0,0,0,[]) + new_paths::Dict{DetailedEdge, Float64} = Dict() + joined_path::DetailedEdge = (0,0,0,0,[]) # join/aggregate all of the current paths with their connected edges # runtime of size-n iteration through the loop is o(e^[n-1]), where e is size of edges if (length(updated_paths) > max_partial_paths) - new_dict::Dict{Tuple{Int, Int, Int, Int, Vector{Bool}}, Float64} = Dict() + new_dict::Dict{DetailedEdge, Float64} = Dict() current_paths = collect(keys(updated_paths)) sampled_paths = sample(current_paths, max_partial_paths; replace=false) sampled_weight = 0.0 @@ -297,19 +312,31 @@ function join_table_cycle_likelihoods(g::DataGraph, color_hash, cycle_size::Int, end for condensed_path in keys(updated_paths) # first join/aggregate everything - for edge in detailed_edges[condensed_path[2]] - # [n1, n2, c1, c2, [d]] - joined_path = (condensed_path[1], edge[2], condensed_path[3], edge[4], cat(condensed_path[5], edge[5], dims=1)) - # this aggregation should improve runtime... - new_paths[joined_path] = get(new_paths, joined_path, updated_paths[condensed_path]-1) + 1 + left_node = condensed_path[2] + num_neighbors = length(all_neighbors(g.graph, left_node)) + num_sampled = 5 + for _ in 1:num_sampled + neighbor = all_neighbors(g.graph, left_node)[(rand(rng, UInt32) .% num_neighbors) + 1] + neighbor_color = color_hash[neighbor] + is_out_edge = has_edge(g.graph, left_node, neighbor) + if is_out_edge + joined_path = (condensed_path[1], neighbor, condensed_path[3], neighbor_color, cat(condensed_path[5], [true], dims=1)) + new_paths[joined_path] = get(new_paths, joined_path, 0) + updated_paths[condensed_path] * num_neighbors/num_sampled + end + is_in_edge = has_edge(g.graph, neighbor, left_node) + if is_in_edge + joined_path = (condensed_path[1], neighbor, condensed_path[3], neighbor_color, cat(condensed_path[5], [false], dims=1)) + new_paths[joined_path] = get(new_paths, joined_path, 0) + updated_paths[condensed_path] * num_neighbors/num_sampled + end end end # go through all of the extended paths and track which ones close in a cycle for joined_path in keys(new_paths) # o(e) summary_info = CyclePathAndColors(joined_path[5], (joined_path[3], joined_path[4])) stored_paths[summary_info] = get(stored_paths, summary_info, 0) + new_paths[joined_path] + inc!(unique_path_counts, summary_info) # if the cycle exists, store the cycle in the overall cycles - if (joined_path[1], joined_path[2], joined_path[3], joined_path[4], [false]) in detailed_edges[joined_path[1]] + if has_edge(g.graph, joined_path[1], joined_path[2]) stored_cycles[summary_info] = get(stored_cycles, summary_info, 0) + new_paths[joined_path] end end @@ -318,31 +345,32 @@ function join_table_cycle_likelihoods(g::DataGraph, color_hash, cycle_size::Int, end # now go through and aggregate all duplicates - MinPathWeight = 25 + MinUniquePaths = 10 default_cycle_weights = Dict() default_cycle_counts = Dict() - cycle_length_weights = Dict(i => 0.0 for i in 2:cycle_size) - cycle_length_counts = Dict(i => 0.0 for i in 2:cycle_size) + cycle_length_weights = Dict(i => 0.0 for i in 1:cycle_size) + cycle_length_counts = Dict(i => 0.0 for i in 1:cycle_size) for path in keys(stored_paths) # CyclePathAndColors => count - if stored_paths[path] >= MinPathWeight + if unique_path_counts[path] >= MinUniquePaths cycle_likelihoods[path] = get(stored_cycles, path, 0) / stored_paths[path] end default_path = color_path_to_default(path) + inc!(unique_path_counts, default_path) default_cycle_weights[default_path] = get(default_cycle_weights, default_path, 0) + get(stored_cycles, path, 0) default_cycle_counts[default_path] = get(default_cycle_counts, default_path, 0) + stored_paths[path] - path_length = length(path.path) + 1 + path_length = length(path.path) cycle_length_weights[path_length] = get(cycle_length_weights, path_length, 0) + get(stored_cycles, path, 0) cycle_length_counts[path_length] = get(cycle_length_counts, path_length, 0) + stored_paths[path] end for default_path in keys(default_cycle_weights) - if default_cycle_counts[default_path] < MinPathWeight - continue + if unique_path_counts[default_path] >= MinUniquePaths + cycle_likelihoods[default_path] = default_cycle_weights[default_path] / default_cycle_counts[default_path] end - cycle_likelihoods[default_path] = default_cycle_weights[default_path] / default_cycle_counts[default_path] end - cycle_length_likelihoods = Dict(i => cycle_length_counts[i] == 0 ? 0 : cycle_length_weights[i] / cycle_length_counts[i] for i in 2:cycle_size) + cycle_length_likelihoods = Dict(i => cycle_length_counts[i] == 0 ? 0 : cycle_length_weights[i] / cycle_length_counts[i] for i in 1:cycle_size) + println(cycle_length_likelihoods) return cycle_likelihoods, cycle_length_likelihoods end diff --git a/Source/QuasiStableCardinalityEstimator.jl b/Source/QuasiStableCardinalityEstimator.jl index 7c9c102..41b5163 100644 --- a/Source/QuasiStableCardinalityEstimator.jl +++ b/Source/QuasiStableCardinalityEstimator.jl @@ -224,14 +224,16 @@ function handle_extra_edges!(query::QueryGraph, summary::ColorSummary{DS}, parti else probability_no_edge *= (1.0 - get_independent_cycle_likelihood(summary))^path_count end + default_no_edge_probability *= probability_no_edge default_no_edge_probabilities[path_bools] = probability_no_edge - if length(path_bools) == 1 - println("Path Bools: ", path_bools, "DefaultProb: ", haskey(summary.cycle_probabilities, default_cycle_description), " CycleSize: ", haskey(summary.cycle_length_probabilities, path_length), " Prob Edge: ", probability_no_edge) + if probability_no_edge == 1.0 + println(path_length) + println(" Prob No Edge: ", default_no_edge_probability, "Has Default Prob: ", haskey(summary.cycle_probabilities, default_cycle_description), " Default Prob: ", get(summary.cycle_probabilities, default_cycle_description, 0), " Length Prob: ", summary.cycle_length_probabilities[path_length]) end - end + edge_deg::Dict{Int, Dict{Int, DS}} = Dict() if haskey(summary.edge_deg, edge_label) && haskey(summary.edge_deg[edge_label], child_label) edge_deg = summary.edge_deg[edge_label][child_label] @@ -380,7 +382,7 @@ function get_cardinality_bounds(query::QueryGraph, summary::ColorSummary{DS}; ma new_label = only(query.vertex_labels[new_node]) new_data_labels = get_data_label(query, new_node) num_current_paths = size(partial_paths)[2] - num_current_paths * num_colors > 10^9 && return get_default_count(DS) # If the requested memory is too large, return the default. + num_current_paths * num_colors > 10^8 && return get_default_count(DS) # If the requested memory is too large, return a timeout. new_partial_paths = zeros(Color, length(current_query_nodes), num_current_paths * num_colors) new_partial_weights = fill(W(0), num_current_paths * num_colors) # Update the partial paths using the parent-child combo that comes next from the query. @@ -444,4 +446,4 @@ function get_cardinality_bounds(query::QueryGraph, summary::ColorSummary{DS}; ma final_estimate += get_count(w) end return final_estimate -end \ No newline at end of file +end