About us
Products
Services
Articles
Contact us

14. Store Connectors

Contents | Previous Chapter | Next Chapter

The store connectors are components that communicate with a message store. A connector is loaded dynamically by MonitorStarter, but MailMonitor is the class that invokes the connector's methods.


14.1. StoreConnector

The StoreConnector class defines an initialize() method, declares three abstract methods: connect(), process() and disconnect() and implements a helper method, processMessage().

The processMessage() method creates a bean object, maps the content of an input stream to that object and invokes the processing method of a processor object.

The StoreConnector's subclasses described in the next sections are demonstrative versions. You might have to adapt them in order to perform some additional tasks such as the backup and the deletion of the processed messages.

StoreConnector.java:

package com.devsphere.apps.mapping.monitor;

import com.devsphere.helpers.logging.*;

import com.devsphere.logging.*;
import com.devsphere.mapping.*;

import java.io.*;
import java.util.*;
import java.lang.reflect.*;

/**
 * Abstract connector to a message store
 */
public abstract class StoreConnector {
    protected Class beanClass;
    protected Object processor;
    protected Method method;
    protected boolean debug;
    protected AbstractLogger logger;
    protected ApplicationErrorHandler errorHandler;

    /**
     * Initializes the store connector
     */
    public void initialize(Class beanClass, Object processor, Method method,
        boolean debug, AbstractLogger logger, ApplicationErrorHandler errorHandler) {
        this.beanClass = beanClass;
        this.processor = processor;
        this.method = method;
        this.debug = debug;
        this.logger = logger;
        this.errorHandler = errorHandler;
    }

    /**
     * Tries to establish a connection and returns true succeeds.
     */
    public abstract boolean connect(String protocol, String host, String user,
        String password, String folderName);

    /**
     * Gets and processes the new messages
     */
    public abstract void process();

    /**
     * Tries to close the connection.
     * Should be called even if connect() failed.
     */
    public abstract void disconnect();

    /**
     * Maps the text of a message to a bean object and passes the object
     * to the processing method
     */
    protected void processMessage(InputStream in) {
        Object beanObject = textToBean(in);
        if (beanObject != null)
            processBean(beanObject);
    }

    /**
     * Converts a set of name-value pairs to a bean object.
     * May return null if an error occurs.
     */
    private Object textToBean(InputStream in) {
        // Create the bean object
        Object beanObject = null;
        try {
            beanObject = beanClass.newInstance();
        } catch (Exception t) {
            errorHandler.fatalError("[COULDNT_INSTANTIATE_BEAN_CLASS]", t,
                beanClass.getName());
        }

        // Text-to-Bean mapping
        if (beanObject != null)
            try {
                Hashtable errorTable
                    = TextUtils.textToBean(in, beanObject, logger);
                if (errorTable != null)
                    errorHandler.error("[MAPPING_ERRORS]", null,
                        errorTable.toString());
            } catch (IOException e) {
                errorHandler.fatalError("[COULDNT_READ_BEAN_DATA]", e);
            }

        return beanObject;
    }

    /**
     * Calls the processing method
     */
    private void processBean(Object beanObject) {
        // Invoke the processing method
        try {
            method.invoke(processor, new Object[] { beanObject } );
        } catch (InvocationTargetException e) {
            Throwable t = e.getTargetException();
            if (t instanceof Exception)
                errorHandler.error("[PROCESSING_ERROR]", (Exception) t);
            else if (t instanceof Error)
                throw (Error) t;
        } catch (Exception t) {
            errorHandler.fatalError("[COULDNT_INVOKE_PROC_METHOD]", t,
                method.getName());
        }
    }

}

14.2. MailStoreConnector

The MailStoreConnector class uses the JavaMail API to connect to a mail store. The connect() method gets a Store object and calls its connect() method. Then it opens a folder and returns true if the operation succeeds. The disconnect() method closes the folder and calls the close() method of the Store object.

The process() method relies on two abstract methods getMessages() and wasProcessed(), which must be defined by subclasses.

MailStoreConnector.java:

package com.devsphere.apps.mapping.monitor;

import com.devsphere.helpers.logging.*;
import com.devsphere.logging.*;

import java.io.*;
import java.lang.reflect.*;
import javax.mail.*;
import javax.mail.event.*;

/**
 * Abstract connector to a mail store
 */
public abstract class MailStoreConnector extends StoreConnector {
    protected Session session;
    protected Store store;
    protected Folder folder;
    protected String host;
    protected boolean connected;

    /**
     * Initializes the store connector
     */
    public void initialize(Class beanClass, Object processor, Method method,
        boolean debug, AbstractLogger logger, ApplicationErrorHandler errorHandler) {
        super.initialize(beanClass, processor, method, debug, logger,
            errorHandler);

        // Get the session object
        session = Session.getInstance(System.getProperties(), null);
        session.setDebug(debug);
    }

    /**
     * Tries to establish a connection and returns true if succeeds.
     */
    public boolean connect(String protocol, String host, String user,
        String password, String folderName) {
        connected = false;
        this.host = host;

        // Get the store object
        try {
            store = session.getStore(protocol);
        } catch (NoSuchProviderException e) {
            errorHandler.error("[NO_PROTOCOL_PROVIDER]", e, protocol);
            return connected;
        }
        if (store == null) {
            errorHandler.error("[COULDNT_GET_STORE]", null, protocol);
            return connected;
        }

        // Connect to the host
        try {
            store.connect(host, user, password);
        } catch (MessagingException e) {
            errorHandler.error("[COULDNT_CONNECT]", e, host);
            return connected;
        }

        // Get the folder
        try {
            folder = store.getFolder(folderName);
            if (folder == null || !folder.exists()) {
                errorHandler.error("[FOLDER_NOT_FOUND]", null, folderName);
                return connected;
            }
        } catch (MessagingException e) {
            errorHandler.error("[COULDNT_GET_FOLDER]", e, folderName);
            return connected;
        }

        // Open the folder
        try {
            folder.open(Folder.READ_WRITE);
        } catch (MessagingException e) {
            errorHandler.error("[COULDNT_OPEN_FOLDER]", e, folderName);
            return connected;
        }

        connected = true;
        return connected;
    }

    /**
     * Processes a message
     */
    protected void processMessage(Message msg) {
        // Get the input stream
        InputStream in = null;
        try {
            in = msg.getInputStream();
        } catch (MessagingException e) {
            errorHandler.fatalError("[COULDNT_GET_INPUT_STREAM]", e);
        } catch (IOException e) {
            errorHandler.fatalError("[COULDNT_GET_INPUT_STREAM]", e);
        }

        // Map the text to a bean object and process it
        if (in != null)
            processMessage(in);
    }

    /**
     * Gets all messages that might be eligible for processing
     */
    protected abstract Message[] getMessages();

    /**
     * Returns true if the message was processed
     */
    protected abstract boolean wasProcessed(Message msg);

    /**
     * Processes the messages that haven't been processed yet
     */
    public void process() {
        Message msgs[] = getMessages();
        if (msgs != null)
            for (int i = 0; i < msgs.length; i++)
                if (!wasProcessed(msgs[i]))
                    processMessage(msgs[i]);
    }

    /**
     * Tries to close a connection.
     * Should be called even if connect() failed.
     */
    public void disconnect() {
        // Close the folder
        if (folder != null && folder.isOpen())
            try {
                folder.close(false);
            } catch (MessagingException e) {
                errorHandler.error("[COULDNT_CLOSE_FOLDER]", e,
                    folder.getName());
            }

        // Close the store
        if (store != null)
            try {
                store.close();
            } catch (MessagingException e) {
                errorHandler.error("[COULDNT_CLOSE_CONNECTION]", e, host);
            }

        store = null;
        folder = null;
        connected = false;
    }

}

14.3. IMAPStoreConnector

The IMAPStoreConnector extends MailStoreConnector and uses the JavaMail API to connect to an IMAP server.

The getMessages() method returns all messages of the open folder. The Message instances are lightweight objects, i. e. JavaMail contacts the server only when some information (such as the message content or the envelope or the flags) is needed. Getting all Message objects doesn't imply the getting of these message's content.

The wasProcessed() method returns true if the SEEN flag of the message is set. MailMonitor should be the only application that accesses the content of the messages. If a message is "seen" for the first time by another application, it won't be processed.

IMAPStoreConnector.java:

package com.devsphere.apps.mapping.monitor;

import javax.mail.*;
import javax.mail.event.*;
import java.io.*;

/**
 * Connector to an IMAP mail store
 */
public class IMAPStoreConnector extends MailStoreConnector {
    /**
     * Gets all messages
     */
    protected Message[] getMessages() {
        Message msgs[] = null;
        try {
            msgs = folder.getMessages();

            // Optimization: fetch the flags
            FetchProfile fp = new FetchProfile();
            fp.add(FetchProfile.Item.FLAGS);
            folder.fetch(msgs, fp);
        } catch (MessagingException e) {
            errorHandler.fatalError("[COULDNT_GET_MESSAGES]", e);
        }
        return msgs;
    }

    /**
     * Returns true if the message was read
     */
    protected boolean wasProcessed(Message msg) {
        try {
            return msg.isSet(Flags.Flag.SEEN);
        } catch (MessagingException e) {
            errorHandler.fatalError("[COULDNT_GET_SEEN_FLAG]", e);
        }
        return true;
    }

}

14.4. POP3StoreConnector

The POP3StoreConnector can't use the SEEN flag because the POP3 protocol doesn't support "permanent" flags (the term is defined by JavaMail). Therefore, wasProcessed() returns always false and getMessage() returns all messages that MIGHT not have been processed.

The connector supposes that when the MailMonitor started, none of the store's messages had been processed. Also no other application should delete messages while the MailMonitor runs since the connector relies on message numbers.

POP3StoreConnector.java:

package com.devsphere.apps.mapping.monitor;

import com.devsphere.helpers.logging.*;
import com.devsphere.logging.*;

import javax.mail.*;
import javax.mail.event.*;
import java.io.*;
import java.lang.reflect.*;

/**
 * Connector to a POP3 mail store
 */
public class POP3StoreConnector extends MailStoreConnector {
    protected int count;

    /**
     * Initializes the store connector
     */
    public void initialize(Class beanClass, Object processor, Method method,
        boolean debug, AbstractLogger logger, ApplicationErrorHandler errorHandler) {
        super.initialize(beanClass, processor, method, debug, logger,
            errorHandler);
        count = 0;
    }

    /**
     * Returns false since the POP3 protocol doesn't support flags
     */
    protected boolean wasProcessed(Message msg) {
        return false;
    }

    /**
     * Gets all messages that haven't been read during the current session yet
     */
    protected Message[] getMessages() {
        // Close the folder
        try {
            folder.close(false);
        } catch (MessagingException e) {
            errorHandler.error("[COULDNT_CLOSE_FOLDER]", e, folder.getName());
        }

        // Reopen the folder
        try {
            folder.open(Folder.READ_WRITE);
        } catch (MessagingException e) {
            errorHandler.error("[COULDNT_OPEN_FOLDER]", e, folder.getName());
        }

        // Get the new messages
        Message msgs[] = null;
        try {
            int newCount = folder.getMessageCount();
            if (count < newCount)
                msgs = folder.getMessages(count+1, newCount);
            count = newCount;
        } catch (MessagingException e) {
            errorHandler.fatalError("[COULDNT_GET_MESSAGES]", e);
        }
        return msgs;
    }

}

14.5. DIRStoreConnector

The DIRStoreConnector takes the messages directly from a system folder / directory without using JavaMail. A SMTP server running on the local machine could drop the messages that come from the network to that system folder.

Only the new messages are processed. If you want to process a group of saved messages, you have to start the MailMonitor first and then copy/drop the files to the system folder.

DIRStoreConnector.java:

package com.devsphere.apps.mapping.monitor;

import com.devsphere.helpers.logging.*;
import com.devsphere.logging.*;

import java.io.*;
import java.lang.reflect.*;
import java.util.*;

/**
 * Connector to a directory that stores e-mail messages
 */
public class DIRStoreConnector extends StoreConnector {
    protected File dir;
    protected Date date;

    /**
     * Initializes the store connector
     */
    public void initialize(Class beanClass, Object processor, Method method,
        boolean debug, AbstractLogger logger, ApplicationErrorHandler errorHandler) {
        super.initialize(beanClass, processor, method, debug, logger,
            errorHandler);
        date = new Date();
    }

    /**
     * Tries to establish a connection and returns true if succeeds.
     */
    public boolean connect(String protocol, String host, String user,
        String password, String folderName) {
        dir = new File(folderName);
        return true;
    }

    /**
     * Gets and processes the new messages
     */
    public void process() {
        Date newDate = new Date();
        String list[] = dir.list();
        for (int i = 0; i < list.length; i++) {
            File file = new File(dir, list[i]);
            Date lastModified = new Date(file.lastModified());
            if (lastModified.equals(date) || lastModified.after(date))
                if (lastModified.before(newDate))
                    if (file.canRead())
                        try {
                            InputStream in = new BufferedInputStream(
                                new FileInputStream(file));
                            try {
                                skipHeaders(in);
                                processMessage(in);
                            } finally {
                                in.close();
                            }
                        } catch (IOException e) {
                            errorHandler.fatalError(null, e);
                        }
        }
        date = newDate;
    }

    /**
     * Tries to close the connection.
     * Should be called even if connect() failed.
     */
    public void disconnect() {
        dir = null;
    }

    /**
     * Skips the headers of an e-mail message
     */
    protected void skipHeaders(InputStream in) throws IOException {
        int len = 0;
        while (true) {
            int c = in.read();
            if (c == -1)
                break; // end of file
            if (c == '\n') {
                // end of line
                if (len == 0)
                    break; // empty line
                len = 0;
            } else if (c != '\r')
                len++;
        }
    }

}

14.6. Hints and Tips

The next chapter presents the MailMonitor class, which invokes the connectors' methods and deals with the multithreading issues.

You may provide your own connectors and replace the default implementations by editing the ConnectorResources.properties file.

Contents | Previous Chapter | Next Chapter

Copyright © 2000-2020 Devsphere

About us
Products
Services
Articles
Contact us