Apache ZooKeeper is a powerful system for managing distributed systems. While ZooKeeper’s power is great, and the developers even provide recipes for common use cases, it is perhaps masked by the extreme flexibility and complexity of the system. Thankfully, the folks at Netflix have implemented many of the aformentioned recipes in their Curator framework. The only lacking bit of Curator is documentation—the wiki has a lot of information, but is inscrutable for a beginner. This post intends to simplify the introduction to using Curator for service discovery.
This is not a cut & paste tutorial for using Curator, but rather a summariztion of how we implemented Curator in BenchPress. While the class names are unchanged, their content is simplified. Pull up the BenchPress code for further detail on employing Curator’s Service Discovery.
To begin with, put some junk in your POM:
<dependency> <groupId>com.netflix.curator</groupId> <artifactId>curator-framework</artifactId> <version>1.1.9</version> <exclusions> <exclusion> <groupId>log4j</groupId> <artifactId>log4j</artifactId> </exclusion> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>com.netflix.curator</groupId> <artifactId>curator-x-discovery</artifactId> <version>1.1.9</version> </dependency>
Curator is the base Netflix framework and they have broken out service discovery into a separate artifact. Unfortunately, Curator follows the lead of many Java projects that don’t properly declare their dependencies, and the curator-framework
artifact will pull in a (likely unwanted) version of Log4j, so I suggest using the exclusions.
You need some configgy things in various places; for the purposes of this tutorial I’ll put them in a Config object:
public class Config { public static final String zkConnectionString = "127.0.0.1:2181"; public static final String basePath = "/myApp/serviceDiscovery"; }
Talking to ZooKeeper
Easy parts done, let’s create a CuratorFramework, which is the conduit through which ZooKeeper communication happens:
try { curatorFramework = CuratorFrameworkFactory.builder() .connectionTimeoutMs(1000) .retryPolicy(new RetryNTimes(10, 500)) .connectString(Config.zkConnectionString) .build(); } catch (IOException e) { throw Throwables.propagate(e); } curatorFramework.start();
As mentioned above, ZooKeeper is incredibly flexible. This flexibility comes from its inherent simplicity—ZooKeeper is arranged like a filesystem as a series of nodes in a hierarchy, with each of the nodes having an optional data attachment. For this application, we don’t particularly care how Curator Service Discovery uses ZooKeeper to list services as available, but we do need to give Curator a place where this information can live. ZooKeeper paths are not autovivified and must be exlicitly created. Like other frameworks (see the delightfully undocumented zkClient), Curator provides a method to create a path if it doesn’t exist. This is a good thing to call whenever you create a CuratorFramework.
try { new EnsurePath(Config.basePath).ensure(curatorFramework.getZookeeperClient()); } catch (Exception e) { throw Throwables.propagate(e); }
Handling Metadata
With Curator configured and ZooKeeper ready to receive, we need to prepare our wares. In this example, our workers are services that listen on the netowrk for work to do and the farmer has work that it wants to distribut to workers. In order to communicate with workers, the farmer needs to know the hostname of a worker and what port it is listening on. This information is what we want to insert into ZooKeeper via Curator. To make things tidy, we will encapsulate this worker metadata in an object:
public final class WorkerMetadata { @JsonProperty("workerId") private final UUID workerId; @JsonProperty("listenAddress") private final String listenAddress; @JsonProperty("listenPort") private final int listenPort; @JsonCreator public WorkerMetadata(@JsonProperty("workerId") UUID workerId, @JsonProperty("listenAddress") String listenAddress, @JsonProperty("listenPort") int listenPort) { this.workerId = workerId; this.listenAddress = listenAddress; this.listenPort = listenPort; } public UUID getWorkerId() { return workerId; } public String getListenAddress() { return listenAddress; } public int getListenPort() { return listenPort; } }
A simple object with some annotations to let Jackson know how to interact with it. Curator doesn’t natively understand how to apply Jackson to custom objects, so the last thing we need is an InstanceSerializer to pass to the ServiceDiscovery:
public class InstanceSerializerFactory { private final ObjectReader objectReader; private final ObjectWriter objectWriter; InstanceSerializerFactory(ObjectReader objectReader, ObjectWriter objectWriter) { this.objectReader = objectReader; this.objectWriter = objectWriter; } public <T> InstanceSerializer<T> getInstanceSerializer( TypeReference<ServiceInstance<T>> typeReference) { return new JacksonInstanceSerializer<T>(objectReader, objectWriter, typeReference); } }
Advertising Workers
There are two sides to service discovery: those with something to offer (services) and those who want to utilize those services. In our example, workers provide services and the farmer is looking to utilize those services. Let’s start with how workers register themselves as providing a service:
public final class WorkerAdvertiser { private final CuratorFramework curatorFramework; private final InstanceSerializer<WorkerMetadata> jacksonInstanceSerializer; private final UUID workerId = UUID.randomUUID(); private final String serviceName; private final String listenAddress; private final int listenPort; WorkerAdvertiser(CuratorFramework curatorFramework, InstanceSerializerFactory instanceSerializerFactory String serviceName, String listenAddress, int listenPort) { this.curatorFramework = curatorFramework; this.jacksonInstanceSerializer = instanceSerializerFactory.getInstanceSerializer( new TypeReference<ServiceInstance<WorkerMetadata>>() {} ); this.listenAddress = listenAddress; this.listenPort = listenPort; } public void advertiseAvailability() { try { ServiceDiscovery<WorkerMetadata> discovery = getDiscovery(); discovery.start(); discovery.registerService(getInstance()); discovery.close(); } catch (Exception e) { throw Throwables.propagate(e); } } public void deAdvertiseAvailability() { try { ServiceDiscovery<WorkerMetadata> discovery = getDiscovery(); discovery.start(); discovery.unregisterService(getInstance()); discovery.close(); } catch (Exception e) { throw Throwables.propagate(e); } } private ServiceInstance<WorkerMetadata> getInstance() throws Exception { WorkerMetadata workerMetadata = new WorkerMetadata(workerId, listenAddress, listenPort); return ServiceInstance.<WorkerMetadata>builder() .name(serviceName) .address(listenAddress) .port(listenPort) .id(workerId.toString()) .payload(workerMetadata) .build(); } private ServiceDiscovery<WorkerMetadata> getDiscovery() { return ServiceDiscoveryBuilder.builder(WorkerMetadata.class) .basePath(Config.basePath) .client(curatorFramework) .serializer(jacksonInstanceSerializer) .build(); } }
The core things here are the two public methods for advertising availability—call them does exactly what it says on the tin. From the worker’s side, this is it: create a WorkerAdvertiser, initializing it with the metadata about this worker (in the example case, how to contact the worker), and advertise as appropriate. Curator will create entries in ZooKeeper that can be found by a farmer looking to pass out work.
Finding Workers
Using Curator from a farmer’s perspective is a similarly simple endeavor:
public final class WorkerFinder { private final ServiceDiscovery<WorkerMetadata> discovery; WorkerFinder(CuratorFramework curatorFramework, InstanceSerializerFactory instanceSerializerFactory) { discovery = ServiceDiscoveryBuilder.builder(WorkerMetadata.class) .basePath(Config.basePath()) .client(curatorFramework) .serializer(instanceSerializerFactory .getInstanceSerializer(new TypeReference<ServiceInstance<WorkerMetadata>>() {})) .build(); try { discovery.start(); } catch (Exception e) { throw Throwables.propagate(e); } } public Collection<ServiceInstance<WorkerMetadata>> getWorkers(String serviceName) { Collection<ServiceInstance<WorkerMetadata>> instances; try { instances = discovery.queryForInstances(serviceName)); } catch (Exception e) { throw Throwables.propagate(e); } return instances; } }
The WorkerFinder cretaes a Curator ServiceDiscovery object with which we can find workers matching the service we desire. The returned ServiceInstances contain a WorkerMetadata object in their payload:
for (ServiceInstance<WorkerMetadata> instance : workerFinder.getWorkers()) { WorkerMetadata workerMetadata = instance.getPayload(); // Do something useful here }
Netflix Curator does a lot more than service discovery— check out their documentation for further details. Also peruse the BenchPress source for further details on how we used Curator in that project.