This is a brief introduction to the Karmasphere DP language. Technical documentation is available at http://labs.karmasphere.org/dp/javadoc.
The Karmasphere DP language is a high-performance non-blocking parallel language for performing data processing. It is designed to give the user a high degree of control over the usage of system resources, for example, how many CPU cores or how much disk I/O time to use, without requiring the software developer to explicitly consider these issues in code.
It was originally intended for collecting attributes of URLs and domain names to be used in an anti-spam system, although it has since developed into a full parallel programming language with many general purpose operators.
The implementation is a standalone library which can be used in any Java 1.5 environment. It can take full advantage of multiprocessor (SMP or NUMA) systems, and may also be scaled horizontally: since the interpreter and environment are stateless, an entire cluster of machines may run the interpreter in parallel without any requirement for synchronization.
Traditional, sequential programs are lists of instructions, executed in order. If an instruction needs CPU, disk or network resources, it must wait until the resource is available before continuing. Network latency, for example, is highly unpredictable and can create terrible performance problems for sequential programs. While it is possible to write complex sequential programs which optimize resource usage, it is well beyond the ability of the naive programmer. The DP language is designed to solve this problem by making parallel programming easy.
DP programs are workflows, that is, they may be represented graphically using a boxes-and-arrows notation. In the DP language, every operation executes concurrently, whenever the necessary resources are available. This means that operations which would hold up execution waiting for resources in a traditional sequential language do not slow down a DP program at all.
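The effect of running independent operations concurrently can be sketched in plain Java. This is not DP code and does not use the DP library: `DataflowSketch`, `slowOp` and the 200 ms waits are names and values invented for illustration, and the sketch relies on `CompletableFuture` from a modern JDK (Java 9 or later) rather than the Java 1.5 environment the library targets. Three simulated lookups start at once, so the total wait should be close to the longest single wait rather than the sum of all three.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

class DataflowSketch {
    /** Stand-in for an operation that waits on a resource (for example a network reply). */
    static CompletableFuture<String> slowOp(String result, long waitMillis) {
        // The delayed executor starts the task only after the wait has elapsed,
        // so no worker thread is occupied while the operation is "waiting".
        return CompletableFuture.supplyAsync(
            () -> result,
            CompletableFuture.delayedExecutor(waitMillis, TimeUnit.MILLISECONDS));
    }

    /** Starts three independent operations at once and joins their results in a fixed order. */
    static String run() {
        CompletableFuture<String> a = slowOp("a", 200);
        CompletableFuture<String> ns = slowOp("ns", 200);
        CompletableFuture<String> mx = slowOp("mx", 200);
        return a.thenCombine(ns, (x, y) -> x + "," + y)
                .thenCombine(mx, (xy, z) -> xy + "," + z)
                .join();
    }

    public static void main(String[] args) {
        long start = System.nanoTime();
        String joined = run();
        long elapsedMillis = (System.nanoTime() - start) / 1_000_000;
        System.out.println(joined + " after about " + elapsedMillis + " ms");
    }
}
```

A sequential version of the same three operations would wait for each in turn; here the waits overlap because nothing forces one operation to finish before the next begins.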
We chose to make our source language almost identical to that of GraphViz, which renders the same textual representation of a graph as a JPEG or other image. Debugging output from the interpreter is also in GraphViz format, and may be easily rendered and read without a deep understanding of the machine.
The core language, documented here, includes some basic operators for processing and network operations. Additional operators are easy to develop using the framework provided.
The entire interpreter is provided as an API. It may be executed on the command line, from a job server (available, but not documented here), embedded in a query server (also available, but not documented here), in an RPC daemon (under development), or anywhere else that it may be useful. It consumes no resources when idle, and only those resources specified when active.
While in informal workflows, arrows may be implicitly typed or may simply indicate a relation, the DP language permits explicit typing of each arrow. Any Java type may be used; the DP interpreter does not have to be made aware of every type in the system, although it can make certain inferences about types if they are registered. If types are specified, programs may be typechecked at compile-time.
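As a rough illustration of what compile-time checking of typed arrows involves, the following Java sketch checks that the type produced on each edge is assignable to the type its consumer accepts. It is a sketch under stated assumptions, not the DP compiler's implementation: the `EdgeTypeCheck` and `Edge` classes, and the Java types attached to the edges in `main`, are all hypothetical.

```java
import java.net.Inet4Address;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.List;

class EdgeTypeCheck {
    /** A hypothetical typed edge: what the source port produces and what the target accepts. */
    static final class Edge {
        final String label;
        final Class<?> produced;
        final Class<?> accepted;

        Edge(String label, Class<?> produced, Class<?> accepted) {
            this.label = label;
            this.produced = produced;
            this.accepted = accepted;
        }
    }

    /** Returns one message per edge whose produced type cannot be passed to its consumer. */
    static List<String> check(List<Edge> edges) {
        List<String> errors = new ArrayList<String>();
        for (Edge e : edges) {
            if (!e.accepted.isAssignableFrom(e.produced)) {
                errors.add(e.label + ": " + e.produced.getName()
                    + " is not assignable to " + e.accepted.getName());
            }
        }
        return errors;
    }

    public static void main(String[] args) {
        List<Edge> edges = new ArrayList<Edge>();
        // Assumed types, for illustration only.
        edges.add(new Edge("lookup_a:a -> print", Inet4Address.class, InetAddress.class));
        edges.add(new Edge("bad -> edge", Integer.class, String.class));
        for (String error : check(edges)) {
            System.out.println(error);
        }
    }
}
```

Because the check only needs the declared types at each end of an arrow, it can run before any data flows, which is what makes it a compile-time check.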
Here is an example program which obtains most IPv4 addresses related to the input, regardless of the type of the input. The image on the right is a rendering of the trace of the interpreter running this program. Note that the compiler has annotated every edge with the port name and Java type, and that the interpreter has annotated each edge with every value which passed along that edge.
digraph MyProgram {
    input -> splitter;
    splitter:url -> urldomain;
    pipe_domain = new pipe [type="domain"];
    urldomain -> pipe_domain;
    splitter:domain -> pipe_domain;
    lookup_a = new dnslookup [type="A"];
    lookup_ns = new dnslookup [type="NS"];
    lookup_mx = new dnslookup [type="MX"];
    pipe_domain -> lookup_a;
    pipe_domain -> lookup_ns;
    pipe_domain -> lookup_mx;
    lookup_a:a -> print;
    lookup_ns:ns -> print;
    lookup_mx:mx -> print;
    splitter:ip4 -> print;
}
This can also be written in a truly GraphViz-compatible manner, so that it is accepted by the GraphViz compiler as well. It could be written (and rendered) thus:
digraph MyProgram {
    input -> splitter;
    splitter -> urldomain [port="url"];
    pipe_domain [op="pipe",type="domain"];
    urldomain -> pipe_domain;
    splitter -> pipe_domain [port="domain"];
    lookup_a [op="dnslookup",type="A"];
    lookup_ns [op="dnslookup",type="NS"];
    lookup_mx [op="dnslookup",type="MX"];
    pipe_domain -> lookup_a;
    pipe_domain -> lookup_ns;
    pipe_domain -> lookup_mx;
    lookup_a -> print [port="a"];
    lookup_ns -> print [port="ns"];
    lookup_mx -> print [port="mx"];
    splitter -> print [port="ip4"];
}
However, the syntactic sugar offered by the DP compiler makes the code much more readable, and the compiler converts the sugared code back into pure GraphViz, which is the form used in debug traces like the one above.
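The relationship between the two listings can be made concrete with a small Java sketch that rewrites the two sugared statement forms into their pure GraphViz equivalents. It is inferred only from the two listings above and is not the DP compiler's actual algorithm; the `DpDesugar` class and its regular expressions are assumptions, and they handle just the one-statement-per-line shapes shown here.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class DpDesugar {
    // name = new op [attrs];   becomes   name [op="op",attrs];
    private static final Pattern NEW_NODE =
        Pattern.compile("^(\\w+)\\s*=\\s*new\\s+(\\w+)\\s*\\[(.*)\\];$");

    // src:port -> dst;         becomes   src -> dst [port="port"];
    private static final Pattern PORT_EDGE =
        Pattern.compile("^(\\w+):(\\w+)\\s*->\\s*(\\w+);$");

    /** Rewrites one sugared statement; anything else is returned trimmed but unchanged. */
    static String desugarLine(String line) {
        String s = line.trim();
        Matcher node = NEW_NODE.matcher(s);
        if (node.matches()) {
            String attrs = node.group(3).trim();
            String rest = attrs.isEmpty() ? "" : "," + attrs;
            return node.group(1) + " [op=\"" + node.group(2) + "\"" + rest + "];";
        }
        Matcher edge = PORT_EDGE.matcher(s);
        if (edge.matches()) {
            return edge.group(1) + " -> " + edge.group(3)
                + " [port=\"" + edge.group(2) + "\"];";
        }
        return s;
    }

    public static void main(String[] args) {
        System.out.println(desugarLine("splitter:url -> urldomain;"));
        System.out.println(desugarLine("lookup_a = new dnslookup [type=\"A\"];"));
        System.out.println(desugarLine("input -> splitter;"));
    }
}
```

Applied line by line to the first listing, this rewriting yields the statements of the second listing; a real compiler would work on a parsed graph rather than on text lines.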
The compiler builds the graph into a bytecode format, which is loaded into an executable format by the loader. The interpreter can then execute this loaded image, and will return a result asynchronously via a result handler. Debugging output is provided as an opaque trace, which may be displayed using any of the display methods.
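The compile, load and execute stages, with a result delivered through a handler, can be pictured with the toy Java sketch below. None of these names come from the DP API: `PipelineSketch`, `compile`, `load`, `execute` and `ResultHandler` are hypothetical, the "bytecode" is just a list of edges, and "execution" merely walks the graph from a start node. It shows the shape of the pipeline only.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;

class PipelineSketch {
    /** Hypothetical callback that receives the result once execution has finished. */
    interface ResultHandler {
        void onResult(List<String> visited);
    }

    /** "Compile": turn "a -> b;" statements into a flat list of edges. */
    static List<String[]> compile(String source) {
        List<String[]> edges = new ArrayList<>();
        for (String statement : source.split(";")) {
            String[] parts = statement.split("->");
            if (parts.length == 2) {
                edges.add(new String[] { parts[0].trim(), parts[1].trim() });
            }
        }
        return edges;
    }

    /** "Load": build an adjacency map that is cheap to traverse. */
    static Map<String, List<String>> load(List<String[]> edges) {
        Map<String, List<String>> image = new LinkedHashMap<>();
        for (String[] edge : edges) {
            image.computeIfAbsent(edge[0], key -> new ArrayList<>()).add(edge[1]);
        }
        return image;
    }

    /** "Execute": walk the graph on the given executor and report via the handler. */
    static void execute(Map<String, List<String>> image, String start,
                        Executor executor, ResultHandler handler) {
        executor.execute(() -> {
            List<String> visited = new ArrayList<>();
            Deque<String> queue = new ArrayDeque<>();
            queue.add(start);
            while (!queue.isEmpty()) {
                String node = queue.remove();
                if (visited.contains(node)) {
                    continue;
                }
                visited.add(node);
                queue.addAll(image.getOrDefault(node, Collections.<String>emptyList()));
            }
            handler.onResult(visited);
        });
    }

    public static void main(String[] args) {
        Map<String, List<String>> image = load(compile("input -> splitter; splitter -> print;"));
        // Runnable::run executes on the calling thread; a thread pool could be passed instead.
        execute(image, "input", Runnable::run, visited -> System.out.println(visited));
    }
}
```

The caller of `execute` does not wait for a return value; it supplies a handler and is free to continue, which is the sense in which the result is returned asynchronously.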
The best place to start is with the runtime API. Of the compiler, you need only read the documentation for the Compiler class itself; almost everything else is automatically generated by our compiler-compiler tool.
Source code in the DP language may be passed through the C Preprocessor (Java library) for convenience.
The language, compiler and interpreter have been designed for parallelism and scalability, both in multiprocessor and cluster contexts. The system runs on a fixed number of threads, a fixed number of file descriptors, and so forth. All execution is event-based, and an operation which is blocked does not hold a thread or any other resource beyond its own state.
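The idea that a waiting operation costs only its own state, not a thread, can be sketched with the standard `ScheduledExecutorService`. This is a minimal illustration, not the DP interpreter's scheduler: `FixedThreadEvents`, `runAll` and the operation counts are invented for the example, and lambdas are used for brevity although the library itself targets Java 1.5. A single thread services every operation, and each pending operation is just an entry in the executor's queue until its wait is over.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class FixedThreadEvents {
    /** Runs many "waiting" operations on one thread and returns how many completed. */
    static int runAll(int operations, long waitMillis) {
        ScheduledExecutorService loop = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger completed = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(operations);
        try {
            for (int i = 0; i < operations; i++) {
                // While waiting, the operation is only a queued event; no thread sleeps for it.
                loop.schedule(() -> {
                    completed.incrementAndGet();
                    done.countDown();
                }, waitMillis, TimeUnit.MILLISECONDS);
            }
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for operations", e);
        } finally {
            loop.shutdown();
        }
        return completed.get();
    }

    public static void main(String[] args) {
        long start = System.nanoTime();
        int completed = runAll(1000, 100);
        long elapsedMillis = (System.nanoTime() - start) / 1_000_000;
        System.out.println(completed + " operations in about " + elapsedMillis + " ms");
    }
}
```

With one thread per blocked operation, the same workload would need a thousand threads; here the thread count stays fixed regardless of how many operations are waiting.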
The performance is very consistent, regardless of whether the majority of operations are network I/O or local operations. The majority of sequential systems will slow down as soon as they have to wait for responses from network or disk I/O. The interpreter itself is stateless, and so unless any operator has particular synchronization requirements, there is no constraint on the size or format of a processing cluster.
While the actual implementation has not been particularly optimised, it still performs in the region of 30,000 operations a second on a laptop computer, and over 100,000 a second running on only two cores of my quad-processor fileserver (using the built-in resource controls), purely by virtue of the design. The programs used for testing were production applications, doing a mixture of disk I/O, network I/O and local computation.
Please join the Karmasphere users mailing list, or mail shevek@karmasphere.com.