From 809eda861e3e6e1a1c78c7a0f098e71add275ec6 Mon Sep 17 00:00:00 2001 From: benjobs Date: Fri, 6 Oct 2023 02:17:42 -0500 Subject: [PATCH] [Bug] Flink job modify_time change with duration bug fixed #3188 (#3211) * [BUG] Flink job modify_time change with duration bug fixed #3188 * minor improvement * README linkis minor improvement --- README.md | 3 +- .../common/util/PropertiesUtils.scala | 16 ++--- .../assembly/script/upgrade/mysql/2.2.2.sql | 5 +- .../assembly/script/upgrade/pgsql/2.2.0.sql | 68 +++++++++---------- .../src/main/resources/db/schema-h2.sql | 2 +- 5 files changed, 46 insertions(+), 48 deletions(-) diff --git a/README.md b/README.md index 362793a5f8..485b160a83 100644 --- a/README.md +++ b/README.md @@ -67,8 +67,7 @@ git clone git@github.com:apache/incubator-streampark.git cd incubator-streampark ./build.sh ``` -๐Ÿ—„๏ธDeatils:how to [Local Development and Debugging](https://streampark.apache.org/docs/user-guide/local%20development%20and%20debugging) - +๐Ÿ—„ Deatils:how to [Development](https://streampark.apache.org/docs/user-guide/development) ## ๐Ÿง‘โ€๐Ÿ’ป Downloads Download address for run-directly software package : [https://streampark.apache.org/download](https://streampark.apache.org/download) diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala index 91b25c2f1a..706b3b0905 100644 --- a/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala +++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala @@ -53,19 +53,19 @@ object PropertiesUtils extends Logger { buffer.toString() } - private[this] def eachAppendYamlItem( - prefix: String, + private[this] def eachYamlItem( k: String, v: Any, - proper: collection.mutable.Map[String, String]): Map[String, String] = { + prefix: String = "", + proper: MutableMap[String, String] = MutableMap[String, String]()): Map[String, String] = { v match { case map: JavaLinkedMap[String, Any] => map .flatMap( x => { prefix match { - case "" => eachAppendYamlItem(k, x._1, x._2, proper) - case other => eachAppendYamlItem(s"$other.$k", x._1, x._2, proper) + case "" => eachYamlItem(x._1, x._2, k, proper) + case other => eachYamlItem(x._1, x._2, s"$other.$k", proper) } }) .toMap @@ -84,11 +84,10 @@ object PropertiesUtils extends Logger { def fromYamlText(text: String): Map[String, String] = { try { - val map = MutableMap[String, String]() new Yaml() .load(text) .asInstanceOf[java.util.Map[String, Map[String, Any]]] - .flatMap(x => eachAppendYamlItem("", x._1, x._2, map)) + .flatMap(x => eachYamlItem(x._1, x._2)) .toMap } catch { case e: IOException => @@ -148,11 +147,10 @@ object PropertiesUtils extends Logger { inputStream != null, s"[StreamPark] fromYamlFile: Properties inputStream must not be null") try { - val map = MutableMap[String, String]() new Yaml() .load(inputStream) .asInstanceOf[java.util.Map[String, Map[String, Any]]] - .flatMap(x => eachAppendYamlItem("", x._1, x._2, map)) + .flatMap(x => eachYamlItem(x._1, x._2)) .toMap } catch { case e: IOException => diff --git a/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.2.2.sql b/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.2.2.sql index 9fe81443d2..e760ce4847 100644 --- a/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.2.2.sql +++ b/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.2.2.sql @@ -21,5 +21,6 @@ set names utf8mb4; set foreign_key_checks = 0; alter table `t_flink_app` - add column `k8s_name` varchar(63) collate utf8mb4_general_ci default null; - + add column `k8s_name` varchar(63) collate utf8mb4_general_ci default null, + -- modify_time change with duration #3188 + modify column `modify_time` datetime not null default current_timestamp comment 'modify time'; diff --git a/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/pgsql/2.2.0.sql b/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/pgsql/2.2.0.sql index 508c8cc07b..8e78a37cb7 100644 --- a/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/pgsql/2.2.0.sql +++ b/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/pgsql/2.2.0.sql @@ -19,25 +19,25 @@ -- Table of t_resource -- ---------------------------- create sequence "public"."streampark_t_resource_id_seq" - increment 1 start 10000 cache 1 minvalue 10000 maxvalue 9223372036854775807; +increment 1 start 10000 cache 1 minvalue 10000 maxvalue 9223372036854775807; create table "public"."t_resource" ( - "id" int8 not null default nextval('streampark_t_resource_id_seq'::regclass), - "resource_name" varchar(128) collate "pg_catalog"."default" not null, - "resource_type" int4, - "resource_path" varchar(255) default null, - "resource" text collate "pg_catalog"."default", - "engine_type" int4, - "main_class" varchar(255) collate "pg_catalog"."default", - "description" text collate "pg_catalog"."default" default null, - "creator_id" int8 not null, - "connector_required_options" text default null, - "connector_optional_options" text default null, - "team_id" int8 not null, - "create_time" timestamp(6) not null default timezone('UTC-8'::text, (now())::timestamp(0) without time zone), - "modify_time" timestamp(6) not null default timezone('UTC-8'::text, (now())::timestamp(0) without time zone) -) -; +"id" int8 not null default nextval('streampark_t_resource_id_seq'::regclass), +"resource_name" varchar(128) collate "pg_catalog"."default" not null, +"resource_type" int4, +"resource_path" varchar(255) default null, +"resource" text collate "pg_catalog"."default", +"engine_type" int4, +"main_class" varchar(255) collate "pg_catalog"."default", +"description" text collate "pg_catalog"."default" default null, +"creator_id" int8 not null, +"connector_required_options" text default null, +"connector_optional_options" text default null, +"team_id" int8 not null, +"create_time" timestamp(6) not null default timezone('UTC-8'::text, (now())::timestamp(0) without time zone), +"modify_time" timestamp(6) not null default timezone('UTC-8'::text, (now())::timestamp(0) without time zone) +); + comment on column "public"."t_resource"."id" is 'Resource id'; comment on column "public"."t_resource"."resource_name" is 'Resource name'; comment on column "public"."t_resource"."resource_type" is '0:app 1:common 2:connector 3:format 4:udf'; @@ -52,21 +52,21 @@ comment on column "public"."t_resource"."modify_time" is 'modify time'; alter table "public"."t_resource" add constraint "t_resource_pkey" primary key ("id"); create index "un_team_dname_inx" on "public"."t_resource" using btree ( - "team_id" "pg_catalog"."int8_ops" asc nulls last, - "resource_name" collate "pg_catalog"."default" "pg_catalog"."text_ops" asc nulls last - ); +"team_id" "pg_catalog"."int8_ops" asc nulls last, +"resource_name" collate "pg_catalog"."default" "pg_catalog"."text_ops" asc nulls last +); alter table "public"."t_flink_sql" - add column "team_resource" varchar(64) default null; +add column "team_resource" varchar(64) default null; alter table "public"."t_flink_app" - add column "probing" boolean default false; +add column "probing" boolean default false; alter table "public"."t_flink_cluster" - add column "job_manager_url" varchar(150) collate "pg_catalog"."default", - add column "start_time" timestamp(6) collate "pg_catalog"."default", - add column "end_time" timestamp(6) collate "pg_catalog"."default", - add column "alert_id" int8 collate "pg_catalog"."default"; +add column "job_manager_url" varchar(150) collate "pg_catalog"."default", +add column "start_time" timestamp(6) collate "pg_catalog"."default", +add column "end_time" timestamp(6) collate "pg_catalog"."default", +add column "alert_id" int8 collate "pg_catalog"."default"; insert into "public"."t_menu" values (120400, 120000, 'menu.resource', '/flink/resource', 'flink/resource/View', null, 'apartment', '0', '1', 3, now(), now()); insert into "public"."t_menu" values (110401, 110400, 'add', null, null, 'token:add', null, '1', '1', null, now(), now()); @@ -91,16 +91,16 @@ comment on column "public"."t_user"."login_type" is 'login type 0:password 1:lda -- Table of t_flink_gateway -- ---------------------------- create sequence "public"."streampark_t_flink_gateway_id_seq" - increment 1 start 10000 cache 1 minvalue 10000 maxvalue 9223372036854775807; +increment 1 start 10000 cache 1 minvalue 10000 maxvalue 9223372036854775807; create table "public"."t_flink_gateway" ( - "id" int8 not null default nextval('streampark_t_resource_id_seq'::regclass), - "gateway_name" varchar(128) collate "pg_catalog"."default" not null, - "description" text collate "pg_catalog"."default" default null, - "gateway_type" int4, - "address" varchar(150) collate "pg_catalog"."default", - "create_time" timestamp(6) not null default timezone('UTC-8'::text, (now())::timestamp(0) without time zone), - "modify_time" timestamp(6) not null default timezone('UTC-8'::text, (now())::timestamp(0) without time zone) +"id" int8 not null default nextval('streampark_t_resource_id_seq'::regclass), +"gateway_name" varchar(128) collate "pg_catalog"."default" not null, +"description" text collate "pg_catalog"."default" default null, +"gateway_type" int4, +"address" varchar(150) collate "pg_catalog"."default", +"create_time" timestamp(6) not null default timezone('UTC-8'::text, (now())::timestamp(0) without time zone), +"modify_time" timestamp(6) not null default timezone('UTC-8'::text, (now())::timestamp(0) without time zone) ); comment on column "public"."t_flink_gateway"."id" is 'The id of the gateway'; comment on column "public"."t_flink_gateway"."gateway_name" is 'The name of the gateway'; diff --git a/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql b/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql index a509afa5e2..34823e074f 100644 --- a/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql +++ b/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql @@ -80,7 +80,7 @@ create table if not exists `t_flink_app` ( `option_state` tinyint default null, `tracking` tinyint default null, `create_time` datetime not null default current_timestamp comment 'create time', - `modify_time` datetime not null default current_timestamp on update current_timestamp comment 'modify time', + `modify_time` datetime not null default current_timestamp comment 'modify time', `option_time` datetime default null, `release` tinyint default 1, `build` tinyint default 1,