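What this is about
At the "4th Study Meeting of the Japan OpenStack User Group" I used this slide deck to introduce the GlusterFS Swift API, and a Swift expert who had actually read the source offered a comment. In their words:
"It still feels like it has been made to work by brute force, so it is probably better to wait and see for a while."
...And so, to let you follow how things develop from here, I am recording a diff against the latest source to show just how brute-force the current state is.
The comparison is between the original "swift-1.4.8.tar.gz" and the following RPMs, which carry the GlusterFS patches.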
# rpm -qa | grep swift
gluster-swift-container-1.4.8-1.el6.noarch
gluster-swift-account-1.4.8-1.el6.noarch
gluster-swift-object-1.4.8-1.el6.noarch
gluster-swift-proxy-1.4.8-1.el6.noarch
gluster-swift-1.4.8-1.el6.noarch
gluster-swift-plugin-1.0-1.noarch
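Note 1) These RPMs have not been released publicly yet. Slightly older versions are listed below, so refer to them if you want to read the source. (They appear to differ considerably from the latest version described in this article, however.)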
・swift-1.4.5-1.noarch.rpm
・swift-plugin-1.0.-1.el6.noarch.rpm
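Note 2) I am not deeply familiar with the original Swift code, so I cannot tell which parts of the diff are there for GlusterFS support and which appear only because the Swift code I compared against is not the exact base version. For now, everything below is written on the assumption that every difference is for GlusterFS support. Please point out anything I have misunderstood.
Background
The illustration in the slides mentioned above shows a server called "proxy01"; in practice the proxy, account, container, and object servers all run together on that one machine.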
# ps -efw | grep swif[t]
root     21434     1  0 07:06 ?        00:00:00 /usr/bin/python /usr/bin/swift-proxy-server /etc/swift/proxy-server.conf
root     21435     1  0 07:06 ?        00:00:00 /usr/bin/python /usr/bin/swift-container-server /etc/swift/container-server/1.conf
root     21436     1  0 07:06 ?        00:00:00 /usr/bin/python /usr/bin/swift-account-server /etc/swift/account-server/1.conf
root     21437     1  0 07:06 ?        00:00:00 /usr/bin/python /usr/bin/swift-object-server /etc/swift/object-server/1.conf
root     21454 21437  0 07:06 ?        00:00:00 /usr/bin/python /usr/bin/swift-object-server /etc/swift/object-server/1.conf
root     21455 21436  0 07:06 ?        00:00:00 /usr/bin/python /usr/bin/swift-account-server /etc/swift/account-server/1.conf
root     21456 21435  0 07:06 ?        00:00:00 /usr/bin/python /usr/bin/swift-container-server /etc/swift/container-server/1.conf
root     21457 21434  0 07:06 ?        00:00:00 /usr/bin/python /usr/bin/swift-proxy-server /etc/swift/proxy-server.conf
Each of these server functions is patched to use GlusterFS as its backend.
Proxy server
First, the result for the Proxy server.
# diff -up swift/proxy/server.py /usr/lib/python2.6/site-packages/swift/proxy/server.py
--- swift/proxy/server.py 2012-03-22 09:24:46.000000000 +0000
+++ /usr/lib/python2.6/site-packages/swift/proxy/server.py 2012-04-26 07:17:58.000000000 +0000
@@ -1,4 +1,5 @@
 # Copyright (c) 2010-2012 OpenStack, LLC.
+# Copyright (c) 2011 Red Hat, Inc.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -53,11 +54,20 @@ from webob import Request, Response
 from swift.common.ring import Ring
 from swift.common.utils import cache_from_env, ContextPool, get_logger, \
-    get_remote_client, normalize_timestamp, split_path, TRUE_VALUES
+    get_remote_client, normalize_timestamp, split_path, TRUE_VALUES, \
+    plugin_enabled
 from swift.common.bufferedhttp import http_connect
-from swift.common.constraints import check_metadata, check_object_creation, \
-    check_utf8, CONTAINER_LISTING_LIMIT, MAX_ACCOUNT_NAME_LENGTH, \
-    MAX_CONTAINER_NAME_LENGTH, MAX_FILE_SIZE
+
+if plugin_enabled():
+    from swift.plugins.constraints import check_object_creation, \
+        MAX_ACCOUNT_NAME_LENGTH, MAX_CONTAINER_NAME_LENGTH, MAX_FILE_SIZE
+else:
+    from swift.common.constraints import check_object_creation, \
+        MAX_ACCOUNT_NAME_LENGTH, MAX_CONTAINER_NAME_LENGTH, MAX_FILE_SIZE
+
+from swift.common.constraints import check_metadata, check_utf8, \
+    CONTAINER_LISTING_LIMIT
+
 from swift.common.exceptions import ChunkReadTimeout, \
     ChunkWriteTimeout, ConnectionTimeout
Surprisingly little has been changed in the Proxy server. "plugin_enabled()" is a function added by the following patch.
# diff -up swift/common/utils.py /usr/lib/python2.6/site-packages/swift/common/utils.py
--- swift/common/utils.py 2012-03-22 09:24:46.000000000 +0000
+++ /usr/lib/python2.6/site-packages/swift/common/utils.py 2012-04-26 07:17:58.000000000 +0000
@@ -1,4 +1,5 @@
 # Copyright (c) 2010-2012 OpenStack, LLC.
+# Copyright (c) 2011 Red Hat, Inc.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -1138,3 +1139,12 @@ def streq_const_time(s1, s2):
     for (a, b) in zip(s1, s2):
         result |= ord(a) ^ ord(b)
     return result == 0
+
+def plugin_enabled():
+    swift_conf = ConfigParser()
+    swift_conf.read(os.path.join('/etc/swift', 'swift.conf'))
+    try:
+        return swift_conf.get('DEFAULT', 'Enable_plugin', 'no') in TRUE_VALUES
+    except NoOptionError, NoSectionError:
+        return False
+
When "Enable_plugin = yes" is set in "swift.conf", as shown here...
# cat /etc/swift/swift.conf
[DEFAULT]
Enable_plugin = yes
[swift-hash]
# random unique string that can never change (DO NOT LOSE)
swift_hash_path_suffix = gluster
...the GlusterFS-specific code under "swift/plugins/" (listed below) is used.
# ls /usr/lib/python2.6/site-packages/swift/plugins/*py
/usr/lib/python2.6/site-packages/swift/plugins/DiskDir.py
/usr/lib/python2.6/site-packages/swift/plugins/DiskFile.py
/usr/lib/python2.6/site-packages/swift/plugins/Glusterfs.py
/usr/lib/python2.6/site-packages/swift/plugins/__init__.py
/usr/lib/python2.6/site-packages/swift/plugins/constraints.py
/usr/lib/python2.6/site-packages/swift/plugins/utils.py
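To make the switch concrete, here is a minimal sketch of the same check written for Python 3's configparser. It is not the shipped code: the `conf_path` parameter, the lowercase-only `TRUE_VALUES` set, and the `demo()` helper are assumptions made for illustration. As far as I can tell, two details of the patched Python 2 function behave differently from what they suggest: the third positional argument of `ConfigParser.get()` is `raw`, not a default value, and `except NoOptionError, NoSectionError:` catches only `NoOptionError`. The sketch uses `fallback=` instead.

```python
import configparser
import os
import tempfile

# Assumption: a simplified, lowercase-only stand-in for Swift's TRUE_VALUES.
TRUE_VALUES = {"true", "1", "yes", "on", "t", "y"}


def plugin_enabled(conf_path):
    """Return True if [DEFAULT] Enable_plugin is set to a truthy value."""
    conf = configparser.ConfigParser()
    conf.read(conf_path)  # a missing file is silently ignored
    value = conf.get("DEFAULT", "Enable_plugin", fallback="no")
    return value.strip().lower() in TRUE_VALUES


def demo():
    """Write a throwaway swift.conf-like file and evaluate the switch."""
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "swift.conf")
        with open(path, "w") as f:
            f.write("[DEFAULT]\nEnable_plugin = yes\n")
        missing = os.path.join(tmp, "no-such.conf")
        return plugin_enabled(path), plugin_enabled(missing)
```

Calling `demo()` evaluates the switch once for a file that enables the plugin and once for a file that does not exist.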
Account server
Next is the Account server.
Looking at the configuration file first, a filter named "egg:swift#gluster" has been added to the pipeline.
# cat /etc/swift/account-server/1.conf
[DEFAULT]
devices = /srv/1/node
mount_check = false
bind_port = 6012
user = root
log_facility = LOG_LOCAL2
[pipeline:main]
pipeline = gluster account-server
[app:account-server]
use = egg:swift#account
[filter:gluster]
use = egg:swift#gluster
[account-replicator]
vm_test_mode = yes
[account-auditor]
[account-reaper]
The egg-info shows that this filter is actually implemented in "swift/common/middleware/gluster.py".
# cat /usr/lib/python2.6/site-packages/swift-1.4.8-py2.6.egg-info/entry_points.txt
[paste.app_factory]
account = swift.account.server:app_factory
object = swift.obj.server:app_factory
container = swift.container.server:app_factory
proxy = swift.proxy.server:app_factory
[paste.filter_factory]
formpost = swift.common.middleware.formpost:filter_factory
cname_lookup = swift.common.middleware.cname_lookup:filter_factory
recon = swift.common.middleware.recon:filter_factory
healthcheck = swift.common.middleware.healthcheck:filter_factory
tempurl = swift.common.middleware.tempurl:filter_factory
gluster = swift.common.middleware.gluster:filter_factory
name_check = swift.common.middleware.name_check:filter_factory
catch_errors = swift.common.middleware.catch_errors:filter_factory
ratelimit = swift.common.middleware.ratelimit:filter_factory
memcache = swift.common.middleware.memcache:filter_factory
staticweb = swift.common.middleware.staticweb:filter_factory
domain_remap = swift.common.middleware.domain_remap:filter_factory
tempauth = swift.common.middleware.tempauth:filter_factory
swift3 = swift.common.middleware.swift3:filter_factory
For how egg-info works, see the reference here.
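The lookup from "egg:swift#gluster" to a module and function can be sketched as a plain text search over entry_points.txt. This is only an illustration of the mapping, not how Paste Deploy does it internally; the `resolve()` helper and the shortened `ENTRY_POINTS` excerpt are assumptions for the example.

```python
import configparser

# Shortened excerpt of the entry_points.txt shown above.
ENTRY_POINTS = """\
[paste.app_factory]
account = swift.account.server:app_factory

[paste.filter_factory]
healthcheck = swift.common.middleware.healthcheck:filter_factory
gluster = swift.common.middleware.gluster:filter_factory
"""


def resolve(spec, group, text=ENTRY_POINTS):
    """Map an 'egg:<dist>#<name>' spec to a (module, attribute) pair."""
    scheme, _, rest = spec.partition(":")
    dist, _, name = rest.partition("#")
    if scheme != "egg" or not dist or not name:
        raise ValueError("not an egg spec: %r" % spec)
    # Only '=' separates keys from values here; the values contain ':'.
    parser = configparser.ConfigParser(delimiters=("=",))
    parser.read_string(text)
    module, _, attr = parser.get(group, name).partition(":")
    return module.strip(), attr.strip()
```

For a `[filter:...]` section the relevant group is `paste.filter_factory`, so `resolve("egg:swift#gluster", "paste.filter_factory")` points at `filter_factory` in `swift.common.middleware.gluster`.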
The full contents are as follows. The key point is that an instance of the Glusterfs class is stored in "env['fs_object']".
/usr/lib/python2.6/site-packages/swift/common/middleware/gluster.py
# Copyright (c) 2011 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from swift.common.utils import get_logger, plugin_enabled
from swift import plugins
from ConfigParser import ConfigParser

class Gluster_plugin(object):
    """
    Update the environment with keys that reflect Gluster_plugin enabled
    """

    def __init__(self, app, conf):
        self.app = app
        self.conf = conf
        self.fs_name = 'Glusterfs'
        self.logger = get_logger(conf, log_route='gluster')

    def __call__(self, env, start_response):
        if not plugin_enabled():
            return self.app(env, start_response)
        env['Gluster_enabled'] =True
        # (author's note) fs_object holds a reference to the Glusterfs class defined in swift/plugins/Glusterfs.py.
        fs_object = getattr(plugins, self.fs_name, False)
        if not fs_object:
            raise Exception('%s plugin not found', self.fs_name)

        # (author's note) env['fs_object'] holds a Glusterfs instance.
        env['fs_object'] = fs_object()
        fs_conf = ConfigParser()
        if fs_conf.read('/etc/swift/fs.conf'):
            try:
                # (author's note) env['root'] holds the mount point of the volume.
                env['root'] = fs_conf.get ('DEFAULT', 'mount_path')
            except NoSectionError, NoOptionError:
                self.logger.exception(_('ERROR mount_path not present'))
        return self.app(env, start_response)

def filter_factory(global_conf, **local_conf):
    """Returns a WSGI filter app for use with paste.deploy."""
    conf = global_conf.copy()
    conf.update(local_conf)

    def gluster_filter(app):
        return Gluster_plugin(app, conf)
    return gluster_filter
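Stripped of the Swift and GlusterFS specifics, the pattern above is an ordinary WSGI filter that leaves a few keys in `env` for the application behind it. The following Python 3 sketch shows only that pattern; `FakeFS`, `GlusterLikeFilter`, `controller`, `call`, and the mount path used in the usage note are hypothetical names, not part of the plugin.

```python
class FakeFS(object):
    """Hypothetical stand-in for the Glusterfs plugin class."""


class GlusterLikeFilter(object):
    """WSGI middleware that annotates env before calling the wrapped app."""

    def __init__(self, app, enabled, mount_path):
        self.app = app
        self.enabled = enabled
        self.mount_path = mount_path

    def __call__(self, env, start_response):
        if self.enabled:
            env["Gluster_enabled"] = True
            env["fs_object"] = FakeFS()
            env["root"] = self.mount_path
        return self.app(env, start_response)


def controller(env, start_response):
    """Downstream app: pick the backend from what the filter left in env."""
    backend = "gluster" if env.get("Gluster_enabled", False) else "default"
    body = ("%s:%s" % (backend, env.get("root", ""))).encode("utf-8")
    start_response("200 OK", [("Content-Type", "text/plain")])
    return [body]


def call(app):
    """Invoke a WSGI app with an empty env and return the joined body."""
    return b"".join(app({}, lambda status, headers: None))
```

With the filter enabled, for example `call(GlusterLikeFilter(controller, True, "/mnt/gluster-object"))`, the controller sees the extra keys; with it disabled, the request passes through untouched.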
As for what the Glusterfs class actually is, the pydoc output below shows that it provides methods to mount and unmount GlusterFS volumes and to get the list of predefined volumes.
# pydoc /usr/lib/python2.6/site-packages/swift/plugins/Glusterfs.py | cat
...
CLASSES
    __builtin__.object
        Glusterfs

    class Glusterfs(__builtin__.object)
     |  Methods defined here:
     |
     |  __init__(self)
     |
     |  get_export_from_account_id(self, account)
     |
     |  get_export_list(self)
     |
     |  get_export_list_local(self)
     |
     |  get_export_list_remote(self)
     |
     |  mount(self, account)
     |
     |  unmount(self, mount_path)
     |
     |  ----------------------------------------------------------------------
     |  Data descriptors defined here:
     |
     |  __dict__
     |      dictionary for instance variables (if defined)
     |
     |  __weakref__
     |      list of weak references to the object (if defined)
In particular, "get_export_from_account_id()" shows that the mapping between account names and volume names is hard-coded as "account name = AUTH_<volume name>".
    def get_export_from_account_id(self, account):
        if not account:
            print 'account is none, returning'
            raise AttributeError

        for export in self.get_export_list():
            if account == 'AUTH_' + export:
                return export
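The same rule can be tried out as a standalone function. This is a sketch under stated assumptions: `export_for_account` and its `exports` and `prefix` parameters are hypothetical, and the list of volumes is passed in rather than obtained from `get_export_list()`.

```python
def export_for_account(account, exports, prefix="AUTH_"):
    """Return the export (volume) whose name matches the account, else None."""
    if not account:
        raise ValueError("account is required")
    for export in exports:
        if account == prefix + export:
            return export
    return None
```

So with volumes `vol1` and `vol2`, the account `AUTH_vol2` maps to `vol2`, and an account with no matching volume yields `None`, which is also what the quoted method returns implicitly when the loop finds nothing.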
Now, at last, the diff for the Account server itself.
# diff -up swift/account/server.py /usr/lib/python2.6/site-packages/swift/account/server.py
--- swift/account/server.py 2012-03-22 09:24:46.000000000 +0000
+++ /usr/lib/python2.6/site-packages/swift/account/server.py 2012-04-26 07:17:58.000000000 +0000
@@ -1,4 +1,5 @@
 # Copyright (c) 2010-2012 OpenStack, LLC.
+# Copyright (c) 2011 Red Hat, Inc.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -31,7 +32,7 @@ import simplejson
 from swift.common.db import AccountBroker
 from swift.common.utils import get_logger, get_param, hash_path, \
-    normalize_timestamp, split_path, storage_directory
+    normalize_timestamp, split_path, storage_directory, plugin_enabled
 from swift.common.constraints import ACCOUNT_LISTING_LIMIT, \
     check_mount, check_float, check_utf8
 from swift.common.db_replicator import ReplicatorRpc
@@ -39,6 +40,8 @@ from swift.common.db_replicator import R
 DATADIR = 'accounts'
+if plugin_enabled():
+    from swift.plugins.DiskDir import DiskAccount
 class AccountController(object):
     """WSGI controller for the account server."""
@@ -52,8 +55,12 @@ class AccountController(object):
             self.mount_check, logger=self.logger)
         self.auto_create_account_prefix = \
             conf.get('auto_create_account_prefix') or '.'
+        self.fs_object = None
     def _get_account_broker(self, drive, part, account):
+        if self.fs_object:
+            return DiskAccount(self.root, account, self.fs_object);
+
         hsh = hash_path(account)
         db_dir = storage_directory(DATADIR, part, hsh)
         db_path = os.path.join(self.root, drive, db_dir, hsh + '.db')
@@ -121,9 +128,15 @@ class AccountController(object):
             if broker.is_deleted():
                 return HTTPConflict(request=req)
             metadata = {}
-            metadata.update((key, (value, timestamp))
-                for key, value in req.headers.iteritems()
-                if key.lower().startswith('x-account-meta-'))
+            if not self.fs_object:
+                metadata.update((key, (value, timestamp))
+                    for key, value in req.headers.iteritems()
+                    if key.lower().startswith('x-account-meta-'))
+            else:
+                metadata.update((key, value)
+                    for key, value in req.headers.iteritems()
+                    if key.lower().startswith('x-account-meta-'))
+
             if metadata:
                 broker.update_metadata(metadata)
             if created:
@@ -153,6 +166,9 @@ class AccountController(object):
         broker.stale_reads_ok = True
         if broker.is_deleted():
             return HTTPNotFound(request=req)
+        if self.fs_object:
+            broker.list_containers_iter(None, None,None,
+                                        None, None)
         info = broker.get_info()
         headers = {
             'X-Account-Container-Count': info['container_count'],
@@ -164,9 +180,16 @@ class AccountController(object):
             container_ts = broker.get_container_timestamp(container)
             if container_ts is not None:
                 headers['X-Container-Timestamp'] = container_ts
-        headers.update((key, value)
-            for key, (value, timestamp) in broker.metadata.iteritems()
-            if value != '')
+        if not self.fs_object:
+            headers.update((key, value)
+                for key, (value, timestamp) in broker.metadata.iteritems()
+                if value != '')
+        else:
+            headers.update((key, value)
+                for key, value in broker.metadata.iteritems()
+                if value != '')
+
+
         return HTTPNoContent(request=req, headers=headers)
     def GET(self, req):
@@ -190,9 +213,15 @@ class AccountController(object):
             'X-Account-Bytes-Used': info['bytes_used'],
             'X-Timestamp': info['created_at'],
             'X-PUT-Timestamp': info['put_timestamp']}
-        resp_headers.update((key, value)
-            for key, (value, timestamp) in broker.metadata.iteritems()
-            if value != '')
+        if not self.fs_object:
+            resp_headers.update((key, value)
+                for key, (value, timestamp) in broker.metadata.iteritems()
+                if value != '')
+        else:
+            resp_headers.update((key, value)
+                for key, value in broker.metadata.iteritems()
+                if value != '')
+
         try:
             prefix = get_param(req, 'prefix')
             delimiter = get_param(req, 'delimiter')
@@ -224,6 +253,7 @@ class AccountController(object):
                                   content_type='text/plain', request=req)
         account_list = broker.list_containers_iter(limit, marker, end_marker,
                                                    prefix, delimiter)
+
         if out_content_type == 'application/json':
             json_pattern = ['"name":%s', '"count":%s', '"bytes":%s']
             json_pattern = '{' + ','.join(json_pattern) + '}'
@@ -298,15 +328,29 @@ class AccountController(object):
             return HTTPNotFound(request=req)
         timestamp = normalize_timestamp(req.headers['x-timestamp'])
         metadata = {}
-        metadata.update((key, (value, timestamp))
-            for key, value in req.headers.iteritems()
-            if key.lower().startswith('x-account-meta-'))
+        if not self.fs_object:
+            metadata.update((key, (value, timestamp))
+                for key, value in req.headers.iteritems()
+                if key.lower().startswith('x-account-meta-'))
+        else:
+            metadata.update((key, value)
+                for key, value in req.headers.iteritems()
+                if key.lower().startswith('x-account-meta-'))
         if metadata:
             broker.update_metadata(metadata)
         return HTTPNoContent(request=req)
+    def plugin(self, env):
+        if env.get('Gluster_enabled', False):
+            self.fs_object = env.get('fs_object')
+            self.root = env.get('root')
+            self.mount_check = False
+        else:
+            self.fs_object = None
+
     def __call__(self, env, start_response):
         start_time = time.time()
+        self.plugin(env)
         req = Request(env)
         self.logger.txn_id = req.headers.get('x-trans-id', None)
         if not check_utf8(req.path_info):
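Reading from the bottom up: when the controller is invoked through "__call__", it calls "plugin()", which sets "self.fs_object" (the Glusterfs instance) and "self.root" (the mount point). After that, branches keyed on whether "self.fs_object" is set have been added to separate the GlusterFS case from the original one.
The key change is the following part, where a custom DiskAccount instance is used as the account broker.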
     def _get_account_broker(self, drive, part, account):
+        if self.fs_object:
+            return DiskAccount(self.root, account, self.fs_object);
+
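For example, when returning the list of containers that an account holds, it scans the directories under the mount point. In other words, instead of keeping container information in its own database, it looks directly inside the GlusterFS volume. Likewise, container metadata is stored directly in the extended attributes of the directory that corresponds to the container. DiskAccount is defined in "plugins/DiskDir.py"; read that file for the details.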
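The directory-scan idea can be sketched in a few lines of Python 3. This is not the code in "plugins/DiskDir.py", which is not quoted here; `list_containers` and `demo` are hypothetical, and the real class may well skip or treat certain entries differently.

```python
import os
import tempfile


def list_containers(mount_point):
    """Treat each top-level directory under the volume as one container."""
    return sorted(
        name for name in os.listdir(mount_point)
        if os.path.isdir(os.path.join(mount_point, name))
    )


def demo():
    """Build a throwaway 'volume' and list the containers found in it."""
    with tempfile.TemporaryDirectory() as volume:
        os.mkdir(os.path.join(volume, "photos"))
        os.mkdir(os.path.join(volume, "backups"))
        # A plain file at the top level is not counted as a container.
        open(os.path.join(volume, "stray.txt"), "w").close()
        return list_containers(volume)
```

The point of the design is that the file system itself is the source of truth: a directory created directly on the volume would show up as a container without any database update.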
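The other change is that the "timestamp" that used to be required when updating metadata is gone. The original code attaches a timestamp to each update request so that, when updates arrive concurrently, the database is updated on a "last writer wins" basis; in the GlusterFS case, that seems to be left to GlusterFS's own locking.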
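The difference between the two metadata shapes can be seen in a small Python 3 sketch. `account_meta` mirrors the two branches in the diff; `merge_last_writer_wins` is my own reading of what "last writer wins" means and is not quoted from Swift's broker code. Both function names and the sample timestamps in the usage note are hypothetical.

```python
def account_meta(headers, timestamp=None):
    """Collect X-Account-Meta-* headers; pair each with a timestamp if given."""
    picked = {k: v for k, v in headers.items()
              if k.lower().startswith("x-account-meta-")}
    if timestamp is None:
        # GlusterFS branch in the diff: plain values.
        return picked
    # Original branch in the diff: (value, timestamp) pairs.
    return {k: (v, timestamp) for k, v in picked.items()}


def merge_last_writer_wins(stored, update):
    """Keep, per key, the (value, timestamp) pair with the newer timestamp."""
    merged = dict(stored)
    for key, (value, ts) in update.items():
        if key not in merged or ts > merged[key][1]:
            merged[key] = (value, ts)
    return merged
```

With timestamps, an update stamped `0000000002.00000` cannot overwrite a value stored at `0000000003.00000`, whatever order the requests arrive in. Without them, the outcome depends only on the order in which the writes reach the file system.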
Container server
Next, the result for the Container server.
# diff -up swift/container/server.py /usr/lib/python2.6/site-packages/swift/container/server.py
--- swift/container/server.py 2012-03-22 09:24:46.000000000 +0000
+++ /usr/lib/python2.6/site-packages/swift/container/server.py 2012-04-26 07:17:58.000000000 +0000
@@ -1,4 +1,5 @@
 # Copyright (c) 2010-2012 OpenStack, LLC.
+# Copyright (c) 2011 Red Hat, Inc.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -31,7 +32,8 @@ from webob.exc import HTTPAccepted, HTTP
 from swift.common.db import ContainerBroker
 from swift.common.utils import get_logger, get_param, hash_path, \
-    normalize_timestamp, storage_directory, split_path, validate_sync_to
+    normalize_timestamp, storage_directory, split_path, validate_sync_to, \
+    plugin_enabled
 from swift.common.constraints import CONTAINER_LISTING_LIMIT, \
     check_mount, check_float, check_utf8
 from swift.common.bufferedhttp import http_connect
@@ -40,6 +42,9 @@ from swift.common.db_replicator import R
 DATADIR = 'containers'
+if plugin_enabled():
+    from swift.plugins.DiskDir import DiskDir
+
 class ContainerController(object):
     """WSGI Controller for the container server."""
@@ -62,6 +67,7 @@ class ContainerController(object):
             ContainerBroker, self.mount_check, logger=self.logger)
         self.auto_create_account_prefix = \
             conf.get('auto_create_account_prefix') or '.'
+        self.fs_object = None
     def _get_container_broker(self, drive, part, account, container):
         """
@@ -73,6 +79,11 @@ class ContainerController(object):
         :param container: container name
         :returns: ContainerBroker object
         """
+        if self.fs_object:
+            return DiskDir(self.root, drive, part, account,
+                           container, self.logger,
+                           fs_object = self.fs_object)
+
         hsh = hash_path(account, container)
         db_dir = storage_directory(DATADIR, part, hsh)
         db_path = os.path.join(self.root, drive, db_dir, hsh + '.db')
@@ -211,10 +222,18 @@ class ContainerController(object):
             if broker.is_deleted():
                 return HTTPConflict(request=req)
             metadata = {}
-            metadata.update((key, (value, timestamp))
-                for key, value in req.headers.iteritems()
-                if key.lower() in self.save_headers or
-                key.lower().startswith('x-container-meta-'))
+            #Note: check the structure of req.headers
+            if not self.fs_object:
+                metadata.update((key, (value, timestamp))
+                    for key, value in req.headers.iteritems()
+                    if key.lower() in self.save_headers or
+                    key.lower().startswith('x-container-meta-'))
+            else:
+                metadata.update((key, value)
+                    for key, value in req.headers.iteritems()
+                    if key.lower() in self.save_headers or
+                    key.lower().startswith('x-container-meta-'))
+
             if metadata:
                 if 'X-Container-Sync-To' in metadata:
                     if 'X-Container-Sync-To' not in broker.metadata or \
@@ -222,6 +241,7 @@ class ContainerController(object):
                             broker.metadata['X-Container-Sync-To'][0]:
                         broker.set_x_container_sync_points(-1, -1)
                 broker.update_metadata(metadata)
+
             resp = self.account_update(req, account, container, broker)
             if resp:
                 return resp
@@ -245,6 +265,11 @@ class ContainerController(object):
         broker.stale_reads_ok = True
         if broker.is_deleted():
             return HTTPNotFound(request=req)
+
+        if self.fs_object:
+            broker.list_objects_iter(None, None, None, None,
+                                     None, None)
+
         info = broker.get_info()
         headers = {
             'X-Container-Object-Count': info['object_count'],
@@ -252,10 +277,17 @@ class ContainerController(object):
             'X-Timestamp': info['created_at'],
             'X-PUT-Timestamp': info['put_timestamp'],
         }
-        headers.update((key, value)
-            for key, (value, timestamp) in broker.metadata.iteritems()
-            if value != '' and (key.lower() in self.save_headers or
-                                key.lower().startswith('x-container-meta-')))
+        if not self.fs_object:
+            headers.update((key, value)
+                for key, (value, timestamp) in broker.metadata.iteritems()
+                if value != '' and (key.lower() in self.save_headers or
+                                    key.lower().startswith('x-container-meta-')))
+        else:
+            headers.update((key, value)
+                for key, value in broker.metadata.iteritems()
+                if value != '' and (key.lower() in self.save_headers or
+                                    key.lower().startswith('x-container-meta-')))
+
         return HTTPNoContent(request=req, headers=headers)
     def GET(self, req):
@@ -268,6 +300,7 @@ class ContainerController(object):
                                   request=req)
         if self.mount_check and not check_mount(self.root, drive):
             return Response(status='507 %s is not mounted' % drive)
+
         broker = self._get_container_broker(drive, part, account, container)
         broker.pending_timeout = 0.1
         broker.stale_reads_ok = True
@@ -280,10 +313,17 @@ class ContainerController(object):
             'X-Timestamp': info['created_at'],
             'X-PUT-Timestamp': info['put_timestamp'],
         }
-        resp_headers.update((key, value)
-            for key, (value, timestamp) in broker.metadata.iteritems()
-            if value != '' and (key.lower() in self.save_headers or
-                                key.lower().startswith('x-container-meta-')))
+        if not self.fs_object:
+            resp_headers.update((key, value)
+                for key, (value, timestamp) in broker.metadata.iteritems()
+                if value != '' and (key.lower() in self.save_headers or
+                                    key.lower().startswith('x-container-meta-')))
+        else:
+            resp_headers.update((key, value)
+                for key, value in broker.metadata.iteritems()
+                if value != '' and (key.lower() in self.save_headers or
+                                    key.lower().startswith('x-container-meta-')))
+
         try:
             path = get_param(req, 'path')
             prefix = get_param(req, 'prefix')
@@ -414,10 +454,17 @@ class ContainerController(object):
             return HTTPNotFound(request=req)
         timestamp = normalize_timestamp(req.headers['x-timestamp'])
         metadata = {}
-        metadata.update((key, (value, timestamp))
-            for key, value in req.headers.iteritems()
-            if key.lower() in self.save_headers or
-            key.lower().startswith('x-container-meta-'))
+        if not self.fs_object:
+            metadata.update((key, (value, timestamp))
+                for key, value in req.headers.iteritems()
+                if key.lower() in self.save_headers or
+                key.lower().startswith('x-container-meta-'))
+        else:
+            metadata.update((key, value)
+                for key, value in req.headers.iteritems()
+                if key.lower() in self.save_headers or
+                key.lower().startswith('x-container-meta-'))
+
         if metadata:
             if 'X-Container-Sync-To' in metadata:
                 if 'X-Container-Sync-To' not in broker.metadata or \
@@ -427,8 +474,19 @@ class ContainerController(object):
             broker.update_metadata(metadata)
         return HTTPNoContent(request=req)
+    def plugin(self, env):
+        if env.get('Gluster_enabled', False):
+            self.fs_object = env.get('fs_object')
+            if not self.fs_object:
+                raise NoneTypeError
+            self.root = env.get('root')
+            self.mount_check = False
+        else:
+            self.fs_object = None
+
     def __call__(self, env, start_response):
         start_time = time.time()
+        self.plugin(env)
         req = Request(env)
         self.logger.txn_id = req.headers.get('x-trans-id', None)
         if not check_utf8(req.path_info):
The basic mechanism appears to be the same as in the Account server: a custom DiskDir instance is returned as the container broker.
Object server
Finally, the Object server.
# diff -up swift/obj/server.py /usr/lib/python2.6/site-packages/swift/obj/server.py
--- swift/obj/server.py 2012-03-22 09:24:46.000000000 +0000
+++ /usr/lib/python2.6/site-packages/swift/obj/server.py 2012-04-26 07:17:58.000000000 +0000
@@ -1,4 +1,5 @@
 # Copyright (c) 2010-2012 OpenStack, LLC.
+# Copyright (c) 2011 Red Hat, Inc.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -26,6 +27,7 @@ from hashlib import md5
 from tempfile import mkstemp
 from urllib import unquote
 from contextlib import contextmanager
+from ConfigParser import ConfigParser
 from webob import Request, Response, UTC
 from webob.exc import HTTPAccepted, HTTPBadRequest, HTTPCreated, \
@@ -37,16 +39,23 @@ from eventlet import sleep, Timeout, tpo
 from swift.common.utils import mkdirs, normalize_timestamp, \
     storage_directory, hash_path, renamer, fallocate, \
-    split_path, drop_buffer_cache, get_logger, write_pickle
+    split_path, drop_buffer_cache, get_logger, write_pickle, \
+    plugin_enabled
 from swift.common.bufferedhttp import http_connect
-from swift.common.constraints import check_object_creation, check_mount, \
-    check_float, check_utf8
+if plugin_enabled():
+    from swift.plugins.constraints import check_object_creation
+    from swift.plugins.utils import X_TYPE, X_OBJECT_TYPE, FILE, DIR, MARKER_DIR, \
+        OBJECT, DIR_TYPE, FILE_TYPE
+else:
+    from swift.common.constraints import check_object_creation
+
+from swift.common.constraints import check_mount, check_float, check_utf8
+
 from swift.common.exceptions import ConnectionTimeout, DiskFileError, \
     DiskFileNotExist
 from swift.obj.replicator import tpooled_get_hashes, invalidate_hash, \
     quarantine_renamer
-
 DATADIR = 'objects'
 ASYNCDIR = 'async_pending'
 PICKLE_PROTOCOL = 2
@@ -339,6 +348,9 @@ class DiskFile(object):
                 raise
             raise DiskFileNotExist('Data File does not exist.')
+if plugin_enabled():
+    from swift.plugins.DiskFile import Gluster_DiskFile
+
 class ObjectController(object):
     """Implements the WSGI application for the Swift Object Server."""
@@ -377,6 +389,17 @@ class ObjectController(object):
             'expiring_objects'
         self.expiring_objects_container_divisor = \
             int(conf.get('expiring_objects_container_divisor') or 86400)
+        self.fs_object = None
+
+    def get_DiskFile_obj(self, path, device, partition, account, container, obj,
+                         logger, keep_data_fp=False, disk_chunk_size=65536):
+        if self.fs_object:
+            return Gluster_DiskFile(path, device, partition, account, container,
+                                    obj, logger, keep_data_fp,
+                                    disk_chunk_size, fs_object = self.fs_object);
+        else:
+            return DiskFile(path, device, partition, account, container,
+                            obj, logger, keep_data_fp, disk_chunk_size)
     def async_update(self, op, account, container, obj, host, partition,
                      contdevice, headers_out, objdevice):
@@ -493,7 +516,7 @@ class ObjectController(object):
                         content_type='text/plain')
         if self.mount_check and not check_mount(self.devices, device):
             return Response(status='507 %s is not mounted' % device)
-        file = DiskFile(self.devices, device, partition, account, container,
+        file = self.get_DiskFile_obj(self.devices, device, partition, account, container,
                         obj, self.logger, disk_chunk_size=self.disk_chunk_size)
         if 'X-Delete-At' in file.metadata and \
@@ -548,7 +571,7 @@ class ObjectController(object):
         if new_delete_at and new_delete_at < time.time():
             return HTTPBadRequest(body='X-Delete-At in past', request=request,
                                   content_type='text/plain')
-        file = DiskFile(self.devices, device, partition, account, container,
+        file = self.get_DiskFile_obj(self.devices, device, partition, account, container,
                         obj, self.logger, disk_chunk_size=self.disk_chunk_size)
         orig_timestamp = file.metadata.get('X-Timestamp')
         upload_expiration = time.time() + self.max_upload_time
@@ -580,12 +603,29 @@ class ObjectController(object):
             if 'etag' in request.headers and \
                             request.headers['etag'].lower() != etag:
                 return HTTPUnprocessableEntity(request=request)
-            metadata = {
-                'X-Timestamp': request.headers['x-timestamp'],
-                'Content-Type': request.headers['content-type'],
-                'ETag': etag,
-                'Content-Length': str(os.fstat(fd).st_size),
-            }
+            content_type = request.headers['content-type']
+            if self.fs_object and not content_type:
+                content_type = FILE_TYPE
+            if not self.fs_object:
+                metadata = {
+                    'X-Timestamp': request.headers['x-timestamp'],
+                    'Content-Type': request.headers['content-type'],
+                    'ETag': etag,
+                    'Content-Length': str(os.fstat(fd).st_size),
+                }
+            else:
+                metadata = {
+                    'X-Timestamp': request.headers['x-timestamp'],
+                    'Content-Type': request.headers['content-type'],
+                    'ETag': etag,
+                    'Content-Length': str(os.fstat(fd).st_size),
+                    X_TYPE: OBJECT,
+                    X_OBJECT_TYPE: FILE,
+                }
+
+            if self.fs_object and \
+                request.headers['content-type'].lower() == DIR_TYPE:
+                metadata.update({X_OBJECT_TYPE: MARKER_DIR})
             metadata.update(val for val in request.headers.iteritems()
                     if val[0].lower().startswith('x-object-meta-') and
                     len(val[0]) > 14)
@@ -626,9 +666,9 @@ class ObjectController(object):
                         content_type='text/plain')
         if self.mount_check and not check_mount(self.devices, device):
             return Response(status='507 %s is not mounted' % device)
-        file = DiskFile(self.devices, device, partition, account, container,
-                        obj, self.logger, keep_data_fp=True,
-                        disk_chunk_size=self.disk_chunk_size)
+        file = self.get_DiskFile_obj(self.devices, device, partition, account, container,
+                                     obj, self.logger, keep_data_fp=True,
+                                     disk_chunk_size=self.disk_chunk_size)
         if file.is_deleted() or ('X-Delete-At' in file.metadata and
                 int(file.metadata['X-Delete-At']) <= time.time()):
             if request.headers.get('if-match') == '*':
@@ -702,7 +742,7 @@ class ObjectController(object):
             return resp
         if self.mount_check and not check_mount(self.devices, device):
             return Response(status='507 %s is not mounted' % device)
-        file = DiskFile(self.devices, device, partition, account, container,
+        file = self.get_DiskFile_obj(self.devices, device, partition, account, container,
                         obj, self.logger, disk_chunk_size=self.disk_chunk_size)
         if file.is_deleted() or ('X-Delete-At' in file.metadata and
                 int(file.metadata['X-Delete-At']) <= time.time()):
@@ -744,7 +784,7 @@ class ObjectController(object):
         if self.mount_check and not check_mount(self.devices, device):
             return Response(status='507 %s is not mounted' % device)
         response_class = HTTPNoContent
-        file = DiskFile(self.devices, device, partition, account, container,
+        file = self.get_DiskFile_obj(self.devices, device, partition, account, container,
                         obj, self.logger, disk_chunk_size=self.disk_chunk_size)
         if 'x-if-delete-at' in request.headers and \
                 int(request.headers['x-if-delete-at']) != \
@@ -797,9 +837,18 @@ class ObjectController(object):
             raise hashes
         return Response(body=pickle.dumps(hashes))
+    def plugin(self, env):
+        if env.get('Gluster_enabled', False):
+            self.fs_object = env.get('fs_object')
+            self.devices = env.get('root')
+            self.mount_check = False
+        else:
+            self.fs_object = None
+
     def __call__(self, env, start_response):
         """WSGI Application entry point for the Swift Object Server."""
         start_time = time.time()
+        self.plugin(env)
         req = Request(env)
         self.logger.txn_id = req.headers.get('x-trans-id', None)
         if not check_utf8(req.path_info):
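Here the original "DiskFile" class is replaced by "Gluster_DiskFile". It is defined in "plugins/DiskFile.py", and its code shows that saving an object has been replaced by writing a file on GlusterFS. In particular, with GlusterFS, uploading a whole directory stores it in the volume with its directory structure preserved, and that handling is done in this class as well.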
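The part of the directory handling that is visible in the PUT hunk of the diff can be sketched as follows: in the GlusterFS case the metadata gets two extra keys, and a directory content type turns the object into a directory marker. This is a Python 3 sketch, not the shipped code. The constant values are hypothetical (the real ones live in "swift/plugins/utils.py", which is not quoted here), as are `build_metadata` and its parameters.

```python
# Hypothetical values: the real constants are defined in swift/plugins/utils.py.
X_TYPE = "X-Type"
X_OBJECT_TYPE = "X-Object-Type"
OBJECT = "Object"
FILE = "file"
MARKER_DIR = "marker_dir"
DIR_TYPE = "application/directory"


def build_metadata(headers, etag, size, gluster):
    """Build the metadata dict for a PUT, adding type markers for GlusterFS."""
    metadata = {
        "X-Timestamp": headers["x-timestamp"],
        "Content-Type": headers["content-type"],
        "ETag": etag,
        "Content-Length": str(size),
    }
    if gluster:
        metadata[X_TYPE] = OBJECT
        # A directory content type marks the object as a directory marker.
        is_dir = headers["content-type"].lower() == DIR_TYPE
        metadata[X_OBJECT_TYPE] = MARKER_DIR if is_dir else FILE
    return metadata
```

One detail worth noting in the diff as quoted: a `content_type` variable is computed with `FILE_TYPE` as a default, but the metadata dict still reads `request.headers['content-type']` directly, so that default does not seem to reach the stored metadata.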
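Summary
So we now have a rough idea of which classes are swapped out for GlusterFS support. Tracing through the replacement classes should reveal the behavior specific to the GlusterFS integration. I will leave that for another time.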