Framework Development Guide

In this document we refer to Mesos applications as "frameworks".

See one of the example framework schedulers in MESOS_HOME/src/examples/ to get an idea of what a Mesos framework scheduler and executor in the language of your choice looks like. RENDLER provides example framework implementations in C++, Go, Haskell, Java, Python and Scala.

Create your Framework Scheduler

If you are writing a scheduler against Mesos 1.0 or newer, it is recommended to use the new HTTP API to talk to Mesos.

If your framework needs to talk to Mesos 0.28.0 or older, you can write the scheduler in C++, Java/Scala, or Python. Your framework scheduler should inherit from the Scheduler class (see API below). Your scheduler should create a SchedulerDriver (which will mediate communication between your scheduler and the Mesos master) and then call SchedulerDriver.run().

Scheduler API

Callback interface to be implemented by framework schedulers.

Declared in MESOS_HOME/include/mesos/scheduler.hpp

/*
 * Invoked when the scheduler successfully registers with a Mesos
 * master. A unique ID (generated by the master) used for
 * distinguishing this framework from others and `MasterInfo`
 * with the ip and port of the current master are provided as arguments.
 */
virtual void registered(
    SchedulerDriver* driver,
    const FrameworkID& frameworkId,
    const MasterInfo& masterInfo);

/*
 * Invoked when the scheduler re-registers with a newly elected Mesos master.
 * This is only called when the scheduler has previously been registered.
 * `MasterInfo` containing the updated information about the elected master
 * is provided as an argument.
 */
virtual void reregistered(
    SchedulerDriver* driver,
    const MasterInfo& masterInfo);

/*
 * Invoked when the scheduler becomes "disconnected" from the master
 * (e.g., the master fails and another is taking over).
 */
virtual void disconnected(SchedulerDriver* driver);

/*
 * Invoked when resources have been offered to this framework. A
 * single offer will only contain resources from a single slave.
 * Resources associated with an offer will not be re-offered to
 * _this_ framework until either (a) this framework has rejected
 * those resources (see SchedulerDriver::launchTasks) or (b) those
 * resources have been rescinded (see Scheduler::offerRescinded).
 * Note that resources may be concurrently offered to more than one
 * framework at a time (depending on the allocator being used). In
 * that case, the first framework to launch tasks using those
 * resources will be able to use them while the other frameworks
 * will have those resources rescinded (or if a framework has
 * already launched tasks with those resources then those tasks will
 * fail with a TASK_LOST status and a message saying as much).
 */
virtual void resourceOffers(
    SchedulerDriver* driver,
    const std::vector<Offer>& offers);

/*
 * Invoked when an offer is no longer valid (e.g., the slave was
 * lost or another framework used resources in the offer). If for
 * whatever reason an offer is never rescinded (e.g., dropped
 * message, failing over framework, etc.), a framework that attempts
 * to launch tasks using an invalid offer will receive TASK_LOST
 * status updates for those tasks (see Scheduler::resourceOffers).
 */
virtual void offerRescinded(SchedulerDriver* driver, const OfferID& offerId);

/*
 * Invoked when the status of a task has changed (e.g., a slave is
 * lost and so the task is lost, a task finishes and an executor
 * sends a status update saying so, etc). If implicit
 * acknowledgements are being used, then returning from this
 * callback _acknowledges_ receipt of this status update! If for
 * whatever reason the scheduler aborts during this callback (or
 * the process exits) another status update will be delivered (note,
 * however, that this is currently not true if the slave sending the
 * status update is lost/fails during that time). If explicit
 * acknowledgements are in use, the scheduler must acknowledge this
 * status on the driver.
 */
virtual void statusUpdate(SchedulerDriver* driver, const TaskStatus& status);

/*
 * Invoked when an executor sends a message. These messages are best
 * effort; do not expect a framework message to be retransmitted in
 * any reliable fashion.
 */
virtual void frameworkMessage(
    SchedulerDriver* driver,
    const ExecutorID& executorId,
    const SlaveID& slaveId,
    const std::string& data);

/*
 * Invoked when a slave has been determined unreachable (e.g.,
 * machine failure, network partition). Most frameworks will need to
 * reschedule any tasks launched on this slave on a new slave.
 *
 * NOTE: This callback is not reliably delivered. If a host or
 * network failure causes messages between the master and the
 * scheduler to be dropped, this callback may not be invoked.
 */
virtual void slaveLost(SchedulerDriver* driver, const SlaveID& slaveId);

/*
 * Invoked when an executor has exited/terminated. Note that any
 * tasks running will have TASK_LOST status updates automagically
 * generated.
 *
 * NOTE: This callback is not reliably delivered. If a host or
 * network failure causes messages between the master and the
 * scheduler to be dropped, this callback may not be invoked.
 */
virtual void executorLost(
    SchedulerDriver* driver,
    const ExecutorID& executorId,
    const SlaveID& slaveId,
    int status);

/*
 * Invoked when there is an unrecoverable error in the scheduler or
 * scheduler driver. The driver will be aborted BEFORE invoking this
 * callback.
 */
virtual void error(SchedulerDriver* driver, const std::string& message);

Scheduler Driver API

The Scheduler Driver is responsible for managing the scheduler's lifecycle (e.g., start, stop, or wait to finish) and interacting with Mesos Master (e.g., launch tasks, kill tasks, etc.).

Note that this interface is usually not implemented by a framework itself, but it describes the possible calls a framework scheduler can make to interact with the Mesos Master.

Please note that usage of this interface requires an instantiated MesosSchedulerDiver. See src/examples/test_framework.cpp for an example of using the MesosSchedulerDriver.

Declared in MESOS_HOME/include/mesos/scheduler.hpp

// Starts the scheduler driver. This needs to be called before any
// other driver calls are made.
virtual Status start();

// Stops the scheduler driver. If the 'failover' flag is set to
// false then it is expected that this framework will never
// reconnect to Mesos. So Mesos will unregister the framework and
// shutdown all its tasks and executors. If 'failover' is true, all
// executors and tasks will remain running (for some framework
// specific failover timeout) allowing the scheduler to reconnect
// (possibly in the same process, or from a different process, for
// example, on a different machine).
virtual Status stop(bool failover = false);

// Aborts the driver so that no more callbacks can be made to the
// scheduler. The semantics of abort and stop have deliberately been
// separated so that code can detect an aborted driver (i.e., via
// the return status of SchedulerDriver::join, see below), and
// instantiate and start another driver if desired (from within the
// same process). Note that 'stop()' is not automatically called
// inside 'abort()'.
virtual Status abort();

// Waits for the driver to be stopped or aborted, possibly
// _blocking_ the current thread indefinitely. The return status of
// this function can be used to determine if the driver was aborted
// (see mesos.proto for a description of Status).
virtual Status join();

// Starts and immediately joins (i.e., blocks on) the driver.
virtual Status run();

// Requests resources from Mesos (see mesos.proto for a description
// of Request and how, for example, to request resources from
// specific slaves). Any resources available are offered to the
// framework via Scheduler::resourceOffers callback, asynchronously.
virtual Status requestResources(const std::vector<Request>& requests);

// Launches the given set of tasks. Any remaining resources (i.e.,
// those that are not used by the launched tasks or their executors)
// will be considered declined. Note that this includes resources
// used by tasks that the framework attempted to launch but failed
// (with `TASK_ERROR`) due to a malformed task description. The
// specified filters are applied on all unused resources (see
// mesos.proto for a description of Filters). Available resources
// are aggregated when multiple offers are provided. Note that all
// offers must belong to the same slave. Invoking this function with
// an empty collection of tasks declines offers in their entirety
// (see Scheduler::declineOffer).
virtual Status launchTasks(
    const std::vector<OfferID>& offerIds,
    const std::vector<TaskInfo>& tasks,
    const Filters& filters = Filters());

// Kills the specified task. Note that attempting to kill a task is
// currently not reliable. If, for example, a scheduler fails over
// while it was attempting to kill a task it will need to retry in
// the future. Likewise, if unregistered / disconnected, the request
// will be dropped (these semantics may be changed in the future).
virtual Status killTask(const TaskID& taskId);

// Accepts the given offers and performs a sequence of operations on
// those accepted offers. See Offer.Operation in mesos.proto for the
// set of available operations. Any remaining resources (i.e., those
// that are not used by the launched tasks or their executors) will
// be considered declined. Note that this includes resources used by
// tasks that the framework attempted to launch but failed (with
// `TASK_ERROR`) due to a malformed task description. The specified
// filters are applied on all unused resources (see mesos.proto for
// a description of Filters). Available resources are aggregated
// when multiple offers are provided. Note that all offers must
// belong to the same slave.
virtual Status acceptOffers(
    const std::vector<OfferID>& offerIds,
    const std::vector<Offer::Operation>& operations,
    const Filters& filters = Filters());

// Declines an offer in its entirety and applies the specified
// filters on the resources (see mesos.proto for a description of
// Filters). Note that this can be done at any time, it is not
// necessary to do this within the Scheduler::resourceOffers
// callback.
virtual Status declineOffer(
    const OfferID& offerId,
    const Filters& filters = Filters());

// Removes all filters previously set by the framework (via
// launchTasks()). This enables the framework to receive offers from
// those filtered slaves.
virtual Status reviveOffers();

// Inform Mesos master to stop sending offers to the framework. The
// scheduler should call reviveOffers() to resume getting offers.
virtual Status suppressOffers();

// Acknowledges the status update. This should only be called
// once the status update is processed durably by the scheduler.
// Not that explicit acknowledgements must be requested via the
// constructor argument, otherwise a call to this method will
// cause the driver to crash.
virtual Status acknowledgeStatusUpdate(const TaskStatus& status);

// Sends a message from the framework to one of its executors. These
// messages are best effort; do not expect a framework message to be
// retransmitted in any reliable fashion.
virtual Status sendFrameworkMessage(
    const ExecutorID& executorId,
    const SlaveID& slaveId,
    const std::string& data);

// Allows the framework to query the status for non-terminal tasks.
// This causes the master to send back the latest task status for
// each task in 'statuses', if possible. Tasks that are no longer
// known will result in a TASK_LOST update. If statuses is empty,
// then the master will send the latest status for each task
// currently known.
virtual Status reconcileTasks(const std::vector<TaskStatus>& statuses);

Handling Failures

How to build Mesos frameworks that remain available in the face of failures is discussed in a separate document.

Working with Executors

Using the Mesos Command Executor

Mesos provides a simple executor that can execute shell commands and Docker containers on behalf of the framework scheduler; enough functionality for a wide variety of framework requirements.

Any scheduler can make use of the Mesos command executor by filling in the optional CommandInfo member of the TaskInfo protobuf message.

message TaskInfo {
  ...
  optional CommandInfo command = 7;
  ...
}

The Mesos slave will fill in the rest of the ExecutorInfo for you when tasks are specified this way.

Note that the agent will derive an ExecutorInfo from the TaskInfo and additionally copy fields (e.g., Labels) from TaskInfo into the new ExecutorInfo. This ExecutorInfo is only visible on the agent.

Using the Mesos Default Executor

Since Mesos 1.1, a new built-in default executor (experimental) is available that can execute a group of tasks. Just like the command executor the tasks can be shell commands or Docker containers.

The current semantics of the default executor are as folows:

-- Tasks are launched as nested containers underneath the executor container.

-- Task containers and executor container share resources like cpu, memory, network and volumes.

-- There is no resource isolation between different tasks within an executor. Tasks' resources are added to the executor container.

-- If any of the tasks exits with a non-zero exit code, all the tasks in the task group are killed and the executor shuts down.

-- Multiple task groups are not supported.

Once the default executor is considered stable, the command executor will be deprecated in favor of it.

Any scheduler can make use of the Mesos default executor by setting ExecutorInfo.type to DEFAULT when launching a group of tasks using the LAUNCH_GROUP offer operation. If DEFAULT executor is explicitly specified when using LAUNCH offer operation, command executor is used instead of the default executor. This might change in the future when the default executor gets support for handling LAUNCH operation.

message ExecutorInfo {
  ...
    optional Type type = 15;
  ...
}

Creating a custom Framework Executor

If your framework has special requirements, you might want to provide your own Executor implementation. For example, you may not want a 1:1 relationship between tasks and processes.

If you are writing an executor against Mesos 1.0 or newer, it is recommended to use the new HTTP API to talk to Mesos.

If writing against Mesos 0.28.0 or older, your framework executor must inherit from the Executor class. It must override the launchTask() method. You can use the $MESOS_HOME environment variable inside of your executor to determine where Mesos is running from.

Executor API

Declared in MESOS_HOME/include/mesos/executor.hpp

/*
 * Invoked once the executor driver has been able to successfully
 * connect with Mesos. In particular, a scheduler can pass some
 * data to its executors through the `FrameworkInfo.ExecutorInfo`'s
 * data field.
 */
virtual void registered(
    ExecutorDriver* driver,
    const ExecutorInfo& executorInfo,
    const FrameworkInfo& frameworkInfo,
    const SlaveInfo& slaveInfo);

/*
 * Invoked when the executor re-registers with a restarted slave.
 */
virtual void reregistered(ExecutorDriver* driver, const SlaveInfo& slaveInfo);

/*
 * Invoked when the executor becomes "disconnected" from the slave
 * (e.g., the slave is being restarted due to an upgrade).
 */
virtual void disconnected(ExecutorDriver* driver);

/*
 * Invoked when a task has been launched on this executor (initiated
 * via Scheduler::launchTasks). Note that this task can be realized
 * with a thread, a process, or some simple computation, however, no
 * other callbacks will be invoked on this executor until this
 * callback has returned.
 */
virtual void launchTask(ExecutorDriver* driver, const TaskInfo& task);

/*
 * Invoked when a task running within this executor has been killed
 * (via SchedulerDriver::killTask). Note that no status update will
 * be sent on behalf of the executor, the executor is responsible
 * for creating a new TaskStatus (i.e., with TASK_KILLED) and
 * invoking ExecutorDriver::sendStatusUpdate.
 */
virtual void killTask(ExecutorDriver* driver, const TaskID& taskId);

/*
 * Invoked when a framework message has arrived for this
 * executor. These messages are best effort; do not expect a
 * framework message to be retransmitted in any reliable fashion.
 */
virtual void frameworkMessage(ExecutorDriver* driver, const std::string& data);

/*
 * Invoked when the executor should terminate all of its currently
 * running tasks. Note that after a Mesos has determined that an
 * executor has terminated any tasks that the executor did not send
 * terminal status updates for (e.g., TASK_KILLED, TASK_FINISHED,
 * TASK_FAILED, etc) a TASK_LOST status update will be created.
 */
virtual void shutdown(ExecutorDriver* driver);

/*
 * Invoked when a fatal error has occurred with the executor and/or
 * executor driver. The driver will be aborted BEFORE invoking this
 * callback.
 */
virtual void error(ExecutorDriver* driver, const std::string& message);

Install your custom Framework Executor

After creating your custom executor, you need to make it available to all slaves in the cluster.

One way to distribute your framework executor is to let the Mesos fetcher download it on-demand when your scheduler launches tasks on that slave. ExecutorInfo is a Protocol Buffer Message class (defined in include/mesos/mesos.proto), and it contains a field of type CommandInfo. CommandInfo allows schedulers to specify, among other things, a number of resources as URIs. These resources are fetched to a sandbox directory on the slave before attempting to execute the ExecutorInfo command. Several URI schemes are supported, including HTTP, FTP, HDFS, and S3 (e.g. see src/examples/java/TestFramework.java for an example of this).

Alternatively, you can pass the frameworks_home configuration option (defaults to: MESOS_HOME/frameworks) to your mesos-slave daemons when you launch them to specify where your framework executors are stored (e.g. on an NFS mount that is available to all slaves), then use a relative path in CommandInfo.uris, and the slave will prepend the value of frameworks_home to the relative path provided.

Once you are sure that your executors are available to the mesos-slaves, you should be able to run your scheduler, which will register with the Mesos master, and start receiving resource offers!

Labels

Labels can be found in the FrameworkInfo, TaskInfo, DiscoveryInfo and TaskStatus messages; framework and module writers can use Labels to tag and pass unstructured information around Mesos. Labels are free-form key-value pairs supplied by the framework scheduler or label decorator hooks. Below is the protobuf definitions of labels:

  optional Labels labels = 11;
/**
 * Collection of labels.
 */
message Labels {
    repeated Label labels = 1;
}

/**
 * Key, value pair used to store free form user-data.
 */
message Label {
  required string key = 1;
  optional string value = 2;
}

Labels are not interpreted by Mesos itself, but will be made available over master and slave state endpoints. Further more, the executor and scheduler can introspect labels on the TaskInfo and TaskStatus programmatically. Below is an example of how two label pairs ("environment": "prod" and "bananas": "apples") can be fetched from the master state endpoint.

$ curl http://master/state.json
...
{
  "executor_id": "default",
  "framework_id": "20150312-120017-16777343-5050-39028-0000",
  "id": "3",
  "labels": [
    {
      "key": "environment",
      "value": "prod"
    },
    {
      "key": "bananas",
      "value": "apples"
    }
  ],
  "name": "Task 3",
  "slave_id": "20150312-115625-16777343-5050-38751-S0",
  "state": "TASK_FINISHED",
  ...
},

Service discovery

When your framework registers an executor or launches a task, it can provide additional information for service discovery. This information is stored by the Mesos master along with other imporant information such as the slave currently running the task. A service discovery system can programmatically retrieve this information in order to set up DNS entries, configure proxies, or update any consistent store used for service discovery in a Mesos cluster that runs multiple frameworks and multiple tasks.

The optional DiscoveryInfo message for TaskInfo and ExecutorInfo is declared in MESOS_HOME/include/mesos/mesos.proto

message DiscoveryInfo {
  enum Visibility {
    FRAMEWORK = 0;
    CLUSTER = 1;
    EXTERNAL = 2;
  }

  required Visibility visibility = 1;
  optional string name = 2;
  optional string environment = 3;
  optional string location = 4;
  optional string version = 5;
  optional Ports ports = 6;
  optional Labels labels = 7;
}

Visibility is the key parameter that instructs the service discovery system whether a service should be discoverable. We currently differentiate between three cases:

  • a task should not be discoverable for anyone but its framework.
  • a task should be discoverable for all frameworks running on the Mesos cluster but not externally.
  • a task should be made discoverable broadly.

Many service discovery systems provide additional features that manage the visibility of services (e.g., ACLs in proxy based systems, security extensions to DNS, VLAN or subnet selection). It is not the intended use of the visibility field to manage such features. When a service discovery system retrieves the task or executor information from the master, it can decide how to handle tasks without DiscoveryInfo. For instance, tasks may be made non discoverable to other frameworks (equivalent to visibility=FRAMEWORK) or discoverable to all frameworks (equivalent to visibility=CLUSTER).

The name field is a string that that provides the service discovery system with the name under which the task is discoverable. The typical use of the name field will be to provide a valid hostname. If name is not provided, it is up to the service discovery system to create a name for the task based on the name field in taskInfo or other information.

The environment, location, and version fields provide first class support for common attributes used to differentiate between similar services in large deployments. The environment may receive values such as PROD/QA/DEV, the location field may receive values like EAST-US/WEST-US/EUROPE/AMEA, and the version field may receive values like v2.0/v0.9. The exact use of these fields is up to the service discovery system.

The ports field allows the framework to identify the ports a task listens to and explicitly name the functionality they represent and the layer-4 protocol they use (TCP, UDP, or other). For example, a Cassandra task will define ports like "7000,Cluster,TCP", "7001,SSL,TCP", "9160,Thrift,TCP", "9042,Native,TCP", and "7199,JMX,TCP". It is up to the service discovery system to use these names and protocol in appropriate ways, potentially combining them with the name field in DiscoveryInfo.

The labels field allows a framework to pass arbitrary labels to the service discovery system in the form of key/value pairs. Note that anything passed through this field is not guaranteed to be supported moving forward. Nevertheless, this field provides extensibility. Common uses of this field will allow us to identify use cases that require first class support.