This section describes the design and implementation of HDFS connection handling in Hadoop-IntelliJ-Plugin. It covers the definition of the HDFS connection interface (ConnectionHandler), the connection implementation, connection caching, and the connection-related configuration items. The overall design of the classes is described below.
The HDFS connection interface ConnectionHandler and its implementation class ConnectionHandlerImpl
The ConnectionHandler interface.
This interface defines methods for obtaining the HDFS connection object (Configuration) and the HDFS file system object (FileSystem), setting connection information, obtaining the collection of file system objects associated with the connection, getting the file system version of the connection, checking whether the HDFS file system can be reached, testing the HDFS connection, querying the connection status, and so on. The interface is defined as follows:
public interface ConnectionHandler extends Disposable, ConnectionProvider, Presentable {
    Project getProject();
    boolean createTestConnection() throws IOException;
    Configuration getMainConnection();
    FileSystem getMainFileSystem();
    ConnectionStatus getConnectionStatus();
    ConnectionInfo getConnectionInfo();
    void setConnectionInfo(ConnectionInfo paramConnectionInfo);
    boolean isActive();
    FileSystemType getFileSystemType();
    double getFileSystemVersion();
    boolean isConnected();
    int getIdleMinutes();
    FileSystemInfo getFileSystemInfo();
    void disconnect();
    String getId();
    String getUserName();
    String getPresentableText();
    ConnectionHandlerRef getRef();
    boolean isVirtual();
    ConnectionLoadMonitor getLoadMonitor();
    ConnectionBundle getConnectionBundle();
    boolean canConnect();
    boolean isValid(boolean paramBoolean);
    boolean isValid();
    FileSystemObjectBundle getObjectBundle();
    Filter<FileSystemBrowserTreeNode> getObjectTypeFilter();
    ConnectionSettings getSettings();
}
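To make the intent of the interface concrete, here is a minimal caller-side sketch. It exercises only the methods declared above; how the handler instance is obtained is left open, and LOGGER stands for whatever logger the caller uses (both are assumptions, not plugin source).

// Caller-side sketch against the interface only; the handler's origin is not shown here.
void describeConnection(ConnectionHandler handler) throws IOException {
    if (handler.canConnect() && handler.createTestConnection()) {
        FileSystem fs = handler.getMainFileSystem();       // live HDFS file system
        double version = handler.getFileSystemVersion();   // e.g. 3.0
        LOGGER.info(handler.getPresentableText() + " (HDFS " + version + ") is reachable, root: " + fs.getUri());
    }
}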
The HDFS connection implementation ConnectionHandlerImpl
In the implementation class, the key point is how the external configuration is used to connect to HDFS. In the constructor, a Hadoop Configuration object is created first. Because plugin startup is managed entirely by the IDEA framework, HDFS would by default read its configuration through the current thread's class loader, which at that point is the class loader of the IDEA main application. If the IDEA main class loader were used to load the HDFS configuration, the Hadoop jars would not be found when the HDFS classes are instantiated at runtime. A small detour is therefore needed: after the IDEA framework has loaded the plugin, the plugin code that initializes the HDFS configuration explicitly resets the class loader on the Configuration object to the plugin's own class loader, and only then sets the remaining configuration entries, so that the Hadoop jars are resolved from the plugin's classpath. Once the Configuration is set up, FileSystem.get(Configuration) is called to obtain the HDFS FileSystem object, and the connection is then tested. The connection test is simple: FileSystem.exists(path) checks whether the root directory "/" exists. If "/" does not exist, or the call throws an IOException, HDFS is considered unreachable for whatever reason. The implementation is as follows:
public ConnectionHandlerImpl(ConnectionBundle connectionBundle, ConnectionSettings connectionSettings) {
    this.connectionBundle = connectionBundle;
    this.connectionSettings = connectionSettings;
    configurationHdfs = new Configuration(false);
    ClassLoader pClassLoader = null;
    try {
        // Use the plugin's own class loader, not IDEA's main class loader,
        // so the Hadoop jars bundled with the plugin can be resolved.
        pClassLoader = Class.forName(com.fangyuzhong.intelliJ.hadoop.fsconnection.ConnectionManager.class.getName()).getClassLoader();
    } catch (Exception ex) {
        LOGGER.error("Failed to obtain the class loader of the current class", ex);
    }
    if (pClassLoader != null) {
        String hdfsPath = connectionSettings.getFileSystemSettings().getHDFSUrl();
        String yarnResourceAMPath = connectionSettings.getFileSystemSettings().getMapReducelUrl();
        if (!StringUtil.isEmptyOrSpaces(hdfsPath)) {
            // Re-point the Configuration at the plugin class loader before setting any entries.
            configurationHdfs.setClassLoader(pClassLoader);
            configurationHdfs.set(Constants.FS_HDFS_IMPL_KEY, Constants.FS_HDFS_IMPL_VALUE);
            configurationHdfs.set(Constants.FS_FILE_IMPL_KEY, Constants.FS_FILE_IMPL_VALUE);
            configurationHdfs.set(Constants.FS_DEFAULTFS_KEY, hdfsPath);
            configurationHdfs.set(Constants.YARN_RESOURCEMANAGER_SCHEDULER_ADDRESS, yarnResourceAMPath);
            InitiFileSystem(configurationHdfs);
        }
    }
    try {
        canConnection = createTestConnection();
    } catch (IOException ex) {
        canConnection = false;
        LOGGER.error("Exception while testing the new connection", ex);
    }
    connectionInfo = new ConnectionInfo(connectionSettings);
    connectionStatus = new ConnectionStatus();
    ref = new ConnectionHandlerRef(this);
    connectionStatus.setConnected(canConnection);
}

private void InitiFileSystem(Configuration configuration) {
    try {
        fileSystem = FileSystem.get(configuration);
    } catch (Exception ex) {
        LOGGER.error("Exception obtaining the HDFS FileSystem from the Configuration", ex);
    }
}
public boolean createTestConnection() throws IOException {
    boolean canConnection = false;
    if (fileSystem == null) {
        canConnection = false;
    } else {
        // The connection is considered usable if the root directory "/" exists.
        canConnection = fileSystem.exists(new Path("/"));
    }
    return canConnection;
}
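To make the class loader point above more tangible, the same setup can be illustrated with plain Hadoop APIs outside the plugin. The configuration keys and values below mirror what the Constants class presumably holds, and the HDFS URL is a placeholder; both are assumptions rather than plugin source.

// Standalone illustration (not plugin code) of the same setup and root-directory check.
static boolean probeHdfs(String hdfsUrl) throws IOException {
    Configuration conf = new Configuration(false);
    // Use the plugin's class loader so the bundled Hadoop jars are found, not IDEA's main loader.
    conf.setClassLoader(ConnectionManager.class.getClassLoader());
    conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem"); // assumed Constants values
    conf.set("fs.file.impl", "org.apache.hadoop.fs.LocalFileSystem");
    conf.set("fs.defaultFS", hdfsUrl);                 // e.g. "hdfs://namenode-host:9000"
    FileSystem fs = FileSystem.get(conf);
    return fs.exists(new Path("/"));                   // same test as createTestConnection()
}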
This class also has another important method, FileSystemObjectBundle getObjectBundle(), which, once the connection to HDFS is established, returns all objects under the root directory, including both directories and files:
@Override
public FileSystemObjectBundle getObjectBundle() {
    if (objectBundle == null) {
        objectBundle = new FileSystemObjectBundleImpl(this, connectionBundle);
    }
    return objectBundle;
}
The class FileSystemObjectBundleImpl will be covered later, when the file system objects are introduced.
The ConnectionBundle class, which represents the collection of connections.
A user can configure multiple HDFS connections, which produces multiple Connection instances; the ConnectionBundle class defines and manages these connections. ConnectionBundle extends FileSystemBrowserTreeNodeBase, so it is itself a tree node: when the user chooses to display the connections as multiple root nodes of a single tree, the ConnectionBundle is rendered as a node. Note also that when a ConnectionBundle is instantiated, a virtual connection is added to the virtualConnections list by default. The relevant code fragment is as follows:
public ConnectionBundle(Project project) {
    this.projectRef = new ProjectRef(project);
    // A virtual connection is always present, even before the user configures a real one.
    this.virtualConnections.add(new VirtualConnectionHandler("virtual-hdfs-fsconnection",
            "Virtual - hdfs 3.0", FileSystemType.HDFS, 3.0, project));
}

public void addConnection(ConnectionHandler connectionHandler) {
    this.connectionHandlers.add(connectionHandler);
    Disposer.register(this, connectionHandler);
}
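Putting the two methods together, a freshly configured connection would be registered with the bundle roughly as follows; this is only a sketch, and the project and connectionSettings instances are assumed to come from the settings UI.

// Sketch only: wire a new connection into the bundle (project and connectionSettings are assumed inputs).
ConnectionBundle bundle = new ConnectionBundle(project);
ConnectionHandler handler = new ConnectionHandlerImpl(bundle, connectionSettings);
bundle.addConnection(handler);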
The HDFS connection manager class ConnectionManager.
ConnectionManager is a project-level component that extends the abstract class AbstractProjectComponent and manages all HDFS connections for the project: rebuilding the UI objects and refreshing connections after the connection settings change, providing access to the connection collection (ConnectionBundle), testing HDFS connections, looking up a ConnectionHandler by connection ID, and so on. The relevant code fragment is as follows:
public static ConnectionManager getInstance(@NotNull Project project) {
    return getComponent(project);
}

private static ConnectionManager getComponent(@NotNull Project project) {
    return FailsafeUtil.getComponent(project, ConnectionManager.class);
}

private ConnectionManager(Project project) {
    super(project);
    this.connectionBundle = ConnectionBundleSettings.getInstance(getProject()).getConnectionBundle();
    Disposer.register(this, this.connectionBundle);
}

public void initComponent() {
    super.initComponent();
    Project project = getProject();
    EventUtil.subscribe(project, this, ConnectionSettingsListener.TOPIC, this.connectionSettingsListener);
    // Periodically close connections that have been idle for too long.
    this.idleConnectionCleaner = new Timer("HDFS - Idle Connection Cleaner [" + project.getName() + "]");
    this.idleConnectionCleaner.schedule(new CloseIdleConnectionTask(), TimeUtil.ONE_MINUTE, TimeUtil.ONE_MINUTE);
}

private ConnectionSettingsListener connectionSettingsListener = new ConnectionSettingsAdapter() {
    public void connectionChanged(String connectionId) {
        // Refresh the browser tree of the affected connection when its settings change.
        final ConnectionHandler connectionHandler = getConnectionHandler(connectionId);
        connectionHandler.getObjectBundle().refreshTreeChildren();
    }
};
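The listener above calls getConnectionHandler(connectionId), which is not shown in the fragment. A plausible implementation is a simple scan over the manager's handlers; this is a sketch under that assumption, not the actual plugin source.

// Hypothetical lookup by connection id; getConnectionHandlers() is the accessor used elsewhere in the plugin.
public ConnectionHandler getConnectionHandler(String connectionId) {
    for (ConnectionHandler connectionHandler : getConnectionHandlers()) {
        if (connectionHandler.getId().equals(connectionId)) {
            return connectionHandler;
        }
    }
    return null;
}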
The HDFS connection cache class ConnectionCache
ConnectionCache is a cache class that stores connections in a hash map keyed by connection ID, with the corresponding ConnectionHandler as the value. It is an application-level component that is initialized when IDEA starts, and it provides lookup of a ConnectionHandler by connection ID. During initialization it subscribes to the project lifecycle events it needs. The code is as follows:
public class ConnectionCache implements ApplicationComponent {
    private static Map<String, ConnectionHandler> CACHE = new THashMap();

    @Nullable
    public static ConnectionHandler findConnectionHandler(String connectionId) {
        ConnectionHandler connectionHandler = CACHE.get(connectionId);
        ProjectManager projectManager = ProjectManager.getInstance();
        if ((connectionHandler == null) && (projectManager != null)) {
            synchronized (ConnectionCache.class) {
                // Double-checked lookup: another thread may have populated the cache already.
                connectionHandler = CACHE.get(connectionId);
                if (connectionHandler == null) {
                    for (Project project : projectManager.getOpenProjects()) {
                        ConnectionManager connectionManager = ConnectionManager.getInstance(project);
                        connectionHandler = connectionManager.getConnectionHandler(connectionId);
                        if ((connectionHandler != null) && (!connectionHandler.isDisposed())) {
                            CACHE.put(connectionId, connectionHandler);
                            return connectionHandler;
                        }
                    }
                }
            }
        }
        return (connectionHandler == null) || (connectionHandler.isDisposed()) ? null : connectionHandler;
    }

    public void initComponent() {
        EventUtil.subscribe(null, ProjectLifecycleListener.TOPIC, this.projectLifecycleListener);
    }

    public void disposeComponent() {
    }

    @NotNull
    public String getComponentName() {
        return "HadoopNavigator.ConnectionCache";
    }

    private ProjectLifecycleListener projectLifecycleListener = new ProjectLifecycleListener.Adapter() {
        public void projectComponentsInitialized(@NotNull Project project) {
            // Once a project's components are ready, register all of its connections in the cache.
            ConnectionManager connectionManager = ConnectionManager.getInstance(project);
            if (connectionManager == null) return;
            List<ConnectionHandler> connectionHandlers = connectionManager.getConnectionHandlers();
            for (ConnectionHandler connectionHandler : connectionHandlers) {
                ConnectionCache.CACHE.put(connectionHandler.getId(), connectionHandler);
            }
        }

        public void afterProjectClosed(@NotNull Project project) {
            // Drop cached connections that belong to the closed project or have been disposed.
            Iterator<String> connectionIds = ConnectionCache.CACHE.keySet().iterator();
            while (connectionIds.hasNext()) {
                String connectionId = connectionIds.next();
                ConnectionHandler connectionHandler = ConnectionCache.CACHE.get(connectionId);
                if ((connectionHandler.isDisposed()) || (connectionHandler.getProject() == project)) {
                    connectionIds.remove();
                }
            }
        }
    };
}
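Any code in the plugin that only holds a connection ID can then resolve the corresponding handler through the cache; the ID below is a hypothetical placeholder.

// Resolve a handler from its id via the application-level cache (the id is only an example).
ConnectionHandler handler = ConnectionCache.findConnectionHandler("hdfs-connection-1");
if (handler != null && handler.isConnected()) {
    FileSystem fs = handler.getMainFileSystem();
    // ... browse directories or read files through fs
}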