Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for JDBC & resequencing message stores #89

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import org.apache.synapse.message.store.InMemoryMessageStore;
import org.apache.synapse.message.store.MessageStore;
import org.apache.axis2.util.JavaUtils;
import org.apache.synapse.util.xpath.SynapseXPath;
import org.jaxen.JaxenException;


import javax.xml.XMLConstants;
Expand Down Expand Up @@ -56,6 +58,7 @@ public class MessageStoreFactory {
public static final QName CLASS_Q = new QName(XMLConfigConstants.NULL_NAMESPACE, "class");
public static final QName NAME_Q = new QName(XMLConfigConstants.NULL_NAMESPACE, "name");
public static final QName SEQUENCE_Q = new QName(XMLConfigConstants.NULL_NAMESPACE, "sequence");
public static final QName EXPRESSION_Q = new QName(XMLConfigConstants.NULL_NAMESPACE, "expression");

public static final QName PARAMETER_Q = new QName(XMLConfigConstants.SYNAPSE_NAMESPACE,
"parameter");
Expand Down Expand Up @@ -98,7 +101,6 @@ public static MessageStore createMessageStore(OMElement elem, Properties propert
messageStore.setParameters(getParameters(elem));



log.info("Successfully created Message Store: " + nameAtt.getAttributeValue());
return messageStore;
}
Expand All @@ -118,6 +120,7 @@ private static Map<String, Object> getParameters(OMElement elem) {
if (paramValue != null) {
parameters.put(paramName.getAttributeValue(), paramValue);
}
processExpressionIfExist(prop, parameters);
} else {
handleException("Invalid MessageStore parameter - Parameter must have a name ");
}
Expand All @@ -126,6 +129,45 @@ private static Map<String, Object> getParameters(OMElement elem) {
return parameters;
}

/**
* If an expression is defined in property, it'll be extracted and populated
*
* @param prop xml element read from the synapse parameter. This should be a != null value.
* @param params list of processed params from the XML.
* @return true if an expression is processed
*/
private static boolean processExpressionIfExist(OMElement prop, Map<String, Object> params) {
try {
OMAttribute expression = prop.getAttribute(EXPRESSION_Q);
if (null != expression) {
SynapseXPath synapseXPath = SynapseXPathFactory.getSynapseXPath(prop, EXPRESSION_Q);
registerParameter(params, prop.getAttribute(NAME_Q), synapseXPath);
return true;
}
} catch (JaxenException e) {
handleException("Error while extracting parameter : " + e.getMessage());
}
return false;
}

/**
* Register the extracted parameter in the list.
*
* @param parameters the list of parameters which should be registered.
* @param paramName the name of the parameter.
* @param paramValue the value of the parameter.
* @param <T> the type of the parameter.
*/
private static <T> void registerParameter(Map<String, Object> parameters,
OMAttribute paramName,
T paramValue) {
if (paramName != null && paramValue != null) {
parameters.put(paramName.getAttributeValue(), paramValue);
} else {
handleException("Invalid MessageStore parameter - Parameter must have a name ");
}
}

private static void handleException(String msg) {
log.error(msg);
throw new SynapseException(msg);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,12 @@
import org.apache.synapse.SynapseException;
import org.apache.synapse.message.store.InMemoryMessageStore;
import org.apache.synapse.message.store.MessageStore;
import org.apache.synapse.util.xpath.SynapseXPath;

import javax.xml.namespace.QName;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

/**
* Serialize an instance of the given Message Store, and sets properties on it.
Expand Down Expand Up @@ -74,16 +77,11 @@ public static OMElement serializeMessageStore(OMElement parent, MessageStore mes
Iterator iter = messageStore.getParameters().keySet().iterator();
while (iter.hasNext()) {
String name = (String) iter.next();
String value = (String) messageStore.getParameters().get(name);
OMElement property = fac.createOMElement("parameter", synNS);
property.addAttribute(fac.createOMAttribute(
"name", nullNS, name));
property.setText(value.trim());
OMElement property = getParameter(messageStore, name);
store.addChild(property);
}
}


if (getSerializedDescription(messageStore) != null) {
store.addChild(getSerializedDescription(messageStore));
}
Expand All @@ -94,6 +92,39 @@ public static OMElement serializeMessageStore(OMElement parent, MessageStore mes
return store;
}


/**
* Will get the parameter OMElement.
*
* @param messageStore the message store definition metadata.
* @param name the parameter key.
* @return the parameter OMElement.
*/
private static OMElement getParameter(MessageStore messageStore, String name) {
Object paramValue = messageStore.getParameters().get(name);
OMElement property = null;
if (paramValue instanceof String) {
String value = (String) paramValue;
property = fac.createOMElement("parameter", synNS);
property.addAttribute(fac.createOMAttribute("name", nullNS, name));
property.setText(value.trim());
} else if (paramValue instanceof SynapseXPath) {
SynapseXPath value = (SynapseXPath) paramValue;
String expression = value.toString();
Map namespaces = value.getNamespaces();
property = fac.createOMElement("parameter", synNS);
property.addAttribute(fac.createOMAttribute("name", nullNS, name));
property.addAttribute(fac.createOMAttribute("expression", nullNS, expression));
Set<Map.Entry<String, String>> nameSpaceAttributes = namespaces.entrySet();
for (Map.Entry<String, String> nameSpaceElement : nameSpaceAttributes) {
String prefix = nameSpaceElement.getKey();
String uri = nameSpaceElement.getValue();
property.declareNamespace(uri, prefix);
}
}
return property;
}

private static OMElement getSerializedDescription(MessageStore messageStore) {
OMElement descriptionElem = fac.createOMElement(
new QName(SynapseConstants.SYNAPSE_NAMESPACE, "description"));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.synapse.message;

import org.apache.synapse.MessageContext;

public interface MessageConsumer {
/**
* Receives the next message from the store.
*
* @return Synapse message context of the last message received from the store.
*/
MessageContext receive();

/**
* Acknowledges the last message received so that it will be removed from the store.
*
* @return {@code true} if the acknowledgement is successful. {@code false} otherwise.
*/
boolean ack();

/**
* Cleans up this message consumer
*
* @return {@code true} if cleanup is successful, {@code false} otherwise.
*/
boolean cleanup();

/**
* Check availability of connectivity with the message store
*
* @return {@code true} if connection available, {@code false} otherwise.
*/
boolean isAlive();

/**
* Sets the ID of this message consumer.
*
* @param i ID
*/
public void setId(int i);

/**
* Returns the ID of this Message consumer.
*
* @return ID
*/
public String getId();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.synapse.message;

import org.apache.synapse.MessageContext;

public interface MessageProducer {
/**
* Stores the given message to the store associated with this message consumer.
*
* @param synCtx Message to be saved.
* @return {@code true} if storing of the message is successful, {@code false} otherwise.
*/
boolean storeMessage(MessageContext synCtx);

/**
* Cleans up this message consumer
*
* @return {@code true} if clean up is successful, {@code false} otherwise.
*/
boolean cleanup();

/**
* Sets the ID of this message consumer.
*
* @param id ID
*/
public void setId(int id);

/**
* Returns the ID of this message consumer.
*
* @return ID
*/
public String getId();
}
Loading