extensions/net.sf.basedb.hdfs/trunk/src/net/sf/basedb/hdfs/HdfsConnectionManager.java

Code
Comments
Other
Rev Date Author Line
1330 01 Apr 11 nicklas 1 /**
1330 01 Apr 11 nicklas 2   $Id$
1330 01 Apr 11 nicklas 3
1330 01 Apr 11 nicklas 4   Copyright (C) 2011 Nicklas Nordborg
1330 01 Apr 11 nicklas 5
1330 01 Apr 11 nicklas 6   This file is part of BASE - BioArray Software Environment.
1330 01 Apr 11 nicklas 7   Available at http://base.thep.lu.se/
1330 01 Apr 11 nicklas 8
1330 01 Apr 11 nicklas 9   BASE is free software; you can redistribute it and/or
1330 01 Apr 11 nicklas 10   modify it under the terms of the GNU General Public License
1330 01 Apr 11 nicklas 11   as published by the Free Software Foundation; either version 3
1330 01 Apr 11 nicklas 12   of the License, or (at your option) any later version.
1330 01 Apr 11 nicklas 13
1330 01 Apr 11 nicklas 14   BASE is distributed in the hope that it will be useful,
1330 01 Apr 11 nicklas 15   but WITHOUT ANY WARRANTY; without even the implied warranty of
1330 01 Apr 11 nicklas 16   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
1330 01 Apr 11 nicklas 17   GNU General Public License for more details.
1330 01 Apr 11 nicklas 18
1330 01 Apr 11 nicklas 19   You should have received a copy of the GNU General Public License
1330 01 Apr 11 nicklas 20   along with BASE. If not, see <http://www.gnu.org/licenses/>.
1330 01 Apr 11 nicklas 21 */
1330 01 Apr 11 nicklas 22 package net.sf.basedb.hdfs;
1330 01 Apr 11 nicklas 23
1450 03 Nov 11 nicklas 24 import java.io.FileNotFoundException;
1330 01 Apr 11 nicklas 25 import java.io.IOException;
1330 01 Apr 11 nicklas 26 import java.io.InputStream;
1330 01 Apr 11 nicklas 27 import java.net.InetSocketAddress;
1330 01 Apr 11 nicklas 28 import java.net.URI;
1330 01 Apr 11 nicklas 29 import java.util.Date;
1330 01 Apr 11 nicklas 30
1330 01 Apr 11 nicklas 31 import org.apache.hadoop.conf.Configuration;
1330 01 Apr 11 nicklas 32 import org.apache.hadoop.hdfs.DFSClient;
1330 01 Apr 11 nicklas 33 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
1330 01 Apr 11 nicklas 34
1330 01 Apr 11 nicklas 35 import net.sf.basedb.util.uri.CloseResourceInputStream;
1330 01 Apr 11 nicklas 36 import net.sf.basedb.util.uri.ConnectionParameters;
1330 01 Apr 11 nicklas 37 import net.sf.basedb.util.uri.ConnectionManager;
1330 01 Apr 11 nicklas 38 import net.sf.basedb.util.uri.UriMetadata;
1330 01 Apr 11 nicklas 39
1330 01 Apr 11 nicklas 40 /**
1330 01 Apr 11 nicklas 41   Direct connection to Hadoop Distributed File System (HDFS). URI:s should
1330 01 Apr 11 nicklas 42   use 'hdfs://' as schema. The host and port of the URI should point to the 
1330 01 Apr 11 nicklas 43   namenode of the Hadoop cluster. The path of the URI is the path of the
1330 01 Apr 11 nicklas 44   file in the HDFS filesystem.
1330 01 Apr 11 nicklas 45
1330 01 Apr 11 nicklas 46   @author Nicklas
1330 01 Apr 11 nicklas 47   @since 1.0
1330 01 Apr 11 nicklas 48 */
1330 01 Apr 11 nicklas 49 public class HdfsConnectionManager
1330 01 Apr 11 nicklas 50   implements ConnectionManager
1330 01 Apr 11 nicklas 51 {
1330 01 Apr 11 nicklas 52
1330 01 Apr 11 nicklas 53   private final URI uri;
1330 01 Apr 11 nicklas 54   private final ConnectionParameters parameters;
1330 01 Apr 11 nicklas 55   private UriMetadata metadata;
1330 01 Apr 11 nicklas 56   
1330 01 Apr 11 nicklas 57   public HdfsConnectionManager(URI uri, ConnectionParameters parameters)
1330 01 Apr 11 nicklas 58   {
1330 01 Apr 11 nicklas 59     this.uri = uri;
1330 01 Apr 11 nicklas 60     this.parameters = parameters;
1330 01 Apr 11 nicklas 61   }
1330 01 Apr 11 nicklas 62   
1330 01 Apr 11 nicklas 63   /*
1330 01 Apr 11 nicklas 64     From the UriHandler interface
1330 01 Apr 11 nicklas 65     -----------------------------
1330 01 Apr 11 nicklas 66   */
1330 01 Apr 11 nicklas 67   @Override
1330 01 Apr 11 nicklas 68   public URI getURI()
1330 01 Apr 11 nicklas 69   {
1330 01 Apr 11 nicklas 70     return uri;
1330 01 Apr 11 nicklas 71   }
1330 01 Apr 11 nicklas 72
1330 01 Apr 11 nicklas 73   @Override
1330 01 Apr 11 nicklas 74   public InputStream getInputStream() 
1330 01 Apr 11 nicklas 75     throws IOException
1330 01 Apr 11 nicklas 76   {
1330 01 Apr 11 nicklas 77     URI uri = getURI();
1330 01 Apr 11 nicklas 78     InputStream stream = null;
1330 01 Apr 11 nicklas 79     DFSClient client = null;
1330 01 Apr 11 nicklas 80     try
1330 01 Apr 11 nicklas 81     {
1330 01 Apr 11 nicklas 82       int port = uri.getPort();
1330 01 Apr 11 nicklas 83       if (port == -1) port = 9000;
1330 01 Apr 11 nicklas 84       client = createClient(uri, parameters);
1450 03 Nov 11 nicklas 85       InputStream tmp = client.open(uri.getPath());
1450 03 Nov 11 nicklas 86       if (tmp == null)
1450 03 Nov 11 nicklas 87       {
1450 03 Nov 11 nicklas 88         throw new FileNotFoundException(uri.toString());
1450 03 Nov 11 nicklas 89       }
1450 03 Nov 11 nicklas 90       stream = new CloseResourceInputStream(tmp, client);
1330 01 Apr 11 nicklas 91     }
1330 01 Apr 11 nicklas 92     finally
1330 01 Apr 11 nicklas 93     {
1330 01 Apr 11 nicklas 94       // If no stream has been created we shutdown the client immediately
1330 01 Apr 11 nicklas 95       if (stream == null && client != null) closeClient(client);
1330 01 Apr 11 nicklas 96     }
1330 01 Apr 11 nicklas 97     return stream;        
1330 01 Apr 11 nicklas 98   }
1330 01 Apr 11 nicklas 99
1330 01 Apr 11 nicklas 100   @Override
1330 01 Apr 11 nicklas 101   public UriMetadata getMetadata() 
1330 01 Apr 11 nicklas 102     throws IOException
1330 01 Apr 11 nicklas 103   {
1330 01 Apr 11 nicklas 104     if (metadata == null) 
1330 01 Apr 11 nicklas 105     {
1330 01 Apr 11 nicklas 106       URI uri = getURI();
1330 01 Apr 11 nicklas 107       DFSClient client = null;
1330 01 Apr 11 nicklas 108       try
1330 01 Apr 11 nicklas 109       {
1330 01 Apr 11 nicklas 110         client = createClient(uri, parameters);
1450 03 Nov 11 nicklas 111         HdfsFileStatus status = client.getFileInfo(uri.getPath());
1450 03 Nov 11 nicklas 112         if (status == null)
1450 03 Nov 11 nicklas 113         {
1450 03 Nov 11 nicklas 114           throw new FileNotFoundException(uri.toString());
1450 03 Nov 11 nicklas 115         }
1450 03 Nov 11 nicklas 116         metadata = createMetadata(uri, status);
1330 01 Apr 11 nicklas 117       }
1330 01 Apr 11 nicklas 118       finally
1330 01 Apr 11 nicklas 119       {
1330 01 Apr 11 nicklas 120         closeClient(client);
1330 01 Apr 11 nicklas 121       }
1330 01 Apr 11 nicklas 122     }
1330 01 Apr 11 nicklas 123     return metadata;
1330 01 Apr 11 nicklas 124   }
1330 01 Apr 11 nicklas 125   // --------------------------------------
1330 01 Apr 11 nicklas 126   
1330 01 Apr 11 nicklas 127   /**
1330 01 Apr 11 nicklas 128     Create a DFSClient for accessing the given URI.
1330 01 Apr 11 nicklas 129   */
1330 01 Apr 11 nicklas 130   public DFSClient createClient(URI uri, ConnectionParameters parameters)
1330 01 Apr 11 nicklas 131     throws IOException
1330 01 Apr 11 nicklas 132   {
1330 01 Apr 11 nicklas 133     DFSClient client = null;
1330 01 Apr 11 nicklas 134     ClassLoader old = null;
1330 01 Apr 11 nicklas 135     boolean gotOld = false;
1330 01 Apr 11 nicklas 136     try
1330 01 Apr 11 nicklas 137     {
1330 01 Apr 11 nicklas 138       // We need to set the context class loader because
1330 01 Apr 11 nicklas 139       // there is code inside that uses this class loader to
1330 01 Apr 11 nicklas 140       // load HDFS classes
1330 01 Apr 11 nicklas 141       old = Hdfs.setContextClassLoader();
1330 01 Apr 11 nicklas 142       gotOld = true;
1330 01 Apr 11 nicklas 143       Configuration conf = new Configuration();
1330 01 Apr 11 nicklas 144       int port = uri.getPort();
1330 01 Apr 11 nicklas 145       if (port == -1) port = 9000;
1330 01 Apr 11 nicklas 146       InetSocketAddress namenode = new InetSocketAddress(uri.getHost(), port);
1330 01 Apr 11 nicklas 147       client = new DFSClient(namenode, conf);
1330 01 Apr 11 nicklas 148     }
1330 01 Apr 11 nicklas 149     finally
1330 01 Apr 11 nicklas 150     {
1330 01 Apr 11 nicklas 151       if (gotOld) Hdfs.resetContextClassLoader(old);
1330 01 Apr 11 nicklas 152     }
1330 01 Apr 11 nicklas 153     return client;
1330 01 Apr 11 nicklas 154   }
1330 01 Apr 11 nicklas 155
1330 01 Apr 11 nicklas 156   /**
1330 01 Apr 11 nicklas 157     Close a DFSClient without throwing an IOException.
1330 01 Apr 11 nicklas 158   */
1330 01 Apr 11 nicklas 159   public void closeClient(DFSClient client)
1330 01 Apr 11 nicklas 160   {
1330 01 Apr 11 nicklas 161     if (client == null) return;
1330 01 Apr 11 nicklas 162     try
1330 01 Apr 11 nicklas 163     {
1330 01 Apr 11 nicklas 164       client.close();
1330 01 Apr 11 nicklas 165     }
1330 01 Apr 11 nicklas 166     catch (IOException ex)
1330 01 Apr 11 nicklas 167     {}
1330 01 Apr 11 nicklas 168   }
1330 01 Apr 11 nicklas 169   
1330 01 Apr 11 nicklas 170   /**
1330 01 Apr 11 nicklas 171     Read metadata from the given file information.
1330 01 Apr 11 nicklas 172     <ul>
1330 01 Apr 11 nicklas 173     <li>Length: {@link HdfsFileStatus#getLen()}
1330 01 Apr 11 nicklas 174     <li>Last modified: {@link HdfsFileStatus#getModificationTime()}
1330 01 Apr 11 nicklas 175     </ul>
1330 01 Apr 11 nicklas 176   */
1330 01 Apr 11 nicklas 177   public UriMetadata createMetadata(URI uri, HdfsFileStatus status)
1330 01 Apr 11 nicklas 178   {
1330 01 Apr 11 nicklas 179     UriMetadata meta = new UriMetadata(uri);
1330 01 Apr 11 nicklas 180     meta.setLength(status.getLen());
1330 01 Apr 11 nicklas 181     meta.setLastModified(new Date(status.getModificationTime()));
1330 01 Apr 11 nicklas 182     return meta;
1330 01 Apr 11 nicklas 183   }
1330 01 Apr 11 nicklas 184   
1330 01 Apr 11 nicklas 185 }