Skip to content

Commit

Permalink
[Bug] Flink job modify_time change with duration bug fixed #3188 (#3211)
Browse files Browse the repository at this point in the history
* [BUG] Flink job modify_time change with duration bug fixed #3188

* minor improvement

* README linkis minor improvement
  • Loading branch information
wolfboys authored Oct 6, 2023
1 parent 106160b commit 809eda8
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 48 deletions.
3 changes: 1 addition & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,7 @@ git clone [email protected]:apache/incubator-streampark.git
cd incubator-streampark
./build.sh
```
🗄️Deatils:how to [Local Development and Debugging](https://streampark.apache.org/docs/user-guide/local%20development%20and%20debugging)

🗄 Deatils:how to [Development](https://streampark.apache.org/docs/user-guide/development)
## 🧑‍💻 Downloads

Download address for run-directly software package : [https://streampark.apache.org/download](https://streampark.apache.org/download)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,19 +53,19 @@ object PropertiesUtils extends Logger {
buffer.toString()
}

private[this] def eachAppendYamlItem(
prefix: String,
private[this] def eachYamlItem(
k: String,
v: Any,
proper: collection.mutable.Map[String, String]): Map[String, String] = {
prefix: String = "",
proper: MutableMap[String, String] = MutableMap[String, String]()): Map[String, String] = {
v match {
case map: JavaLinkedMap[String, Any] =>
map
.flatMap(
x => {
prefix match {
case "" => eachAppendYamlItem(k, x._1, x._2, proper)
case other => eachAppendYamlItem(s"$other.$k", x._1, x._2, proper)
case "" => eachYamlItem(x._1, x._2, k, proper)
case other => eachYamlItem(x._1, x._2, s"$other.$k", proper)
}
})
.toMap
Expand All @@ -84,11 +84,10 @@ object PropertiesUtils extends Logger {

def fromYamlText(text: String): Map[String, String] = {
try {
val map = MutableMap[String, String]()
new Yaml()
.load(text)
.asInstanceOf[java.util.Map[String, Map[String, Any]]]
.flatMap(x => eachAppendYamlItem("", x._1, x._2, map))
.flatMap(x => eachYamlItem(x._1, x._2))
.toMap
} catch {
case e: IOException =>
Expand Down Expand Up @@ -148,11 +147,10 @@ object PropertiesUtils extends Logger {
inputStream != null,
s"[StreamPark] fromYamlFile: Properties inputStream must not be null")
try {
val map = MutableMap[String, String]()
new Yaml()
.load(inputStream)
.asInstanceOf[java.util.Map[String, Map[String, Any]]]
.flatMap(x => eachAppendYamlItem("", x._1, x._2, map))
.flatMap(x => eachYamlItem(x._1, x._2))
.toMap
} catch {
case e: IOException =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,6 @@ set names utf8mb4;
set foreign_key_checks = 0;

alter table `t_flink_app`
add column `k8s_name` varchar(63) collate utf8mb4_general_ci default null;

add column `k8s_name` varchar(63) collate utf8mb4_general_ci default null,
-- modify_time change with duration #3188
modify column `modify_time` datetime not null default current_timestamp comment 'modify time';
Original file line number Diff line number Diff line change
Expand Up @@ -19,25 +19,25 @@
-- Table of t_resource
-- ----------------------------
create sequence "public"."streampark_t_resource_id_seq"
increment 1 start 10000 cache 1 minvalue 10000 maxvalue 9223372036854775807;
increment 1 start 10000 cache 1 minvalue 10000 maxvalue 9223372036854775807;

create table "public"."t_resource" (
"id" int8 not null default nextval('streampark_t_resource_id_seq'::regclass),
"resource_name" varchar(128) collate "pg_catalog"."default" not null,
"resource_type" int4,
"resource_path" varchar(255) default null,
"resource" text collate "pg_catalog"."default",
"engine_type" int4,
"main_class" varchar(255) collate "pg_catalog"."default",
"description" text collate "pg_catalog"."default" default null,
"creator_id" int8 not null,
"connector_required_options" text default null,
"connector_optional_options" text default null,
"team_id" int8 not null,
"create_time" timestamp(6) not null default timezone('UTC-8'::text, (now())::timestamp(0) without time zone),
"modify_time" timestamp(6) not null default timezone('UTC-8'::text, (now())::timestamp(0) without time zone)
)
;
"id" int8 not null default nextval('streampark_t_resource_id_seq'::regclass),
"resource_name" varchar(128) collate "pg_catalog"."default" not null,
"resource_type" int4,
"resource_path" varchar(255) default null,
"resource" text collate "pg_catalog"."default",
"engine_type" int4,
"main_class" varchar(255) collate "pg_catalog"."default",
"description" text collate "pg_catalog"."default" default null,
"creator_id" int8 not null,
"connector_required_options" text default null,
"connector_optional_options" text default null,
"team_id" int8 not null,
"create_time" timestamp(6) not null default timezone('UTC-8'::text, (now())::timestamp(0) without time zone),
"modify_time" timestamp(6) not null default timezone('UTC-8'::text, (now())::timestamp(0) without time zone)
);

comment on column "public"."t_resource"."id" is 'Resource id';
comment on column "public"."t_resource"."resource_name" is 'Resource name';
comment on column "public"."t_resource"."resource_type" is '0:app 1:common 2:connector 3:format 4:udf';
Expand All @@ -52,21 +52,21 @@ comment on column "public"."t_resource"."modify_time" is 'modify time';

alter table "public"."t_resource" add constraint "t_resource_pkey" primary key ("id");
create index "un_team_dname_inx" on "public"."t_resource" using btree (
"team_id" "pg_catalog"."int8_ops" asc nulls last,
"resource_name" collate "pg_catalog"."default" "pg_catalog"."text_ops" asc nulls last
);
"team_id" "pg_catalog"."int8_ops" asc nulls last,
"resource_name" collate "pg_catalog"."default" "pg_catalog"."text_ops" asc nulls last
);

alter table "public"."t_flink_sql"
add column "team_resource" varchar(64) default null;
add column "team_resource" varchar(64) default null;

alter table "public"."t_flink_app"
add column "probing" boolean default false;
add column "probing" boolean default false;

alter table "public"."t_flink_cluster"
add column "job_manager_url" varchar(150) collate "pg_catalog"."default",
add column "start_time" timestamp(6) collate "pg_catalog"."default",
add column "end_time" timestamp(6) collate "pg_catalog"."default",
add column "alert_id" int8 collate "pg_catalog"."default";
add column "job_manager_url" varchar(150) collate "pg_catalog"."default",
add column "start_time" timestamp(6) collate "pg_catalog"."default",
add column "end_time" timestamp(6) collate "pg_catalog"."default",
add column "alert_id" int8 collate "pg_catalog"."default";

insert into "public"."t_menu" values (120400, 120000, 'menu.resource', '/flink/resource', 'flink/resource/View', null, 'apartment', '0', '1', 3, now(), now());
insert into "public"."t_menu" values (110401, 110400, 'add', null, null, 'token:add', null, '1', '1', null, now(), now());
Expand All @@ -91,16 +91,16 @@ comment on column "public"."t_user"."login_type" is 'login type 0:password 1:lda
-- Table of t_flink_gateway
-- ----------------------------
create sequence "public"."streampark_t_flink_gateway_id_seq"
increment 1 start 10000 cache 1 minvalue 10000 maxvalue 9223372036854775807;
increment 1 start 10000 cache 1 minvalue 10000 maxvalue 9223372036854775807;

create table "public"."t_flink_gateway" (
"id" int8 not null default nextval('streampark_t_resource_id_seq'::regclass),
"gateway_name" varchar(128) collate "pg_catalog"."default" not null,
"description" text collate "pg_catalog"."default" default null,
"gateway_type" int4,
"address" varchar(150) collate "pg_catalog"."default",
"create_time" timestamp(6) not null default timezone('UTC-8'::text, (now())::timestamp(0) without time zone),
"modify_time" timestamp(6) not null default timezone('UTC-8'::text, (now())::timestamp(0) without time zone)
"id" int8 not null default nextval('streampark_t_resource_id_seq'::regclass),
"gateway_name" varchar(128) collate "pg_catalog"."default" not null,
"description" text collate "pg_catalog"."default" default null,
"gateway_type" int4,
"address" varchar(150) collate "pg_catalog"."default",
"create_time" timestamp(6) not null default timezone('UTC-8'::text, (now())::timestamp(0) without time zone),
"modify_time" timestamp(6) not null default timezone('UTC-8'::text, (now())::timestamp(0) without time zone)
);
comment on column "public"."t_flink_gateway"."id" is 'The id of the gateway';
comment on column "public"."t_flink_gateway"."gateway_name" is 'The name of the gateway';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ create table if not exists `t_flink_app` (
`option_state` tinyint default null,
`tracking` tinyint default null,
`create_time` datetime not null default current_timestamp comment 'create time',
`modify_time` datetime not null default current_timestamp on update current_timestamp comment 'modify time',
`modify_time` datetime not null default current_timestamp comment 'modify time',
`option_time` datetime default null,
`release` tinyint default 1,
`build` tinyint default 1,
Expand Down

0 comments on commit 809eda8

Please sign in to comment.