diff --git a/engine/src/main/java/org/pentaho/di/www/ExecuteTransServlet.java b/engine/src/main/java/org/pentaho/di/www/ExecuteTransServlet.java index 16eb5c34bc2f..f70397bd44da 100644 --- a/engine/src/main/java/org/pentaho/di/www/ExecuteTransServlet.java +++ b/engine/src/main/java/org/pentaho/di/www/ExecuteTransServlet.java @@ -2,7 +2,7 @@ * * Pentaho Data Integration * - * Copyright (C) 2002-2022 by Hitachi Vantara : http://www.pentaho.com + * Copyright (C) 2002-2024 by Hitachi Vantara : http://www.pentaho.com * ******************************************************************************* * @@ -23,16 +23,25 @@ package org.pentaho.di.www; import java.io.IOException; +import java.io.InputStream; import java.io.PrintWriter; -import java.util.Enumeration; +import java.util.List; +import java.util.ArrayList; +import java.util.Arrays; import java.util.UUID; +import java.util.Map; +import java.util.HashMap; +import java.util.Enumeration; import java.util.concurrent.ExecutionException; - import javax.servlet.ServletException; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; +import org.json.simple.JSONArray; +import org.json.simple.JSONObject; import org.pentaho.di.core.Const; +import org.pentaho.di.core.RowMetaAndData; +import org.pentaho.di.core.exception.KettleStepException; import org.pentaho.di.core.util.Utils; import org.pentaho.di.core.encryption.Encr; import org.pentaho.di.core.exception.KettleException; @@ -51,10 +60,13 @@ import org.pentaho.di.repository.RepositoriesMeta; import org.pentaho.di.repository.RepositoryMeta; import org.pentaho.di.trans.Trans; -import org.pentaho.di.trans.TransAdapter; import org.pentaho.di.trans.TransConfiguration; import org.pentaho.di.trans.TransExecutionConfiguration; import org.pentaho.di.trans.TransMeta; +import org.pentaho.di.trans.step.RowAdapter; +import org.pentaho.di.trans.step.StepInterface; +import org.pentaho.di.core.row.RowMetaInterface; +import org.pentaho.di.trans.step.StepMeta; public class ExecuteTransServlet extends BaseHttpServlet implements CartePluginInterface { @@ -69,8 +81,10 @@ public class ExecuteTransServlet extends BaseHttpServlet implements CartePluginI private static final String PASS = "pass"; private static final String TRANS = "trans"; private static final String LEVEL = "level"; + private static final String XML_REQUEST_BODY = "Xml request body"; public static final String CONTEXT_PATH = "/kettle/executeTrans"; + private boolean isPostCall = false; public ExecuteTransServlet() { } @@ -81,280 +95,174 @@ public ExecuteTransServlet( TransformationMap transformationMap ) { /** -
-

/kettle/executeTrans

- -

GET

-

Executes transformation from the specified repository. - Connects to the repository provided as a parameter, loads the transformation from it and executes it. - Empty response is returned or response contains output of an error happened during the transformation execution. - Response contains ERROR result if error happened during transformation execution.

- -

Example Request:
-

-    GET /kettle/executeTrans/?rep=my_repository&user=my_user&pass=my_password&trans=my_trans&level=INFO
-    
- -

-

Parameters

- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
namedescriptiontype
repRepository id to connect to.query
userUser name to be used to connect to repository.query
passUser password to be used to connect to repository.query
transTransfromation name to be loaded and executed.query
levelLogging level to be used for transformation execution (i.e. Debug).query
*any name*All the other parameters will be sent to the transformation for using as variables. - When necessary you can add custom parameters to the request. - They will be used to set the transformation variables values..query
- -

Response Body

- - - - - - - - - - - - -
element:(custom)
media types:application/xml
-

Response contains error output of the transformation executed or nothing - if the execution was successful.

- -

Example Error Response:

-
-  
-    ERROR
-    Unexpected error executing the transformation:
-    
org.pentaho.di.core.exception.KettleException:
-    
Unable to find transformation 'dummy-trans.ktr' in directory
-    :/home/admin

 at
-    org.pentaho.di.www.ExecuteTransServlet.loadTransformation(ExecuteTransServlet.java:214)

-    at org.pentaho.di.www.ExecuteTransServlet.doGet(ExecuteTransServlet.java:104)

-    at javax.servlet.http.HttpServlet.service(HttpServlet.java:707)

-    at javax.servlet.http.HttpServlet.service(HttpServlet.java:820)

-    at org.mortbay.jetty.servlet.ServletHolder.handle(ServletHolder.java:511)

-    at org.mortbay.jetty.servlet.ServletHandler.handle(ServletHandler.java:390)

-    at org.mortbay.jetty.servlet.SessionHandler.handle(SessionHandler.java:182)

-    at org.mortbay.jetty.handler.ContextHandler.handle(ContextHandler.java:765)

-    at org.mortbay.jetty.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:230)

-    at org.mortbay.jetty.handler.HandlerCollection.handle(HandlerCollection.java:114)

-    at org.mortbay.jetty.handler.HandlerWrapper.handle(HandlerWrapper.java:152)

-    at org.mortbay.jetty.Server.handle(Server.java:326)

-    at org.mortbay.jetty.HttpConnection.handleRequest(HttpConnection.java:536)

-    at org.mortbay.jetty.HttpConnection$RequestHandler.headerComplete(HttpConnection.java:915)

-    at org.mortbay.jetty.HttpParser.parseNext(HttpParser.java:539)

-    at org.mortbay.jetty.HttpParser.parseAvailable(HttpParser.java:212)

-    at org.mortbay.jetty.HttpConnection.handle(HttpConnection.java:405)

-    at org.mortbay.jetty.bio.SocketConnector$Connection.run(SocketConnector.java:228)

-    at org.mortbay.thread.QueuedThreadPool$PoolThread.run(QueuedThreadPool.java:582)

-    
-    
-  
-    
- -

Status Codes

- - - - - - - - - - - - - - - - - - - - - - - - - - - -
codedescription
200Request was processed.
400When missing mandatory param trans
401When authentication to repository fails
404When transformation is not found
500Internal server error occurs during request processing.
-
- */ +
+

/kettle/executeTrans

+ +

GET

+

Executes transformation from the specified repository. + Connects to the repository provided as a parameter, loads the transformation from it and executes it. + Empty response is returned or response contains output of an error happened during the transformation execution. + Response contains ERROR result if error happened during transformation execution.

+ +

Example Request:
+

+   GET /kettle/executeTrans/?rep=my_repository&user=my_user&pass=my_password&trans=my_trans&level=INFO
+   
+ +

+

Parameters

+ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
namedescriptiontype
repRepository id to connect to.query
userUser name to be used to connect to repository.query
passUser password to be used to connect to repository.query
transTransfromation name to be loaded and executed.query
levelLogging level to be used for transformation execution (i.e. Debug).query
*any name*All the other parameters will be sent to the transformation for using as variables. + When necessary you can add custom parameters to the request. + They will be used to set the transformation variables values..query
+ +

Response Body

+ + + + + + + + + + + + +
element:(custom)
media types:application/xml
+

Response contains error output of the transformation executed or nothing + if the execution was successful.

+ +

Example Error Response:

+
+   
+   ERROR
+   Unexpected error executing the transformation:
+   
org.pentaho.di.core.exception.KettleException:
+   
Unable to find transformation 'dummy-trans.ktr' in directory
+   :/home/admin

 at
+   org.pentaho.di.www.ExecuteTransServlet.loadTransformation(ExecuteTransServlet.java:214)

+   at org.pentaho.di.www.ExecuteTransServlet.doGet(ExecuteTransServlet.java:104)

+   at javax.servlet.http.HttpServlet.service(HttpServlet.java:707)

+   at javax.servlet.http.HttpServlet.service(HttpServlet.java:820)

+   at org.mortbay.jetty.servlet.ServletHolder.handle(ServletHolder.java:511)

+   at org.mortbay.jetty.servlet.ServletHandler.handle(ServletHandler.java:390)

+   at org.mortbay.jetty.servlet.SessionHandler.handle(SessionHandler.java:182)

+   at org.mortbay.jetty.handler.ContextHandler.handle(ContextHandler.java:765)

+   at org.mortbay.jetty.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:230)

+   at org.mortbay.jetty.handler.HandlerCollection.handle(HandlerCollection.java:114)

+   at org.mortbay.jetty.handler.HandlerWrapper.handle(HandlerWrapper.java:152)

+   at org.mortbay.jetty.Server.handle(Server.java:326)

+   at org.mortbay.jetty.HttpConnection.handleRequest(HttpConnection.java:536)

+   at org.mortbay.jetty.HttpConnection$RequestHandler.headerComplete(HttpConnection.java:915)

+   at org.mortbay.jetty.HttpParser.parseNext(HttpParser.java:539)

+   at org.mortbay.jetty.HttpParser.parseAvailable(HttpParser.java:212)

+   at org.mortbay.jetty.HttpConnection.handle(HttpConnection.java:405)

+   at org.mortbay.jetty.bio.SocketConnector$Connection.run(SocketConnector.java:228)

+   at org.mortbay.thread.QueuedThreadPool$PoolThread.run(QueuedThreadPool.java:582)

+   
+   
+   
+   
+ +

Status Codes

+ + + + + + + + + + + + + + + + + + + + + + + + + + + +
codedescription
200Request was processed.
400When missing mandatory param trans
401When authentication to repository fails
404When transformation is not found
500Internal server error occurs during request processing.
+
+ */ + @Override public void doGet( HttpServletRequest request, HttpServletResponse response ) throws ServletException, - IOException { + IOException { if ( isJettyMode() && !request.getContextPath().startsWith( CONTEXT_PATH ) ) { return; } if ( log.isDebug() ) { - logDebug( BaseMessages.getString( PKG, "ExecuteTransServlet.Log.ExecuteTransRequested" ) ); + logDebug( BaseMessages.getString( PKG, "ExecuteTransServlet.Log.ExecuteTransGetCallRequested" ) ); } // Options taken from PAN // - String[] knownOptions = new String[] { REP, USER, PASS, TRANS, LEVEL }; - String repOption = request.getParameter( REP ); String userOption = request.getParameter( USER ); String passOption = Encr.decryptPasswordOptionallyEncrypted( request.getParameter( PASS ) ); String transOption = request.getParameter( TRANS ); - String levelOption = request.getParameter( LEVEL ); response.setStatus( HttpServletResponse.SC_OK ); - - String encoding = System.getProperty( "KETTLE_DEFAULT_SERVLET_ENCODING", null ); - if ( encoding != null && !Utils.isEmpty( encoding.trim() ) ) { - response.setCharacterEncoding( encoding ); - response.setContentType( "text/html; charset=" + encoding ); - } - - PrintWriter out = response.getWriter(); - if ( transOption == null ) { - response.setStatus( HttpServletResponse.SC_BAD_REQUEST ); - out.println( new WebResult( WebResult.STRING_ERROR, BaseMessages.getString( - PKG, "ExecuteTransServlet.Error.MissingMandatoryParameter", TRANS ) ) ); + sendBadRequest( response, TRANS ); return; } try { - final Repository repository = openRepository( repOption, userOption, passOption ); final TransMeta transMeta = loadTransformation( repository, transOption ); - - // Set the servlet parameters as variables in the transformation - // - String[] parameters = transMeta.listParameters(); - Enumeration parameterNames = request.getParameterNames(); - while ( parameterNames.hasMoreElements() ) { - String parameter = (String) parameterNames.nextElement(); - String[] values = request.getParameterValues( parameter ); - - // Ignore the known options. set the rest as variables - // - if ( Const.indexOfString( parameter, knownOptions ) < 0 ) { - // If it's a trans parameter, set it, otherwise simply set the variable - // - if ( Const.indexOfString( parameter, parameters ) < 0 ) { - transMeta.setVariable( parameter, values[0] ); - } else { - transMeta.setParameterValue( parameter, values[0] ); - } - } - } - - TransExecutionConfiguration transExecutionConfiguration = new TransExecutionConfiguration(); - LogLevel logLevel = LogLevel.getLogLevelForCode( levelOption ); - transExecutionConfiguration.setLogLevel( logLevel ); - TransConfiguration transConfiguration = new TransConfiguration( transMeta, transExecutionConfiguration ); - - String carteObjectId = UUID.randomUUID().toString(); - SimpleLoggingObject servletLoggingObject = - new SimpleLoggingObject( CONTEXT_PATH, LoggingObjectType.CARTE, null ); - servletLoggingObject.setContainerObjectId( carteObjectId ); - servletLoggingObject.setLogLevel( logLevel ); - - // Create the transformation and store in the list... - // - final Trans trans = new Trans( transMeta, servletLoggingObject ); - - trans.setRepository( repository ); - trans.setSocketRepository( getSocketRepository() ); - - getTransformationMap().addTransformation( transMeta.getName(), carteObjectId, trans, transConfiguration ); - trans.setContainerObjectId( carteObjectId ); - - if ( repository != null ) { - // The repository connection is open: make sure we disconnect from the repository once we - // are done with this transformation. - // - trans.addTransListener( new TransAdapter() { - @Override public void transFinished( Trans trans ) { - repository.disconnect(); - } - } ); - } - - // Pass the servlet print writer to the transformation... - // - trans.setServletPrintWriter( out ); - trans.setServletReponse( response ); - trans.setServletRequest( request ); - - try { - // Execute the transformation... - // - executeTrans( trans ); - String logging = KettleLogStore.getAppender().getBuffer( trans.getLogChannelId(), false ).toString(); - if ( trans.isFinishedOrStopped() && trans.getErrors() > 0 ) { - response.setStatus( HttpServletResponse.SC_INTERNAL_SERVER_ERROR ); - out.println( new WebResult( WebResult.STRING_ERROR, BaseMessages.getString( - PKG, "ExecuteTransServlet.Error.ErrorExecutingTrans", logging ) ) ); - } - out.flush(); - } catch ( Exception executionException ) { - String logging = KettleLogStore.getAppender().getBuffer( trans.getLogChannelId(), false ).toString(); - throw new KettleException( BaseMessages.getString( PKG, "ExecuteTransServlet.Error.ErrorExecutingTrans", logging ), executionException ); - } + executeTransformation( transMeta, request, response, isPostCall ); } catch ( Exception ex ) { - // When we get to this point KettleAuthenticationException has already been wrapped in an Execution Exception - // and that in a KettleException - Throwable kettleExceptionCause = ex.getCause(); - if ( kettleExceptionCause != null && kettleExceptionCause instanceof ExecutionException ) { - Throwable executionExceptionCause = kettleExceptionCause.getCause(); - if ( executionExceptionCause != null && executionExceptionCause instanceof KettleAuthenticationException ) { - response.setStatus( HttpServletResponse.SC_UNAUTHORIZED ); - out.println( new WebResult( WebResult.STRING_ERROR, BaseMessages.getString( - PKG, "ExecuteTransServlet.Error.Authentication", getContextPath() ) ) ); - } - } else if ( ex.getMessage().contains( UNABLE_TO_FIND_TRANS ) ) { - response.setStatus( HttpServletResponse.SC_NOT_FOUND ); - out.println( new WebResult( WebResult.STRING_ERROR, BaseMessages.getString( - PKG, "ExecuteTransServlet.Error.UnableToFindTransformation", transOption ) ) ); - } else { - response.setStatus( HttpServletResponse.SC_INTERNAL_SERVER_ERROR ); - out.println( new WebResult( WebResult.STRING_ERROR, BaseMessages.getString( - PKG, "ExecuteTransServlet.Error.UnexpectedError", Const.CR + Const.getStackTracker( ex ) ) ) ); - } + handleExecuteTransError( ex, response, transOption ); } } @@ -383,7 +291,7 @@ private TransMeta loadTransformation( Repository repository, String trans ) thro name = trans.substring( lastSlash + 1 ); } RepositoryDirectoryInterface directory = - repository.loadRepositoryDirectoryTree().findDirectory( directoryPath ); + repository.loadRepositoryDirectoryTree().findDirectory( directoryPath ); if ( directory == null ) { throw new KettleException( "Unable to find directory path '" + directoryPath + "' in the repository" ); } @@ -392,8 +300,7 @@ private TransMeta loadTransformation( Repository repository, String trans ) thro if ( transformationID == null ) { throw new KettleException( "Unable to find transformation '" + name + "' in directory :" + directory ); } - TransMeta transMeta = repository.loadTransformation( transformationID, null ); - return transMeta; + return repository.loadTransformation( transformationID, null ); } } @@ -424,14 +331,213 @@ public String getService() { return CONTEXT_PATH + " (" + toString() + ")"; } - protected void executeTrans( Trans trans ) throws KettleException { + protected Map> executeTrans( Trans trans, Map> previewDataMap ) throws KettleException { trans.prepareExecution( null ); + Map> previewDataMapUpdated = null; + if ( isPostCall ) { + previewDataMapUpdated = capturePreviewData( trans, trans.getTransMeta().getSteps(), previewDataMap ); + } trans.startThreads(); trans.waitUntilFinished(); + return previewDataMapUpdated; } public String getContextPath() { return CONTEXT_PATH; } + @Override + protected void doPost( HttpServletRequest request, HttpServletResponse response ) throws ServletException, + IOException { + if ( isJettyMode() && !request.getContextPath().startsWith( CONTEXT_PATH ) ) { + return; + } + if ( log.isDebug() ) { + logDebug( BaseMessages.getString( PKG, "ExecuteTransServlet.Log.ExecuteTransPostCallRequested" ) ); + } + isPostCall = true; + response.setStatus( HttpServletResponse.SC_OK ); + InputStream requestInputStream = request.getInputStream(); + if ( requestInputStream == null ) { + sendBadRequest( response, XML_REQUEST_BODY ); + return; + } + TransMeta transMeta = null; + try { + transMeta = new TransMeta( requestInputStream, null, true, null, null ); + executeTransformation( transMeta, request, response, isPostCall ); + + } catch ( Exception ex ) { + handleExecuteTransError( ex, response, transMeta.getFilename() ); + } + } + + private void sendBadRequest( HttpServletResponse response, String parameterName ) throws IOException { + response.setStatus( HttpServletResponse.SC_BAD_REQUEST ); + PrintWriter out = response.getWriter(); + out.println( new WebResult( WebResult.STRING_ERROR, BaseMessages.getString( PKG, "ExecuteTransServlet.Error.MissingMandatoryParameter", parameterName ) ) ); + } + + private void handleExecuteTransError( Exception ex, HttpServletResponse response, String transName ) throws IOException { + PrintWriter out = response.getWriter(); + // When we get to this point KettleAuthenticationException has already been wrapped in an Execution Exception + // and that in a KettleException + Throwable kettleExceptionCause = ex.getCause(); + if ( kettleExceptionCause instanceof ExecutionException ) { + Throwable executionExceptionCause = kettleExceptionCause.getCause(); + if ( executionExceptionCause instanceof KettleAuthenticationException ) { + response.setStatus( HttpServletResponse.SC_UNAUTHORIZED ); + out.println( new WebResult( WebResult.STRING_ERROR, BaseMessages.getString( + PKG, "ExecuteTransServlet.Error.Authentication", getContextPath() ) ) ); + } + } else if ( ex.getMessage().contains( UNABLE_TO_FIND_TRANS ) ) { + response.setStatus( HttpServletResponse.SC_NOT_FOUND ); + out.println( new WebResult( WebResult.STRING_ERROR, BaseMessages.getString( + PKG, "ExecuteTransServlet.Error.UnableToFindTransformation", transName ) ) ); + } else { + response.setStatus( HttpServletResponse.SC_INTERNAL_SERVER_ERROR ); + out.println( new WebResult( WebResult.STRING_ERROR, BaseMessages.getString( + PKG, "ExecuteTransServlet.Error.UnexpectedError", Const.CR + Const.getStackTracker( ex ) ) ) ); + } + } + + void executeTransformation( TransMeta transMeta, HttpServletRequest request, HttpServletResponse response, boolean isPostCall ) throws KettleException, IOException { + PrintWriter out = response.getWriter(); + response.setContentType( "application/json" ); + + String encoding = System.getProperty( "KETTLE_DEFAULT_SERVLET_ENCODING", null ); + if ( encoding != null && !Utils.isEmpty( encoding.trim() ) ) { + response.setCharacterEncoding( encoding ); + response.setContentType( "text/html; charset=" + encoding ); + } + String[] knownOptions = new String[] { REP, USER, PASS, TRANS, LEVEL }; + // Options taken from PAN + String levelOption = request.getParameter( LEVEL ); + // Set the servlet parameters as variables in the transformation + // + String[] parameters = transMeta.listParameters(); + Enumeration parameterNames = request.getParameterNames(); + while ( parameterNames.hasMoreElements() ) { + String parameter = (String) parameterNames.nextElement(); + String[] values = request.getParameterValues( parameter ); + + // Ignore the known options. set the rest as variables + // + if ( Const.indexOfString( parameter, knownOptions ) < 0 ) { + // If it's a trans parameter, set it, otherwise simply set the variable + // + if ( Const.indexOfString( parameter, parameters ) < 0 ) { + transMeta.setVariable( parameter, values[0] ); + } else { + transMeta.setParameterValue( parameter, values[0] ); + } + } + } + + TransExecutionConfiguration transExecutionConfiguration = new TransExecutionConfiguration(); + LogLevel logLevel = LogLevel.getLogLevelForCode( levelOption ); + transExecutionConfiguration.setLogLevel( logLevel ); + TransConfiguration transConfiguration = new TransConfiguration( transMeta, transExecutionConfiguration ); + + String carteObjectId = UUID.randomUUID().toString(); + SimpleLoggingObject servletLoggingObject = + new SimpleLoggingObject( CONTEXT_PATH, LoggingObjectType.CARTE, null ); + servletLoggingObject.setContainerObjectId( carteObjectId ); + servletLoggingObject.setLogLevel( logLevel ); + + // Create the transformation and store in the list... + // + final Trans trans = new Trans( transMeta, servletLoggingObject ); + + trans.setSocketRepository( getSocketRepository() ); + + getTransformationMap().addTransformation( transMeta.getName(), carteObjectId, trans, transConfiguration ); + trans.setContainerObjectId( carteObjectId ); + trans.setServletPrintWriter( out ); + trans.setServletReponse( response ); + trans.setServletRequest( request ); + + try { + // Execute the transformation... + // + Map> previewDataMap = executeTrans( trans, new HashMap<>() ); + String logging = KettleLogStore.getAppender().getBuffer( trans.getLogChannelId(), false ).toString(); + if ( trans.isFinishedOrStopped() && trans.getErrors() > 0 ) { + response.setStatus( HttpServletResponse.SC_INTERNAL_SERVER_ERROR ); + out.println( new WebResult( WebResult.STRING_ERROR, BaseMessages.getString( + PKG, "ExecuteTransServlet.Error.ErrorExecutingTrans", logging ) ) ); + } + if ( isPostCall ) { + writeToJson( out, transMeta, carteObjectId, previewDataMap ); + } + out.flush(); + } catch ( Exception executionException ) { + String logging = KettleLogStore.getAppender().getBuffer( trans.getLogChannelId(), false ).toString(); + throw new KettleException( BaseMessages.getString( PKG, "ExecuteTransServlet.Error.ErrorExecutingTrans", logging ), executionException ); + } + } + + private void writeToJson( PrintWriter out, TransMeta transMeta, String carteObjectId, Map> previewDataMap ) { + JSONObject finalJsonOutput = new JSONObject(); + finalJsonOutput.put( "carteId", carteObjectId ); + + JSONArray previewJson = new JSONArray(); + + for ( StepMeta stepMeta : transMeta.getSteps() ) { + + JSONObject stepJSON = new JSONObject(); + stepJSON.put( "stepName", stepMeta.getName() ); + + List rowMetaAndDataList = previewDataMap.get( stepMeta ); + String[] columnNames = rowMetaAndDataList.get( 0 ).getRowMeta().getFieldNames(); + + stepJSON.put( "columnInfo", Arrays.asList( columnNames ) ); + JSONArray dataArray = new JSONArray(); + String data = ""; + try { + for ( RowMetaAndData rowMetaAndData : rowMetaAndDataList ) { + Object[] rowData = rowMetaAndData.getData(); + RowMetaInterface rowMeta = rowMetaAndData.getRowMeta(); + JSONArray datarowArray = new JSONArray(); + + for ( int column = 0; column < columnNames.length; column++ ) { + data = rowMeta.getString( rowData, column ); + datarowArray.add( data ); + } + JSONObject dataRow = new JSONObject(); + dataRow.put( "data", datarowArray ); + dataArray.add( dataRow ); + } + stepJSON.put( "rows", dataArray ); + } catch ( Exception e ) { + data = "Conversion error: " + e.getMessage(); + } + + previewJson.add( stepJSON ); + + } + finalJsonOutput.put( "previewData", previewJson ); + out.println( finalJsonOutput ); + } + + public Map> capturePreviewData( final Trans trans, List stepMetas, Map> previewDataMap ) { + + previewDataMap.clear(); + for ( StepMeta stepMeta : stepMetas ) { + StepInterface step = trans.findRunThread( stepMeta.getName() ); + final List rowsData = new ArrayList<>(); + previewDataMap.put( stepMeta, rowsData ); + step.addRowListener( new RowAdapter() { + public void rowWrittenEvent( RowMetaInterface rowMeta, Object[] row ) throws KettleStepException { + try { + rowsData.add( new RowMetaAndData( rowMeta, rowMeta.cloneRow( row ) ) ); + } catch ( Exception e ) { + throw new KettleStepException( "Unable to clone row for metadata : " + rowMeta, e ); + } + } + } ); + } + return previewDataMap; + } + } diff --git a/engine/src/main/java/org/pentaho/di/www/GetDatabaseDetailsServlet.java b/engine/src/main/java/org/pentaho/di/www/GetDatabaseDetailsServlet.java new file mode 100644 index 000000000000..832c38d5a9c2 --- /dev/null +++ b/engine/src/main/java/org/pentaho/di/www/GetDatabaseDetailsServlet.java @@ -0,0 +1,104 @@ +/*! ****************************************************************************** + * + * Pentaho Data Integration + * + * Copyright (C) 2024 by Hitachi Vantara : http://www.pentaho.com + * + ******************************************************************************* + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + ******************************************************************************/ + +package org.pentaho.di.www; + +import org.json.simple.JSONArray; +import org.json.simple.JSONObject; +import org.pentaho.di.core.database.DatabaseInterface; +import org.pentaho.di.core.database.DatabaseMeta; +import org.pentaho.di.core.exception.KettlePluginException; +import org.pentaho.di.core.plugins.DatabasePluginType; +import org.pentaho.di.core.plugins.PluginInterface; +import org.pentaho.di.core.plugins.PluginRegistry; + +import javax.servlet.ServletException; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import java.io.IOException; +import java.io.PrintWriter; +import java.util.List; + + +public class GetDatabaseDetailsServlet extends BaseHttpServlet implements CartePluginInterface { + + private static Class PKG = GetDatabaseDetailsServlet.class; // for i18n purposes, needed by Translator2!! + + private static final long serialVersionUID = 3634806745372015720L; + + public static final String CONTEXT_PATH = "/kettle/registeredDatabasePlugins"; + + public GetDatabaseDetailsServlet() { + } + + public void doGet( HttpServletRequest request, HttpServletResponse response ) throws ServletException, + IOException { + + if ( isJettyMode() && !request.getContextPath().startsWith( CONTEXT_PATH ) ) { + return; + } + + response.setStatus( HttpServletResponse.SC_OK ); + response.setCharacterEncoding( "UTF-8" ); + response.setContentType( "application/json;charset=UTF-8" ); + JSONArray previewJson = new JSONArray(); + + PrintWriter out = response.getWriter(); + PluginRegistry registry = PluginRegistry.getInstance(); + List stepPlugins = registry.getPlugins( DatabasePluginType.class ); + + for ( PluginInterface pluginInterface : stepPlugins ) { + if ( pluginInterface.getIds().length > 0 ) { + JSONObject dbJson = new JSONObject(); + dbJson.put( "id", pluginInterface.getIds()[0] ); + dbJson.put( "name", pluginInterface.getName() ); + try { + DatabaseInterface db = (DatabaseInterface) registry.loadClass( pluginInterface ); + JSONArray accessTypes = new JSONArray(); + for ( int id : db.getAccessTypeList() ) { + accessTypes.add( DatabaseMeta.getAccessTypeDesc( id ) ); + } + dbJson.put( "accessTypes", accessTypes ); + } catch ( KettlePluginException e ) { + throw new ServletException( e.getMessage() ); + } + previewJson.add( dbJson ); + } + } + + JSONObject finalJsonOutput = new JSONObject(); + finalJsonOutput.put( "plugins", previewJson ); + out.print( finalJsonOutput ); + } + + public String toString() { + return "Registered Plugins Handler"; + } + + public String getService() { + return CONTEXT_PATH + " (" + toString() + ")"; + } + + public String getContextPath() { + return CONTEXT_PATH; + } +} diff --git a/engine/src/main/java/org/pentaho/di/www/GetRegisteredStepsServlet.java b/engine/src/main/java/org/pentaho/di/www/GetRegisteredStepsServlet.java new file mode 100644 index 000000000000..81a44707b4c6 --- /dev/null +++ b/engine/src/main/java/org/pentaho/di/www/GetRegisteredStepsServlet.java @@ -0,0 +1,126 @@ +/*! ****************************************************************************** + * + * Pentaho Data Integration + * + * Copyright (C) 2024 by Hitachi Vantara : http://www.pentaho.com + * + ******************************************************************************* + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + ******************************************************************************/ + +package org.pentaho.di.www; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.annotations.VisibleForTesting; +import org.json.simple.JSONArray; +import org.json.simple.JSONObject; +import org.pentaho.di.core.exception.KettlePluginException; +import org.pentaho.di.core.plugins.PluginInterface; +import org.pentaho.di.core.plugins.PluginRegistry; +import org.pentaho.di.core.plugins.StepPluginType; +import org.pentaho.di.trans.step.StepMetaInterface; +import org.pentaho.di.www.cache.CarteStatusCache; + +import javax.servlet.ServletException; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import java.io.IOException; +import java.io.PrintWriter; +import java.util.List; + + +public class GetRegisteredStepsServlet extends BaseHttpServlet implements CartePluginInterface { + + private static Class PKG = GetRegisteredStepsServlet.class; // for i18n purposes, needed by Translator2!! + + private static final long serialVersionUID = 3634806745372015720L; + + public static final String CONTEXT_PATH = "/kettle/registeredSteps"; + + private static final String PLUGIN_TYPE = "pluginType"; + + @VisibleForTesting + CarteStatusCache cache = CarteStatusCache.getInstance(); + + public GetRegisteredStepsServlet() { + } + + public GetRegisteredStepsServlet( TransformationMap transformationMap ) { + super( transformationMap ); + } + + public void doGet( HttpServletRequest request, HttpServletResponse response ) throws ServletException, + IOException { + + if ( isJettyMode() && !request.getContextPath().startsWith( CONTEXT_PATH ) ) { + return; + } + response.setStatus( HttpServletResponse.SC_OK ); + + response.setCharacterEncoding( "UTF-8" ); + response.setContentType( "application/json;charset=UTF-8" ); + String pluginType = request.getParameter( PLUGIN_TYPE ); + + PrintWriter out = response.getWriter(); + PluginRegistry registry = PluginRegistry.getInstance(); + if ( request.getParameter( "stepId" ) == null ) { + writeJson( pluginType, out, registry ); + } else { + String stepId = request.getParameter( "stepId" ); + PluginInterface pluginInterface = registry.getPlugin( StepPluginType.class, stepId ); + try { + StepMetaInterface meta = (StepMetaInterface) registry.loadClass( pluginInterface ); + out.print( new ObjectMapper().writeValueAsString( meta ) ); + } catch ( KettlePluginException e ) { + e.printStackTrace(); + } + + } + } + + private void writeJson( String pluginType, PrintWriter out, PluginRegistry registry ) throws JsonProcessingException { + + JSONArray previewJson = new JSONArray(); + List stepPlugins = pluginType == null ? registry.getPlugins( StepPluginType.class ) + : registry.getPlugins( registry.getPluginTypes().stream() + .filter( x -> x.getName().contains( pluginType ) ) + .findAny().get() ); + for ( PluginInterface pluginInterface : stepPlugins ) { + if ( pluginInterface.getIds().length > 0 ) { + JSONObject stepJson = new JSONObject(); + stepJson.put( "id", pluginInterface.getIds()[0] ); + stepJson.put( "name", pluginInterface.getName() ); + stepJson.put( "category", pluginInterface.getCategory() ); + previewJson.add( stepJson ); + } + } + JSONObject finalJsonOutput = new JSONObject(); + finalJsonOutput.put( "plugins", previewJson ); + out.print( finalJsonOutput ); + } + + public String toString() { + return "Registered Plugins Handler"; + } + + public String getService() { + return CONTEXT_PATH + " (" + toString() + ")"; + } + + public String getContextPath() { + return CONTEXT_PATH; + } +} diff --git a/engine/src/main/resources/kettle-servlets.xml b/engine/src/main/resources/kettle-servlets.xml index 21c7456f21f8..304cbaa50ec3 100644 --- a/engine/src/main/resources/kettle-servlets.xml +++ b/engine/src/main/resources/kettle-servlets.xml @@ -43,8 +43,9 @@ Add a job to the server org.pentaho.di.www.RegisterJobServlet Add a transformation to the server org.pentaho.di.www.RegisterTransServlet Upload a resources export file org.pentaho.di.www.RegisterPackageServlet - - + Get list of registered Database Plugins org.pentaho.di.www.GetDatabaseDetailsServlet + Get list of registered Step Plugins org.pentaho.di.www.GetRegisteredStepsServlet + Register a slave server org.pentaho.di.www.RegisterSlaveServlet diff --git a/engine/src/test/java/org/pentaho/di/www/ExecuteTransServletTest.java b/engine/src/test/java/org/pentaho/di/www/ExecuteTransServletTest.java index fa3db15f3501..ea9657ba5307 100644 --- a/engine/src/test/java/org/pentaho/di/www/ExecuteTransServletTest.java +++ b/engine/src/test/java/org/pentaho/di/www/ExecuteTransServletTest.java @@ -52,6 +52,7 @@ import java.io.StringWriter; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.concurrent.ExecutionException; import static org.mockito.Mockito.doThrow; @@ -184,7 +185,7 @@ public void doGetTransformationNotFoundTest() throws Exception { HttpServletRequest mockHttpServletRequest = mock( HttpServletRequest.class ); HttpServletResponse mockHttpServletResponse = mock( HttpServletResponse.class ); Trans trans = initMocksForTransExecution( mockHttpServletRequest, mockHttpServletResponse ); - doThrow( new KettleException( "Unable to find transformation" ) ).when( executeTransServlet ).executeTrans( trans ); + doThrow( new KettleException( "Unable to find transformation" ) ).when( executeTransServlet ).executeTrans( trans, new HashMap<>()); executeTransServlet.doGet( mockHttpServletRequest, mockHttpServletResponse );