Framework Development Guide

In this document we refer to Mesos applications as "frameworks".

See one of the example framework schedulers in MESOS_HOME/src/examples/ to get an idea of what a Mesos framework scheduler and executor look like in the language of your choice. RENDLER provides example framework implementations in C++, Go, Haskell, Java, Python, and Scala.

Create your Framework Scheduler

You can write a framework scheduler in C, C++, Java/Scala, or Python. Your framework scheduler should inherit from the Scheduler class (see API below). Your scheduler should create a SchedulerDriver (which will mediate communication between your scheduler and the Mesos master) and then call SchedulerDriver.run().

Scheduler API

Declared in MESOS_HOME/include/mesos/scheduler.hpp

/*
 * Empty virtual destructor (necessary to instantiate subclasses).
 */
virtual ~Scheduler() {}

/*
 * Invoked when the scheduler successfully registers with a Mesos
 * master. A unique ID (generated by the master) used for
 * distinguishing this framework from others and MasterInfo
 * with the IP and port of the current master are provided as arguments.
 */
virtual void registered(SchedulerDriver* driver,
                        const FrameworkID& frameworkId,
                        const MasterInfo& masterInfo) = 0;

/*
 * Invoked when the scheduler re-registers with a newly elected Mesos master.
 * This is only called when the scheduler has previously been registered.
 * MasterInfo containing the updated information about the elected master
 * is provided as an argument.
 */
virtual void reregistered(SchedulerDriver* driver,
                          const MasterInfo& masterInfo) = 0;

/*
 * Invoked when the scheduler becomes "disconnected" from the master
 * (e.g., the master fails and another is taking over).
 */
virtual void disconnected(SchedulerDriver* driver) = 0;

/*
 * Invoked when resources have been offered to this framework. A
 * single offer will only contain resources from a single slave.
 * Resources associated with an offer will not be re-offered to
 * _this_ framework until either (a) this framework has rejected
 * those resources (see SchedulerDriver::launchTasks) or (b) those
 * resources have been rescinded (see Scheduler::offerRescinded).
 * Note that resources may be concurrently offered to more than one
 * framework at a time (depending on the allocator being used). In
 * that case, the first framework to launch tasks using those
 * resources will be able to use them while the other frameworks
 * will have those resources rescinded (or if a framework has
 * already launched tasks with those resources then those tasks will
 * fail with a TASK_LOST status and a message saying as much).
 */
virtual void resourceOffers(SchedulerDriver* driver,
                            const std::vector<Offer>& offers) = 0;

/*
 * Invoked when an offer is no longer valid (e.g., the slave was
 * lost or another framework used resources in the offer). If for
 * whatever reason an offer is never rescinded (e.g., dropped
 * message, failing over framework, etc.), a framework that attempts
 * to launch tasks using an invalid offer will receive TASK_LOST
 * status updates for those tasks (see Scheduler::resourceOffers).
 */
virtual void offerRescinded(SchedulerDriver* driver,
                            const OfferID& offerId) = 0;

/*
 * Invoked when the status of a task has changed (e.g., a slave is
 * lost and so the task is lost, a task finishes and an executor
 * sends a status update saying so, etc). If implicit
 * acknowledgements are being used, then returning from this
 * callback _acknowledges_ receipt of this status update! If for
 * whatever reason the scheduler aborts during this callback (or
 * the process exits) another status update will be delivered (note,
 * however, that this is currently not true if the slave sending the
 * status update is lost/fails during that time). If explicit
 * acknowledgements are in use, the scheduler must acknowledge this
 * status on the driver.
 */
virtual void statusUpdate(SchedulerDriver* driver,
                          const TaskStatus& status) = 0;

/*
 * Invoked when an executor sends a message. These messages are best
 * effort; do not expect a framework message to be retransmitted in
 * any reliable fashion.
 */
virtual void frameworkMessage(SchedulerDriver* driver,
                              const ExecutorID& executorId,
                              const SlaveID& slaveId,
                              const std::string& data) = 0;

/*
 * Invoked when a slave has been determined unreachable (e.g.,
 * machine failure, network partition). Most frameworks will need to
 * reschedule any tasks launched on this slave on a new slave.
 */
virtual void slaveLost(SchedulerDriver* driver,
                       const SlaveID& slaveId) = 0;

/*
 * Invoked when an executor has exited/terminated. Note that any
 * tasks running will have TASK_LOST status updates automagically
 * generated.
 */
virtual void executorLost(SchedulerDriver* driver,
                          const ExecutorID& executorId,
                          const SlaveID& slaveId,
                          int status) = 0;

/*
 * Invoked when there is an unrecoverable error in the scheduler or
 * scheduler driver. The driver will be aborted BEFORE invoking this
 * callback.
 */
virtual void error(SchedulerDriver* driver, const std::string& message) = 0;
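
To make the offer model described for resourceOffers more concrete, the sketch below decides how many fixed-size tasks fit into each offer. It deliberately does not use the Mesos headers: SimpleOffer, LaunchPlan, and planLaunches are hypothetical stand-ins, and a real Offer carries a list of resources rather than two plain numbers. Treat it as one possible packing policy, not as the way Mesos or any particular framework does it.

```cpp
#include <algorithm>
#include <cassert>
#include <string>
#include <vector>

// Hypothetical, simplified stand-in for mesos::Offer. A single offer only
// contains resources from a single slave.
struct SimpleOffer {
  std::string offerId;
  std::string slaveId;
  double cpus;
  double memMb;
};

// How many tasks to launch against one offer (0 means "reject the offer").
struct LaunchPlan {
  std::string offerId;
  int taskCount;
};

// Greedily packs identical tasks into each offer, in order, until either
// the offer is exhausted or `tasksWanted` tasks have been placed.
std::vector<LaunchPlan> planLaunches(const std::vector<SimpleOffer>& offers,
                                     double cpusPerTask,
                                     double memMbPerTask,
                                     int tasksWanted) {
  assert(cpusPerTask > 0 && memMbPerTask > 0);

  std::vector<LaunchPlan> plans;
  for (const SimpleOffer& offer : offers) {
    // The scarcer of the two resources limits how many tasks fit.
    const int byCpu = static_cast<int>(offer.cpus / cpusPerTask);
    const int byMem = static_cast<int>(offer.memMb / memMbPerTask);
    const int fits = std::max(0, std::min(byCpu, byMem));

    const int count = std::min(fits, std::max(0, tasksWanted));
    tasksWanted -= count;
    plans.push_back(LaunchPlan{offer.offerId, count});
  }
  return plans;
}
```

In a real scheduler, each plan with a non-zero taskCount would become a SchedulerDriver::launchTasks call from inside resourceOffers, and the remaining offers would be rejected so that their resources can be offered again.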

Create your Framework Executor

Your framework executor must inherit from the Executor class. It must override the launchTask() method. You can use the $MESOS_HOME environment variable inside of your executor to determine where Mesos is running from.

Executor API

Declared in MESOS_HOME/include/mesos/executor.hpp

/*
 * Invoked once the executor driver has been able to successfully
 * connect with Mesos. In particular, a scheduler can pass some
 * data to its executors through the FrameworkInfo.ExecutorInfo's
 * data field.
 */
virtual void registered(ExecutorDriver* driver,
                        const ExecutorInfo& executorInfo,
                        const FrameworkInfo& frameworkInfo,
                        const SlaveInfo& slaveInfo) = 0;

/*
 * Invoked when the executor re-registers with a restarted slave.
 */
virtual void reregistered(ExecutorDriver* driver,
                          const SlaveInfo& slaveInfo) = 0;

/*
 * Invoked when the executor becomes "disconnected" from the slave
 * (e.g., the slave is being restarted due to an upgrade).
 */
virtual void disconnected(ExecutorDriver* driver) = 0;

/*
 * Invoked when a task has been launched on this executor (initiated
 * via Scheduler::launchTasks). Note that this task can be realized
 * with a thread, a process, or some simple computation, however, no
 * other callbacks will be invoked on this executor until this
 * callback has returned.
 */
virtual void launchTask(ExecutorDriver* driver,
                        const TaskInfo& task) = 0;

/*
 * Invoked when a task running within this executor has been killed
 * (via SchedulerDriver::killTask). Note that no status update will
 * be sent on behalf of the executor, the executor is responsible
 * for creating a new TaskStatus (i.e., with TASK_KILLED) and
 * invoking ExecutorDriver::sendStatusUpdate.
 */
virtual void killTask(ExecutorDriver* driver, const TaskID& taskId) = 0;

/*
 * Invoked when a framework message has arrived for this
 * executor. These messages are best effort; do not expect a
 * framework message to be retransmitted in any reliable fashion.
 */
virtual void frameworkMessage(ExecutorDriver* driver,
                              const std::string& data) = 0;

/*
 * Invoked when the executor should terminate all of its currently
 * running tasks. Note that after Mesos has determined that an
 * executor has terminated any tasks that the executor did not send
 * terminal status updates for (e.g., TASK_KILLED, TASK_FINISHED,
 * TASK_FAILED, etc) a TASK_LOST status update will be created.
 */
virtual void shutdown(ExecutorDriver* driver) = 0;

/*
 * Invoked when a fatal error has occurred with the executor and/or
 * executor driver. The driver will be aborted BEFORE invoking this
 * callback.
 */
virtual void error(ExecutorDriver* driver, const std::string& message) = 0;
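
The killTask and shutdown comments above put the burden of reporting terminal states on the executor. The following sketch shows that bookkeeping in isolation. It compiles without Mesos: RecordingDriver, SimpleTaskStatus, and SketchExecutor are hypothetical stand-ins for ExecutorDriver, TaskStatus, and an Executor subclass, and the callbacks take plain task IDs instead of TaskInfo and TaskID messages. Sending TASK_RUNNING from launchTask, and ignoring kill requests for unknown tasks, are assumptions of this sketch.

```cpp
#include <cassert>
#include <set>
#include <string>
#include <vector>

// Subset of task states used by this sketch.
enum TaskState { TASK_RUNNING, TASK_FINISHED, TASK_FAILED, TASK_KILLED, TASK_LOST };

// Hypothetical, simplified stand-in for mesos::TaskStatus.
struct SimpleTaskStatus {
  std::string taskId;
  TaskState state;
};

// Hypothetical stand-in for ExecutorDriver: records status updates instead
// of sending them to the slave.
class RecordingDriver {
public:
  void sendStatusUpdate(const SimpleTaskStatus& status) { sent.push_back(status); }
  std::vector<SimpleTaskStatus> sent;
};

class SketchExecutor {
public:
  void launchTask(RecordingDriver* driver, const std::string& taskId) {
    running_.insert(taskId);
    // A real executor would start a thread or process here and return
    // quickly, since no other callback runs until launchTask returns.
    driver->sendStatusUpdate(SimpleTaskStatus{taskId, TASK_RUNNING});
  }

  void killTask(RecordingDriver* driver, const std::string& taskId) {
    if (running_.erase(taskId) == 0) {
      return;  // Unknown or already terminated task: nothing to report.
    }
    // Stop the actual work here, then report the terminal state ourselves;
    // no status update is sent on the executor's behalf.
    driver->sendStatusUpdate(SimpleTaskStatus{taskId, TASK_KILLED});
  }

  void shutdown(RecordingDriver* driver) {
    // Iterate over a copy because killTask modifies running_.
    const std::set<std::string> tasks = running_;
    for (const std::string& taskId : tasks) {
      killTask(driver, taskId);
    }
  }

private:
  std::set<std::string> running_;
};
```

Reporting TASK_KILLED explicitly during shutdown means the scheduler sees the actual outcome rather than the TASK_LOST update that would otherwise be generated for tasks without a terminal status.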

Install your Framework

You need to put your framework somewhere that all slaves on the cluster can get it from. If you are running HDFS, you can put your executor into HDFS. You then tell Mesos where it is via the ExecutorInfo parameter of MesosSchedulerDriver's constructor (see src/examples/java/TestFramework.java for an example). ExecutorInfo is a Protocol Buffer message class (defined in include/mesos/mesos.proto), and you set its URI field to something like "HDFS://path/to/executor/". Alternatively, you can pass the frameworks_home configuration option (default: MESOS_HOME/frameworks) to your mesos-slave daemons when you launch them to specify where all of your framework executors are stored (e.g., on an NFS mount that is available to all slaves). If you then set the URI field in ExecutorInfo to a relative path, the slave will prepend the value of frameworks_home to it.
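
The frameworks_home behaviour can be pictured with a small path-joining helper. This is an illustration only, not Mesos source code: resolveExecutorPath is a hypothetical name, and leaving absolute paths and values with a scheme (such as "HDFS://...") untouched is an assumption of this sketch.

```cpp
#include <cassert>
#include <string>

// Illustrative only: mimics "prepend frameworks_home to a relative path".
std::string resolveExecutorPath(const std::string& frameworksHome,
                                const std::string& executorPath) {
  const bool hasScheme = executorPath.find("://") != std::string::npos;
  const bool isAbsolute = !executorPath.empty() && executorPath[0] == '/';
  if (hasScheme || isAbsolute || frameworksHome.empty()) {
    return executorPath;  // Assumption: such values are used unchanged.
  }

  // Avoid a doubled slash when frameworks_home already ends with one.
  const bool needsSlash = frameworksHome.back() != '/';
  return frameworksHome + (needsSlash ? "/" : "") + executorPath;
}
```

With frameworks_home set to "/mnt/nfs/frameworks" (a made-up mount point), the relative path "my-framework/executor" resolves to "/mnt/nfs/frameworks/my-framework/executor" under these assumptions.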

Once you are sure that your executors are available to the mesos-slaves, you should be able to run your scheduler, which will register with the Mesos master, and start receiving resource offers!

Labels

Labels can be found in the TaskInfo, DiscoveryInfo, and TaskStatus messages, and let framework and module writers tag and pass unstructured information around Mesos. Labels are free-form key-value pairs supplied by the framework scheduler or label decorator hooks. Below are the protobuf definitions of labels:

  optional Labels labels = 11;

/**
 * Collection of labels.
 */
message Labels {
  repeated Label labels = 1;
}

/**
 * Key, value pair used to store free form user-data.
 */
message Label {
  required string key = 1;
  optional string value = 2;
}
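
To show the shape of this data, the sketch below mirrors the two messages with plain C++ structs and adds a lookup helper. It does not use the classes generated from mesos.proto: the structs, addLabel, and findLabel are hypothetical stand-ins, with a hasValue flag playing the role of the optional value field.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Stand-in for the Label message: key is required, value is optional.
struct Label {
  std::string key;
  bool hasValue;
  std::string value;
};

// Stand-in for the Labels message: a repeated list of Label entries.
struct Labels {
  std::vector<Label> labels;
};

// Appends a key/value pair. Keys are not checked for uniqueness, because
// a repeated field does not enforce it either.
void addLabel(Labels* labels, const std::string& key, const std::string& value) {
  labels->labels.push_back(Label{key, true, value});
}

// Returns a pointer to the value of the first label with the given key
// that carries a value, or nullptr if there is none.
const std::string* findLabel(const Labels& labels, const std::string& key) {
  for (const Label& label : labels.labels) {
    if (label.key == key && label.hasValue) {
      return &label.value;
    }
  }
  return nullptr;
}
```

A scheduler could build such a collection before launching a task, and an executor or hook could later look up a single key without assuming anything about the other labels.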

Labels are not interpreted by Mesos itself, but will be made available over master and slave state endpoints. Furthermore, the executor and scheduler can introspect labels on the TaskInfo and TaskStatus programmatically. Below is an example of how two label pairs ("environment": "prod" and "bananas": "apples") can be fetched from the master state endpoint.

$ curl http://master/state.json
...
{
  "executor_id": "default",
  "framework_id": "20150312-120017-16777343-5050-39028-0000",
  "id": "3",
  "labels": [
    {
      "key": "environment",
      "value": "prod"
    },
    {
      "key": "bananas",
      "value": "apples"
    }
  ],
  "name": "Task 3",
  "slave_id": "20150312-115625-16777343-5050-38751-S0",
  "state": "TASK_FINISHED",
  ...
},

Service discovery

When your framework registers an executor or launches a task, it can provide additional information for service discovery. This information is stored by the Mesos master along with other important information such as the slave currently running the task. A service discovery system can programmatically retrieve this information in order to set up DNS entries, configure proxies, or update any consistent store used for service discovery in a Mesos cluster that runs multiple frameworks and multiple tasks.

The optional DiscoveryInfo message for TaskInfo and ExecutorInfo is declared in MESOS_HOME/include/mesos/mesos.proto

message DiscoveryInfo {
  enum Visibility {
    FRAMEWORK = 0;
    CLUSTER = 1;
    EXTERNAL = 2;
  }

  required Visibility visibility = 1;
  optional string name = 2;
  optional string environment = 3;
  optional string location = 4;
  optional string version = 5;
  optional Ports ports = 6;
  optional Labels labels = 7;
}

Visibility is the key parameter that instructs the service discovery system whether a service should be discoverable. We currently differentiate between three cases:

  • FRAMEWORK: a task should not be discoverable for anyone but its framework.
  • CLUSTER: a task should be discoverable for all frameworks running on the Mesos cluster but not externally.
  • EXTERNAL: a task should be made discoverable broadly.

Many service discovery systems provide additional features that manage the visibility of services (e.g., ACLs in proxy-based systems, security extensions to DNS, VLAN or subnet selection). It is not the intended use of the visibility field to manage such features. When a service discovery system retrieves the task or executor information from the master, it can decide how to handle tasks without DiscoveryInfo. For instance, tasks may be made non-discoverable to other frameworks (equivalent to visibility=FRAMEWORK) or discoverable to all frameworks (equivalent to visibility=CLUSTER).
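
One way a service discovery system might apply these rules is sketched below. Only the Visibility values come from the DiscoveryInfo message; the Requester enum, the TaskDiscovery struct, and isDiscoverable are hypothetical names, and the default applied to tasks without DiscoveryInfo is a policy choice of the discovery system rather than something Mesos prescribes.

```cpp
#include <cassert>

// Same values as DiscoveryInfo.Visibility.
enum Visibility { FRAMEWORK = 0, CLUSTER = 1, EXTERNAL = 2 };

// Hypothetical: who is asking the discovery system about a task.
enum Requester { SAME_FRAMEWORK, OTHER_FRAMEWORK_IN_CLUSTER, OUTSIDE_CLUSTER };

// Hypothetical stand-in for a task with an optional DiscoveryInfo.
struct TaskDiscovery {
  bool hasDiscoveryInfo;
  Visibility visibility;  // Only meaningful when hasDiscoveryInfo is true.
};

// Decides whether `requester` may discover the task. Tasks without
// DiscoveryInfo are treated as if they had `defaultVisibility`.
bool isDiscoverable(const TaskDiscovery& task,
                    Requester requester,
                    Visibility defaultVisibility) {
  const Visibility visibility =
      task.hasDiscoveryInfo ? task.visibility : defaultVisibility;

  switch (requester) {
    case SAME_FRAMEWORK:
      return true;
    case OTHER_FRAMEWORK_IN_CLUSTER:
      return visibility == CLUSTER || visibility == EXTERNAL;
    case OUTSIDE_CLUSTER:
      return visibility == EXTERNAL;
  }
  return false;
}
```

Passing FRAMEWORK or CLUSTER as defaultVisibility reproduces the two ways of handling tasks without DiscoveryInfo mentioned above.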

The name field is a string that provides the service discovery system with the name under which the task is discoverable. The typical use of the name field will be to provide a valid hostname. If name is not provided, it is up to the service discovery system to create a name for the task based on the name field in TaskInfo or other information.

The environment, location, and version fields provide first class support for common attributes used to differentiate between similar services in large deployments. The environment may receive values such as PROD/QA/DEV, the location field may receive values like EAST-US/WEST-US/EUROPE/AMEA, and the version field may receive values like v2.0/v0.9. The exact use of these fields is up to the service discovery system.

The ports field allows the framework to identify the ports a task listens on and explicitly name the functionality they represent and the layer-4 protocol they use (TCP, UDP, or other). For example, a Cassandra task will define ports like "7000,Cluster,TCP", "7001,SSL,TCP", "9160,Thrift,TCP", "9042,Native,TCP", and "7199,JMX,TCP". It is up to the service discovery system to use these names and protocols in appropriate ways, potentially combining them with the name field in DiscoveryInfo.
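
As a rough sketch of how a discovery system might consume such entries, the code below parses the "number,name,protocol" shorthand used in the Cassandra example and combines it with a task name. NamedPort, parsePort, and recordName are hypothetical, the shorthand is only this guide's notation rather than a Mesos wire format, and the record-name layout is invented for illustration (loosely modelled on DNS SRV naming).

```cpp
#include <cassert>
#include <cctype>
#include <sstream>
#include <string>

// Hypothetical in-memory form of one named port.
struct NamedPort {
  int number;
  std::string name;
  std::string protocol;
};

// Parses "number,name,protocol" (e.g. "9042,Native,TCP").
// Returns false, leaving *out untouched, if the text is malformed.
bool parsePort(const std::string& text, NamedPort* out) {
  std::istringstream in(text);
  std::string number, name, protocol;
  if (!std::getline(in, number, ',') ||
      !std::getline(in, name, ',') ||
      !std::getline(in, protocol)) {
    return false;
  }
  if (number.empty() || number.size() > 5 ||
      number.find_first_not_of("0123456789") != std::string::npos ||
      name.empty() || protocol.empty()) {
    return false;
  }
  const int value = std::stoi(number);
  if (value > 65535) {
    return false;
  }
  out->number = value;
  out->name = name;
  out->protocol = protocol;
  return true;
}

// Lower-cases an ASCII string.
std::string toLower(std::string text) {
  for (char& c : text) {
    c = static_cast<char>(std::tolower(static_cast<unsigned char>(c)));
  }
  return text;
}

// Hypothetical naming scheme: "_<port name>._<protocol>.<task name>".
std::string recordName(const NamedPort& port, const std::string& taskName) {
  return "_" + toLower(port.name) + "._" + toLower(port.protocol) + "." + taskName;
}
```

Under this made-up scheme, the entry "9042,Native,TCP" on a task named "cassandra" would be published as "_native._tcp.cassandra".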

The labels field allows a framework to pass arbitrary labels to the service discovery system in the form of key/value pairs. Note that anything passed through this field is not guaranteed to be supported moving forward. Nevertheless, this field provides extensibility. Common uses of this field will allow us to identify use cases that require first class support.