Author: anton, 29 October 2010
A Unified Peer-to-Peer Database Framework and its Application for Scalable Service Discovery

Wolfgang Hoschek
CERN IT Division
European Organization for Nuclear Research
1211 Geneva 23, Switzerland
Wolfgang.Hoschek@cern.ch

Abstract

In a large distributed system spanning many administrative domains, such as a Data Grid, it is often desirable to maintain and query dynamic and timely information about active participants such as services, resources and user communities. However, in such a database system, the set of information tuples in the universe is partitioned over one or more distributed nodes, for reasons including autonomy, scalability, availability, performance and security. It is not obvious how to enable powerful discovery query support and collective collaborative functionality that operate on the distributed system as a whole, rather than on a given part of it. Further, it is not obvious how to allow for search results that are fresh, allowing dynamic content. It appears that a Peer-to-Peer (P2P) database network may be well suited to support dynamic distributed database search, for example for service discovery. In this paper, we take the first steps towards unifying the fields of database management systems and P2P computing, which so far have received considerable but separate attention. We extend database concepts and practice to cover P2P search. Similarly, we extend P2P concepts and practice to support powerful general-purpose query languages such as XQuery and SQL. As a result, we devise the Unified Peer-to-Peer Database Framework (UPDF), which is unified in the sense that it allows one to express specific applications for a wide range of data types, node topologies, query languages, query response modes, neighbor selection policies, pipelining characteristics, timeout and other scope options.
1 Introduction

The next-generation Large Hadron Collider (LHC) project at CERN, the European Organization for Nuclear Research, involves thousands of researchers and hundreds of institutions spread around the globe. A massive set of computing resources is necessary to support its data-intensive physics analysis applications, including thousands of network services, tens of thousands of CPUs, WAN Gigabit networking as well as Petabytes of disk and tape storage [1]. To make collaboration viable, it was decided to share the data and locally available resources of all participating laboratories and university departments in a global joint effort, the European DataGrid (EDG) [2, 3, 4, 5]. Grid technology attempts to support flexible, secure, coordinated information sharing among dynamic collections of individuals, institutions and resources. This includes data sharing but also access to computers, software and devices required by computation and data-rich collaborative problem solving [6]. These and other advances in distributed computing are needed to make it increasingly possible to join loosely coupled people and resources from multiple organizations.

An enabling step towards increased Grid software execution flexibility is the (still immature and hence often hyped) web services vision [2, 7, 8] of distributed computing, where programs are no longer configured with static information. Rather, the promise is that programs are made more flexible and powerful by querying Internet databases (registries) at runtime in order to discover information and network-attached third-party building blocks. Services can advertise themselves and related metadata via such databases, enabling the assembly of distributed higher-level components.
For example, a data-intensive High Energy Physics analysis application sweeping over Terabytes of data looks for remote services that exhibit a suitable combination of characteristics, including network load, available disk quota, access rights, and perhaps Quality of Service and monetary cost. More generally, in a distributed system, it is often desirable to maintain and query dynamic and timely information about active participants such as services, resources and user communities. As in a data integration system [9, 10, 11], the goal is to exploit several independent information sources as if they were a single source. However, in a large distributed database system spanning many administrative domains, the set of information tuples in the universe is partitioned over one or more distributed nodes, for reasons including autonomy, scalability, availability, performance and security. It is not obvious how to enable powerful discovery query support and collective collaborative functionality that operate on the distributed system as a whole, rather than on a given part of it. Further, it is not obvious how to allow for search results that are fresh, allowing time-sensitive dynamic content. It appears that a Peer-to-Peer (P2P) database network may be well suited to support dynamic distributed database search, for example for service discovery.

The overall P2P idea is as follows. Rather than having a centralized database, a distributed framework is used where there exist one or more autonomous database nodes, each maintaining its own data. Queries are no longer posed to a central database; instead, they are recursively propagated over the network to some or all database nodes, and results are collected and sent back to the client. The key problems then are:

• What are the detailed architecture and design options for P2P database searching? What response models can be used to return matching query results? How should a P2P query processor be organized?
What query types can be answered (efficiently) by a P2P network? What query types have the potential to immediately start piping in (early) results? How can a maximum of results be delivered reliably within the time frame desired by a user, even if a query type does not support pipelining? How can loops be detected reliably using timeouts? How can a query scope be used to exploit topology characteristics in answering a query?
• Can we devise a unified P2P database framework for general-purpose query support in large heterogeneous distributed systems spanning many administrative domains? More precisely, can we devise a framework that is unified in the sense that it allows one to express specific applications for a wide range of data types, node topologies, query languages, query response modes, neighbor selection policies, pipelining characteristics, timeout and other scope options?

In this paper, we take the first steps towards unifying the fields of database management systems and P2P computing, which so far have received considerable but separate attention. We extend database concepts and practice to cover P2P search. Similarly, we extend P2P concepts and practice to support powerful general-purpose query languages such as XQuery [12] and SQL [13]. As a result, we answer the above questions by proposing the so-called Unified Peer-to-Peer Database Framework (UPDF).

This paper is organized as follows. Section 2 introduces a query, data and database model. The related but orthogonal concepts of (logical) link topology and (physical) node deployment model are discussed. Definitions are proposed, clarifying the notions of node, service, and fat, thin and ultra-thin P2P networks, as well as what a P2P network and a P2P network for service discovery have in common. The agent P2P model is proposed and compared with the servent P2P model. A timeout-based mechanism to reliably detect and prevent query loops is proposed.
Section 3 characterizes in detail four techniques to return matching query results to an originator, namely Routed Response, Direct Response, Routed Metadata Response, and Direct Metadata Response. We discuss to what extent a given P2P network must mandate the use of any particular response mode throughout the system. Section 4 unifies query processing in centralized, distributed and P2P databases. A theory of query processing for queries that are (or are not) recursively partitionable is proposed, which directly reflects the basis of the P2P scalability potential. The definitions and properties of simple, medium and complex queries are clarified with respect to recursive partitioning. It is established to what extent simple, medium and complex queries support pipelining. Section 5 proposes dynamic abort timeouts that use exponential decay with halving as the policy. This ensures that a maximum of results can be delivered reliably within the time frame desired by a user, even if a query does not support pipelining. It is established that a loop timeout must be static. Section 6 uses the concept of a query scope to navigate and prune the link topology and filter on attributes of the deployment model. Indirect specification of scope based on neighbor selection, timeout and radius is detailed. Section 7 compares our work with existing research results. Finally, Section 8 concludes this paper and outlines interesting directions for future research.

2 Background

2.1 Database Model and Topology

A node holds a set of tuples in its database. A database may be anything that accepts queries from the query model and returns results according to the data model (see below). For example, in [2, 14] we have introduced a registry node for service discovery that maintains hyperlinks and caches the content pointed to by these links. A content provider can publish a hyperlink, which in turn enables the registry (and third parties) to pull (retrieve) the current content.
A remote client can query a registry in the XQuery language, obtaining a set of tuples as answer. Figure 1 illustrates such a registry node with several content providers and clients. A distributed database framework is used where there exist one or more nodes. Each node can operate autonomously. A node holds a set of tuples in its database. A given database belongs to a single node. For flexibility, the databases of nodes may be deployed in any arbitrary way (deployment model). For example, a number of nodes may reside on the same host. A node’s database may be co-located with the node. However, the databases of all nodes may just as well be stored next to each other on a single central data server.

Figure 1: Clients, Registry and Content Providers [2]. (Clients query the registry; content providers (re)publish to it, and the registry retrieves their content.)

The set of tuples in the universe is partitioned over the nodes, for reasons including autonomy, scalability, availability, performance and security. Nodes are interconnected with links in any arbitrary way. A link enables a node to query another node. A link topology describes the link structure among nodes. The centralized model has a single node only. For example, in a service discovery system, a link topology could tie together a distributed set of administrative domains, each hosting a registry node holding descriptions of services local to the domain. Several link topology models covering the spectrum from centralized models to fine-grained fully distributed models can be envisaged, among them single node, star, ring, tree, semi-hierarchical as well as graph models. Real-world distributed systems often have a more complex organization than any simple topology. They often combine several topologies into a hybrid topology. Nodes typically play multiple roles in such a system. For example, a node might have a centralized interaction with one part of the system, while being part of a hierarchy in another part [15].
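The link topology models listed above can be sketched as adjacency maps. This is a hypothetical representation, not taken from the paper: `make_topology` and `has_cycle` are illustrative names, and links are assumed bidirectional. The cycle check shows why star and tree topologies cannot trap a forwarded query, whereas ring and general graph topologies can deliver the same query to a node along two distinct routes.

```python
from typing import Dict, List, Optional, Set, Tuple

Topology = Dict[str, Set[str]]  # node name -> names of its neighbor nodes


def make_topology(links: List[Tuple[str, str]]) -> Topology:
    """Build an adjacency map; each link is assumed bidirectional here."""
    topo: Topology = {}
    for a, b in links:
        topo.setdefault(a, set()).add(b)
        topo.setdefault(b, set()).add(a)
    return topo


def has_cycle(topo: Topology) -> bool:
    """True if some node is reachable along two distinct routes (a potential query loop)."""
    visited: Set[str] = set()
    for start in topo:
        if start in visited:
            continue
        # Iterative depth-first search remembering the node we arrived from.
        stack: List[Tuple[str, Optional[str]]] = [(start, None)]
        while stack:
            node, parent = stack.pop()
            if node in visited:
                return True  # reached again via a second route
            visited.add(node)
            for nb in topo[node]:
                if nb != parent:  # do not walk straight back over the same link
                    stack.append((nb, node))
    return False


# Hypothetical example topologies.
STAR = make_topology([("hub", "a"), ("hub", "b"), ("hub", "c")])
RING = make_topology([("a", "b"), ("b", "c"), ("c", "d"), ("d", "a")])
TREE = make_topology([("root", "x"), ("root", "y"), ("x", "x1"), ("x", "x2")])
GRAPH = make_topology([("a", "b"), ("b", "c"), ("c", "a"), ("c", "d")])
```

Here `has_cycle(STAR)` and `has_cycle(TREE)` are false, while `has_cycle(RING)` and `has_cycle(GRAPH)` are true, which is why query loops have to be handled once graph topologies are allowed.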
Figure 2 depicts some example topologies.

Figure 2: Example Link Topologies [15].

Clearly, not all nodes in a topology are equal. For example, node bandwidth may vary by four orders of magnitude (50 Kbps to 1000 Mbps), latency by six orders of magnitude (10 µs to 10 s), and availability by four orders of magnitude (1% to 99.99%). We stress that it is by no means justifiable to advocate the use of graph topologies irrespective of application context and requirements. Depending on the context, all topologies have their merits and drawbacks in terms of scalability, reliability, availability, content coherence, fault tolerance, security and maintainability. However, from the structural perspective, the graph topology is the most general one, being able to express all other conceivable topologies. Since our goal is to support queries that are generally independent of the underlying topology, this paper discusses problems arising in graph topologies. A problem solution that applies to a graph also applies to any other topology. Of course, a simpler or more efficient solution may exist for any particular topology. The results of this paper help enable the use of graph topologies where appropriate; they do not require or mandate their use.

2.2 Query and Data Model

We have introduced a dynamic data model for discovery in [2, 14]. It is a general-purpose data model that operates on tuples. Briefly, a tuple is an annotated multi-purpose soft state data container that may contain a piece of arbitrary content and allows for refresh of that content at any time. Content can be structured or semi-structured data in the form of any arbitrary well-formed XML [16] document or fragment. An individual tuple may, but need not, have a schema (XML Schema [17]), in which case it must be valid according to the schema. All tuples may, but need not, share a common schema. This flexibility is important for integration of heterogeneous content.
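The soft-state behavior of tuples can be illustrated with a small registry sketch. Everything here is hypothetical: the field names, the `Registry` class, the injected `fetch` function standing in for retrieval over the published hyperlink, and in particular the fixed time-to-live after which an unrefreshed tuple is discarded. The actual data model is defined in [2, 14]; this only shows the general publish, pull and expire pattern.

```python
import time
from dataclasses import dataclass
from typing import Callable, Dict, List, Optional


@dataclass
class SoftStateTuple:
    link: str               # hyperlink published by a content provider
    content: Optional[str]  # last retrieved XML content; None until the first pull
    expires_at: float       # soft state: discarded unless republished before this time


class Registry:
    """Hypothetical registry node: providers publish links, the registry pulls content."""

    def __init__(self, fetch: Callable[[str], str], ttl: float,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.fetch = fetch  # assumed retrieval function (e.g. an HTTP GET in practice)
        self.ttl = ttl      # assumed soft-state lifetime in seconds
        self.clock = clock
        self.tuples: Dict[str, SoftStateTuple] = {}

    def publish(self, link: str) -> None:
        """(Re)publish a link; republishing extends the tuple's lifetime."""
        expires = self.clock() + self.ttl
        if link in self.tuples:
            self.tuples[link].expires_at = expires
        else:
            self.tuples[link] = SoftStateTuple(link, None, expires)

    def pull(self, link: str) -> None:
        """Refresh the cached content by retrieving it from the published link."""
        self.tuples[link].content = self.fetch(link)

    def live_tuples(self) -> List[SoftStateTuple]:
        """Evict expired tuples; the survivors are what a local query would see."""
        now = self.clock()
        self.tuples = {k: t for k, t in self.tuples.items() if t.expires_at > now}
        return list(self.tuples.values())


# Usage with a fake clock so that expiry is deterministic.
fake_now = [0.0]
reg = Registry(fetch=lambda link: f"<service link='{link}'/>", ttl=10.0,
               clock=lambda: fake_now[0])
reg.publish("http://a.example/svc")
reg.publish("http://b.example/svc")
reg.pull("http://a.example/svc")
fake_now[0] = 5.0
reg.publish("http://a.example/svc")  # only a is refreshed, now expiring at t=15
fake_now[0] = 12.0
live = reg.live_tuples()             # b expired at t=10, so only a survives
```

Injecting the clock keeps the sketch testable; a real registry would also refresh content periodically rather than only on explicit `pull` calls.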
Examples of content include a service description expressed in WSDL [18], a file, picture, current network load, host information, stock quotes, etc. Discussion in this paper often uses examples where the term tuple is substituted by the more concrete term service description. Consider the following example tuple set:

[Example tuple set; recoverable fragment: XML getServiceDescription(), XML query(XQuery query)]

Our general-purpose query model is intended for read-only search. Insert, update and delete capabilities are not required and not addressed. We have defined these capabilities elsewhere [2, 14]. A query is formulated against a global database view and is insensitive to link topology and deployment model. In other words, to a query the set of all tuples appears as a single homogeneous database, even though the set may be (recursively) partitioned across many nodes and databases. This means that in a relational or XML environment, at the global level, the set of all tuples appears as a single, very large table or XML document, respectively. The query scope, on the other hand, is used to navigate and prune the link topology and filter on attributes of the deployment model. Searching is primarily guided by the query. Scope hints are used only as necessary. A query is evaluated against a set of tuples. The set, in turn, is specified by the scope. Conceptually, the scope is the input fed to the query. The query scope is a set and may contain anything from all tuples in the universe to none. Consider the example discovery queries:

• Simple Query: Find all (available) services.
• Simple Query: Find all services that implement a replica catalog service interface and that CMS members are allowed to use, and that have an HTTP binding for the replica catalog operation “XML getPFNs(String LFN)”.
• Simple Query: Find all CMS replica catalog services and return their physical file names (PFNs) for a given logical file name (LFN); suppress PFNs not starting with “ftp://”.
• Medium Query: Return the number of replica catalog services.
• Complex Query: Find all (execution service, storage service) pairs where both services of a pair live within the same domain. (The job wants to read and write locally.)

For a detailed discussion of a wide range of discovery queries, their representation in the XQuery language, as well as detailed motivation and justification, see our prior studies [2].

2.3 Definitions - Service and Node

Let us clarify the notions of node and service. A service exposes some functionality in the form of service interfaces to remote clients. Example services are an echo service, a job scheduler, a replica catalog, a time service, a gene sequencing service and a language translation service. A node is a service that exposes at least the functionality (i.e. service interfaces) for publication and P2P queries. Examples are a hyper registry as introduced in our prior studies [2, 14], a Gnutella [19] file sharing node and an extended job scheduler. Put another way, any service that happens to support publication and P2P query interfaces is a node. This implies that every node is a service. It does not imply that every service is a node. Only nodes are part of the P2P topology, while services are not, because they do not support the required interfaces. Usually, most services are not nodes. However, in some networks most or all services are nodes.

• Most services are not nodes. We propose to speak of a fat P2P network. Typically, only one or a few large and powerful services are nodes, enabling publication and P2P queries. An example is a backbone network of 10 large registry nodes that ties together 10 administrative domains, each hosting a registry node to which local domain services can publish to be discovered, as depicted in Figure 3 (left). The services (shown small on the edges) are not part of the network.
• Most or all services are nodes. We propose to speak of a thin or ultra-thin P2P network.
An example is a network of millions of small services, each having some proprietary core functionality (e.g. replica management optimization, gene sequencing, multi-lingual translation), actively using the network for searching (e.g. to discover replica catalogs, remote gene mappers or language dictionary services), but also actively contributing to its search capabilities, as depicted in Figure 3 (right).

Figure 3: Fat (left) and Ultra-thin (right) Peer-to-Peer network.

There is no difference between these scenarios in terms of technology. Discussion in this paper is applicable to ultra-thin, thin and fat P2P networks. For simplicity of exposition, examples illustrate ultra-thin networks (every service is a node).

2.4 P2P network vs. P2P network for service discovery

In any kind of P2P network, nodes may publish themselves to other nodes, thereby forming a topology. In a P2P network for service discovery, services and other content providers may publish their service link and content links to nodes. Because nodes are services, nodes may also publish their service link (and content links) to other nodes, thereby forming a topology. In any kind of P2P network, a node has a database or some kind of data source against which queries are applied. In a P2P network for service discovery, this database happens to be the publication database. In other words, publication enables topology construction and at the same time constructs the database to be searched. Discussion in this paper is applicable to any kind of P2P network, while the examples illustrate service discovery.

2.5 Agent P2P Model

Queries in what we propose as the agent P2P model flow as follows. When any originator wishes to search the P2P network with some query, it sends the query to a single node. We call this entry point the agent node of the originator (i.e. its service gateway).
The agent applies the query to its local database and returns matching results; it also forwards the query to its neighbor nodes. These neighbors return their local query results; they also forward the query to their neighbors, and so on. For flexibility, the protocol between originator and agent is left unspecified. The agent P2P model is a hybrid of centralization and decentralization. It allows fully decentralized infrastructures, yet also allows seamless integration of centralized client-server computing into an otherwise decentralized infrastructure. An originator may embed its agent in the same process (decentralized). However, the originator may just as well choose a remote node as agent (centralized), for reasons including central control, reliability, continuous availability, maintainability, security, accounting and firewall restrictions on incoming connections for originator hosts. For example, a simple HTML GUI may be sufficient to originate queries that are sent to an organization’s agent node. Note that only nodes are part of the P2P topology, while the originator is not, because it does not possess the functionality of a node. The agent P2P model provides location and distribution transparency to originators. An originator is unaware that (and how) database tuples are partitioned among nodes. It only communicates with an agent black box.

2.6 Servent P2P Model

In contrast, in the servent P2P model (e.g. Gnutella) there exists no agent concept, but only the concept of a servent. Put another way, the agent is always embedded into the originator process, forming a monolithic servent. This model is purely decentralized and does not allow for any degree of centralization. This restriction appears unjustified. More importantly, it seriously limits the applicability of P2P computing. For example, the Gnutella servent model could not cope with the large connectivity spectrum of the user community, ranging from very low to very high bandwidth.
As the Gnutella network grew, it became fragmented because nodes with low-bandwidth connections could not keep up with traffic. The idea of requiring all functionality to exist at the very edge of the network had to be reconsidered. Eventually, the situation was patched by rendering the low-bandwidth servents on the (slow) edges of the network dumb. The notion of centralized reflectors (Gnutella) and super-peers (Morpheus) was (re)invented. A reflector is a powerful high-bandwidth gateway for many remote originators with low-bandwidth dialup connections. It volunteers to take over their functionality and to shield them from traffic that would otherwise be carried via low-bandwidth servents. However, servents still keep data locally. The agent P2P model naturally covers centralized and decentralized hybrids. Here a powerful node may act as agent for many remote originators. In the remainder of this paper, we follow the agent P2P model and no longer use the term servent. The terms originator, node and agent (node) are used instead.

2.7 Loop Detection

Query shipping is used to route queries through the nodes of the topology. A query remains identical as it is forwarded from hop to hop (unless rewritten or split by a query optimizer). The very same query may arrive at a node multiple times, along distinct routes, perhaps in a complex pattern. Loops in query routes must be detected and prevented; otherwise, unnecessary or endless multiplication of workloads would result. Figure 3 depicts topologies with the potential for a query to become trapped in infinite loops. To enable loop detection, an originator attaches a distinct transaction identifier to each query, which is a universally unique identifier (UUID). The transaction identifier always remains identical during query forwarding over hops. A node maintains a state table of recent transaction identifiers and returns an error whenever a query is received that has already been seen. For example, this approach is used in Gnutella.
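The agent P2P model and the state-table loop check can be sketched together as an in-memory simulation. This is a hypothetical illustration, not the paper's protocol: `Node`, `handle`, `link` and `new_transaction_id` are made-up names, network messages are plain method calls, results are routed back along the query path (the Routed Response mode of Section 3), and a repeated query is answered with an empty result in place of an error. The identifier follows the recipe of hashing message text, originator IP address, current time and a random number, here with MD5 to obtain 128 bits.

```python
import hashlib
import os
import time
from typing import Callable, List, Set


def new_transaction_id(query_text: str, originator_ip: str) -> str:
    """128-bit id: MD5 over message text, originator IP, current time and a random number."""
    material = f"{query_text}|{originator_ip}|{time.time()}|{os.urandom(16).hex()}"
    return hashlib.md5(material.encode("utf-8")).hexdigest()


class Node:
    """Hypothetical in-memory P2P node holding a local list of tuples."""

    def __init__(self, name: str, tuples: List[str]) -> None:
        self.name = name
        self.tuples = tuples
        self.neighbors: List["Node"] = []
        self.seen: Set[str] = set()  # state table of transaction ids already handled
        self.evaluations = 0         # how often a query ran against the local database

    def handle(self, tx_id: str, query: Callable[[str], bool]) -> List[str]:
        """Apply the query locally, forward it to all neighbors, route results back."""
        if tx_id in self.seen:
            return []  # loop detected; stands in for the error reply a real node sends
        # Ids are never evicted in this sketch; a real node would drop them
        # after a (static) loop timeout.
        self.seen.add(tx_id)
        self.evaluations += 1
        results = [t for t in self.tuples if query(t)]
        for neighbor in self.neighbors:
            results.extend(neighbor.handle(tx_id, query))
        return results


def link(x: Node, y: Node) -> None:
    """Create a bidirectional link so each node can forward queries to the other."""
    x.neighbors.append(y)
    y.neighbors.append(x)


# A ring of four nodes: without loop detection the query would circulate forever.
a, b = Node("a", ["replica-catalog@a"]), Node("b", ["job-scheduler@b"])
c, d = Node("c", ["replica-catalog@c"]), Node("d", ["time-service@d"])
for x, y in [(a, b), (b, c), (c, d), (d, a)]:
    link(x, y)

tx = new_transaction_id("replica", "192.0.2.1")
results = a.handle(tx, lambda t: t.startswith("replica"))  # node a acts as the agent
```

In this run every node evaluates the query exactly once, even though node d is reachable from the agent along two routes, and `results` holds the two replica catalog tuples.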
In practice, it is sufficient for the UUID to be unique with exceedingly large probability, suggesting the use of a 128-bit integer computed by a cryptographic hash digest function such as MD5 [20] or SHA-1 [21] over message text, originator IP address, current time and a random number.

3 Routed vs. Direct Response, Metadata Responses

We propose to distinguish four techniques to return matching query results to an originator: Routed Response, Direct Response, Routed Metadata Response, and Direct Metadata Response, as depicted in Figure 4. Let us examine the main implications with a Gnutella use case. A typical Gnutella query such as “Like a virgin” is matched by some hundreds of files, most of them referring to replicas of the very same music file. Not all matching files are identical because there exist multiple related songs (e.g. remixes, live recordings) and multiple versions of a song (e.g. with different sampling rates). A music file has a size of at least several megabytes. Many thousands of concurrent users submit queries to the Gnutella network. A large fraction of users live on slow and unreliable dialup connections.

Figure 4: Peer-to-Peer Response Modes. Panels: a) Routed Response (RR); b) Direct Response without Invitation; c) Direct Response with Invitation (DR); d) Routed Response with Metadata (RRM); e) Direct Metadata Response without Invitation; f) Direct Metadata Response with Invitation (DRM). Legend: Node, Originator, Agent Node; messages: Query, Result set, Invitation, Data.

• Routed Response (Figure 4-a). Results are propagated back to the originator along the paths on which the query flowed outwards. Each (passive) node returns to its (active) client not only its own local results but also all remote results it receives from neighbors. The response protocol is tightly coupled to the query protocol.
Routing messages through a logical overlay network of P2P nodes is much less efficient than routing through a physical network of IP routers [22]. Routing back even a single Gnutella file (let alone all results) for each query through multiple nodes would consume large amounts of overall system bandwidth, most likely grinding Gnutella to a screeching halt. As the P2P network grows, it becomes fragmented because nodes with low-bandwidth connections cannot keep up with traffic [23]. Consequently, routed responses are not well suited for file sharing systems such as Gnutella. In general, overall economics dictate that routed responses are not well suited for systems that return many and/or large results.
• Direct Response With and Without Invitation. To better understand the underlying idea, we first introduce the simpler variant, Direct Response Without Invitation (Figure 4-b). Results are not returned by routing back through intermediary nodes. Each (active) node that has local results sends them directly to the (passive) agent, which combines them and hands them back to the originator. Response traffic does not travel through the P2P system. It is offloaded via individual point-to-point data transfers on the edges of the network. The response push protocol can be separated from the query protocol. For example, HTTP, FTP or other protocols may be used for response push. Let us examine the main implications with a use case. As already mentioned, a typical Gnutella query such as “Like a virgin” is matched by some hundreds of files, most of them referring to replicas of the very same music file. For Gnutella users it would be sufficient to receive just a small subset of matching files. Sending back all such files would unnecessarily consume large amounts of direct bandwidth, most likely restricting Gnutella to users with ample cheap bandwidth at their disposal.
Note, however, that the overall Gnutella system would be only marginally affected by a single user downloading, say, a million music files, because the largest fraction of traffic does not travel through the P2P system itself. In general, individual economics dictate that direct responses without invitation are not well suited for systems that return many identical and/or large results when a small subset would be sufficient. A variant based on invitation (Figure 4-c) softens the problem by inverting control flow. Nodes with matching files do not blindly push files to the agent. Instead, they invite the agent to initiate downloads. The agent can then act as it sees fit. For example, it can filter and select a subset of data sources and files and reject the rest of the invitations. Due to its inferiority, the variant without invitation is not considered any further. In the remainder of this paper, we use the term Direct Response as a synonym for Direct Response With Invitation.
• Routed Metadata Response and Direct Metadata Response. Here interaction consists of two phases. In the first phase, routed responses (Figure 4-d) or direct responses (Figure 4-e,f) are used. However, nodes do not return data results in response to queries, but only small metadata results. The metadata contains just enough information to enable the originator to retrieve the data results and possibly to apply filters before retrieval. In the second phase, the originator selects, based on the metadata, which data results are relevant. The (active) originator directly connects to the relevant (passive) data sources and asks for data results. Again, the largest fraction of response traffic does not travel through the P2P system. It is offloaded via individual point-to-point data transfers on the edges of the network. The retrieval protocol can be separated from the query protocol. For example, HTTP, FTP or other protocols may be used for retrieval.
The routed metadata response approach is used by file sharing systems such as Gnutella. A Gnutella query does not return files; it just returns an annotated set of HTTP URLs. The originator connects to a subset of these URLs to download files as it sees fit. Another example is a service discovery system where the first phase returns a set of service links instead of full service descriptions. In the second phase, the originator connects to a subset of these service links to download service descriptions as it sees fit. Another example is a referral system where the first phase uses routed metadata response to return the service links of the set of nodes having local matching results (“Go ask these nodes for the answer”). In the second phase, the originator or agent connects directly to a subset of these nodes to query and retrieve result sets as it sees fit. This variant avoids the “invitation storm” possible under Direct Response. Referrals are also known as redirections. A metadata response mode with a radius scope of zero can be used to implement the referral behavior of the Domain Name System (DNS). For details, see Section 7.

3.1 Comparison of Response Mode Properties

Let us compare the properties of the various response models. The following abbreviations are used:

RR: Routed Response
RRM: Routed Response with metadata
RRX: Routed Response with and without metadata
DR: Direct Response
DRX: Direct Response with and without metadata

• Distribution and Location Transparency. In the response models without metadata, the originator is unaware that (and how) tuples are partitioned among nodes. In other words, these models are transparent with respect to distribution and location. Metadata responses require an originator to contact individual data providers to download full results, and hence are not transparent.
• (Efficient) Query Support. All models can answer any query.
Both simple and medium queries can be answered efficiently by RRX and DRX, whereas a complex query cannot be answered efficiently. (Justification of this result is deferred to Section 4.) Transmission of duplicate results unnecessarily wastes bandwidth. RRX can eliminate duplicates already along the query path, whereas DRX can only do so in the final stage, at the agent. Similarly, limiting the maximum result set size is more efficient under RRX because superfluous results can already be discarded along the query path.
• Economics. RR results travel multiple hops rather than just a single hop. This leads to poor overall economics. The effect is more pronounced for large results, as is the case for music files. RR can also lead to unfortunate individual economics. A user who issues few or undemanding queries consumes few system resources. However, if many heavy results for queries from other parties are routed back via such a user’s node, the user can end up paying for large amounts of bandwidth and giving it away for free to anonymous third parties. For a given user, the costs may drastically outweigh the gains. One could perhaps devise appropriate authorization, quality of service and flow control policies. The unsatisfying economic situation is similar to that of physical IP routers on the Internet, which also forward traffic from and to third parties. In any case, the fact remains that results travel multiple hops rather than just one. In principle, RRM has the same poor economic properties as RR. However, if metadata is very small in size (e.g. as in Gnutella), then the incurred processing and transmission cost may be acceptable. For example, Gnutella nodes just route back an annotated set of HTTP URLs as metadata. Under DRX, result traffic does not travel through the P2P system. Retrieving results is a deal between just two parties, the provider and the consumer. Consequently, individual economics are controllable and predictable.
A user is not charged much for other people's workloads, unless he explicitly volunteers.
• Number of TCP Connections at Originator. Under RR and DR, just one (or no) TCP connection is required at the originator, whereas metadata modes require a connection per (selected) data provider. The more data sources are selected, the more heavyweight data retrieval becomes. Metadata modes can encounter serious latency limitations due to the very expensive nature of secure (and even insecure) TCP connection setup. Hence, the approach does not scale well. However, for many use cases this may not be a problem, because a client always selects only a small number of data providers (e.g. 10).
• Number of TCP Connections at Agent. Usually a node has few neighbors (five to hundreds). Under RRX, one TCP connection per neighbor is required at an agent. Under DRX, a connection per data provider is additionally required. Again, the more data providers exist, the more heavyweight data retrieval becomes. DRX can encounter serious latency limitations due to the very expensive nature of secure (and even insecure) TCP connection setup. For example, a query that finds the total number of services in the domain cern.ch should use RRX. Under DRX, it may generate responses from every single node in that domain. Consequently, an agent can face an invitation storm resembling a denial of service attack. On the other hand, the potential to exploit parallelism is large: all data providers can be handled independently in parallel.
• Latency. If a query is of a type that cannot support pipelining (see Section 4.4), the latency for the first result to arrive at the originator is always poor. For a pipelined query, the latency for the first result to arrive is small under DRX, because a response travels a single hop only. Under RRX, a response travels multiple hops, and latency increases accordingly. However, the cost of TCP connection setup at originator and/or agent can invert the situation.
Under RR, the cost of TCP connection setup to nodes is paid only once (at node publication time), because connections can typically be kept alive until node deregistration. This is not the case under the other response modes.
• Caching. Caching is a technique that trades content freshness for response time. RRX can potentially support caching of content from other nodes at intermediate nodes, because response flow naturally concentrates and integrates results from many nodes. DRX nodes return results directly and independently, and hence cannot efficiently support caching.
• Trust Delegation to Unknown Parties. Query and result traffic are subject to security attacks. It is not sufficient to establish a secure mutually authenticated channel between any two nodes, because malicious nodes can divert routes or modify queries and results. Since a query is almost always routed through multiple hops, many of which are unknown to the agent, we believe that indirect delegation of trust to unknown parties cannot practically be avoided. Security-sensitive applications should choose DRX, because at least the retrieval of results occurs in a predictable manner between just two parties that can engage in secure mutual authentication and authorization. RRM merely delegates trust on metadata results, but not on full results.

3.2 Response Mode Switches and Shifts

Although all response modes are equivalent from the functional perspective, clearly no mode is optimal under all circumstances. The question arises as to what extent a given P2P network must mandate the use of any particular response mode throughout the system. Observe that nodes are autonomous and defined by their interface only. A node does not “see” what kind of response mode (or technology in general) its neighbors use in answering a query. As long as query semantics are preserved, the node does not care.
Consequently, we propose that response modes can be mixed by switches and shifts, in arbitrary permutations, as depicted in Figure 5.
• Routed Response → Direct Response switch (Figure 5-a). Starting from the agent, Routed Response is used initially. The central node (the “football”) receives a query from the agent. For some reason, it decides to answer the query using Direct Response. The response flow that would have been taken under Routed Response is shown crossed out.
• Direct Response → Routed Response switch (Figure 5-b). Initially, Direct Response is used. However, the football decides to answer the query using Routed Response.
• Direct Response → Direct Response shift (Figure 5-c). Initially, Direct Response is used. The football decides to continue using Direct Response, but to shift the target of responses. To its own neighbors, the football declares itself as a (fake) agent. The responses that would have flowed into the real agent now flow back into the football, and then from the football to the real agent. Note again that this does not break semantics, because the football behaves as if the results had been obtained from its own local database. The real agent receives the same results, but solely from the football.
• Routed Response → Routed Response shift. At each hop, the response target is shifted to be the current node. Interestingly, this kind of shift is at the very heart of the definition of routed response.

[Figure 5: Response Mode Switches and Shifts. Panels: a) RR → DR switch, b) DR → RR switch, c) DR → DR shift. Legend: query, result set, node, originator, agent.]

The classification introduced here shows that this is not the only possible approach. A node may choose its response mode based on a local and autonomous assessment of the advantages and disadvantages involved. However, because of its context knowledge, often the client (e.g.
originator) is in the best position to judge what kind of response mode would be most suitable. Therefore, it is useful to allow a query to carry a hint that indicates the preferred response mode (routed or direct).

4 Query Processing

In a distributed database system, there exists a single local database and zero or more neighbors. A classic centralized database system is a special case where there exists a single local database and zero neighbors. From the perspective of query processing, a P2P database system has the same properties as a distributed database system, in a recursive structure. Hence, we propose to organize the P2P query engine like a general distributed query engine [24, 25]. A given query involves a number of operators (e.g. SELECT, UNION, CONCAT, SORT, JOIN, GROUP, SEND, RECEIVE, SUM, MAX, MAXSETSIZE, IDENTITY) that may or may not be exposed at the query language level. For example, the SELECT operator takes a set and returns a new set with the tuples satisfying a given predicate. The UNION operator computes the union of two or more sets. The CONCAT operator concatenates the elements of two or more sets into a list of arbitrary order (without eliminating duplicates). A list can be emulated by a set using distinct surrogate keys. The MAXSETSIZE operator limits the maximum result set size. The IDENTITY operator returns its input set unchanged. The semantics of an operator can be satisfied by several operator implementations, using a variety of algorithms, each with distinct resource consumption, latency and performance characteristics. The query optimizer chooses an efficient query execution plan, which is a tree plugged together from operators. In an execution plan, a parent operator consumes results from child operators. Query execution is driven from the (final) root consumer in a top-down fashion.
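The set semantics of some of the operators listed above can be illustrated with a small Python sketch. It models a tuple set as a Python set of hashable values and a list as a Python list; the function names mirror the operator names but are hypothetical, and real operator implementations would differ in algorithm and resource usage.

```python
from itertools import islice
from typing import Callable, Hashable, Iterable, List, Set, Tuple, TypeVar

T = TypeVar("T", bound=Hashable)


def select(tuples: Iterable[T], predicate: Callable[[T], bool]) -> Set[T]:
    """SELECT: a new set with the tuples that satisfy the predicate."""
    return {t for t in tuples if predicate(t)}


def union(*inputs: Iterable[T]) -> Set[T]:
    """UNION: set union of two or more inputs; duplicates collapse."""
    result: Set[T] = set()
    for tuples in inputs:
        result.update(tuples)
    return result


def concat(*inputs: Iterable[T]) -> List[T]:
    """CONCAT: all elements of all inputs in some order; duplicates are kept."""
    return [t for tuples in inputs for t in tuples]


def maxsetsize(tuples: Iterable[T], limit: int) -> List[T]:
    """MAXSETSIZE: keep at most `limit` tuples."""
    return list(islice(tuples, limit))


def identity(tuples: Iterable[T]) -> Iterable[T]:
    """IDENTITY: return the input unchanged."""
    return tuples


def with_surrogate_keys(items: List[T]) -> Set[Tuple[int, T]]:
    """Emulate a list by a set: a distinct surrogate key keeps equal items apart."""
    return set(enumerate(items))
```

Applied to the sets {1, 2, 3} and {3, 4}, CONCAT keeps both copies of 3 whereas UNION collapses them; this difference matters as soon as partial results such as counts have to be merged.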
A request for results from the root operator may, for example, lead to a request for results from its child operators, which in turn request results from their own child operators, and so on. By means of an execution plan, an optimizer can move a query to data or data to a query. In other words, queries and subqueries can be executed locally or at remote nodes. Performance tradeoffs of query shipping, data shipping and hybrid shipping are discussed in [26].

4.1 Template Query Execution Plan

Recall that Section 2.2 proposed a query model. Any query Q within our query model can be answered by an agent with the template execution plan A depicted in Figure 6. The plan applies a local query L against the tuple set of the local database. Each neighbor (if any) is asked to return a result set for (the same) neighbor query N. Local and neighbor result sets are unionized into a single result set by a unionizer operator U that must take the form of either UNION or CONCAT. A merge query M is applied that takes the result set as input and returns a new result set. The final result set is sent to the client, i.e. another node or an originator.

[Figure 6: Template Execution Plan. Legend: L ... Local Query, M ... Merge Query, Q ... User Query, N ... Neighbor Query, U ... Unionizer Operator, A ... Agent Plan.]

4.2 Centralized Execution Plan

To see that indeed any query against any kind of database system can be answered within this framework, we derive a simple centralized execution plan that always satisfies the semantics of any query Q. The plan substitutes specific subplans into the template plan A, leading to distinct plans for the agent node (Figure 7-a) and neighbor nodes (Figure 7-b). In the case of XQuery and SQL, parameters are substituted as follows:

Plan | XQuery | SQL
A (agent) | M = Q, U = UNION, L = "RETURN /", N' = N | M = Q, U = UNION, L = "SELECT *", N' = N
N (neighbor) | M = IDENTITY, U = UNION, L = "RETURN /", N' = N | M = IDENTITY, U = UNION, L = "SELECT *", N' = N

[Figure 7: Centralized Execution Plan. Panels: a) Agent Query Plan, b) Neighbor Query Plan. Legend: Q ... User Query, N ... Neighbor Query, A ... Agent Query Plan.]

The agent's plan A fetches all raw tuples from the local and all remote databases, unionizes the result sets, and then applies the query Q. Neighbors are handed a rewritten neighbor query N that recursively fetches all raw tuples and returns their union. The neighbor query N is recursively partitionable (see below). The same centralized plan works for routed and direct response, both with and without metadata. Under direct response, a node does forward the query N, but does not attempt to receive remote result sets (conceptually, empty result sets are delivered). The node does not send a result set to its predecessor, but directly back to the agent. Since a classic centralized database system is merely the special case of a distributed database system with zero neighbors, and a P2P database system has the same query processing properties as a distributed database system in a recursive structure, the very same centralized execution plan applies to any kind of database system, and any query within our query model can be answered. The centralized execution plan can be inefficient, because potentially large amounts of base data have to be shipped to the agent before the user's query is applied locally. However, sometimes this is the only plan that satisfies the semantics of a query.
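The centralized plan can be simulated in a few lines of Python. The sketch assumes an acyclic (tree-shaped) topology so that loop detection can be ignored, models tuples as strings, and models the user query Q as an arbitrary function on a set of tuples; the Node class and the function names are illustrative assumptions, not part of the framework.

```python
from dataclasses import dataclass, field
from typing import Callable, List, Set

Query = Callable[[Set[str]], Set[str]]


@dataclass
class Node:
    local: Set[str]                                   # tuples in this node's local database
    neighbors: List["Node"] = field(default_factory=list)


def neighbor_plan(node: Node) -> Set[str]:
    """Plan N: L = fetch all local tuples, U = UNION, M = IDENTITY."""
    result = set(node.local)                          # L = "RETURN /" or "SELECT *"
    for nb in node.neighbors:                         # RECEIVE_1 N ... RECEIVE_k N
        result |= neighbor_plan(nb)                   # U = UNION
    return result                                     # M = IDENTITY


def agent_plan(agent: Node, q: Query) -> Set[str]:
    """Plan A: the same fetch-and-union as N, but the merge query M is the user query Q."""
    raw = set(agent.local)
    for nb in agent.neighbors:
        raw |= neighbor_plan(nb)
    return q(raw)                                     # M = Q, applied once, at the agent
```

Because Q runs only once, at the agent, over the union of all raw tuples, the result is correct for any Q, including queries such as "the largest tuple" that need the whole input at once; the price is that every raw tuple is shipped to the agent.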
For a complex query, the centralized plan is always the only plan that satisfies the query semantics. A more efficient execution plan can sometimes be derived, as proposed below; this is always possible for simple and medium queries.

4.3 Recursively Partitionable Query

A P2P network can be efficient in answering queries that are recursively partitionable. A query Q is recursively partitionable if, for the template plan A, there exist a merge query M and a unionizer operator U that satisfy the semantics of the query Q, assuming that L and N are chosen as L = Q and N = A. In other words, a query is recursively partitionable if the very same execution plan can be recursively applied at every node in the P2P topology. The corresponding execution plan is depicted in Figure 8.

[Figure 8: Execution Plan for Recursively Partitionable Query. Legend: M ... Merge Query, Q ... User Query, U ... Unionizer Operator, A ... Agent Plan.]

The input and output of a merge query have the same form as the output of the local query L. Query processing can be parallelized and spread over all participating nodes. Potentially very large amounts of information can be searched while investing few resources, such as processing time, per individual node. The recursive parallel spread of load implied by a recursively partitionable query is the basis of the massive P2P scalability potential. However, query performance is not necessarily good, for example due to high network I/O costs.

Now we are in a position to clarify the definition of simple, medium and complex queries.
• Simple Query. A query is simple if it is recursively partitionable using M = IDENTITY, U = UNION.
• Medium Query. A query is a medium query if it is not simple, but it is recursively partitionable.
• Complex Query. A query is complex if it is not recursively partitionable.

It is an interesting open question (at least to us) whether a query processor can automatically determine whether a correct merge query and unionizer exist, and if so, how to choose them.
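The recursive plan of Figure 8, with L = Q and N = A, can be sketched as a single function that every node evaluates. In this Python sketch (an illustration under assumed names, not the framework's implementation), a service tuple is a Python tuple (name, type, uptime), a result set is a list, and the unionizer U is passed in as either union (duplicates collapse) or concat (duplicates kept). It covers a simple selection query and the two medium query patterns this section uses as examples: a replica catalog count merged by summation, and a largest-uptime query that serves as its own merge query.

```python
from dataclasses import dataclass, field
from typing import Callable, List, Tuple

Service = Tuple[str, str, int]  # (name, type, uptime): an assumed tuple layout
Result = List[object]           # a result set, modelled as a list
Plan = Callable[[Result], Result]


@dataclass
class Node:
    local: List[Service]
    neighbors: List["Node"] = field(default_factory=list)


def union(parts: List[Result]) -> Result:
    """U = UNION: duplicates collapse (first occurrence kept)."""
    out: Result = []
    for part in parts:
        for t in part:
            if t not in out:
                out.append(t)
    return out


def concat(parts: List[Result]) -> Result:
    """U = CONCAT: duplicates are kept."""
    return [t for part in parts for t in part]


def recursive_plan(node: Node, q: Plan, m: Plan,
                   u: Callable[[List[Result]], Result]) -> Result:
    """Template plan A with L = Q and N = A, evaluated identically at every node."""
    parts = [q(node.local)] + [recursive_plan(nb, q, m, u) for nb in node.neighbors]
    return m(u(parts))


def is_repcat(t: Service) -> bool:
    return t[1] == "repcat"


def select_repcat(ts: Result) -> Result:
    """Simple query: select replica catalog services (M = IDENTITY, U = UNION)."""
    return [t for t in ts if is_repcat(t)]


def identity(ts: Result) -> Result:
    return ts


def count_repcat(ts: Result) -> Result:
    """Medium query Q: count the replica catalogs in the input."""
    return [sum(1 for t in ts if is_repcat(t))]


def sum_counts(counts: Result) -> Result:
    """Merge query M for the count: add up the partial counts."""
    return [sum(counts)]


def max_uptime(ts: Result) -> Result:
    """Q = M: the tuple with the largest uptime (empty input gives empty output)."""
    return [max(ts, key=lambda t: t[2])] if ts else []
```

On a network where three nodes each hold one replica catalog, running the count with union instead of concat returns [1] rather than [3], because UNION collapses the three equal partial counts; a summing merge therefore needs CONCAT as its unionizer.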
Whether M and U can be chosen automatically relates to problems that have been studied extensively in the context of distributed and parallel query processing, as well as query rewriting for heterogeneous and homogeneous relational database systems [24, 27, 25]. Distributed XQueries are an emerging field [28]. For simplicity, in the remainder of this paper we assume that the user explicitly provides M and U along with a query Q. If M and U are not provided as part of a query to any given node, the node acts defensively by assuming that the query is not recursively partitionable. Choosing M and U is straightforward for a human being. Consider for example the following medium XQueries.
• Return the number of replica catalog services. The merge query computes the sum of a set of numbers. The unionizer is CONCAT.
  Q = RETURN count(/tupleset/tuple/content/service[interface/@type="repcat"])
  M = RETURN sum(/tupleset/tuple)
  U = CONCAT
• Find the service with the largest uptime.
  Q = M = RETURN (/tupleset/tuple[@type="service"] SORTBY (./@uptime)) [last()]
  U = UNION
Note that the query engine always encapsulates the query output with a tupleset root element. A query need not generate this root element, as it is implicitly added by the environment. A custom merge query can be useful. For example, assume that each individual result tuple is tagged with a timestamp indicating the time when the information expires and ceases to be valid. A custom merge query can ignore all results that have already expired. Alternatively, it can ignore all results but the one with the most recent timestamp. As another example, a custom merge query can cut off all but the first 100 result tuples. Such a result set size limiting feature (maxResults) attempts to make bandwidth consumption more predictable.

4.4 Pipelining

The success of many applications depends on how fast they can start producing initial/relevant portions of the result set rather than how fast the entire result set is produced [29].
This is particularly often the case in distributed systems where many nodes are involved in query processing, each of which may be unresponsive for many reasons. The situation is even more pronounced in systems with loosely coupled autonomous nodes. Often an originator would be happy to do useful work with one or a few early results, as long as they arrive quickly and reliably. Results that arrive later can be handled later, or are ignored anyway. For example, in an interactive session, a typical Gnutella user is primarily interested in being able to start some music download as soon as possible. The user is quickly disappointed when not a single result for a query arrives in less than four seconds. Choosing among 1000 versions of “Like a virgin” is interesting, but helps little if it comes at the expense of, say, a one-minute idle wait. As another example, consider a user who wants to discover schedulers to submit a job. It is interesting to discover that 100 schedulers are available, but the primary requirement is to find at least three quickly and reliably.

[Figure 9: Non-Pipelined (left) and Pipelined Query (right). Legend: query, result set, node, originator, agent.]

Query Type | Supports Pipelining?
Simple Query | Yes
Medium Query | Maybe
Complex Query | Typically No
Table 1: Pipelining Support of Query Types.

Database theory and practice establish that query execution engines in general, and distributed query execution engines in particular, should be based on iterators [24]. An operator corresponds to an iterator class. Iterators of any kind have a uniform interface, namely the three methods open(), next() and close(). In an execution plan, a parent iterator consumes results from child iterators. Query execution is driven from the (final) root consumer in a top-down fashion. For example, a call to next() may call next() on child iterators, which in turn call next() on their child iterators, and so on.
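The iterator model can be sketched in Python with the three methods open(), next() and close(). This is a minimal illustration under assumptions: None marks an exhausted iterator (so tuples themselves may not be None), and the class names Scan, Select and Concat are hypothetical stand-ins for a leaf, a SELECT and a CONCAT operator. Each call to next() on the root pulls tuples from its children only as far as needed.

```python
from typing import Callable, List, Optional


class Iterator:
    """Uniform operator interface; next() returns None once exhausted (an assumption)."""

    def open(self) -> None:
        raise NotImplementedError

    def next(self) -> Optional[object]:
        raise NotImplementedError

    def close(self) -> None:
        raise NotImplementedError


class Scan(Iterator):
    """Leaf iterator over the tuples of a local database."""

    def __init__(self, tuples: List[object]):
        self.tuples = tuples
        self.pos = 0

    def open(self) -> None:
        self.pos = 0

    def next(self) -> Optional[object]:
        if self.pos >= len(self.tuples):
            return None
        t = self.tuples[self.pos]
        self.pos += 1
        return t

    def close(self) -> None:
        pass


class Select(Iterator):
    """SELECT: pulls child tuples one at a time until one matches the predicate."""

    def __init__(self, child: Iterator, predicate: Callable[[object], bool]):
        self.child, self.predicate = child, predicate

    def open(self) -> None:
        self.child.open()

    def next(self) -> Optional[object]:
        while True:
            t = self.child.next()
            if t is None or self.predicate(t):
                return t

    def close(self) -> None:
        self.child.close()


class Concat(Iterator):
    """CONCAT: drains its children one after the other, keeping duplicates."""

    def __init__(self, *children: Iterator):
        self.children = list(children)
        self.current = 0

    def open(self) -> None:
        for c in self.children:
            c.open()
        self.current = 0

    def next(self) -> Optional[object]:
        while self.current < len(self.children):
            t = self.children[self.current].next()
            if t is not None:
                return t
            self.current += 1
        return None

    def close(self) -> None:
        for c in self.children:
            c.close()


def run(root: Iterator) -> List[object]:
    """The root consumer drives execution top-down by repeatedly calling next()."""
    root.open()
    results = []
    while (t := root.next()) is not None:
        results.append(t)
    root.close()
    return results
```

For instance, run(Select(Concat(Scan([1, 2, 3]), Scan([3, 4, 5])), lambda x: x % 2 == 1)) yields the odd tuples of both inputs, with the duplicate 3 kept by CONCAT.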
For efficiency, the method next() can be asked to deliver several results at once in a so-called batch. The semantics are as follows: “Give me a batch of at least N and at most M results” (fewer than N results are delivered when the entire query result set is exhausted). For example, the SEND and RECEIVE network communication operators (iterators) typically work in batches. The monotonic semantics of certain operators such as SELECT, UNION, CONCAT, SEND, RECEIVE, MAXSETSIZE and IDENTITY allow operator implementations to consume just one or a few child results on next(). In contrast, the non-monotonic semantics of operators such as SORT, GROUP, MAX, some JOIN methods, etc. require that operator implementations consume all child results already on open() in order to be able to deliver a result on the first call to next(). Since the output of these operators on a subset of the input is not, in general, a subset of the output on the whole input, these operators need to see all of their input before they can produce the correct output. This does not break the iterator concept, but has important latency and performance implications. Whether the root operator of an agent exhibits a short or long latency in delivering the first result to the originator depends on the query operators in use, which in turn depend on the given query. In other words, for some query types the originator has the potential to immediately start piping in results (at a moderate rate), while for other query types it must wait a long time until the first result becomes available (the full result set then arrives almost at once, however). A query (an operator implementation) is said to be pipelined if it can produce at least one result tuple before all input tuples have been seen. Otherwise, a query (an operator) is said to be non-pipelined. Figure 9 depicts examples for both modes. Simple queries do support pipelining (e.g. Gnutella queries).
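The difference between monotonic and non-monotonic operators shows up in how much input an operator has consumed before it can return its first batch. The following Python sketch models the batch semantics quoted above with a hypothetical next_batch(at_least, at_most) method (the paper's N and M). It compares a pipelined SELECT, which can answer after pulling a single tuple, with a non-pipelined SORT, which drains its child in open(). The class names and the counting Source are illustrative assumptions.

```python
from typing import Callable, Iterable, List, Optional


class Source:
    """Leaf iterator that records how many tuples its parent has pulled."""

    def __init__(self, tuples: Iterable[int]):
        self.tuples = list(tuples)
        self.pos = 0
        self.pulled = 0

    def open(self) -> None:
        self.pos = 0

    def next_batch(self, at_least: int, at_most: int) -> List[int]:
        # A leaf returns up to at_most tuples; fewer only once it is exhausted.
        batch = self.tuples[self.pos:self.pos + at_most]
        self.pos += len(batch)
        self.pulled += len(batch)
        return batch

    def close(self) -> None:
        pass


class Select:
    """Monotonic, pipelined: returns as soon as at_least matches are available."""

    def __init__(self, child: Source, predicate: Callable[[int], bool]):
        self.child, self.predicate = child, predicate

    def open(self) -> None:
        self.child.open()

    def next_batch(self, at_least: int, at_most: int) -> List[int]:
        out: List[int] = []
        while len(out) < at_least:
            # Never request more than still fits, so no matching tuple is dropped.
            batch = self.child.next_batch(1, at_most - len(out))
            if not batch:  # child exhausted: returning fewer than at_least is allowed
                break
            out.extend(t for t in batch if self.predicate(t))
        return out

    def close(self) -> None:
        self.child.close()


class Sort:
    """Non-monotonic, non-pipelined: must consume all child input in open()."""

    def __init__(self, child: Source, key: Optional[Callable[[int], int]] = None):
        self.child, self.key = child, key
        self.buffer: List[int] = []
        self.pos = 0

    def open(self) -> None:
        self.child.open()
        everything: List[int] = []
        while batch := self.child.next_batch(1, 100):
            everything.extend(batch)
        self.buffer, self.pos = sorted(everything, key=self.key), 0

    def next_batch(self, at_least: int, at_most: int) -> List[int]:
        batch = self.buffer[self.pos:self.pos + at_most]
        self.pos += len(batch)
        return batch

    def close(self) -> None:
        self.child.close()
```

Over a source of 1000 tuples, the SELECT delivers its first result after pulling one tuple, whereas the SORT has pulled all 1000 before open() returns, which is exactly the latency difference between pipelined and non-pipelined queries.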
Medium queries may or may not support pipelining, whereas complex queries typically do not. The properties are summarized in Table 1. Bear in mind that even if a query can be pipelined, the messaging model and underlying network layers in use may not support pipelining, in which case a result set has to be delivered with long latency in a single large batch. Finally, note that non-pipelined delivery without a dynamic abort timeout feature is highly unreliable due to the so-called simultaneous abort problem (see below). If only one of the many nodes in the query path fails to be responsive for whatever reason, all other nodes in the chain are waiting, eventually time out at the same time, and the originator receives not even a single result.

5 Static Loop Timeout and Dynamic Abort Timeout

Clearly there comes a time when a user is no longer interested in query results, no matter whether any more results might be available. The query roaming the network and its response traffic should fade away after some time. In addition, P2P systems are well advised to limit resource consumption by defending against runaway queries that roam forever or produce gigantic result sets, whether unintended or malicious. To address these problems, an absolute abort timeout is attached to a query as it travels across hops. An abort timeout can be seen as a deadline. Together with the query, a node tells a neighbor “I will ignore (the rest of) your result set if I have not received it before 12:00:00 today.” The problem, then, is to ensure that a maximum of results can be delivered reliably within the time frame desired by a user. The value of a static timeout remains unchanged across hops, except for defensive modification in flight triggered by runaway query detection (e.g. infinite timeout). In contrast, the value of a dynamic timeout is intended to be decreased at each hop.
Nodes further away from the originator may time out earlier than nodes closer to the originator.

5.1 Dynamic Abort Timeout

A static abort timeout is entirely unsuitable for non-pipelined result set delivery, because it leads to a serious reliability problem, which we propose to call simultaneous abort timeout. If just one of the many nodes in the query path fails to be responsive for whatever reason, all other nodes in the path are waiting, eventually time out and attempt to return at least a partial result set. However, none of these partial results can ever reach the originator, because all nodes time out simultaneously (and it takes some time for results to flow back). For example, the agent times out and attempts to return its local partial results to the originator. After that, all partial results flowing to the agent from neighbors, and their neighbors, etc. are discarded: it is already too late. However, even the agent cannot deliver results to the originator, because the originator has already timed out (shortly) before the results arrive. Hence, the originator receives not even a single result if just one of the many nodes in the query path fails to be responsive. To address the simultaneous abort timeout problem, we propose dynamic abort timeouts. Under dynamic abort timeout, nodes do not time out at the same time. Instead, nodes further away from the originator time out earlier than nodes closer to the originator. This provides a safety time window for the partial results of any node to flow back across multiple hops to the originator. Together with the query, a node tells a neighbor “I will ignore (the rest of) your result set if I have not received it before 12:00:00 today. Do whatever you think is appropriate to meet this deadline.”

[Figure 10: Dynamic Abort Timeout. Timeouts along the query path: t+4, t+2, t+1, t+0.5. Legend: query, result set, node, originator, agent.]
Intermediate nodes can and should adaptively decrease the timeout value as necessary, in order to leave a large enough time window for receiving and returning partial results subsequent to timeout. Observe that the closer a node is to the originator, the more important it is (because if it cannot meet its deadline, results from a large branch are discarded). Further, the closer a node is to the originator, the larger its response and bandwidth consumption. Thus, as a good policy to choose the safety time window, we propose exponential decay with halving. The window size is halved at each hop, leaving large safety windows for important nodes and tiny window sizes for nodes that contribute only marginal result sets. Also, taking into account network latency and the time it takes for a query to be processed locally, the timeout is updated at each hop N according to the following recurrence formula:

$$\mathit{timeout}_N = \mathit{currenttime}_N + \frac{\mathit{timeout}_{N-1} - \mathit{currenttime}_N}{2} \qquad (1)$$

Consider for example Figure 10. At time t the originator submits a query with a dynamic abort timeout of t+4 seconds. In other words, it warns the agent to ignore results after time t+4. The agent in turn intends to safely meet the deadline and so figures that it needs to retain a safety window of 2 seconds, already starting to return its (partial) results at time t+2. The agent warns its own neighbors to ignore results after time t+2. The neighbors also intend to safely meet the deadline. From the 2 seconds available, they choose to allocate 1 second, and leave the rest to the branch remaining above. Eventually, the safety window becomes so small that a node can no longer meet a deadline on timeout. The results from the unlucky node are ignored, and its partial results are discarded. However, other nodes below and in other branches are unaffected. Their results survive and have enough time to hop all the way back to the originator before time t+4.
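Recurrence (1) and its effect on the simultaneous abort problem can be checked with a small Python sketch. Time is measured in seconds relative to t and, as a simplifying assumption, the query itself travels instantly while every returned result needs a fixed hop latency to reach the parent; the function names are hypothetical. A node's partial result reaches the originator only if every hop on the way back arrives before the deadline set by the receiving parent.

```python
from typing import List


def next_abort_timeout(prev_timeout: float, current_time: float) -> float:
    """Equation (1): keep half of the remaining time as the safety window."""
    return current_time + (prev_timeout - current_time) / 2


def dynamic_schedule(originator_timeout: float, hop_times: List[float]) -> List[float]:
    """Abort timeouts along a path; hop_times[i] is the local time at hop i+1."""
    timeouts = [originator_timeout]
    for now in hop_times:
        timeouts.append(next_abort_timeout(timeouts[-1], now))
    return timeouts


def nodes_reaching_originator(timeouts: List[float], hop_latency: float) -> List[int]:
    """Nodes (1 = agent) whose partial results reach the originator.

    timeouts[j] is the deadline that node j imposes on node j+1 (node 0 is the
    originator). The node after the last one hangs, so node k stops waiting at
    timeouts[k] and returns its partial result, which arrives hop_latency later.
    """
    reached = []
    for k in range(1, len(timeouts)):
        # Every hop on the way back must beat the receiving parent's deadline.
        if all(timeouts[j] + hop_latency <= timeouts[j - 1] for j in range(1, k + 1)):
            reached.append(k)
    return reached
```

With a static timeout of t+4 at every node and a 0.3 s return latency, no partial result arrives in time. With the halving schedule t+4, t+2, t+1, t+0.5, all three non-hanging nodes get through; if the path grows by one more node imposing t+0.25, only that node is lost, because its 0.25 s window is shorter than the return latency.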
Instead of ignoring results that miss their deadline, a node may also close the connection. This may, but need not, be harmless. The connection is typically simply reestablished as soon as a new query is to be forwarded. However, in an attempt to educate good P2P citizens, a node may choose to stop propagating queries to, or deny service to, neighbors that repeatedly do not meet abort deadlines. For example, a strategy may use an exponential back-off algorithm. Note that as long as a node obeys its timeout, it can independently implement any timeout policy it sees fit for its purposes, without regard to the policy implemented at other nodes. If a node misbehaves or maliciously increases the abort timeout, it risks not being able to meet its own deadline, and is likely soon dropped or denied service. Such healthy measures move less useful nodes to the edge of the network, where they cause less harm, because their number of topology links tends to decrease. To summarize, under non-pipelined result set delivery, dynamic abort timeouts using exponential decay with halving ensure that a maximum of results can be delivered reliably within the time frame desired by a user. We speculate that dynamic timeouts could also incorporate sophisticated cost functions involving latency and bandwidth estimation and/or economic models.

5.2 Static Loop Timeout

Interestingly, a static loop timeout is required in order to fully preserve query semantics. A dynamic timeout (e.g. the dynamic abort timeout) is unsuitable for use as a loop timeout. If it were used, a problem would arise that we propose to call non-simultaneous loop timeout. Recall from Section 2.7 that the same query may arrive at a node multiple times, along distinct routes, perhaps in a complex pattern. Loops in query routes must be detected and prevented. Otherwise, unnecessary or endless multiplication of workloads would be caused.
To this end, a node maintains a state table of recent transaction identifiers and associated loop timeouts, and returns an error whenever a query is received that has already been seen (according to the state table). Before the loop timeout is reached, the same query can potentially arrive multiple times, along distinct routes. On loop timeout, a node may “forget” about a query by deleting it from the state table. To be able to reliably detect a loop, a node must not forget a transaction identifier before its loop timeout has been reached. However, let us assume for the moment that a dynamic timeout (e.g. the dynamic abort timeout) is used as loop timeout. Consider, for example, Figure 11, which is identical to Figure 10 except that the agent has an additional neighbor that can potentially receive the query along more than one path. At time t the originator submits a query with a dynamic abort timeout of t+4 seconds. The agent in turn warns its own neighbors to ignore results after time t+2. Request 5 is sent and arrives, is processed, and its results (step 8) are delivered before the dynamic abort timeout of time t+1. At time t+1 the loop timeout is reached and the query is deleted from the state table. For many reasons, including temporary network segment problems and sequential neighbor processing, request 10 can be delayed. In the example it arrives after time t+1. By this time, the receiving node has already forgotten that it has handled the very same query. Hence, the node cannot detect the loop and continues to process and forward (steps 11, 12) the same query again.

[Figure 11: Loop Detection Failure with Dynamic Loop Timeout. Timeouts along the query path: t+4, t+2, t+1, t+0.5, and t+1.75 on the second route; the delayed request arrives at t+1.5 and returns at t+1.8. Legend: query, result set, node, originator, agent.]

The non-simultaneous loop timeout problem is caused by the fact that some nodes still forward the query to other nodes when the destinations have already forgotten it.
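A minimal sketch of the state table, with times in seconds relative to t, replays the Figure 11 scenario at the receiving node. The class name, the method seen_before and the arrival time of request 5 (t+0.6) are assumptions made for illustration; the times t+1, t+1.5 and t+1.75 follow the example, where t+1.75 is also what recurrence (1) yields for a node forwarding at t+1.5 under a t+2 deadline.

```python
from typing import Dict


class LoopDetector:
    """State table mapping recent transaction identifiers to absolute loop timeouts."""

    def __init__(self) -> None:
        self.table: Dict[str, float] = {}

    def seen_before(self, txn_id: str, loop_timeout: float, now: float) -> bool:
        # On loop timeout, a node may forget a transaction: drop expired entries.
        self.table = {t: to for t, to in self.table.items() if to > now}
        if txn_id in self.table:
            return True  # loop detected: the caller should return an error
        self.table[txn_id] = loop_timeout
        return False
```

With a dynamic loop timeout, the entry for request 5 expires at t+1, so the delayed request 10 arriving at t+1.5 is treated as new and forwarded again. With the static loop timeout t+4 carried by both requests, the entry is still present at t+1.5 and the loop is detected.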
The root of the problem is that loop timeout does not occur simultaneously everywhere. Consequently, a loop timeout must be static (must not change across hops) to guarantee that loops can reliably be detected. Along with a query, an originator provides not only a dynamic abort timeout, but also a static loop timeout. Initially, at the originator, both values must be identical (e.g. t+4). After the first hop, both values become unrelated. To summarize, we have abort timeout ≤ loop timeout. Loop timeouts must be static, whereas abort timeouts may be static or dynamic. Under non-pipelined result set delivery, dynamic abort timeouts using exponential decay with halving ensure that a maximum of results can be delivered reliably within the time frame desired by a user. A dynamic abort timeout model still requires static loop timeouts to ensure reliable loop detection, so that a node does not forward and answer the same query multiple times.

6 Query Scope

As in a data integration system, the goal is to exploit several independent information sources as if they were a single source. This is important for distributed systems in which node topology or deployment model change frequently. For example, cross-organizational Grids and P2P networks exhibit such a character. However, in practice, it is often sufficient (and much more efficient) for a query to consider only a subset of all tuples (service descriptions) from a subset of nodes. For example, a typical query may only want to search tuples (services) within the scope of the domain cern.ch and ignore the rest of the world. Recall that to this end, Section 2.2 cleanly separated the concepts of (logical) query and (physical) query scope. A query is formulated against a global database view and is insensitive to link topology and deployment model. In other words, to a query the set of tuples appears as a single homogeneous database, even though the set may be (recursively) partitioned across many nodes and databases.
This means that in a relational or XML environment, at t