Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Bug] [Module Name] Bug title mysql-mysql中加入transformer操作,sql可能有点长报错 #1896

Open
2 of 3 tasks
gongpengsheng opened this issue May 14, 2024 · 0 comments
Labels
bug Something isn't working

Comments

@gongpengsheng
Copy link

Search before asking

  • I had searched in the issues and found no similar issues.

What happened

"transformer": {
"transformSql": "SELECT b.uuid, b.xh, b.xz, b.cm, b.xm , CASE WHEN b.sfzhm IS NOT NULL THEN CASE WHEN MOD(CAST(SUBSTR(b.sfzhm, 17, 1) AS INTEGER), 2) > 0 THEN 1 ELSE 2 END ELSE 9 END AS xb, CAST(b.sfzhm AS VARCHAR(255)) AS sfzhm, b.zh , if(b.jzje SIMILAR TO '^[0-9]*$', b.jzje, CAST(NULL AS INTEGER)) AS jzje , b.bz, b.xm2, b.sfzhm2, b.ht_data_create_time, b.create_by , b.del_flag, b.log_id FROM ( SELECT a.uuid, a.xh, a.xz, a.cm, a.xm , a.xb , CASE WHEN mod(CAST(substr(a.sfzhmjy, 1, 1) AS INTEGER) * 7 + CAST(substr(a.sfzhmjy, 2, 1) AS INTEGER) * 9 + CAST(substr(a.sfzhmjy, 3, 1) AS INTEGER) * 10 + CAST(substr(a.sfzhmjy, 4, 1) AS INTEGER) * 5 + CAST(substr(a.sfzhmjy, 5, 1) AS INTEGER) * 8 + CAST(substr(a.sfzhmjy, 6, 1) AS INTEGER) * 4 + CAST(substr(a.sfzhmjy, 7, 1) AS INTEGER) * 2 + CAST(substr(a.sfzhmjy, 8, 1) AS INTEGER) * 1 + CAST(substr(a.sfzhmjy, 9, 1) AS INTEGER) * 6 + CAST(substr(a.sfzhmjy, 10, 1) AS INTEGER) * 3 + CAST(substr(a.sfzhmjy, 11, 1) AS INTEGER) * 7 + CAST(substr(a.sfzhmjy, 12, 1) AS INTEGER) * 9 + CAST(substr(a.sfzhmjy, 13, 1) AS INTEGER) * 10 + CAST(substr(a.sfzhmjy, 14, 1) AS INTEGER) * 5 + CAST(substr(a.sfzhmjy, 15, 1) AS INTEGER) * 8 + CAST(substr(a.sfzhmjy, 16, 1) AS INTEGER) * 4 + CAST(substr(a.sfzhmjy, 17, 1) AS INTEGER) * 2, 11) <> CASE WHEN substr(a.sfzhmjy, 18, 1) = '1' THEN '0' WHEN substr(a.sfzhmjy, 18, 1) = '0' THEN '1' WHEN substr(a.sfzhmjy, 18, 1) IN ('X', 'x') THEN '2' WHEN substr(a.sfzhmjy, 18, 1) = '9' THEN '3' WHEN substr(a.sfzhmjy, 18, 1) = '8' THEN '4' WHEN substr(a.sfzhmjy, 18, 1) = '7' THEN '5' WHEN substr(a.sfzhmjy, 18, 1) = '6' THEN '6' WHEN substr(a.sfzhmjy, 18, 1) = '5' THEN '7' WHEN substr(a.sfzhmjy, 18, 1) = '4' THEN '8' WHEN substr(a.sfzhmjy, 18, 1) = '3' THEN '9' WHEN substr(a.sfzhmjy, 18, 1) = '2' THEN '10' END THEN NULL ELSE a.sfzhmjy END AS sfzhm, a.zh, a.jzje, a.bz, a.xm2 , a.sfzhm2, a.ht_data_create_time, a.create_by, a.del_flag, a.log_id FROM ( SELECT uuid, xh, xz, cm, xm , xb , CASE WHEN CHARACTER_LENGTH(sfzhm) = 15 THEN CONCAT(SUBSTRING(sfzhm, 1, 6), '19', SUBSTRING(sfzhm, 7, 9), SUBSTRING('10X98765432', (CAST(SUBSTRING(sfzhm, 1, 1) AS INTEGER) * 7 + CAST(SUBSTRING(sfzhm, 2, 1) AS INTEGER) * 9 + CAST(SUBSTRING(sfzhm, 3, 1) AS INTEGER) * 10 + CAST(SUBSTRING(sfzhm, 4, 1) AS INTEGER) * 5 + CAST(SUBSTRING(sfzhm, 5, 1) AS INTEGER) * 8 + CAST(SUBSTRING(sfzhm, 6, 1) AS INTEGER) * 4 + 1 * 2 + 9 * 1 + CAST(SUBSTRING(sfzhm, 7, 1) AS INTEGER) * 6 + CAST(SUBSTRING(sfzhm, 8, 1) AS INTEGER) * 3 + CAST(SUBSTRING(sfzhm, 9, 1) AS INTEGER) * 7 + CAST(SUBSTRING(sfzhm, 10, 1) AS INTEGER) * 9 + CAST(SUBSTRING(sfzhm, 11, 1) AS INTEGER) * 10 + CAST(SUBSTRING(sfzhm, 12, 1) AS INTEGER) * 5 + CAST(SUBSTRING(sfzhm, 13, 1) AS INTEGER) * 8 + CAST(SUBSTRING(sfzhm, 14, 1) AS INTEGER) * 4 + CAST(SUBSTRING(sfzhm, 15, 1) AS INTEGER) * 2) % 11 + 1, 1)) WHEN CHARACTER_LENGTH(sfzhm) = 18 THEN sfzhm ELSE sfzhm END AS sfzhmjy, zh, jzje, bz, xm2 , sfzhm2, ht_data_create_time, create_by, del_flag, log_id FROM sourceTable ) a ) b"
}

What you expected to happen

Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:811)
at com.dtstack.chunjun.environment.MyLocalStreamEnvironment.execute(MyLocalStreamEnvironment.java:194)
at com.dtstack.chunjun.Main.exeSyncJob(Main.java:227)
at com.dtstack.chunjun.Main.main(Main.java:118)
at com.dtstack.chunjun.client.local.LocalClusterClientHelper.submit(LocalClusterClientHelper.java:35)
at com.dtstack.chunjun.client.Launcher.main(Launcher.java:119)
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:118)
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:80)
at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:233)
at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:224)
at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:215)
at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:666)
at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:446)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at akka.actor.Actor.aroundReceive(Actor.scala:517)
at akka.actor.Actor.aroundReceive$(Actor.scala:515)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.RuntimeException: Could not instantiate generated class 'StreamExecCalc$88216'
at org.apache.flink.table.runtime.generated.GeneratedClass.newInstance(GeneratedClass.java:66)
at org.apache.flink.table.runtime.operators.CodeGenOperatorFactory.createStreamOperator(CodeGenOperatorFactory.java:40)
at org.apache.flink.streaming.api.operators.StreamOperatorFactoryUtil.createOperator(StreamOperatorFactoryUtil.java:80)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperator(OperatorChain.java:646)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperatorChain(OperatorChain.java:620)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:560)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.(OperatorChain.java:177)
at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:519)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:575)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:758)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:573)
at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.flink.util.FlinkRuntimeException: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
at org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:76)
at org.apache.flink.table.runtime.generated.GeneratedClass.compile(GeneratedClass.java:77)
at org.apache.flink.table.runtime.generated.GeneratedClass.newInstance(GeneratedClass.java:64)
... 11 more
Caused by: org.apache.flink.shaded.guava18.com.google.common.util.concurrent.UncheckedExecutionException: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2203)
at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache.get(LocalCache.java:3937)
at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4739)
at org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:74)
... 13 more
Caused by: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
at org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:89)
at org.apache.flink.table.runtime.generated.CompileUtils.lambda$compile$1(CompileUtils.java:74)
at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4742)
at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3527)
at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2319)
at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2282)
at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2197)
... 16 more
Caused by: org.codehaus.janino.InternalCompilerException: Compiling "StreamExecCalc$88216": Code of method "split$88205$(LStreamExecCalc$88216;)V" of class "StreamExecCalc$88216" grows beyond 64 KB
at org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:382)
at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:237)
at org.codehaus.janino.SimpleCompiler.compileToClassLoader(SimpleCompiler.java:465)
at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:216)
at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:207)
at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:80)
at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:75)
at org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:86)
... 22 more
Caused by: org.codehaus.janino.InternalCompilerException: Code of method "split$88205$(LStreamExecCalc$88216;)V" of class "StreamExecCalc$88216" grows beyond 64 KB
at org.codehaus.janino.CodeContext.makeSpace(CodeContext.java:1048)
at org.codehaus.janino.CodeContext.write(CodeContext.java:940)
at org.codehaus.janino.CodeContext.writeShort(CodeContext.java:1065)
at org.codehaus.janino.UnitCompiler.writeConstantMethodrefInfo(UnitCompiler.java:12324)
at org.codehaus.janino.UnitCompiler.invoke(UnitCompiler.java:12084)
at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5212)
at org.codehaus.janino.UnitCompiler.access$9100(UnitCompiler.java:215)
at org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4423)
at org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4396)
at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5073)
at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4396)
at org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5662)
at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5182)
at org.codehaus.janino.UnitCompiler.access$9100(UnitCompiler.java:215)
at org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4423)
at org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4396)
at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5073)
at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4396)
at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5057)
at org.codehaus.janino.UnitCompiler.access$8100(UnitCompiler.java:215)
at org.codehaus.janino.UnitCompiler$16$1.visitParenthesizedExpression(UnitCompiler.java:4409)
at org.codehaus.janino.UnitCompiler$16$1.visitParenthesizedExpression(UnitCompiler.java:4400)
at org.codehaus.janino.Java$ParenthesizedExpression.accept(Java.java:4924)
at org.codehaus.janino.UnitCompiler$16.visitLvalue(UnitCompiler.java:4400)
at org.codehaus.janino.UnitCompiler$16.visitLvalue(UnitCompiler.java:4396)
at org.codehaus.janino.Java$Lvalue.accept(Java.java:4148)
at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4396)
at org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5662)
at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:3792)
at org.codehaus.janino.UnitCompiler.access$6100(UnitCompiler.java:215)
at org.codehaus.janino.UnitCompiler$13.visitAssignment(UnitCompiler.java:3754)
at org.codehaus.janino.UnitCompiler$13.visitAssignment(UnitCompiler.java:3734)
at org.codehaus.janino.Java$Assignment.accept(Java.java:4477)
at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3734)
at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2360)
at org.codehaus.janino.UnitCompiler.access$1800(UnitCompiler.java:215)
at org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1494)
at org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1487)
at org.codehaus.janino.Java$ExpressionStatement.accept(Java.java:2874)
at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1487)
at org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1567)
at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:1553)
at org.codehaus.janino.UnitCompiler.access$1700(UnitCompiler.java:215)
at org.codehaus.janino.UnitCompiler$6.visitBlock(UnitCompiler.java:1493)
at org.codehaus.janino.UnitCompiler$6.visitBlock(UnitCompiler.java:1487)
at org.codehaus.janino.Java$Block.accept(Java.java:2779)
at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1487)
at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2476)
at org.codehaus.janino.UnitCompiler.access$1900(UnitCompiler.java:215)
at org.codehaus.janino.UnitCompiler$6.visitIfStatement(UnitCompiler.java:1495)
at org.codehaus.janino.UnitCompiler$6.visitIfStatement(UnitCompiler.java:1487)
at org.codehaus.janino.Java$IfStatement.accept(Java.java:2950)
at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1487)
at org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1567)
at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:1553)
at org.codehaus.janino.UnitCompiler.access$1700(UnitCompiler.java:215)
at org.codehaus.janino.UnitCompiler$6.visitBlock(UnitCompiler.java:1493)
at org.codehaus.janino.UnitCompiler$6.visitBlock(UnitCompiler.java:1487)
at org.codehaus.janino.Java$Block.accept(Java.java:2779)
at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1487)
at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2465)
at org.codehaus.janino.UnitCompiler.access$1900(UnitCompiler.java:215)
at org.codehaus.janino.UnitCompiler$6.visitIfStatement(UnitCompiler.java:1495)
at org.codehaus.janino.UnitCompiler$6.visitIfStatement(UnitCompiler.java:1487)
at org.codehaus.janino.Java$IfStatement.accept(Java.java:2950)
at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1487)
at org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1567)
at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:1553)
at org.codehaus.janino.UnitCompiler.access$1700(UnitCompiler.java:215)
at org.codehaus.janino.UnitCompiler$6.visitBlock(UnitCompiler.java:1493)
at org.codehaus.janino.UnitCompiler$6.visitBlock(UnitCompiler.java:1487)
at org.codehaus.janino.Java$Block.accept(Java.java:2779)
at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1487)
at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2465)
at org.codehaus.janino.UnitCompiler.access$1900(UnitCompiler.java:215)
at org.codehaus.janino.UnitCompiler$6.visitIfStatement(UnitCompiler.java:1495)
at org.codehaus.janino.UnitCompiler$6.visitIfStatement(UnitCompiler.java:1487)
at org.codehaus.janino.Java$IfStatement.accept(Java.java:2950)
at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1487)
at org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1567)
at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3388)
at org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1357)
at org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1330)
at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:822)
at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:432)
at org.codehaus.janino.UnitCompiler.access$400(UnitCompiler.java:215)
at org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:411)
at org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:406)
at org.codehaus.janino.Java$PackageMemberClassDeclaration.accept(Java.java:1414)
at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:406)
at org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:378)
... 29 more

How to reproduce

{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"table": {
"tableName": "sourceTable"
},
"parameter": {
"column": [
{
"name": "uuid",
"type": "bigint"
},
{
"name": "xh",
"type": "varchar"
},
{
"name": "xz",
"type": "varchar"
},
{
"name": "cm",
"type": "varchar"
},
{
"name": "xm",
"type": "varchar"
},
{
"name": "xb",
"type": "varchar"
},
{
"name": "sfzhm",
"type": "varchar"
},
{
"name": "zh",
"type": "varchar"
},
{
"name": "jzje",
"type": "varchar"
},
{
"name": "bz",
"type": "varchar"
},
{
"name": "xm2",
"type": "varchar"
},
{
"name": "sfzhm2",
"type": "varchar"
},
{
"name": "ht_data_create_time",
"type": "datetime"
},
{
"name": "create_by",
"type": "varchar"
},
{
"name": "del_flag",
"type": "tinyint"
},
{
"name": "log_id",
"type": "varchar"
}
],
"username": "root",
"password": "root@123",
"queryTimeOut": 2000,
"connection": [
{
"jdbcUrl": [
"jdbc:mysql://xxx:3306/chunjun?useSSL=false"
],
"table": [
"res14088207000006"
]
}
]
}
},
"writer": {
"name": "mysqlwriter",
"table": {
"tableName": "sinkTable"
},
"parameter": {
"username": "root",
"password": "root@123",
"queryTimeOut": 2000,
"connection": [
{
"jdbcUrl": "jdbc:mysql://xxx:3306/chunjun?useSSL=false",
"table": [
"result"
]
}
],
"writeMode": "insert",
"column": [
{
"name": "uuid",
"type": "bigint"
},
{
"name": "xh",
"type": "varchar"
},
{
"name": "xz",
"type": "varchar"
},
{
"name": "cm",
"type": "varchar"
},
{
"name": "xm",
"type": "varchar"
},
{
"name": "xb",
"type": "varchar"
},
{
"name": "sfzhm",
"type": "varchar"
},
{
"name": "zh",
"type": "varchar"
},
{
"name": "jzje",
"type": "varchar"
},
{
"name": "bz",
"type": "varchar"
},
{
"name": "xm2",
"type": "varchar"
},
{
"name": "sfzhm2",
"type": "varchar"
},
{
"name": "ht_data_create_time",
"type": "date"
},
{
"name": "create_by",
"type": "varchar"
},
{
"name": "del_flag",
"type": "tinyint"
},
{
"name": "log_id",
"type": "varchar"
}
]
}
},
"transformer": {
"transformSql": "SELECT b.uuid, b.xh, b.xz, b.cm, b.xm , CASE WHEN b.sfzhm IS NOT NULL THEN CASE WHEN MOD(CAST(SUBSTR(b.sfzhm, 17, 1) AS INTEGER), 2) > 0 THEN 1 ELSE 2 END ELSE 9 END AS xb, CAST(b.sfzhm AS VARCHAR(255)) AS sfzhm, b.zh , if(b.jzje SIMILAR TO '^[0-9]*$', b.jzje, CAST(NULL AS INTEGER)) AS jzje , b.bz, b.xm2, b.sfzhm2, b.ht_data_create_time, b.create_by , b.del_flag, b.log_id FROM ( SELECT a.uuid, a.xh, a.xz, a.cm, a.xm , a.xb , CASE WHEN mod(CAST(substr(a.sfzhmjy, 1, 1) AS INTEGER) * 7 + CAST(substr(a.sfzhmjy, 2, 1) AS INTEGER) * 9 + CAST(substr(a.sfzhmjy, 3, 1) AS INTEGER) * 10 + CAST(substr(a.sfzhmjy, 4, 1) AS INTEGER) * 5 + CAST(substr(a.sfzhmjy, 5, 1) AS INTEGER) * 8 + CAST(substr(a.sfzhmjy, 6, 1) AS INTEGER) * 4 + CAST(substr(a.sfzhmjy, 7, 1) AS INTEGER) * 2 + CAST(substr(a.sfzhmjy, 8, 1) AS INTEGER) * 1 + CAST(substr(a.sfzhmjy, 9, 1) AS INTEGER) * 6 + CAST(substr(a.sfzhmjy, 10, 1) AS INTEGER) * 3 + CAST(substr(a.sfzhmjy, 11, 1) AS INTEGER) * 7 + CAST(substr(a.sfzhmjy, 12, 1) AS INTEGER) * 9 + CAST(substr(a.sfzhmjy, 13, 1) AS INTEGER) * 10 + CAST(substr(a.sfzhmjy, 14, 1) AS INTEGER) * 5 + CAST(substr(a.sfzhmjy, 15, 1) AS INTEGER) * 8 + CAST(substr(a.sfzhmjy, 16, 1) AS INTEGER) * 4 + CAST(substr(a.sfzhmjy, 17, 1) AS INTEGER) * 2, 11) <> CASE WHEN substr(a.sfzhmjy, 18, 1) = '1' THEN '0' WHEN substr(a.sfzhmjy, 18, 1) = '0' THEN '1' WHEN substr(a.sfzhmjy, 18, 1) IN ('X', 'x') THEN '2' WHEN substr(a.sfzhmjy, 18, 1) = '9' THEN '3' WHEN substr(a.sfzhmjy, 18, 1) = '8' THEN '4' WHEN substr(a.sfzhmjy, 18, 1) = '7' THEN '5' WHEN substr(a.sfzhmjy, 18, 1) = '6' THEN '6' WHEN substr(a.sfzhmjy, 18, 1) = '5' THEN '7' WHEN substr(a.sfzhmjy, 18, 1) = '4' THEN '8' WHEN substr(a.sfzhmjy, 18, 1) = '3' THEN '9' WHEN substr(a.sfzhmjy, 18, 1) = '2' THEN '10' END THEN NULL ELSE a.sfzhmjy END AS sfzhm, a.zh, a.jzje, a.bz, a.xm2 , a.sfzhm2, a.ht_data_create_time, a.create_by, a.del_flag, a.log_id FROM ( SELECT uuid, xh, xz, cm, xm , xb , CASE WHEN CHARACTER_LENGTH(sfzhm) = 15 THEN CONCAT(SUBSTRING(sfzhm, 1, 6), '19', SUBSTRING(sfzhm, 7, 9), SUBSTRING('10X98765432', (CAST(SUBSTRING(sfzhm, 1, 1) AS INTEGER) * 7 + CAST(SUBSTRING(sfzhm, 2, 1) AS INTEGER) * 9 + CAST(SUBSTRING(sfzhm, 3, 1) AS INTEGER) * 10 + CAST(SUBSTRING(sfzhm, 4, 1) AS INTEGER) * 5 + CAST(SUBSTRING(sfzhm, 5, 1) AS INTEGER) * 8 + CAST(SUBSTRING(sfzhm, 6, 1) AS INTEGER) * 4 + 1 * 2 + 9 * 1 + CAST(SUBSTRING(sfzhm, 7, 1) AS INTEGER) * 6 + CAST(SUBSTRING(sfzhm, 8, 1) AS INTEGER) * 3 + CAST(SUBSTRING(sfzhm, 9, 1) AS INTEGER) * 7 + CAST(SUBSTRING(sfzhm, 10, 1) AS INTEGER) * 9 + CAST(SUBSTRING(sfzhm, 11, 1) AS INTEGER) * 10 + CAST(SUBSTRING(sfzhm, 12, 1) AS INTEGER) * 5 + CAST(SUBSTRING(sfzhm, 13, 1) AS INTEGER) * 8 + CAST(SUBSTRING(sfzhm, 14, 1) AS INTEGER) * 4 + CAST(SUBSTRING(sfzhm, 15, 1) AS INTEGER) * 2) % 11 + 1, 1)) WHEN CHARACTER_LENGTH(sfzhm) = 18 THEN sfzhm ELSE sfzhm END AS sfzhmjy, zh, jzje, bz, xm2 , sfzhm2, ht_data_create_time, create_by, del_flag, log_id FROM sourceTable ) a ) b"
}
}
],
"setting": {
"restore": {
"restoreColumnName": "uuid",
"restoreColumnIndex": 0
},
"speed": {
"bytes": 0,
"readerChannel": 1,
"writerChannel": 1
}
}
}
}

Anything else

No response

Version

master

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

@gongpengsheng gongpengsheng added the bug Something isn't working label May 14, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Something isn't working
Projects
None yet
Development

No branches or pull requests

1 participant