A Unified Peer-to-Peer Database Framework
and its Application for Scalable Service Discovery
CERN IT Division
European Organization for Nuclear Research
1211 Geneva 23, Switzerland
In a large distributed system spanning many administrative domains such as a Data-
Grid, it is often desirable to maintain and query dynamic and timely information about
active participants such as services, resources and user communities. However, in such a
database system, the set of information tuples in the universe is partitioned over one or
more distributed nodes, for reasons including autonomy, scalability, availability, performance
and security. It is not obvious how to enable powerful discovery query support and
collective collaborative functionality that operate on the distributed system as a whole,
rather than on a given part of it. Further, it is not obvious how to allow for search
results that are fresh, allowing dynamic content. It appears that a Peer-to-Peer (P2P)
database network may be well suited to support dynamic distributed database search, for
example for service discovery. In this paper, we take the first steps towards unifying the
fields of database management systems and P2P computing, which so far have received
considerable, but separate, attention. We extend database concepts and practice to cover
P2P search. Similarly, we extend P2P concepts and practice to support powerful general-purpose
query languages such as XQuery and SQL. As a result, we devise the Unified
Peer-to-Peer Database Framework (UPDF), which is unified in the sense that it allows
one to express specific applications for a wide range of data types, node topologies, query
languages, query response modes, neighbor selection policies, pipelining characteristics,
timeout and other scope options.
The next generation Large Hadron Collider (LHC) project at CERN, the European Organization
for Nuclear Research, involves thousands of researchers and hundreds of institutions
spread around the globe. A massive set of computing resources is necessary to support its
data-intensive physics analysis applications, including thousands of network services, tens of
thousands of CPUs, WAN Gigabit networking as well as Petabytes of disk and tape storage.
To make collaboration viable, it was decided to share in a global joint effort - the European
DataGrid (EDG) [2, 3, 4, 5] - the data and locally available resources of all participating
laboratories and university departments.
Grid technology attempts to support flexible, secure, coordinated information sharing
among dynamic collections of individuals, institutions and resources. This includes data
sharing but also includes access to computers, software and devices required by computation
and data-rich collaborative problem solving. These and other advances of distributed
computing are increasingly necessary to make it possible to join loosely coupled people and
resources from multiple organizations.
An enabling step towards increased Grid software execution flexibility is the (still immature
and hence often hyped) web services vision [2, 7, 8] of distributed computing where
programs are no longer configured with static information. Rather, the promise is that programs
are made more flexible and powerful by querying Internet databases (registries) at
runtime in order to discover information and network attached third-party building blocks.
Services can advertise themselves and related metadata via such databases, enabling the assembly
of distributed higher-level components. For example, a data-intensive High Energy
Physics analysis application sweeping over Terabytes of data looks for remote services that
exhibit a suitable combination of characteristics, including network load, available disk quota,
access rights, and perhaps Quality of Service and monetary cost.
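The runtime discovery idea described above can be sketched as follows. This is a minimal illustrative example only; the `Registry` class, the `find_services` method and all attribute names are our own assumptions, not part of any real Grid or web services API.

```python
# Hypothetical sketch of runtime service discovery: instead of being
# statically configured with addresses, a client queries a registry at
# runtime for services matching its constraints. All names here are
# illustrative assumptions.

class Registry:
    """A toy in-memory registry holding service description tuples."""

    def __init__(self, services):
        self.services = services

    def find_services(self, **constraints):
        # Return services whose attributes satisfy every given constraint.
        return [s for s in self.services
                if all(s.get(k) == v for k, v in constraints.items())]

registry = Registry([
    {"interface": "replica-catalog", "binding": "http", "domain": "cern.ch"},
    {"interface": "job-scheduler",   "binding": "http", "domain": "infn.it"},
])

# The client discovers suitable third-party building blocks at runtime.
matches = registry.find_services(interface="replica-catalog", binding="http")
```

A real query would of course also weigh dynamic characteristics such as network load or disk quota, as the text notes.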
More generally, in a distributed system, it is often desirable to maintain and query dynamic
and timely information about active participants such as services, resources and user
communities. As in a data integration system [9, 10, 11], the goal is to exploit several independent
information sources as if they were a single source. However, in a large distributed
database system spanning many administrative domains, the set of information tuples in the
universe is partitioned over one or more distributed nodes, for reasons including autonomy,
scalability, availability, performance and security. It is not obvious how to enable powerful
discovery query support and collective collaborative functionality that operate on the distributed
system as a whole, rather than on a given part of it. Further, it is not obvious
how to allow for search results that are fresh, allowing time-sensitive dynamic content. It
appears that a Peer-to-Peer (P2P) database network may be well suited to support dynamic
distributed database search, for example for service discovery.
The overall P2P idea is as follows. Rather than have a centralized database, a distributed
framework is used where there exist one or more autonomous database nodes, each maintaining
its own data. Queries are no longer posed to a central database; instead, they are
recursively propagated over the network to some or all database nodes, and results are collected
and sent back to the client. The key problems then are:
• What are the detailed architecture and design options for P2P database searching? What
response models can be used to return matching query results? How should a P2P
query processor be organized? What query types can be answered (efficiently) by a P2P
network? What query types have the potential to immediately start piping in (early)
results? How can a maximum of results be delivered reliably within the time frame
desired by a user, even if a query type does not support pipelining? How can loops
be detected reliably using timeouts? How can a query scope be used to exploit topology
characteristics in answering a query?
• Can we devise a unified P2P database framework for general-purpose query support in
large heterogeneous distributed systems spanning many administrative domains? More
precisely, can we devise a framework that is unified in the sense that it allows one to express
specific applications for a wide range of data types, node topologies, query languages,
query response modes, neighbor selection policies, pipelining characteristics, timeout
and other scope options?
In this paper, we take the first steps towards unifying the fields of database management
systems and P2P computing, which so far have received considerable, but separate, attention.
We extend database concepts and practice to cover P2P search. Similarly, we extend P2P
concepts and practice to support powerful general-purpose query languages such as XQuery
 and SQL . As a result, we answer the above questions by proposing the so-called
Unified Peer-to-Peer Database Framework (UPDF).
This paper is organized as follows. Section 2 introduces a query, data and database model.
The related but orthogonal concepts of (logical) link topology and (physical) node deployment
model are discussed. Definitions are proposed, clarifying the notion of node, service, fat, thin
and ultra-thin P2P networks, as well as the commonality of a P2P network and a P2P network
for service discovery. The agent P2P model is proposed and compared with the servent P2P
model. A timeout-based mechanism to reliably detect and prevent query loops is proposed.
Section 3 characterizes in detail four techniques to return matching query results to an
originator, namely Routed Response, Direct Response, Routed Metadata Response, and Direct
Metadata Response. We discuss to what extent a given P2P network must mandate the use
of any particular response mode throughout the system.
Section 4 unifies query processing in centralized, distributed and P2P databases. A theory
of query processing for queries that are (or are not) recursively partitionable is proposed, which
directly reflects the basis of the P2P scalability potential. The definition and properties of
simple, medium and complex queries are clarified with respect to recursive partitioning. It is
established to what extent simple, medium and complex queries support pipelining.
Section 5 proposes dynamic abort timeouts using as policy exponential decay with halving.
This ensures that a maximum of results can be delivered reliably within the time frame
desired by a user even if a query does not support pipelining. It is established that a loop
timeout must be static.
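The "exponential decay with halving" policy named above can be sketched as follows: a node grants its neighbors half of its own remaining time budget, keeping the other half to collect and return results reliably. The function name and the latency allowance parameter are our own illustrative assumptions; the full policy is developed in Section 5.

```python
def child_timeout(remaining, network_latency=0.0):
    """Exponential decay with halving: grant neighbors half the remaining
    time budget (minus an assumed allowance for network latency)."""
    return max(0.0, (remaining - network_latency) / 2.0)

# A query starting with a 4 s user-supplied abort timeout decays over
# successive hops, so every node still has time to report back upwards:
budgets = []
t = 4.0
for hop in range(4):
    t = child_timeout(t)
    budgets.append(t)
# budgets now holds the per-hop time budgets: 2.0, 1.0, 0.5, 0.25
```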
Section 6 uses the concept of a query scope to navigate and prune the link topology and
filter on attributes of the deployment model. Indirect specification of scope based on neighbor
selection, timeout and radius is detailed.
Section 7 compares our work with existing research results. Finally, Section 8 concludes
this paper. We also outline interesting directions for future research.
2.1 Database Model and Topology
A node holds a set of tuples in its database. A database may be anything that accepts
queries from the query model and returns results according to the data model (see below).
For example, in [2, 14] we have introduced a registry node for service discovery that maintains
hyperlinks and cache content pointed to by these links. A content provider can publish a
hyperlink, which in turn enables the registry (and third parties) to pull (retrieve) the current
content. A remote client can query a registry in the XQuery language, obtaining a set of
tuples as answer. Figure 1 illustrates such a registry node with several content providers and clients.
A distributed database framework is used where there exist one or more nodes. Each node
can operate autonomously. A node holds a set of tuples in its database. A given database
belongs to a single node. For flexibility, the databases of nodes may be deployed in any
arbitrary way (deployment model). For example, a number of nodes may reside on the same
host. A node's database may be co-located with the node. However, the databases of all
nodes may just as well be stored next to each other on a single central data server.
Figure 1: Clients, Registry and Content Providers.
The set of tuples in the universe is partitioned over the nodes, for reasons including
autonomy, scalability, availability, performance and security. Nodes are interconnected with
links in any arbitrary way. A link enables a node to query another node. A link topology
describes the link structure among nodes. The centralized model has a single node only. For
example, in a service discovery system, a link topology could tie together a distributed set of
administrative domains, each hosting a registry node holding descriptions of services local to
the domain. Several link topology models covering the spectrum from centralized models to
fine-grained fully distributed models can be envisaged, among them single node, star, ring,
tree, semi hierarchical as well as graph models. Real-world distributed systems often have a
more complex organization than any simple topology. They often combine several topologies
into a hybrid topology. Nodes typically play multiple roles in such a system. For example, a
node might have a centralized interaction with one part of the system, while being part of a
hierarchy in another part. Figure 2 depicts some example topologies.
Figure 2: Example Link Topologies.
Clearly not all nodes in a topology are equal. For example, node bandwidth may vary by
four orders of magnitude (50Kbps-1000Mbps), latency by six orders of magnitude (10us-10s),
and availability by four orders of magnitude (1%-99.99%).
We stress that it is by no means justifiable to advocate the use of graph topologies irrespective
of application context and requirements. Depending on the context, all topologies
have their merits and drawbacks in terms of scalability, reliability, availability, content coherence,
fault tolerance, security and maintainability. However, from the structural perspective,
the graph topology is the most general one, being able to express all other conceivable topolo-
gies. Since our goal is to support queries that are generally independent of the underlying
topology, this paper discusses problems arising in graph topologies. A problem solution that
applies to a graph also applies to any other topology. Of course, a simpler or more efficient
solution may exist for any particular topology. The results of this paper help enable the use
of graph topologies where appropriate, they do not require or mandate the use of them.
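The claim that the graph topology subsumes the others can be made concrete with a small sketch: represented as an adjacency list, star, ring and tree topologies are simply special cases of the same structure. Node names and the traversal helper are illustrative assumptions.

```python
# Sketch: link topologies as directed adjacency lists. The graph model
# expresses all of them; star, ring and tree are just special cases.

star = {"hub": ["a", "b", "c"], "a": [], "b": [], "c": []}
ring = {"a": ["b"], "b": ["c"], "c": ["a"]}
tree = {"root": ["left", "right"], "left": [], "right": []}

def reachable(topology, start):
    """Nodes a query could visit when recursively forwarded from `start`."""
    seen, stack = set(), [start]
    while stack:
        node = stack.pop()
        if node not in seen:
            seen.add(node)
            stack.extend(topology[node])
    return seen
```

A solution that works for an arbitrary `topology` dictionary of this shape therefore also works for any of the simpler special cases, which is the structural argument made above.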
2.2 Query and Data Model
We have introduced a dynamic data model for discovery in [2, 14]. It is a general-purpose
data model that operates on tuples. Briefly, a tuple is an annotated multi-purpose soft state
data container that may contain a piece of arbitrary content and allows for refresh of that
content at any time. Content can be structured or semi-structured data in the form of any
arbitrary well-formed XML document or fragment. An individual tuple may, but need
not, have a schema (XML Schema), in which case it must be valid according to the
schema. All tuples may, but need not, share a common schema. This flexibility is important
for integration of heterogeneous content. Examples for content include a service description
expressed in WSDL, a file, picture, current network load, host information, stock quotes,
etc. Discussion in this paper often uses examples where the term tuple is substituted by the
more concrete term service description.
Our general-purpose query model is intended for read-only search. Insert, update and
delete capabilities are not required and not addressed. We have defined these capabilities
elsewhere [2, 14]. A query is formulated against a global database view and is insensitive to
link topology and deployment model. In other words, to a query the set of all tuples appears as
a single homogeneous database, even though the set may be (recursively) partitioned across
many nodes and databases. This means that in a relational or XML environment, at the
global level, the set of all tuples appears as a single, very large, table or XML document,
respectively. The query scope, on the other hand, is used to navigate and prune the link
topology and filter on attributes of the deployment model. Searching is primarily guided by
the query. Scope hints are used only as necessary. A query is evaluated against a set of
tuples. The set, in turn, is specified by the scope. Conceptually, the scope is the input fed to
the query. The query scope is a set and may contain anything from all tuples in the universe
to none. Consider the example discovery queries:
• Simple Query: Find all (available) services.
• Simple Query: Find all services that implement a replica catalog service interface and
that CMS members are allowed to use, and that have an HTTP binding for the replica
catalog operation "XML getPFNs(String LFN)".
• Simple Query: Find all CMS replica catalog services and return their physical file names
(PFNs) for a given logical file name (LFN); suppress PFNs not starting with "ftp://".
• Medium Query: Return the number of replica catalog services.
• Complex Query: Find all (execution service, storage service) pairs where both services
of a pair live within the same domain. (Job wants to read and write locally.)
For a detailed discussion of a wide range of discovery queries, their representation in the
XQuery language, as well as detailed motivation and justification, see our prior studies.
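The soft-state tuple described in this section can be sketched as a small data container. The field names, the lease-based refresh policy and the time-to-live default are our own illustrative assumptions, not the paper's exact publication interface.

```python
import time

# Minimal sketch of an annotated soft-state tuple: it carries a piece of
# arbitrary XML content, an optional schema, and allows refresh of that
# content at any time. All names are illustrative assumptions.

class Tuple:
    def __init__(self, content, schema=None, ttl=60.0):
        self.content = content          # arbitrary well-formed XML fragment
        self.schema = schema            # optional XML Schema reference
        self.ttl = ttl                  # assumed soft-state lifetime (seconds)
        self.refreshed_at = time.time()

    def refresh(self, content=None):
        """Content may be refreshed at any time; refreshing renews the lease."""
        if content is not None:
            self.content = content
        self.refreshed_at = time.time()

    def expired(self, now=None):
        now = time.time() if now is None else now
        return now - self.refreshed_at > self.ttl

t = Tuple("<service><interface>replica-catalog</interface></service>", ttl=30.0)
```

Soft state of this kind is what keeps discovery content timely: a tuple that is no longer refreshed by its content provider simply ages out of the database.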
2.3 Definitions - Service and Node
Let us clarify the notion of node and service. A service exposes some functionality in the form
of service interfaces to remote clients. Example services are an echo service, a job scheduler,
a replica catalog, a time service, a gene sequencing service and a language translation service.
A node is a service that exposes at least functionality (i.e. service interfaces) for publication
and P2P queries. Examples are a hyper registry as introduced in our prior studies [2, 14], a
Gnutella file sharing node and an extended job scheduler. Put another way, any service
that happens to support publication and P2P query interfaces is a node. This implies that
every node is a service. It does not imply that every service is a node. Only nodes are
part of the P2P topology, while services are not, because they do not support the required
interfaces. Usually, most services are not nodes. However, in some networks most or all
services are nodes.
• Most services are not nodes. We propose to speak of a fat P2P network. Typically,
only one or a few large and powerful services are nodes, enabling publication and P2P
queries. An example is a backbone network of 10 large registry nodes that ties together
10 administrative domains, each hosting a registry node to which local domain services
can publish to be discovered, as depicted in Figure 3 (left). The services (shown small
on the edges) are not part of the network.
• Most or all services are nodes. We propose to speak of a thin or ultra-thin P2P
network. An example is a network of millions of small services, each having some
proprietary core functionality (e.g. replica management optimization, gene sequencing,
multi-lingual translation), actively using the network for searching (e.g. to discover
replica catalogs, remote gene mappers or language dictionary services), but also actively
contributing to its search capabilities, as depicted in Figure 3 (right).
Figure 3: Fat (left) and Ultra-thin (right) Peer-to-Peer network.
There is no difference between these scenarios in terms of technology. Discussion in this
paper is applicable to ultra-thin, thin and fat P2P networks. For simplicity of exposition,
examples illustrate ultra-thin networks (every service is a node).
2.4 P2P network vs. P2P network for service discovery
In any kind of P2P network, nodes may publish themselves to other nodes, thereby forming
a topology. In a P2P network for service discovery, services and other content providers may
publish their service link and content links to nodes. Because nodes are services, also nodes
may publish their service link (and content links) to other nodes, thereby forming a topology.
In any kind of P2P network, a node has a database or some kind of data source against which
queries are applied. In a P2P network for service discovery, this database happens to be the
publication database. In other words, publication enables topology construction and at the
same time constructs the database to be searched. Discussion in this paper is applicable to
any kind of P2P network, while the examples illustrate service discovery.
2.5 Agent P2P Model
Queries in what we propose as the agent P2P model flow as follows. When any originator
wishes to search the P2P network with some query, it sends the query to a single node. We
call this entry point the agent node of the originator (i.e. its service gateway). The agent
applies the query to its local database and returns matching results; it also forwards the query
to its neighbor nodes. These neighbors return their local query results; they also forward the
query to their neighbors, and so on.
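The query flow just described can be sketched in a few lines. The classes, the matching predicate and the in-process recursion are toy stand-ins for network interactions; real nodes would forward over links and detect loops via transaction identifiers (Section 2.7).

```python
# Sketch of the agent P2P model: a node applies the query to its local
# database, forwards it to its neighbors, and combines local and remote
# results. All names here are illustrative assumptions.

class Node:
    def __init__(self, name, tuples):
        self.name = name
        self.tuples = tuples
        self.neighbors = []

    def query(self, predicate, seen=None):
        seen = set() if seen is None else seen
        if self.name in seen:                 # loop prevention placeholder
            return []
        seen.add(self.name)
        results = [t for t in self.tuples if predicate(t)]   # local results
        for n in self.neighbors:                             # recursive forwarding
            results.extend(n.query(predicate, seen))
        return results

agent = Node("agent", ["echo"])
peer = Node("peer", ["scheduler", "catalog"])
agent.neighbors.append(peer)

# The originator talks only to its agent, which returns combined results.
found = agent.query(lambda t: t != "echo")
```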
For flexibility, the protocol between originator and agent is left unspecified. The agent
P2P model is a hybrid of centralization and decentralization. It allows fully decentralized
infrastructures, yet also allows seamless integration of centralized client-server computing into
an otherwise decentralized infrastructure. An originator may embed its agent in the same
process (decentralized). However, the originator may just as well choose a remote node as
agent (centralized), for reasons including central control, reliability, continuous availability,
maintainability, security, accounting and firewall restrictions on incoming connections for
originator hosts. For example, a simple HTML GUI may be sufficient to originate queries
that are sent to an organization's agent node. Note that only nodes are part of the P2P
topology, while the originator is not, because it does not possess the functionality of a node.
The agent P2P model provides location and distribution transparency to originators. An
originator is unaware that (and how) database tuples are partitioned among nodes. It only
communicates with an agent black box.
2.6 Servent P2P Model
In contrast, in the servent P2P model (e.g. Gnutella) there exists no agent concept, but only
the concept of a servent. Put another way, the agent is always embedded into the originator
process, forming a monolithic servent. This model is fully decentralized; it does not allow
for any degree of centralization. This restriction appears unjustified. More importantly, it
seriously limits the applicability of P2P computing. For example, the Gnutella servent model
could not cope with the large connectivity spectrum of the user community, ranging from very
low to very high bandwidth. As the Gnutella network grew, it became fragmented because
nodes with low bandwidth connections could not keep up with traffic. The idea of requiring all
functionality to exist at the very edge of the network had to be reconsidered. Eventually, the
situation was patched by rendering dumb the low bandwidth servents on the (slow) edges of
the network. The notion of centralized reflectors (Gnutella) and super-peers (Morpheus) was
(re)invented. A reflector is a powerful high bandwidth gateway for many remote originators
with low bandwidth dialup connections. It volunteers to take over the functionality and shield
traffic that would normally be carried via low bandwidth servents. However, servents still keep
data locally. The agent P2P model naturally covers centralized and decentralized hybrids.
Here a powerful node may act as agent for many remote originators. In the remainder of this
paper, we follow the agent P2P model and do not use the term servent anymore. The terms
originator, node and agent (node) are used instead.
2.7 Loop Detection
Query shipping is used to route queries through the nodes of the topology. A query remains
identical during forwards over hops (unless rewritten or split by a query optimizer). The very
same query may arrive at a node multiple times, along distinct routes, perhaps in a complex
pattern. Loops in query routes must be detected and prevented. Otherwise, unnecessary or
endless multiplication of workloads would be caused. Figure 3 depicts topologies with the
potential for a query to become trapped in infinite loops.
To enable loop detection, an originator attaches a different transaction identifier to each
query, which is a universally unique identifier (UUID). The transaction identifier always
remains identical during query forwarding over hops. A node maintains a state table of
recent transaction identifiers and returns an error whenever a query is received that has
already been seen. For example, this approach is used in Gnutella.
In practice, it is sufficient for the UUID to be unique with exceedingly large probability,
suggesting the use of a 128 bit integer computed by a cryptographic hash digest function
such as MD5 or SHA-1 over message text, originator IP address, current time and a random number.
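The transaction-identifier approach can be sketched as follows. MD5 yields 128 bits directly; the exact input framing, the use of `os.urandom` as the randomness source and the state-table helper are our own illustrative assumptions.

```python
import hashlib
import os
import time

# Sketch: a 128-bit transaction identifier derived via a cryptographic
# hash over message text, originator address, current time and fresh
# randomness, unique with exceedingly large probability.

def transaction_id(message, originator_ip):
    h = hashlib.md5()
    for part in (message, originator_ip, repr(time.time())):
        h.update(part.encode("utf-8"))
    h.update(os.urandom(16))      # randomness makes collisions negligible
    return h.hexdigest()          # 32 hex digits = 128 bits

seen = set()                      # a node's state table of recent identifiers

def accept(tid):
    """Return False (i.e. an error) if this query has already been seen."""
    if tid in seen:
        return False
    seen.add(tid)
    return True
```

The identifier travels unchanged with the query over all hops, so a node receiving the same identifier twice knows the query arrived along a second route and can refuse it, breaking the loop.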
3 Routed vs. Direct Response, Metadata Responses
We propose to distinguish four techniques to return matching query results to an originator:
Routed Response, Direct Response, Routed Metadata Response, and Direct Metadata
Response, as depicted in Figure 4. Let us examine the main implications with a Gnutella
use case. A typical Gnutella query such as "Like a virgin" is matched by some hundreds of
files, most of them referring to replicas of the very same music file. Not all matching files
are identical because there exist multiple related songs (e.g. remixes, live recordings) and
multiple versions of a song (e.g. with different sampling rates). A music file has a size of at
least several megabytes. Many thousands of concurrent users submit queries to the Gnutella
network. A large fraction of users lives on slow and unreliable dialup connections.
Figure 4: Peer-to-Peer Response Modes. (a) Routed Response, (b) Direct Response
without Invitation, (c) Direct Response with Invitation (DR), (d) Routed Response
with Metadata (RRM), (e, f) Direct Metadata Response with Invitation (DRM).
• Routed Response. (Figure 4-a). Results are propagated back to the originator
along the paths on which the query flowed outwards. Each (passive) node returns to its
(active) client not only its own local results but also all remote results it receives from
neighbors. The response protocol is tightly coupled to the query protocol. Routing
messages through a logical overlay network of P2P nodes is much less efficient than
routing through a physical network of IP routers. Routing back even a single
Gnutella file (let alone all results) for each query through multiple nodes would consume
large amounts of overall system bandwidth, most likely grinding Gnutella to a screeching
halt. As the P2P network grows, it is fragmented because nodes with low bandwidth
connections cannot keep up with traffic. Consequently, routed responses are not
well suited for file sharing systems such as Gnutella. In general, overall economics
dictate that routed responses are not well suited for systems that return many and/or
large results.
• Direct Response With and Without Invitation. To better understand the underlying
idea, we first introduce the simpler variant, which is Direct Response Without
Invitation (Figure 4-b). Results are not returned by routing back through intermediary
nodes. Each (active) node that has local results sends them directly to the (passive)
agent, which combines and hands them back to the originator. Response traffic does
not travel through the P2P system. It is offloaded via individual point-to-point data
transfers on the edges of the network. The response push protocol can be separated
from the query protocol. For example, HTTP, FTP or other protocols may be used for
response push. Let us examine the main implications with a use case.
As already mentioned, a typical Gnutella query such as "Like a virgin" is matched
by some hundreds of files, most of them referring to replicas of the very same music
file. For Gnutella users it would be sufficient to receive just a small subset of matching
files. Sending back all such files would unnecessarily consume large amounts of direct
bandwidth, most likely restricting Gnutella to users with excessive cheap bandwidth at
their disposal. Note however, that the overall Gnutella system would be only marginally
affected by a single user downloading, say, a million music files, because the largest
fraction of traffic does not travel through the P2P system itself.
In general, individual economics dictate that direct responses without invitation are not
well suited for systems that return many equal and/or large results, while a small subset
would be sufficient. A variant based on invitation (Figure 4-c) softens the problem by
inverting control flow. Nodes with matching files do not blindly push files to the agent.
Instead they invite the agent to initiate downloads. The agent can then act as it sees
fit. For example, it can filter and select a subset of data sources and files and reject
the rest of the invitations. Due to its inferiority, the variant without invitation is not
considered any further. In the remainder of this paper, we use the term Direct Response
as a synonym for Direct Response With Invitation.
• Routed Metadata Response and Direct Metadata Response. Here interaction
consists of two phases. In the first phase, routed responses (Figure 4-d) or direct responses
(Figure 4-e,f) are used. However, nodes do not return data results in response
to queries, but only small metadata results. The metadata contains just enough information
to enable the originator to retrieve the data results and possibly to apply
filters before retrieval. In the second phase, the originator selects, based on the metadata,
which data results are relevant. The (active) originator directly connects to the
relevant (passive) data sources and asks for data results. Again, the largest fraction of
response traffic does not travel through the P2P system. It is offloaded via individual
point-to-point data transfers on the edges of the network. The retrieval protocol can be
separated from the query protocol. For example, HTTP, FTP or other protocols may
be used for retrieval.
The routed metadata response approach is used by file sharing systems such as Gnutella.
A Gnutella query does not return files; it just returns an annotated set of HTTP URLs.
The originator connects to a subset of these URLs to download files as it sees fit.
Another example is a service discovery system where the first phase returns a set of
service links instead of full service descriptions. In the second phase, the originator
connects to a subset of these service links to download service descriptions as it sees
fit. Another example is a referral system where the first phase uses routed metadata
response to return the service links of the set of nodes having local matching results
("Go ask these nodes for the answer"). In the second phase, the originator or agent
connects directly to a subset of these nodes to query and retrieve result sets as it sees fit.
This variant avoids the "invitation storm" possible under Direct Response. Referrals
are also known as redirections. A metadata response mode with a radius scope of zero
can be used to implement the referral behavior of the Domain Name System (DNS).
For details, see Section 7.
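The two-phase metadata interaction described above can be sketched as follows. The provider table, the phase functions and the substring match are illustrative assumptions standing in for real network retrieval (e.g. over HTTP).

```python
# Sketch of the two-phase metadata response: phase one returns small
# metadata records (e.g. annotated URLs), phase two retrieves only the
# selected data directly from its providers, off the P2P system.

providers = {
    "http://a.example/s1": "full service description 1",
    "http://b.example/s2": "full service description 2",
}

def phase_one(query):
    # Nodes answer with metadata only: just enough to locate the results.
    return [url for url, data in providers.items() if query in data]

def phase_two(selected_urls):
    # The originator connects directly to the relevant data sources.
    return [providers[url] for url in selected_urls]

metadata = phase_one("service description")
results = phase_two(metadata[:1])   # retrieve only a chosen subset
```

Because the originator selects before retrieving, the bulk of the response traffic is a point-to-point deal between consumer and provider, which is the economic argument developed in the next section.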
3.1 Comparison of Response Mode Properties
Let us compare the properties of the various response models. The following abbreviations are
used: RR ... Routed Response, RRM ... Routed Response with metadata, RRX ... Routed
Response with and without metadata, DR ... Direct Response, DRX ... Direct Response
with and without metadata.
• Distribution and Location Transparency. In the response models without metadata,
the originator is unaware that (and how) tuples are partitioned among nodes.
In other words, these models are transparent with respect to distribution and location.
Metadata responses require an originator to contact individual data providers to
download full results, and hence are not transparent.
• (Efficient) Query Support. All models can answer any query. Both simple and
medium queries can be answered efficiently by RRX and DRX, whereas a complex query
cannot be answered efficiently. (Justification of this result is deferred to Section 4).
Transmission of duplicate results unnecessarily wastes bandwidth. RRX can eliminate
duplicates already along the query path, whereas DRX can only do so in the final stage,
at the agent. Similarly, maximum result set size limiting is more efficient under RRX
because superfluous results can already be discarded along the query path.
• Economics. RR results travel multiple hops rather than just a single hop. This leads
to poor overall economics. The effect is more pronounced for large results, as is the
case for music files. RR can also lead to unfortunate individual economics. A user that
induces few or undemanding queries consumes few system resources. However, if many
heavy results for queries from other parties are routed back via such a user's node, it
can end up in a situation where it pays for large amounts of bandwidth and gives it
away for free to anonymous third parties. For a given user, the costs may drastically
outweigh the gains. One could perhaps devise appropriate authorization, quality of
service and flow control policies. The unsatisfying economic situation is similar to the
one of physical IP routers on the Internet, which also forward traffic from and to third
parties. In any case, there remains the fact that results travel multiple hops rather than
In principle, RRM has the same poor economic properties as RR. However, if metadata
is very small in size (e.g. as in Gnutella), then the incurred processing and transmission
cost may be acceptable. For example, Gnutella nodes just route back an annotated set
of HTTP URLs as metadata. Under DRX, result traffic does not travel through the
P2P system. Retrieving results is a deal between just two parties, the provider and the
consumer. Consequently, individual economics are controllable and predictable. A user
is not charged much for other people's workloads, unless he explicitly volunteers.
• Number of TCP Connections at Originator. Under RR and DR, just one (or
no) TCP connection is required at the originator, whereas metadata modes require a
connection per (selected) data provider. The more data sources are selected, the more
heavyweight data retrieval becomes. Metadata modes can encounter serious latency
limitations due to the very expensive nature of secure (and even insecure) TCP connection
setup. Hence, the approach does not scale well. However, for many use cases
this may not be a problem because a client always selects only a small number of data
providers (e.g. 10).
• Number of TCP Connections at Agent. Usually a node has few neighbors (five
to hundreds). Under RRX, one TCP connection per neighbor is required at an agent.
Under DRX, additionally a connection per data provider is required. Again, the more
data providers exist, the more heavyweight data retrieval becomes. DRX can encounter
serious latency limitations due to the very expensive nature of secure (and even insecure)
TCP connection setup. For example, a query that finds the total number of services
in the domain cern.ch should use RRX. Under DRX, it may generate responses from
every single node in that domain. Consequently, an agent can face an invitation storm
resembling a denial of service attack. On the other hand, the potential to exploit
parallelism is large. All data providers can be handled independently in parallel.
• Latency. If a query is of a type that cannot support pipelining (see Section 4.4), the
latency for the first result to arrive at the originator is always poor. For a pipelined
query, the latency for the first result to arrive is small under DRX, because a response
travels a single hop only. Under RRX, a response travels multiple hops, and latency
increases accordingly. However, the cost of TCP connection setup at originator and/or
agent can invert the situation. Under RR, the cost of TCP connection setup to nodes
is paid only once (at node publication time), because connections can typically be kept
alive until node deregistration. This is not the case under the other response modes.
• Caching. Caching is a technique that trades content freshness for response time. RRX can potentially support caching of content from other nodes at intermediate nodes because response flow naturally concentrates and integrates results from many nodes. DRX nodes return results directly and independently, and hence cannot efficiently support caching.
• Trust Delegation to Unknown Parties. Query and result traffic are subject to
security attacks. It is not sufficient to establish a secure mutually authenticated channel
between any two nodes because malicious nodes can divert routes or modify queries and
results. Since a query is almost always routed through multiple hops, many of which
are unknown to the agent, we believe that indirect delegation of trust to unknown
parties cannot practically be avoided. Security sensitive applications should choose
DRX because at least the retrieval of results occurs in a predictable manner between
just two parties that can engage in secure mutual authentication and authorization.
RRM merely delegates trust on metadata results, but not on full results.
3.2 Response Mode Switches and Shifts
Although from the functional perspective all response modes are equivalent, clearly no mode
is optimal under all circumstances. The question arises as to what extent a given P2P network
must mandate the use of any particular response mode throughout the system. Observe that
nodes are autonomous and defined by their interface only. A node does not "see" what kind
of response mode (or technology in general) its neighbors use in answering a query. As long
as query semantics are preserved, the node does not care. Consequently, we propose that
response modes can be mixed by switches and shifts, in arbitrary permutations, as depicted
in Figure 5.
• Routed Response ⇒ Direct Response switch (Figure 5-a). Starting from the agent, Routed Response is used initially. The central node (the "football") receives a query
Figure 5: Response Mode Switches and Shifts. a) Routed Response ⇒ Direct Response switch; b) Direct Response ⇒ Routed Response switch; c) Direct Response ⇒ Direct Response shift.
from the agent. For some reason, it decides to answer the query using Direct Response. The response flow that would have been taken under Routed Response is shown crossed out.
• Direct Response ⇒ Routed Response switch (Figure 5-b). Initially, Direct Response is used. However, the "football" decides to answer the query using Routed Response instead.
• Direct Response ⇒ Direct Response shift (Figure 5-c). Initially, Direct Response is used. The football decides to continue using Direct Response but shift the target of responses. To its own neighbors the football declares itself as (a fake) agent. The responses that would have flowed into the real agent now flow back into the football, and then from the football to the real agent. Note again that this does not break semantics because the football behaves as if the results had been obtained from its own local database. The real agent receives the same results, but solely from the football.
• Routed Response ⇒ Routed Response shift. At each hop, the response target is
shifted to be the current node. Interestingly, this kind of shift is at the very heart of
the definition of routed response. The classification introduced here shows that this is
not the only possible approach.
A node may choose its response mode based on a local and autonomous assessment of
the advantages and disadvantages involved. However, because of its context knowledge, often
the client (e.g. originator) is in the best position to judge what kind of response mode would
be most suitable. Therefore, it is useful to allow specifying as part of the query a hint that
indicates the preferred response mode (routed or direct).
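The hint mechanism just described can be sketched in miniature. All names below (Query, choose_mode, answer) and the load threshold are hypothetical illustrations, not part of the framework; the point is only that a node honors the originator's hint yet remains free to decide autonomously.

```python
# Hypothetical sketch: a node picks a response mode per query.
from dataclasses import dataclass

@dataclass
class Query:
    text: str
    agent: str          # address of the agent node
    predecessor: str    # address of the node the query came from
    hint: str           # originator's preferred mode: "routed" or "direct"

def choose_mode(query: Query, local_load: float) -> str:
    """Honor the originator's hint unless local conditions argue otherwise."""
    if query.hint == "direct":
        return "direct"
    # e.g. switch to direct response when routing results back would be costly
    return "direct" if local_load > 0.8 else "routed"

def answer(query: Query, local_results: list, local_load: float) -> tuple:
    mode = choose_mode(query, local_load)
    # "routed": results travel hop by hop back via the predecessor;
    # "direct": results go straight to the agent, bypassing intermediaries.
    target = query.predecessor if mode == "routed" else query.agent
    return (target, local_results)
```

Either way, query semantics are preserved; only the path the results take differs.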
4 Query Processing
In a distributed database system, there exists a single local database and zero or more neighbors.
A classic centralized database system is a special case where there exists a single local
database and zero neighbors. From the perspective of query processing, a P2P database
system has the same properties as a distributed database system, in a recursive structure.
Hence, we propose to organize the P2P query engine like a general distributed query engine
[24, 25]. A given query involves a number of operators (e.g. SELECT, UNION, CONCAT,
SORT, JOIN, GROUP, SEND, RECEIVE, SUM, MAX, MAXSETSIZE, IDENTITY) that
may or may not be exposed at the query language level. For example, the SELECT operator
takes a set and returns a new set with tuples satisfying a given predicate. The UNION
operator computes the union of two or more sets. The CONCAT operator concatenates the
elements of two or more sets into a list of arbitrary order (without eliminating duplicates).
A list can be emulated by a set using distinct surrogate keys. The MAXSETSIZE operator
limits the maximum result set size. The IDENTITY operator returns its input set unchanged.
The semantics of an operator can be satisfied by several operator implementations, using
a variety of algorithms, each with distinct resource consumption, latency and performance
characteristics. The query optimizer chooses an efficient query execution plan, which is a tree
plugged together from operators. In an execution plan, a parent operator consumes results
from child operators. Query execution is driven from the (final) root consumer in a top down
fashion. For example, a request for results from the root operator may in turn lead to a
request for results from child operators, which in turn request results from their own child
operators, and so on. By means of an execution plan, an optimizer can move a query to data
or data to a query. In other words, queries and sub queries can be executed locally or at
remote nodes. Performance tradeoffs of query shipping, data shipping and hybrid shipping
are discussed in .
4.1 Template Query Execution Plan
Recall that Section 2.2 proposed a query model. Any query Q within our query model can
be answered by an agent with the template execution plan A depicted in Figure 6. The plan
applies a local query L against the tuple set of the local database. Each neighbor (if any) is
asked to return a result set for (the same) neighbor query N. Local and neighbor result sets
are unionized into a single result set by a unionizer operator U that must take the form of
either UNION or CONCAT. A merge query M is applied that takes as input the result set and
returns a new result set. The final result set is sent to the client, i.e. another node or the originator.
Figure 6: Template Execution Plan (L ... local query, N ... neighbor query, M ... merge query, U ... unionizer operator, A ... agent plan).
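The template plan can be sketched as a higher-order function. In this hypothetical illustration, queries L, N and M are modeled as functions over tuple sets (lists), a neighbor is modeled as a function that evaluates a query against its own (recursive) plan, and U is one of the two unionizers; none of these concrete representations are prescribed by the paper.

```python
# Sketch of the template execution plan A of Figure 6 (illustrative only).
# A query is modeled as a function from a tuple set (list) to a tuple set.

def union(sets):
    """UNION unionizer: merge result sets, eliminating duplicates."""
    seen, out = set(), []
    for s in sets:
        for t in s:
            if t not in seen:
                seen.add(t)
                out.append(t)
    return out

def concat(sets):
    """CONCAT unionizer: concatenate result sets, keeping duplicates."""
    return [t for s in sets for t in s]

def template_plan(local_db, neighbors, L, N, M, U):
    """Apply local query L locally, ask each neighbor to evaluate the
    neighbor query N, unionize with U, then apply the merge query M."""
    result_sets = [L(local_db)]
    for neighbor in neighbors:
        result_sets.append(neighbor(N))   # neighbor returns a result set for N
    return M(U(result_sets))
```

For a recursively partitionable query (Section 4.3), each neighbor would itself evaluate N via the same `template_plan` against its own database and neighbors.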
4.2 Centralized Execution Plan
To see that indeed any query against any kind of database system can be answered within this
framework we derive a simple centralized execution plan that always satisfies the semantics
of any query Q. The plan substitutes specific subplans into the template plan A, leading to
distinct plans for the agent node (Figure 7-a) and neighbors nodes (Figure 7-b). In the case
of XQuery and SQL, parameters are substituted as follows:
XQuery:
  A: M = Q,        U = UNION, L = "RETURN /", N' = N
  N: M = IDENTITY, U = UNION, L = "RETURN /", N' = N

SQL:
  A: M = Q,        U = UNION, L = "SELECT *", N' = N
  N: M = IDENTITY, U = UNION, L = "SELECT *", N' = N
Figure 7: Centralized Execution Plan. The Agent Query Plan (a) fetches all raw tuples from
the local and all remote databases, unionizes the result sets, and then applies the query Q.
Neighbors are handed a rewritten neighbor query (b) that recursively fetches all raw tuples,
and returns their union.
In other words, the agent's plan A fetches all raw tuples from the local and all remote
databases, unionizes the result sets, and then applies the query Q. Neighbors are handed a
rewritten neighbor query N that recursively fetches all raw tuples, and returns their union.
The neighbor query N is recursively partitionable (see below).
The same centralized plan works for routed and direct response, both with and without
metadata. Under direct response, a node does forward the query N, but does not attempt to
receive remote result sets (conceptually empty result sets are delivered). The node does not
send a result set to its predecessor, but directly back to the agent.
Recall that in a distributed database system there exists a single local database and zero or more neighbors, that a classic centralized database system is the special case with zero neighbors, and that from the perspective of query processing a P2P database system has the same properties in a recursive structure. Consequently, the very same centralized execution plan applies to any kind of database system; and any query within our query model can be answered.
The centralized execution plan can be inefficient because potentially large amounts of
base data have to be shipped to the agent before locally applying the user's query. However,
sometimes this is the only plan that satisfies the semantics of a query. This is always the case
for a complex query. A more efficient execution plan can sometimes be derived (as proposed
below). This is always the case for a simple and medium query.
4.3 Recursively Partitionable Query
A P2P network can be efficient in answering queries that are recursively partitionable. A
query Q is recursively partitionable if, for the template plan A, there exists a merge query M
and a unionizer operator U to satisfy the semantics of the query Q assuming that L and N
are chosen as L = Q and N = A. In other words, a query is recursively partitionable if the
very same execution plan can be recursively applied at every node in the P2P topology. The
corresponding execution plan is depicted in Figure 8.
Figure 8: Execution Plan for Recursively Partitionable Query (M ... merge query, Q ... user query, U ... unionizer operator, A ... agent plan).
The input and output of a merge query have the same form as the output of the local query
L. Query processing can be parallelized and spread over all participating nodes. Potentially
very large amounts of information can be searched while investing little resources such as
processing time per individual node. The recursive parallel spread of load implied by a
recursively partitionable query is the basis of the massive P2P scalability potential. However,
query performance is not necessarily good, for example due to high network I/O costs.
Now we are in the position to clarify the definition of simple, medium and complex queries.
• Simple Query. A query is simple if it is recursively partitionable using M = IDENTITY,
U = UNION.
• Medium Query. A query is a medium query if it is not simple, but it is recursively partitionable.
• Complex Query. A query is complex if it is not recursively partitionable.
It is an interesting open question (at least to us) if a query processor can automatically
determine whether a correct merge query and unionizer exist, and if so, how to choose them.
Related problems have been studied extensively in the context of distributed and parallel
query processing as well as query rewriting for heterogeneous and homogeneous relational
database systems [24, 27, 25]. Distributed XQueries are an emerging field. For simplicity,
in the remainder of this paper we assume that the user explicitly provides M and U along with
a query Q. If M and U are not provided as part of a query to any given node, the node acts
defensively by assuming that the query is not recursively partitionable. Choosing M and U is
straightforward for a human being. Consider for example the following medium XQueries.
• Return the number of replica catalog services. The merge query computes the sum of a
set of numbers. The unionizer is CONCAT.
Q = RETURN
M = RETURN
U = CONCAT
• Find the service with the largest uptime.
Q = M = RETURN (/tupleset/tuple[@type="service"] SORTBY (./@uptime)) [last()]
U = UNION
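The two medium queries above can be mimicked in miniature. In this hypothetical sketch, tuples are dicts with a `type` and `uptime` field (an assumed shape, not from the paper): the count query returns a singleton result holding a partial count, merged by summing under CONCAT, while the max-uptime query is its own merge query under UNION.

```python
# Illustrative mini versions of the two medium queries above.

def count_services(tuples):
    """Q for 'number of services': a singleton result holding a count."""
    return [sum(1 for t in tuples if t["type"] == "service")]

def sum_counts(partial_counts):
    """M: merge partial counts from many nodes by summing (U = CONCAT)."""
    return [sum(partial_counts)]

def max_uptime(tuples):
    """Q = M for 'service with the largest uptime' (U = UNION)."""
    services = [t for t in tuples if t["type"] == "service"]
    return [max(services, key=lambda t: t["uptime"])] if services else []
```

Note how the input and output of each merge query have the same form as the output of the local query, as required by the template plan.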
Note that the query engine always encapsulates the query output with a tupleset root
element. A query need not generate this root element as it is implicitly added by the environment.
A custom merge query can be useful. For example, assume that each individual result
tuple is tagged with a timestamp indicating the time when the information expires and
ceases to be valid. A custom merge query can ignore all results that have already expired.
Alternatively, it can ignore all results but the one with the most recent timestamp. As
another example, a custom merge query can cut off all but the first 100 result tuples. Such a result set size limiting feature (maxResults) attempts to make bandwidth consumption more predictable.
The success of many applications depends on how fast they can start producing initial/relevant portions of the result set rather than how fast the entire result set is produced. This is
particularly often the case in distributed systems where many nodes are involved in query
processing, each of which may be unresponsive for many reasons. The situation is even more
pronounced in systems with loosely coupled autonomous nodes.
Often an originator would be happy to already do useful work with one or a few early
results, as long as they arrive quickly and reliably. Results that arrive later can be handled
later, or are ignored anyway. For example, in an interactive session, a typical Gnutella user
is primarily interested in being able to start some music download as soon as possible. The
user is quickly disappointed when not a single result for a query arrives in less than four
seconds. Choosing among 1000 species of "Like a virgin" is interesting, but helps little if it
comes at the expense of, say, one minute idle waits. As another example, consider a user that
wants to discover schedulers to submit a job. It is interesting to discover that 100 schedulers
are available, but the primary requirement is to find at least three quickly and reliably.
Database theory and practice establishes that query execution engines in general, and
distributed query execution engines in particular, should be based on iterators. An
operator corresponds to an iterator class. Iterators of any kind have a uniform interface,
namely the three methods open(), next() and close(). In an execution plan, a parent
iterator consumes results from child iterators. Query execution is driven from the (final) root
consumer in a top down fashion. For example, a call to next() may call next() on child
Figure 9: Non-Pipelined (left) and Pipelined Query (right).
Query Type Supports Pipelining?
Simple Query Yes
Medium Query Maybe
Complex Query Typically No
Table 1: Pipelining Support of Query Types.
iterators, which in turn call next() on their child iterators, and so on. For efficiency, the
method next() can be asked to deliver several results at once in a so-called batch. Semantics
are as follows: "Give me a batch of at least N and at most M results" (less than N results
are delivered when the entire query result set is exhausted). For example, the SEND and
RECEIVE network communication operators (iterators) typically work in batches.
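The open()/next()/close() interface with batching can be sketched as follows. The method names and the batch convention (at least n, at most m results) follow the text; the concrete classes (ListScan as a stand-in for a local database scan, a SELECT iterator) are assumed illustrations.

```python
# Sketch of the open()/next()/close() iterator interface with batching.

class TupleIterator:
    """Base iterator; next(n, m) delivers a batch of at least n and at
    most m results (fewer than n only when the input is exhausted)."""
    def open(self): ...
    def next(self, n, m): ...
    def close(self): ...

class ListScan(TupleIterator):
    """Leaf iterator over an in-memory list (stands in for a local DB)."""
    def __init__(self, tuples):
        self.tuples, self.pos = tuples, 0
    def open(self):
        self.pos = 0
    def next(self, n, m):
        batch = self.tuples[self.pos:self.pos + m]
        self.pos += len(batch)
        return batch
    def close(self):
        pass

class Select(TupleIterator):
    """Monotonic SELECT: consumes only as much input as needed per batch."""
    def __init__(self, child, predicate):
        self.child, self.predicate = child, predicate
    def open(self):
        self.child.open()
    def next(self, n, m):
        batch = []
        while len(batch) < n:
            chunk = self.child.next(1, m)
            if not chunk:
                break                       # input exhausted
            batch.extend(t for t in chunk if self.predicate(t))
        return batch[:m]
    def close(self):
        self.child.close()
```

A parent iterator drives its children from the root down, exactly as the execution-plan description above requires.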
The monotonic semantics of certain operators such as SELECT, UNION, CONCAT,
SEND, RECEIVE, MAXSETSIZE, IDENTITY allow that operator implementations consume
just one or a few child results on next(). In contrast, the non-monotonic semantics
of operators such as SORT, GROUP, MAX, some JOIN methods, etc. require that operator
implementations consume all child results already on open() in order to be able to deliver a
result on the first call to next(). Since the output of these operators on a subset of the input
is not, in general, a subset of the output on the whole input, these operators need to see all of
their input before they produce the correct output. This does not break the iterator concept
but has important latency and performance implications. Whether the root operator of an
agent exhibits a short or long latency to deliver to the originator the first result from the
result set depends on the query operators in use, which in turn depend on the given query.
In other words, for some query types the originator has the potential to immediately start
piping in results (at a moderate performance rate), while for other query types it must wait for a long time until the first result becomes available (the full result set then arrives almost at once).
A query (an operator implementation) is said to be pipelined if it can already produce at
least one result tuple before all input tuples have been seen. Otherwise, a query (an operator)
is said to be non-pipelined. Figure 9 depicts examples for both modes.
Simple queries do support pipelining (e.g. Gnutella queries). Medium queries may or may
not support pipelining, whereas complex queries typically do not support pipelining. The
properties are summarized in Table 1.
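The pipelining distinction can be made concrete with generators. In this hypothetical sketch, a pipelined SELECT yields each qualifying tuple as soon as it is seen, while a non-pipelined SORT must consume its entire input before its first yield, since the smallest element may be the last one to arrive.

```python
# Pipelined vs. non-pipelined operators, modeled as Python generators.

def select(tuples, predicate):
    """Pipelined: a result can be produced before all input is seen."""
    for t in tuples:
        if predicate(t):
            yield t                    # emitted immediately

def sort(tuples, key):
    """Non-pipelined: all input must be consumed before the first result."""
    buffered = list(tuples)            # blocks until input is exhausted
    buffered.sort(key=key)
    yield from buffered
```

This mirrors the monotonic/non-monotonic distinction above: monotonic operators stream, non-monotonic operators buffer.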
Bear in mind, that even if a query can be pipelined, the messaging model and underlying
network layers in use may not support pipelining, in which case a result set has to be delivered
with long latency in a single large batch. Finally note that non-pipelining delivery without
a dynamic abort timeout feature is highly unreliable due to the so-called simultaneous abort
problem (see below). If only one of the many nodes in the query path fails to be responsive
for whatever reasons, all other nodes in the chain are waiting, eventually time out at the
same time, and the originator receives not even a single result.
5 Static Loop Timeout and Dynamic Abort Timeout
Clearly there comes a time when a user is no longer interested in query results, no matter
whether any more results might be available. The query roaming the network and its response
traffic should fade away after some time. In addition, P2P systems are well advised to
attempt to limit resource consumption by defending against runaway queries roaming forever
or producing gigantic result sets, either unintended or malicious. To address these problems,
an absolute abort timeout is attached to a query, as it travels across hops. An abort timeout
can be seen as a deadline. Together with the query, a node tells a neighbor "I will ignore (the rest of) your result set if I have not received it before 12:00:00 today." The problem,
then, is to ensure that a maximum of results can be delivered reliably within the time frame
desired by a user.
The value of a static timeout remains unchanged across hops, except for defensive modification
in flight triggered by runaway query detection (e.g. infinite timeout). In contrast,
it is intended that the value of a dynamic timeout be decreased at each hop. Nodes further
away from the originator may time out earlier than nodes closer to the originator.
5.1 Dynamic Abort Timeout
A static abort timeout is entirely unsuitable for non-pipelined result set delivery, because it
leads to a serious reliability problem, which we propose to call simultaneous abort timeout.
If just one of the many nodes in the query path fails to be responsive for whatever reasons,
all other nodes in the path are waiting, eventually time out and attempt to return at least
a partial result set. However, it is impossible that any of these partial results ever reach the
originator, because all nodes time out simultaneously (and it takes some time for results to
flow back). For example, the agent times out and attempts to return its local partial results
to the originator. After that, all partial results flowing to the agent from neighbors, and their
neighbors, etc. are discarded – it is already too late. However, even the agent cannot deliver
results to the originator because the originator has already timed out (shortly) before the
results arrive. Hence, the originator receives not even a single result if just one of the many
nodes in the query path fails to be responsive.
To address the simultaneous abort timeout problem, we propose dynamic abort timeouts.
Under dynamic abort timeout, nodes do not time out at the same time. Instead, nodes further away from the originator time out earlier than nodes closer to the originator. This provides some safety time window for the partial results of any node to flow back across multiple hops to the originator. Together with the query, a node tells a neighbor "I will ignore (the rest of) your result set if I have not received it before 12:00:00 today. Do whatever you think is appropriate to meet this deadline". Intermediate nodes can and should adaptively decrease the timeout value as necessary, in order to leave a large enough time window for receiving and returning partial results subsequent to timeout.
Figure 10: Dynamic Abort Timeout.
Observe that the closer a node is to the originator, the more important it is (because if
it cannot meet its deadline, results from a large branch are discarded). Further, the closer
a node is to the originator, the larger is its response and bandwidth consumption. Thus, as
a good policy to choose the safety time window, we propose exponential decay with halving.
The window size is halved at each hop, leaving large safety windows for important nodes
and tiny window sizes for nodes that contribute only marginal result sets. Also, taking into
account network latency and the time it takes for a query to be locally processed, the timeout
is updated at each hop N according to the following recurrence formula:

    timeout_N = currenttime_N + (timeout_{N-1} - currenttime_N) / 2
Consider for example Figure 10. At time t the originator submits a query with a dynamic
abort timeout of t+4 seconds. In other words, it warns the agent to ignore results after time
t+4. The agent in turn intends to safely meet the deadline and so figures that it needs to
retain a safety window of 2 seconds, already starting to return its (partial) results at time
t+2. The agent warns its own neighbors to ignore results after time t+2. The neighbors also
intend to safely meet the deadline. From the 2 seconds available, they choose to allocate
1 second, and leave the rest to the branch remaining above. Eventually, the safety window
becomes so small that a node can no longer meet a deadline on timeout. The results from
the unlucky node are ignored, and its partial results are discarded. However, other nodes
below and in other branches are unaffected. Their results survive and have enough time to
hop all the way back to the originator before time t+4.
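The worked example above (t+4 at the originator, t+2 at the agent's neighbors, t+1 one hop further) can be reproduced with the halving recurrence. This is a minimal sketch that assumes zero network latency and processing time at each hop, so the current time stays fixed at t.

```python
# Exponential decay with halving: each hop keeps half of the remaining
# time window as its safety margin (latency and processing assumed zero).

def next_timeout(current_time, predecessor_timeout):
    """Deadline a node passes on to its neighbors."""
    return current_time + (predecessor_timeout - current_time) / 2

t = 0.0                                 # originator submits at time t
deadline = t + 4                        # dynamic abort timeout t+4
hops = []
for _ in range(3):
    deadline = next_timeout(t, deadline)
    hops.append(deadline)
# hops == [2.0, 1.0, 0.5]: the agent tells its neighbors t+2,
# they tell their own neighbors t+1, and so on.
```

In a real deployment each node would plug in its own current time, so latency and processing delays automatically shrink the remaining window further.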
Instead of ignoring results which miss their deadline a node may also close the connection.
This may, but need not, be harmless. The connection is typically simply reestablished as soon
as a new query is to be forwarded. However, in an attempt to educate good P2P citizens,
a node may choose to stop propagating or deny service to neighbors that repeatedly do not
meet abort deadlines. For example, a strategy may use an exponential back-off algorithm.
Note that as long as a node obeys its timeout it can independently implement any timeout
policy it sees fit for its purposes without regard to the policy implemented at other nodes.
If a node misbehaves or maliciously increases the abort timeout, it risks not being able to
meet its own deadline, and is likely soon dropped or denied service. Such healthy measures
move less useful nodes to the edge of the network where they cause less harm, because their
number of topology links tends to decrease.
To summarize, under non-pipelined result set delivery, dynamic abort timeouts using
exponential decay with halving ensure that a maximum of results can be delivered reliably
within the time frame desired by a user. We speculate that dynamic timeouts could also incorporate sophisticated cost functions involving latency and bandwidth estimation and/or similar measures.
5.2 Static Loop Timeout
Interestingly, a static loop timeout is required in order to fully preserve query semantics. A
dynamic timeout (e.g. the dynamic abort timeout) is unsuitable to be used as loop timeout.
Otherwise, a problem arises that we propose to call non-simultaneous loop timeout. Recall
from Section 2.7 that the same query may arrive at a node multiple times, along distinct
routes, perhaps in a complex pattern. Loops in query routes must be detected and prevented.
Otherwise, unnecessary or endless multiplication of workloads would be caused. To this end,
a node maintains a state table of recent transaction identifiers and associated loop timeouts
and returns an error whenever a query is received that has already been seen (according to
the state table). Before the loop timeout is reached, the same query can potentially arrive
multiple times, along distinct routes. On loop timeout, a node may "forget" about a query
by deleting it from the state table. To be able to reliably detect a loop, a node must not
forget a transaction identifier before its loop timeout has been reached.
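The state table just described can be sketched as follows. In this hypothetical illustration a node records each transaction identifier together with its static loop timeout and purges entries only after that timeout has been reached; the class and method names are assumptions.

```python
# Sketch of loop detection via a state table of transaction identifiers.
# Entries may only be forgotten after their static loop timeout.

class LoopDetector:
    def __init__(self):
        self.seen = {}                  # transaction id -> loop timeout

    def on_query(self, txn_id, loop_timeout, now):
        """Return True if the query is new (process it), False on a loop."""
        self.purge(now)
        if txn_id in self.seen:
            return False                # duplicate: report an error upstream
        self.seen[txn_id] = loop_timeout
        return True

    def purge(self, now):
        """Forget only transactions whose loop timeout has been reached."""
        self.seen = {t: d for t, d in self.seen.items() if d > now}
```

Because the loop timeout is static, every node forgets a given transaction at the same absolute time, which is exactly what rules out the non-simultaneous loop timeout failure discussed next.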
However, let us assume for the moment that a dynamic timeout (e.g. the dynamic abort
timeout) is used as loop timeout. Consider for example, Figure 11, which is identical to Figure
10 except that the agent has an additional neighbor that can potentially receive the query
along more than one path. At time t the originator submits a query with a dynamic abort
timeout of t+4 seconds. The agent in turn warns its own neighbors to ignore results after
time t+2. Request 5 is sent and arrives, is processed, and its results (step 8) are delivered
before the dynamic abort timeout of time t+1. At time t+1 the loop timeout is reached and
the query is deleted from the state table. For many reasons, including temporary network
segment problems and sequential neighbor processing, request 10 can be delayed. In the
example it arrives after time t+1. By this time, the receiving node has already forgotten
that it already handled the very same query. Hence, the node cannot detect the loop and
continues to process and forward (step 11, 12) the same query again.
Figure 11: Loop Detection Failure with Dynamic Loop Timeout.
The non-simultaneous loop timeout problem is caused by the fact that some nodes still
forward the query to other nodes when the destinations have already forgotten it. In other
words, the problem is that loop timeout does not occur simultaneously everywhere. Consequently,
a loop timeout must be static (does not change across hops) to guarantee that loops
can reliably be detected. Along with a query, an originator not only provides a dynamic
abort timeout, but also a static loop timeout. Initially at the originator, both values must
be identical (e.g. t+4). After the first hop, both values become unrelated.
To summarize, we have abort timeout ≤ loop timeout. Loop timeouts must be
static whereas abort timeouts may be static or dynamic. Under non-pipelined result set
delivery, dynamic abort timeouts using exponential decay with halving ensure that a maximum
of results can be delivered reliably within the time frame desired by a user. A dynamic
abort timeout model still requires static loop timeouts to ensure reliable loop detection, so
that a node does not forward and answer the same query multiple times.
6 Query Scope
As in a data integration system, the goal is to exploit several independent information sources
as if they were a single source. This is important for distributed systems in which node
topology or deployment model change frequently. For example, cross-organizational Grids
and P2P networks exhibit such a character. However, in practice, it is often sufficient (and
much more efficient) for a query to consider only a subset of all tuples (service descriptions)
from a subset of nodes. For example, a typical query may only want to search tuples (services)
within the scope of the domain cern.ch and ignore the rest of the world.
Recall that to this end, Section 2.2 cleanly separated the concepts of (logical) query and
(physical) query scope. A query is formulated against a global database view and is insensitive
to link topology and deployment model. In other words, to a query the set of tuples appears as
a single homogeneous database, even though the set may be (recursively) partitioned across
many nodes and databases. This means that in a relational or XML environment, at t